Logotipo de Zephyrnet

Una comparación en paralelo de Apache Spark y Apache Flink para casos de uso comunes de transmisión | Servicios web de Amazon

Fecha:

Apache Flink y Apache Spark son marcos de procesamiento de datos distribuidos de código abierto que se utilizan ampliamente para el procesamiento y análisis de big data. Spark es conocido por su facilidad de uso, API de alto nivel y la capacidad de procesar grandes cantidades de datos. Flink destaca por su capacidad para manejar el procesamiento de flujos de datos en tiempo real y cálculos con estado de baja latencia. Ambos admiten una variedad de lenguajes de programación, soluciones escalables para manejar grandes cantidades de datos y una amplia gama de conectores. Históricamente, Spark comenzó como un marco de trabajo por lotes y Flink comenzó como un marco de transmisión.

En esta publicación, compartimos un estudio comparativo de patrones de transmisión que se usan comúnmente para crear aplicaciones de procesamiento de transmisión, cómo se pueden resolver con Spark (principalmente Spark Structured Streaming) y Flink, y las variaciones menores en su enfoque. Los ejemplos cubren fragmentos de código en Python y SQL para ambos marcos en tres temas principales: preparación de datos, procesamiento de datos y enriquecimiento de datos. Si es un usuario de Spark que busca resolver sus casos de uso de procesamiento de transmisiones con Flink, esta publicación es para usted. No pretendemos cubrir la elección de tecnología entre Spark y Flink porque es importante evaluar ambos marcos para su carga de trabajo específica y cómo encaja la elección en su arquitectura; más bien, esta publicación destaca las diferencias clave para los casos de uso en los que ambas tecnologías se consideran comúnmente.

Ofertas Apache Flink API en capas que ofrecen diferentes niveles de expresividad y control y están diseñados para apuntar a diferentes tipos de casos de uso. Las tres capas de la API son funciones de proceso (también conocidas como API de procesamiento de flujo de estado), flujo de datos y tabla y SQL. La API Stateful Stream Processing requiere escribir código detallado pero ofrece el mayor control sobre el tiempo y el estado, que son conceptos centrales en el procesamiento de flujo con estado. La API de DataStream es compatible con Java, Scala y Python y ofrece primitivas para muchas operaciones comunes de procesamiento de secuencias, así como un equilibrio entre la verbosidad del código o la expresividad y el control. Las API Table y SQL son API relacionales que ofrecen soporte para Java, Scala, Python y SQL. Ofrecen la mayor abstracción y un control declarativo intuitivo similar a SQL sobre los flujos de datos. Flink también permite una transición y un cambio sin inconvenientes entre estas API. Para obtener más información sobre las API en capas de Flink, consulte API en capas.

Apache Spark Structured Streaming ofrece las API Dataset y DataFrames, que proporcionan API de transmisión declarativa de alto nivel para representar datos estáticos y limitados, así como datos de transmisión sin límites. Las operaciones son compatibles con Scala, Java, Python y R. Spark tiene un conjunto de funciones y una sintaxis enriquecidos con construcciones simples para selección, agregación, creación de ventanas, uniones y más. También puede usar Streaming Table API para leer tablas como DataFrames de transmisión como una extensión de DataFrames API. Aunque es difícil establecer paralelismos directos entre Flink y Spark en todas las construcciones de procesamiento de transmisión, en un nivel muy alto, podríamos decir que las API de transmisión estructurada de Spark son equivalentes a las API de tabla y SQL de Flink. Spark Structured Streaming, sin embargo, aún no ofrece (en el momento de escribir este artículo) un equivalente a las API de nivel inferior en Flink que ofrecen control granular de tiempo y estado.

Tanto Flink como Spark Structured Streaming (en adelante, Spark) son proyectos en evolución. La siguiente tabla proporciona una comparación simple de las capacidades de Flink y Spark para las primitivas de transmisión comunes (al momento de escribir este artículo).

. Flink Spark
Procesamiento basado en filas
Funciones definidas por el usuario
Acceso detallado al estado Sí, a través de DataStream y API de bajo nivel No
Controlar cuándo ocurre el desalojo estatal Sí, a través de DataStream y API de bajo nivel No
Estructuras de datos flexibles para almacenamiento de estado y consultas Sí, a través de DataStream y API de bajo nivel No
Temporizadores para procesamiento y operaciones con estado Sí, a través de API de bajo nivel No

En las siguientes secciones, cubrimos los principales factores comunes para que podamos mostrar cómo los usuarios de Spark pueden relacionarse con Flink y viceversa. Para obtener más información sobre las API de bajo nivel de Flink, consulte Función de proceso. En aras de la simplicidad, cubrimos los cuatro casos de uso en esta publicación utilizando la Flink Table API. Usamos una combinación de Python y SQL para una comparación de manzanas con manzanas con Spark.

Preparación de datos

En esta sección, comparamos los métodos de preparación de datos para Spark y Flink.

Lectura de datos

Primero observamos las formas más simples de leer datos de un flujo de datos. Las siguientes secciones asumen el siguiente esquema para los mensajes:

symbol: string,
price: int,
timestamp: timestamp,
company_info:
{ name: string, employees_count: int
}

Lectura de datos de una fuente en Spark Structured Streaming

En Spark Structured Streaming, usamos un DataFrame de streaming en Python que lee directamente los datos en formato JSON:

spark = ... # spark session # specify schema
stock_ticker_schema = ... # Create a streaming DataFrame
df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "mybroker1:port") .option("topic", "stock_ticker") .load() .select(from_json(col("value"), stock_ticker_schema).alias("ticker_data")) .select(col("ticker_data.*"))

Tenga en cuenta que tenemos que proporcionar un objeto de esquema que capture nuestro esquema de cotizaciones bursátiles (stock_ticker_schema). Compare esto con el enfoque de Flink en la siguiente sección.

Lectura de datos de una fuente usando Flink Table API

Para Flink, usamos la instrucción SQL DDL CREATE TABLE. Puede especificar el esquema de la secuencia como lo haría con cualquier tabla SQL. La cláusula WITH nos permite especificar el conector para el flujo de datos (Kafka en este caso), las propiedades asociadas para el conector y las especificaciones de formato de datos. Ver el siguiente código:

# Create table using DDL CREATE TABLE stock_ticker ( symbol string, price INT, timestamp TIMESTAMP(3), company_info STRING, WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE
) WITH ( 'connector' = 'kafka', 'topic' = 'stock_ticker', 'properties.bootstrap.servers' = 'mybroker1:port', 'properties.group.id' = 'testGroup', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true'
)

Aplanamiento de JSON

El aplanamiento de JSON es el proceso de convertir un objeto JSON anidado o jerárquico en una estructura plana de un solo nivel. Esto convierte múltiples niveles de anidamiento en un objeto donde todas las claves y valores están en el mismo nivel. Las claves se combinan mediante un delimitador como un punto (.) o un guión bajo (_) para indicar la jerarquía original. El aplanamiento de JSON es útil cuando necesita trabajar con un formato más simplificado. Tanto en Spark como en Flink, los JSON anidados pueden ser complicados para trabajar y pueden necesitar procesamiento adicional o funciones definidas por el usuario para manipular. Los JSON aplanados pueden simplificar el procesamiento y mejorar el rendimiento debido a la reducción de la sobrecarga computacional, especialmente con operaciones como uniones complejas, agregaciones y ventanas. Además, los JSON aplanados pueden ayudar a depurar y solucionar problemas de canalizaciones de procesamiento de datos más fácilmente porque hay menos niveles de anidamiento para navegar.

Aplanamiento de JSON en Spark Structured Streaming

El aplanamiento de JSON en Spark Structured Streaming requiere que use el método de selección y especifique el esquema que necesita aplanar. El aplanamiento de JSON en Spark Structured Streaming implica especificar el nombre del campo anidado que desea mostrar en la lista de campos de nivel superior. En el siguiente ejemplo, company_info es un campo anidado y dentro company_info, hay un campo llamado company_name. Con la siguiente consulta, estamos aplanando company_info.name a company_name:

stock_ticker_df = ... # Streaming DataFrame w/ schema shown above stock_ticker_df.select("symbol", "timestamp", "price", "company_info.name" as "company_name")

Aplanamiento de JSON en Flink

En Flink SQL, puede usar la función JSON_VALUE. Tenga en cuenta que puede usar esta función solo en versiones de Flink iguales o superiores a 1.14. Ver el siguiente código:

SELECT symbol, timestamp, price, JSON_VALUE(company_info, 'lax $.name' DEFAULT NULL ON EMPTY) AS company_name
FROM stock_ticker

El término laxo en la consulta anterior tiene que ver con el manejo de expresiones de ruta JSON en Flink SQL. Para obtener más información, consulte Funciones del sistema (integradas).

Proceso de datos

Ahora que ha leído los datos, podemos ver algunos patrones comunes de procesamiento de datos.

Deduplicación

La deduplicación de datos en el procesamiento de flujo es crucial para mantener la calidad de los datos y garantizar la coherencia. Mejora la eficiencia al reducir la tensión en el procesamiento de datos duplicados y ayuda a ahorrar costos en almacenamiento y procesamiento.

Consulta de deduplicación de Spark Streaming

El siguiente fragmento de código está relacionado con un Spark Streaming DataFrame llamado stock_ticker. El código realiza una operación para descartar filas duplicadas según el symbol columna. El método dropDuplicates se usa para eliminar filas duplicadas en un DataFrame basado en una o más columnas.

stock_ticker = ... # Streaming DataFrame w/ schema shown above stock_ticker.dropDuplicates("symbol")

Consulta de deduplicación de Flink

El código siguiente muestra el equivalente de Flink SQL para deduplicar datos en función de la symbol columna. La consulta recupera la primera fila para cada valor distinto en el symbol columna de la stock_ticker flujo, basado en el orden ascendente de proctime:

SELECT symbol, timestamp, price
FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY proctime ASC) AS row_num FROM stock_ticker)
WHERE row_num = 1

Ventanas

La creación de ventanas en la transmisión de datos es una construcción fundamental para procesar los datos dentro de las especificaciones. Las ventanas suelen tener límites de tiempo, número de registros u otros criterios. Estos límites de tiempo dividen flujos de datos continuos e ilimitados en fragmentos manejables llamados ventanas para procesar. Windows ayuda a analizar datos y obtener información en tiempo real mientras mantiene la eficiencia del procesamiento. Los análisis u operaciones se realizan en la actualización constante de datos de transmisión dentro de una ventana.

Hay dos ventanas comunes basadas en el tiempo que se usan tanto en Transmisión de chispa y Flink que detallaremos en este post: ventanas abatibles y correderas. A ventana que cae es una ventana basada en el tiempo que tiene un tamaño fijo y no tiene intervalos superpuestos. A ventana deslizante es una ventana basada en el tiempo que tiene un tamaño fijo y avanza en intervalos fijos que pueden superponerse.

Consulta de ventana giratoria de Spark Streaming

La siguiente es una consulta de ventana giratoria de Spark Streaming con un tamaño de ventana de 10 minutos:

stock_ticker = ... # Streaming DataFrame w/ schema shown above # Get max stock price in tumbling window
# of size 10 minutes
visitsByWindowAndUser = visits .withWatermark("timestamp", "3 minutes") .groupBy( window(stock_ticker.timestamp, "10 minutes"), stock_ticker.symbol) .max(stock_ticker.price)

Consulta de ventana giratoria de Flink Streaming

La siguiente es una consulta de ventana de volteo equivalente en Flink con un tamaño de ventana de 10 minutos:

SELECT symbol, MAX(price) FROM TABLE( TUMBLE(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '10' MINUTES)) GROUP BY ticker;

Consulta de ventana deslizante de Spark Streaming

La siguiente es una consulta de ventana deslizante de Spark Streaming con un tamaño de ventana de 10 minutos y un intervalo de deslizamiento de 5 minutos:

stock_ticker = ... # Streaming DataFrame w/ schema shown above # Get max stock price in sliding window
# of size 10 minutes and slide interval of size
# 5 minutes visitsByWindowAndUser = visits .withWatermark("timestamp", "3 minutes") .groupBy( window(stock_ticker.timestamp, "10 minutes", "5 minutes"), stock_ticker.symbol) .max(stock_ticker.price)

Consulta de ventana deslizante de Flink Streaming

La siguiente es una consulta de ventana deslizante de Flink con un tamaño de ventana de 10 minutos y un intervalo de deslizamiento de 5 minutos:

SELECT symbol, MAX(price) FROM TABLE( HOP(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY ticker;

Manejo de datos atrasados

Compatibilidad con Spark Structured Streaming y Flink procesamiento de tiempo de evento, donde se puede usar un campo dentro de la carga útil para definir ventanas de tiempo distintas de la hora del reloj de pared de las máquinas que realizan el procesamiento. Tanto Flink como Spark usan marcas de agua para este propósito.

La marca de agua se utiliza en los motores de procesamiento de secuencias para gestionar los retrasos. Una marca de agua es como un temporizador que establece cuánto tiempo puede esperar el sistema por eventos tardíos. Si llega un evento y está dentro del tiempo establecido (marca de agua), el sistema lo utilizará para actualizar una solicitud. Si es posterior a la marca de agua, el sistema la ignorará.

En las consultas de ventanas anteriores, especifica el umbral de retraso en Spark con el siguiente código:

.withWatermark("timestamp", "3 minutes")

Esto significa que se descartarán todos los registros que se retrasen 3 minutos según el seguimiento del reloj de tiempo del evento.

Por el contrario, con Flink Table API, puede especificar un umbral de retraso análogo directamente en el DDL:

WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE

Tenga en cuenta que Flink proporciona construcciones adicionales para especificar el retraso en sus diversas API.

Enriquecimiento de datos

En esta sección, comparamos los métodos de enriquecimiento de datos con Spark y Flink.

Llamar a una API externa

Llamar a API externas desde funciones definidas por el usuario (UDF) es similar en Spark y Flink. Tenga en cuenta que se llamará a su UDF para cada registro procesado, lo que puede resultar en que se llame a la API a una tasa de solicitud muy alta. Además, en escenarios de producción, su código UDF a menudo se ejecuta en paralelo en varios nodos, lo que amplifica aún más la tasa de solicitudes.

Para los siguientes fragmentos de código, supongamos que la llamada a la API externa implica llamar a la función:

response = my_external_api(request)

Llamada API externa en Spark UDF

El siguiente código usa Spark:

class Predict(ScalarFunction):
def open(self, function_context): with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f) def eval(self, x):
return self.model.predict(x)

Llamada API externa en Flink UDF

Para Flink, supongamos que definimos el UDF callExternalAPIUDF, que toma como entrada el símbolo del símbolo de teletipo y devuelve información enriquecida sobre el símbolo a través de un punto final REST. entonces podemos registrarte y llamar a la UDF de la siguiente manera:

callExternalAPIUDF = udf(callExternalAPIUDF(), result_type=DataTypes.STRING()) SELECT symbol, callExternalAPIUDF(symbol) as enriched_symbol
FROM stock_ticker;

Las UDF de Flink proporcionan un método de inicialización que se ejecuta una vez (a diferencia de una vez por registro procesado).

Tenga en cuenta que debe usar los UDF con prudencia, ya que un UDF implementado incorrectamente puede hacer que su trabajo se ralentice, cause contrapresión y, finalmente, detenga su aplicación de procesamiento de flujo. Es recomendable utilizar UDF de forma asíncrona para mantener un alto rendimiento, especialmente para casos de uso vinculados a E/S o cuando se trata de recursos externos como bases de datos o API REST. Para obtener más información sobre cómo puede usar la E/S asíncrona con Apache Flink, consulte Enriquezca su flujo de datos de forma asíncrona con Amazon Kinesis Data Analytics para Apache Flink.

Conclusión

Apache Flink y Apache Spark son proyectos que evolucionan rápidamente y brindan una forma rápida y eficiente de procesar big data. Esta publicación se centró en los principales casos de uso que encontramos comúnmente cuando los clientes querían ver paralelismos entre las dos tecnologías para crear aplicaciones de procesamiento de flujo en tiempo real. Hemos incluido las muestras que se solicitaron con más frecuencia en el momento de escribir este artículo. Háganos saber si desea más ejemplos en la sección de comentarios.


Acerca del autor.

Deepthi Mohan es gerente principal de productos en el equipo de análisis de datos de Amazon Kinesis.

Karthi Thyagarajan fue arquitecto principal de soluciones en el equipo de Amazon Kinesis.

punto_img

Información más reciente

punto_img