Logotipo de Zephyrnet

Presentamos la compatibilidad de Amazon MWAA con Apache Airflow versión 2.8.1 | Servicios web de Amazon

Fecha:

Flujos de trabajo administrados por Amazon para Apache Airflow (Amazon MWAA) es un servicio de orquestación administrado para Flujo de aire Apache eso hace que sea sencillo configurar y operar canales de datos de un extremo a otro en la nube.

Las organizaciones utilizan Amazon MWAA para mejorar sus flujos de trabajo comerciales. Por ejemplo, Genómica C2i utiliza Amazon MWAA en su plataforma de datos para orquestar la validación de algoritmos que procesan datos genómicos del cáncer en miles de millones de registros. Twitch, una plataforma de transmisión en vivo, gestiona y organiza la capacitación y la implementación de sus modelos de recomendación para más de 140 millones de usuarios activos. Utilizan Amazon MWAA para escalar, al tiempo que mejoran significativamente la seguridad y reducen los gastos generales de administración de infraestructura.

Hoy anunciamos la disponibilidad de los entornos Apache Airflow versión 2.8.1 en Amazon MWAA. En esta publicación, le explicamos algunas de las nuevas características y capacidades de Airflow ahora disponibles en Amazon MWAA y cómo puede configurar o actualizar su entorno de Amazon MWAA a la versión 2.8.1.

Almacenamiento de objetos

A medida que los canales de datos crecen, los ingenieros luchan por administrar el almacenamiento en múltiples sistemas con API, métodos de autenticación y convenciones únicos para acceder a los datos, lo que requiere lógica personalizada y operadores específicos de almacenamiento. Airflow ahora ofrece una capa de abstracción de almacenamiento de objetos unificada que maneja estos detalles, lo que permite a los ingenieros centrarse en sus canales de datos. Usos del almacenamiento de objetos de flujo de aire fsspec para permitir un código de acceso a datos consistente en diferentes sistemas de almacenamiento de objetos, simplificando así la complejidad de la infraestructura.

Los siguientes son algunos de los beneficios clave de la función:

  • Flujos de trabajo portátiles – Puede cambiar los servicios de almacenamiento con cambios mínimos en sus gráficos acíclicos dirigidos (DAG)
  • Transferencias de datos eficientes – Puede transmitir datos en lugar de cargarlos en la memoria
  • Reducción de Mantenimiento – No necesita operadores separados, lo que hace que el mantenimiento de sus tuberías sea sencillo
  • Experiencia familiar en programación. – Puedes usar módulos de Python, como shutil, para operaciones de archivos

Para utilizar el almacenamiento de objetos con Servicio de almacenamiento simple de Amazon (Amazon S3), es necesario instalar el paquete adicional s3fs con el proveedor de Amazon (apache-airflow-providers-amazon[s3fs]==x.x.x).

En el código de muestra siguiente, puede ver cómo mover datos directamente desde Google Cloud Storage a Amazon S3. Porque el almacenamiento de objetos de Airflow utiliza shutil.copyfileobj, los datos de los objetos se leen en fragmentos de gcs_data_source y transmitido a amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Para obtener más información sobre el almacenamiento de objetos de Airflow, consulte Almacenamiento de objetos.

Interfaz de usuario de XCom

XCom (comunicaciones cruzadas) permite el paso de datos entre tareas, facilitando la comunicación y coordinación entre ellas. Anteriormente, los desarrolladores tenían que cambiar a una vista diferente para ver los XCom relacionados con una tarea. Con Airflow 2.8, los valores-clave de XCom se representan directamente en una pestaña dentro de la vista Airflow Grid, como se muestra en la siguiente captura de pantalla.

El nuevo xcom La pestaña proporciona los siguientes beneficios:

  • Visibilidad de XCom mejorada – Una pestaña dedicada en la interfaz de usuario proporciona una forma cómoda y fácil de usar de ver todos los XCom asociados con un DAG o tarea.
  • Depuración mejorada – Poder ver los valores de XCom directamente en la interfaz de usuario es útil para depurar DAG. Puede ver rápidamente el resultado de las tareas anteriores sin necesidad de extraerlas e inspeccionarlas manualmente utilizando el código Python.

Registrador de contexto de tareas

La gestión de los ciclos de vida de las tareas es crucial para el buen funcionamiento de los canales de datos en Airflow. Sin embargo, ciertos desafíos han persistido, particularmente en escenarios donde las tareas se detienen inesperadamente. Esto puede ocurrir debido a varias razones, incluyendo tiempos de espera del programador, zombi tareas (tareas que permanecen en estado de ejecución sin enviar latidos) o instancias en las que el trabajador se queda sin memoria.

Tradicionalmente, este tipo de fallas, particularmente aquellas provocadas por componentes centrales de Airflow como el programador o el ejecutor, no se registraban en los registros de tareas. Esta limitación requería que los usuarios solucionaran problemas fuera de la interfaz de usuario de Airflow, lo que complicaba el proceso de identificación y resolución de problemas.

Airflow 2.8 introdujo una mejora significativa que soluciona este problema. Los componentes de Airflow, incluidos el programador y el ejecutor, ahora pueden usar el nuevo TaskContextLogger para reenviar mensajes de error directamente a los registros de tareas. Esta función le permite ver todos los mensajes de error relevantes relacionados con la ejecución de una tarea en un solo lugar. Esto simplifica el proceso de descubrir por qué falló una tarea y ofrece una perspectiva completa de lo que salió mal en una única vista de registro.

La siguiente captura de pantalla muestra cómo se detecta la tarea como zombiey el registro del programador se incluye como parte del registro de tareas.

Debe establecer el parámetro de configuración del entorno. enable_task_context_logger a True, para habilitar la función. Una vez habilitado, Airflow puede enviar registros desde el programador, el ejecutor o el contexto de ejecución de devolución de llamada a los registros de tareas y ponerlos a disposición en la interfaz de usuario de Airflow.

Ganchos de escucha para conjuntos de datos

Conjuntos de datos se introdujeron en Airflow 2.4 como una agrupación lógica de fuentes de datos para crear una programación basada en datos y dependencias entre DAG. Por ejemplo, puede programar un DAG de consumidor para que se ejecute cuando un DAG de productor actualice un conjunto de datos. Oyentes permitir a los usuarios de Airflow crear suscripciones a ciertos eventos que suceden en el entorno. En Airflow 2.8, se agregan oyentes para dos eventos de conjuntos de datos: on_dataset_creado y on_dataset_changed, permitiendo efectivamente a los usuarios de Airflow escribir código personalizado para reaccionar a las operaciones de administración de conjuntos de datos. Por ejemplo, puede activar un sistema externo o enviar una notificación.

Usar ganchos de escucha para conjuntos de datos es sencillo. Complete los siguientes pasos para crear un oyente para on_dataset_changed:

  1. Crea el oyente (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Cree un complemento para registrar al oyente en su entorno Airflow (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Para obtener más información sobre cómo instalar complementos en Amazon MWAA, consulte Instalación de complementos personalizados.

Configure un nuevo entorno Airflow 2.8.1 en Amazon MWAA

Puedes iniciar el Configure en su cuenta y región preferida usando el Consola de administración de AWS, API o Interfaz de línea de comandos de AWS (AWS CLI). Si está adoptando infraestructura como código (IaC), puede automatizar la configuración usando Formación en la nube de AWS, la Kit de desarrollo en la nube de AWS (AWS CDK) o scripts de Terraform.

Tras la creación exitosa de un entorno Airflow versión 2.8.1 en Amazon MWAA, ciertos paquetes se instalan automáticamente en los nodos programador y trabajador. Para obtener una lista completa de los paquetes instalados y sus versiones, consulte Paquetes del proveedor Apache Airflow instalados en entornos Amazon MWAA. Puede instalar paquetes adicionales utilizando un archivo de requisitos.

Actualice desde versiones anteriores de Airflow a la versión 2.8.1

Puede aprovechar estas capacidades más recientes actualizando sus entornos anteriores basados ​​en la versión 2.x de Airflow a la versión 2.8.1 mediante actualizaciones de versión locales. Para obtener más información sobre las actualizaciones de versiones locales, consulte Actualización de la versión de Apache Airflow or Presentación de actualizaciones de versiones locales con Amazon MWAA.

Conclusión

En esta publicación, analizamos algunas características importantes introducidas en la versión 2.8 de Airflow, como el almacenamiento de objetos, la nueva pestaña XCom agregada a la vista de cuadrícula, el registro de contexto de tareas, los enlaces de escucha para conjuntos de datos y cómo puede comenzar a usarlos. También proporcionamos código de muestra para mostrar implementaciones en Amazon MWAA. Para obtener la lista completa de cambios, consulte Notas de la versión de Airflow.

Para obtener detalles adicionales y ejemplos de códigos en Amazon MWAA, visite el Guía del usuario de Amazon MWAA y del Ejemplos de repositorio de GitHub de Amazon MWAA.

Apache, Apache Airflow y Airflow son marcas comerciales registradas o marcas comerciales de Apache Software Foundation en los Estados Unidos y / o en otros países.


Acerca de los autores

Mansi Bhutada es un arquitecto de soluciones ISV con sede en los Países Bajos. Ayuda a los clientes a diseñar e implementar soluciones bien diseñadas en AWS que aborden sus problemas comerciales. Le apasiona el análisis de datos y las redes. Más allá del trabajo, le gusta experimentar con la comida, jugar pickleball y sumergirse en divertidos juegos de mesa.

Hernán García es arquitecto senior de soluciones en AWS con sede en los Países Bajos. Trabaja en la industria de servicios financieros, apoyando a las empresas en su adopción de la nube. Le apasionan las tecnologías sin servidor, la seguridad y el cumplimiento. Le gusta pasar tiempo con familiares y amigos y probar nuevos platos de diferentes cocinas.

punto_img

Información más reciente

punto_img