Zephyrnet-Logo

Dynamische DAG-Generierung mit YAML und DAG Factory in Amazon MWAA | Amazon Web Services

Datum:

Von Amazon verwalteter Workflow für Apache Airflow (Amazon MWAA) ist ein verwalteter Dienst, der Ihnen die Nutzung eines vertrauten Produkts ermöglicht Apache-Luftstrom Umgebung mit verbesserter Skalierbarkeit, Verfügbarkeit und Sicherheit, um Ihre Geschäftsabläufe zu verbessern und zu skalieren, ohne die betriebliche Belastung durch die Verwaltung der zugrunde liegenden Infrastruktur. Im Luftstrom, Gerichtete azyklische Graphen (DAGs) werden als Python-Code definiert. Dynamische DAGs beziehen sich auf die Möglichkeit, DAGs im laufenden Betrieb zur Laufzeit zu generieren, typischerweise basierend auf einigen externen Bedingungen, Konfigurationen oder Parametern. Mit dynamischen DAGs können Sie Aufgaben innerhalb eines DAG erstellen, planen und ausführen, basierend auf Daten und Konfigurationen, die sich im Laufe der Zeit ändern können.

Es gibt verschiedene Möglichkeiten, Dynamik in Airflow-DAGs einzuführen (dynamische DAG-Generierung) mithilfe von Umgebungsvariablen und externen Dateien. Einer der Ansätze besteht darin, das zu verwenden DAG-Fabrik YAML-basierte Konfigurationsdateimethode. Diese Bibliothek soll die Erstellung und Konfiguration neuer DAGs durch die Verwendung deklarativer Parameter in YAML erleichtern. Es ermöglicht Standardanpassungen und ist Open Source, sodass neue Funktionen einfach erstellt und angepasst werden können.

In diesem Beitrag untersuchen wir den Prozess der Erstellung dynamischer DAGs mit YAML-Dateien mithilfe von DAG-Fabrik Bibliothek. Dynamische DAGs bieten mehrere Vorteile:

  1. Verbesserte Wiederverwendbarkeit des Codes – Durch die Strukturierung von DAGs über YAML-Dateien fördern wir wiederverwendbare Komponenten und reduzieren so die Redundanz in Ihren Workflow-Definitionen.
  2. Optimierte Wartung – Die YAML-basierte DAG-Generierung vereinfacht den Prozess der Änderung und Aktualisierung von Arbeitsabläufen und sorgt so für reibungslosere Wartungsabläufe.
  3. Flexible Parametrierung – Mit YAML können Sie DAG-Konfigurationen parametrisieren und so dynamische Anpassungen von Arbeitsabläufen basierend auf unterschiedlichen Anforderungen ermöglichen.
  4. Verbesserte Effizienz des Planers – Dynamische DAGs ermöglichen eine effizientere Planung, optimieren die Ressourcenzuteilung und verbessern die gesamten Arbeitsabläufe
  5. Verbesserte Skalierbarkeit – YAML-gesteuerte DAGs ermöglichen parallele Ausführungen und ermöglichen so skalierbare Workflows, mit denen erhöhte Arbeitslasten effizient bewältigt werden können.

Indem wir die Leistungsfähigkeit von YAML-Dateien und der DAG Factory-Bibliothek nutzen, entfesseln wir einen vielseitigen Ansatz zum Erstellen und Verwalten von DAGs und ermöglichen Ihnen die Erstellung robuster, skalierbarer und wartbarer Datenpipelines.

Lösungsübersicht

In diesem Beitrag verwenden wir eine Beispiel-DAG-Datei, die für die Verarbeitung eines COVID-19-Datensatzes konzipiert ist. Der Workflow-Prozess umfasst die Verarbeitung eines Open-Source-Datensatzes, der von angeboten wird WHO-COVID-19-Global. Nachdem wir die installiert haben DAG-Fabrik Mit dem Python-Paket erstellen wir eine YAML-Datei, die Definitionen verschiedener Aufgaben enthält. Die landesspezifischen Sterbefallzahlen verarbeiten wir passiv Country als Variable, die einzelne länderbasierte DAGs erstellt.

Das folgende Diagramm veranschaulicht die Gesamtlösung zusammen mit den Datenflüssen innerhalb logischer Blöcke.

Überblick über die Lösung

Voraussetzungen:

Für diese exemplarische Vorgehensweise sollten Sie die folgenden Voraussetzungen erfüllen:

Führen Sie außerdem die folgenden Schritte aus (führen Sie das Setup in einem aus AWS-Region wo Amazon MWAA verfügbar ist):

  1. Erstellen Sie ein Amazon MWAA-Umgebung (falls Sie noch keins haben). Wenn Sie Amazon MWAA zum ersten Mal verwenden, lesen Sie Einführung von Amazon Managed Workflows für Apache Airflow (MWAA).

Stellen Sie sicher, dass die AWS Identity and Access Management and (IAM)-Benutzer oder -Rolle, die zum Einrichten der Umgebung verwendet werden, verfügen über IAM-Richtlinien für die folgenden Berechtigungen:

Die hier erwähnten Zugriffsrichtlinien dienen nur als Beispiel in diesem Beitrag. Stellen Sie in einer Produktionsumgebung nur die erforderlichen detaillierten Berechtigungen durch Ausübung bereit Prinzipien der geringsten Privilegien.

  1. Erstellen Sie beim Erstellen Ihrer Amazon MWAA-Umgebung einen eindeutigen (innerhalb eines Kontos) Amazon S3-Bucket-Namen und erstellen Sie Ordner mit dem Namen dags und requirements.
    Amazon S3 Eimer
  2. Erstellen und hochladen Sie a requirements.txt Datei mit folgendem Inhalt zum requirements Ordner. Ersetzen {environment-version} mit der Versionsnummer Ihrer Umgebung und {Python-version} mit der Python-Version, die mit Ihrer Umgebung kompatibel ist:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas wird nur für den in diesem Beitrag beschriebenen Beispielanwendungsfall benötigt dag-factory ist das einzige erforderliche Plug-in. Es wird empfohlen, die Kompatibilität der neuesten Version von zu überprüfen dag-factory mit Amazon MWAA. Der boto und psycopg2-binary Bibliotheken sind in der Basisinstallation von Apache Airflow v2 enthalten und müssen in Ihrer nicht angegeben werden requirements.txt Datei.

  1. Laden Sie die WHO-COVID-19-globale Datendatei auf Ihren lokalen Computer und laden Sie es unter hoch dags Präfix Ihres S3-Buckets.

Stellen Sie sicher, dass Sie auf die neueste AWS S3-Bucket-Version Ihres verweisen requirements.txt Datei, damit die zusätzliche Paketinstallation durchgeführt werden kann. Dies sollte je nach Umgebungskonfiguration normalerweise zwischen 15 und 20 Minuten dauern.

Validieren Sie die DAGs

Wenn Ihre Amazon MWAA-Umgebung als angezeigt wird Verfügbar Navigieren Sie auf der Amazon MWAA-Konsole zur Airflow-Benutzeroberfläche, indem Sie auswählen Öffnen Sie die Airflow-Benutzeroberfläche neben Ihrer Umgebung.

Validieren Sie die DAG

Überprüfen Sie die vorhandenen DAGs, indem Sie zur Registerkarte „DAGs“ navigieren.

Überprüfen Sie die DAG

Konfigurieren Sie Ihre DAGs

Führen Sie die folgenden Schritte aus:

  1. Erstellen Sie leere Dateien mit dem Namen dynamic_dags.yml, example_dag_factory.py und process_s3_data.py auf Ihrem lokalen Computer.
  2. Bearbeiten Sie das process_s3_data.py Datei und speichern Sie sie mit dem folgenden Codeinhalt. Laden Sie die Datei dann wieder in den Amazon S3-Bucket hoch dags Ordner. Wir führen einige grundlegende Datenverarbeitungen im Code durch:
    1. Lesen Sie die Datei von einem Amazon S3-Speicherort
    2. Benennen Sie die Country_code Geben Sie die entsprechende Spalte für das Land ein.
    3. Filtern Sie Daten nach dem angegebenen Land.
    4. Schreiben Sie die verarbeiteten Enddaten in das CSV-Format und laden Sie sie zurück in das S3-Präfix hoch.
import boto3
import pandas as pd
import io
   
def process_s3_data(COUNTRY):
### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
### get csv file ###
   response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
   status = response['ResponseMetadata']['HTTPStatusCode']
   if status == 200:
### read csv file and filter based on the country to write back ###
       df = pd.read_csv(response.get("Body"))
       df.rename(columns={"Country_code": "country"}, inplace=True)
       filtered_df = df[df['country'] == COUNTRY]
       with io.StringIO() as csv_buffer:
                   filtered_df.to_csv(csv_buffer, index=False)
                   response = s3.put_object(
                       Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
                   )
       status = response['ResponseMetadata']['HTTPStatusCode']
       if status == 200:
           print(f"Successful S3 put_object response. Status - {status}")
       else:
           print(f"Unsuccessful S3 put_object response. Status - {status}")
   else:
       print(f"Unsuccessful S3 get_object response. Status - {status}")

  1. Bearbeiten Sie das dynamic_dags.yml und speichern Sie es mit dem folgenden Codeinhalt, dann laden Sie die Datei wieder hoch dags Ordner. Wir fügen verschiedene DAGs je nach Land wie folgt zusammen:
    1. Definieren Sie die Standardargumente, die an alle DAGs übergeben werden.
    2. Erstellen Sie durch Übergabe eine DAG-Definition für einzelne Länder op_args
    3. Karte der process_s3_data funktionieren mit python_callable_name.
    4. Verwenden Sie die Python-Operator um im Amazon S3-Bucket gespeicherte CSV-Dateidaten zu verarbeiten.
    5. Wir haben eingestellt schedule_interval B. 10 Minuten, Sie können diesen Wert jedoch nach Bedarf anpassen.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"

  1. Bearbeiten Sie die Datei example_dag_factory.py und speichern Sie es mit dem folgenden Codeinhalt, dann laden Sie die Datei wieder hoch dags Ordner. Der Code bereinigt die vorhandenen DAGs und generiert sie clean_dags() Methode und das Erstellen neuer DAGs mithilfe der generate_dags() Methode aus dem DagFactory Beispiel.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())

  1. Nachdem Sie die Dateien hochgeladen haben, kehren Sie zur Airflow-UI-Konsole zurück und navigieren Sie zur Registerkarte „DAGs“, wo Sie neue DAGs finden.
    Listen Sie die neuen DAGs auf
  2. Sobald Sie die Dateien hochgeladen haben, kehren Sie zur Airflow-UI-Konsole zurück. Auf der Registerkarte „DAGs“ werden neue DAGs angezeigt, wie unten dargestellt:DAGs

Sie können DAGs aktivieren, indem Sie sie aktiv machen und einzeln testen. Bei der Aktivierung wird eine zusätzliche CSV-Datei namens count_death_{COUNTRY_CODE}.csv wird im dags-Ordner generiert.

Aufräumen

Mit der Nutzung der verschiedenen in diesem Beitrag besprochenen AWS-Dienste können Kosten verbunden sein. Um zu verhindern, dass in Zukunft Gebühren anfallen, löschen Sie die Amazon MWAA-Umgebung, nachdem Sie die in diesem Beitrag beschriebenen Aufgaben abgeschlossen haben, und leeren und löschen Sie den S3-Bucket.

Zusammenfassung

In diesem Blogbeitrag haben wir gezeigt, wie man das verwendet Dag-Fabrik Bibliothek zum Erstellen dynamischer DAGs. Dynamische DAGs zeichnen sich durch ihre Fähigkeit aus, bei jedem Parsen der DAG-Datei basierend auf Konfigurationen Ergebnisse zu generieren. Erwägen Sie die Verwendung dynamischer DAGs in den folgenden Szenarien:

  • Automatisierung der Migration von einem Altsystem zu Airflow, wo Flexibilität bei der DAG-Generierung von entscheidender Bedeutung ist
  • Situationen, in denen sich nur ein Parameter zwischen verschiedenen DAGs ändert, wodurch der Workflow-Management-Prozess optimiert wird
  • Verwalten von DAGs, die auf die sich entwickelnde Struktur eines Quellsystems angewiesen sind, und sorgen so für Anpassungsfähigkeit an Änderungen
  • Etablieren Sie standardisierte Praktiken für DAGs in Ihrem Team oder Ihrer Organisation, indem Sie diese Blaupausen erstellen und so Konsistenz und Effizienz fördern
  • Umfassende YAML-basierte Deklarationen statt komplexer Python-Codierung, wodurch DAG-Konfigurations- und Wartungsprozesse vereinfacht werden
  • Erstellen Sie datengesteuerte Arbeitsabläufe, die sich auf der Grundlage der Dateneingaben anpassen und weiterentwickeln und so eine effiziente Automatisierung ermöglichen

Durch die Integration dynamischer DAGs in Ihren Workflow können Sie die Automatisierung, Anpassungsfähigkeit und Standardisierung verbessern und letztendlich die Effizienz und Effektivität Ihres Datenpipeline-Managements verbessern.

Weitere Informationen zur Amazon MWAA DAG Factory finden Sie unter Amazon MWAA für Analytics-Workshop: DAG Factory. Weitere Details und Codebeispiele zu Amazon MWAA finden Sie unter Amazon MWAA-Benutzerhandbuch und für Amazon MWAA-Beispiele GitHub Repository.


Über die Autoren

 Jayesh Shinde ist Senior Application Architect bei AWS ProServe India. Er ist auf die Entwicklung verschiedener Cloud-zentrierter Lösungen spezialisiert und nutzt moderne Softwareentwicklungspraktiken wie Serverless, DevOps und Analytics.

Harshd Yeola ist Sr. Cloud Architect bei AWS ProServe India und unterstützt Kunden bei der Migration und Modernisierung ihrer Infrastruktur in AWS. Er ist auf den Aufbau von DevSecOps und skalierbarer Infrastruktur mithilfe von Containern, AIOPs sowie AWS-Entwicklertools und -Services spezialisiert.

spot_img

Neueste Intelligenz

spot_img