Logotipo de Zephyrnet

Utilice los nuevos comandos SQL MERGE y QUALIFY para implementar y validar la captura de datos de cambios en Amazon Redshift | Servicios web de Amazon

Fecha:

Desplazamiento al rojo de Amazon es un servicio de almacenamiento de datos a escala de petabytes completamente administrado en la nube. Decenas de miles de clientes usan Amazon Redshift para procesar exabytes de datos todos los días para potenciar sus cargas de trabajo de análisis.

Amazon Redshift ha agregado muchas características para mejorar el procesamiento analítico como CONJUNTOS DE ENROLLAMIENTO, CUBO Y AGRUPACIÓN, que fueron demostrados en la publicación Simplifique las consultas de procesamiento analítico en línea (OLAP) en Amazon Redshift mediante nuevas construcciones de SQL como ROLLUP, CUBE y GROUPING SETS. Amazon Redshift ha agregado recientemente muchos comandos y expresiones SQL. En esta publicación, hablamos de dos nuevas características de SQL, la UNIR mando y CALIFICAR cláusula, que simplifica la ingesta y el filtrado de datos.

Una tarea familiar en la mayoría de las aplicaciones posteriores es la captura de datos modificados (CDC) y su aplicación a sus tablas de destino. Esta tarea requiere examinar los datos de origen para determinar si se trata de una actualización o una inserción de datos de destino existentes. Sin el comando MERGE, era necesario probar el nuevo conjunto de datos con el conjunto de datos existente utilizando una clave comercial. Cuando eso no coincidía, insertabas nuevas filas en el conjunto de datos existente; de lo contrario, actualizó las filas del conjunto de datos existente con nuevos valores del conjunto de datos.

La UNIR El comando fusiona condicionalmente filas de una tabla de origen en una tabla de destino. Tradicionalmente, esto sólo se podía lograr mediante el uso de múltiples instrucciones de inserción, actualización o eliminación por separado. Cuando se utilizan varias declaraciones para actualizar o insertar datos, existe el riesgo de que se produzcan inconsistencias entre las diferentes operaciones. La operación de fusión reduce este riesgo al garantizar que todas las operaciones se realicen juntas en una sola transacción.

La CALIFICAR La cláusula filtra los resultados de una función de ventana previamente calculada según las condiciones de búsqueda especificadas por el usuario. Puede utilizar la cláusula para aplicar condiciones de filtrado al resultado de una función de ventana sin utilizar una subconsulta. Esto es similar al TENIENDO cláusula, que aplica una condición para filtrar aún más filas de una cláusula WHERE. La diferencia entre QUALIFY y HAVING es que los resultados filtrados de la cláusula QUALIFY podrían basarse en el resultado de ejecutar funciones de ventana en los datos. Puede utilizar las cláusulas QUALIFY y HAVING en una consulta.

En esta publicación, demostramos cómo usar el comando MERGE para implementar CDC y cómo usar QUALIFY para simplificar la validación de esos cambios.

Resumen de la solución

En este caso de uso, tenemos un almacén de datos, en el que tenemos una tabla de dimensiones del cliente que siempre necesita obtener los datos más recientes del sistema fuente. Estos datos también deben reflejar la hora de creación inicial y la hora de la última actualización para fines de auditoría y seguimiento.

Una forma sencilla de resolver esto es anular completamente la dimensión del cliente todos los días; sin embargo, eso no logrará el seguimiento de actualizaciones, que es un mandato de auditoría, y podría no ser factible hacerlo en tablas más grandes.

Puede cargar datos de muestra desde Amazon S3 siguiendo las instrucciones esta página. Usando la tabla de clientes existente en sample_data_dev.tpcds, creamos una tabla de dimensiones de clientes y una tabla de origen que contendrá actualizaciones para clientes existentes e inserciones para clientes nuevos. Usamos el comando FUSIONAR para fusionar los datos de la tabla de origen con la tabla de destino (dimensión del cliente). También mostramos cómo utilizar la cláusula QUALIFY para simplificar la validación de los cambios en la tabla de destino.

Para seguir los pasos de esta publicación, recomendamos descargar el archivo adjunto cuaderno, que contiene todos los scripts que se ejecutarán para esta publicación. Para obtener información sobre cómo crear y ejecutar cuadernos, consulte Creación y ejecución de cuadernos.

Requisitos previos

Debe tener los siguientes requisitos previos:

Crear y completar la tabla de dimensiones

Usamos la tabla de clientes existente en sample_data_dev.tpcds para crear un customer_dimension mesa. Complete los siguientes pasos:

  1. Cree una tabla usando algunos campos seleccionados, incluida la clave comercial, y agregue un par de campos de mantenimiento para insertar y actualizar marcas de tiempo:
     -- create the customer dimension table DROP TABLE IF EXISTS customer_dim CASCADE;
    CREATE TABLE customer_dim ( customer_dim_id bigint GENERATED BY DEFAULT AS IDENTITY(1, 1), c_customer_sk integer NOT NULL ENCODE az64 distkey,
    c_first_name character(20) ENCODE lzo,
    c_last_name character(30) ENCODE lzo,
    c_current_addr_sk integer ENCODE az64,
    c_birth_country character varying(20) ENCODE lzo,
    c_email_address character(50) ENCODE lzo,
    record_insert_ts timestamp WITHOUT time ZONE DEFAULT current_timestamp ,
    record_upd_ts timestamp WITHOUT time ZONE DEFAULT NULL
    )
    SORTKEY (c_customer_sk);

  2. Complete la tabla de dimensiones:
    -- populate dimension insert into customer_dim (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) select c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address
    from “sample_data_dev”.”tpcds”.”customer”;

  3. Valide el recuento de filas y el contenido de la tabla:
    -- check customers count and look at sample data
    select count(1) from customer_dim; select * from customer_dim limit 10;

Simular cambios en la tabla de clientes

Utilice el siguiente código para simular los cambios realizados en la tabla:

-- create a source table with some updates and some inserts
-- Update- Email has changed for 100 customers drop table if exists src_customer;
create table src_customer distkey(c_customer_sk) as select c_customer_sk , c_first_name , c_last_name, c_current_addr_sk, c_birth_country, ‘x’+c_email_address as c_email_address, getdate() as effective_dt
from customer_dim where c_email_address is not null
limit 100; -- also let’s add three completely new customers
insert into src_customer values (15000001, ‘Customer#15’,’000001’, 10001 ,’USA’ , ‘Customer#15000001@gmail.com’, getdate() ),
(15000002, ‘Customer#15’,’000002’, 10002 ,’MEXICO’ , ‘Customer#15000002@gmail.com’, getdate() ),
(15000003, ‘Customer#15’,’000003’, 10003 ,’CANADA’ , ‘Customer#15000003@gmail.com’, getdate() ); -- check source count
select count(1) from src_customer;

Fusionar la tabla de origen con la tabla de destino

Ahora tiene una tabla de origen con algunos cambios que debe fusionar con la tabla de dimensiones del cliente.

Antes del comando MERGE, este tipo de tarea necesitaba dos comandos UPDATE e INSERT separados para implementarse:

-- merge changes to dim customer
BEGIN TRANSACTION;
-- update current records
UPDATE customer_dim
SET c_first_name = src.c_first_name , c_last_name = src.c_last_name , c_current_addr_sk = src.c_current_addr_sk , c_birth_country = src.c_birth_country , c_email_address = src.c_email_address , record_upd_ts = current_timestamp
from src_customer AS src
where customer_dim.c_customer_sk = src.c_customer_sk ;
-- Insert new records
INSERT INTO customer_dim (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) select src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address from src_customer AS src
where src.c_customer_sk NOT IN (select c_customer_sk from customer_dim);
-- end merge operation
COMMIT TRANSACTION;

El comando MERGE utiliza una sintaxis más sencilla, en la que utilizamos el resultado de la comparación de claves para decidir si realizamos una operación de actualización de DML (cuando coincide) o una operación de inserción de DML (cuando no coincide):

MERGE INTO customer_dim using src_customer AS src ON customer_dim.c_customer_sk = src.c_customer_sk
WHEN MATCHED THEN UPDATE SET c_first_name = src.c_first_name , c_last_name = src.c_last_name , c_current_addr_sk = src.c_current_addr_sk , c_birth_country = src.c_birth_country , c_email_address = src.c_email_address , record_upd_ts = current_timestamp
WHEN NOT MATCHED THEN INSERT (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) VALUES (src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address );

Validar los cambios de datos en la tabla de destino.

Ahora necesitamos validar que los datos hayan llegado correctamente a la tabla de destino. Primero podemos verificar los datos actualizados usando la marca de tiempo de actualización. Como esta fue nuestra primera actualización, podemos examinar todas las filas donde la marca de tiempo de la actualización no es nula:

-- Check the changes
-- to get updates
select * from customer_dim
where record_upd_ts is not null

Utilice QUALIFY para simplificar la validación de los cambios de datos.

Necesitamos examinar los datos insertados en esta tabla más recientemente. Una forma de hacerlo es clasificar los datos según su marca de tiempo de inserción y obtener aquellos con la primera clasificación. Esto requiere usar la función de ventana. rank() y también requiere una subconsulta para obtener los resultados.

Antes de que QUALIFY estuviera disponible, necesitábamos crearlo usando una subconsulta como la siguiente:

select customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts from ( select rank() OVER (ORDER BY DATE_TRUNC(‘second’,record_insert_ts) desc) AS rnk, customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts from customer_dim where record_upd_ts is null)
where rnk = 1;

La función QUALIFY elimina la necesidad de la subconsulta, como se muestra en el siguiente fragmento de código:

-- to get the newly inserted rows we can make use of Qualify feature
select * from customer_dim
where record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC(‘second’,record_insert_ts) desc) = 1 

Validar todos los cambios de datos

Podemos unir los resultados de ambas consultas para obtener todas las inserciones y cambios de actualización:

-- To get all changes
select *
from (
select 'Updates' as operations, cd.* from customer_dim as cd
where cd.record_upd_ts is not null
union select 'Inserts' as operations, cd.* from customer_dim cd
where cd.record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC('second',cd.record_insert_ts) desc) = 1 ) order by 1

Limpiar

Para limpiar los recursos utilizados en la publicación, elimine el clúster aprovisionado de Redshift o el grupo de trabajo sin servidor de Redshift y el espacio de nombres que creó para esta publicación (esto también eliminará todos los objetos creados).

Si usó un clúster aprovisionado de Redshift existente o un grupo de trabajo y espacio de nombres de Redshift Serverless, use el siguiente código para eliminar estos objetos:

DROP TABLE IF EXISTS customer_dim CASCADE;
DROP TABLE IF EXISTS src_customer CASCADE;

Conclusión

Cuando se utilizan varias declaraciones para actualizar o insertar datos, existe el riesgo de que se produzcan inconsistencias entre las diferentes operaciones. La operación MERGE reduce este riesgo al garantizar que todas las operaciones se realicen juntas en una sola transacción. Para los clientes de Amazon Redshift que están migrando desde otros sistemas de almacenamiento de datos o que regularmente necesitan incorporar datos que cambian rápidamente en su almacén de Redshift, el comando MERGE es una forma sencilla de insertar, actualizar y eliminar de forma condicional datos de tablas de destino basadas en datos existentes y nueva fuente de datos.

En la mayoría de las consultas analíticas que utilizan funciones de ventana, es posible que también necesite utilizar esas funciones de ventana en su cláusula WHERE. Sin embargo, esto no está permitido y, para hacerlo, debe crear una subconsulta que contenga la función de ventana requerida y luego usar los resultados en la consulta principal en la cláusula WHERE. El uso de la cláusula QUALIFY elimina la necesidad de una subconsulta y, por lo tanto, simplifica la declaración SQL y hace que sea menos difícil de escribir y leer.

Le animamos a que empiece a utilizar esas nuevas funciones y nos dé su opinión. Para obtener más detalles, consulte UNIR y Cláusula CALIFICAR.


Sobre los autores

Yanzhu-ji es gerente de producto en el equipo de Amazon Redshift. Tiene experiencia en visión y estrategia de productos en plataformas y productos de datos líderes en la industria. Tiene una habilidad sobresaliente en la creación de productos de software sustanciales utilizando técnicas de desarrollo web, diseño de sistemas, bases de datos y programación distribuida. En su vida personal, a Yanzhu le gusta pintar, fotografiar y jugar al tenis.

Ahmed Shehat es arquitecto sénior de soluciones especialista en análisis en AWS con sede en Toronto. Tiene más de dos décadas de experiencia ayudando a los clientes a modernizar sus plataformas de datos. A Ahmed le apasiona ayudar a los clientes a crear soluciones analíticas eficientes, con rendimiento y escalables.

Ranjan birmano es un arquitecto de soluciones especialista en análisis en AWS. Se especializa en Amazon Redshift y ayuda a los clientes a crear soluciones analíticas escalables. Tiene más de 16 años de experiencia en diferentes tecnologías de bases de datos y almacenamiento de datos. Le apasiona automatizar y resolver los problemas de los clientes con soluciones en la nube.

punto_img

Información más reciente

punto_img