Logotipo de Zephyrnet

Apache Airflow: ¿Cómo obtener datos y correo electrónico de forma dinámica?

Fecha:

Este artículo fue publicado como parte del Blogatón de ciencia de datos.

Introducción

La automatización de trabajos redundantes con herramientas de gestión de flujos de trabajo ahorra una cantidad considerable de tiempo y recursos. Apache Airflow es actualmente el líder del mercado en herramientas de gestión de flujos de trabajo. Airflow es de código abierto y viene preempaquetado con muchos operadores, ganchos, sensores y mucho más, que cubre un conjunto diverso de servicios externos.

Fuente: Foto de Andrew Pons en Unsplash

Airflow es una plataforma desarrollada por la comunidad de python que permite conectar numerosas fuentes de datos para analizar y extraer valores de significado. Airflow mantiene el linaje usando DAG y simplifica los datos/ML trabajos de ingeniería que les permiten diseñar casos de uso en flujos de trabajo automatizados.

Esta publicación tiene como objetivo mostrar cómo extraer datos de fuentes en línea para analizar y enviar correos electrónicos a los equipos para realizar acciones específicas de casos de uso.

Requisitos previos

  • Configuración de flujo de aire y SMTP.
  • python3

Requisito

El caso de uso contempla que nuestros equipos de datos deben obtener los datos de la fuente para analizar y construir modelos alrededor de los datos. Los archivos en la ubicación de origen deben analizarse para determinar si contienen datos relevantes o están vacíos.

Enviaremos los archivos por correo electrónico a nuestros equipos de datos en función de la disponibilidad del contenido del archivo tal como se presenta.

flujo de aire apache

Desarrollo de flujo de trabajo

Leer los archivos de las fuentes puede ser complicado y complejo sin un mecanismo de autenticación confiable y seguro. Con respecto a esta demostración para una audiencia mayor, extraeremos datos estadísticos disponibles públicamente desde una plataforma segura para simplificar la extracción de datos.

Los datos deben almacenarse en algún lugar como referencia antes de ser analizados. Para simplificar y facilitar la implementación, configuremos un destino de archivo para guardar nuestros datos usando la ruta de registro airflow.cfg.

Apache Airflow proporciona una biblioteca de configuración similar a python ConfigParser para leer los valores. Usando esta biblioteca, podemos obtener las claves de valores del elemento.

importar solicitudes desde airflow.configuration import conf files_dir = conf.get("logging","base_log_folder") def get_file(url: str): resp = request.get(url) with open(f"{files_dir}/file_name.csv ","wb") como f: f.write(resp.content)

Con nuestro método get_files en su lugar, el siguiente paso es verificar el contenido del archivo. El enfoque que tomamos es abrir todos los archivos de nuestro directorio de archivos, leer líneas y validar la longitud para garantizar que nuestros archivos contengan datos antes de enviarlos al equipo de datos.

from os import listdir from os.path import isfile, join files = [f for f in listdir(files_dir) if isfile(join(files_dir, f))] def check_all_files(): get_file("https://stats.govt. nz/assets/Uploads/Annual-enterprise-survey/ Annual-enterprise-survey-2021-financial-year-provisional/Download-data/anual-enterprise-survey-2021-financial-year-provisional-size-bands-csv. csv") get_file("https://stats.govt.nz/assets/Uploads/Business-operations-survey/ Business-operations-survey-2021/Download-data/bos2021ModC.csv") get_file("Business-operations- encuesta/Business-operations-survey-2021/ Download-data/bos2021ModC.csv") def is_file_empty(file_name: str): try: with open(file_name, 'r+') as file: data = [] for line in file: data.append(line.replace('n', '')) if len(data) <= 1: return True else: return False excepto: return 'ERROR: FILE_NOT_FOUND'

El método check_all_files llamará iterativamente al método get_files personalizado para descargar conjuntos de datos disponibles públicamente. Tenemos un submétodo definido dentro de check_all_files que abre todos los archivos descargados de forma recursiva para verificar la longitud de los archivos y devuelve el estado del archivo en función de la longitud del archivo.

El siguiente paso es diseñar un mecanismo para activar la alerta de correo electrónico en función del tipo de retorno de nuestro método check_all_files. Para el estado, mantendremos los resultados de las variables globales para capturar el equivalente booleano de nuestro tipo de retorno de llamada al método is_empty_file. Ahora recorreremos los archivos y agregaremos el tipo de devolución a la lista de resultados. Usaremos esta variable de resultados para verificar si nuestros archivos tienen contenido o están vacíos para registrar el estado.

resultados = [] para el archivo en archivos: resultados globales. else: resultado = [] para índice, datos en enumerar (resultados): if data == False: resultado.

Llamaremos a la función anterior usando un PythonOperator.

get_files=PythonOperator( task_id='get_files', python_callable=check_all_files )

Ahora usaremos el estado de retorno de la condición check_all_files y el arquitecto airflow BranchPythonOperator. El método check_for_email espera una instancia de tarea y extraerá los archivos dinámicamente durante el tiempo de ejecución utilizando la clase xcom_pull.

def comprobar_para_email(ti): comprobar_archivos= ti.xcom_pull(task_ids='obtener_archivos',clave='comprobar_archivos') if comprobar_archivos == "SIN ACTUALIZACIONES": devolver 'no_actualizar' elif comprobar_archivos == 'ERROR: ARCHIVOS_NO_ENCONTRADOS': devolver 'archivo_no_encontrado ' más: devuelve 'email_alert'

El operador branch python puede ser una opción excelente para activar operadores en función de las condiciones y omitir el resto. El operador acepta un python_callable que devuelve un task_id, y se hace referencia a este task_id y se trata como el elemento principal en la bifurcación del método.

check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_callable=check_for_email, provide_context=True)

Luego de una bifurcación exitosa, ingresaremos en el operador ficticio o PythonOperator a través del operador python de sucursal. A continuación, enviaremos los correos electrónicos en función del contenido real del archivo. Aprovecharemos la instancia de la tarea aquí para obtener los archivos de la memoria. Al agregar los archivos recibidos de xcom_pull con el estado de los archivos, podemos enviar un correo electrónico con una plantilla predefinida como se muestra a continuación.

def email_actual_file(ti, **kwargs): check_all_files() pull_files = ti.xcom_pull(task_ids='get_files') actual_files = [] file_status = [] for (content_file, status) in zip(pull_files, results): if status = = False: actual_files.append(content_file) file_status.append(status) if any(file_status) == False: email = EmailOperator( task_id="email_alert", to='GME@dhr-rgv.com', subject="Alerta de CMS sobre actualización diaria para precios de ASP, paso de peatones de ASP y NOC", html_content='''
Hola, Aquí hay una actualización. Verifique el archivo adjunto''', #dag = context.get("dag"), files=[*actual_files]) email.execute(context=kwargs)

Las funciones están definidas; ahora, agregar todos los operadores en un DAG y cambiarlos para su ejecución es el siguiente paso de nuestro caso de uso.

Dag de flujo de aire de Apache

Como es el mandato y la mejor práctica, definiremos los argumentos predeterminados y declararemos los operadores dentro del dag. El indicador render_template_as_native_obj debe configurarse como True para obtener varios archivos del operador de correo electrónico. El operador de correo electrónico personalizado espera **kwargs como contexto; necesitamos configurar el indicador provide_context para que sea verdadero para aceptar los **kwargs durante el tiempo de ejecución.

de airflow.operators.email_operator importar EmailOperator de airflow.operators.dummy_operator importar DummyOperator de airflow.operators.python importar BranchPythonOperator de airflow.models importar DAG de airflow.operators.python_operator importar PythonOperator de airflow.utils.dates importar days_ago default_agrs={" propietario":"Jay", "email_on_failure": True, "start_date":"2022, 01, 01"} con DAG(dag_id='cms-asp_pricing_pg',default_args=default_agrs, render_template_as_native_obj=True, schedule_interval='@daily' ) como dag: get_files=PythonOperator( task_id='get_files', python_callable=check_all_files ) check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_callable=check_for_email, provide_context=True ) no_update = DummyOperator( task_id='o_not_update' = task_id='file_not_found' ) email_alert = PythonOperator(task_id = "email_alert", python_callable=email_actual_file, provide_context=True) get_files >> check_if_file_is _vacío >> [no_update,file_not_found, email_alert]

Salida

flujo de aire apache

Conclusión

Apache Airflow es la herramienta de gestión de flujo de trabajo más popular que existe. Las organizaciones están adoptando el flujo de aire en su pila tecnológica y tratando el flujo de aire como una plataforma crítica e inevitable para orquestar y automatizar los flujos de trabajo con eficiencia.

La publicación nos llevó a un viaje emocionante sobre cómo podemos desarrollar Apache Airflow dag para lograr la extracción de datos y reenviarlos por correo electrónico.

  • Diseñamos un mecanismo para descargar conjuntos de datos disponibles públicamente, analizarlos y validar el contenido del archivo.

  • Hemos ideado un método para aprovechar el operador de python de la rama Airflow para resolver nuestro problema específico del caso de uso.

  • Hemos desarrollado un método para filtrar y enviar por correo electrónico solo los archivos que contienen datos.

  • Al final, combinamos todos los operadores y construimos un DAG, pasamos los métodos de python como invocables y encadenamos las tareas en un DAG funcional.

Los medios que se muestran en este artículo no son propiedad de Analytics Vidhya y se utilizan a discreción del autor.

punto_img

Información más reciente

punto_img

Habla con nosotros!

¡Hola! ¿Le puedo ayudar en algo?