Logotipo de Zephyrnet

Valide la transmisión de datos a través de Amazon MSK mediante esquemas en el Registro de esquemas de AWS Glue entre cuentas

Fecha:

Las empresas de hoy se enfrentan a un crecimiento sin precedentes en el volumen de datos. Una parte cada vez mayor de los datos se genera en tiempo real mediante dispositivos IoT, sitios web, aplicaciones comerciales y otras fuentes. Las empresas necesitan procesar y analizar estos datos tan pronto como llegan para tomar decisiones comerciales en tiempo real. Streaming administrado por Amazon para Apache Kafka (Amazon MSK) es un servicio completamente administrado que permite crear y ejecutar aplicaciones de procesamiento de secuencias que utilizan Apache Kafka para recopilar y procesar datos en tiempo real.

Las aplicaciones de procesamiento de secuencias que utilizan Apache Kafka no se comunican entre sí directamente; se comunican enviando y recibiendo mensajes sobre temas de Kafka. Para que las aplicaciones de procesamiento de flujo se comuniquen de manera eficiente y segura, se debe definir una estructura de carga de mensajes en términos de atributos y tipos de datos. Esta estructura describe el uso de aplicaciones de esquema al enviar y recibir mensajes. Sin embargo, con una gran cantidad de aplicaciones de producción y consumo, incluso un pequeño cambio en el esquema (eliminar un campo, agregar un nuevo campo o cambiar el tipo de datos) puede causar problemas para las aplicaciones posteriores que son difíciles de depurar y corregir.

Tradicionalmente, los equipos se han basado en procesos de gestión de cambios (como aprobaciones y ventanas de mantenimiento) u otros mecanismos informales (documentación, correos electrónicos, herramientas de colaboración, etc.) para informarse mutuamente sobre los cambios en el esquema de datos. Sin embargo, estos mecanismos no escalan y son propensos a errores. El Registro de esquemas de AWS Glue le permite publicar, descubrir, controlar, validar y desarrollar esquemas de forma centralizada para aplicaciones de procesamiento de flujo. Con el Registro de esquemas de AWS Glue, puede administrar y aplicar esquemas en aplicaciones de transmisión de datos mediante Apache Kafka, AmazonMSK, Secuencias de datos de Amazon Kinesis, Análisis de datos de Amazon Kinesis para Apache Flinky AWS Lambda.

Esta publicación demuestra cómo las aplicaciones de procesamiento de secuencias de Apache Kafka validan los mensajes mediante un apache avro esquema almacenado en el Registro de esquema de AWS Glue residiendo en una cuenta central de AWS. usamos el Biblioteca SerDe del registro de esquemas de AWS Glue y Avro SpecificRecord para validar mensajes en aplicaciones de procesamiento de flujo al enviar y recibir mensajes de un tema de Kafka en un clúster de Amazon MSK. Aunque usamos un esquema Avro para esta publicación, el mismo enfoque y concepto también se aplica a los esquemas JSON.

Caso de uso

Supongamos una empresa ficticia de viajes compartidos que ofrece paseos en unicornio. Para obtener información procesable, necesitan procesar un flujo de mensajes de solicitud de paseo en unicornio. Esperan que los viajes sean muy populares y quieren asegurarse de que su solución pueda escalar. También están construyendo un lago de datos central donde se almacenan todos sus datos de transmisión y operación para su análisis. Están obsesionados con los clientes, por lo que esperan agregar nuevas funciones divertidas para viajes futuros, como elegir el color de cabello de su unicornio, y deberán reflejar estos atributos en los mensajes de solicitud de viaje. Para evitar problemas en las aplicaciones posteriores debido a futuros cambios en el esquema, necesitan un mecanismo para validar los mensajes con un esquema alojado en un registro de esquema central. Tener esquemas en un registro central de esquemas facilita que los equipos de aplicaciones publiquen, validen, evolucionen y mantengan esquemas en un solo lugar.

Resumen de la solución

La empresa utiliza Amazon MSK para capturar y distribuir los mensajes de solicitud de viaje en unicornio a escala. Definen un esquema de Avro para solicitudes de paseos en unicornio porque proporciona estructuras de datos ricas, admite el mapeo directo a JSON, así como un formato de datos binario, rápido y compacto. Debido a que el esquema se acordó de antemano, decidieron usar Avro SpecificRecord.SpecificRecord es una interfaz de la biblioteca Avro que permite el uso de un registro Avro como POJO. Esto se hace generando una clase (o clases) de Java a partir del esquema, utilizando avro-maven-plugin. Ellos usan Gestión de identidades y accesos de AWS (YO SOY) roles entre cuentas para permitir que las aplicaciones de productores y consumidores de la otra cuenta de AWS accedan de forma segura a los esquemas en la cuenta central del registro de esquemas.

El registro de esquemas de AWS Glue está en la cuenta B, mientras que el clúster de MSK y las aplicaciones de productor y consumidor de Kafka están en la cuenta A. Utilizamos los siguientes dos roles de IAM para habilitar el acceso entre cuentas al registro de esquemas de AWS Glue. Los clientes de Apache Kafka en la cuenta A asumen un rol en la cuenta B mediante una política basada en identidad porque AWS Glue Schema Registry no admite políticas basadas en recursos.

  • Cuenta A Rol de IAM – Permite que las aplicaciones de productor y consumidor asuman un rol de IAM en la Cuenta B.
  • Cuenta B Rol de IAM – Confía en todas las entidades principales de IAM de la cuenta A y les permite realizar acciones de lectura en el registro de esquema de AWS Glue en la cuenta B. En un escenario de caso de uso real, las entidades principales de IAM que pueden asumir roles entre cuentas deben tener un alcance más específico.

El siguiente diagrama de arquitectura ilustra la solución:

La solución funciona de la siguiente manera:

  1. Un productor de Kafka que se ejecuta en la cuenta A asume el rol de IAM de registro de esquema entre cuentas en la cuenta B llamando al Servicio de token de seguridad de AWS (STS de AWS) assumeRole API.
  2. El productor de Kafka recupera el ID de la versión del esquema Avro de la solicitud de paseo en unicornio del registro de esquemas de AWS Glue para el esquema que está incrustado en la solicitud de paseo en unicornio POJO. El serializador de AWS Glue Schema Registry SerDe administra internamente la obtención del ID de la versión del esquema. El serializador debe configurarse como parte de la configuración del productor de Kafka.
  3. Si el esquema existe en el registro de esquemas de AWS Glue, el serializador decora el registro de datos con el ID de la versión del esquema y luego lo serializa antes de enviarlo al tema de Kafka en el clúster de MSK.
  4. El consumidor de Kafka que se ejecuta en la cuenta A asume el rol de IAM de registro de esquema entre cuentas en la cuenta B llamando a AWS STS assumeRole API.
  5. El consumidor de Kafka comienza a sondear el tema de Kafka en el clúster de MSK en busca de registros de datos.
  6. El consumidor de Kafka recupera el esquema de Avro de la solicitud de paseo en unicornio del Registro de esquemas de AWS Glue, que coincide con el ID de la versión del esquema que está codificado en el registro de datos de la solicitud de paseo en unicornio. Obteniendo el esquema
    a es administrado internamente por el deserializador de AWS Glue Schema Registry SerDe. El deserializador debe configurarse como parte de la configuración del consumidor de Kafka. Si el esquema existe en el registro de esquemas de AWS Glue, el deserializador deserializa el registro de datos en el POJO de solicitud de paseo en unicornio para que el consumidor lo procese.

La biblioteca SerDe de AWS Glue Schema Registry también admite una configuración de compresión opcional para ahorrar en transferencias de datos. Para obtener más información sobre el registro de esquemas, consulte Cómo funciona el registro de esquemas.

Solicitud de paseo en unicornio Esquema Avro

El siguiente esquema (UnicornRideRequest.avsc) define un registro que representa una solicitud de viaje unicornio, que contiene atributos de solicitud de viaje junto con los atributos del cliente y los atributos unicornio recomendados por el sistema:

{ "type": "record", "name": "UnicornRideRequest", "namespace": "demo.glue.schema.registry.avro", "fields": [ {"name": "request_id", "type": "int", "doc": "customer request id"}, {"name": "pickup_address","type": "string","doc": "customer pickup address"}, {"name": "destination_address","type": "string","doc": "customer destination address"}, {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"}, {"name": "ride_duration","type": "int","doc": "ride duration in minutes"}, {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"}, { "name": "recommended_unicorn", "type": { "type": "record", "name": "RecommendedUnicorn", "fields": [ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"}, {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}}, {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } }, { "name": "customer", "type": { "type": "record", "name": "Customer", "fields": [ {"name": "customer_account_no","type": "int", "doc": "customer account number"}, {"name": "first_name","type": "string"}, {"name": "middle_name","type": ["null","string"], "default": null}, {"name": "last_name","type": "string"}, {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]}, {"name": "customer_address","type": "string","doc": "customer address"}, {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, {"name": "customer_rating", "type": ["null", "int"], "default": null} ] } } ] }

Requisitos previos

Para usar esta solución, debe tener dos cuentas de AWS:

Para esta solución, usamos Region us-east-1, pero puede cambiar esto según sus requisitos.

A continuación, creamos los recursos en cada cuenta usando Formación en la nube de AWS plantillas.

Crear recursos en la Cuenta B

Creamos los siguientes recursos en la Cuenta B:

  • Un registro de esquema
  • Un esquema Avro
  • Un rol de IAM con el AWSGlueSchemaRegistryReadonlyAccess política administrada y un perfil de instancia, que permite que todas las entidades principales de IAM de la cuenta A la asuman
  • El UnicornRideRequest.avsc Esquema de Avro mostrado anteriormente, que se utiliza como definición de esquema en la plantilla de CloudFormation

Asegúrese de tener los permisos adecuados para crear estos recursos.

  1. Inicie sesión en la cuenta B.
  2. Lanzar lo siguiente Pila de CloudFormation.
  3. Nombre de pila, introduzca SchemaRegistryStack.
  4. Nombre del registro de esquema, introduzca unicorn-ride-request-registry.
  5. Nombre del esquema de Avro, introduzca unicorn-ride-request-schema-avro.
  6. Para el ID de cuenta de AWS del cliente de Kafka, ingrese su ID de cuenta A.
  7. ID externo, ingrese una ID aleatoria única (por ejemplo, demo10A), que deben proporcionar los clientes de Kafka en la cuenta A mientras asumen el rol de IAM en esta cuenta.

Para obtener más información acerca de la seguridad entre cuentas, consulte El problema del diputado confundido.

  1. Cuando la pila esté completa, en el Salidas pestaña de la pila, copie el valor para CrossAccountGlueSchemaRegistryRoleArn.

Las aplicaciones de productor y consumidor de Kafka creadas en la cuenta A asumen este rol para acceder al registro de esquemas y al esquema en la cuenta B.

  1. Para verificar que se crearon los recursos, en la consola de AWS Glue, elija Registros de esquema en la barra de navegación y busque unicorn-ride-request-registry.
  2. Elige el registro unicorn-ride-request-registry y verificar que contiene unicorn-ride-request-schema-avro existentes Esquemas .
  3. Elija el esquema para ver su contenido.

El rol de IAM creado por el SchemaRegistryStack stack permite que todas las entidades principales de IAM de la cuenta A lo asuman y realicen acciones de lectura en el registro de esquemas de AWS Glue. Veamos las relaciones de confianza del rol de IAM.

  1. En SchemaRegistryStack montón Salidas pestaña, copie el valor de CrossAccountGlueSchemaRegistryRoleName.
  2. En la consola de IAM, busque este rol.
  3. Elige Relaciones de confianza y mire sus entidades de confianza para confirmar que la Cuenta A está en la lista.
  4. En Condiciones sección, confirme que sts:ExternalId tiene la misma ID aleatoria única proporcionada durante la creación de la pila.

Crear recursos en la Cuenta A

Creamos los siguientes recursos en la Cuenta A:

  • una VPC
  • Instancias EC2 para el productor y consumidor Kafka
  • Un entorno AWS Cloud9
  • Un clúster de MSK

Como requisito previo, cree un par de claves EC2 y descárguelo en su máquina para poder acceder a instancias EC2 mediante SSH. También crea un Configuración del clúster de MSK con valores predeterminados. Necesita tener permisos para crear CloudFormation
pila, instancias EC2, entorno de AWS Cloud9, clúster de MSK, configuración de clúster de MSK y rol de IAM.

  1. Inicie sesión en la cuenta A.
  2. Lanzar lo siguiente Pila de CloudFormation para lanzar la VPC, las instancias EC2 y el entorno de AWS Cloud9.
  3. Nombre de pila, introduzca MSKClientStack.
  4. Proporcione los rangos de CIDR de subred y VPC.
  5. Par de claves EC2, elija un par de claves EC2 existente.
  6. Para obtener el ID de AMI de EC2 más reciente, seleccione la opción predeterminada.
  7. Para el ARN del rol de IAM entre cuentas, use el valor para CrossAccountGlueSchemaRegistryRoleArn (disponible en el Salidas pestaña de SchemaRegistryStack).
  8. Espere a que la pila se cree correctamente.
  9. Lanzar lo siguiente Pila de CloudFormation para crear el clúster de MSK.
  10. Nombre de pila, introduzca MSKClusterStack.
  11. Utilice Amazon MSK versión 2.7.1.
  12. Para el ARN de configuración del clúster de MSK, ingrese el ARN de configuración del clúster de MSK. Uno que haya creado como parte del requisito previo.
  13. Para el número de revisión de la configuración del clúster de MSK, ingrese 1 o cámbielo según su versión.
  14. Para el nombre de la pila de CloudFormation del cliente, ingrese MSKClientStack (el nombre de la pila que creó antes de esta pila).

Configurar el productor Kafka

Para configurar el productor de Kafka que accede al registro de esquemas en la cuenta central de AWS, complete los siguientes pasos:

  1. Inicie sesión en la cuenta A.
  2. En la consola de AWS Cloud9, elija el Cloud9EC2Bastion entorno creado por la MSKClientStack asociación.
  3. En Archive menú, seleccione Cargar archivos locales.
  4. Cargue el archivo de par de claves EC2 que utilizó anteriormente al crear la pila.
  5. Abra una nueva terminal y cambie los permisos del par de claves EC2:
    chmod 0400 <keypair PEM file>

  6. SSH en el KafkaProducerInstance Instancia EC2 y configure la región según sus requisitos:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Establecer la variable de entorno MSK_CLUSTER_ARN apuntando al ARN del clúster de MSK:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Cambie el .ClusterName valor en el código si usó un nombre diferente para la pila de CloudFormation del clúster de MSK. El nombre del clúster es el mismo que el nombre de la pila.

  1. Establecer la variable de entorno BOOTSTRAP_BROKERS apuntando a los corredores de arranque:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verifique las variables de entorno:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. Cree un tema de Kafka llamado unicorn-ride-request-topic en su clúster de MSK, que las aplicaciones de productor y consumidor de Kafka utilizan más adelante:
    cd ~/kafka ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --topic unicorn-ride-request-topic --create --partitions 3 --replication-factor 2 ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

El MSKClientStack stack copió el archivo JAR del cliente productor de Kafka llamado kafka-cross-account-gsr-producer.jar En el correo electrónico “Su Cuenta de Usuario en su Nuevo Sistema XNUMXCX”. KafkaProducerInstance ejemplo. Contiene el cliente productor de Kafka que envía mensajes al tema de Kafka unicorn-ride-request-topic en el clúster de MSK y accede al unicorn-ride-request-schema-avro Esquema Avro de la unicorn-ride-request-registry registro de esquema en la cuenta B. El código de productor de Kafka, que trataremos más adelante en esta publicación, está disponible en GitHub.

  1. Ejecute los siguientes comandos y verifique kafka-cross-account-gsr-producer.jar existe:
    cd ~
    ls -ls

  2. Ejecute el siguiente comando para ejecutar el productor de Kafka en el KafkaProducerInstance Terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka producer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -nm 500 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

El código tiene los siguientes parámetros:

  • -bs$BOOTSTRAP_BROKERS (los agentes de arranque del clúster de MSK)
  • -rn - El CrossAccountGlueSchemaRegistryRoleArn valor de la SchemaRegistryStack apilar salidas en la cuenta B
  • -tema – el tema de Kafka unicorn-ride-request-topic
  • -regus-east-1 (cámbielo según su región, se usa para el punto de enlace de AWS STS y el registro de esquemas)
  • -Nuevo Méjico: 500 (la cantidad de mensajes que la aplicación del productor envía al tema de Kafka)
  • -Id externo – El mismo ID externo (por ejemplo, demo10A) que usó al crear la pila de CloudFormation en la Cuenta B

La siguiente captura de pantalla muestra los registros del productor de Kafka que muestran Schema Version Id received..., lo que significa que ha recuperado el esquema de Avro unicorn-ride-request-schema-avro de la cuenta B y los mensajes se enviaron al tema de Kafka en el clúster de MSK en la cuenta A.

Código de productor de Kafka

La implementación completa del productor de Kafka está disponible en GitHub. En esta sección, desglosamos el código.

  • getProducerConfig() inicializa las propiedades del productor, como se muestra en el siguiente código:
    • VALUE_SERIALIZER_CLASS_CONFIG - El GlueSchemaRegistryKafkaSerializer.class.getName() Implementación del serializador de AWS que serializa registros de datos (la implementación está disponible en GitHub)
    • NOMBRE_REGISTRO – El registro de esquema de la cuenta B
    • ESQUEMA_NOMBRE – El nombre del esquema de la Cuenta B
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "-1"); props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName); props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry"); props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro"); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startProducer() asume el rol en la Cuenta B para poder conectarse con el Registro de esquemas en la Cuenta B y envía mensajes al tema de Kafka en el clúster de MSK:
public void startProducer() { assumeGlueSchemaRegistryRole(); KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<String,UnicornRideRequest>(getProducerConfig()); int numberOfMessages = Integer.valueOf(str_numOfMessages); logger.info("Starting to send records..."); for(int i = 0;i < numberOfMessages;i ++) { UnicornRideRequest rideRequest = getRecord(i); String key = "key-" + i; ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest); producer.send(record, new ProducerCallback()); } }

  • assumeGlueSchemaRegistryRole() como se muestra en el siguiente código, utiliza AWS STS para asumir el rol de IAM de registro de esquema entre cuentas en la cuenta B. (Para obtener más información, consulte Credenciales de seguridad temporales en IAM.) La respuesta de stsClient.assumeRole(roleRequest) contiene las credenciales temporales, que incluyen accessKeyId, secretAccessKey, Y un sessionToken. Luego establece las credenciales temporales en las propiedades del sistema. El AWS SDK para Java utiliza estas credenciales al acceder al Registro de esquemas (a través del serializador de Registro de esquemas). Para más información, ver Uso de credenciales.
    public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

  • createUnicornRideRequest() utiliza las clases generadas del esquema Avro (esquema de solicitud de viaje de unicornio) para crear un SpecificRecord. Para esta publicación, los valores de los atributos de solicitud de paseo en unicornio están codificados en este método. Ver el siguiente código:
    public UnicornRideRequest getRecord(int requestId){ /* Initialise UnicornRideRequest object of class that is generated from AVRO Schema */ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder() .setRequestId(requestId) .setPickupAddress("Melbourne, Victoria, Australia") .setDestinationAddress("Sydney, NSW, Aus") .setRideFare(1200.50F) .setRideDuration(120) .setPreferredUnicornColor(UnicornPreferredColor.WHITE) .setRecommendedUnicorn(RecommendedUnicorn.newBuilder() .setUnicornId(requestId*2) .setColor(unicorn_color.WHITE) .setStarsRating(5).build()) .setCustomer(Customer.newBuilder() .setCustomerAccountNo(1001) .setFirstName("Dummy") .setLastName("User") .setEmailAddresses(Arrays.asList("demo@example.com")) .setCustomerAddress("Flinders Street Station") .setModeOfPayment(ModeOfPayment.CARD) .setCustomerRating(5).build()).build(); logger.info(rideRequest.toString()); return rideRequest; }

Configurar el consumidor de Kafka

El MSKClientStack pila creó el KafkaConsumerInstance instancia para la aplicación de consumidor de Kafka. Puede ver todas las instancias creadas por la pila en la consola de Amazon EC2.

Para configurar el acceso del consumidor de Kafka al registro de esquemas en la cuenta central de AWS, complete los siguientes pasos:

  1. Abra una nueva terminal en el Cloud9EC2Bastion Entorno AWS Cloud9.
  2. SSH en el KafkaConsumerInstance Instancia EC2 y configure la región según sus requisitos:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Establecer la variable de entorno MSK_CLUSTER_ARN apuntando al ARN del clúster de MSK:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Cambie el .ClusterName valor si usó un nombre diferente para la pila de CloudFormation del clúster de MSK. El nombre del clúster es el mismo que el nombre de la pila.

  1. Establecer la variable de entorno BOOTSTRAP_BROKERS apuntando a los corredores de arranque:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verifique las variables de entorno:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

El MSKClientStack stack copió el archivo JAR del cliente consumidor de Kafka llamado kafka-cross-account-gsr-consumer.jar En el correo electrónico “Su Cuenta de Usuario en su Nuevo Sistema XNUMXCX”. KafkaConsumerInstance ejemplo. Contiene el cliente consumidor de Kafka que lee mensajes del tema de Kafka. unicorn-ride-request-topic en el clúster de MSK y accede a la unicorn-ride-request-schema-avro Esquema Avro de la unicorn-ride-request-registry registro en la cuenta B. El código de consumidor de Kafka, que cubrimos más adelante en esta publicación, está disponible en GitHub.

  1. Ejecute los siguientes comandos y verifique kafka-cross-account-gsr-consumer.jar existe:
    cd ~
    ls -ls

  2. Ejecute el siguiente comando para ejecutar el consumidor de Kafka en el KafkaConsumerInstance Terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka consumer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

El código tiene los siguientes parámetros:

  • -bs$BOOTSTRAP_BROKERS (los agentes de arranque del clúster de MSK)
  • -rn - El CrossAccountGlueSchemaRegistryRoleArn valor de la SchemaRegistryStack apilar salidas en la cuenta B
  • -tema – El tema de Kafka unicorn-ride-request-topic
  • -regus-east-1 (cámbielo según su región, se usa para el punto de enlace de AWS STS y el registro de esquemas)
  • -Id externo – El mismo ID externo (por ejemplo, demo10A) que usó al crear la pila de CloudFormation en la Cuenta B

La siguiente captura de pantalla muestra los registros del consumidor de Kafka leyendo correctamente los mensajes del tema de Kafka en el clúster de MSK en la cuenta A y accediendo al esquema de Avro. unicorn-ride-request-schema-avro del desplegable unicorn-ride-request-registry registro de esquema en la cuenta B.

Si ve registros similares, significa que ambas aplicaciones de consumo de Kafka han podido conectarse correctamente con el registro de esquemas centralizado en la cuenta B y pueden validar mensajes mientras envían y consumen mensajes del clúster de MSK en la cuenta A.

Código de consumo de Kafka

La implementación completa del consumidor de Kafka está disponible en GitHub. En esta sección, desglosamos el código.

  • getConsumerConfig() inicializa las propiedades del consumidor, como se muestra en el siguiente código:
    • VALUE_DESERIALIZER_CLASS_CONFIG - El GlueSchemaRegistryKafkaDeserializer.class.getName() Implementación del deserializador de AWS que deserializa el SpecificRecord según el ID de esquema codificado del Registro de esquemas (la implementación está disponible en GitHub).
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startConsumer() asume el rol en la Cuenta B para poder conectarse con el Registro de esquemas en la Cuenta B y lee los mensajes del tema de Kafka en el clúster de MSK:
public void startConsumer() { logger.info("starting consumer..."); assumeGlueSchemaRegistryRole(); KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig()); consumer.subscribe(Collections.singletonList(topic)); int count = 0; while (true) { final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000)); for (final ConsumerRecord<String, UnicornRideRequest> record : records) { final UnicornRideRequest rideRequest = record.value(); logger.info(String.valueOf(rideRequest.getRequestId())); logger.info(rideRequest.toString()); } }
}

  • assumeGlueSchemaRegistryRole() como se muestra en el siguiente código, utiliza AWS STS para asumir el rol de IAM de registro de esquema entre cuentas en la cuenta B. La respuesta de stsClient.assumeRole(roleRequest) contiene las credenciales temporales, que incluyen accessKeyId, secretAccessKey, Y un sessionToken. Luego establece las credenciales temporales en las propiedades del sistema. El SDK para Java utiliza estas credenciales al acceder al Registro de esquemas (a través del serializador de Registro de esquemas). Para más información, ver Uso de credenciales.
public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

Compile y genere clases de esquema Avro

Como cualquier otra parte de la creación e implementación de su aplicación, la compilación de esquemas y el proceso de generación de clases de esquemas de Avro deben incluirse en su canalización de CI/CD. Hay varias formas de generar clases de esquema Avro; usamos avro-maven-plugin para esta publicación El proceso CI/CD también puede utilizar avro-tools para compilar el esquema de Avro para generar clases. El siguiente código es un ejemplo de cómo puede utilizar avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination> //compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Visión general de implementación

En resumen, comenzamos con la definición y el registro de un esquema de Avro para el mensaje de solicitud de paseo en unicornio en el registro de esquemas de AWS Glue en la cuenta B, la cuenta del lago de datos central. En la Cuenta A, creamos un clúster de MSK e instancias EC2 de productor y consumidor de Kafka con su respectivo código de aplicación (kafka-cross-account-gsr-consumer.jar y kafka-cross-account-gsr-producer.jar) y se implementó en ellos utilizando la pila de CloudFormation.

Cuando ejecutamos la aplicación del productor en la Cuenta A, el serializador (GlueSchemaRegistryKafkaSerializer) de la biblioteca SerDe de AWS Glue Schema Registry proporcionada a medida que la configuración obtiene el esquema de solicitud de viaje de unicornio (UnicornRideRequest.avsc) del registro de esquema central que reside en la cuenta B para serializar el mensaje de solicitud de paseo en unicornio. Utiliza el rol de IAM (credenciales temporales) en Cuenta B y Región, nombre de registro de esquema (unicorn-ride-request-registry), y el nombre del esquema (unicorn-ride-request-schema-avro) proporcionada como la configuración para conectarse al registro central de esquemas. Una vez que el mensaje se serializa correctamente, la aplicación del productor lo envía al tema de Kafka (unicorn-ride-request-topic) en el clúster de MSK.

Cuando ejecutamos la aplicación del consumidor en la Cuenta A, el deserializador (GlueSchemaRegistryKafkaDeserializer) de la biblioteca Schema Registry SerDe proporcionada como la configuración extrae el ID de esquema codificado del mensaje leído del tema de Kafka (unicorn-ride-request-topic) y obtiene el esquema para el mismo ID del registro central de esquemas en la cuenta B. Luego, deserializa el mensaje. Utiliza el rol de IAM (credenciales temporales) en la Cuenta B y la Región proporcionada como configuración para conectarse al Registro de esquema central. La aplicación del consumidor también configura los SPECIFIC_RECORD para informar al deserializador que el mensaje es de un tipo específico (solicitud de viaje de unicornio). Una vez que el mensaje se deserializa correctamente, la aplicación del consumidor lo procesa según los requisitos.

Limpiar

El paso final es limpiar. Para evitar cargos innecesarios, debe eliminar todos los recursos creados por las pilas de CloudFormation utilizadas para esta publicación. La forma más sencilla de hacerlo es eliminar las pilas. Primero borra el MSKClusterStack seguido por MSKClientStack de la Cuenta A. Luego elimine el SchemaRegistryStack de la cuenta B.

Conclusión

En esta publicación, demostramos cómo usar AWS Glue Schema Registry con Amazon MSK y aplicaciones de procesamiento de flujo para validar mensajes usando un esquema Avro. Creamos una arquitectura distribuida donde el registro de esquema reside en una cuenta de AWS central (cuenta de lago de datos) y las aplicaciones de productor y consumidor de Kafka residen en una cuenta de AWS separada. Creamos un esquema de Avro en el registro de esquemas de la cuenta central para que los equipos de aplicaciones puedan mantener los esquemas en un solo lugar de manera eficiente. Debido a que AWS Glue Schema Registry admite políticas de acceso basadas en la identidad, utilizamos el rol de IAM entre cuentas para permitir que las aplicaciones de productor y consumidor de Kafka que se ejecutan en una cuenta separada accedan de forma segura al esquema desde la cuenta central para validar los mensajes. Debido a que el esquema de Avro se acordó de antemano, usamos Avro SpecificRecord para garantizar la seguridad de tipos en tiempo de compilación y evitar problemas de validación de esquemas en tiempo de ejecución en el lado del cliente. El código utilizado para esta publicación está disponible en GitHub para referencia.

Para obtener más información sobre los servicios y recursos de esta solución, consulte Registro de esquemas de AWS Glue, la Guía para desarrolladores de Amazon MSK, la Biblioteca SerDe del registro de esquemas de AWS Gluey Tutorial de IAM: delegue el acceso a las cuentas de AWS mediante roles de IAM.


Sobre la autora

Vikas Bajaj es Arquitecto Principal de Soluciones en Amazon Web Service. Vikas trabaja con clientes nativos digitales y los asesora sobre arquitectura y modelado de tecnología, y opciones y soluciones para cumplir con los objetivos comerciales estratégicos. Se asegura de que los diseños y las soluciones sean eficientes, sostenibles y adecuados para las necesidades comerciales actuales y futuras. Aparte de las discusiones sobre arquitectura y tecnología, le gusta ver y jugar al cricket.

punto_img

Información más reciente

punto_img