Logotipo de Zephyrnet

Amazon Managed Service para Apache Flink ahora es compatible con la versión 1.18 de Apache Flink | Servicios web de Amazon

Fecha:

Apache Flink es un motor de procesamiento distribuido de código abierto que ofrece potentes interfaces de programación para procesamiento por secuencias y por lotes, con soporte de primera clase para procesamiento con estado y semántica de tiempo de eventos. Apache Flink admite múltiples lenguajes de programación, Java, Python, Scala, SQL y múltiples API con diferentes niveles de abstracción, que se pueden usar indistintamente en la misma aplicación.

Servicio administrado de Amazon para Apache Flink, que ofrece una experiencia sin servidor totalmente administrada al ejecutar aplicaciones Apache Flink, ahora admite ApacheFlink 1.18.1, la última versión de Apache Flink en el momento de escribir este artículo.

En esta publicación, analizamos algunas de las nuevas características y capacidades interesantes de Apache Flink, introducidas con las versiones principales más recientes, 1.16, 1.17 y 1.18, y ahora compatibles con Managed Service para Apache Flink.

Nuevos conectores

Antes de sumergirnos en las nuevas funcionalidades de Apache Flink disponibles con la versión 1.18.1, exploremos las nuevas capacidades que surgen de la disponibilidad de muchos conectores nuevos de código abierto.

Opensearch

Un dedicado Opensearch El conector ahora está disponible para incluirlo en sus proyectos, lo que permite que una aplicación Apache Flink escriba datos directamente en OpenSearch, sin depender del modo de compatibilidad de Elasticsearch. Este conector es compatible con Servicio Amazon OpenSearch provisionado y Servicio OpenSearch sin servidor.

Este nuevo conector admite API de tablas y SQL, trabajando con Java y Python, y el API de transmisión de datos, sólo para Java. Desde el primer momento, proporciona garantías de al menos una vez, sincronizando las escrituras con los puntos de control de Flink. Puede lograr una semántica exactamente una vez utilizando ID deterministas y el método upsert.

De forma predeterminada, el conector utiliza bibliotecas cliente OpenSearch versión 1.x. Puede cambiar a la versión 2.x haciendo agregando las dependencias correctas.

Amazon DynamoDB

Los desarrolladores de Apache Flink ahora pueden usar un conector dedicado para escribir datos Amazon DynamoDB. Este conector se basa en el Apache Flink AsyncSink, desarrollado por AWS y ahora parte integral del proyecto Apache Flink, para simplificar la implementación de conectores receptores eficientes, utilizando solicitudes de escritura sin bloqueo y procesamiento por lotes adaptativo.

Este conector también admite ambos SQL y tabla API, Java y Python, y Flujo de datos API, solo para Java. De forma predeterminada, el receptor escribe en lotes para optimizar el rendimiento. Una característica notable de la versión SQL es la compatibilidad con la cláusula PARTITIONED BY. Al especificar una o más claves, puede lograr cierta deduplicación del lado del cliente, enviando solo el último registro por clave con cada escritura por lotes. Se puede lograr un equivalente con la API DataStream especificando una lista de claves de partición para sobrescribir dentro de cada lote.

Este conector sólo funciona como fregadero. No puede usarlo para leer desde DynamoDB. Para buscar datos en DynamoDB, aún necesita implementar una búsqueda usando el API de E/S asíncrona de Flink o implementar una función personalizada definida por el usuario (UDF), para SQL.

MongoDB

Otro conector interesante es para MongoDB. En este caso, tanto la fuente como el sumidero están disponibles, tanto para el SQL y tabla API y Flujo de datos API. El nuevo conector ahora forma parte oficialmente del proyecto Apache Flink y cuenta con el respaldo de la comunidad. Este nuevo conector reemplaza al anterior proporcionado directamente por MongoDB, que solo admite las API de Flink Sink y Source más antiguas.

En cuanto a otros conectores de almacén de datos, la fuente se puede utilizar como fuente limitada, en modo por lotes o para búsquedas. El receptor funciona tanto en modo por lotes como en streaming, y admite tanto el modo upsert como el de adición.

Entre las muchas características notables de este conector, una que vale la pena mencionar es la capacidad de habilitar el almacenamiento en caché cuando se utiliza la fuente para búsquedas. Fuera de la caja, el fregadero admite al menos una garantía. Cuando se define una clave principal, el receptor puede admitir semántica exactamente una vez a través de inserciones idempotentes. El conector receptor también admite semántica exactamente una vez, con inserciones idempotentes, cuando se define la clave principal.

Versionado del nuevo conector

No es una característica nueva, pero sí un factor importante a considerar al actualizar una aplicación Apache Flink anterior, es el nuevo control de versiones del conector. A partir de la versión 1.17 de Apache Flink, la mayoría de los conectores se han externalizado desde la distribución principal de Apache Flink y siguen versiones independientes.

Para incluir la dependencia correcta, debe especificar la versión del artefacto con el formulario: <connector-version>-<flink-version>

Por ejemplo, el último conector Kafka, que también funciona con Streaming administrado por Amazon para Apache Kafka (Amazon MSK), en el momento de escribir este artículo es la versión 3.1.0. Si estás utilizando Apache Flink 1.18, la dependencia a utilizar será la siguiente:

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

Kinesis amazónica, la nueva versión del conector es 4.2.0. La dependencia para Apache Flink 1.18 será la siguiente:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

En las siguientes secciones, analizamos más funciones nuevas y potentes que ahora están disponibles en Apache Flink 1.18 y son compatibles con Amazon Managed Service para Apache Flink.

SQL

En Apache Flink SQL, los usuarios pueden proporcionar consejos para unir consultas que se pueden utilizar para sugerir que el optimizador tenga un efecto en el plan de consultas. En particular, en aplicaciones de streaming, uniones de búsqueda se utilizan para enriquecer una tabla, que representa datos en streaming, con datos que se consultan desde un sistema externo, normalmente una base de datos. Desde la versión 1.16, se han introducido varias mejoras para las uniones de búsqueda, lo que le permite ajustar el comportamiento de la unión y mejorar el rendimiento:

  • caché de búsqueda es una característica poderosa que le permite almacenar en caché en memoria los registros utilizados con más frecuencia, lo que reduce la presión sobre la base de datos. Anteriormente, la caché de búsqueda era específica de algunos conectores. Desde Apache Flink 1.16, esta opción está disponible para todos los conectores que admiten internamente la búsqueda (FLIP-221). Al momento de escribir este artículo, JDBC, Colmenay HBase Los conectores admiten caché de búsqueda. La caché de búsqueda tiene tres modos disponibles: FULL, para un pequeño conjunto de datos que se puede mantener completamente en la memoria, PARTIAL, para un conjunto de datos grande, almacenar en caché solo los registros más recientes, o NONE, para desactivar completamente el caché. Para PARTIAL caché, también puede configurar el número de filas que se almacenarán en el búfer y el tiempo de vida.
  • búsqueda asíncrona es otra característica que puede mejorar enormemente el rendimiento. La búsqueda asíncrona proporciona en Apache Flink SQL una funcionalidad similar a E/S asíncrona disponible en la API DataStream. Permite a Apache Flink emitir nuevas solicitudes a la base de datos sin bloquear el hilo de procesamiento hasta que se hayan recibido respuestas a búsquedas anteriores. De manera similar a la E/S asíncrona, puede configurar la búsqueda asíncrona para imponer el orden o permitir resultados desordenados, o ajustar la capacidad del búfer y el tiempo de espera.
  • También puedes configurar un estrategia de reintento de búsqueda en combinación con PARTIAL or NONE caché de búsqueda, para configurar el comportamiento en caso de una búsqueda fallida en la base de datos externa.

Todos estos comportamientos se pueden controlar mediante un LOOKUP sugerencia, como en el siguiente ejemplo, donde mostramos una combinación de búsqueda mediante búsqueda asíncrona:

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

En esta sección, analizamos nuevas mejoras y soporte en PyFlink.

Compatibilidad con Python 3.10

Las versiones más recientes de Apache Flink introdujeron varias mejoras para los usuarios de PyFlink. En primer lugar, ahora se admite Python 3.10 y se eliminó por completo el soporte de Python 3.6 (FLINK-29421). El servicio administrado para Apache Flink actualmente usa el tiempo de ejecución Python 3.10 para ejecutar aplicaciones PyFlink.

Cada vez más cerca de la paridad de características

Desde la perspectiva de la API de programación, PyFlink se está acercando a Java en cada versión. La API DataStream ahora admite funciones como salidas laterales y estado de transmisión, y se han cerrado las brechas en la API de ventanas. PyFlink ahora también admite nuevos conectores como Secuencias de datos de Amazon Kinesis directamente desde la API DataStream.

Mejoras en el modo hilo

PyFlink es muy eficiente. La sobrecarga de ejecutar operadores de API de Flink en PyFlink es mínima en comparación con Java o Scala, porque el tiempo de ejecución en realidad ejecuta la implementación del operador en la JVM directamente, independientemente del idioma de su aplicación. Pero cuando tienes una función definida por el usuario, las cosas son ligeramente diferentes. Una línea de código Python tan simple como lambda x: x + 1, o tan complejo como una función de Pandas, debe ejecutarse en un tiempo de ejecución de Python.

De forma predeterminada, Apache Flink ejecuta un tiempo de ejecución de Python en cada administrador de tareas, externo a la JVM. Cada registro se serializa, se entrega al tiempo de ejecución de Python mediante comunicación entre procesos, se deserializa y se procesa en el tiempo de ejecución de Python. Luego, el resultado se serializa y se devuelve a la JVM, donde se deserializa. Este es el PyFlink Modo PROCESO. Es muy estable pero introduce una sobrecarga y, en algunos casos, puede convertirse en un cuello de botella en el rendimiento.

Desde la versión 1.15, Apache Flink también admite modo HILO para PyFlink. En este modo, las funciones definidas por el usuario de Python se ejecutan dentro de la propia JVM, eliminando la serialización/deserialización y la sobrecarga de comunicación entre procesos. El modo THREAD tiene algunas limitaciones; por ejemplo, el modo THREAD no se puede utilizar para Pandas o UDAF (funciones agregadas definidas por el usuario, que constan de muchos registros de entrada y un registro de salida), pero puede mejorar sustancialmente el rendimiento de una aplicación PyFlink.

Con la versión 1.16, el soporte del modo THREAD se ha ampliado sustancialmente, cubriendo también la API Python DataStream.

El modo THREAD es compatible con Managed Service para Apache Flink y se puede habilitado directamente desde su aplicación PyFlink.

Soporte de Apple Silicon

Si utiliza máquinas basadas en Apple Silicon para desarrollar aplicaciones PyFlink, desarrollando para PyFlink 1.15, probablemente haya encontrado algunos de los problemas conocidos de dependencia de Python en Apple Silicon. Estos problemas finalmente se han resuelto (FLINK-25188). Estas limitaciones no afectaron las aplicaciones de PyFlink que se ejecutan en el servicio administrado para Apache Flink. Antes de la versión 1.16, si deseaba desarrollar una aplicación PyFlink en una máquina que usara el chipset M1, M2 o M3, tenía que usar algunos soluciones, porque era imposible instalar PyFlink 1.15 o una versión anterior directamente en la máquina.

Mejoras en los puntos de control no alineados

Apache Flink 1.15 ya admitía puntos de control incrementales y eliminación de búfer. Estas características se pueden utilizar, particularmente en combinación, para mejorar el rendimiento de los puntos de control, haciendo que la duración de los puntos de control sea más predecible, especialmente en presencia de contrapresión. Para obtener más información sobre estas funciones, consulte Optimice los puntos de control en su servicio administrado de Amazon para aplicaciones Apache Flink con eliminación de búfer y puntos de control no alineados..

Con las versiones 1.16 y 1.17 se han introducido varios cambios para mejorar la estabilidad y el rendimiento.

Manejo del sesgo de datos

Usos de Apache Flink marcas de agua para respaldar la semántica en el momento del evento. Las marcas de agua son registros especiales, normalmente inyectados en el flujo desde el operador de origen, que marcan el progreso del tiempo del evento para operadores como las agregaciones de ventanas de tiempo del evento. Una técnica común es retrasar las marcas de agua desde la última hora del evento observado, para permitir que los eventos estén desordenados, al menos hasta cierto punto.

Sin embargo, el uso de marcas de agua conlleva un desafío. Cuando la aplicación tiene múltiples fuentes, por ejemplo recibe eventos de múltiples particiones de un tema de Kafka, las marcas de agua se generan de forma independiente para cada partición. Internamente, cada operador siempre espera la misma marca de agua en todas las particiones de entrada, prácticamente alineándola en la partición más lenta. El inconveniente es que si una de las particiones no recibe datos, las marcas de agua no progresan, lo que aumenta la latencia de un extremo a otro. Por esta razón, un tiempo de espera de inactividad opcional se ha introducido en muchas fuentes de transmisión. Después del tiempo de espera configurado, la generación de marcas de agua ignora cualquier partición que no reciba ningún registro y las marcas de agua pueden progresar.

También puede enfrentar un desafío similar pero opuesto si una fuente recibe eventos mucho más rápido que las demás. Las marcas de agua están alineadas con la partición más lenta, lo que significa que cualquier agregación de ventanas esperará la marca de agua. Los registros de la fuente rápida tienen que esperar y almacenarse en el buffer. Esto puede dar como resultado el almacenamiento en búfer de un volumen excesivo de datos y un crecimiento incontrolable del estado del operador.

Para abordar el problema de las fuentes más rápidas, a partir de Apache Flink 1.17, puede habilitar la alineación de marca de agua de las divisiones de fuentes (FLINK-28853). Este mecanismo, deshabilitado de forma predeterminada, garantiza que ninguna partición progrese sus marcas de agua demasiado rápido, en comparación con otras particiones. Puede vincular varias fuentes, como varios temas de entrada, asignando el mismo ID de grupo de alineación y configurando la duración de la deriva máxima desde la marca de agua actual. Si una partición específica recibe eventos demasiado rápido, el operador de origen detiene el consumo de esa partición hasta que la deriva se reduce por debajo del umbral configurado.

Puede habilitarlo para cada fuente por separado. Todo lo que necesita es especificar un ID de grupo de alineación, que unirá todas las fuentes que tengan el mismo ID y la duración de la deriva máxima desde la marca de agua mínima actual. Esto detendrá el consumo de la subtarea de origen que avanza demasiado rápido, hasta que la deriva sea inferior al umbral especificado.

El siguiente fragmento de código muestra cómo se puede configurar la alineación de marcas de agua de divisiones de fuentes en una fuente Kafka que emite marcas de agua delimitadas y desordenadas:

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

Esta característica sólo está disponible con FLIP-217 fuentes compatibles, lo que admite la alineación de marcas de agua de las divisiones de fuentes. Al momento de escribir este artículo, entre los principales conectores de fuentes de transmisión, solo la fuente Kafka admite esta función.

Soporte directo para el formato Protobuf.

Las API de SQL y Table ahora admiten directamente formato protobuf. Para utilizar este formato, necesita generar las clases Java de Protobuf desde el .proto archivos de definición de esquema e inclúyalos como dependencias en su aplicación.

El formato Protobuf solo funciona con SQL y Table API y solo para leer o escribir datos serializados en Protobuf desde una fuente o un receptor. Actualmente, Flink no admite directamente que Protobuf serialice el estado directamente y no admite la evolución del esquema, como lo hace con Avro, Por ejemplo. Aún necesitas registrar un serializador personalizado con algunos gastos generales para su aplicación.

Mantener Apache Flink de código abierto

Apache Flink depende internamente de Akka para enviar datos entre subtareas. En 2022, Lightbend, la empresa detrás de Akka, anunció un cambio de licencia para futuras versiones de Akka, desde Apache 2.0 hasta una licencia más restrictiva, y que Akka 2.6, la versión utilizada por Apache Flink, no recibiría ninguna actualización o corrección de seguridad adicional.

Aunque Akka ha sido históricamente muy estable y no requiere actualizaciones frecuentes, este cambio de licencia representó un riesgo para el proyecto Apache Flink. La decisión de la comunidad Apache Flink fue reemplazar Akka con un fork de la versión 2.6, llamado apache pekko (FLINK-32468). Esta bifurcación conservará la licencia Apache 2.0 y recibirá las actualizaciones necesarias por parte de la comunidad. Mientras tanto, la comunidad Apache Flink considerará si eliminar por completo la dependencia de Akka o Pekko.

Compresión de estado

Apache Flink ofrece compresión opcional (predeterminada: desactivada) para todos los puntos de control y de guardado. Apache Flink identificó un error en Flink 1.18.1 donde el estado del operador no se pudo restaurar correctamente cuando la compresión de instantáneas está habilitada. Esto podría provocar la pérdida de datos o la imposibilidad de restaurarlos desde el punto de control. Para resolver esto, Managed Service para Apache Flink ha respaldado el reparar que se incluirá en futuras versiones de Apache Flink.

Actualizaciones de versiones locales con servicio administrado para Apache Flink

Si actualmente está ejecutando una aplicación en Managed Service para Apache Flink usando Apache Flink 1.15 o anterior, ahora puede actualizarla localmente a 1.18 sin perder el estado, usando el Interfaz de línea de comandos de AWS (CLI de AWS), Formación en la nube de AWS or Kit de desarrollo en la nube de AWS (AWS CDK), o cualquier herramienta que utilice la API de AWS.

El ActualizarAplicación La acción API ahora admite la actualización de la versión de tiempo de ejecución de Apache Flink de una aplicación de servicio administrado para Apache Flink existente. Puede utilizar UpdateApplication directamente en una aplicación en ejecución.

Antes de continuar con la actualización local, debe verificar y actualizar las dependencias incluidas en su aplicación, asegurándose de que sean compatibles con la nueva versión de Apache Flink. En particular, debe actualizar cualquier biblioteca, conectores y posiblemente la versión de Scala de Apache Flink.

Además, recomendamos probar la aplicación actualizada antes de continuar con la actualización. Recomendamos realizar pruebas localmente y en un entorno que no sea de producción, utilizando la versión de tiempo de ejecución de Apache Flink de destino, para garantizar que no se hayan introducido regresiones.

Y finalmente, si su aplicación tiene estado, le recomendamos realizar una instantánea del estado de la aplicación en ejecución. Esto le permitirá volver a la versión anterior de la aplicación.

Cuando esté listo, ahora puede usar el ActualizarAplicación Acción API o aplicación de actualización Comando de AWS CLI para actualizar la versión en tiempo de ejecución de la aplicación y apuntarla al nuevo artefacto de la aplicación, JAR o archivo zip, con las dependencias actualizadas.

Para obtener información más detallada sobre el proceso y la API, consulte Actualización de la versión local para Apache Flink. La documentación incluye instrucciones paso a paso y un vídeo para guiarle a través del proceso de actualización.

Conclusiones

En esta publicación, examinamos algunas de las nuevas características de Apache Flink, compatibles con Amazon Managed Service para Apache Flink. Esta lista no es exhaustiva. Apache Flink también introdujo algunas características muy prometedoras, como TTL a nivel de operador para SQL y Table API [FLIP-292] y Viaje en el tiempo [FLIP-308], pero aún no son compatibles con la API y todavía no son accesibles para los usuarios. Por este motivo decidimos no abordarlos en este post.

Con el soporte de Apache Flink 1.18, el servicio administrado para Apache Flink ahora admite la última versión lanzada de Apache Flink. Hemos visto algunas de las nuevas características interesantes y nuevos conectores disponibles con Apache Flink 1.18 y cómo el Servicio Administrado para Apache Flink lo ayuda a actualizar una aplicación existente.

Puede encontrar más detalles sobre los lanzamientos recientes en el blog de Apache Flink y en las notas de la versión:

Si es nuevo en Apache Flink, le recomendamos nuestro Guía para elegir la API y el idioma correctos y siguiendo el consulte la guía de inicio para comenzar a utilizar el servicio administrado para Apache Flink.


Acerca de los autores

lorenzo nicaralorenzo nicara Trabaja como arquitecto sénior de soluciones de streaming en AWS y ayuda a los clientes de toda EMEA. Lleva más de 25 años creando sistemas nativos de la nube con uso intensivo de datos, trabajando en la industria financiera a través de consultorías y para empresas de productos FinTech. Ha aprovechado ampliamente las tecnologías de código abierto y contribuido a varios proyectos, incluido Apache Flink.

francisco morillofrancisco morillo es arquitecto de soluciones de streaming en AWS. Francisco trabaja con clientes de AWS, ayudándolos a diseñar arquitecturas de análisis en tiempo real utilizando los servicios de AWS, brindando soporte a Amazon MSK y Amazon Managed Service para Apache Flink.

punto_img

Información más reciente

punto_img