Zephyrnet-Logo

Orchestrieren Sie eine End-to-End-ETL-Pipeline mit Amazon S3, AWS Glue und Amazon Redshift Serverless mit Amazon MWAA | Amazon Web Services

Datum:

Von Amazon verwaltete Workflows für Apache Airflow (Amazon MWAA) ist ein verwalteter Orchestrierungsdienst für Apache-Luftstrom mit dem Sie Datenpipelines in der Cloud im großen Maßstab einrichten und betreiben können. Apache Airflow ist ein Open-Source-Tool zum programmgesteuerten Erstellen, Planen und Überwachen von Prozess- und Aufgabensequenzen, genannt Workflows. Mit Amazon MWAA können Sie Apache Airflow und Python verwenden, um Workflows zu erstellen, ohne die zugrunde liegende Infrastruktur für Skalierbarkeit, Verfügbarkeit und Sicherheit verwalten zu müssen.

Durch die Verwendung mehrerer AWS-Konten können Unternehmen ihre Arbeitslasten effektiv skalieren und deren Komplexität bewältigen, wenn sie wachsen. Dieser Ansatz bietet einen robusten Mechanismus, um die potenziellen Auswirkungen von Störungen oder Ausfällen abzumildern und sicherzustellen, dass kritische Workloads betriebsbereit bleiben. Darüber hinaus ermöglicht es eine Kostenoptimierung durch die Ausrichtung der Ressourcen auf bestimmte Anwendungsfälle und stellt so sicher, dass die Ausgaben gut kontrolliert werden. Durch die Isolierung von Workloads mit spezifischen Sicherheitsanforderungen oder Compliance-Anforderungen können Unternehmen ein Höchstmaß an Datenschutz und Sicherheit gewährleisten. Darüber hinaus können Sie durch die Möglichkeit, mehrere AWS-Konten strukturiert zu organisieren, Ihre Geschäftsprozesse und Ressourcen an Ihre individuellen betrieblichen, behördlichen und budgetären Anforderungen anpassen. Dieser Ansatz fördert Effizienz, Flexibilität und Skalierbarkeit und ermöglicht es großen Unternehmen, ihre sich ändernden Anforderungen zu erfüllen und ihre Ziele zu erreichen.

In diesem Beitrag wird gezeigt, wie Sie mithilfe von eine End-to-End-ETL-Pipeline (Extrahieren, Transformieren und Laden) orchestrieren Amazon Simple Storage-Service (Amazon S3), AWS-Kleber und Amazon Redshift ohne Server mit Amazon MWAA.

Lösungsüberblick

In diesem Beitrag betrachten wir einen Anwendungsfall, bei dem ein Datenentwicklungsteam einen ETL-Prozess aufbauen und seinen Endbenutzern die beste Erfahrung bieten möchte, wenn sie die neuesten Daten abfragen möchten, nachdem neue Rohdateien zu Amazon S3 in der Zentrale hinzugefügt wurden Konto (Konto A im folgenden Architekturdiagramm). Das Data-Engineering-Team möchte die Rohdaten in sein eigenes AWS-Konto (Konto B im Diagramm) aufteilen, um die Sicherheit und Kontrolle zu erhöhen. Sie möchten die Datenverarbeitungs- und Transformationsarbeiten auch in ihrem eigenen Konto (Konto B) durchführen, um Aufgaben zu unterteilen und unbeabsichtigte Änderungen an den Quellrohdaten im zentralen Konto (Konto A) zu verhindern. Dieser Ansatz ermöglicht es dem Team, die von Konto A extrahierten Rohdaten in Konto B zu verarbeiten, das für Datenverarbeitungsaufgaben vorgesehen ist. Dadurch wird sichergestellt, dass die Roh- und verarbeiteten Daten bei Bedarf sicher getrennt über mehrere Konten verwaltet werden können, um die Datenverwaltung und -sicherheit zu verbessern.

Unsere Lösung verwendet eine von Amazon MWAA orchestrierte End-to-End-ETL-Pipeline, die an einem Amazon S3-Speicherort in Konto A, wo die Rohdaten vorhanden sind, nach neuen inkrementellen Dateien sucht. Dies erfolgt durch Aufrufen von AWS Glue ETL-Jobs und Schreiben in Datenobjekte in einem Redshift Serverless-Cluster in Konto B. Die Pipeline beginnt dann mit der Ausführung Gespeicherte Prozeduren und SQL-Befehle auf Redshift Serverless. Sobald die Ausführung der Abfragen abgeschlossen ist, wird ein ENTLADEN Der Vorgang wird vom Redshift-Data-Warehouse zum S3-Bucket in Konto A aufgerufen.

Da Sicherheit wichtig ist, behandelt dieser Beitrag auch die Konfiguration einer Airflow-Verbindung mit AWS Secrets Manager um das Speichern von Datenbankanmeldeinformationen in Airflow-Verbindungen und -Variablen zu vermeiden.

Das folgende Diagramm veranschaulicht die Architekturübersicht der an der Orchestrierung des Workflows beteiligten Komponenten.

Der Workflow besteht aus folgenden Komponenten:

  • Die Quell- und Ziel-S3-Buckets befinden sich in einem zentralen Konto (Konto A), während sich Amazon MWAA, AWS Glue und Amazon Redshift in einem anderen Konto (Konto B) befinden. Zwischen S3-Buckets in Konto A und Ressourcen in Konto B wurde ein kontoübergreifender Zugriff eingerichtet, um Daten laden und entladen zu können.
  • Im zweiten Konto wird Amazon MWAA in einer VPC und Redshift Serverless in einer anderen VPC gehostet, die über VPC-Peering verbunden sind. Eine Redshift Serverless-Arbeitsgruppe ist in privaten Subnetzen über drei Availability Zones hinweg gesichert.
  • Geheimnisse wie Benutzername, Passwort, DB-Port und AWS-Region für Redshift Serverless werden im Secrets Manager gespeichert.
  • VPC-Endpunkte werden für Amazon S3 und Secrets Manager erstellt, um mit anderen Ressourcen zu interagieren.
  • Normalerweise erstellen Dateningenieure einen Airflow Directed Asymmetric Graph (DAG) und übermitteln ihre Änderungen an GitHub. Mit GitHub-Aktionen werden sie in einem S3-Bucket in Konto B bereitgestellt (für diesen Beitrag laden wir die Dateien direkt in den S3-Bucket hoch). Der S3-Bucket speichert Airflow-bezogene Dateien wie DAG-Dateien, requirements.txt Dateien und Plugins. AWS Glue ETL-Skripte und Assets werden in einem anderen S3-Bucket gespeichert. Diese Trennung trägt dazu bei, die Organisation aufrechtzuerhalten und Verwirrung zu vermeiden.
  • Der Airflow DAG nutzt verschiedene Operatoren, Sensoren, Verbindungen, Aufgaben und Regeln, um die Datenpipeline nach Bedarf auszuführen.
  • Die Airflow-Protokolle werden angemeldet Amazon CloudWatch, und Warnungen können für Überwachungsaufgaben konfiguriert werden. Weitere Informationen finden Sie unter Überwachung von Dashboards und Alarmen auf Amazon MWAA.

Voraussetzungen:

Da sich diese Lösung auf die Verwendung von Amazon MWAA zur Orchestrierung der ETL-Pipeline konzentriert, müssen Sie zuvor bestimmte grundlegende Ressourcen kontoübergreifend einrichten. Insbesondere müssen Sie die S3-Buckets und -Ordner, AWS Glue-Ressourcen und Redshift Serverless-Ressourcen in ihren jeweiligen Konten erstellen, bevor Sie die vollständige Workflow-Integration mit Amazon MWAA implementieren.

Stellen Sie Ressourcen in Konto A mithilfe von AWS CloudFormation bereit

Starten Sie in Konto A das bereitgestellte AWS CloudFormation Stack, um die folgenden Ressourcen zu erstellen:

  • Die Quell- und Ziel-S3-Buckets und -Ordner. Als Best Practice werden die Eingabe- und Ausgabe-Bucket-Strukturen mit der Hive-Partitionierung als formatiert s3://<bucket>/products/YYYY/MM/DD/.
  • Ein Beispieldatensatz namens products.csv, die wir in diesem Beitrag verwenden.

Laden Sie den AWS Glue-Auftrag auf Amazon S3 in Konto B hoch

Erstellen Sie in Konto B einen Amazon S3-Standort mit dem Namen aws-glue-assets-<account-id>-<region>/scripts (falls nicht vorhanden). Ersetzen Sie die Parameter für die Konto-ID und die Region im sample_glue_job.py Skript und laden Sie die AWS Glue-Auftragsdatei an den Amazon S3-Speicherort hoch.

Stellen Sie Ressourcen in Konto B mithilfe von AWS CloudFormation bereit

Starten Sie in Konto B die bereitgestellte CloudFormation-Stack-Vorlage, um die folgenden Ressourcen zu erstellen:

  • Der S3-Eimer airflow-<username>-bucket zum Speichern von Airflow-bezogenen Dateien mit der folgenden Struktur:
    • Dags – Der Ordner für DAG-Dateien.
    • Plugins – Die Datei für alle benutzerdefinierten oder Community-Airflow-Plugins.
    • Anforderungen - Die requirements.txt Datei für alle Python-Pakete.
    • Skripte – Alle im DAG verwendeten SQL-Skripte.
    • technische Daten – Alle im DAG verwendeten Datensätze.
  • Eine serverlose Redshift-Umgebung. Dem Namen der Arbeitsgruppe und des Namespace wird ein Präfix vorangestellt sample.
  • Eine AWS Glue-Umgebung, die Folgendes enthält:
    • Ein AWS-Kleber Crawler, das die Daten aus dem S3-Quell-Bucket crawlt sample-inp-bucket-etl-<username> in Konto A.
    • Eine Datenbank namens products_db im AWS Glue-Datenkatalog.
    • Ein ELT Job namens sample_glue_job. Dieser Job kann Dateien aus dem lesen products Tabelle im Datenkatalog und laden Sie Daten in die Redshift-Tabelle products.
  • Ein VPC-Gateway-Endpunkt zu Amazon S3.
  • Eine Amazon MWAA-Umgebung. Ausführliche Schritte zum Erstellen einer Amazon MWAA-Umgebung mithilfe der Amazon MWAA-Konsole finden Sie unter Einführung von Amazon Managed Workflows für Apache Airflow (MWAA).

Startstapel 1

Erstellen Sie Amazon Redshift-Ressourcen

Erstellen Sie zwei Tabellen und eine gespeicherte Prozedur in einer Redshift Serverless-Arbeitsgruppe mit products.sql Datei.

In diesem Beispiel erstellen wir zwei Tabellen namens products und products_f. Der Name der gespeicherten Prozedur lautet sp_products.

Konfigurieren Sie Airflow-Berechtigungen

Nachdem die Amazon MWAA-Umgebung erfolgreich erstellt wurde, wird der Status als angezeigt Verfügbar. Wählen Sie Öffnen Sie die Airflow-Benutzeroberfläche , um die Airflow-Benutzeroberfläche anzuzeigen. DAGs werden automatisch aus dem S3-Bucket synchronisiert und sind in der Benutzeroberfläche sichtbar. Zu diesem Zeitpunkt befinden sich jedoch keine DAGs im S3-Ordner.

Fügen Sie die vom Kunden verwaltete Richtlinie hinzu AmazonMWAAFullConsoleAccess, wodurch Airflow-Benutzern Zugriffsberechtigungen gewährt werden AWS Identity and Access Management and (IAM)-Ressourcen und fügen Sie diese Richtlinie der Amazon MWAA-Rolle hinzu. Weitere Informationen finden Sie unter Zugriff auf eine Amazon MWAA-Umgebung.

Die mit der Amazon MWAA-Rolle verbundenen Richtlinien haben vollen Zugriff und dürfen nur zu Testzwecken in einer sicheren Testumgebung verwendet werden. Befolgen Sie für Produktionsbereitstellungen das Prinzip der geringsten Rechte.

Richten Sie die Umgebung ein

In diesem Abschnitt werden die Schritte zum Konfigurieren der Umgebung beschrieben. Der Prozess umfasst die folgenden allgemeinen Schritte:

  1. Aktualisieren Sie alle erforderlichen Anbieter.
  2. Kontoübergreifenden Zugriff einrichten.
  3. Stellen Sie eine VPC-Peering-Verbindung zwischen der Amazon MWAA VPC und der Amazon Redshift VPC her.
  4. Konfigurieren Sie Secrets Manager für die Integration mit Amazon MWAA.
  5. Definieren Sie Luftstromverbindungen.

Aktualisieren Sie die Anbieter

Befolgen Sie die Schritte in diesem Abschnitt, wenn Ihre Version von Amazon MWAA niedriger als 2.8.1 ist (die neueste Version zum Zeitpunkt des Verfassens dieses Beitrags).

Anbieter sind Pakete, die von der Community verwaltet werden und alle Kernoperatoren, Hooks und Sensoren für einen bestimmten Dienst enthalten. Der Amazon-Anbieter wird für die Interaktion mit AWS-Diensten wie Amazon S3, Amazon Redshift Serverless, AWS Glue und mehr verwendet. Innerhalb des Amazon-Anbieters gibt es über 200 Module.

Obwohl die in Amazon MWAA unterstützte Airflow-Version 2.6.3 ist, die im Lieferumfang der von Amazon bereitgestellten Paketversion 8.2.0 enthalten ist, wurde die Unterstützung für Amazon Redshift Serverless erst mit der von Amazon bereitgestellten Paketversion 8.4.0 hinzugefügt. Da die standardmäßig gebündelte Anbieterversion älter ist als zum Zeitpunkt der Einführung der Redshift Serverless-Unterstützung, muss die Anbieterversion aktualisiert werden, um diese Funktionalität nutzen zu können.

Der erste Schritt besteht darin, die Einschränkungsdatei zu aktualisieren und requirements.txt Datei mit den richtigen Versionen. Beziehen auf Angabe neuerer Anbieterpakete Schritte zum Aktualisieren des Amazon-Anbieterpakets.

  1. Geben Sie die Anforderungen wie folgt an:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Aktualisieren Sie die Version in der Einschränkungsdatei auf 8.4.0 oder höher.
  3. Fügen Sie Einschränkungen-3.11-aktualisiert.txt Datei an die /dags -Ordner.

Beziehen auf Apache Airflow-Versionen in Amazon Managed Workflows für Apache Airflow für korrekte Versionen der Einschränkungsdatei abhängig von der Airflow-Version.

  1. Navigieren Sie zur Amazon MWAA-Umgebung und wählen Sie Bearbeiten.
  2. Der DAG-Code in Amazon S3Z. Anforderungsdatei, wählen Sie die neueste Version.
  3. Auswählen Speichern.

Dadurch wird die Umgebung aktualisiert und neue Anbieter werden wirksam.

  1. Um die Version des Anbieters zu überprüfen, gehen Sie zu Anbieter unter dem Administrator Tabelle.

Die Version für das Amazon-Anbieterpaket sollte 8.4.0 sein, wie im folgenden Screenshot gezeigt. Wenn nicht, ist beim Laden ein Fehler aufgetreten requirements.txt. Um etwaige Fehler zu beheben, gehen Sie zur CloudWatch-Konsole und öffnen Sie die requirements_install_ip Einloggen Streams protokollieren, wo Fehler aufgelistet sind. Beziehen auf Aktivieren von Protokollen auf der Amazon MWAA-Konsole für weitere Informationen an.

Kontoübergreifenden Zugriff einrichten

Sie müssen kontoübergreifende Richtlinien und Rollen zwischen Konto A und Konto B einrichten, um auf die S3-Buckets zugreifen und Daten laden und entladen zu können. Führen Sie die folgenden Schritte aus:

  1. Konfigurieren Sie in Konto A die Bucket-Richtlinie für den Bucket sample-inp-bucket-etl-<username> um den AWS Glue- und Amazon MWAA-Rollen in Konto B Berechtigungen für Objekte im Bucket zu erteilen sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of- AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
    "s3:PutObject",
    		   "s3:PutObjectAcl",
    		   "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Konfigurieren Sie auf ähnliche Weise die Bucket-Richtlinie für den Bucket sample-opt-bucket-etl-<username> So erteilen Sie Amazon MWAA-Rollen in Konto B Berechtigungen zum Einfügen von Objekten in diesen Bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. Erstellen Sie in Konto A eine IAM-Richtlinie mit dem Namen policy_for_roleA, was die erforderlichen Amazon S3-Aktionen für den Ausgabe-Bucket ermöglicht:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Erstellen Sie eine neue IAM-Rolle mit dem Namen RoleA mit Konto B als vertrauenswürdige Entitätsrolle und fügen Sie diese Richtlinie der Rolle hinzu. Dadurch kann Konto B die Rolle A übernehmen, um die erforderlichen Amazon S3-Aktionen für den Ausgabe-Bucket auszuführen.
  5. Erstellen Sie in Konto B eine IAM-Richtlinie mit dem Namen s3-cross-account-access mit der Berechtigung, auf Objekte im Bucket zuzugreifen sample-inp-bucket-etl-<username>, das sich auf Konto A befindet.
  6. Fügen Sie diese Richtlinie zur AWS Glue-Rolle und der Amazon MWAA-Rolle hinzu:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. Erstellen Sie in Konto B die IAM-Richtlinie policy_for_roleB Angabe von Konto A als vertrauenswürdige Entität. Das Folgende ist die Vertrauensrichtlinie, die angenommen werden sollte RoleA in Konto A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Erstellen Sie eine neue IAM-Rolle mit dem Namen RoleB mit Amazon Redshift als vertrauenswürdigem Entitätstyp und fügen Sie diese Richtlinie der Rolle hinzu. Dies erlaubt RoleB annehmen RoleA in Konto A und auch von Amazon Redshift übernommen werden.
  9. Anfügen RoleB in den Redshift Serverless-Namespace, sodass Amazon Redshift Objekte in den S3-Ausgabe-Bucket in Konto A schreiben kann.
  10. Anhängen der Richtlinie policy_for_roleB zur Amazon MWAA-Rolle, die Amazon MWAA den Zugriff auf den Ausgabe-Bucket in Konto A ermöglicht.

Beziehen auf Wie gewähre ich kontoübergreifenden Zugriff auf Objekte, die sich in Amazon S3-Buckets befinden? Weitere Informationen zum Einrichten des kontoübergreifenden Zugriffs auf Objekte in Amazon S3 über AWS Glue und Amazon MWAA. Beziehen auf Wie kann ich Daten von Amazon Redshift in einen Amazon S3-Bucket in einem anderen Konto KOPIEREN oder ENTLADEN? Weitere Informationen zum Einrichten von Rollen zum Entladen von Daten von Amazon Redshift nach Amazon S3 von Amazon MWAA.

Richten Sie VPC-Peering zwischen den Amazon MWAA- und Amazon Redshift-VPCs ein

Da sich Amazon MWAA und Amazon Redshift in zwei separaten VPCs befinden, müssen Sie VPC-Peering zwischen ihnen einrichten. Sie müssen den Routing-Tabellen, die den Subnetzen für beide Dienste zugeordnet sind, eine Route hinzufügen. Beziehen auf Arbeiten Sie mit VPC-Peering-Verbindungen Einzelheiten zum VPC-Peering finden Sie hier.

Stellen Sie sicher, dass der CIDR-Bereich der Amazon MWAA VPC in der Redshift-Sicherheitsgruppe zulässig ist und der CIDR-Bereich der Amazon Redshift VPC in der Amazon MWAA-Sicherheitsgruppe zulässig ist, wie im folgenden Screenshot gezeigt.

Wenn einer der vorherigen Schritte falsch konfiguriert ist, wird bei der DAG-Ausführung wahrscheinlich ein „Verbindungs-Timeout“-Fehler auftreten.

Konfigurieren Sie die Amazon MWAA-Verbindung mit Secrets Manager

Wenn die Amazon MWAA-Pipeline für die Verwendung von Secrets Manager konfiguriert ist, sucht sie zunächst in einem alternativen Backend (wie Secrets Manager) nach Verbindungen und Variablen. Wenn das alternative Backend den benötigten Wert enthält, wird er zurückgegeben. Andernfalls wird die Metadatendatenbank auf den Wert überprüft und stattdessen dieser zurückgegeben. Weitere Einzelheiten finden Sie unter Konfigurieren einer Apache Airflow-Verbindung mithilfe eines AWS Secrets Manager-Geheimnisses.

Führen Sie die folgenden Schritte aus:

  1. Konfigurieren Sie a VPC-Endpunkt um Amazon MWAA und Secrets Manager zu verknüpfen (com.amazonaws.us-east-1.secretsmanager).

Dadurch kann Amazon MWAA auf die im Secrets Manager gespeicherten Anmeldeinformationen zugreifen.

  1. Um Amazon MWAA die Berechtigung zum Zugriff auf die geheimen Schlüssel von Secrets Manager zu erteilen, fügen Sie die Richtlinie mit dem Namen hinzu SecretsManagerReadWrite zur IAM-Rolle der Umgebung.
  2. Um das Secrets Manager-Backend als Apache Airflow-Konfigurationsoption zu erstellen, gehen Sie zu den Airflow-Konfigurationsoptionen, fügen Sie die folgenden Schlüssel-Wert-Paare hinzu und speichern Sie Ihre Einstellungen.

Dadurch wird Airflow so konfiguriert, dass es nach Verbindungszeichenfolgen und Variablen sucht airflow/connections/* und airflow/variables/* Wege:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend Secrets.backend_kwargs: {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

  1. Um eine Airflow-Verbindungs-URI-Zeichenfolge zu generieren, gehen Sie zu AWS CloudShell und geben Sie eine Python-Shell ein.
  2. Führen Sie den folgenden Code aus, um die Verbindungs-URI-Zeichenfolge zu generieren:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

Die Verbindungszeichenfolge sollte wie folgt generiert werden:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. Fügen Sie die Verbindung im Secrets Manager hinzu, indem Sie den folgenden Befehl im verwenden AWS-Befehlszeilenschnittstelle (AWS-CLI).

Dies kann auch über die Secrets Manager-Konsole erfolgen. Dies wird im Secrets Manager als Klartext hinzugefügt.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Nutzen Sie die Verbindung airflow/connections/secrets_redshift_connection in der DAG. Wenn der DAG ausgeführt wird, sucht er nach dieser Verbindung und ruft die Geheimnisse vom Secrets Manager ab. Im Falle von RedshiftDataOperator, pass die secret_arn als Parameter anstelle des Verbindungsnamens.

Sie können Geheimnisse auch über die Secrets Manager-Konsole als Schlüssel-Wert-Paare hinzufügen.

  1. Fügen Sie im Secrets Manager ein weiteres Geheimnis hinzu und speichern Sie es unter airflow/connections/redshift_conn_test.

Erstellen Sie eine Airflow-Verbindung über die Metadatendatenbank

Sie können auch Verbindungen in der Benutzeroberfläche erstellen. In diesem Fall werden die Verbindungsdetails in einer Airflow-Metadatendatenbank gespeichert. Wenn die Amazon MWAA-Umgebung nicht für die Verwendung des Secrets Manager-Backends konfiguriert ist, überprüft sie die Metadatendatenbank auf den Wert und gibt diesen zurück. Sie können eine Airflow-Verbindung über die Benutzeroberfläche, AWS CLI oder API erstellen. In diesem Abschnitt zeigen wir, wie Sie mithilfe der Airflow-Benutzeroberfläche eine Verbindung herstellen.

  1. Aussichten für Verbindungs-IDGeben Sie einen Namen für die Verbindung ein.
  2. Aussichten für Verbindungstyp, wählen Amazon RedShift.
  3. Aussichten für GastgeberGeben Sie den Redshift-Endpunkt (ohne Port und Datenbank) für Redshift Serverless ein.
  4. Aussichten für Datenbase, eingeben dev.
  5. Aussichten für Mitglied, geben Sie Ihren Admin-Benutzernamen ein.
  6. Aussichten für Passwort, geben Sie Ihr Passwort ein.
  7. Aussichten für Hafen, verwenden Sie Port 5439.
  8. Aussichten für ExtraStellen Sie die region und timeout Parameter.
  9. Testen Sie die Verbindung und speichern Sie dann Ihre Einstellungen.

Erstellen Sie eine DAG und führen Sie sie aus

In diesem Abschnitt beschreiben wir, wie Sie mit verschiedenen Komponenten eine DAG erstellen. Nachdem Sie den DAG erstellt und ausgeführt haben, können Sie die Ergebnisse überprüfen, indem Sie Redshift-Tabellen abfragen und die Ziel-S3-Buckets überprüfen.

Erstellen Sie einen DAG

In Airflow werden Datenpipelines im Python-Code als DAGs definiert. Wir erstellen eine DAG, die aus verschiedenen Operatoren, Sensoren, Verbindungen, Aufgaben und Regeln besteht:

  • Der DAG beginnt mit der Suche nach Quelldateien im S3-Bucket sample-inp-bucket-etl-<username> unter Konto A für den aktuellen Tag verwenden S3KeySensor. S3KeySensor wird verwendet, um darauf zu warten, dass ein oder mehrere Schlüssel in einem S3-Bucket vorhanden sind.
    • Unser S3-Bucket ist beispielsweise partitioniert als s3://bucket/products/YYYY/MM/DD/, daher sollte unser Sensor nach Ordnern mit dem aktuellen Datum suchen. Wir haben das aktuelle Datum im DAG abgeleitet und an übergeben S3KeySensor, das nach neuen Dateien im aktuellen Tagesordner sucht.
    • Wir haben auch eingestellt wildcard_match as True, was die Suche ermöglicht bucket_key als Unix-Wildcard-Muster zu interpretieren. Stellen Sie die ein mode zu reschedule sodass die Sensoraufgabe den Worker-Slot freigibt, wenn die Kriterien nicht erfüllt sind, und sie zu einem späteren Zeitpunkt neu geplant wird. Es empfiehlt sich, diesen Modus zu verwenden, wenn poke_interval ist länger als 1 Minute, um eine Überlastung des Planers zu verhindern.
  • Nachdem die Datei im S3-Bucket verfügbar ist, wird der AWS Glue-Crawler mit ausgeführt GlueCrawlerOperator um den S3-Quell-Bucket zu crawlen sample-inp-bucket-etl-<username> unter Konto A und aktualisiert die Tabellenmetadaten unter products_db Datenbank im Datenkatalog. Der Crawler verwendet die AWS Glue-Rolle und die Datenkatalogdatenbank, die in den vorherigen Schritten erstellt wurden.
  • Die DAG verwendet GlueCrawlerSensor warten, bis der Crawler abgeschlossen ist.
  • Wenn der Crawler-Job abgeschlossen ist, GlueJobOperator wird zum Ausführen des AWS Glue-Jobs verwendet. Der Name des AWS Glue-Skripts (zusammen mit dem Standort) wird zusammen mit der AWS Glue IAM-Rolle an den Operator übergeben. Andere Parameter wie GlueVersion, NumberofWorkers und WorkerType werden mit übergeben create_job_kwargs Parameters.
  • Die DAG verwendet GlueJobSensor um auf den Abschluss des AWS Glue-Auftrags zu warten. Wenn es fertig ist, ist die Redshift-Staging-Tabelle products wird mit Daten aus der S3-Datei geladen.
  • Sie können von Airflow aus auf drei verschiedene Arten eine Verbindung zu Amazon Redshift herstellen Betreiber:
    • PythonOperator.
    • SQLExecuteQueryOperator, das eine PostgreSQL-Verbindung verwendet und redshift_default als Standardverbindung.
    • RedshiftDataOperator, das die Redshift Data API verwendet und aws_default als Standardverbindung.

In unserer DAG verwenden wir SQLExecuteQueryOperator und RedshiftDataOperator um zu zeigen, wie diese Operatoren verwendet werden. Die gespeicherten Redshift-Prozeduren werden ausgeführt RedshiftDataOperator. Die DAG führt auch SQL-Befehle in Amazon Redshift aus, um die Daten mithilfe von aus der Staging-Tabelle zu löschen SQLExecuteQueryOperator.

Da wir unsere Amazon MWAA-Umgebung so konfiguriert haben, dass sie im Secrets Manager nach Verbindungen sucht, ruft der DAG beim Ausführen die Redshift-Verbindungsdetails wie Benutzername, Passwort, Host, Port und Region vom Secrets Manager ab. Wenn die Verbindung im Secrets Manager nicht gefunden wird, werden die Werte von den Standardverbindungen abgerufen.

In SQLExecuteQueryOperatorübergeben wir den Verbindungsnamen, den wir im Secrets Manager erstellt haben. Es sucht airflow/connections/secrets_redshift_connection und ruft die Geheimnisse vom Secrets Manager ab. Wenn Secrets Manager nicht eingerichtet ist, wird die Verbindung manuell erstellt (z. B. redshift-conn-id) übergeben werden kann.

In RedshiftDataOperator, wir übergeben den Secret_arn des airflow/connections/redshift_conn_test Verbindung, die im Secrets Manager erstellt wurde, als Parameter.

  • Als letzte Aufgabe RedshiftToS3Operator wird verwendet, um Daten aus der Redshift-Tabelle in einen S3-Bucket zu entladen sample-opt-bucket-etl in Konto B. airflow/connections/redshift_conn_test vom Secrets Manager wird zum Entladen der Daten verwendet.
  • TriggerRule eingestellt ist ALL_DONE, wodurch der nächste Schritt ausgeführt werden kann, nachdem alle Upstream-Aufgaben abgeschlossen sind.
  • Die Abhängigkeit von Aufgaben wird über definiert chain() Funktion, die bei Bedarf die parallele Ausführung von Aufgaben ermöglicht. In unserem Fall möchten wir, dass alle Aufgaben nacheinander ausgeführt werden.

Das Folgende ist der vollständige DAG-Code. Der dag_id sollte mit dem Namen des DAG-Skripts übereinstimmen, andernfalls wird es nicht mit der Airflow-Benutzeroberfläche synchronisiert.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Überprüfen Sie die DAG-Ausführung

Nachdem Sie die DAG-Datei erstellt (die Variablen im DAG-Skript ersetzt) ​​und in die Datei hochgeladen haben s3://sample-airflow-instance/dags Ordner, wird er automatisch mit der Airflow-Benutzeroberfläche synchronisiert. Alle DAGs erscheinen auf der DAGs Tab. Schalten Sie die um ON Option, um die DAG ausführbar zu machen. Weil unser DAG darauf eingestellt ist schedule="@once", müssen Sie den Job manuell ausführen, indem Sie unten auf das Symbol „Ausführen“ klicken Aktionen. Wenn der DAG abgeschlossen ist, wird der Status in Grün aktualisiert, wie im folgenden Screenshot gezeigt.

Im Links Im Abschnitt gibt es Optionen zum Anzeigen des Codes, der Grafik, des Rasters, des Protokolls und mehr. Wählen Graph um die DAG in einem Diagrammformat zu visualisieren. Wie im folgenden Screenshot gezeigt, bezeichnet jede Farbe des Knotens einen bestimmten Operator und die Farbe der Knotenumrisse bezeichnet einen bestimmten Status.

Überprüfen Sie die Ergebnisse

Navigieren Sie in der Amazon Redshift-Konsole zu der Abfrage-Editor v2 und wählen Sie die Daten im aus products_f Tisch. Die Tabelle sollte geladen sein und die gleiche Anzahl an Datensätzen wie S3-Dateien haben.

Navigieren Sie auf der Amazon S3-Konsole zum S3-Bucket s3://sample-opt-bucket-etl in Konto B. Die product_f Dateien sollten unter der Ordnerstruktur erstellt werden s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Aufräumen

Bereinigen Sie die im Rahmen dieses Beitrags erstellten Ressourcen, um laufende Kosten zu vermeiden:

  1. Löschen Sie die CloudFormation-Stacks und den S3-Bucket, die Sie als Voraussetzungen erstellt haben.
  2. Löschen Sie die VPCs und VPC-Peering-Verbindungen, kontoübergreifenden Richtlinien und Rollen sowie Geheimnisse im Secrets Manager.

Zusammenfassung

Mit Amazon MWAA können Sie mithilfe von Airflow und Python komplexe Arbeitsabläufe erstellen, ohne Cluster, Knoten oder anderen betrieblichen Aufwand zu verwalten, der normalerweise mit der Bereitstellung und Skalierung von Airflow in der Produktion verbunden ist. In diesem Beitrag haben wir gezeigt, wie Amazon MWAA eine automatisierte Möglichkeit bietet, Daten zwischen verschiedenen Konten und Diensten innerhalb von AWS aufzunehmen, umzuwandeln, zu analysieren und zu verteilen. Weitere Beispiele für andere AWS-Betreiber finden Sie im Folgenden GitHub-Repository; Wir ermutigen Sie, mehr zu erfahren, indem Sie einige dieser Beispiele ausprobieren.


Über die Autoren


Radhika Jakkula ist Big Data Prototyping Solutions Architect bei AWS. Sie hilft Kunden beim Erstellen von Prototypen mithilfe von AWS-Analysediensten und speziell entwickelten Datenbanken. Sie ist Spezialistin für die Bewertung einer Vielzahl von Anforderungen und die Anwendung relevanter AWS-Services, Big-Data-Tools und Frameworks zur Erstellung einer robusten Architektur.

Sidhanth Muralidhar ist Principal Technical Account Manager bei AWS. Er arbeitet mit großen Unternehmenskunden zusammen, die ihre Workloads auf AWS ausführen. Ihm liegt die Zusammenarbeit mit Kunden am Herzen und er unterstützt sie bei der Gestaltung von Workloads im Hinblick auf Kosten, Zuverlässigkeit, Leistung und betriebliche Exzellenz im richtigen Maßstab auf ihrem Weg in die Cloud. Er hat auch ein großes Interesse an Datenanalysen.

spot_img

Neueste Intelligenz

spot_img