Logotipo de Zephyrnet

Mejores prácticas para implementar análisis casi en tiempo real mediante Amazon Redshift Streaming Ingestion con Amazon MSK | Servicios web de Amazon

Fecha:

Desplazamiento al rojo de Amazon es un almacén de datos en la nube escalable y totalmente administrado que acelera el tiempo para obtener información valiosa con análisis a escala rápidos, sencillos y seguros. Decenas de miles de clientes confían en Amazon Redshift para analizar exabytes de datos y ejecutar consultas analíticas complejas, lo que lo convierte en el almacén de datos en la nube más utilizado. Puede ejecutar y escalar análisis en segundos sobre todos sus datos, sin tener que administrar su infraestructura de almacén de datos.

Puede utilizar el Ingestión de streaming de Amazon Redshift Capacidad para actualizar sus bases de datos analíticas casi en tiempo real. La ingesta de streaming de Amazon Redshift simplifica las canalizaciones de datos al permitirle crear vistas materializadas directamente sobre los flujos de datos. Con esta capacidad en Amazon Redshift, puede utilizar el lenguaje de consulta estructurado (SQL) para conectarse e ingerir datos directamente desde flujos de datos, como Secuencias de datos de Amazon Kinesis or Streaming administrado por Amazon para Apache Kafka (Amazon MSK) flujos de datos y extraer datos directamente a Amazon Redshift.

En esta publicación, analizamos las mejores prácticas para implementar análisis casi en tiempo real mediante la ingesta de streaming de Amazon Redshift con Amazon MSK.

Resumen de la solución

Analizamos un canal de ejemplo para ingerir datos de un tema de MSK en Amazon Redshift mediante la ingestión de streaming de Amazon Redshift. También mostramos cómo desanidar datos JSON utilizando notación de puntos en Amazon Redshift. El siguiente diagrama ilustra la arquitectura de nuestra solución.

El flujo del proceso consta de los siguientes pasos:

  1. Cree una vista materializada de transmisión en su clúster de Redshift para consumir datos de transmisión en vivo de los temas de MSK.
  2. Utilice un procedimiento almacenado para implementar la captura de datos modificados (CDC) mediante la combinación única de Kafka Partition y Kafka Offset en el nivel de registro para el tema MSK ingerido.
  3. Cree una tabla orientada al usuario en el clúster Redshift y utilice la notación de puntos para desanidar el documento JSON de la vista materializada de transmisión en columnas de datos de la tabla. Puede cargar datos nuevos continuamente llamando al procedimiento almacenado a intervalos regulares.
  4. Establecer conectividad entre un Amazon QuickSight panel de control y Amazon Redshift para ofrecer visualización e información valiosa.

Como parte de esta publicación, también discutimos los siguientes temas:

  • Pasos para configurar la ingesta de streaming entre cuentas desde Amazon MSK a Amazon Redshift
  • Mejores prácticas para lograr un rendimiento optimizado a partir de vistas materializadas en streaming
  • Técnicas de monitoreo para rastrear fallas en la ingesta de streaming de Amazon Redshift

Requisitos previos

Debes tener lo siguiente:

  • Una cuenta de AWS.
  • Uno de los siguientes recursos, según su caso de uso:
  • Un clúster MSK. Para obtener instrucciones, consulte Crear un clúster de Amazon MSK.
  • Un tema en su clúster de MSK donde su productor de datos puede publicar datos.
  • Un productor de datos para escribir datos en el tema en su clúster MSK.

Consideraciones al configurar su tema MSK

Tenga en cuenta las siguientes consideraciones al configurar su tema MSK:

  • Asegúrese de que el nombre de su tema de MSK no tenga más de 128 caracteres.
  • Al momento de escribir este artículo, los registros MSK que contienen datos comprimidos no se pueden consultar directamente en Amazon Redshift. Amazon Redshift no admite ningún método de descompresión nativo para datos comprimidos del lado del cliente en un tema de MSK.
  • Seguir y las mejores prácticas mientras configura su clúster MSK.
  • Revisar la ingesta de streaming limitaciones por cualquier otra consideración.

Configurar la ingestión de transmisión

Para configurar la ingestión de transmisión, complete los siguientes pasos:

  1. Configura el Gestión de identidades y accesos de AWS (IAM) rol y política de confianza necesarios para la ingestión de streaming. Para obtener instrucciones, consulte la Configurar IAM y realizar la ingesta de streaming desde Kafka.
  2. Asegúrese de que los datos fluyan hacia su tema de MSK usando Reloj en la nube de Amazon métrica (por ejemplo, Bytes de salida por segundo).
  3. Inicie el editor de consultas v2 desde la consola de Amazon Redshift o utilice su cliente SQL preferido para conectarse a su clúster de Redshift para los siguientes pasos. Los siguientes pasos se ejecutaron en el editor de consultas v2.
  4. Cree un esquema externo para asignarlo al clúster MSK. Reemplace el ARN de su función de IAM y el ARN del clúster de MSK en la siguiente declaración:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Opcionalmente, si los nombres de sus temas distinguen entre mayúsculas y minúsculas, debe habilitar enable_case_sensitive_identifier para poder acceder a ellos en Amazon Redshift. Para utilizar identificadores que distingan entre mayúsculas y minúsculas, establezca enable_case_sensitive_identifier a verdadero a nivel de sesión, usuario o clúster:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Cree una vista materializada para consumir los datos de transmisión del tema MSK:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

La columna de metadatos kafka_value que llega desde Amazon MSK se almacena en VARBYTE formato en Amazon Redshift. Para esta publicación, utilizas el JSON_PARSE función para convertir kafka_value a una Tipo de datos SÚPER. También usas el CAN_JSON_PARSE funcionan en la condición de filtro para omitir registros JSON no válidos y protegerse contra errores debidos a fallas de análisis de JSON. Más adelante en esta publicación analizamos cómo almacenar los datos no válidos para futuras depuraciones.

  1. Actualice la vista materializada de streaming, lo que hace que Amazon Redshift lea el tema de MSK y cargue datos en la vista materializada:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

También puede configurar su vista materializada de transmisión para usar capacidades de actualización automática. Esto actualizará automáticamente su vista materializada a medida que los datos lleguen a la transmisión. Ver CREAR VISTA MATERIALIZADA para obtener instrucciones para crear una vista materializada con actualización automática.

Desanida el documento JSON

El siguiente es un ejemplo de un documento JSON que se ingirió desde el tema MSK en la columna Datos de tipo SUPER en la vista materializada de streaming. Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Utilice la notación de puntos como se muestra en el siguiente código para desanidar su carga útil JSON:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

La siguiente captura de pantalla muestra cómo se ve el resultado después de desanidar.

Si tiene matrices en su documento JSON, considere desanidar sus datos usando PartiQL Declaraciones en Amazon Redshift. Para obtener más información, consulte la sección Desanida el documento JSON en el post Análisis casi en tiempo real mediante la ingestión de streaming de Amazon Redshift con Amazon Kinesis Data Streams y Amazon DynamoDB.

Estrategia de carga de datos incremental

Complete los siguientes pasos para implementar una carga de datos incremental:

  1. Cree una tabla denominada Pedidos en Amazon Redshift, que los usuarios finales utilizarán para visualización y análisis empresarial:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

A continuación, crea un procedimiento almacenado llamado SP_Orders_Load implementar CDC desde una vista materializada de transmisión y cargarlo en la versión final Orders mesa. Usas la combinación de Kafka_Partition y Kafka_Offset disponibles en la vista materializada de streaming como columnas del sistema para implementar CDC. La combinación de estas dos columnas siempre será única dentro de un tema de MSK, lo que garantiza que no se pierda ninguno de los registros durante el proceso. El procedimiento almacenado contiene los siguientes componentes:

  • Para utilizar identificadores que distinguen entre mayúsculas y minúsculas, establezca enable_case_sensitive_identifier en verdadero a nivel de sesión, usuario o clúster.
  • Actualice la vista materializada de transmisión manualmente si la actualización automática no está habilitada.
  • Cree una tabla de auditoría llamada Orders_Streaming_Audit si no existe para realizar un seguimiento del último desplazamiento de una partición que se cargó en la tabla Pedidos durante la última ejecución del procedimiento almacenado.
  • Desanidar e insertar solo datos nuevos o modificados en una tabla provisional llamada Orders_Staging_Table, leyendo desde la vista materializada de streaming Orders_Stream_MV, Donde Kafka_Offset es mayor que el último procesado Kafka_Offset registrado en la tabla de auditoría Orders_Streaming_Audit para Kafka_Partition siendo procesado.
  • Al cargar por primera vez utilizando este procedimiento almacenado, no habrá datos en el Orders_Streaming_Audit tabla y todos los datos de Orders_Stream_MV se cargará en la tabla de Órdenes.
  • Inserte solo columnas relevantes para el negocio en la página orientada al usuario. Orders tabla, seleccionando de la tabla de preparación Orders_Staging_Table.
  • Insertar el máximo Kafka_Offset por cada cargado Kafka_Partition en la mesa de auditoría Orders_Streaming_Audit

Hemos agregado la tabla de preparación intermedia. Orders_Staging_Table en esta solución para ayudar con la depuración en caso de fallas inesperadas y la rastreabilidad. Saltarse el paso de preparación y cargar directamente en la mesa final desde Orders_Stream_MV puede proporcionar una latencia más baja según su caso de uso.

  1. Cree el procedimiento almacenado con el siguiente código:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Ejecute el procedimiento almacenado para cargar datos en el Orders mesa:
    call SP_Orders_Load();

  3. Validar datos en la tabla de Pedidos.

Establecer la ingesta de streaming entre cuentas

Si su clúster de MSK pertenece a una cuenta diferente, complete los siguientes pasos para crear roles de IAM para configurar la ingesta de streaming entre cuentas. Supongamos que el clúster Redshift está en la cuenta A y el clúster MSK está en la cuenta B, como se muestra en el siguiente diagrama.

Complete los siguientes pasos:

  1. En la cuenta B, cree una función de IAM llamada MyRedshiftMSKRole que permite a Amazon Redshift (cuenta A) comunicarse con el clúster MSK (cuenta B) denominado MyTestCluster. Dependiendo de si su clúster MSK utiliza autenticación IAM o acceso no autenticado para conectarse, debe crear una función de IAM con una de las siguientes políticas:
    • un IAM policAmazonAmazon MSK usando acceso no autenticado:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • Una política de IAM para Amazon MSK cuando se utiliza la autenticación de IAM:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

La sección de recursos del ejemplo anterior proporciona acceso a todos los temas del MyTestCluster Clúster MSK. Si necesita restringir la función de IAM a temas específicos, debe reemplazar el recurso del tema con una política de recursos más restrictiva.

  1. Después de crear la función de IAM en la cuenta B, tome nota del ARN de la función de IAM (por ejemplo, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. En la cuenta A, cree una función de IAM personalizable de Redshift llamada MyRedshiftRole, que asumirá Amazon Redshift al conectarse a Amazon MSK. El rol debe tener una política como la siguiente, que permita que el rol de Amazon Redshift IAM en la cuenta A asuma el rol de Amazon MSK en la cuenta B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Tome nota del rol ARN para el rol de IAM de Amazon Redshift (por ejemplo, arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. Vuelva a la cuenta B y agregue este rol en la política de confianza del rol de IAM arn:aws:iam::0123456789:role/MyRedshiftMSKRole para permitir que la cuenta B confíe en la función de IAM de la cuenta A. La política de confianza debería verse como el siguiente código:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Inicie sesión en la consola de Amazon Redshift como cuenta A.
  6. Inicie el editor de consultas v2 o su cliente SQL preferido y ejecute las siguientes instrucciones para acceder al tema de MSK en la cuenta B. Para asignarlo al clúster de MSK, cree un esquema externo usando encadenamiento de roles especificando los ARN de las funciones de IAM, separados por una coma sin espacios alrededor. El rol asignado al clúster Redshift es el primero en la cadena.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Consideraciones de rendimiento

Tenga en cuenta las siguientes consideraciones de rendimiento:

  • Mantenga la vista materializada de transmisión simple y mueva transformaciones como desanidación, agregación y expresiones de caso a un paso posterior, por ejemplo, creando otra vista materializada encima de la vista materializada de transmisión.
  • Considere la posibilidad de crear solo una vista materializada de transmisión en un único clúster o grupo de trabajo de Redshift para un tema de MSK determinado. La creación de varias vistas materializadas por tema de MSK puede ralentizar el rendimiento de la ingesta porque cada vista materializada se convierte en un consumidor de ese tema y comparte el ancho de banda de Amazon MSK para ese tema. Los datos de transmisión en vivo en una vista materializada de transmisión se pueden compartir entre múltiples clústeres de Redshift o grupos de trabajo de Redshift Serverless usando compartir datos.
  • Al definir su vista materializada de transmisión, evite usar Json_Extract_Path_Text triturar previamente los datos, porque Json_extract_path_text opera en los datos fila por fila, lo que afecta significativamente el rendimiento de la ingesta. Es preferible obtener los datos tal cual del flujo y luego triturarlos más tarde.
  • Siempre que sea posible, considere omitir la clave de clasificación en la vista materializada de transmisión para acelerar la velocidad de ingesta. Cuando una vista materializada de transmisión tiene una clave de clasificación, se producirá una operación de clasificación con cada lote de datos ingeridos de la transmisión. La clasificación tiene un rendimiento que se escucha según el tipo de datos de la clave de clasificación, el número de columnas de la clave de clasificación y la cantidad de datos ingeridos en cada lote. Este paso de clasificación puede aumentar la latencia antes de que los datos de transmisión estén disponibles para su consulta. Debes sopesar qué es más importante: la latencia al ingerir o la latencia al consultar los datos.
  • Para optimizar el rendimiento de la vista materializada en streaming y reducir el uso de almacenamiento, elimine ocasionalmente los datos de la vista materializada usando borrar, truncaro alterar tabla adjuntar.
  • Si necesita incorporar varios temas de MSK en paralelo en Amazon Redshift, comience con una cantidad menor de vistas materializadas en streaming y siga agregando más vistas materializadas para evaluar el rendimiento general de la ingesta dentro de un clúster o grupo de trabajo.
  • Aumentar la cantidad de nodos en un clúster aprovisionado de Redshift o la RPU base de un grupo de trabajo sin servidor de Redshift puede ayudar a aumentar el rendimiento de la ingesta de una vista materializada de transmisión. Para obtener un rendimiento óptimo, debe intentar tener tantos segmentos en su clúster aprovisionado de Redshift como particiones en su tema MSK, u 8 RPU por cada cuatro particiones en su tema MSK.

Técnicas de seguimiento

Se omitirán los registros del tema que excedan el tamaño de la columna de vista materializada de destino en el momento de la ingesta. Los registros que se omiten en la actualización de la vista materializada se registrarán en el SYS_STREAM_SCAN_ERRORS tabla del sistema.

Los errores que ocurren al procesar un registro debido a un cálculo o una conversión de tipo de datos o alguna otra lógica en la definición de la vista materializada darán como resultado una falla en la actualización de la vista materializada hasta que el registro infractor haya caducado del tema. Para evitar este tipo de problemas, pruebe cuidadosamente la lógica de la definición de su vista materializada; de lo contrario, coloque los registros en la columna VARBYTE predeterminada y procéselos más tarde.

Las siguientes son vistas de monitoreo disponibles:

  • SYS_MV_REFRESH_HISTORY – Utilice esta vista para recopilar información sobre el historial de actualización de sus vistas materializadas de transmisión. Los resultados incluyen el tipo de actualización, como manual o automática, y el estado de la actualización más reciente. La siguiente consulta muestra el historial de actualización de una vista materializada de streaming:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Utilice esta vista para verificar el motivo por el cual un registro no se pudo cargar mediante la ingesta de transmisión desde un tema de MSK. Al momento de escribir esta publicación, al realizar la ingesta desde Amazon MSK, esta vista solo registra errores cuando el registro es mayor que el tamaño de la columna de la vista materializada. Esta vista también mostrará el identificador único (desplazamiento) del registro MSK en la columna de posición. La siguiente consulta muestra el código de error y el motivo del error cuando un registro excedió el límite de tamaño máximo:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – Utilice esta vista para monitorear la cantidad de registros escaneados en un momento de registro determinado. Esta vista también rastrea el desplazamiento del último registro leído en el lote. La siguiente consulta muestra datos de tema para una vista materializada específica:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Utilice esta vista para verificar las métricas generales para una actualización de la vista materializada de transmisión. Esto también registrará errores en la columna error_message para errores que no aparecen en SYS_STREAM_SCAN_ERRORS. La siguiente consulta muestra el error que causa el error de actualización de una vista materializada de transmisión:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Consideraciones adicionales para la implementación

Tiene la opción de generar opcionalmente una vista materializada sobre una vista materializada de transmisión, lo que le permite desanidar y precalcular los resultados para los usuarios finales. Este enfoque elimina la necesidad de almacenar los resultados en una tabla final mediante un procedimiento almacenado.

En esta publicación, usas el Función CAN_JSON_PARSE para protegerse contra errores e ingerir datos con mayor éxito; en este caso, Amazon Redshift omite los registros de transmisión que no se pueden analizar. Sin embargo, si desea realizar un seguimiento de sus registros de errores, considere almacenarlos en una columna utilizando el siguiente SQL al crear la vista materializada de transmisión:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

También puedes considerar descargando datos desde la vista SYS_STREAM_SCAN_ERRORS en una Servicio de almacenamiento simple de Amazon (Amazon S3) y reciba alertas mediante enviando un informe por correo electrónico usando Servicio de notificación simple de Amazon (Amazon SNS) notificaciones cada vez que se crea un nuevo objeto S3.

Por último, según sus requisitos de actualización de datos, puede utilizar Puente de eventos de Amazon para programar los trabajos en su almacén de datos para llamar a lo antes mencionado SP_Orders_Load procedimiento almacenado de forma regular. EventBridge hace esto a intervalos fijos y es posible que necesite tener un mecanismo (por ejemplo, un Funciones de paso de AWS máquina de estado) para monitorear si se completó la llamada anterior al procedimiento. Para obtener más información, consulte Creación de una regla de Amazon EventBridge que se ejecuta según un cronograma. También puedes consultar Acelere la orquestación de un proceso ELT con AWS Step Functions y Amazon Redshift Data API. Otra opción es usar Editor de consultas de Amazon Redshift v2 para programar la actualización. Para más detalles, consulte Programar una consulta con el editor de consultas v2.

Conclusión

En esta publicación, analizamos las mejores prácticas para implementar análisis casi en tiempo real mediante la ingesta de streaming de Amazon Redshift con Amazon MSK. Le mostramos un canal de ejemplo para ingerir datos de un tema de MSK en Amazon Redshift mediante la ingestión de streaming. También mostramos una estrategia confiable para realizar una carga de datos de transmisión incremental en Amazon Redshift utilizando Kafka Partition y Kafka Offset. Además, demostramos los pasos para configurar la ingesta de streaming entre cuentas desde Amazon MSK a Amazon Redshift y analizamos las consideraciones de rendimiento para una tasa de ingesta optimizada. Por último, analizamos técnicas de monitoreo para rastrear fallas en la ingesta de streaming de Amazon Redshift.

Si tienes alguna duda, déjala en la sección de comentarios.


Acerca de los autores

Poulomi Dasgupta es arquitecto sénior de soluciones de análisis en AWS. Le apasiona ayudar a los clientes a crear soluciones de análisis basadas en la nube para resolver sus problemas comerciales. Fuera del trabajo, le gusta viajar y pasar tiempo con su familia.

Adekunle Adedotun es ingeniero sénior de bases de datos con el servicio Amazon Redshift. Ha estado trabajando en bases de datos MPP durante 6 años con un enfoque en el ajuste del rendimiento. También brinda orientación al equipo de desarrollo para funciones de servicio nuevas y existentes.

punto_img

Información más reciente

punto_img