Logotipo de Zephyrnet

Detección de anomalías de series temporales en tiempo real para aplicaciones de streaming en Amazon Kinesis Data Analytics | Servicios web de Amazon

Fecha:

La detección de anomalías en tiempo real de flujos de alto rendimiento es clave para informar sobre decisiones oportunas para adaptarse y responder a escenarios inesperados. Marcos de procesamiento de flujo como Apache Flink empoderar a los usuarios para diseñar sistemas que puedan ingerir y procesar flujos continuos de datos a escala. En esta publicación, presentamos un algoritmo de detección de anomalías de series temporales de transmisión basado en perfiles de matriz y discordancias izquierdas, inspirado en Lu et al., 2022, con Apache Flink, y proporcione un ejemplo de trabajo que lo ayudará a comenzar con una solución administrada de Apache Flink usando Análisis de datos de Amazon Kinesis.

Desafíos de la detección de anomalías

La detección de anomalías juega un papel clave en una variedad de aplicaciones del mundo real, como detección de fraude, análisis de ventas, ciberseguridad, mantenimiento predictivo y detección de fallas, entre otras. La mayoría de estos casos de uso requieren que se tomen medidas casi en tiempo real. Por ejemplo, las redes de pago con tarjeta deben poder identificar y rechazar transacciones potencialmente fraudulentas antes de procesarlas. Esto plantea el desafío de diseñar sistemas de detección de anomalías casi en tiempo real que puedan escalar a flujos de datos de llegada ultrarrápida.

Otro desafío clave al que se enfrentan los sistemas de detección de anomalías es deriva del concepto. La naturaleza cambiante de algunos casos de uso requiere que los modelos se adapten dinámicamente a nuevos escenarios. Por ejemplo, en un escenario de mantenimiento predictivo, podría usar varios dispositivos de Internet de las cosas (IoT) para monitorear las vibraciones producidas por un motor eléctrico con el objetivo de detectar anomalías y prevenir daños irreparables. Los sonidos emitidos por las vibraciones del motor pueden variar significativamente con el tiempo debido a diferentes condiciones ambientales, como variaciones de temperatura, y este cambio en el patrón puede invalidar el modelo. Esta clase de escenarios crea la necesidad de aprendizaje en línea: la capacidad del modelo para aprender continuamente de nuevos datos.

Detección de anomalías de series temporales

Las series de tiempo son una clase particular de datos que incorporan el tiempo en su estructuración. Los puntos de datos que caracterizan una serie temporal se registran de manera ordenada y son de naturaleza cronológica. Esta clase de datos está presente en todas las industrias y es común en el núcleo de muchos requisitos comerciales o indicadores clave de rendimiento (KPI). Las fuentes naturales de datos de series temporales incluyen transacciones con tarjetas de crédito, ventas, mediciones de sensores, registros de máquinas y análisis de usuarios.

En el dominio de las series de tiempo, una anomalía puede definirse como una desviación de los patrones esperados que caracterizan la serie temporal. Por ejemplo, una serie de tiempo se puede caracterizar por sus rangos esperados, tendencias, patrones estacionales o cíclicos. Cualquier alteración significativa de este flujo normal de puntos de datos se considera una anomalía.

La detección de anomalías puede ser más o menos desafiante según el dominio. Por ejemplo, un enfoque basado en umbrales podría ser adecuado para series de tiempo que están informadas de sus rangos esperados, como la temperatura de trabajo de una máquina o la utilización de la CPU. Por otro lado, las aplicaciones como la detección de fraudes, la seguridad cibernética y el mantenimiento predictivo no se pueden clasificar a través de enfoques simples basados ​​en reglas y requieren un mecanismo más detallado para capturar observaciones inesperadas. Gracias a su configuración paralelizable y basada en eventos, los motores de transmisión como Apache Flink brindan un entorno excelente para escalar la detección de anomalías en tiempo real a flujos de datos de llegada rápida.

Resumen de la solución

Apache Flink es un motor de procesamiento distribuido para cálculos con estado sobre flujos. Un programa Flink se puede implementar en Java, Scala o Python. Admite la ingesta, la manipulación y la entrega de datos a los destinos deseados. Kinesis Data Analytics le permite ejecutar aplicaciones de Flink en un entorno completamente administrado en AWS.

La detección de anomalías basada en la distancia es un enfoque popular en el que un modelo se caracteriza por una serie de puntos de datos almacenados internamente que se utilizan para compararlos con los nuevos puntos de datos entrantes. En el momento de la inferencia, estos métodos calculan las distancias y clasifican los nuevos puntos de datos según cuán diferentes son de las observaciones anteriores. A pesar de la plétora de algoritmos en la literatura, cada vez hay más pruebas de que los algoritmos de detección de anomalías basados ​​en la distancia siguen siendo competitivos con el estado del arte (Nakamura y otros, 2020).

En esta publicación, presentamos una versión de transmisión de un algoritmo de detección de anomalías no supervisado basado en la distancia llamado discordias de la serie de tiempoy explore algunas de las optimizaciones introducidas por el algoritmo Discord Aware Matrix Profile (DAMP) (Lu et al., 2022), que desarrolla aún más el método discords para escalar a billones de puntos de datos.

Entendiendo el algoritmo

A discordia izquierda es una subsecuencia que es significativamente diferente de todas las subsecuencias que la preceden. En esta publicación, demostramos cómo usar el concepto de discordia izquierda para identificar anomalías de series de tiempo en flujos usando Kinesis Data Analytics para Apache Flink.

Consideremos un flujo ilimitado y todas sus subsecuencias de longitud n. m las subsecuencias más recientes se almacenarán y utilizarán para la inferencia. Cuando llega un nuevo punto de datos, se forma una nueva subsecuencia que incluye el nuevo evento. El algoritmo compara esta última subsecuencia (consulta) con la m subsecuencias retenidas del modelo, con la exclusión de la última n subsecuencias porque se superponen con la consulta y, por lo tanto, caracterizarían una auto-coincidencia. Después de calcular estas distancias, el algoritmo clasifica la consulta como una anomalía si su distancia desde su punto más cercano no coincidente subsecuencia está por encima de un determinado umbral de movimiento.

Para esta publicación, usamos un flujo de datos de Kinesis para ingerir los datos de entrada, un Análisis de datos de Kinesis aplicación para ejecutar el programa de detección de anomalías Flink y otro flujo de datos de Kinesis para ingerir la salida producida por su aplicación. Con fines de visualización, consumimos del flujo de salida mediante Kinesis Data Analytics Studio, que proporciona una Cuaderno Apache Zepelín que utilizamos para visualizar e interactuar con los datos en tiempo real.

Detalles de implementacion

El código de la aplicación Java para este ejemplo es disponible en GitHub. Para descargar el código de la aplicación, complete los siguientes pasos:

  1. Clone el repositorio remoto usando el siguiente comando:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

  2. Navegue hasta la amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords directorio:

Veamos el código paso a paso.

El MPStreamingJob clase define el flujo de datos de la aplicación, y el MPProcessFunction La clase define la lógica de la función que detecta anomalías.

La implementación se describe mejor mediante tres componentes principales:

  • El origen del flujo de datos de Kinesis, utilizado para leer desde el flujo de entrada
  • La función del proceso de detección de anomalías
  • El sumidero del flujo de datos de Kinesis, que se usa para enviar la salida al flujo de salida

La función de detección de anomalías se implementa como un ProcessFunction<String, String>. Su método MPProcessFunction#processElement se llama para cada punto de datos:

@Override
public void processElement(String dataPoint, ProcessFunction<String, OutputWithLabel>.Context context,
                            Collector<OutputWithLabel> collector) {    Double record = Double.parseDouble(dataPoint);    int currentIndex = timeSeriesData.add(record);    Double minDistance = 0.0;
   String anomalyTag = "INITIALISING";    if (timeSeriesData.readyToCompute()) {
       minDistance = timeSeriesData.computeNearestNeighbourDistance();
       threshold.update(minDistance);
   }    /*
   * Algorithm will wait for initializationPeriods * sequenceLength data points until starting
   * to compute the Matrix Profile (MP).
   */
   if (timeSeriesData.readyToInfer()) {
       anomalyTag = minDistance > threshold.getThreshold() ? "IS_ANOMALY" : "IS_NOT_ANOMALY";
   }    OutputWithLabel output = new OutputWithLabel(currentIndex, record, minDistance, anomalyTag);    collector.collect(output);
}

Para cada punto de datos entrante, el algoritmo de detección de anomalías realiza las siguientes acciones:

  1. Agrega el registro a la timeSeriesData.
  2. Si ha observado al menos 2 * sequenceLength puntos de datos, comienza a calcular el perfil de la matriz.
  3. Si ha observado al menos initializationPeriods * sequenceLength puntos de datos, comienza a generar etiquetas de anomalías.

Tras estas acciones, el MPProcessFunction la función genera un OutputWithLabel objeto con cuatro atributos:

  • índice – El índice del punto de datos en la serie temporal
  • Las opciones de entrada – Los datos de entrada sin ninguna transformación (función de identidad)
  • mp – La distancia a la subsecuencia no autoajustable más cercana para la subsecuencia que termina en índice
  • etiqueta de anomalía – Una etiqueta binaria que indica si la subsecuencia es una anomalía

En la implementación provista, el umbral se aprende en línea ajustando una distribución normal a los datos del perfil de la matriz:

/*
 * Computes the threshold as two standard deviations away from the mean (p = 0.02)
 *
 * @return an estimated threshold
 */
public Double getThreshold() {
   Double mean = sum/counter;    return mean + 2 * Math.sqrt(squaredSum/counter - mean*mean);
}

En este ejemplo, el algoritmo clasifica como anomalías aquellas subsecuencias cuya distancia de su vecino más cercano se desvía significativamente de la distancia mínima promedio (más de dos desviaciones estándar de la media).

El TimeSeries La clase implementa la estructura de datos que conserva la ventana de contexto, es decir, los registros almacenados internamente que se utilizan para la comparación con los nuevos registros entrantes. En la implementación provista, el n se conservan los registros más recientes, y cuando el TimeSeries el objeto está al máximo de su capacidad, se anulan los registros más antiguos.

Requisitos previos

Antes de crear una aplicación de Kinesis Data Analytics para este ejercicio, cree dos flujos de datos de Kinesis: InputStream y OutputStream in us-east-1. La aplicación Flink utilizará estos flujos como sus respectivos flujos de origen y destino. Para crear estos recursos, inicie lo siguiente Formación en la nube de AWS apilar:

Pila de lanzamiento

Alternativamente, siga las instrucciones en Creación y actualización de flujos de datos.

Crea la aplicación

Para crear su aplicación, complete los siguientes pasos:

  1. Clone el repositorio remoto usando el siguiente comando:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

  2. Navegue hasta la amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core directorio.
    cd amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core

  3. Cree su archivo JAR ejecutando el siguiente comando de Maven en el directorio central, que contiene el pom.xml archivo:
    mvn package -Dflink.version=1.15.4
  4. Crear una Servicio de almacenamiento simple de Amazon (Amazon S3) depósito y cargue el archivo target/left-discords-1.0.0.jar.
  5. Cree y ejecute una aplicación de Kinesis Data Analytics como se describe en Crear y ejecutar la aplicación Kinesis Data Analytics:
    1. Ingrese al target/left-discords-1.0.0.jar.
    2. Tenga en cuenta que los flujos de entrada y salida se llaman InputStream y OutputStream, respectivamente.
    3. El ejemplo proporcionado está configurado para ejecutarse en us-east-1.

Rellenar el flujo de entrada

Puedes poblar InputStream ejecutando el script.py archivo del repositorio clonado, usando el comando python script.py. Al editar las dos últimas líneas, puede completar la secuencia con datos sintéticos o con datos reales de un conjunto de datos CSV.

Visualice datos en Kinesis Data Analytics Studio

Kinesis Data Analytics Studio proporciona la configuración perfecta para observar datos en tiempo real. La siguiente captura de pantalla muestra visualizaciones de muestra. El primer gráfico muestra los datos de series temporales entrantes, el segundo gráfico muestra el perfil de la matriz y el tercero muestra qué puntos de datos se han clasificado como anomalías.

Para visualizar los datos, complete los siguientes pasos:

  1. crear un cuaderno.
  2. Agregue los siguientes párrafos a la nota de Zeppelin:

Cree una tabla y defina la forma de los registros generados por la aplicación:

%flink.ssql CREATE TABLE data (
index INT,
input VARCHAR(6),
mp VARCHAR(6),
anomalyTag VARCHAR(20)
)
PARTITIONED BY (index)
WITH ( 'connector' = 'kinesis', 'stream' = 'OutputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'
)

Visualice los datos de entrada (elija Gráfico de líneas desde las opciones de visualización):

%flink.ssql(type=update) SELECT index, input FROM data;

Visualice los datos del perfil de la matriz de salida (elija Gráfico de dispersión desde las opciones de visualización):

%flink.ssql(type=update) SELECT index, mp FROM data;

Visualice los datos etiquetados (elija Gráfico de dispersión desde las opciones de visualización):

%flink.ssql(type=update) SELECT index, anomalyTag FROM data;

Limpiar

Para eliminar todos los recursos que creó, siga las instrucciones en Limpie los recursos de AWS.

Desarrollos futuros

En esta sección, discutimos los desarrollos futuros para esta solución.

Optimizar para velocidad

El algoritmo de discords de series temporales en línea se desarrolla y optimiza aún más para la velocidad en Lu et al., 2022. Las optimizaciones propuestas incluyen:

  • Parada anticipada – Si el algoritmo encuentra una subsecuencia lo suficientemente similar (por debajo del umbral), deja de buscar y marca la consulta como no anómala.
  • Ventana de anticipación – Mire una cierta cantidad de datos en el futuro y compárelos con la consulta actual para descubrir y eliminar de manera económica las subsecuencias futuras que no se pueden dejar: discordias. Tenga en cuenta que esto introduce algún retraso. La razón por la que la descalificación mejora el rendimiento es que es más probable que los puntos de datos cercanos en el tiempo sean similares que los puntos de datos distantes en el tiempo.
  • Uso de MASA - El MASS (Algoritmo de Mueen para búsqueda de similitud) algoritmo de búsqueda está diseñado para descubrir de manera eficiente la subsecuencia más similar en el pasado.

Paralelización

El algoritmo anterior opera con paralelismo 1, lo que significa que cuando un solo trabajador es suficiente para manejar el rendimiento del flujo de datos, el algoritmo anterior se puede usar directamente. Este diseño se puede mejorar con más lógica de distribución para manejar escenarios de alto rendimiento. Para paralelizar este algoritmo, puede diseñar un operador de partición que asegure que los operadores de detección de anomalías tengan a su disposición los puntos de datos pasados ​​relevantes. El algoritmo puede mantener un conjunto de los registros más recientes con los que compara la consulta. Es interesante explorar las ventajas y desventajas de la eficiencia y la precisión de las soluciones aproximadas. Dado que la mejor solución para paralelizar el algoritmo depende en gran medida de la naturaleza de los datos, recomendamos experimentar con varios enfoques utilizando su conocimiento específico del dominio.

Conclusión

En esta publicación, presentamos una versión de transmisión de un algoritmo de detección de anomalías basado en discordias de izquierda. Al implementar esta solución, aprendió a implementar una solución de detección de anomalías basada en Apache Flink en Kinesis Data Analytics y exploró el potencial de Kinesis Data Analytics Studio para visualizar e interactuar con la transmisión de datos en tiempo real. Para obtener más detalles sobre cómo implementar soluciones de detección de anomalías en Apache Flink, consulte el Repositorio GitHub que acompaña esta publicación. Para obtener más información sobre Kinesis Data Analytics y Apache Flink, explore la Guía para desarrolladores de análisis de datos de Amazon Kinesis.

Pruébelo y comparta sus comentarios en la sección de comentarios.


Acerca de los autores

antonio vespoli es ingeniero de desarrollo de software en AWS. Trabaja en Amazon Kinesis Data Analytics, la oferta administrada para ejecutar aplicaciones Apache Flink en AWS.

Samuel Siebenmann es ingeniero de desarrollo de software en AWS. Trabaja en Amazon Kinesis Data Analytics, la oferta administrada para ejecutar aplicaciones Apache Flink en AWS.

nuño afonso es ingeniero de desarrollo de software en AWS. Trabaja en Amazon Kinesis Data Analytics, la oferta administrada para ejecutar aplicaciones Apache Flink en AWS.

punto_img

Información más reciente

punto_img