Logotipo de Zephyrnet

Comparta y publique sus datos de Snowflake en AWS Data Exchange mediante el uso compartido de datos de Amazon Redshift

Fecha:

Desplazamiento al rojo de Amazon es un servicio de almacenamiento de datos a escala de petabyte totalmente administrado en la nube. Puede comenzar con unos pocos cientos de gigabytes de datos y escalar a un petabyte o más. En la actualidad, decenas de miles de clientes de AWS, desde empresas Fortune 500, empresas emergentes y todo lo demás, utilizan Amazon Redshift para ejecutar paneles de inteligencia comercial (BI) de misión crítica, analizar datos de transmisión en tiempo real y ejecutar análisis predictivos. Con el aumento constante de los datos generados, los clientes de Amazon Redshift continúan logrando éxitos para brindar un mejor servicio a sus usuarios finales, mejorar sus productos y administrar un negocio eficiente y eficaz.

En esta publicación, hablamos de un cliente que actualmente usa Snowflake para almacenar datos analíticos. El cliente debe ofrecer estos datos a los clientes que utilizan Amazon Redshift a través de Intercambio de datos de AWS, el servicio más completo del mundo para conjuntos de datos de terceros. Explicamos en detalle cómo implementar un proceso completamente integrado que ingiera automáticamente datos de Snowflake en Amazon Redshift y los ofrezca a los clientes a través de AWS Data Exchange.

Resumen de la solución

La solución consta de cuatro pasos de alto nivel:

  1. Configure Snowflake para enviar los datos modificados de las tablas identificadas a un Servicio de almacenamiento simple de Amazon (Amazon S3) cubo.
  2. Utilice un personalizado Cargador automático Redshift para cargar estos datos aterrizados de Amazon S3 en Amazon Redshift.
  3. Combine los datos de las tablas provisionales de S3 de captura de datos de cambios (CDC) con las tablas de Amazon Redshift.
  4. Utilice el uso compartido de datos de Amazon Redshift para licenciar los datos a los clientes a través de AWS Data Exchange como una oferta pública o privada.

El siguiente diagrama ilustra este flujo de trabajo.

Diagrama de la arquitectura de la solución

Requisitos previos

Para comenzar, necesita los siguientes requisitos previos:

Configure Snowflake para realizar un seguimiento de los datos modificados y descargarlos en Amazon S3

En Snowflake, identifique las tablas que necesita replicar en Amazon Redshift. Para el propósito de esta demostración, usamos los datos en el TPCH_SF1 esquema Customer, LineItemy Orders mesas de la SNOWFLAKE_SAMPLE_DATA base de datos, que viene de fábrica con su cuenta de Snowflake.

  1. Asegúrate de que el nombre artístico externo de Snowflake unload_to_s3 creado en los requisitos previos apunta al prefijo S3 s3-redshift-loader-sourcecreado en el paso anterior.
  2. Crear un nuevo esquema BLOG_DEMO existentes DEMO_DB base de datos:CREATE SCHEMA demo_db.blog_demo;
  3. Duplica el Customer, LineItemy Orders tablas en el TPCH_SF1 esquema a la BLOG_DEMO esquema:
    CREATE TABLE CUSTOMER AS SELECT * FROM snowflake_sample_data.tpch_sf1.CUSTOMER;
    CREATE TABLE ORDERS AS
    SELECT * FROM snowflake_sample_data.tpch_sf1.ORDERS;
    CREATE TABLE LINEITEM AS SELECT * FROM snowflake_sample_data.tpch_sf1.LINEITEM;

  4. Verifique que las tablas se hayan duplicado correctamente:
    SELECT table_catalog, table_schema, table_name, row_count, bytes
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = 'BLOG_DEMO'
    ORDER BY ROW_COUNT;

    descargar-paso-4

  5. Crear flujos de mesa para realizar un seguimiento de los cambios del lenguaje de manipulación de datos (DML) realizados en las tablas, incluidas las inserciones, actualizaciones y eliminaciones:
    CREATE OR REPLACE STREAM CUSTOMER_CHECK ON TABLE CUSTOMER;
    CREATE OR REPLACE STREAM ORDERS_CHECK ON TABLE ORDERS;
    CREATE OR REPLACE STREAM LINEITEM_CHECK ON TABLE LINEITEM;

  6. Realice cambios DML en las tablas (para esta publicación, ejecutamos UPDATE en todas las tablas y MERGE en el customer mesa):
    UPDATE customer SET c_comment = 'Sample comment for blog demo' WHERE c_custkey between 0 and 10; UPDATE orders SET o_comment = 'Sample comment for blog demo' WHERE o_orderkey between 1800001 and 1800010; UPDATE lineitem SET l_comment = 'Sample comment for blog demo' WHERE l_orderkey between 3600001 and 3600010;
    MERGE INTO customer c USING ( SELECT n_nationkey FROM snowflake_sample_data.tpch_sf1.nation s WHERE n_name = 'UNITED STATES') n ON n.n_nationkey = c.c_nationkey WHEN MATCHED THEN UPDATE SET c.c_comment = 'This is US based customer1';

  7. Valide que las tablas de flujo hayan registrado todos los cambios:
    SELECT * FROM CUSTOMER_CHECK; SELECT * FROM ORDERS_CHECK; SELECT * FROM LINEITEM_CHECK;

    Por ejemplo, podemos consultar el siguiente valor de clave de cliente para verificar cómo se registraron los eventos para la instrucción MERGE en la tabla de clientes:

    SELECT * FROM CUSTOMER_CHECK where c_custkey = 60027;

    Podemos ver el METADATA$ISUPDATE columna como TRUE, y vemos DELETE seguido de INSERT en el METADATA$ACTION columna.
    descargar-val-paso-7

  8. Ejecute el comando COPY para descargar el CDC de las tablas de flujo al depósito S3 usando el nombre de la etapa externa unload_to_s3.En el siguiente código, también estamos copiando los datos a las carpetas S3 que terminan en _stg para asegurarse de que cuando Redshift Auto Loader cree automáticamente estas tablas en Amazon Redshift, se creen y marquen como tablas provisionales:
    COPY INTO @unload_to_s3/customer_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/customer_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/lineitem_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;

  9. Verifique los datos en el depósito S3. Se crearán tres subcarpetas en la carpeta s3-redshift-loader-source del depósito S3, y cada una tendrá archivos de datos .parquet.descargar-paso-9-valdescargar-paso-9-valTambién puede automatizar los comandos COPY anteriores mediante tareas, que se pueden programar para que se ejecuten con una frecuencia establecida para la copia automática de datos de CDC de Snowflake a Amazon S3.
  10. Ingrese al ACCOUNTADMIN papel para asignar el EXECUTE TASK privilegio. En este escenario, estamos asignando los privilegios al SYSADMIN papel:
    USE ROLE accountadmin;
    GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE sysadmin;

  11. Ingrese al SYSADMIN rol para crear tres tareas separadas para ejecutar tres comandos COPY cada 5 minutos: USE ROLE sysadmin;
    /* Task to offload Customer CDC table */ CREATE TASK sf_rs_customer_cdc WAREHOUSE = SMALL SCHEDULE = 'USING CRON 5 * * * * UTC' AS COPY INTO @unload_to_s3/customer_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
    /*Task to offload Orders CDC table */ CREATE TASK sf_rs_orders_cdc WAREHOUSE = SMALL SCHEDULE = 'USING CRON 5 * * * * UTC' AS COPY INTO @unload_to_s3/orders_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    /* Task to offload Lineitem CDC table */ CREATE TASK sf_rs_lineitem_cdc WAREHOUSE = SMALL SCHEDULE = 'USING CRON 5 * * * * UTC' AS COPY INTO @unload_to_s3/lineitem_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    Cuando las tareas se crean por primera vez, están en un SUSPENDED estado.

  12. Modifique las tres tareas y configúrelas en el estado REANUDAR:
    ALTER TASK sf_rs_customer_cdc RESUME;
    ALTER TASK sf_rs_orders_cdc RESUME;
    ALTER TASK sf_rs_lineitem_cdc RESUME;

  13. Valide que las tres tareas se hayan reanudado correctamente: SHOW TASKS;descargar-setp-13-valAhora las tareas se ejecutarán cada 5 minutos y buscarán nuevos datos en las tablas de transmisión para descargarlos en Amazon S3. Tan pronto como los datos se migren de Snowflake a Amazon S3, Redshift Auto Loader infiere automáticamente el esquema y crea instantáneamente las tablas correspondientes en Amazon Redshift. . Luego, de manera predeterminada, comienza a cargar datos de Amazon S3 a Amazon Redshift cada 5 minutos. Tú también puedes cambiar la configuración predeterminada de 5 minutos
  14. En la consola de Amazon Redshift, inicie el editor de consultas v2 y conéctese a su clúster de Amazon Redshift.
  15. Navega hasta el dev base de datos, public esquema y ampliar Mesas.
    Puede ver tres tablas provisionales creadas con el mismo nombre que las carpetas correspondientes en Amazon S3.
  16. Valide los datos en una de las tablas ejecutando la siguiente consulta:SELECT * FROM "dev"."public"."customer_stg";descargar-paso-16-val

Configurar la utilidad Redshift Auto Loader

Redshift Auto Loader facilita significativamente la ingesta de datos en Amazon Redshift porque carga automáticamente los archivos de datos de Amazon S3 a Amazon Redshift. Los archivos se asignan a las tablas respectivas con solo colocar los archivos en ubicaciones preconfiguradas en Amazon S3. Para obtener más detalles sobre la arquitectura y el flujo de trabajo interno, consulte el Repositorio GitHub.

Usamos un Formación en la nube de AWS plantilla para configurar Redshift Auto Loader. Complete los siguientes pasos:

  1. Inicie CloudFormation plantilla.
  2. Elige Siguiente.
    cargador automático-paso-2
  3. Nombre de pila, ingresa un nombre.
  4. Proporcione los parámetros enumerados en la siguiente tabla.
    Parámetro de plantilla de CloudFormation Valores permitidos Descripción
    RedshiftClusterIdentifier Identificador de clúster de Amazon Redshift Ingrese el identificador del clúster de Amazon Redshift.
    DatabaseUserName Nombre de usuario de la base de datos en el clúster de Amazon Redshift El nombre de usuario de la base de datos de Amazon Redshift que tiene acceso para ejecutar el script SQL.
    DatabaseName Nombre del depósito S3 El nombre de la base de datos principal de Amazon Redshift donde se ejecuta el script SQL.
    DatabaseSchemaName Nombre de la base de datos en Amazon Redshift El nombre del esquema de Amazon Redshift donde se crean las tablas.
    RedshiftIAMRoleARN ARN de rol de IAM predeterminado o válido adjunto al clúster de Amazon Redshift El ARN del rol de IAM asociado con el clúster de Amazon Redshift. Su rol de IAM predeterminado está configurado para el clúster y tiene acceso a su depósito S3, déjelo en el valor predeterminado.
    CopyCommandOptions Opción de copia; el valor predeterminado es el delimitador '|' gzip

    Proporcione los parámetros adicionales de formato de datos del comando COPY.

    Si InitiateSchemaDetection = Sí, el proceso intenta detectar el esquema y establece automáticamente las opciones de comando de copia adecuadas.

    En caso de falla en la detección del esquema o cuando InitiateSchemaDetection = No, este valor se usa como las opciones predeterminadas del comando COPY para cargar datos.

    SourceS3Bucket Nombre del depósito S3 El depósito de S3 donde se almacenan los datos. Asegúrese de que el rol de IAM asociado al clúster de Amazon Redshift tenga acceso a este depósito.
    InitiateSchemaDetection Si no

    Se establece en para detectar dinámicamente el esquema antes de cargar el archivo y crear una tabla en Amazon Redshift si aún no existe. Si ya existe una tabla, no la eliminará ni la volverá a crear en Amazon Redshift.

    Si la detección del esquema falla, el proceso usa las opciones COPY predeterminadas como se especifica en CopyCommandOptions.

    Redshift Auto Loader utiliza el comando COPY para cargar datos en Amazon Redshift. Para esta publicación, establezca CopyCommandOptions de la siguiente manera, y configure cualquier opción de comando COPY admitida:

    delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'

    parámetros de entrada del autocargador

  5. Elige Siguiente.
  6. Acepte los valores predeterminados en la página siguiente y elija Siguiente.
  7. Seleccione la casilla de verificación de acuse de recibo y elija Crear pila.
    cargador automático-paso-7
  8. Supervise el progreso de la creación de la pila y espere hasta que se complete.
  9. Para verificar la configuración del cargador automático de Redshift, inicie sesión en la consola de Amazon S3 y navegue hasta el depósito de S3 que proporcionó.
    Deberías ver un nuevo directorio. s3-redshift-loader-source es creado.
    cargador automático-paso-9

Copie todos los archivos de datos exportados de Snowflake en s3-redshift-loader-source.

Combine los datos de las tablas provisionales de CDC S3 con las tablas de Amazon Redshift

Para fusionar sus datos de Amazon S3 a Amazon Redshift, complete los siguientes pasos:

  1. Crear una tabla de preparación temporal merge_stg e inserte todas las filas de la tabla provisional de S3 que tienen metadata_action as INSERT, usando el siguiente código. Esto incluye todas las nuevas inserciones, así como la actualización.
    CREATE TEMP TABLE merge_stg AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC
    ) AS rnk
    FROM customer_stg WHERE rnk = 1 AND metadata$action = 'INSERT'

    El código anterior usa una función de ventana. DENSE_RANK() para seleccionar las últimas entradas para un determinado c_custkey asignando un rango a cada fila para un determinado c_custkey y ordene los datos en orden descendente usando last_updated_ts. Luego seleccionamos las filas con rnk=1 y metadata$action = ‘INSERT’ para capturar todas las inserciones.

  2. Usar la tabla de preparación de S3 customer_stg para eliminar los registros de la tabla base customer, que se marcan como eliminaciones o actualizaciones:
    DELETE FROM customer USING customer_stg WHERE customer.c_custkey = customer_stg.c_custkey;

    Esto elimina todas las filas que están presentes en la tabla provisional de CDC S3, que se encarga de las filas marcadas para eliminación y actualizaciones.

  3. Usar la tabla de preparación temporal merge_stg para insertar los registros marcados para actualizaciones o inserciones:
    INSERT INTO customer SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment FROM merge_stg;

  4. Trunca la tabla de etapas, porque ya hemos actualizado la tabla de destino:truncate customer_stg;
  5. También puede ejecutar los pasos anteriores como un procedimiento almacenado:
    CREATE OR REPLACE PROCEDURE merge_customer()
    AS $$
    BEGIN
    /*CREATING TEMP TABLE TO GET THE MOST LATEST RECORDS FOR UPDATES/NEW INSERTS*/
    CREATE TEMP TABLE merge_stg AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC ) AS rnk
    FROM customer_stg
    )
    WHERE rnk = 1 AND metadata$action = 'INSERT';
    /* DELETING FROM THE BASE TABLE USING THE CDC STAGING TABLE ALL THE RECORDS MARKED AS DELETES OR UPDATES*/
    DELETE FROM customer
    USING customer_stg
    WHERE customer.c_custkey = customer_stg.c_custkey;
    /*INSERTING NEW/UPDATED RECORDS IN THE BASE TABLE*/ INSERT INTO customer
    SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
    FROM merge_stg;
    truncate customer_stg;
    END;
    $$ LANGUAGE plpgsql;

    Por ejemplo, observemos los estados de antes y después de la tabla de clientes cuando ha habido un cambio en los datos de un cliente en particular.

    La siguiente captura de pantalla muestra los nuevos cambios registrados en el customer_stg mesa para c_custkey = 74360.
    combinar-proceso-nuevos-cambios
    Podemos ver dos registros para un cliente con c_custkey=74360 uno con metadata$action as DELETE y uno con metadata$action as INSERT. Eso significa que el registro con c_custkey se actualizó en el origen y estos cambios deben aplicarse al destino customer tabla en Amazon Redshift.

    La siguiente captura de pantalla muestra el estado actual del customer tabla antes de que estos cambios se hayan fusionado utilizando el procedimiento almacenado anterior:
    fusión-proceso-estado-actual

  6. Ahora, para actualizar la tabla de destino, podemos ejecutar el procedimiento almacenado de la siguiente manera: CALL merge_customer()La siguiente captura de pantalla muestra el estado final de la tabla de destino una vez que se completa el procedimiento almacenado.
    combinar-proceso-después-sp

Ejecutar el procedimiento almacenado en un horario

También puede ejecutar el procedimiento almacenado en un horario a través de Puente de eventos de Amazon. Los pasos de programación son los siguientes:

  1. En la consola de EventBridge, elija Crear regla.
    sp-horario-1
  2. Nombre, ingrese un nombre significativo, por ejemplo, Trigger-Snowflake-Redshift-CDC-Merge.
  3. Bus de eventos, escoger tu préstamo estudiantil.
  4. Tipo de regla, selecciona Horarios.
  5. Elige Siguiente.
    sp-planificación-paso-5
  6. Patrón de programación, seleccione Un programa que se ejecuta a un ritmo regular, como cada 10 minutos.
  7. Tasa de expresión, introduzca Valor como 5 y elige Unidad as Min.
  8. Elige Siguiente.
    sp-planificación-paso-8
  9. Tipos de objetivo, escoger Servicio de AWS.
  10. Seleccione un objetivo, escoger Cúmulo de corrimiento al rojo.
  11. Médico, elija el identificador de clúster de Amazon Redshift.
  12. Nombre de la base de datos, escoger dev.
  13. usuario de la base de datos, ingrese un nombre de usuario con acceso para ejecutar el procedimiento almacenado. Utiliza credenciales temporales para autenticarse.
  14. Opcionalmente, también puede utilizar Director de secretos de AWS para autenticación.
  15. Declaración SQL, introduzca CALL merge_customer().
  16. Rol de ejecución, seleccione Cree un nuevo rol para este recurso específico.
  17. Elige Siguiente.
    sp-planificación-paso-17
  18. Revise los parámetros de la regla y elija Crear regla.

Una vez creada la regla, activa automáticamente el procedimiento almacenado en Amazon Redshift cada 5 minutos para fusionar los datos de CDC en la tabla de destino.

Configure Amazon Redshift para compartir los datos identificados con AWS Data Exchange

Ahora que tiene los datos almacenados en Amazon Redshift, puede publicarlos para los clientes mediante AWS Data Exchange.

  1. En Amazon Redshift, con cualquier editor de consultas, cree el recurso compartido de datos y agregue las tablas para compartir:
    CREATE DATASHARE salesshare MANAGEDBY ADX;
    ALTER DATASHARE salesshare ADD SCHEMA tpch_sf1;
    ALTER DATASHARE salesshare ADD TABLE tpch_sf1.customer;

    ADX-paso 1

  2. En la consola de AWS Data Exchange, cree su conjunto de datos.
  3. Seleccione Uso compartido de datos de Amazon Redshift.
    ADX-paso3-crear-compartir datos
  4. Cree una revisión en el conjunto de datos.
    ADX-paso4-crear-revisión
  5. Agregue activos a la revisión (en este caso, el recurso compartido de datos de Amazon Redshift).
    ADX-addassets
  6. Finaliza la revisión.
    ADX-paso-6-finalizar revisión

Después de crear el conjunto de datos, puede publicarlo en el catálogo público o directamente a los clientes como un producto privado. Para obtener instrucciones sobre cómo crear y publicar productos, consulte NUEVO: intercambio de datos de AWS para Amazon Redshift

Limpiar

Para evitar incurrir en cargos futuros, complete los siguientes pasos:

  1. Elimine la pila de CloudFormation utilizada para crear el cargador automático Redshift.
  2. Elimine el clúster de Amazon Redshift creado para esta demostración.
  3. Si estaba usando un clúster existente, descarte la tabla externa creada y el esquema externo.
  4. Elimine el depósito S3 que creó.
  5. Elimine los objetos Snowflake que creó.

Conclusión

En esta publicación, demostramos cómo puede configurar un proceso completamente integrado que replique continuamente los datos de Snowflake a Amazon Redshift y luego use Amazon Redshift para ofrecer datos a los clientes intermedios a través de AWS Data Exchange. Puede utilizar la misma arquitectura para otros fines, como compartir datos con otros clústeres de Amazon Redshift dentro de la misma cuenta, entre cuentas o incluso entre regiones si es necesario.


Acerca de los autores

Raks KhareRaks Khare es un arquitecto de soluciones especialista en análisis en AWS con sede en Pensilvania. Ayuda a los clientes a diseñar soluciones de análisis de datos a escala en la plataforma de AWS.

Ekta Ahuja es arquitecto sénior de soluciones especialista en análisis en AWS. Le apasiona ayudar a los clientes a crear soluciones de análisis y datos escalables y sólidas. Antes de AWS, trabajó en varios roles diferentes de análisis e ingeniería de datos. Fuera del trabajo, le gusta hornear, viajar y los juegos de mesa.

Tahir Aziz es arquitecto de soluciones de análisis en AWS. Ha trabajado en la creación de almacenes de datos y soluciones de big data durante más de 13 años. Le encanta ayudar a los clientes a diseñar soluciones de análisis integrales en AWS. Fuera del trabajo, le gusta viajar.
y cocinar.

Ahmed Shehat es arquitecto sénior de soluciones especialista en análisis en AWS con sede en Toronto. Tiene más de dos décadas de experiencia ayudando a los clientes a modernizar sus plataformas de datos. A Ahmed le apasiona ayudar a los clientes a crear soluciones analíticas eficientes, escalables y con buen rendimiento.

punto_img

Información más reciente

punto_img