En un almacén de datos, un dimensión es una estructura que categoriza hechos y medidas para permitir a los usuarios responder preguntas comerciales. Para ilustrar un ejemplo, en un dominio de ventas típico, el cliente, el tiempo o el producto son dimensiones y las transacciones de ventas son un hecho. Los atributos dentro de la dimensión pueden cambiar con el tiempo: un cliente puede cambiar su dirección, un empleado puede pasar de un puesto de contratista a un puesto de tiempo completo, o un producto puede tener varias revisiones. A dimensión que cambia lentamente (SCD) es un concepto de almacenamiento de datos que contiene datos relativamente estáticos que pueden cambiar lentamente durante un período de tiempo. Hay tres tipos principales de SCD que se mantienen en el almacenamiento de datos: Tipo 1 (sin historial), Tipo 2 (historial completo) y Tipo 3 (historial limitado). La captura de datos modificados (CDC) es una característica de una base de datos que brinda la capacidad de identificar los datos que cambiaron entre dos cargas de la base de datos, de modo que se pueda realizar una acción en los datos modificados.
A medida que las organizaciones de todo el mundo están modernizando sus plataformas de datos con lagos de datos en Servicio de almacenamiento simple de Amazon (Amazon S3), el manejo de SCD en lagos de datos puede ser un desafío. Se vuelve aún más desafiante cuando los sistemas de origen no proporcionan un mecanismo para identificar los datos modificados para su procesamiento dentro del lago de datos y hace que el procesamiento de datos sea muy complejo si la fuente de datos es semiestructurada en lugar de una base de datos. El objetivo clave al manejar los SCD de tipo 2 es definir las fechas de inicio y finalización del conjunto de datos con precisión para realizar un seguimiento de los cambios dentro del lago de datos, ya que esto proporciona la capacidad de generación de informes puntuales para las aplicaciones de consumo.
En esta publicación, nos enfocamos en demostrar cómo identificar los datos modificados para una fuente semiestructurada (JSON) y capturar los cambios de datos históricos completos (SCD Tipo 2) y almacenarlos en un lago de datos S3, usando Pegamento AWS y formato de lago de datos abierto Delta.io. Esta implementación admite los siguientes casos de uso:
- Seguimiento de SCD de tipo 2 con fechas de inicio y finalización para identificar los registros históricos actuales y completos y una bandera para identificar los registros eliminados en el lago de datos (eliminaciones lógicas)
- Utilizar herramientas de consumo como Atenea amazónica para consultar registros históricos sin problemas
Resumen de la solución
Esta publicación demuestra la solución con un caso de uso de extremo a extremo utilizando un conjunto de datos de empleados de muestra. El conjunto de datos representa los detalles de los empleados, como ID, nombre, dirección, número de teléfono, contratista o no, y más. Para demostrar la implementación de SCD, considere las siguientes suposiciones:
- El equipo de ingeniería de datos recibe archivos diarios que son instantáneas completas de los registros y no contienen ningún mecanismo para identificar los cambios en los registros de origen.
- El equipo tiene la tarea de implementar la funcionalidad SCD Tipo 2 para identificar registros nuevos, actualizados y eliminados de la fuente y preservar los cambios históricos en el lago de datos.
- Debido a que los sistemas de origen no brindan la capacidad de CDC, se debe desarrollar un mecanismo para identificar los registros nuevos, actualizados y eliminados y conservarlos en la capa del lago de datos.
La arquitectura se implementa de la siguiente manera:
- Los sistemas de origen ingieren archivos en el depósito de aterrizaje de S3 (este paso se imita generando los registros de muestra utilizando el AWS Lambda función en el cubo de aterrizaje)
- Un trabajo de AWS Glue (trabajo Delta) selecciona el archivo de datos de origen y procesa los datos modificados de la carga de archivo anterior (nuevas inserciones, actualizaciones de los registros existentes y registros eliminados del origen) en el lago de datos de S3 (depósito de capa procesada)
- La arquitectura utiliza el formato de lago de datos abierto (Delta) y construye el lago de datos S3 como un lago Delta, que es mutable, porque los nuevos cambios se pueden actualizar, se pueden agregar nuevas inserciones y las eliminaciones de fuentes se pueden identificar con precisión y marcar. con un
delete_flag
propuesta de - Un rastreador de AWS Glue cataloga los datos, que Athena puede consultar
El siguiente diagrama ilustra nuestra arquitectura.
Requisitos previos
Antes de comenzar, asegúrese de tener los siguientes requisitos previos:
Implementar la solución
Para esta solución, proporcionamos una plantilla de CloudFormation que configura los servicios incluidos en la arquitectura para permitir implementaciones repetibles. Esta plantilla crea los siguientes recursos:
- Dos cubos S3: un cubo de aterrizaje para almacenar datos de empleados de muestra y un cubo de capa procesada para el lago de datos mutable (Delta Lake)
- Una función Lambda para generar registros de muestra
- Un trabajo de extracción, transformación y carga (ETL) de AWS Glue para procesar los datos de origen desde el depósito de aterrizaje hasta el depósito procesado
Para implementar la solución, complete los siguientes pasos:
- Elige Pila de lanzamiento para iniciar la pila de CloudFormation:
- Ingrese un nombre de pila.
- Seleccione Reconozco que AWS CloudFormation podría crear recursos de IAM con nombres personalizados.
- Elige Crear pila.
Una vez completada la implementación de la pila de CloudFormation, vaya a la consola de AWS CloudFormation para anotar los siguientes recursos en la Salidas lengüeta:
- Recursos del lago de datos – Los cubos S3
scd-blog-landing-xxxx
yscd-blog-processed-xxxx
(denominadoscd-blog-landing
yscd-blog-processed
en las secciones siguientes de este post) - Generador de registros de muestra Función Lambda –
SampleDataGenaratorLambda-<CloudFormation Stack Name>
(denominadoSampleDataGeneratorLambda
) - Base de datos del catálogo de datos de AWS Glue –
deltalake_xxxxxx
(denominadodeltalake
) - Trabajo Delta de pegamento de AWS –
<CloudFormation-Stack-Name>-src-to-processed
(denominadosrc-to-processed
)
Tenga en cuenta que la implementación de la pila de CloudFormation en su cuenta incurre en cargos por uso de AWS.
Pruebe la implementación de SCD Tipo 2
Con la infraestructura instalada, está listo para probar el diseño general de la solución y consultar los registros históricos del conjunto de datos de los empleados. Esta publicación está diseñada para implementarse en un caso de uso de un cliente real, en el que obtiene datos de instantáneas completos a diario. Probamos los siguientes aspectos de la implementación de SCD:
- Ejecute un trabajo de AWS Glue para la carga inicial
- Simule un escenario donde no hay cambios en la fuente
- Simule escenarios de inserción, actualización y eliminación agregando nuevos registros y modificando y eliminando registros existentes
- Simule un escenario en el que el registro eliminado vuelve como una nueva inserción
Generar un conjunto de datos de empleados de muestra
Para probar la solución, y antes de que pueda iniciar la ingesta de datos inicial, es necesario identificar la fuente de datos. Para simplificar ese paso, se implementó una función Lambda en la pila de CloudFormation que acaba de implementar.
Abra la función y configure un evento de prueba, con el valor predeterminado hello-world
evento de plantilla JSON como se ve en la siguiente captura de pantalla. Proporcione un nombre de evento sin cambios en la plantilla y guarde el evento de prueba.
Elige Probar para invocar un evento de prueba, que invoca la función Lambda para generar los registros de muestra.
Cuando la función Lambda complete su invocación, podrá ver el siguiente conjunto de datos de empleados de muestra en el depósito de destino.
Ejecute el trabajo de AWS Glue
Confirme si ve el conjunto de datos del empleado en la ruta s3://scd-blog-landing/dataset/employee/
. Puede descargar el conjunto de datos y abrirlo en un editor de código como VS Code. El siguiente es un ejemplo del conjunto de datos:
Descargue el conjunto de datos y manténgalo listo, ya que modificará el conjunto de datos para futuros casos de uso para simular las inserciones, actualizaciones y eliminaciones. El conjunto de datos de muestra generado para usted será completamente diferente al que ve en el ejemplo anterior.
Para ejecutar el trabajo, complete los siguientes pasos:
- En la consola de AWS Glue, elija Empleo en el panel de navegación.
- elige el trabajo
src-to-processed
. - En Ron pestaña, elegir Ejecutar.
Cuando el trabajo de AWS Glue se ejecuta por primera vez, el trabajo lee el conjunto de datos del empleado de la ruta del depósito de destino e ingiere los datos en el depósito procesado como una tabla Delta.
Cuando se completa el trabajo, puede crear un rastreador para ver la carga de datos inicial. La siguiente captura de pantalla muestra la base de datos disponible en el Bases de datos .
- Elige Rastreadores en el panel de navegación.
- Elige Crear rastreador.
- Nombra tu rastreador
delta-lake-crawler
, A continuación, elija Siguiente.
- Seleccione Todavía no para datos ya asignados a tablas de AWS Glue.
- Elige Agregar una fuente de datos.
- En Fuente de datos menú desplegable, elija Delta Lake.
- Ingrese la ruta a la tabla Delta.
- Seleccione Crear tablas nativas.
- Elige Agregar una fuente de datos de Delta Lake.
- Elige Siguiente.
- Elija el rol que fue creado por la plantilla de CloudFormation, luego elija Siguiente.
- Elija la base de datos que fue creada por la plantilla de CloudFormation, luego elija Siguiente.
- Elige Crear rastreador.
- Seleccione su rastreador y elija Ejecutar.
Consultar los datos
Una vez que se completa el rastreador, puede ver la tabla que creó.
Para consultar los datos, complete los siguientes pasos:
- Elija la tabla de empleados y en la Acciones menú, seleccione Ver datos.
Será redirigido a la consola de Athena. Si no tiene el último motor de Athena, cree un nuevo grupo de trabajo de Athena con el último motor de Athena.
- under Administración en el panel de navegación, elija Grupos de trabajo.
- Elige Crear grupo de trabajo.
- Proporcione un nombre para el grupo de trabajo, como
DeltaWorkgroup
. - Seleccione Atenea SQL como el motor, y elija Motor Athena versión 3 para Versión del motor de consultas.
- Elige Crear grupo de trabajo.
- Después de crear el grupo de trabajo, seleccione el grupo de trabajo (
DeltaWorkgroup
) en el menú desplegable del editor de consultas de Athena.
- Ejecute la siguiente consulta en el
employee
mesa:
Nota: Actualice el nombre correcto de la base de datos de los resultados de CloudFormation antes de ejecutar la consulta anterior.
Puedes observar que el employee
La tabla tiene 25 registros. La siguiente captura de pantalla muestra el total de registros de empleados con algunos registros de muestra.
La tabla Delta se almacena con un emp_key
, que es único para todos y cada uno de los cambios y se utiliza para realizar un seguimiento de los cambios. El emp_key
se crea para cada inserción, actualización y eliminación, y se puede usar para encontrar todos los cambios relacionados con un solo emp_id
.
La emp_key
se crea usando el algoritmo hash SHA256, como se muestra en el siguiente código:
Realizar inserciones, actualizaciones y eliminaciones
Antes de realizar cambios en el conjunto de datos, ejecutemos el mismo trabajo una vez más. Suponiendo que la carga actual del origen sea la misma que la carga inicial sin cambios, el trabajo de AWS Glue no debería realizar ningún cambio en el conjunto de datos. Después de completar el trabajo, ejecute el anterior Select
consulta en el editor de consultas de Athena y confirma que todavía hay 25 registros activos con los siguientes valores:
- Los 25 registros con la columna
isCurrent=true
- Los 25 registros con la columna
end_date=Null
- Los 25 registros con la columna
delete_flag=false
Después de confirmar la ejecución del trabajo anterior con estos valores, modifiquemos nuestro conjunto de datos inicial con los siguientes cambios:
- Cambie el
isContractor
bandera afalse
(cambiarlo atrue
si su conjunto de datos ya muestrafalse
) paraemp_id=12
. - Eliminar toda la fila donde
emp_id=8
(asegúrese de guardar el registro en un editor de texto, porque usamos este registro en otro caso de uso). - Copie la fila para
emp_id=25
e inserte una nueva fila. Cambiar elemp_id
para ser26
y asegúrese de cambiar los valores para otras columnas también.
Después de realizar estos cambios, el conjunto de datos de origen del empleado tiene el siguiente código (para facilitar la lectura, solo hemos incluido los registros modificados como se describe en los tres pasos anteriores):
- Ahora, sube los cambios.
fake_emp_data.json
archivo al mismo prefijo de origen.
- Después de cargar el conjunto de datos de empleados modificado en Amazon S3, vaya a la consola de AWS Glue y ejecute el trabajo.
- Cuando se complete el trabajo, ejecute la siguiente consulta en el editor de consultas de Athena y confirme que hay 27 registros en total con los siguientes valores:
Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.
- Ejecute otra consulta en el editor de consultas de Athena y confirme que hay 4 registros devueltos con los siguientes valores:
Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.
Verá dos registros para emp_id=12
:
- Un
emp_id=12
registro con los siguientes valores (para el registro que se ingirió como parte de la carga inicial):emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
isCurrent=false
delete_flag=false
end_date=’2023-03-02’
- Un segundo
emp_id=12
registro con los siguientes valores (para el registro que se ingirió como parte del cambio en la fuente):emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
isCurrent=true
delete_flag=false
end_date=Null
(o cadena vacía)
El récord de emp_id=8
que se eliminó en la fuente como parte de esta ejecución seguirá existiendo pero con los siguientes cambios en los valores:
isCurrent=false
end_date=’2023-03-02’
delete_flag=true
El nuevo registro de empleado se insertará con los siguientes valores:
emp_id=26
isCurrent=true
end_date=NULL
(o cadena vacía)delete_flag=false
Tenga en cuenta que emp_key
los valores en su tabla real pueden ser diferentes a los que se proporcionan aquí como ejemplo.
- Para las eliminaciones, buscamos el emp_id de la tabla base junto con el nuevo archivo fuente y la unión interna con emp_key.
- Si la condición se evalúa como verdadera, verificamos si la tabla base de empleados emp_key es igual a las nuevas actualizaciones emp_key y obtenemos el registro actual sin eliminar (isCurrent=true y delete_flag=false).
- Fusionamos los cambios de eliminación del nuevo archivo con la tabla base para todas las filas de condiciones de eliminación coincidentes y actualizamos lo siguiente:
isCurrent=false
delete_flag=true
end_date=current_date
Ver el siguiente código:
- Tanto para las actualizaciones como para las inserciones, verificamos la condición si la tabla base
employee.emp_id
es igual anew changes.emp_id
y delemployee.emp_key
es igual anew changes.emp_key
, mientras solo recupera los registros actuales. - Si esta condición se evalúa como
true
, luego obtenemos el registro actual (isCurrent=true
ydelete_flag=false
). - Fusionamos los cambios actualizando lo siguiente:
- Si la segunda condición se evalúa como
true
:isCurrent=false
end_date=current_date
- O insertamos la fila completa de la siguiente manera si la segunda condición se evalúa como
false
:emp_id=new record’s emp_key
emp_key=new record’s emp_key
first_name=new record’s first_name
last_name=new record’s last_name
address=new record’s address
phone_number=new record’s phone_number
isContractor=new record’s isContractor
start_date=current_date
end_date=NULL
(o cadena vacía)isCurrent=true
delete_flag=false
- Si la segunda condición se evalúa como
Ver el siguiente código:
Como último paso, recuperemos el registro eliminado del cambio anterior al conjunto de datos de origen y veamos cómo se reinserta en el employee
table en el lago de datos y observe cómo se mantiene el historial completo.
Modifiquemos nuestro conjunto de datos modificado del paso anterior y hagamos los siguientes cambios.
- Agregar lo eliminado
emp_id=8
volver al conjunto de datos.
Después de realizar estos cambios, el conjunto de datos de origen de mi empleado tiene el siguiente código (para facilitar la lectura, solo hemos incluido el registro agregado como se describe en el paso anterior):
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
- Cargue el archivo del conjunto de datos del empleado modificado en el mismo prefijo de origen.
- Después de cargar el cambio
fake_emp_data.json
conjunto de datos a Amazon S3, vaya a la consola de AWS Glue y vuelva a ejecutar el trabajo. - Cuando se complete el trabajo, ejecute la siguiente consulta en el editor de consultas de Athena y confirme que hay 28 registros en total con los siguientes valores:
Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.
- Ejecute la siguiente consulta y confirme que hay 5 registros:
Verá dos registros para emp_id=8
:
- Un
emp_id=8
registro con los siguientes valores (el registro anterior que se eliminó):emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
isCurrent=false
deleted_flag=true
end_date=’2023-03-02’
- Otra
emp_id=8
registro con los siguientes valores (el nuevo registro que se insertó en la última ejecución):emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
isCurrent=true
deleted_flag=false
end_date=NULL
(o cadena vacía)
La emp_key
los valores en su tabla real pueden ser diferentes a los que se proporcionan aquí como ejemplo. También tenga en cuenta que debido a que este es el mismo registro eliminado que se reinsertó en la carga posterior sin ningún cambio, no habrá cambios en el emp_key
.
Consultas de muestra del usuario final
Las siguientes son algunas consultas de usuario final de muestra para demostrar cómo se puede atravesar el historial de datos de cambios de empleados para generar informes:
- Consulta 1 – Recuperar una lista de todos los empleados que abandonaron la organización en el mes actual (por ejemplo, marzo de 2023).
La consulta anterior devolvería dos registros de empleados que abandonaron la organización.
- Consulta 2 – Recuperar una lista de nuevos empleados que se incorporaron a la organización en el mes actual (por ejemplo, marzo de 2023).
Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.
La consulta anterior devolvería 23 registros de empleados activos que se unieron a la organización.
- Consulta 3 – Encuentre el historial de cualquier empleado dado en la organización (en este caso, el empleado 18).
Nota: Actualice el nombre correcto de la base de datos de la salida de CloudFormation antes de ejecutar la consulta anterior.
En la consulta anterior, podemos observar que el empleado 18 tuvo dos cambios en sus registros de empleados antes de dejar la organización.
Tenga en cuenta que los resultados de los datos proporcionados en este ejemplo son diferentes de lo que verá en sus registros específicos según los datos de muestra generados por la función Lambda.
Limpiar
Cuando haya terminado de experimentar con esta solución, limpie sus recursos para evitar que se incurra en cargos de AWS:
- Vacíe los cubos S3.
- Elimine la pila de la consola de AWS CloudFormation.
Conclusión
En esta publicación, demostramos cómo identificar los datos modificados para una fuente de datos semiestructurados y preservar los cambios históricos (SCD Tipo 2) en un S3 Delta Lake, cuando los sistemas de origen no pueden proporcionar la capacidad de captura de datos modificados, con AWS Pegamento. Puede ampliar aún más esta solución para permitir que las aplicaciones posteriores creen personalizaciones adicionales a partir de los datos de CDC capturados en el lago de datos.
Además, puede extender esta solución como parte de una orquestación usando Funciones de paso de AWS u otros orquestadores de uso común con los que su organización esté familiarizada. También puede ampliar esta solución agregando particiones cuando corresponda. También puede mantener la tabla delta por compactando los archivos pequeños.
Sobre los autores
Nith Govindasivan, es Arquitecto de Data Lake con AWS Professional Services, donde ayuda a los clientes a incorporarse en su viaje de arquitectura de datos moderna a través de la implementación de soluciones de Big Data y Analytics. Fuera del trabajo, Nith es un ávido fanático del cricket, mira casi cualquier cricket durante su tiempo libre y disfruta de viajes largos y viajes internacionales.
Vijay Vélpula es Arquitecto de datos con Servicios profesionales de AWS. Ayuda a los clientes a implementar Big Data y Analytics Solutions. Fuera del trabajo, le gusta pasar tiempo con la familia, viajar, hacer caminatas y andar en bicicleta.
Sriharsh Adari es arquitecto sénior de soluciones en Amazon Web Services (AWS), donde ayuda a los clientes a trabajar hacia atrás a partir de los resultados comerciales para desarrollar soluciones innovadoras en AWS. A lo largo de los años, ha ayudado a varios clientes en la transformación de plataformas de datos en verticales de la industria. Su área principal de especialización incluye estrategia tecnológica, análisis de datos y ciencia de datos. En su tiempo libre, le gusta practicar deportes, ver programas de televisión en exceso y jugar Tabla.
- Distribución de relaciones públicas y contenido potenciado por SEO. Consiga amplificado hoy.
- Platoblockchain. Inteligencia del Metaverso Web3. Conocimiento amplificado. Accede Aquí.
- Fuente: https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/