Logotipo de Zephyrnet

Cree una canalización de transmisión sin servidor de un extremo a otro con Apache Kafka en Amazon MSK usando Python | Servicios web de Amazon

Fecha:

El volumen de datos generados a nivel mundial continúa aumentando, desde juegos, comercio minorista y finanzas hasta manufactura, atención médica y viajes. Las organizaciones buscan más formas de utilizar rápidamente el flujo constante de datos para innovar para sus empresas y clientes. Tienen que capturar, procesar, analizar y cargar los datos de manera confiable en una infinidad de almacenes de datos, todo en tiempo real.

Apache Kafka es una opción popular para estas necesidades de transmisión en tiempo real. Sin embargo, puede resultar complicado configurar un clúster de Kafka junto con otros componentes de procesamiento de datos que se escalan automáticamente según las necesidades de su aplicación. Corre el riesgo de no aprovisionar suficiente para el tráfico pico, lo que puede provocar tiempo de inactividad, o de aprovisionar demasiado para la carga base, lo que genera desperdicio. AWS ofrece múltiples servicios sin servidor como Streaming administrado por Amazon para Apache Kafka (Amazon MSK), Manguera de datos de Amazon, Amazon DynamoDBy AWS Lambda que escala automáticamente dependiendo de sus necesidades.

En esta publicación, explicamos cómo puede utilizar algunos de estos servicios, incluido MSK sin servidor, para crear una plataforma de datos sin servidor que satisfaga sus necesidades en tiempo real.

Resumen de la solución

Imaginemos un escenario. Usted es responsable de administrar miles de módems para un proveedor de servicios de Internet implementados en múltiples geografías. Quiere monitorear la calidad de la conectividad del módem, lo que tiene un impacto significativo en la productividad y satisfacción del cliente. Su implementación incluye diferentes módems que deben monitorearse y mantenerse para garantizar un tiempo de inactividad mínimo. Cada dispositivo transmite miles de registros de 1 KB por segundo, como el uso de la CPU, el uso de la memoria, las alarmas y el estado de la conexión. Quiere acceso en tiempo real a estos datos para poder monitorear el rendimiento en tiempo real y detectar y mitigar problemas rápidamente. También necesita acceso a más largo plazo a estos datos para que los modelos de aprendizaje automático (ML) ejecuten evaluaciones de mantenimiento predictivo, encuentren oportunidades de optimización y pronostiquen la demanda.

Los clientes que recopilan los datos en el sitio están escritos en Python y pueden enviar todos los datos como temas de Apache Kafka a Amazon MSK. Para el acceso a datos en tiempo real y de baja latencia de su aplicación, puede utilizar Lambda y DynamoDB. Para el almacenamiento de datos a largo plazo, puede utilizar el servicio de conector administrado sin servidor Manguera de datos de Amazon para enviar datos a su lago de datos.

El siguiente diagrama muestra cómo puede crear esta aplicación sin servidor de un extremo a otro.

aplicación sin servidor de un extremo a otro

Sigamos los pasos de las siguientes secciones para implementar esta arquitectura.

Cree un clúster Kafka sin servidor en Amazon MSK

Usamos Amazon MSK para ingerir datos de telemetría en tiempo real desde módems. Crear un clúster Kafka sin servidor es sencillo en Amazon MSK. Sólo toma unos minutos usar el Consola de administración de AWS o SDK de AWS. Para utilizar la consola, consulte Introducción al uso de clústeres sin servidor de MSK. Creas un clúster sin servidor, Gestión de identidades y accesos de AWS (IAM) y máquina cliente.

Crear un tema de Kafka usando Python

Cuando su clúster y su máquina cliente estén listos, conecte SSH a su máquina cliente e instale Kafka Python y la biblioteca MSK IAM para Python.

pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Crea un nuevo archivo llamado createTopic.py.
  • Copie el siguiente código en este archivo, reemplazando el bootstrap_servers y region información con los detalles de su clúster. Para obtener instrucciones sobre cómo recuperar el bootstrap_servers información para su clúster MSK, consulte Obtención de los agentes de arranque para un clúster de Amazon MSK.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • Ejecute el createTopic.py script para crear un nuevo tema de Kafka llamado mytopic en su clúster sin servidor:
python createTopic.py

Producir registros usando Python

Generemos algunos datos de telemetría del módem de muestra.

  • Crea un nuevo archivo llamado kafkaDataGen.py.
  • Copie el siguiente código en este archivo, actualizando el BROKERS y region información con los detalles de su cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • Ejecute el kafkaDataGen.py para generar continuamente datos aleatorios y publicarlos en el tema de Kafka especificado:
python kafkaDataGen.py

Almacenar eventos en Amazon S3

Ahora almacena todos los datos del evento sin procesar en un Servicio de almacenamiento simple de Amazon (Amazon S3) lago de datos para análisis. Puede utilizar los mismos datos para entrenar modelos de ML. El integración con Amazon Data Firehose permite a Amazon MSK cargar datos sin problemas desde sus clústeres de Apache Kafka en un lago de datos de S3. Complete los siguientes pasos para transmitir datos continuamente desde Kafka a Amazon S3, eliminando la necesidad de crear o administrar sus propias aplicaciones de conector:

  • En la consola de Amazon S3, cree un nuevo depósito. También puede utilizar un depósito existente.
  • Cree una nueva carpeta en su depósito S3 llamada streamingDataLake.
  • En la consola de Amazon MSK, elija su clúster MSK Serverless.
  • En Acciones menú, seleccione Editar política de clúster.

política de clúster

  • Seleccione Incluir director de servicio de Firehose y elige Guardar los cambios.

director de servicio de manguera contra incendios

  • En Entrega S3 pestaña, elegir Crear flujo de entrega.

flujo de entrega

  • Fuente, escoger Amazon MSK.
  • Destino, escoger Amazon S3.

origen y destino

  • Conectividad del clúster de Amazon MSK, seleccione Corredores de arranque privados.
  • Tema, ingresa un nombre de tema (para esta publicación, mytopic).

configuración de fuente

  • Cucharón S3, escoger Explorar y elija su cubo S3.
  • Participar streamingDataLake como prefijo de depósito S3.
  • Participar streamingDataLakeErr como prefijo de salida de error del depósito S3.

configuración de destino

  • Elige Crear flujo de entrega.

crear flujo de entrega

Puede verificar que los datos se escribieron en su depósito S3. Deberías ver que el streamingDataLake Se creó el directorio y los archivos se almacenan en particiones.

amazon s3

Almacenar eventos en DynamoDB

Para el último paso, almacena los datos del módem más recientes en DynamoDB. Esto permite que la aplicación cliente acceda al estado del módem e interactúe con el módem de forma remota desde cualquier lugar, con baja latencia y alta disponibilidad. Lambda funciona perfectamente con Amazon MSK. Lambda sondea internamente nuevos mensajes desde el origen del evento y luego invoca sincrónicamente la función Lambda de destino. Lambda lee los mensajes en lotes y los proporciona a su función como una carga útil de evento.

Primero creemos una tabla en DynamoDB. Referirse a Permisos de la API de DynamoDB: referencia de acciones, recursos y condiciones para verificar que su máquina cliente tenga los permisos necesarios.

  • Crea un nuevo archivo llamado createTable.py.
  • Copie el siguiente código en el archivo, actualizando el region :
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Ejecute el createTable.py script para crear una tabla llamada device_status en DynamoDB:
python createTable.py

Ahora configuremos la función Lambda.

  • En la consola Lambda, elija Clave en el panel de navegación.
  • Elige Crear función.
  • Seleccione Autor desde cero.
  • Nombre de la función¸ ingrese un nombre (por ejemplo, my-notification-kafka).
  • Runtime, escoger 3.11 Python.
  • Permisos, seleccione Use un rol existente y elige un rol con permisos para leer desde su cluster.
  • Crea la función.

En la página de configuración de la función Lambda, ahora puede configurar fuentes, destinos y el código de su aplicación.

  • Elige Agregar disparador.
  • Configuración de disparo, introduzca MSK para configurar Amazon MSK como desencadenador de la función fuente de Lambda.
  • Clúster MSK, introduzca myCluster.
  • Deseleccionar Activar gatillo, porque aún no ha configurado su función Lambda.
  • Tamaño del lote, introduzca 100.
  • Posición inicial, escoger Últimos.
  • Nombre del tema¸ ingrese un nombre (por ejemplo, mytopic).
  • Elige Añada.
  • En la página de detalles de la función Lambda, en el Código pestaña, ingrese el siguiente código:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Implemente la función Lambda.
  • En Configuración pestaña, elegir Editar para editar el disparador.

editar disparador

  • Seleccione el disparador, luego elija Guardar.
  • En la consola DynamoDB, elija Explorar elementos en el panel de navegación.
  • Selecciona la mesa device_status.

Verá que Lambda está escribiendo eventos generados en el tema de Kafka en DynamoDB.

tabla ddb

Resumen

Los canales de transmisión de datos son fundamentales para crear aplicaciones en tiempo real. Sin embargo, configurar y gestionar la infraestructura puede resultar desalentador. En esta publicación, explicamos cómo crear una canalización de transmisión sin servidor en AWS utilizando Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose y otros servicios. Los beneficios clave son la ausencia de servidores que administrar, la escalabilidad automática de la infraestructura y un modelo de pago por uso que utiliza servicios totalmente administrados.

¿Listo para construir su propio canal en tiempo real? Comience hoy con una cuenta de AWS gratuita. Con el poder de la tecnología sin servidor, puede concentrarse en la lógica de su aplicación mientras AWS se encarga del trabajo pesado indiferenciado. ¡Construyamos algo increíble en AWS!


Acerca de los autores

Masudur Rahaman Sayem es Arquitecto de datos de transmisión en AWS. Trabaja con clientes de AWS en todo el mundo para diseñar y crear arquitecturas de transmisión de datos para resolver problemas comerciales del mundo real. Se especializa en la optimización de soluciones que utilizan servicios de transmisión de datos y NoSQL. Sayem es un apasionado de la computación distribuida.

Michael Oguike es gerente de producto de Amazon MSK. Le apasiona utilizar datos para descubrir conocimientos que impulsen la acción. Le gusta ayudar a clientes de una amplia gama de industrias a mejorar sus negocios mediante la transmisión de datos. A Michael también le encanta aprender sobre ciencias del comportamiento y psicología a través de libros y podcasts.

punto_img

Información más reciente

punto_img