Logotipo de Zephyrnet

Realice copias de seguridad y restaure datos de temas de Kafka con Amazon MSK Connect

Fecha:

Puede usar Apache Kafka para ejecutar sus cargas de trabajo de transmisión. Kafka brinda resiliencia ante fallas y protege sus datos desde el primer momento mediante la replicación de datos entre los intermediarios del clúster. Esto garantiza que los datos del clúster sean duraderos. Puede lograr sus SLA de durabilidad cambiando el factor de replicación del tema. Sin embargo, la transmisión de datos almacenada en temas de Kafka tiende a ser transitoria y, por lo general, tiene un tiempo de retención de días o semanas. Es posible que desee realizar una copia de seguridad de los datos almacenados en su tema de Kafka mucho después de que expire su tiempo de retención por varias razones. Por ejemplo, es posible que tenga requisitos de cumplimiento que requieran que almacene los datos durante varios años. O puede haber seleccionado datos sintéticos que deben hidratarse repetidamente en temas de Kafka antes de comenzar las pruebas de integración de su carga de trabajo. O un sistema ascendente sobre el que no tiene control produce datos incorrectos y necesita restaurar su tema a un buen estado anterior.

El almacenamiento de datos de forma indefinida en los temas de Kafka es una opción, pero a veces el caso de uso requiere una copia separada. Herramientas como MirrorMaker le permiten hacer una copia de seguridad de sus datos en otro clúster de Kafka. Sin embargo, esto requiere que se ejecute otro clúster de Kafka activo como respaldo, lo que aumenta los costos de cómputo y almacenamiento. Una forma rentable y duradera de hacer una copia de seguridad de los datos de su clúster de Kafka es usar un servicio de almacenamiento de objetos como Servicio de almacenamiento simple de Amazon (Amazon S3).

En esta publicación, analizamos una solución que le permite hacer una copia de seguridad de sus datos para almacenamiento en frío usando Conexión de Amazon MSK. Restauramos los datos respaldados a otro tema de Kafka y restablecemos las compensaciones del consumidor según su caso de uso.

Resumen de la solución

Conexión Kafka es un componente de Apache Kafka que simplifica la transmisión de datos entre temas de Kafka y sistemas externos como almacenes de objetos, bases de datos y sistemas de archivos. Utiliza conectores de receptor para transmitir datos de temas de Kafka a sistemas externos y conectores de origen para transmitir datos de sistemas externos a temas de Kafka. Puede utilizar conectores listos para usar escritos por terceros o escribir sus propios conectores para cumplir con sus requisitos específicos.

MSK Connect es una función de Streaming administrado por Amazon para Apache Kafka (Amazon MSK) que le permite ejecutar cargas de trabajo de Kafka Connect completamente administradas. Funciona con clústeres de MSK y con clústeres de Kafka autogestionados compatibles. En esta publicación, usamos el Lentes Conector AWS S3 para hacer una copia de seguridad de los datos almacenados en un tema en un clúster de Amazon MSK en Amazon S3 y restaurar estos datos en otro tema. El siguiente diagrama muestra la arquitectura de nuestra solución.

Para implementar esta solución, completamos los siguientes pasos:

  1. Realice una copia de seguridad de los datos mediante un conector receptor de MSK Connect en un depósito S3.
  2. Restaure los datos mediante un conector de origen de MSK Connect en un nuevo tema de Kafka.
  3. Restablezca las compensaciones de los consumidores en función de diferentes escenarios.

Requisitos previos

Asegúrese de completar los siguientes pasos como requisitos previos:

  1. Configurar los recursos necesarios para Amazon MSK, Amazon S3 y Gestión de identidades y accesos de AWS (SOY).
  2. Cree dos temas de Kafka en el clúster de MSK: source_topic y target_topic.
  3. Crear un complemento de MSK Connect usando el Lentes Conector AWS S3.
  4. Instale Kafka CLI siguiendo el Paso 1 de Inicio rápido de Apache Kafka.
  5. Instale la utilidad kcat para enviar mensajes de prueba al tema de Kafka.

Haz una copia de seguridad de tus temas

Según el caso de uso, es posible que desee hacer una copia de seguridad de todos los temas en su clúster de Kafka o hacer una copia de seguridad de algunos temas específicos. En esta publicación, cubrimos cómo hacer una copia de seguridad de un solo tema, pero puede ampliar la solución para hacer una copia de seguridad de varios temas.

El formato en el que se almacenan los datos en Amazon S3 es importante. Es posible que desee inspeccionar los datos almacenados en Amazon S3 para depurar problemas como la introducción de datos incorrectos. Puede examinar los datos almacenados como JSON o texto sin formato mediante el uso de editores de texto y buscando en los marcos de tiempo que le interesen. También puede examinar grandes cantidades de datos almacenados en Amazon S3 como JSON o Parquet utilizando servicios de AWS como Atenea amazónica. Lenses AWS S3 Connector admite el almacenamiento de objetos como JSON, Avro, Parquet, texto sin formato o binario.

En esta publicación, enviamos datos JSON al tema de Kafka y los almacenamos en Amazon S3. Según el tipo de datos que cumpla con sus requisitos, actualice el connect.s3.kcql declaración y *.converter configuración. Puedes referirte a la Documentación del conector del disipador de lentes para obtener detalles sobre los formatos admitidos y las configuraciones relacionadas. Si los conectores existentes no funcionan para su caso de uso, también puede escribe tu propio conector o ampliar los conectores existentes. Puede particionar los datos almacenados en Amazon S3 según los campos de tipos primitivos en el encabezado del mensaje o la carga útil. Usamos los campos de fecha almacenados en el encabezado para particionar los datos en Amazon S3.

Siga estos pasos para hacer una copia de seguridad de su tema:

  1. Cree un nuevo conector de sumidero de Amazon MSK ejecutando el siguiente comando:
    aws kafkaconnect create-connector --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" --connector-configuration "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, key.converter.schemas.enable=false, connect.s3.kcql=INSERT IGNORE INTO <<S3 Bucket Name>>:my_workload SELECT * FROM source_topic PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, aws.region=us-east-1, tasks.max=2, topics=source_topic, schema.enable=false, errors.log.enable=true, value.converter=org.apache.kafka.connect.storage.StringConverter, key.converter=org.apache.kafka.connect.storage.StringConverter " --connector-name "backup-msk-to-s3-v1" --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [ <<Security Group>> ],"subnets": [ <<Subnet List>&
    gt; ]}}}' --kafka-cluster-client-authentication "authenticationType=NONE" --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" --kafka-connect-version "2.7.1" --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" --service-execution-role-arn " <<ARN of the IAM Role>> "

  2. Enviar datos al tema usando kcat:
    ./kcat -b <<broker list>> -t source_topic -H "year=$(date +"%Y")" -H "month=$(date +"%m")" -H "day=$(date +"%d")" -H "hour=$(date +"%H")" -P
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}

  3. Compruebe el depósito de S3 para asegurarse de que se están escribiendo los datos.

MSK Connect publica métricas para Reloj en la nube de Amazon que puede usar para monitorear su proceso de copia de seguridad. Las métricas importantes son SinkRecordReadRate y SinkRecordSendRate, que miden la cantidad promedio de registros leídos de Kafka y escritos en Amazon S3, respectivamente.

Además, asegúrese de que el conector de respaldo se mantenga al día con la velocidad a la que el tema de Kafka recibe mensajes al monitorear el retraso de compensación del conector. Si utiliza Amazon MSK, puede hacerlo de la siguiente manera: activar métricas a nivel de partición en Amazon MSK y monitorear el OffsetLag métrica de todas las particiones para el grupo de consumidores del conector de copia de seguridad. Debe mantener esto lo más cerca posible de 0 ajustando la cantidad máxima de instancias de trabajo de MSK Connect. El comando que usamos en el paso anterior configura MSK Connect para escalar automáticamente hasta dos trabajadores. Ajustar el --capacity configuración para aumentar o disminuir el número máximo de trabajadores de MSK Connect según el OffsetLag métrico.

Restaurar datos a sus temas

Puede restaurar sus datos respaldados a un nuevo tema con el mismo nombre en el mismo clúster de Kafka, un tema diferente en el mismo clúster de Kafka o un tema diferente en un clúster de Kafka diferente por completo. En esta publicación, analizamos el escenario de restauración de datos de los que se realizó una copia de seguridad en Amazon S3 a un tema diferente, target_topic, en el mismo clúster de Kafka. Puede extender esto a otros escenarios cambiando el tema y los detalles del intermediario en la configuración del conector.

Siga estos pasos para restaurar los datos:

  1. Cree un conector de origen de Amazon MSK ejecutando el siguiente comando:
    aws kafkaconnect create-connector --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" --connector-configuration "connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector, key.converter.schemas.enable=false, connect.s3.kcql=INSERT IGNORE INTO target_topic SELECT * FROM <<S3 Bucket Name>>:my_workload PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5 , aws.region=us-east-1, tasks.max=2, topics=target_topic, schema.enable=false, errors.log.enable=true, value.converter=org.apache.kafka.connect.storage.StringConverter, key.converter=org.apache.kafka.connect.storage.StringConverter " --connector-name "restore-s3-to-msk-v1" --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [<<Security Group>>],"subnets": [ <<Subnet List>> ]}}}' --kafka-cluster-client-authentication "authenticationType=NONE" --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" --kafka-connect-version "2.7.1" --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" --service-execution-role-arn " <<ARN of the IAM Role>> "

El conector lee los datos del depósito S3 y los vuelve a reproducir para target_topic.

  1. Verifique si los datos se escriben en el tema de Kafka ejecutando el siguiente comando:
    ./kafka-console-consumer.sh --bootstrap-server <<MSK broker list>> --topic target_topic --from-beginning

Los conectores de MSK Connect se ejecutan indefinidamente, a la espera de que se escriban nuevos datos en el origen. Sin embargo, durante la restauración, debe detener el conector después de que todos los datos se hayan copiado en el tema. MSK Connect publica el SourceRecordPollRate y SourceRecordWriteRate métricas a CloudWatch, que miden la cantidad promedio de registros sondeados de Amazon S3 y la cantidad de registros escritos en el clúster de Kafka, respectivamente. Puede monitorear estas métricas para rastrear el estado del proceso de restauración. Cuando estas métricas llegan a 0, los datos de Amazon S3 se restauran al target_topic. Puede recibir una notificación de la finalización configurando una alarma de CloudWatch en estas métricas. Puede extender la automatización para invocar un AWS Lambda función que elimina el conector cuando se completa la restauración.

Al igual que con el proceso de copia de seguridad, puede acelerar el proceso de restauración escalando la cantidad de trabajadores de MSK Connect. Cambiar el --capacity parámetro para ajustar los trabajadores máximo y mínimo a un número que cumpla con los SLA de restauración de su carga de trabajo.

Restablecer compensaciones de consumidores

Dependiendo de los requisitos para restaurar los datos a un nuevo tema de Kafka, es posible que también deba restablecer las compensaciones del grupo de consumidores antes de consumirlos o producirlos. Identificar el desplazamiento real al que desea restablecer depende de su caso de uso comercial específico e implica un trabajo manual para identificarlo. Puede utilizar herramientas como Amazon S3 Select, Athena u otras herramientas personalizadas para inspeccionar los objetos. La siguiente captura de pantalla muestra la lectura de los registros que terminan en desplazamiento 14 de partición 2 de tema source_topic utilizando S3 Seleccionar.

Después de identificar las nuevas compensaciones de inicio para sus grupos de consumidores, debe restablecerlas en su clúster de Kafka. Puede hacerlo con las herramientas de la CLI que se incluyen con Kafka.

Grupos de consumidores existentes

Si desea utilizar el mismo nombre de grupo de consumidores después de restaurar el tema, puede hacerlo ejecutando el siguiente comando para cada partición del tema restaurado:

 ./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Verifique esto ejecutando el --describe opción del comando:

./kafka-consumer-groups.sh --bootstrap-server <<brok
er list>> --group <<consumer group>> --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
source_topic 0 211006 188417765 188206759 ...
source_topic 1 212847 192997707 192784860 ...
source_topic 2 211147 196410627 196199480 ...
target_topic 0 211006 188417765 188206759 ...
target_topic 1 212847 192997707 192784860 ...
target_topic 2 211147 196410627 196199480 ...

Nuevo grupo de consumidores

Si desea que su carga de trabajo cree un nuevo grupo de consumidores y busque compensaciones personalizadas, puede hacerlo invocando el buscar método en su consumidor de Kafka para cada partición. Como alternativa, puede crear el nuevo grupo de consumidores ejecutando el siguiente código:

./kafka-console-consumer.sh --bootstrap-server <<broker list>> --topic target_topic --group <<consumer group>> --from-beginning --max-messages 1

Restablezca el desplazamiento a los desplazamientos deseados para cada partición ejecutando el siguiente comando:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<New consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Limpiar

Para evitar incurrir en cargos continuos, complete los siguientes pasos de limpieza:

  1. Elimine los conectores y el complemento de MSK Connect.
  2. Elimine el clúster de MSK.
  3. Elimine los cubos de S3.
  4. Elimine cualquier recurso de CloudWatch que haya creado.

Conclusión

En esta publicación, le mostramos cómo realizar copias de seguridad y restaurar datos de temas de Kafka mediante MSK Connect. Puede extender esta solución a múltiples temas y otros formatos de datos según su carga de trabajo. Asegúrese de probar varios escenarios a los que pueden enfrentarse sus cargas de trabajo y documente el runbook para cada uno de esos escenarios.

Para obtener más información, consulte los siguientes recursos:


Sobre la autora

Rakshit Rao es arquitecto sénior de soluciones en AWS. Trabaja con los clientes estratégicos de AWS para crear y operar sus cargas de trabajo clave en AWS.

punto_img

Información más reciente

punto_img