Logotipo de Zephyrnet

Comience con la integración de datos de Amazon S3 a Amazon Redshift mediante sesiones interactivas de AWS Glue

Fecha:

Las organizaciones están otorgando una alta prioridad a la integración de datos, especialmente para respaldar el análisis, el aprendizaje automático (ML), la inteligencia comercial (BI) y las iniciativas de desarrollo de aplicaciones. Los datos están creciendo exponencialmente y son generados por fuentes de datos cada vez más diversas. La integración de datos se vuelve un desafío cuando se procesan datos a escala y el trabajo pesado inherente asociado con la infraestructura necesaria para administrarlo. Esta es una de las razones clave por las que las organizaciones buscan constantemente soluciones de integración de datos fáciles de usar y de bajo mantenimiento para mover datos de una ubicación a otra o para consolidar sus datos comerciales de varias fuentes en una ubicación centralizada para tomar decisiones comerciales estratégicas. .

La mayoría de las organizaciones usan Spark para sus necesidades de procesamiento de big data. Si está buscando simplificar la integración de datos y no quiere la molestia de poner en marcha servidores, administrar recursos o configurar clústeres de Spark, tenemos la solución para usted.

Pegamento AWS es un servicio de integración de datos sin servidor que facilita el descubrimiento, la preparación y la combinación de datos para análisis, aprendizaje automático y desarrollo de aplicaciones. AWS Glue proporciona interfaces visuales y basadas en código para que la integración de datos sea simple y accesible para todos.

Si prefiere una experiencia basada en código y desea crear trabajos de integración de datos de forma interactiva, le recomendamos sesiones interactivas. Las sesiones interactivas son una función de AWS Glue lanzada recientemente que le permite desarrollar procesos de AWS Glue de forma interactiva, ejecutar y probar cada paso y ver los resultados.

Existen diferentes opciones para utilizar sesiones interactivas. Puede crear y trabajar con sesiones interactivas a través del Interfaz de línea de comandos de AWS (AWS CLI) y API. También puede usar cuadernos compatibles con Jupyter para crear y probar visualmente los scripts de su cuaderno. Las sesiones interactivas proporcionan un kernel de Jupyter que se integra en casi cualquier lugar donde lo haga Jupyter, incluida la integración con IDE como PyCharm, IntelliJ y Visual Studio Code. Esto le permite crear código en su entorno local y ejecutarlo sin problemas en el backend de la sesión interactiva. También puede iniciar un cuaderno a través de AWS Glue Studio; todos los pasos de configuración se realizan por usted para que pueda explorar sus datos y comenzar a desarrollar su script de trabajo después de solo unos segundos. Cuando el código esté listo, puede configurar, programar y monitorear cuadernos de trabajo como trabajos de AWS Glue.

Si no ha probado las sesiones interactivas de AWS Glue antes, esta publicación es muy recomendable. Trabajamos a través de un escenario simple en el que es posible que necesite cargar datos de forma incremental desde Servicio de almacenamiento simple de Amazon (Amazon S3) en Desplazamiento al rojo de Amazon o transforme y enriquezca sus datos antes de cargarlos en Amazon Redshift. En esta publicación, usamos sesiones interactivas dentro de un cuaderno de AWS Glue Studio para cargar el Conjunto de datos de NYC Taxi en una Amazon Redshift sin servidor clúster, consulte el conjunto de datos cargado, guarde nuestro cuaderno Jupyter como un trabajo y programe su ejecución mediante una expresión cron. Empecemos.

Resumen de la solución

Lo guiamos a través de los siguientes pasos:

  1. Configure un cuaderno AWS Glue Jupyter con sesiones interactivas.
  2. Utilice la magia del cuaderno, incluida la conexión y los marcadores de AWS Glue.
  3. Lea datos de Amazon S3 y transfórmelos y cárguelos en Redshift Serverless.
  4. Guarde el cuaderno como un trabajo de AWS Glue y prográmelo para que se ejecute.

Requisitos previos

Para este tutorial, debemos completar los siguientes requisitos previos:

  1. Subir Datos de registros de viaje de taxi amarillo y del tabla de búsqueda de zona de taxis conjuntos de datos en Amazon S3. Los pasos para hacerlo se enumeran en la siguiente sección.
  2. Prepara lo necesario Gestión de identidades y accesos de AWS (IAM) políticas y roles para trabajar con AWS Glue Studio Jupyter notebooks, sesiones interactivas y AWS Glue.
  3. Cree la conexión de AWS Glue para Redshift Serverless.

Cargue conjuntos de datos en Amazon S3

Descargar Datos de registros de viaje de taxi amarillo y datos de la tabla de búsqueda de zona de rodaje a su entorno local. Para esta publicación, descargamos los datos de enero de 2022 para los registros de viaje de taxi amarillo en formato Parquet. Los datos de búsqueda de la zona de rodaje están en formato CSV. También puede descargar el Diccionario de datos para el conjunto de datos del registro de viaje.

  1. En la consola de Amazon S3, crear un cubo , que son my-first-aws-glue-is-project-<random number> existentes us-east-1 Región para almacenar los datos.Los nombres de los depósitos de S3 deben ser únicos en todas las cuentas de AWS en todas las regiones.
  2. Crear carpetas nyc_yellow_taxi y taxi_zone_lookup en el depósito que acaba de crear y cargue los archivos que descargó.
    Las estructuras de carpetas deben parecerse a las siguientes capturas de pantalla.datos del taxi amarillo s3datos de búsqueda s3

Preparar políticas y roles de IAM

Preparemos las políticas y el rol de IAM necesarios para trabajar con los cuadernos y las sesiones interactivas de AWS Glue Studio Jupyter. Para comenzar con los cuadernos en AWS Glue Studio, consulte Introducción a los cuadernos en AWS Glue Studio.

Cree políticas de IAM para el rol de cuaderno de AWS Glue

Crear la política AWSGlueInteractiveSessionPassRolePolicy con los siguientes permisos:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iam:PassRole", "Resource":"arn:aws:iam::<AWS account ID>:role/AWSGlueServiceRole-GlueIS" } ]
}

Esta política permite que el rol del cuaderno de AWS Glue pase a sesiones interactivas para que se pueda usar el mismo rol en ambos lugares. Tenga en cuenta que AWSGlueServiceRole-GlueIS es el rol que creamos para el cuaderno Jupyter de AWS Glue Studio en un paso posterior. A continuación, cree la política. AmazonS3Access-MyFirstGlueISProject con los siguientes permisos:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::<your s3 bucket name>", "arn:aws:s3:::<your s3 bucket name>/*" ] } ]
}

Esta política permite que el rol del cuaderno de AWS Glue acceda a los datos en el depósito de S3.

Cree un rol de IAM para el cuaderno de AWS Glue

Cree un nuevo rol de AWS Glue llamado AWSGlueServiceRole-GlueIS con las siguientes pólizas adjuntas:

Cree la conexión de AWS Glue para Redshift Serverless

Ahora estamos listos para configurar un grupo de seguridad de Redshift Serverless para conectarse con los componentes de AWS Glue.

  1. En la consola de Redshift Serverless, abra el grupo de trabajo que está utilizando.
    Puede encontrar todos los espacios de nombres y grupos de trabajo en el panel de Redshift Serverless.
  2. under Acceso a los datos, escoger red y seguridad.
  3. Elija el enlace para el grupo de seguridad de VPC sin servidor de Redshift.grupo de seguridad vpc sin servidor redshiftEres redirigido a Nube informática elástica de Amazon (Amazon EC2) consola.
  4. En los detalles del grupo de seguridad de Redshift Serverless, en Reglas de entrada, escoger Editar reglas de entrada.
  5. Agregue una regla de autorreferencia para permitir que los componentes de AWS Glue se comuniquen:
    1. Tipo de Propiedad, escoger Todo TCP.
    2. Protocolo, escoger TCP.
    3. Rango de puertos, incluir todos los puertos.
    4. Fuente, utilice el mismo grupo de seguridad que el ID de grupo.
      grupo de seguridad entrante redshift
  6. Del mismo modo, agregue las siguientes reglas de salida:
    1. Una regla autorreferencial con Tipo de Propiedad as Todo TCP, Protocolo as TCP, Rango de puertos incluyendo todos los puertos, y Destino como el mismo grupo de seguridad que el ID de grupo.
    2. Una regla HTTPS para el acceso a Amazon S3. los s3-prefix-list-id Se requiere un valor en la regla del grupo de seguridad para permitir el tráfico desde la VPC al punto de enlace de la VPC de Amazon S3.
      grupo de seguridad saliente redshift

Si no tiene un punto de enlace de la VPC de Amazon S3, puede crear uno en el Nube privada virtual de Amazon (Amazon VPC) consola.

punto final de s3 vpc

Puede comprobar el valor de s3-prefix-list-id en Listas de prefijos administrados en la consola de Amazon VPC.

lista de prefijos s3

A continuación, vaya a la Conectores página en AWS Glue Studio y crear una nueva conexión JDBC , que son redshiftServerless a su clúster de Redshift Serverless (a menos que ya exista uno). Puede encontrar los detalles del punto final de Redshift Serverless en la cuenta de su grupo de trabajo. Información General sección. La configuración de conexión se parece a la siguiente captura de pantalla.

redshift página de conexión sin servidor

Escriba código interactivo en un cuaderno AWS Glue Studio Jupyter con tecnología de sesiones interactivas

Ahora puede comenzar a escribir código interactivo con el cuaderno AWS Glue Studio Jupyter con tecnología de sesiones interactivas. Tenga en cuenta que es una buena práctica seguir guardando el bloc de notas a intervalos regulares mientras trabaja en él.

  1. En la consola de AWS Glue Studio, cree un nuevo trabajo.
  2. Seleccione Cuaderno Jupyter y seleccionar Crear un nuevo cuaderno desde cero.
  3. Elige Crear.
    pegamento sesión interactiva crear cuaderno
  4. Nombre del trabajo, ingrese un nombre (por ejemplo, myFirstGlueISProject).
  5. Rol de gestión de identidades y accesos, elige el rol que creaste (AWSGlueServiceRole-GlueIS).
  6. Elige Iniciar trabajo de cuaderno.
    Pegue la configuración del cuaderno de sesión interactivaDespués de inicializar el cuaderno, puede ver algunas de las magias disponibles y una celda con código repetitivo. Para ver toda la magia de las sesiones interactivas, ejecute %help en una celda para imprimir una lista completa. Con la excepción de %%sql, ejecutar una celda de solo magia no inicia una sesión, pero establece la configuración para la sesión que comienza cuando ejecuta su primera celda de código.pegamento sesión interactiva inicialización del cuaderno jupyterPara esta publicación, configuramos AWS Glue con la versión 3.0, tres trabajadores G.1X, tiempo de espera inactivo y una conexión de Amazon Redshift con la ayuda de la magia disponible.
  7. Ingresemos la siguiente magia en nuestra primera celda y ejecútela:
    %glue_version 3.0
    %number_of_workers 3
    %worker_type G.1X
    %idle_timeout 60
    %connections redshiftServerless

    Obtenemos la siguiente respuesta:

    Welcome to the Glue Interactive Sessions Kernel
    For more information on available magic commands, please type %help in any new cell. Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
    Installed kernel version: 0.35 Setting Glue version to: 3.0
    Previous number of workers: 5
    Setting new number of workers to: 3
    Previous worker type: G.1X
    Setting new worker type to: G.1X
    Current idle_timeout is 2880 minutes.
    idle_timeout has been set to 60 minutes.
    Connections to be included:
    redshiftServerless

  8. Ejecutemos nuestra primera celda de código (código repetitivo) para iniciar una sesión de cuaderno interactivo en unos segundos:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    Obtenemos la siguiente respuesta:

    Authenticating with environment variables and user-defined glue_role_arn:arn:aws:iam::xxxxxxxxxxxx:role/AWSGlueServiceRole-GlueIS
    Attempting to use existing AssumeRole session credentials.
    Trying to create a Glue session for the kernel.
    Worker Type: G.1X
    Number of Workers: 3
    Session ID: 7c9eadb1-9f9b-424f-9fba-d0abc57e610d
    Applying the following default arguments:
    --glue_kernel_version 0.35
    --enable-glue-datacatalog true
    --job-bookmark-option job-bookmark-enable
    Waiting for session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d to get into ready status...
    Session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d has been created

  9. A continuación, lea los datos del taxi amarillo de la ciudad de Nueva York del depósito S3 en un marco dinámico de AWS Glue:
    nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = { "paths": ["s3://<your-s3-bucket-name>/nyc_yellow_taxi/"] }, format = "parquet", transformation_ctx = "nyc_taxi_trip_input_dyf"
    )

    Contemos el número de filas, observemos el esquema y algunas filas del conjunto de datos.

  10. Cuente las filas con el siguiente código:
    nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    nyc_taxi_trip_input_df.count()

    Obtenemos la siguiente respuesta:

  11. Ver el esquema con el siguiente código:
    nyc_taxi_trip_input_df.printSchema()

    Obtenemos la siguiente respuesta:

    root |-- VendorID: long (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- passenger_count: double (nullable = true) |-- trip_distance: double (nullable = true) |-- RatecodeID: double (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- PULocationID: long (nullable = true) |-- DOLocationID: long (nullable = true) |-- payment_type: long (nullable = true) |-- fare_amount: double (nullable = true) |-- extra: double (nullable = true) |-- mta_tax: double (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- airport_fee: double (nullable = true)

  12. Vea algunas filas del conjunto de datos con el siguiente código:
    nyc_taxi_trip_input_df.show(5)

    Obtenemos la siguiente respuesta:

    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    | 2| 2022-01-18 15:04:43| 2022-01-18 15:12:51| 1.0| 1.13| 1.0| N| 141| 229| 2| 7.0| 0.0| 0.5| 0.0| 0.0| 0.3| 10.3| 2.5| 0.0|
    | 2| 2022-01-18 15:03:28| 2022-01-18 15:15:52| 2.0| 1.36| 1.0| N| 237| 142| 1| 9.5| 0.0| 0.5| 2.56| 0.0| 0.3| 15.36| 2.5| 0.0|
    | 1| 2022-01-06 17:49:22| 2022-01-06 17:57:03| 1.0| 1.1| 1.0| N| 161| 229| 2| 7.0| 3.5| 0.5| 0.0| 0.0| 0.3| 11.3| 2.5| 0.0|
    | 2| 2022-01-09 20:00:55| 2022-01-09 20:04:14| 1.0| 0.56| 1.0| N| 230| 230| 1| 4.5| 0.5| 0.5| 1.66| 0.0| 0.3| 9.96| 2.5| 0.0|
    | 2| 2022-01-24 16:16:53| 2022-01-24 16:31:36| 1.0| 2.02| 1.0| N| 163| 234| 1| 10.5| 1.0| 0.5| 3.7| 0.0| 0.3| 18.5| 2.5| 0.0|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    only showing top 5 rows

  13. Ahora, lea los datos de búsqueda de la zona de rodaje del depósito S3 en un marco dinámico de AWS Glue:
    nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = { "paths": ["s3://<your-s3-bucket-name>/taxi_zone_lookup/"] }, format = "csv", format_options= { 'withHeader': True }, transformation_ctx = "nyc_taxi_zone_lookup_dyf"
    )

    Contemos el número de filas, observemos el esquema y algunas filas del conjunto de datos.

  14. Cuente las filas con el siguiente código:
    nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    nyc_taxi_zone_lookup_df.count()

    Obtenemos la siguiente respuesta:

  15. Ver el esquema con el siguiente código:
    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    Obtenemos la siguiente respuesta:

    root |-- LocationID: string (nullable = true) |-- Borough: string (nullable = true) |-- Zone: string (nullable = true) |-- service_zone: string (nullable = true)

  16. Vea algunas filas con el siguiente código:
    nyc_taxi_zone_lookup_df.show(5)

    Obtenemos la siguiente respuesta:

    +----------+-------------+--------------------+------------+
    |LocationID| Borough| Zone|service_zone|
    +----------+-------------+--------------------+------------+
    | 1| EWR| Newark Airport| EWR|
    | 2| Queens| Jamaica Bay| Boro Zone|
    | 3| Bronx|Allerton/Pelham G...| Boro Zone|
    | 4| Manhattan| Alphabet City| Yellow Zone|
    | 5|Staten Island| Arden Heights| Boro Zone|
    +----------+-------------+--------------------+------------+
    only showing top 5 rows

  17. Con base en el diccionario de datos, recalibremos los tipos de datos de los atributos en marcos dinámicos correspondientes a ambos marcos dinámicos:
    nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply( frame = nyc_taxi_trip_input_dyf, mappings = [ ("VendorID","Long","VendorID","Integer"), ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), ("passenger_count","Double","passenger_count","Integer"), ("trip_distance","Double","trip_distance","Double"), ("RatecodeID","Double","RatecodeID","Integer"), ("store_and_fwd_flag","String","store_and_fwd_flag","String"), ("PULocationID","Long","PULocationID","Integer"), ("DOLocationID","Long","DOLocationID","Integer"), ("payment_type","Long","payment_type","Integer"), ("fare_amount","Double","fare_amount","Double"), ("extra","Double","extra","Double"), ("mta_tax","Double","mta_tax","Double"), ("tip_amount","Double","tip_amount","Double"), ("tolls_amount","Double","tolls_amount","Double"), ("improvement_surcharge","Double","improvement_surcharge","Double"), ("total_amount","Double","total_amount","Double"), ("congestion_surcharge","Double","congestion_surcharge","Double"), ("airport_fee","Double","airport_fee","Double") ], transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
    )

    nyc_taxi_zone_lookup_apply_mapping_dyf = ApplyMapping.apply( frame = nyc_taxi_zone_lookup_dyf, mappings = [ ("LocationID","String","LocationID","Integer"), ("Borough","String","Borough","String"), ("Zone","String","Zone","String"), ("service_zone","String", "service_zone","String") ], transformation_ctx = "nyc_taxi_zone_lookup_apply_mapping_dyf"
    )

  18. Ahora vamos a comprobar su esquema:
    nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()

    Obtenemos la siguiente respuesta:

    root |-- VendorID: integer (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- passenger_count: integer (nullable = true) |-- trip_distance: double (nullable = true) |-- RatecodeID: integer (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- PULocationID: integer (nullable = true) |-- DOLocationID: integer (nullable = true) |-- payment_type: integer (nullable = true) |-- fare_amount: double (nullable = true) |-- extra: double (nullable = true) |-- mta_tax: double (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- airport_fee: double (nullable = true)

    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    Obtenemos la siguiente respuesta:

    root |-- LocationID: integer (nullable = true) |-- Borough: string (nullable = true) |-- Zone: string (nullable = true) |-- service_zone: string (nullable = true)

  19. Agreguemos la columna trip_duration para calcular la duración de cada viaje en minutos al cuadro dinámico de viajes en taxi:
    # Function to calculate trip duration in minutes
    def trip_duration(start_timestamp,end_timestamp): minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0 return(minutes_diff)

    # Transformation function for each record
    def transformRecord(rec): rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"]) return rec
    nyc_taxi_trip_final_dyf = Map.apply( frame = nyc_taxi_trip_apply_mapping_dyf, f = transformRecord, transformation_ctx = "nyc_taxi_trip_final_dyf"
    )

    Contemos el número de filas, observemos el esquema y algunas filas del conjunto de datos después de aplicar la transformación anterior.

  20. Obtenga un recuento de registros con el siguiente código:
    nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    nyc_taxi_trip_final_df.count()

    Obtenemos la siguiente respuesta:

  21. Ver el esquema con el siguiente código:
    nyc_taxi_trip_final_df.printSchema()

    Obtenemos la siguiente respuesta:

    root |-- extra: double (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- trip_duration: double (nullable = true) |-- trip_distance: double (nullable = true) |-- mta_tax: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- DOLocationID: integer (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- airport_fee: double (nullable = true) |-- payment_type: integer (nullable = true) |-- fare_amount: double (nullable = true) |-- RatecodeID: integer (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- VendorID: integer (nullable = true) |-- PULocationID: integer (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- passenger_count: integer (nullable = true)

  22. Vea algunas filas con el siguiente código:
    nyc_taxi_trip_final_df.show(5)

    Obtenemos la siguiente respuesta:

    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |extra|tpep_dropoff_datetime| trip_duration|trip_distance|mta_tax|improvement_surcharge|DOLocationID|congestion_surcharge|total_amount|airport_fee|payment_type|fare_amount|RatecodeID|tpep_pickup_datetime|VendorID|PULocationID|tip_amount|tolls_amount|store_and_fwd_flag|passenger_count|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    | 0.0| 2022-01-18 15:12:51| 8.133333333333333| 1.13| 0.5| 0.3| 229| 2.5| 10.3| 0.0| 2| 7.0| 1| 2022-01-18 15:04:43| 2| 141| 0.0| 0.0| N| 1|
    | 0.0| 2022-01-18 15:15:52| 12.4| 1.36| 0.5| 0.3| 142| 2.5| 15.36| 0.0| 1| 9.5| 1| 2022-01-18 15:03:28| 2| 237| 2.56| 0.0| N| 2|
    | 3.5| 2022-01-06 17:57:03| 7.683333333333334| 1.1| 0.5| 0.3| 229| 2.5| 11.3| 0.0| 2| 7.0| 1| 2022-01-06 17:49:22| 1| 161| 0.0| 0.0| N| 1|
    | 0.5| 2022-01-09 20:04:14| 3.316666666666667| 0.56| 0.5| 0.3| 230| 2.5| 9.96| 0.0| 1| 4.5| 1| 2022-01-09 20:00:55| 2| 230| 1.66| 0.0| N| 1|
    | 1.0| 2022-01-24 16:31:36|14.716666666666667| 2.02| 0.5| 0.3| 234| 2.5| 18.5| 0.0| 1| 10.5| 1| 2022-01-24 16:16:53| 2| 163| 3.7| 0.0| N| 1|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    only showing top 5 rows

  23. A continuación, cargue ambos marcos dinámicos en nuestro clúster sin servidor de Amazon Redshift:
    nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf( frame = nyc_taxi_trip_final_dyf, catalog_connection = "redshiftServerless", connection_options = {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", transformation_ctx = "nyc_taxi_trip_sink_dyf"
    )

    nyc_taxi_zone_lookup_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf( frame = nyc_taxi_zone_lookup_apply_mapping_dyf, catalog_connection = "redshiftServerless", connection_options = {"dbtable": "public.d_nyc_taxi_zone_lookup", "database": "dev"}, redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", transformation_ctx = "nyc_taxi_zone_lookup_sink_dyf"
    )

    Ahora validemos los datos cargados en el clúster sin servidor de Amazon Redshift ejecutando algunas consultas en Editor de consultas de Amazon Redshift v2. También puede utilizar su editor de consultas preferido.

  24. Primero, contamos el número de registros y seleccionamos algunas filas en ambas tablas de destino (f_nyc_yellow_taxi_trip y d_nyc_taxi_zone_lookup):
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    salida de consulta de recuento de registros de tabla de corrimiento al rojo

    El número de registros en f_nyc_yellow_taxi_trip (2,463,931) y d_nyc_taxi_zone_lookup (265) coinciden con el número de registros en nuestro marco dinámico de entrada. Esto valida que todos los registros de archivos en Amazon S3 se hayan cargado correctamente en Amazon Redshift.

    Puede ver algunos de los registros de cada tabla con los siguientes comandos:

    SELECT * FROM public.f_nyc_yellow_taxi_trip LIMIT 10;

    consulta de selección de datos de hecho de corrimiento al rojo

    SELECT * FROM public.d_nyc_taxi_zone_lookup LIMIT 10;

    consulta de selección de datos de búsqueda de corrimiento al rojo

  25. Una de las ideas que queremos generar a partir de los conjuntos de datos es obtener las cinco rutas principales con la duración de su viaje. Ejecutemos el SQL para eso en Amazon Redshift:
    SELECT CASE WHEN putzl.zone >= dotzl.zone THEN putzl.zone || ' - ' || dotzl.zone ELSE dotzl.zone || ' - ' || putzl.zone END AS "Route", COUNT(1) AS "Frequency", ROUND(SUM(trip_duration),1) AS "Total Trip Duration (mins)"
    FROM public.f_nyc_yellow_taxi_trip ytt
    INNER JOIN public.d_nyc_taxi_zone_lookup putzl ON ytt.pulocationid = putzl.locationid
    INNER JOIN public.d_nyc_taxi_zone_lookup dotzl ON ytt.dolocationid = dotzl.locationid
    GROUP BY "Route"
    ORDER BY "Frequency" DESC, "Total Trip Duration (mins)" DESC
    LIMIT 5;

    consulta de ruta de corrimiento al rojo 5 principales

Transforme el cuaderno en un trabajo de AWS Glue y prográmelo

Ahora que hemos creado el código y probado su funcionalidad, guardémoslo como un trabajo y programémoslo.

Primero habilitemos marcadores de trabajo. Los marcadores de trabajo ayudan a AWS Glue a mantener la información de estado y evitar el reprocesamiento de datos antiguos. Con los marcadores de trabajo, puede procesar nuevos datos cuando se vuelve a ejecutar en un intervalo programado.

  1. Agregue el siguiente comando mágico después de la primera celda que contiene otros comandos mágicos inicializados durante la creación del código:
    %%configure
    { "--job-bookmark-option": "job-bookmark-enable"
    }

    Para inicializar los marcadores de trabajo, ejecutamos el siguiente código con el nombre del trabajo como argumento predeterminado (myFirstGlueISProject para esta publicación). Los marcadores de trabajo almacenan los estados de un trabajo. siempre debes tener job.init() al comienzo del guión y el job.commit() al final del guión. Estas dos funciones se utilizan para inicializar el servicio de marcadores y actualizar el cambio de estado del servicio. Los marcadores no funcionarán sin llamarlos.

  2. Agregue el siguiente fragmento de código después del código repetitivo:
    params = []
    if '--JOB_NAME' in sys.argv: params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)
    if 'JOB_NAME' in args: jobname = args['JOB_NAME']
    else: jobname = "myFirstGlueISProject"
    job.init(jobname, args)

  3. Luego, comente todas las líneas de código que se crearon para verificar el resultado deseado y que no son necesarias para que el trabajo entregue su propósito:
    #nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    #nyc_taxi_trip_input_df.count()
    #nyc_taxi_trip_input_df.printSchema()
    #nyc_taxi_trip_input_df.show(5) #nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    #nyc_taxi_zone_lookup_df.count()
    #nyc_taxi_zone_lookup_df.printSchema()
    #nyc_taxi_zone_lookup_df.show(5) #nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()
    #nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema() #nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    #nyc_taxi_trip_final_df.count()
    #nyc_taxi_trip_final_df.printSchema()
    #nyc_taxi_trip_final_df.show(5)

  4. Guarde el cuaderno.
    pegamento sesión interactiva guardar trabajo
    Puede consultar el script correspondiente en el Guión .Pegue la pestaña de secuencia de comandos de sesión interactivaTenga en cuenta que job.commit() se agrega automáticamente al final del script. Vamos a ejecutar el cuaderno como un trabajo.
  5. Primero, truncar f_nyc_yellow_taxi_trip y d_nyc_taxi_zone_lookup tablas en Amazon Redshift usando el editor de consultas v2 para que no tengamos duplicados en ambas tablas:
    truncate "public"."f_nyc_yellow_taxi_trip";
    truncate "public"."d_nyc_taxi_zone_lookup";

  6. Elige Ejecutar para ejecutar el trabajo.
    trabajo de ejecución de sesión interactiva de pegamentoPuede comprobar su estado en la Ron .estado de ejecución del trabajo de la sesión interactiva de pegamentoEl trabajo se completó en menos de 5 minutos con G1.x 3 DPU.
  7. Vamos a comprobar el recuento de registros en f_nyc_yellow_taxi_trip y d_nyc_taxi_zone_lookup tablas en Amazon Redshift:
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    Salida de consulta de conteo de corrimiento al rojo

    Con los marcadores de trabajos habilitados, incluso si vuelve a ejecutar el trabajo sin archivos nuevos en las carpetas correspondientes en el depósito de S3, no vuelve a procesar los mismos archivos. La siguiente captura de pantalla muestra una ejecución de trabajo posterior en mi entorno, que se completó en menos de 2 minutos porque no había archivos nuevos para procesar.

    Pegue la repetición del trabajo de la sesión interactiva

    Ahora vamos a programar el trabajo.

  8. En Horarios pestaña, elegir Crear horario.
    pegamento sesión interactiva crear horario
  9. Nombre¸ ingrese un nombre (por ejemplo, myFirstGlueISProject-testSchedule).
  10. Frecuencia, escoger Personalizado.
  11. Ingrese una expresión cron para que el trabajo se ejecute todos los lunes a las 6:00 a. m.
  12. Agregue una descripción opcional.
  13. Elige Crear horario.
    pegamento sesión interactiva añadir horario

El horario ha sido guardado y activado. Puede editar, pausar, reanudar o eliminar la programación de la Acciones .

pegar sesión interactiva programar acción

Limpiar

Para evitar incurrir en cargos futuros, elimine los recursos de AWS que creó.

  • Elimine el trabajo de AWS Glue (myFirstGlueISProject para esta publicación).
  • Elimine los objetos y el depósito de Amazon S3 (my-first-aws-glue-is-project-<random number> para esta publicación).
  • Elimine las políticas y roles de AWS IAM (AWSGlueInteractiveSessionPassRolePolicy, AmazonS3Access-MyFirstGlueISProject y AWSGlueServiceRole-GlueIS).
  • Elimine las tablas de Amazon Redshift (f_nyc_yellow_taxi_trip y d_nyc_taxi_zone_lookup).
  • Elimine la conexión JDBC de AWS Glue (redshiftServerless).
  • Elimine también el grupo de seguridad sin servidor de Redshift que hace referencia a sí mismo y el punto final de Amazon S3 (si lo creó mientras seguía los pasos de esta publicación).

Conclusión

En esta publicación, demostramos cómo hacer lo siguiente:

  • Configure un cuaderno AWS Glue Jupyter con sesiones interactivas
  • Utilice la magia de la libreta, incluida la incorporación y los marcadores de conexión de AWS Glue
  • Lea los datos de Amazon S3 y transfórmelos y cárguelos en Amazon Redshift Serverless
  • Configure la magia para habilitar los marcadores de trabajo, guarde el cuaderno como un trabajo de AWS Glue y prográmelo usando una expresión cron

El objetivo de esta publicación es brindarle los fundamentos paso a paso para comenzar con las sesiones interactivas y los cuadernos Jupyter de AWS Glue Studio. Puede configurar un cuaderno Jupyter de AWS Glue en minutos, iniciar una sesión interactiva en segundos y mejorar en gran medida la experiencia de desarrollo con los trabajos de AWS Glue. Las sesiones interactivas tienen una facturación mínima de 1 minuto con funciones de control de costos que reducen el costo de desarrollar aplicaciones de preparación de datos. Puede crear y probar aplicaciones desde el entorno de su elección, incluso en su entorno local, utilizando el backend de sesiones interactivas.

Las sesiones interactivas proporcionan una forma más rápida, económica y flexible de crear y ejecutar aplicaciones de análisis y preparación de datos. Para obtener más información sobre las sesiones interactivas, consulte Desarrollo laboral (sesiones interactivas)y comience a explorar una experiencia de desarrollo completamente nueva con AWS Glue. Además, consulte las siguientes publicaciones para ver más ejemplos del uso de sesiones interactivas con diferentes opciones:


Acerca de los autores

foto del blog de VikasVikas Ömer es un arquitecto de soluciones especialista en análisis principal en Amazon Web Services. Vikas tiene una sólida experiencia en análisis, gestión de la experiencia del cliente (CEM) y monetización de datos, con más de 13 años de experiencia en la industria a nivel mundial. Con seis certificaciones de AWS, incluida la especialidad de análisis, es un defensor de análisis de confianza para los clientes y socios de AWS. Le encanta viajar, conocer clientes y ayudarlos a tener éxito en lo que hacen.

foto de perfil de NoriNoritaka Sekiyama es Arquitecto Principal de Big Data en el equipo de AWS Glue. Le gusta colaborar con diferentes equipos para obtener resultados como este post. En su tiempo libre, disfruta jugar videojuegos con su familia.

Foto del blog de Galchica heyne es gerente de productos de AWS Glue y tiene más de 15 años de experiencia como gerente de productos, ingeniero de datos y arquitecto de datos. Le apasiona desarrollar una comprensión profunda de las necesidades comerciales de los clientes y colaborar con ingenieros para diseñar productos de datos elegantes, potentes y fáciles de usar. Gal tiene una maestría en ciencia de datos de UC Berkeley y le gusta viajar, jugar juegos de mesa e ir a conciertos de música.

punto_img

Información más reciente

punto_img