Logotipo de Zephyrnet

Implemente dimensiones que cambian lentamente en un lago de datos con AWS Glue y Delta

Fecha:

En un almacén de datos, un dimensión es una estructura que categoriza hechos y medidas para permitir a los usuarios responder preguntas comerciales. Para ilustrar un ejemplo, en un dominio de ventas típico, el cliente, el tiempo o el producto son dimensiones y las transacciones de ventas son un hecho. Los atributos dentro de la dimensión pueden cambiar con el tiempo: un cliente puede cambiar su dirección, un empleado puede pasar de un puesto de contratista a un puesto de tiempo completo, o un producto puede tener varias revisiones. A dimensión que cambia lentamente (SCD) es un concepto de almacenamiento de datos que contiene datos relativamente estáticos que pueden cambiar lentamente durante un período de tiempo. Hay tres tipos principales de SCD que se mantienen en el almacenamiento de datos: Tipo 1 (sin historial), Tipo 2 (historial completo) y Tipo 3 (historial limitado). La captura de datos modificados (CDC) es una característica de una base de datos que brinda la capacidad de identificar los datos que cambiaron entre dos cargas de la base de datos, de modo que se pueda realizar una acción en los datos modificados.

A medida que las organizaciones de todo el mundo están modernizando sus plataformas de datos con lagos de datos en Servicio de almacenamiento simple de Amazon (Amazon S3), el manejo de SCD en lagos de datos puede ser un desafío. Se vuelve aún más desafiante cuando los sistemas de origen no proporcionan un mecanismo para identificar los datos modificados para su procesamiento dentro del lago de datos y hace que el procesamiento de datos sea muy complejo si la fuente de datos es semiestructurada en lugar de una base de datos. El objetivo clave al manejar los SCD de tipo 2 es definir las fechas de inicio y finalización del conjunto de datos con precisión para realizar un seguimiento de los cambios dentro del lago de datos, ya que esto proporciona la capacidad de generación de informes puntuales para las aplicaciones de consumo.

En esta publicación, nos enfocamos en demostrar cómo identificar los datos modificados para una fuente semiestructurada (JSON) y capturar los cambios de datos históricos completos (SCD Tipo 2) y almacenarlos en un lago de datos S3, usando Pegamento AWS y formato de lago de datos abierto Delta.io. Esta implementación admite los siguientes casos de uso:

  • Seguimiento de SCD de tipo 2 con fechas de inicio y finalización para identificar los registros históricos actuales y completos y una bandera para identificar los registros eliminados en el lago de datos (eliminaciones lógicas)
  • Utilizar herramientas de consumo como Atenea amazónica para consultar registros históricos sin problemas

Resumen de la solución

Esta publicación demuestra la solución con un caso de uso de extremo a extremo utilizando un conjunto de datos de empleados de muestra. El conjunto de datos representa los detalles de los empleados, como ID, nombre, dirección, número de teléfono, contratista o no, y más. Para demostrar la implementación de SCD, considere las siguientes suposiciones:

  • El equipo de ingeniería de datos recibe archivos diarios que son instantáneas completas de los registros y no contienen ningún mecanismo para identificar los cambios en los registros de origen.
  • El equipo tiene la tarea de implementar la funcionalidad SCD Tipo 2 para identificar registros nuevos, actualizados y eliminados de la fuente y preservar los cambios históricos en el lago de datos.
  • Debido a que los sistemas de origen no brindan la capacidad de CDC, se debe desarrollar un mecanismo para identificar los registros nuevos, actualizados y eliminados y conservarlos en la capa del lago de datos.

La arquitectura se implementa de la siguiente manera:

  • Los sistemas de origen ingieren archivos en el depósito de aterrizaje de S3 (este paso se imita generando los registros de muestra utilizando el AWS Lambda función en el cubo de aterrizaje)
  • Un trabajo de AWS Glue (trabajo Delta) selecciona el archivo de datos de origen y procesa los datos modificados de la carga de archivo anterior (nuevas inserciones, actualizaciones de los registros existentes y registros eliminados del origen) en el lago de datos de S3 (depósito de capa procesada)
  • La arquitectura utiliza el formato de lago de datos abierto (Delta) y construye el lago de datos S3 como un lago Delta, que es mutable, porque los nuevos cambios se pueden actualizar, se pueden agregar nuevas inserciones y las eliminaciones de fuentes se pueden identificar con precisión y marcar. con un delete_flag propuesta de
  • Un rastreador de AWS Glue cataloga los datos, que Athena puede consultar

El siguiente diagrama ilustra nuestra arquitectura.

Requisitos previos

Antes de comenzar, asegúrese de tener los siguientes requisitos previos:

Implementar la solución

Para esta solución, proporcionamos una plantilla de CloudFormation que configura los servicios incluidos en la arquitectura para permitir implementaciones repetibles. Esta plantilla crea los siguientes recursos:

  • Dos cubos S3: un cubo de aterrizaje para almacenar datos de empleados de muestra y un cubo de capa procesada para el lago de datos mutable (Delta Lake)
  • Una función Lambda para generar registros de muestra
  • Un trabajo de extracción, transformación y carga (ETL) de AWS Glue para procesar los datos de origen desde el depósito de aterrizaje hasta el depósito procesado

Para implementar la solución, complete los siguientes pasos:

  1. Elige Pila de lanzamiento para iniciar la pila de CloudFormation:

  1. Ingrese un nombre de pila.
  2. Seleccione Reconozco que AWS CloudFormation podría crear recursos de IAM con nombres personalizados.
  3. Elige Crear pila.

Una vez completada la implementación de la pila de CloudFormation, vaya a la consola de AWS CloudFormation para anotar los siguientes recursos en la Salidas lengüeta:

  • Recursos del lago de datos – Los cubos S3 scd-blog-landing-xxxx y scd-blog-processed-xxxx (denominado scd-blog-landing y scd-blog-processed en las secciones siguientes de este post)
  • Generador de registros de muestra Función LambdaSampleDataGenaratorLambda-<CloudFormation Stack Name> (denominado SampleDataGeneratorLambda)
  • Base de datos del catálogo de datos de AWS Gluedeltalake_xxxxxx (denominado deltalake)
  • Trabajo Delta de pegamento de AWS<CloudFormation-Stack-Name>-src-to-processed (denominado src-to-processed)

Tenga en cuenta que la implementación de la pila de CloudFormation en su cuenta incurre en cargos por uso de AWS.

Pruebe la implementación de SCD Tipo 2

Con la infraestructura instalada, está listo para probar el diseño general de la solución y consultar los registros históricos del conjunto de datos de los empleados. Esta publicación está diseñada para implementarse en un caso de uso de un cliente real, en el que obtiene datos de instantáneas completos a diario. Probamos los siguientes aspectos de la implementación de SCD:

  • Ejecute un trabajo de AWS Glue para la carga inicial
  • Simule un escenario donde no hay cambios en la fuente
  • Simule escenarios de inserción, actualización y eliminación agregando nuevos registros y modificando y eliminando registros existentes
  • Simule un escenario en el que el registro eliminado vuelve como una nueva inserción

Generar un conjunto de datos de empleados de muestra

Para probar la solución, y antes de que pueda iniciar la ingesta de datos inicial, es necesario identificar la fuente de datos. Para simplificar ese paso, se implementó una función Lambda en la pila de CloudFormation que acaba de implementar.

Abra la función y configure un evento de prueba, con el valor predeterminado hello-world evento de plantilla JSON como se ve en la siguiente captura de pantalla. Proporcione un nombre de evento sin cambios en la plantilla y guarde el evento de prueba.

Elige Probar para invocar un evento de prueba, que invoca la función Lambda para generar los registros de muestra.

Cuando la función Lambda complete su invocación, podrá ver el siguiente conjunto de datos de empleados de muestra en el depósito de destino.

Ejecute el trabajo de AWS Glue

Confirme si ve el conjunto de datos del empleado en la ruta s3://scd-blog-landing/dataset/employee/. Puede descargar el conjunto de datos y abrirlo en un editor de código como VS Code. El siguiente es un ejemplo del conjunto de datos:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas SpringsnDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson TunnelnMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert ViewsnWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks EstatesnEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah CreeknWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes PlainnHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Descargue el conjunto de datos y manténgalo listo, ya que modificará el conjunto de datos para futuros casos de uso para simular las inserciones, actualizaciones y eliminaciones. El conjunto de datos de muestra generado para usted será completamente diferente al que ve en el ejemplo anterior.

Para ejecutar el trabajo, complete los siguientes pasos:

  1. En la consola de AWS Glue, elija Empleo en el panel de navegación.
  2. elige el trabajo src-to-processed.
  3. En Ron pestaña, elegir Ejecutar.

Cuando el trabajo de AWS Glue se ejecuta por primera vez, el trabajo lee el conjunto de datos del empleado de la ruta del depósito de destino e ingiere los datos en el depósito procesado como una tabla Delta.

Cuando se completa el trabajo, puede crear un rastreador para ver la carga de datos inicial. La siguiente captura de pantalla muestra la base de datos disponible en el Bases de datos .

  1. Elige Rastreadores en el panel de navegación.
  2. Elige Crear rastreador.

  1. Nombra tu rastreador delta-lake-crawler, A continuación, elija Siguiente.

  1. Seleccione Todavía no para datos ya asignados a tablas de AWS Glue.
  2. Elige Agregar una fuente de datos.

  1. En Fuente de datos menú desplegable, elija Delta Lake.
  2. Ingrese la ruta a la tabla Delta.
  3. Seleccione Crear tablas nativas.
  4. Elige Agregar una fuente de datos de Delta Lake.

  1. Elige Siguiente.

  1. Elija el rol que fue creado por la plantilla de CloudFormation, luego elija Siguiente.

  1. Elija la base de datos que fue creada por la plantilla de CloudFormation, luego elija Siguiente.

  1. Elige Crear rastreador.

  1. Seleccione su rastreador y elija Ejecutar.

Consultar los datos

Una vez que se completa el rastreador, puede ver la tabla que creó.

Para consultar los datos, complete los siguientes pasos:

  1. Elija la tabla de empleados y en la Acciones menú, seleccione Ver datos.

Será redirigido a la consola de Athena. Si no tiene el último motor de Athena, cree un nuevo grupo de trabajo de Athena con el último motor de Athena.

  1. under Administración en el panel de navegación, elija Grupos de trabajo.

  1. Elige Crear grupo de trabajo.

  1. Proporcione un nombre para el grupo de trabajo, como DeltaWorkgroup.
  2. Seleccione Atenea SQL como el motor, y elija Motor Athena versión 3 para Versión del motor de consultas.

  1. Elige Crear grupo de trabajo.

  1. Después de crear el grupo de trabajo, seleccione el grupo de trabajo (DeltaWorkgroup) en el menú desplegable del editor de consultas de Athena.

  1. Ejecute la siguiente consulta en el employee mesa:
SELECT * FROM "deltalake_2438fbd0"."employee";

Nota: Actualice el nombre correcto de la base de datos de los resultados de CloudFormation antes de ejecutar la consulta anterior.

Puedes observar que el employee La tabla tiene 25 registros. La siguiente captura de pantalla muestra el total de registros de empleados con algunos registros de muestra.

La tabla Delta se almacena con un emp_key, que es único para todos y cada uno de los cambios y se utiliza para realizar un seguimiento de los cambios. El emp_key se crea para cada inserción, actualización y eliminación, y se puede usar para encontrar todos los cambios relacionados con un solo emp_id.

La emp_key se crea usando el algoritmo hash SHA256, como se muestra en el siguiente código:

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"), col("phone_number"), col("isContractor")), 256))

Realizar inserciones, actualizaciones y eliminaciones

Antes de realizar cambios en el conjunto de datos, ejecutemos el mismo trabajo una vez más. Suponiendo que la carga actual del origen sea la misma que la carga inicial sin cambios, el trabajo de AWS Glue no debería realizar ningún cambio en el conjunto de datos. Después de completar el trabajo, ejecute el anterior Select consulta en el editor de consultas de Athena y confirma que todavía hay 25 registros activos con los siguientes valores:

  • Los 25 registros con la columna isCurrent=true
  • Los 25 registros con la columna end_date=Null
  • Los 25 registros con la columna delete_flag=false

Después de confirmar la ejecución del trabajo anterior con estos valores, modifiquemos nuestro conjunto de datos inicial con los siguientes cambios:

  1. Cambie el isContractor bandera a false (cambiarlo a true si su conjunto de datos ya muestra false) para emp_id=12.
  2. Eliminar toda la fila donde emp_id=8 (asegúrese de guardar el registro en un editor de texto, porque usamos este registro en otro caso de uso).
  3. Copie la fila para emp_id=25 e inserte una nueva fila. Cambiar el emp_id para ser 26y asegúrese de cambiar los valores para otras columnas también.

Después de realizar estos cambios, el conjunto de datos de origen del empleado tiene el siguiente código (para facilitar la lectura, solo hemos incluido los registros modificados como se describe en los tres pasos anteriores):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes PlainnHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  1. Ahora, sube los cambios. fake_emp_data.json archivo al mismo prefijo de origen.

  1. Después de cargar el conjunto de datos de empleados modificado en Amazon S3, vaya a la consola de AWS Glue y ejecute el trabajo.
  2. Cuando se complete el trabajo, ejecute la siguiente consulta en el editor de consultas de Athena y confirme que hay 27 registros en total con los siguientes valores:
SELECT * FROM "deltalake_2438fbd0"."employee";

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

  1. Ejecute otra consulta en el editor de consultas de Athena y confirme que hay 4 registros devueltos con los siguientes valores:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

Verá dos registros para emp_id=12:

  • Un emp_id=12 registro con los siguientes valores (para el registro que se ingirió como parte de la carga inicial):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • Un segundo emp_id=12 registro con los siguientes valores (para el registro que se ingirió como parte del cambio en la fuente):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (o cadena vacía)

El récord de emp_id=8 que se eliminó en la fuente como parte de esta ejecución seguirá existiendo pero con los siguientes cambios en los valores:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

El nuevo registro de empleado se insertará con los siguientes valores:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (o cadena vacía)
  • delete_flag=false

Tenga en cuenta que emp_key los valores en su tabla real pueden ser diferentes a los que se proporcionan aquí como ejemplo.

  1. Para las eliminaciones, buscamos el emp_id de la tabla base junto con el nuevo archivo fuente y la unión interna con emp_key.
  2. Si la condición se evalúa como verdadera, verificamos si la tabla base de empleados emp_key es igual a las nuevas actualizaciones emp_key y obtenemos el registro actual sin eliminar (isCurrent=true y delete_flag=false).
  3. Fusionamos los cambios de eliminación del nuevo archivo con la tabla base para todas las filas de condiciones de eliminación coincidentes y actualizamos lo siguiente:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

Ver el siguiente código:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true" base_tbl.alias("employee") .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond) .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false", "end_date": current_date(), "delete_flag": "true"}).execute()

  1. Tanto para las actualizaciones como para las inserciones, verificamos la condición si la tabla base employee.emp_id es igual a new changes.emp_id y del employee.emp_key es igual a new changes.emp_key, mientras solo recupera los registros actuales.
  2. Si esta condición se evalúa como true, luego obtenemos el registro actual (isCurrent=true y delete_flag=false).
  3. Fusionamos los cambios actualizando lo siguiente:
    1. Si la segunda condición se evalúa como true:
      1. isCurrent=false
      2. end_date=current_date
    2. O insertamos la fila completa de la siguiente manera si la segunda condición se evalúa como false:
      1. emp_id=new record’s emp_key
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (o cadena vacía)
      10. isCurrent=true
      11. delete_flag=false

Ver el siguiente código:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false" base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond) .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false", "end_date": current_date() }) .whenNotMatchedInsert( values={ "isCurrent": "true", "emp_id": "employeeUpdates.emp_id", "first_name": "employeeUpdates.first_name", "last_name": "employeeUpdates.last_name", "Address": "employeeUpdates.Address", "phone_number": "employeeUpdates.phone_number", "isContractor": "employeeUpdates.isContractor", "emp_key": "employeeUpdates.emp_key", "start_date": current_date(), "delete_flag": "employeeUpdates.delete_flag", "end_date": "null" }) .execute()

Como último paso, recuperemos el registro eliminado del cambio anterior al conjunto de datos de origen y veamos cómo se reinserta en el employee table en el lago de datos y observe cómo se mantiene el historial completo.

Modifiquemos nuestro conjunto de datos modificado del paso anterior y hagamos los siguientes cambios.

  1. Agregar lo eliminado emp_id=8 volver al conjunto de datos.

Después de realizar estos cambios, el conjunto de datos de origen de mi empleado tiene el siguiente código (para facilitar la lectura, solo hemos incluido el registro agregado como se describe en el paso anterior):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Cargue el archivo del conjunto de datos del empleado modificado en el mismo prefijo de origen.
  2. Después de cargar el cambio fake_emp_data.json conjunto de datos a Amazon S3, vaya a la consola de AWS Glue y vuelva a ejecutar el trabajo.
  3. Cuando se complete el trabajo, ejecute la siguiente consulta en el editor de consultas de Athena y confirme que hay 28 registros en total con los siguientes valores:
SELECT * FROM "deltalake_2438fbd0"."employee";

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

  1. Ejecute la siguiente consulta y confirme que hay 5 registros:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

Verá dos registros para emp_id=8:

  • Un emp_id=8 registro con los siguientes valores (el registro anterior que se eliminó):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • deleted_flag=true
    • end_date=’2023-03-02
  • Otra emp_id=8 registro con los siguientes valores (el nuevo registro que se insertó en la última ejecución):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • deleted_flag=false
    • end_date=NULL (o cadena vacía)

La emp_key los valores en su tabla real pueden ser diferentes a los que se proporcionan aquí como ejemplo. También tenga en cuenta que debido a que este es el mismo registro eliminado que se reinsertó en la carga posterior sin ningún cambio, no habrá cambios en el emp_key.

Consultas de muestra del usuario final

Las siguientes son algunas consultas de usuario final de muestra para demostrar cómo se puede atravesar el historial de datos de cambios de empleados para generar informes:

  • Consulta 1 – Recuperar una lista de todos los empleados que abandonaron la organización en el mes actual (por ejemplo, marzo de 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

La consulta anterior devolvería dos registros de empleados que abandonaron la organización.

  • Consulta 2 – Recuperar una lista de nuevos empleados que se incorporaron a la organización en el mes actual (por ejemplo, marzo de 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

La consulta anterior devolvería 23 registros de empleados activos que se unieron a la organización.

  • Consulta 3 – Encuentre el historial de cualquier empleado dado en la organización (en este caso, el empleado 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.

En la consulta anterior, podemos observar que el empleado 18 tuvo dos cambios en sus registros de empleados antes de dejar la organización.

Tenga en cuenta que los resultados de los datos proporcionados en este ejemplo son diferentes de lo que verá en sus registros específicos según los datos de muestra generados por la función Lambda.

Limpiar

Cuando haya terminado de experimentar con esta solución, limpie sus recursos para evitar que se incurra en cargos de AWS:

  1. Vacíe los cubos S3.
  2. Elimine la pila de la consola de AWS CloudFormation.

Conclusión

En esta publicación, demostramos cómo identificar los datos modificados para una fuente de datos semiestructurados y preservar los cambios históricos (SCD Tipo 2) en un S3 Delta Lake, cuando los sistemas de origen no pueden proporcionar la capacidad de captura de datos modificados, con AWS Pegamento. Puede ampliar aún más esta solución para permitir que las aplicaciones posteriores creen personalizaciones adicionales a partir de los datos de CDC capturados en el lago de datos.

Además, puede extender esta solución como parte de una orquestación usando Funciones de paso de AWS u otros orquestadores de uso común con los que su organización esté familiarizada. También puede ampliar esta solución agregando particiones cuando corresponda. También puede mantener la tabla delta por compactando los archivos pequeños.


Sobre los autores

Nith Govindasivan, es Arquitecto de Data Lake con AWS Professional Services, donde ayuda a los clientes a incorporarse en su viaje de arquitectura de datos moderna a través de la implementación de soluciones de Big Data y Analytics. Fuera del trabajo, Nith es un ávido fanático del cricket, mira casi cualquier cricket durante su tiempo libre y disfruta de viajes largos y viajes internacionales.

Vijay Vélpula es Arquitecto de datos con Servicios profesionales de AWS. Ayuda a los clientes a implementar Big Data y Analytics Solutions. Fuera del trabajo, le gusta pasar tiempo con la familia, viajar, hacer caminatas y andar en bicicleta.

Sriharsh Adari es arquitecto sénior de soluciones en Amazon Web Services (AWS), donde ayuda a los clientes a trabajar hacia atrás a partir de los resultados comerciales para desarrollar soluciones innovadoras en AWS. A lo largo de los años, ha ayudado a varios clientes en la transformación de plataformas de datos en verticales de la industria. Su área principal de especialización incluye estrategia tecnológica, análisis de datos y ciencia de datos. En su tiempo libre, le gusta practicar deportes, ver programas de televisión en exceso y jugar Tabla.

punto_img

Información más reciente

punto_img