Logotipo de Zephyrnet

Extraiga series temporales de datos meteorológicos satelitales con AWS Lambda | Servicios web de Amazon

Fecha:

La extracción de series de tiempo en coordenadas geográficas dadas de datos satelitales o de predicción meteorológica numérica puede ser un desafío debido al volumen de datos y a su naturaleza multidimensional (tiempo, latitud, longitud, altura, múltiples parámetros). Este tipo de procesamiento se puede encontrar en la investigación meteorológica y climática, pero también en aplicaciones como la energía fotovoltaica y eólica. Por ejemplo, las series temporales que describen la cantidad de energía solar que llega a puntos geográficos específicos pueden ayudar a diseñar plantas de energía fotovoltaica, monitorear su funcionamiento y detectar pérdidas de rendimiento.

Una generalización del problema podría establecerse de la siguiente manera: ¿cómo podemos extraer datos a lo largo de una dimensión que no es la clave de partición de un gran volumen de datos multidimensionales? Para datos tabulares, este problema se puede resolver fácilmente con Pegamento AWS, que puede usar para crear un trabajo para filtrar y volver a particionar los datos, como se muestra al final de esta publicación. Pero, ¿qué sucede si los datos son multidimensionales y se proporcionan en un formato específico de dominio, como en el caso de uso que queremos abordar?

AWS Lambda es un servicio informático sin servidor que le permite ejecutar código sin aprovisionar ni administrar servidores. Con Funciones de paso de AWS, puede iniciar ejecuciones paralelas de funciones de Lambda. Esta publicación muestra cómo puede usar estos servicios para ejecutar tareas paralelas, con el ejemplo de la extracción de series temporales de un gran volumen de datos meteorológicos satelitales almacenados en Servicio de almacenamiento simple de Amazon (Amazon S3). También utiliza AWS Glue para consolidar los archivos producidos por las tareas paralelas.

Tenga en cuenta que Lambda es un motor sin servidor de propósito general. No ha sido diseñado específicamente para tareas pesadas de transformación de datos. Lo estamos usando aquí después de haber confirmado lo siguiente:

  • La duración de la tarea es predecible y es inferior a 15 minutos, que es el tiempo de espera máximo para las funciones de Lambda
  • El caso de uso es simple, con requisitos informáticos bajos y sin dependencias externas que puedan ralentizar el proceso.

Trabajamos sobre un conjunto de datos proporcionado por EUMESAT: el flujo de onda corta de superficie descendente total y difusa de MSG (MDSSFTD). Este conjunto de datos contiene datos satelitales a intervalos de 15 minutos, en netcdf formato, lo que representa aproximadamente 100 GB durante 1 año.

Procesamos el año 2018 para extraer series temporales sobre 100 puntos geográficos.

Resumen de la solución

Para lograr nuestro objetivo, utilizamos funciones Lambda paralelas. Cada función de Lambda procesa 1 día de datos: 96 archivos que representan un volumen de aproximadamente 240 MB. Luego tenemos archivos 365 que contienen los datos extraídos para cada día, y usamos AWS Glue para concatenarlos durante todo el año y dividirlos en los 100 puntos geográficos. Este flujo de trabajo se muestra en el siguiente diagrama de arquitectura.

Despliegue de esta solución: En esta publicación, proporcionamos instrucciones paso a paso para implementar cada parte de la arquitectura manualmente. Si prefiere un despliegue automático, hemos preparado para usted un Repositorio de Github que contiene la infraestructura requerida como plantilla de código.

El conjunto de datos se divide por día, con YYYY/MM/DD/ prefijos Cada partición contiene 96 archivos que serán procesados ​​por una función Lambda.

Usamos Step Functions para lanzar el procesamiento paralelo de los 365 días del año 2018. Step Functions ayuda a los desarrolladores a usar los servicios de AWS para crear aplicaciones distribuidas, automatizar procesos, orquestar microservicios y crear canalizaciones de datos y aprendizaje automático (ML).

Pero antes de comenzar, debemos descargar el conjunto de datos y cargarlo en un depósito S3.

Requisitos previos

Cree un depósito S3 para almacenar el conjunto de datos de entrada, las salidas intermedias y las salidas finales de la extracción de datos.

Descargue el conjunto de datos y cárguelo en Amazon S3

Un registro gratuito en el sitio web del proveedor de datos es necesario para descargar el conjunto de datos. Para descargar el conjunto de datos, puede usar el siguiente comando desde una terminal de Linux. Proporcione las credenciales que obtuvo en el registro. Su terminal Linux podría estar en su máquina local, pero también puede usar un Nube de AWS9 instancia. Asegúrese de tener al menos 100 GB de almacenamiento gratuito para manejar todo el conjunto de datos.

wget -c --no-check-certificate -r -np -nH --user=[YOUR_USERNAME] --password=[YOUR_PASSWORD] -R "*.html, *.tmp" https://datalsasaf.lsasvcs.ipma.pt/PRODUCTS/MSG/MDSSFTD/NETCDF/2018/

Debido a que el conjunto de datos es bastante grande, esta descarga podría demorar mucho tiempo. Mientras tanto, puede preparar los próximos pasos.

Cuando se completa la descarga, puede cargar el conjunto de datos en un depósito S3 con el siguiente comando:

aws s3 cp ./PRODUCTS/ s3://[YOUR_BUCKET_NAME]/ --recursive

Si usa credenciales temporales, es posible que caduquen antes de que se complete la copia. En este caso, puede reanudar utilizando el comando de sincronización aws s3.

Ahora que los datos están en Amazon S3, puede eliminar el directorio que se descargó de su máquina Linux.

Crear las funciones Lambda

Para obtener instrucciones paso a paso sobre cómo crear una función Lambda, consulte Empezando con Lambda.

La primera función de Lambda en el flujo de trabajo genera la lista de días que queremos procesar:

from datetime import datetime
from datetime import timedelta def lambda_handler(event, context): ''' Generate a list of dates (string format) ''' begin_date_str = "20180101" end_date_str = "20181231" # carry out conversion between string # to datetime object current_date = datetime.strptime(begin_date_str, "%Y%m%d") end_date = datetime.strptime(end_date_str, "%Y%m%d") result = [] while current_date <= end_date: current_date_str = current_date.strftime("%Y%m%d") result.append(current_date_str) # adding 1 day current_date += timedelta(days=1) return result

Luego usamos el Estado del mapa de funciones de paso para procesar cada día. El estado Map lanzará una función Lambda para cada elemento devuelto por la función anterior y pasará este elemento como entrada. Estas funciones de Lambda se ejecutarán simultáneamente para todos los elementos de la lista. El tiempo de procesamiento para todo el año será, por lo tanto, idéntico al tiempo necesario para procesar 1 solo día, lo que permitirá la escalabilidad para largas series temporales y grandes volúmenes de datos de entrada.

El siguiente es un ejemplo de código para la función Lambda que procesa cada día:

import boto3
import netCDF4 as nc
import numpy as np
import pandas as pd
from datetime import datetime
import time
import os
import random # Bucket containing input data
INPUT_BUCKET_NAME = "[INPUT_BUCKET_NAME]" # example: "my-bucket-name"
LOCATION = "[PREFIX_OF_INPUT_DATA_WITH_TRAILING_SLASH]" # example: "MSG/MDSSFTD/NETCDF/" # Local output files
TMP_FILE_NAME = "/tmp/tmp.nc"
LOCAL_OUTPUT_FILE = "/tmp/dataframe.parquet" # Bucket for output data
OUTPUT_BUCKET = "[OUTPUT_BUCKET_NAME]"
OUTPUT_PREFIX = "[PREFIX_OF_OUTPUT_DATA_WITH_TRAILING_SLASH]" # example: "output/intermediate/" # Create 100 random coordinates
random.seed(10)
coords = [(random.randint(1000,2500), random.randint(1000,2500)) for _ in range(100)] client = boto3.resource('s3')
bucket = client.Bucket(INPUT_BUCKET_NAME) def date_to_partition_name(date): ''' Transform a date like "20180302" to partition like "2018/03/02/" ''' d = datetime.strptime(date, "%Y%m%d") return d.strftime("%Y/%m/%d/") def lambda_handler(event, context): # Get date from input date = str(event) print("Processing date: ", date) # Initialize output dataframe COLUMNS_NAME = ['time', 'point_id', 'DSSF_TOT', 'FRACTION_DIFFUSE'] df = pd.DataFrame(columns = COLUMNS_NAME) prefix = LOCATION + date_to_partition_name(date) print("Loading files from prefix: ", prefix) # List input files (weather files) objects = bucket.objects.filter(Prefix=prefix) keys = [obj.key for obj in objects] # For each file for key in keys: # Download input file from S3 bucket.download_file(key, TMP_FILE_NAME) print("Processing: ", key) try: # Load the dataset with netcdf library dataset = nc.Dataset(TMP_FILE_NAME) # Get values from the dataset for our list of geographical coordinates lats, lons = zip(*coords) data_1 = dataset['DSSF_TOT'][0][lats, lons] data_2 = dataset['FRACTION_DIFFUSE'][0][lats, lons] # Prepare data to add it into the output dataframe nb_points = len(lats) data_time = dataset.__dict__['time_coverage_start'] time_list = [data_time for _ in range(nb_points)] point_id_list = [i for i in range(nb_points)] tuple_list = list(zip(time_list, point_id_list, data_1, data_2)) # Add data to the output dataframe new_data = pd.DataFrame(tuple_list, columns = COLUMNS_NAME) df = pd.concat ([df, new_data]) except OSError: print("Error processing file: ", key) # Replace masked by NaN (otherwise we cannot save to parquet) df = df.applymap(lambda x: np.NaN if type(x) == np.ma.core.MaskedConstant else x) # Save to parquet print("Writing result to tmp parquet file: ", LOCAL_OUTPUT_FILE) df.to_parquet(LOCAL_OUTPUT_FILE) # Copy result to S3 s3_output_name = OUTPUT_PREFIX + date + '.parquet' s3_client = boto3.client('s3') s3_client.upload_file(LOCAL_OUTPUT_FILE, OUTPUT_BUCKET, s3_output_name)

Debe asociar un rol a la función de Lambda para autorizarla a acceder a los depósitos de S3. Debido a que el tiempo de ejecución es de aproximadamente un minuto, también debe configurar el tiempo de espera de la función Lambda en consecuencia. Pongámoslo en 5 minutos. También aumentamos la memoria asignada a la función Lambda a 2048 MB, que la biblioteca netcdf4 necesita para extraer varios puntos a la vez de los datos del satélite.

Esta función Lambda depende de las bibliotecas pandas y netcdf4. Se pueden instalar como Lambda ponedoras. La biblioteca pandas se proporciona como una capa administrada por AWS. La biblioteca netcdf4 deberá empaquetarse en un capa personalizada.

Configurar el flujo de trabajo de Step Functions

Después de crear las dos funciones Lambda, puede diseñar el flujo de trabajo de Step Functions en el editor visual mediante los bloques Lambda Invoke y Map, como se muestra en el siguiente diagrama.

En el bloque de estado del mapa, elija Repartido modo de procesamiento y aumentar el límite de concurrencia a 365 en Configuración de tiempo de ejecución. Esto permitirá el procesamiento paralelo de todos los días.

La cantidad de funciones de Lambda que se pueden ejecutar simultáneamente está limitada para cada cuenta. Su cuenta puede tener una cuota insuficiente. Puede Solicitar un aumento de la cuota.

Lanzar la máquina de estado

Ahora puede iniciar la máquina de estado. En la consola de Step Functions, navegue a su máquina de estado y elija Iniciar ejecución para ejecutar su flujo de trabajo.

Esto activará una ventana emergente en la que puede ingresar una entrada opcional para su máquina de estado. Para esta publicación, puede dejar los valores predeterminados y elegir Iniciar ejecución.

La máquina de estado debería tardar entre 1 y 2 minutos en ejecutarse, tiempo durante el cual podrá supervisar el progreso de su flujo de trabajo. Puede seleccionar uno de los bloques en el diagrama e inspeccionar su entrada, salida y otra información en tiempo real, como se muestra en la siguiente captura de pantalla. Esto puede ser muy útil para fines de depuración.

Cuando todos los bloques se vuelven verdes, la máquina de estado está completa. En este paso, hemos extraído los datos de 100 puntos geográficos para un año completo de datos satelitales.

En el bucket de S3 configurado como salida para la función Lambda de procesamiento, podemos verificar que tenemos un archivo por día, que contiene los datos de los 100 puntos.

Transforme los datos por día en datos por punto geográfico con AWS Glue

Por ahora, tenemos un archivo por día. Sin embargo, nuestro objetivo es obtener series temporales para cada punto geográfico. Esta transformación implica cambiar la forma en que se dividen los datos. De una partición de día, tenemos que pasar a una partición de punto geográfico.

Afortunadamente, esta operación se puede realizar de forma muy sencilla con AWS Glue.

  1. En la consola de AWS Glue Studio, cree un nuevo trabajo y elija Visual con un lienzo en blanco.

Para este ejemplo, creamos un trabajo simple con un bloque de origen y destino.

  1. Agregue un bloque de origen de datos.
  2. En Propiedades de la fuente de datos seleccione Ubicación S3 para Tipo de fuente S3.
  3. URL de S3, ingrese la ubicación donde creó sus archivos en el paso anterior.
  4. Formato de datos, mantenga el valor predeterminado como parquet.
  5. Elige Inferir esquema y ver la esquema de salida pestaña para confirmar que el esquema se ha detectado correctamente.

  1. Agregue un bloque de destino de datos.
  2. En Propiedades de destino de datos pestaña, para Formato, escoger parquet.
  3. Tipo de compresión, escoger Rápido.
  4. Ubicación de destino S3, ingrese la ubicación de destino de S3 para sus archivos de salida.

¡Ahora tenemos que configurar la magia!

  1. Agregue una clave de partición y elija point_id.

Esto le dice a AWS Glue cómo desea que se particionen sus datos de salida. AWS Glue dividirá automáticamente los datos de salida según el point_id columna, y por lo tanto obtendremos una carpeta para cada punto geográfico, que contiene toda la serie temporal para este punto según lo solicitado.

Para finalizar la configuración, necesitamos asignar un Gestión de identidades y accesos de AWS (IAM) al trabajo de AWS Glue.

  1. Elige Trabajos detalles y para Rol de IAM¸ elija un rol que tenga permisos para leer desde el depósito de S3 de entrada y para escribir en el depósito de S3 de salida.

Es posible que deba crear el rol en la consola de IAM si aún no tiene uno adecuado.

  1. Ingrese un nombre para nuestro trabajo de AWS Glue, guárdelo y ejecútelo.

Podemos monitorear la ejecución eligiendo Ejecutar detalles. Debe tomar de 1 a 2 minutos para completarse.

Resultados finales

Una vez que el trabajo de AWS Glue se realiza correctamente, podemos verificar en el depósito de S3 de salida que tenemos una carpeta para cada punto geográfico, que contiene algunos archivos de Parquet con todo el año de datos, como se esperaba.

Para cargar la serie temporal de un punto específico en un marco de datos de pandas, puede utilizar el biblioteca awswrangler de su código Python:

import awswrangler as wr
import pandas as pd # Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://[BUCKET]/[PREFIX]/", dataset=True)

Si desea probar este código ahora, puede crear un instancia de cuaderno in Amazon SageMakery, a continuación, abra un cuaderno de Jupyter Notebook. La siguiente captura de pantalla ilustra la ejecución del código anterior en un cuaderno Jupyter.

Como podemos ver, ¡hemos extraído con éxito la serie temporal para puntos geográficos específicos!

Limpiar

Para evitar incurrir en cargos futuros, elimine los recursos que ha creado:

  • El cubo S3
  • El trabajo de AWS Glue
  • La máquina de estados de Step Functions
  • Las dos funciones Lambda
  • La instancia del cuaderno de SageMaker

Conclusión

En esta publicación, mostramos cómo usar Lambda, Step Functions y AWS Glue para ETL (extracción, transformación y carga) sin servidor en un gran volumen de datos meteorológicos. La arquitectura propuesta permite extraer y volver a particionar los datos en solo unos minutos. Es escalable y rentable, y se puede adaptar a otros casos de uso de procesamiento de datos y ETL.

¿Interesado en aprender más sobre los servicios presentados en esta publicación? Puede encontrar laboratorios prácticos para mejorar su conocimiento con Talleres de AWS. Además, consulte la documentación oficial de Pegamento AWS, lambday Funciones de paso. También puede descubrir más patrones arquitectónicos y mejores prácticas en Documentos técnicos y guías de AWS.


Sobre la autora

Lior Pérez es Arquitecto Principal de Soluciones en el equipo de Enterprise con sede en Toulouse, Francia. Le gusta ayudar a los clientes en su viaje de transformación digital, utilizando big data y aprendizaje automático para ayudarlos a resolver sus desafíos comerciales. También le apasiona personalmente la robótica y el IoT, y busca constantemente nuevas formas de aprovechar las tecnologías para la innovación.

punto_img

Información más reciente

punto_img