Logotipo de Zephyrnet

Únase a una fuente de datos de transmisión con datos de CDC para análisis de datos sin servidor en tiempo real mediante AWS Glue, AWS DMS y Amazon DynamoDB | Servicios web de Amazon

Fecha:

Los clientes han estado utilizando soluciones de almacenamiento de datos para realizar sus tareas de análisis tradicionales. Recientemente, los lagos de datos han ganado mucha tracción para convertirse en la base de las soluciones analíticas, porque vienen con beneficios tales como escalabilidad, tolerancia a fallas y soporte para conjuntos de datos estructurados, semiestructurados y no estructurados.

Los lagos de datos no son transaccionales por defecto; sin embargo, existen múltiples marcos de código abierto que mejoran los lagos de datos con propiedades ACID, lo que brinda la mejor solución de ambos mundos entre los mecanismos de almacenamiento transaccional y no transaccional.

Las canalizaciones tradicionales de ingesta y procesamiento por lotes que implican operaciones como la limpieza de datos y la unión con datos de referencia son fáciles de crear y rentables de mantener. Sin embargo, existe el desafío de ingerir conjuntos de datos, como Internet de las cosas (IoT) y flujos de clics, a un ritmo rápido con SLA de entrega casi en tiempo real. También querrá aplicar actualizaciones incrementales con captura de datos modificados (CDC) desde el sistema de origen hasta el destino. Para tomar decisiones basadas en datos de manera oportuna, debe tener en cuenta los registros perdidos y la contrapresión, y mantener el orden y la integridad de los eventos, especialmente si los datos de referencia también cambian rápidamente.

En esta publicación, nuestro objetivo es abordar estos desafíos. Brindamos una guía paso a paso para unir datos de transmisión a una tabla de referencia que cambia en tiempo real usando Pegamento AWS, Amazon DynamoDBy Servicio de migración de bases de datos de AWS (AWS DMS). También demostramos cómo ingerir datos de transmisión en un lago de datos transaccionales usando apache hudi para lograr actualizaciones incrementales con transacciones ACID.

Resumen de la solución

Para nuestro caso de uso de ejemplo, los datos de transmisión están llegando Secuencias de datos de Amazon Kinesisy los datos de referencia se gestionan en MySQL. Los datos de referencia se replican continuamente desde MySQL a DynamoDB a través de AWS DMS. El requisito aquí es enriquecer los datos de flujo en tiempo real al unirlos con los datos de referencia casi en tiempo real, y hacerlos consultables desde un motor de consulta como Atenea amazónica manteniendo la consistencia. En este caso de uso, los datos de referencia en MySQL se pueden actualizar cuando se cambia el requisito, y luego las consultas deben devolver resultados al reflejar las actualizaciones en los datos de referencia.

Esta solución aborda el problema de los usuarios que desean unirse a secuencias con conjuntos de datos de referencia cambiantes cuando el tamaño del conjunto de datos de referencia es pequeño. Los datos de referencia se mantienen en las tablas de DynamoDB y el trabajo de transmisión carga la tabla completa en la memoria para cada microlote, uniendo una transmisión de alto rendimiento a un pequeño conjunto de datos de referencia.

El siguiente diagrama ilustra la arquitectura de la solución.

Arquitectura

Requisitos previos

Para este tutorial, debe tener los siguientes requisitos previos:

Crear roles de IAM y depósito S3

En esta sección, usted crea un Servicio de almacenamiento simple de Amazon (Amazon S3) cubeta y dos Gestión de identidades y accesos de AWS (IAM): uno para el trabajo de AWS Glue y otro para AWS DMS. Hacemos esto usando un Formación en la nube de AWS modelo. Complete los siguientes pasos:

  1. Inicie sesión en la consola de AWS CloudFormation.
  2. Elige Pila de lanzamiento::
  3. Elige Siguiente.
  4. Nombre de pila, ingrese un nombre para su pila.
  5. Nombre de tabla de DynamoDB, introduzca tgt_country_lookup_table. Este es el nombre de su nueva tabla de DynamoDB.
  6. S3BucketNamePrefijo, ingrese el prefijo de su nuevo depósito S3.
  7. Seleccione Reconozco que AWS CloudFormation podría crear recursos de IAM con nombres personalizados.
  8. Elige Crear pila.

La creación de la pila puede tardar aproximadamente 1 minuto.

Crear un flujo de datos de Kinesis

En esta sección, creará un flujo de datos de Kinesis:

  1. En la consola de Kinesis, elija Flujos de datos en el panel de navegación.
  2. Elige Crear flujo de datos.
  3. Nombre del flujo de datos, ingresa el nombre de tu transmisión.
  4. Deje las configuraciones restantes como predeterminadas y elija Crear flujo de datos.

Se crea un flujo de datos de Kinesis con el modo bajo demanda.

Crear y configurar un clúster de Aurora MySQL

En esta sección, creará y configurará un clúster de Aurora MySQL como base de datos de origen. Primero, configure su clúster de base de datos Aurora MySQL de origen para habilitar CDC a través de AWS DMS a DynamoDB.

Crear un grupo de parámetros

Complete los siguientes pasos para crear un nuevo grupo de parámetros:

  1. En la consola de Amazon RDS, elija Grupos de parámetros en el panel de navegación.
  2. Elige Crear grupo de parámetros.
  3. Familia de grupos de parámetros, seleccione aurora-mysql5.7.
  4. Tipo de Propiedad, escoger Grupo de parámetros de clúster de base de datos.
  5. Nombre del grupo, introduzca my-mysql-dynamodb-cdc.
  6. Descripción, introduzca Parameter group for demo Aurora MySQL database.
  7. Elige Crear.
  8. Seleccione my-mysql-dynamodb-cdc, y elige Editar bajo Acciones de grupo de parámetros.
  9. Edite el grupo de parámetros de la siguiente manera:
Nombre Valor
binlog_row_imagen ser completados
binlog_formato FILA
binlog_checksum NINGUNO
log_slave_updates 1
  1. Elige Guardar los cambios.

Grupo de parámetros RDS

Crear el clúster de Aurora MySQL

Complete los siguientes pasos para crear el clúster de Aurora MySQL:

  1. En la consola de Amazon RDS, elija Bases de datos en el panel de navegación.
  2. Elige Crear base de datos.
  3. Elija un método de creación de base de datos, escoger Creación estándar.
  4. under Opciones de motor, Para Tipo de motor, escoger Aurora (Compatible con MySQL).
  5. Versión del motor, escoger Aurora (MySQL 5.7) 2.11.2.
  6. Plantillas, escoger Producción.
  7. under Ajustes, Para Identificador de clúster de base de datos, ingrese un nombre para su base de datos.
  8. Nombre de usuario maestro, ingrese su nombre de usuario principal.
  9. Contraseña maestra y Confirmar contraseña maestra, ingrese su contraseña principal.
  10. under Configuración de instancia, Para clase de instancia de base de datos, escoger Clases Burstable (incluye clases t) y elige db.t3.pequeño.
  11. under Disponibilidad y durabilidad, Para Implementación Multi-AZ, escoger No cree una réplica de Aurora.
  12. under Conectividad, Para recurso de cómputo, escoger No conectarse a un recurso informático de EC2.
  13. Tipo de red, escoger IPv4.
  14. Nube privada virtual (VPC), elija su VPC.
  15. Grupo de subred de base de datos, elija su subred pública.
  16. Acceso público, escoger .
  17. Grupo de seguridad de VPC (cortafuegos), elija el grupo de seguridad para su subred pública.
  18. under Autenticación de base de datos, Para Opciones de autenticación de base de datos, escoger Autenticación de contraseña.
  19. under Configuración adicional, Para Grupo de parámetros de clúster de base de datos, elija el grupo de parámetros de clúster que creó anteriormente.
  20. Elige Crear base de datos.

Otorgar permisos a la base de datos de origen

El siguiente paso es otorgar el permiso requerido en la base de datos Aurora MySQL de origen. Ahora puede conectarse al clúster de base de datos utilizando el Utilidad MySQL. Puede ejecutar consultas para completar las siguientes tareas:

  • Cree una base de datos y una tabla de demostración y ejecute consultas sobre los datos
  • Otorgar permiso a un usuario utilizado por el punto de enlace de AWS DMS

Complete los siguientes pasos:

  1. Inicie sesión en la instancia EC2 que está utilizando para conectarse a su clúster de base de datos.
  2. Ingrese el siguiente comando en el símbolo del sistema para conectarse a la instancia de base de datos principal de su clúster de base de datos:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p

  1. Ejecute el siguiente comando SQL para crear una base de datos:
> CREATE DATABASE mydev;

  1. Ejecute el siguiente comando SQL para crear una tabla:
> use mydev; > CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);

  1. Ejecute el siguiente comando SQL para completar la tabla con datos:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');

  1. Ejecute el siguiente comando SQL para crear un usuario para el punto de enlace de AWS DMS y otorgar permisos para tareas de CDC (reemplace el marcador de posición con su contraseña preferida):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Cree y configure recursos de AWS DMS para cargar datos en la tabla de referencia de DynamoDB

En esta sección, creará y configurará AWS DMS para replicar datos en la tabla de referencia de DynamoDB.

Crear una instancia de replicación de AWS DMS

Primero, cree una instancia de replicación de AWS DMS completando los siguientes pasos:

  1. En la consola de AWS DMS, elija Instancias de replicación en el panel de navegación.
  2. Elige Crear instancia de replicación.
  3. under Ajustes, Para Nombre, ingrese un nombre para su instancia.
  4. under Configuración de instancia, Para Alta disponibilidad, escoger Carga de trabajo de desarrollo o prueba (Single-AZ).
  5. under Conectividad y seguridad, Para Grupos de seguridad de VPC, escoger tu préstamo estudiantil.
  6. Elige Crear instancia de replicación.

Crear puntos de enlace de Amazon VPC

Opcionalmente, puede crear Puntos de enlace de Amazon VPC para DynamoDB cuando necesite conectarse a su tabla de DynamoDB desde la instancia de AWS DMS en una red privada. También asegúrese de habilitar Públicamente Accesible cuando necesite conectarse a una base de datos fuera de su VPC.

Crear un punto de enlace de origen de AWS DMS

Cree un punto de enlace de origen de AWS DMS completando los siguientes pasos:

  1. En la consola de AWS DMS, elija Endpoints en el panel de navegación.
  2. Elige Crear punto final.
  3. Tipo de punto final, escoger Extremo de origen.
  4. under Configuración de punto final, Para Identificador de punto final, ingrese un nombre para su punto final.
  5. motor de origen, escoger Amazon Aurora MySQL.
  6. Acceso a la base de datos de puntos finales, escoger Proporcionar información de acceso manualmente.
  7. Nombre del servidor, ingrese el nombre del punto final de su instancia de escritor de Aurora (por ejemplo, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. Puerto, introduzca 3306.
  9. nombre de usuario, ingrese un nombre de usuario para su tarea de AWS DMS.
  10. Contraseña, Ingrese una contraseña.
  11. Elige Crear punto final.

Cree un punto de enlace de destino de AWS DMS

Cree un punto de enlace de destino de AWS DMS completando los siguientes pasos:

  1. En la consola de AWS DMS, elija Endpoints en el panel de navegación.
  2. Elige Crear punto final.
  3. Tipo de punto final, escoger punto final de destino.
  4. under Configuración de punto final, Para Identificador de punto final, ingrese un nombre para su punto final.
  5. motor de destino, escoger Amazon DynamoDB.
  6. ARN del rol de acceso al servicio, ingrese el rol de IAM para su tarea de AWS DMS.
  7. Elige Crear punto final.

Crear tareas de migración de AWS DMS

Cree tareas de migración de bases de datos de AWS DMS completando los siguientes pasos:

  1. En la consola de AWS DMS, elija Tareas de migración de base de datos en el panel de navegación.
  2. Elige Crear tarea.
  3. under Configuración de tareas, Para Identificador de tareas, ingrese un nombre para su tarea.
  4. Instancia de replicación, elija su instancia de replicación.
  5. Extremo de la base de datos de origen, elija su punto final de origen.
  6. Punto final de la base de datos de destino, elija su punto final de destino.
  7. Tipo de migración, escoger Migre los datos existentes y replique los cambios en curso.
  8. under Configuraciones de tareas, Para Modo de preparación de la tabla de objetivos, escoger Hacer nada.
  9. Detener la tarea después de completar la carga completa, escoger No pares.
  10. Configuración de la columna LOB, escoger Modo LOB limitado.
  11. Registros de tareas, habilitar Activar registros de CloudWatch y Activar la aplicación optimizada por lotes.
  12. under Asignaciones de tablas, escoger Editor JSON e introduzca las siguientes reglas.

Aquí puede agregar valores a la columna. Con las siguientes reglas, la tarea CDC de AWS DMS primero creará una nueva tabla de DynamoDB con el nombre especificado en target-table-name. Luego replicará todos los registros, asignación de las columnas de la tabla de base de datos a los atributos de la tabla de DynamoDB.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "2", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "target-table-name": "tgt_country_lookup_table", "mapping-parameters": { "partition-key-name": "code", "sort-key-name": "countryname", "exclude-columns": [ "code", "countryname" ], "attribute-mappings": [ { "target-attribute-name": "code", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${code}" }, { "target-attribute-name": "countryname", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${countryname}" } ], "apply-during-cdc": true } } ]
}

Asignación de tablas DMS

  1. Elige Crear tarea.

Ahora se ha iniciado la tarea de replicación de AWS DMS.

  1. Esperen al Estado para mostrar como Carga completa.

tarea de SGD

  1. En la consola DynamoDB, elija Mesas en el panel de navegación.
  2. Seleccione la tabla de referencia de DynamoDB y elija Explora los elementos de la mesa para revisar los registros replicados.

Tabla de referencia de DynamoDB inicial

Cree una tabla de AWS Glue Data Catalog y un trabajo de ETL de transmisión de AWS Glue

En esta sección, creará una tabla de AWS Glue Data Catalog y un trabajo de extracción, transformación y carga (ETL) de transmisión de AWS Glue.

Crear una tabla de catálogo de datos

Cree una tabla de AWS Glue Data Catalog para el flujo de datos de origen de Kinesis con los siguientes pasos:

  1. En la consola de AWS Glue, elija Bases de datos bajo Catálogo de datos en el panel de navegación.
  2. Elige Agregar base de datos.
  3. Nombre, introduzca my_kinesis_db.
  4. Elige Crear base de datos.
  5. Elige Mesas bajo Bases de datos, A continuación, elija Agregar tabla.
  6. Nombre, introduzca my_stream_src_table.
  7. Base de datos, escoger my_kinesis_db.
  8. Seleccione el tipo de fuente, escoger Kinesis.
  9. El flujo de datos de Kinesis se encuentra en, escoger mi cuenta.
  10. Nombre de transmisión de Kinesis, ingrese un nombre para su flujo de datos.
  11. Clasificación, seleccione JSON.
  12. Elige Siguiente.
  13. Elige Editar esquema como JSON, ingrese el siguiente JSON, luego elija Guardar.
[ { "Name": "uuid", "Type": "string", "Comment": "" }, { "Name": "country", "Type": "string", "Comment": "" }, { "Name": "itemtype", "Type": "string", "Comment": "" }, { "Name": "saleschannel", "Type": "string", "Comment": "" }, { "Name": "orderpriority", "Type": "string", "Comment": "" }, { "Name": "orderdate", "Type": "string", "Comment": "" }, { "Name": "region", "Type": "string", "Comment": "" }, { "Name": "shipdate", "Type": "string", "Comment": "" }, { "Name": "unitssold", "Type": "string", "Comment": "" }, { "Name": "unitprice", "Type": "string", "Comment": "" }, { "Name": "unitcost", "Type": "string", "Comment": "" }, { "Name": "totalrevenue", "Type": "string", "Comment": "" }, { "Name": "totalcost", "Type": "string", "Comment": "" }, { "Name": "totalprofit", "Type": "string", "Comment": "" }, { "Name": "impressiontime", "Type": "string", "Comment": "" }
]

Esquema de tabla de catálogo de pegamento

    1. Elige Siguiente, A continuación, elija Crear.

Cree un trabajo ETL de transmisión de AWS Glue

A continuación, crea un trabajo de transmisión de AWS Glue. AWS Glue 3.0 y versiones posteriores admiten Apache Hudi de forma nativa, por lo que usamos esta integración nativa para ingerir en una tabla de Hudi. Complete los siguientes pasos para crear el trabajo de transmisión de AWS Glue:

  1. En la consola de AWS Glue Studio, elija Editor de secuencias de comandos de chispa y elige Crear.
  2. under Detalles del trabajo pestaña, para Nombre, ingrese un nombre para su trabajo.
  3. Rol de IAM, elija el rol de IAM para su trabajo de AWS Glue.
  4. Tipo de Propiedad, seleccione Transmisión de chispa.
  5. Versión con pegamento, escoger Glue 4.0 - Soporta Spark 3.3, Scala 2, Python 3.
  6. Número solicitado de trabajadores, introduzca 3.
  7. under Propiedades avanzadas, Para Parámetros de trabajo, escoger Añadir nuevo parámetro.
  8. Clave, introduzca --conf.
  9. Valor, introduzca spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Elige Añadir nuevo parámetro.
  11. Clave, introduzca --datalake-formats.
  12. Valor, introduzca hudi.
  13. Ruta de la secuencia de comandos, introduzca s3://<S3BucketName>/scripts/.
  14. ruta temporal, introduzca s3://<S3BucketName>/temporary/.
  15. Opcionalmente, para Ruta de registros de la interfaz de usuario de Spark, introduzca s3://<S3BucketName>/sparkHistoryLogs/.

Parámetro de trabajo de pegamento

  1. En Guión pestaña, ingrese el siguiente script en el editor de AWS Glue Studio y elija Crear.

El trabajo de transmisión casi en tiempo real enriquece los datos al unir un flujo de datos de Kinesis con una tabla de DynamoDB que contiene datos de referencia actualizados con frecuencia. El conjunto de datos enriquecido se carga en la tabla Hudi de destino en el lago de datos. Reemplazar con su depósito que creó a través de AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job args = getResolvedOptions(sys.argv,["JOB_NAME"]) # Initialize spark session and Glue context
sc = SparkContext() glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args) # job paramters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" kin_src_table_name = "my_stream_src_table" hudi_write_operation = "upsert" hudi_record_key = "uuid" hudi_precomb_key = "orderdate" checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db" # hudi options additional_options={ "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.write.recordkey.field": hudi_record_key, "hoodie.datasource.hive_sync.database": hudi_database, "hoodie.table.name": hudi_table, "hoodie.consistency.check.enabled": "true", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor", "hoodie.datasource.write.hive_style_partitioning": "false", "hoodie.datasource.write.precombine.field": hudi_precomb_key, "hoodie.bulkinsert.shuffle.parallelism": "4", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.write.operation": hudi_write_operation, "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
} # Scan and load the reference data table from DynamoDB into AWS Glue DynamicFrames using boto3 API.
def readDynamoDb(): dynamodb = boto3.resource(“dynamodb”) table = dynamodb.Table(dydb_lookup_table) response = table.scan() items = response[“Items”] jsondata = sc.parallelize(items) lookupDf = glueContext.read.json(jsondata) return lookupDf # Load the Amazon Kinesis data stream from Amazon Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog( database=kin_src_database_name, table_name=kin_src_table_name, transformation_ctx=”source_df”, additional_options={“startingPosition”: “TRIM_HORIZON”},
) # As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId): if data_frame.count() > 0: # Refresh the dymanodb table to pull latest snapshot for each microbatch country_lookup_df = readDynamoDb() final_frame = data_frame.join( country_lookup_df, data_frame["country"] == country_lookup_df["countryname"], 'left' ).drop( "countryname", "country", "unitprice", "unitcost", "totalrevenue", "totalcost", "totalprofit" ) # Script generated for node my-lab-hudi-connector final_frame.write.format("hudi") .options(**additional_options) .mode("append") .save(s3_output_folder) try: glueContext.forEachBatch( frame=source_df, batch_function=processBatch, options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path}, )
except Exception as e: print(f"Error is @@@ ....{e}")

  1. Elige Ejecutar para iniciar el trabajo de transmisión.

La siguiente captura de pantalla muestra ejemplos de DataFrames data_frame, country_lookup_dfy final_frame.

Inicial de salida de registro de trabajo de pegamento

El trabajo de AWS Glue unió correctamente los registros provenientes del flujo de datos de Kinesis y la tabla de referencia en DynamoDB, y luego ingirió los registros unidos en Amazon S3 en formato Hudi.

Cree y ejecute una secuencia de comandos de Python para generar datos de muestra y cargarlos en el flujo de datos de Kinesis.

En esta sección, creará y ejecutará Python para generar datos de muestra y cargarlos en el flujo de datos de origen de Kinesis. Complete los siguientes pasos:

  1. Inicie sesión en AWS Cloud9, su instancia EC2 o cualquier otro host informático que coloque registros en su flujo de datos.
  2. Cree un archivo de Python llamado generate-data-for-kds.py:
$ python3 generate-data-for-kds.py

  1. Abra el archivo Python e ingrese el siguiente script:
import json
import random
import boto3
import time STREAM_NAME = "<mystreamname>" def get_data(): return { "uuid": random.randrange(0, 1000001, 1), "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ), "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "orderpriority": random.choice(["H", "L", "M", "C"]), "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ), "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", "8072", "65", "7864", "9778", ] ), "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", "5255275.28", "463966.1", ] ), "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06", "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", "3951974.56", "310842.62", ] ), "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7", "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ), "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", "2022-15-24T02:27:41Z", ] ), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) time.sleep(2) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Este script coloca un registro de flujo de datos de Kinesis cada 2 segundos.

Simule la actualización de la tabla de referencia en el clúster de Aurora MySQL

Ahora todos los recursos y configuraciones están listos. Para este ejemplo, queremos agregar un código de país de 3 dígitos a la tabla de referencia. Actualicemos registros en la tabla de Aurora MySQL para simular cambios. Complete los siguientes pasos:

  1. Asegúrese de que el trabajo de transmisión de AWS Glue ya se esté ejecutando.
  2. Vuelva a conectarse a la instancia de base de datos principal, como se describió anteriormente.
  3. Ingrese sus comandos SQL para actualizar registros:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Ahora se ha actualizado la tabla de referencia en la base de datos de origen de Aurora MySQL. Luego, los cambios se replican automáticamente en la tabla de referencia en DynamoDB.

Tabla de referencia de DynamoDB actualizada

Las siguientes tablas muestran los registros en data_frame, country_lookup_dfy final_frame. En country_lookup_df y final_frame, el combinedname columna tiene valores formateados como <2-digit-country-code>-<3-digit-country-code>-<country-name>, que muestra que los registros modificados en la tabla a la que se hace referencia se reflejan en la tabla sin reiniciar el trabajo de transmisión de AWS Glue. Significa que el trabajo de AWS Glue une correctamente los registros entrantes del flujo de datos de Kinesis con la tabla de referencia incluso cuando la tabla de referencia está cambiando.
Salida de registro de trabajo de pegamento actualizada

Consulta la tabla Hudi usando Athena

Consultemos la tabla Hudi usando Athena para ver los registros en la tabla de destino. Complete los siguientes pasos:

  1. Asegúrese de que el script y el trabajo de AWS Glue Streaming sigan funcionando:
    1. El script de Python (generate-data-for-kds.py) todavía se está ejecutando.
    2. Los datos generados se envían al flujo de datos.
    3. El trabajo de transmisión de AWS Glue aún se está ejecutando.
  2. En la consola de Athena, ejecute el siguiente SQL en el editor de consultas:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

El siguiente resultado de la consulta muestra los registros que se procesan antes de cambiar la tabla a la que se hace referencia. Registros en el combinedname columna son similares a <2-digit-country-code>-<country-name>.

Resultado inicial de la consulta de Athena

El siguiente resultado de la consulta muestra los registros que se procesan después de cambiar la tabla a la que se hace referencia. Registros en el combinedname columna son similares a <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Resultado de la consulta de Athena actualizado

Ahora comprende que los datos de referencia modificados se reflejan correctamente en la tabla Hudi de destino que une los registros del flujo de datos de Kinesis y los datos de referencia en DynamoDB.

Limpiar

Como paso final, limpie los recursos:

  1. Elimine el flujo de datos de Kinesis.
  2. Elimine la tarea de migración, el punto de enlace y la instancia de replicación de AWS DMS.
  3. Detenga y elimine el trabajo de transmisión de AWS Glue.
  4. Elimine el entorno de AWS Cloud9.
  5. Elimine la plantilla de CloudFormation.

Conclusión

La creación y el mantenimiento de un lago de datos transaccionales que involucra la ingesta y el procesamiento de datos en tiempo real tiene múltiples componentes variables y decisiones que se deben tomar, como qué servicio de ingesta usar, cómo almacenar sus datos de referencia y qué marco de lago de datos transaccionales usar. En esta publicación, proporcionamos los detalles de implementación de dicha canalización, utilizando componentes nativos de AWS como componentes básicos y Apache Hudi como marco de código abierto para un lago de datos transaccionales.

Creemos que esta solución puede ser un punto de partida para las organizaciones que buscan implementar un nuevo lago de datos con tales requisitos. Además, los diferentes componentes son totalmente conectables y se pueden mezclar y combinar con lagos de datos existentes para abordar nuevos requisitos o migrar los existentes, abordando sus puntos débiles.


Sobre los autores

manish kola es un arquitecto de soluciones de laboratorio de datos en AWS, donde trabaja en estrecha colaboración con clientes de diversas industrias para diseñar soluciones nativas en la nube para sus necesidades de inteligencia artificial y análisis de datos. Se asocia con los clientes en su recorrido por AWS para resolver sus problemas comerciales y crear prototipos escalables. Antes de unirse a AWS, la experiencia de Manish incluye ayudar a los clientes a implementar proyectos de almacenamiento de datos, BI, integración de datos y lago de datos.

santosh kotagiri es arquitecto de soluciones en AWS con experiencia en análisis de datos y soluciones en la nube que conducen a resultados comerciales tangibles. Su experiencia radica en el diseño e implementación de soluciones de análisis de datos escalables para clientes de todas las industrias, con un enfoque en servicios nativos de la nube y de código abierto. Le apasiona aprovechar la tecnología para impulsar el crecimiento empresarial y resolver problemas complejos.

Chiho Sugimoto es ingeniero de soporte en la nube en el equipo de soporte de Big Data de AWS. Le apasiona ayudar a los clientes a crear lagos de datos mediante cargas de trabajo de ETL. Le encanta la ciencia planetaria y disfruta estudiar el asteroide Ryugu los fines de semana.

Noritaka Sekiyama es Arquitecto Principal de Big Data en el equipo de AWS Glue. Es responsable de crear artefactos de software para ayudar a los clientes. En su tiempo libre, disfruta andar en bicicleta con su nueva bicicleta de carretera.

punto_img

Información más reciente

punto_img