Logotipo de Zephyrnet

Generación dinámica de DAG con YAML y DAG Factory en Amazon MWAA | Servicios web de Amazon

Fecha:

Flujo de trabajo administrado por Amazon para Apache Airflow (Amazon MWAA) es un servicio administrado que le permite utilizar una versión familiar Flujo de aire Apache entorno con escalabilidad, disponibilidad y seguridad mejoradas para mejorar y escalar los flujos de trabajo de su negocio sin la carga operativa de administrar la infraestructura subyacente. En flujo de aire, Gráficos acíclicos dirigidos (DAG) se definen como código Python. DAG dinámicos se refiere a la capacidad de generar DAG sobre la marcha durante el tiempo de ejecución, generalmente en función de algunas condiciones, configuraciones o parámetros externos. Los DAG dinámicos le ayudan a crear, programar y ejecutar tareas dentro de un DAG en función de datos y configuraciones que pueden cambiar con el tiempo.

Hay varias formas de introducir dinamismo en los DAG de Airflow (generación dinámica de DAG) utilizando variables de entorno y archivos externos. Uno de los enfoques es utilizar el Fábrica DAG Método de archivo de configuración basado en YAML. Esta biblioteca tiene como objetivo facilitar la creación y configuración de nuevos DAG mediante el uso de parámetros declarativos en YAML. Permite personalizaciones predeterminadas y es de código abierto, lo que simplifica la creación y personalización de nuevas funcionalidades.

En esta publicación, exploramos el proceso de creación de DAG dinámicos con archivos YAML, utilizando el Fábrica DAG biblioteca. Los DAG dinámicos ofrecen varios beneficios:

  1. Reutilización de código mejorada – Al estructurar DAG a través de archivos YAML, promovemos componentes reutilizables, lo que reduce la redundancia en las definiciones de su flujo de trabajo.
  2. Mantenimiento simplificado – La generación de DAG basada en YAML simplifica el proceso de modificación y actualización de los flujos de trabajo, lo que garantiza procedimientos de mantenimiento más fluidos.
  3. Parametrización flexible – Con YAML, puede parametrizar las configuraciones de DAG, lo que facilita ajustes dinámicos a los flujos de trabajo en función de los distintos requisitos.
  4. Eficiencia mejorada del programador – Los DAG dinámicos permiten una programación más eficiente, optimizando la asignación de recursos y mejorando las ejecuciones generales del flujo de trabajo.
  5. Escalabilidad mejorada – Los DAG basados ​​en YAML permiten ejecuciones paralelas, lo que permite flujos de trabajo escalables capaces de manejar mayores cargas de trabajo de manera eficiente.

Al aprovechar el poder de los archivos YAML y la biblioteca DAG Factory, desarrollamos un enfoque versátil para crear y administrar DAG, lo que le permite crear canales de datos sólidos, escalables y mantenibles.

Resumen de la solución

En esta publicación, usaremos un archivo DAG de ejemplo diseñado para procesar un conjunto de datos de COVID-19. El proceso de flujo de trabajo implica procesar un conjunto de datos de código abierto ofrecido por OMS-COVID-19-Global. Después de instalar el Fábrica DAG Paquete Python, creamos un archivo YAML que tiene definiciones de varias tareas. Procesamos el recuento de muertes específico del país pasando Country como variable, lo que crea GCI individuales basados ​​en países.

El siguiente diagrama ilustra la solución general junto con los flujos de datos dentro de bloques lógicos.

Descripción general de la solución

Requisitos previos

Para este tutorial, debe tener los siguientes requisitos previos:

Además, complete los siguientes pasos (ejecute la instalación en un Región de AWS donde Amazon MWAA está disponible):

  1. Crear una Entorno Amazon MWAA (si aún no tienes uno). Si es la primera vez que utiliza Amazon MWAA, consulte Presentamos los flujos de trabajo administrados de Amazon para Apache Airflow (MWAA).

Asegúrese de que el Gestión de identidades y accesos de AWS El usuario o rol (IAM) utilizado para configurar el entorno tiene políticas de IAM adjuntas para los siguientes permisos:

Las políticas de acceso mencionadas aquí son solo para el ejemplo de esta publicación. En un entorno de producción, proporcione solo los permisos granulares necesarios ejerciendo principios de mínimo privilegio.

  1. Cree un nombre de depósito de Amazon S3 único (dentro de una cuenta) mientras crea su entorno de Amazon MWAA y cree carpetas llamadas dags y requirements.
    Bucket de Amazon S3
  2. Crea y sube un requirements.txt archivo con el siguiente contenido al requirements carpeta. Reemplazar {environment-version} con el número de versión de su entorno, y {Python-version} con la versión de Python que sea compatible con su entorno:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas es necesario solo para el caso de uso de ejemplo descrito en esta publicación, y dag-factory es el único complemento requerido. Se recomienda comprobar la compatibilidad de la última versión de dag-factory con Amazon MWAA. El boto y psycopg2-binary Las bibliotecas se incluyen con la instalación básica de Apache Airflow v2 y no es necesario especificarlas en su requirements.txt archivo.

  1. Descargue nuestra OMS-COVID-19-archivo de datos globales a su máquina local y cárguelo en el dags prefijo de su depósito S3.

Asegúrese de apuntar a la última versión del depósito AWS S3 de su requirements.txt archivo para que se realice la instalación del paquete adicional. Normalmente, esto debería tardar entre 15 y 20 minutos, según la configuración de su entorno.

Validar los DAG

Cuando su entorno Amazon MWAA se muestra como Disponible en la consola de Amazon MWAA, navegue hasta la interfaz de usuario de Airflow eligiendo Abrir la interfaz de usuario de flujo de aire junto a tu entorno.

Validar el DAG

Verifique los DAG existentes navegando a la pestaña DAG.

Verificar el DAG

Configura tus DAG

Complete los siguientes pasos:

  1. Crea archivos vacíos llamados dynamic_dags.yml, example_dag_factory.py y process_s3_data.py en su máquina local
  2. Editar el process_s3_data.py archivo y guárdelo con el siguiente contenido del código, luego vuelva a cargar el archivo en el depósito de Amazon S3 dags carpeta. Estamos haciendo un procesamiento de datos básico en el código:
    1. Leer el archivo desde una ubicación de Amazon S3
    2. Renombrar el Country_code columna según corresponda al país.
    3. Filtrar datos por el país dado.
    4. Escriba los datos finales procesados ​​en formato CSV y cárguelos nuevamente al prefijo S3.
import boto3
import pandas as pd
import io
   
def process_s3_data(COUNTRY):
### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
### get csv file ###
   response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
   status = response['ResponseMetadata']['HTTPStatusCode']
   if status == 200:
### read csv file and filter based on the country to write back ###
       df = pd.read_csv(response.get("Body"))
       df.rename(columns={"Country_code": "country"}, inplace=True)
       filtered_df = df[df['country'] == COUNTRY]
       with io.StringIO() as csv_buffer:
                   filtered_df.to_csv(csv_buffer, index=False)
                   response = s3.put_object(
                       Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
                   )
       status = response['ResponseMetadata']['HTTPStatusCode']
       if status == 200:
           print(f"Successful S3 put_object response. Status - {status}")
       else:
           print(f"Unsuccessful S3 put_object response. Status - {status}")
   else:
       print(f"Unsuccessful S3 get_object response. Status - {status}")

  1. Editar el dynamic_dags.yml y guárdelo con el siguiente contenido del código, luego cargue el archivo nuevamente en el dags carpeta. Estamos uniendo varios DAG según el país de la siguiente manera:
    1. Defina los argumentos predeterminados que se pasan a todos los DAG.
    2. Cree una definición de DAG para países individuales pasando op_args
    3. Mapear el process_s3_data funcionar con python_callable_name.
    4. Uso Operador Python para procesar datos de archivos csv almacenados en el depósito de Amazon S3.
    5. hemos establecido schedule_interval como 10 minutos, pero siéntase libre de ajustar este valor según sea necesario.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"

  1. Editar el archivo example_dag_factory.py y guárdelo con el siguiente contenido del código, luego cargue el archivo nuevamente en dags carpeta. El código limpia los DAG existentes y genera clean_dags() método y la creación de nuevos DAG utilizando el generate_dags() método del DagFactory ejemplo.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())

  1. Después de cargar los archivos, regrese a la consola de Airflow UI y navegue hasta la pestaña DAG, donde encontrará nuevos DAG.
    Listar los nuevos DAG
  2. Una vez que cargue los archivos, regrese a la consola de Airflow UI y en la pestaña DAG encontrará nuevos DAG que aparecen como se muestra a continuación:DAG

Puede habilitar DAG activándolos y probándolos individualmente. Tras la activación, un archivo CSV adicional llamado count_death_{COUNTRY_CODE}.csv se genera en la carpeta dags.

Limpiar

Es posible que existan costos asociados con el uso de los diversos servicios de AWS que se analizan en esta publicación. Para evitar incurrir en cargos futuros, elimine el entorno de Amazon MWAA después de haber completado las tareas descritas en esta publicación y vacíe y elimine el depósito S3.

Conclusión

En esta publicación de blog demostramos cómo usar el fábrica-dag Biblioteca para crear DAG dinámicos. Los DAG dinámicos se caracterizan por su capacidad de generar resultados con cada análisis del archivo DAG en función de las configuraciones. Considere el uso de DAG dinámicos en los siguientes escenarios:

  • Automatizar la migración de un sistema heredado a Airflow, donde la flexibilidad en la generación de DAG es crucial
  • Situaciones en las que solo cambia un parámetro entre diferentes DAG, lo que agiliza el proceso de gestión del flujo de trabajo
  • Gestionar DAG que dependen de la estructura en evolución de un sistema fuente, proporcionando adaptabilidad a los cambios.
  • Establecer prácticas estandarizadas para los DAG en todo su equipo u organización mediante la creación de estos planos, promoviendo la coherencia y la eficiencia.
  • Adoptar declaraciones basadas en YAML en lugar de codificación Python compleja, simplificando los procesos de configuración y mantenimiento de DAG
  • Crear flujos de trabajo basados ​​en datos que se adapten y evolucionen en función de las entradas de datos, lo que permite una automatización eficiente.

Al incorporar DAG dinámicos en su flujo de trabajo, puede mejorar la automatización, la adaptabilidad y la estandarización y, en última instancia, mejorar la eficiencia y eficacia de la gestión de su canal de datos.

Para obtener más información sobre Amazon MWAA DAG Factory, visite Taller de Amazon MWAA para análisis: DAG Factory. Para obtener detalles adicionales y ejemplos de código en Amazon MWAA, visite el Guía del usuario de Amazon MWAA y del Ejemplos de Amazon MWAA GitHub repositorio.


Acerca de los autores

 Jayesh Shinde es arquitecto sénior de aplicaciones en AWS ProServe India. Se especializa en la creación de diversas soluciones centradas en la nube utilizando prácticas modernas de desarrollo de software como sin servidor, DevOps y análisis.

Harshd Yeola Es arquitecto sénior de la nube en AWS ProServe India y ayuda a los clientes a migrar y modernizar su infraestructura a AWS. Se especializa en la creación de DevSecOps e infraestructura escalable utilizando contenedores, AIOP y herramientas y servicios para desarrolladores de AWS.

punto_img

Información más reciente

punto_img