Logotipo de Zephyrnet

Cómo Goldman Sachs creó el etiquetado de personas con Apache Flink en Amazon EMR

Fecha:

La Investigación de inversión global (GIR) de Goldman Sachs es responsable de proporcionar análisis e información a los clientes de la empresa en los mercados de acciones, renta fija, divisas y materias primas. Uno de los objetivos a largo plazo del equipo de GIR es brindar una experiencia personalizada y contenido de investigación relevante a sus usuarios de investigación. Anteriormente, con el fin de personalizar la experiencia del usuario para sus diversos tipos de clientes, GIR ofreció algunas ediciones distintas de su sitio de investigación que se proporcionaron a los usuarios en función de criterios amplios. Sin embargo, GIR no tenía ninguna forma de crear un flujo de contenido seleccionado personalmente a nivel de usuario individual. Para proporcionar esta funcionalidad, GIR quería implementar un sistema para filtrar activamente el contenido que se recomienda a sus usuarios por usuario, según características como el cargo del usuario o la región de trabajo. Tener este tipo de sistema mejoraría la experiencia del usuario y simplificaría los flujos de trabajo de los usuarios de investigación de GIR, al reducir la cantidad de tiempo y esfuerzo necesarios para encontrar el contenido de investigación que necesitan.

El primer paso para lograr esto es clasificar directamente a los usuarios de investigación de GIR en función de sus perfiles y lectores. Con ese fin, GIR creó un sistema para etiquetar a los usuarios con personas. Cada persona representa un tipo o clasificación con la que se puede etiquetar a los usuarios individuales, según ciertos criterios. Por ejemplo, GIR tiene una serie de personas para clasificar el cargo de un usuario, y un usuario etiquetado con la persona "Director de inversiones" tendrá un contenido de investigación diferente resaltado y tendrá una experiencia de sitio diferente en comparación con uno que esté etiquetado con la persona "Director de inversiones". Tesorero”. Este sistema de etiquetado de personas puede llevar a cabo de manera eficiente las operaciones de datos requeridas para etiquetar a los usuarios, así como crear nuevas personas según sea necesario para adaptarse a los casos de uso a medida que surgen.

En esta publicación, observamos cómo GIR implementó este sistema usando EMR de Amazon.

Desafío

Dada la cantidad de contactos (es decir, millones) y la creciente cantidad de publicaciones mantenidas en el almacén de datos de investigación de GIR, crear un sistema para clasificar usuarios y recomendar contenido es un desafío de escalabilidad. Una persona recién creada podría aplicarse potencialmente a casi todos los contactos, en cuyo caso sería necesario realizar una operación de etiquetado en varios millones de entradas de datos. En general, la cantidad de contactos, la complejidad de los datos almacenados por contacto y la cantidad de criterios para la personalización solo pueden aumentar. Para preparar su flujo de trabajo para el futuro, GIR necesitaba asegurarse de que su solución pudiera manejar el procesamiento de grandes cantidades de datos como un caso esperado y frecuente.

El objetivo comercial de GIR es admitir dos tipos de flujos de trabajo para los criterios de clasificación: ad hoc y en curso. Un criterio ad hoc hace que los usuarios que actualmente se ajustan a la condición del criterio de definición sean inmediatamente etiquetados con la persona requerida, y está destinado a facilitar el etiquetado único de contactos específicos. Por otro lado, un criterio continuo es un proceso continuo que etiqueta automáticamente a los usuarios con una persona si un cambio en sus atributos hace que se ajusten a la condición del criterio. El siguiente diagrama ilustra el flujo de personalización deseado:

En el resto de esta publicación, nos enfocamos en el diseño e implementación del flujo de trabajo ad hoc de GIR.

Apache Flink en Amazon EMR

Para satisfacer las demandas de escalabilidad de GIR, determinaron que EMR de Amazon era el que mejor se adaptaba a su caso de uso, ya que se trataba de una plataforma de big data gestionada destinada a procesar grandes cantidades de datos utilizando tecnologías de código abierto como Apache Flink. Aunque GIR evaluó algunas otras opciones que abordaron sus preocupaciones de escalabilidad (como AWS Glue), GIR eligió Amazon EMR por su facilidad de integración en sus sistemas existentes y la posibilidad de adaptarse a flujos de trabajo por lotes y de transmisión.

Apache Flink es un motor de procesamiento por lotes y flujo distribuido de big data de código abierto que procesa eficientemente datos de eventos continuos. Flink ofrece garantías de una sola vez, alto rendimiento y baja latencia, y es adecuado para manejar flujos de datos masivos. Además, Flink proporciona muchas API fáciles de usar y mitiga la necesidad de que el programador se preocupe por las fallas. Sin embargo, construir y mantener una canalización basada en Flink conlleva una sobrecarga operativa y requiere una experiencia considerable, además de aprovisionar recursos físicos.

Amazon EMR permite a los usuarios crear, operar y escalar entornos de big data como Apache Flink de manera rápida y rentable. Podemos optimizar costos usando Escalamiento administrado de Amazon EMR para aumentar o disminuir automáticamente los nodos del clúster en función de la carga de trabajo. En el caso de uso de GIR, sus usuarios deben poder activar operaciones de etiquetado personal en cualquier momento y requieren un tiempo de finalización predecible para sus trabajos. Para ello, GIR decidió lanzar un clúster de ejecución prolongada, que permite enviar varios trabajos de Flink simultáneamente al mismo clúster.

Flujo de trabajo e infraestructura de etiquetado personal ad hoc

El siguiente diagrama ilustra la arquitectura del flujo de trabajo de etiquetado personal ad hoc de GIR en la nube de AWS.

Esta es una descripción general amplia, y los detalles específicos de las redes y la seguridad entre los componentes están fuera del alcance de esta publicación.

A un alto nivel, podemos discutir el flujo de trabajo de GIR en cuatro partes:

  1. Cargue los artefactos del trabajo de Flink en el clúster de EMR.
  2. Activar el trabajo Flink.
  3. Dentro del trabajo de Flink, transforme y luego almacene los datos del usuario.
  4. Monitoreo continuo.

Puede interactuar con Flink en Amazon EMR a través de la consola de Amazon EMR o la interfaz de línea de comandos de AWS (AWS CLI). Después de lanzar el clúster, GIR utilizó la API de Flink para interactuar y enviar trabajos a la aplicación Flink. La API de Flink proporcionó un poco más de funcionalidad y fue mucho más fácil de invocar desde una aplicación de AWS Lambda.

El objetivo final de la configuración es tener una canalización en la que los usuarios internos de GIR puedan realizar libremente solicitudes para actualizar los datos de contacto (que en este caso de uso es etiquetar o desetiquetar contactos con varias personas), y luego hacer que los datos de contacto actualizados se vuelvan a cargar en el Tienda de contactos GIR.

Cargue los artefactos de trabajo de Flink en Amazon EMR

GIR tiene un proyecto de GitLab local para administrar los contenidos de su trabajo de Flink. Para desencadenar la primera parte de su flujo de trabajo e implementar una nueva versión del trabajo de Flink en el clúster, se ejecuta una canalización de GitLab que primero crea un archivo .zip que contiene el archivo JAR, las propiedades y los archivos de configuración del trabajo de Flink.

El diagrama anterior muestra la secuencia de eventos que ocurre en la carga del trabajo:

  1. La canalización de GitLab se activa manualmente cuando se debe cargar un nuevo trabajo de Flink. Esto transfiere el archivo .zip que contiene el trabajo de Flink a un depósito de Amazon Simple Storage Service (Amazon S3) en la cuenta de GIR AWS, etiquetado como "artefactos de implementación de S3".
  2. Se activa una función de Lambda ("Cargar Lambda") en respuesta al evento de creación de Amazon S3.
  3. La función carga primero el JAR del trabajo de Flink en el clúster de Flink de Amazon EMR y recupera el ID de la aplicación para la sesión de Flink.
  4. Finalmente, la función carga el archivo de propiedades de la aplicación en un depósito S3 específico ("Propiedades del trabajo S3 Flink").

Activar el trabajo Flink

La segunda parte del flujo de trabajo maneja el envío del trabajo de Flink real al clúster cuando se generan solicitudes de trabajo. GIR tiene una aplicación web orientada al usuario llamada Personalization Workbench que proporciona la interfaz de usuario para llevar a cabo operaciones de etiquetado de personas. Los administradores y los usuarios internos de Goldman Sachs pueden crear solicitudes para etiquetar o desetiquetar contactos con personas a través de esta aplicación web. Cuando se envía una solicitud, se genera un archivo de datos que contiene los detalles de la solicitud.

Los pasos de este flujo de trabajo son los siguientes:

  1. La estación de trabajo de personalización envía los detalles de la solicitud de trabajo al depósito Flink Data S3, etiquetado como "S3 Flink data".
  2. Se activa una función Lambda ("Ejecutar Lambda") en respuesta al evento de creación de Amazon S3.
  3. La función primero lee el archivo de propiedades del trabajo cargado en el paso anterior para obtener la identificación del trabajo de Flink.
  4. Finalmente, la función realiza una llamada a la API para ejecutar el trabajo de Flink requerido.

Procesar datos

Los datos de contacto se procesan de acuerdo con las solicitudes de etiquetado de personas, y luego los datos transformados se vuelven a cargar en el almacén de contactos de GIR.

Los pasos de este flujo de trabajo son los siguientes:

  1. El trabajo de Flink primero lee el archivo de propiedades de la aplicación que se cargó como parte del primer paso.
  2. A continuación, lee el archivo de datos del segundo flujo de trabajo que contiene los datos personales y de contacto que se actualizarán. A continuación, el trabajo lleva a cabo el procesamiento de la operación de etiquetado o desetiquetado.
  3. Los resultados se vuelven a cargar en la tienda de contactos de GIR.
  4. Finalmente, tanto las solicitudes exitosas como las fallidas se vuelven a escribir en Amazon S3.

Monitoreo continuo

La parte final del flujo de trabajo general involucra el monitoreo continuo del clúster de EMR para garantizar que el flujo de trabajo de etiquetado de GIR sea estable y que el clúster esté en buen estado. Para garantizar que se mantenga el nivel más alto de seguridad con los datos de sus clientes, GIR quería evitar el acceso SSH sin restricciones a sus recursos de AWS. La restricción de acceder al nodo principal del clúster de EMR directamente a través de SSH significaba que GIR inicialmente no tenía visibilidad de los registros del nodo principal de EMR o de la interfaz web de Flink.

De forma predeterminada, Amazon EMR archiva los archivos de registro almacenados en el nodo principal en Amazon S3 en intervalos de 5 minutos. Debido a que esta canalización sirve como una plataforma central para procesar muchas solicitudes de etiquetado personal ad hoc a la vez, era crucial para GIR crear un sistema de monitoreo continuo adecuado que les permitiera diagnosticar rápidamente cualquier problema con el clúster.

Para lograr esto, GIR implementó dos soluciones de monitoreo:

  • GIR instaló un agente de Amazon CloudWatch en cada nodo de su clúster EMR a través de acciones de arranque. El agente de CloudWatch recopila y publica métricas de Flink en CloudWatch bajo un espacio de nombres de métrica personalizado, donde se pueden ver en la consola de CloudWatch. GIR configuró el archivo de configuración del agente de CloudWatch para capturar métricas relevantes, como la utilización de la CPU y el total de instancias de EMR en ejecución. El resultado es un clúster de EMR en el que las métricas se emiten a CloudWatch a un ritmo mucho más rápido que si se esperan vaciados periódicos de registros de S3.
  • También habilitaron la interfaz de usuario de Flink en modo de solo lectura enfrentando el nodo principal del clúster con un balanceador de carga de red y estableciendo conectividad desde la red local de Goldman Sachs. Este cambio permitió a GIR obtener visibilidad directa del estado de su clúster EMR en ejecución y trabajos en curso.

Observaciones, desafíos enfrentados y lecciones aprendidas

El esfuerzo de personalización marcó la adopción por primera vez de Amazon EMR dentro de GIR. Hasta la fecha, se han creado cientos de criterios de personalización en el entorno de producción de GIR. En términos de visitas web y tasa de clics, la participación del sitio con el contenido personalizado de GIR ha aumentado gradualmente desde la implementación del sistema de etiquetado de personas.

GIR enfrentó algunos desafíos notables durante el desarrollo, de la siguiente manera:

Reglas de grupo de seguridad restrictivas

De forma predeterminada, Amazon EMR crea sus grupos de seguridad con reglas que son menos restrictivas, porque Amazon EMR no puede anticipar la configuración personalizada específica para las reglas de entrada y salida requeridas por casos de uso individuales. Sin embargo, la administración adecuada de las reglas del grupo de seguridad es fundamental para proteger la canalización y los datos en el clúster. GIR usó grupos de seguridad administrados de forma personalizada para sus nodos de clúster EMR e incluyó solo las reglas de grupo de seguridad necesarias para la conectividad, a fin de cumplir con esta postura de seguridad más estricta.

AMI personalizada

Hubo desafíos para garantizar que los paquetes requeridos estuvieran disponibles cuando se usaban AMI de Amazon Linux personalizadas para Amazon EMR. Como parte de los controles SDLC de desarrollo de Goldman Sachs, todas las instancias de Amazon Elastic Compute Cloud (Amazon EC2) en cuentas de AWS propiedad de Goldman Sachs deben utilizar AMI internas creadas por Goldman Sachs. Cuando GIR comenzó el desarrollo, la única AMI compatible que estaba disponible bajo este control era una AMI mínima basada en la AMI mínima de Amazon Linux 2 disponible públicamente (amzn2-ami-minimal*-x86_64-ebs). Sin embargo, Amazon EMR recomienda utilizar la AMI de Linux de Amazon 2 predeterminada completa porque tiene todos los paquetes necesarios preinstalados. Esto resultó en varios errores de inicio sin una indicación clara de las bibliotecas faltantes.

GIR trabajó con el soporte de AWS para identificar y resolver el problema comparando las AMI mínimas y completas e instalando los 177 paquetes faltantes individualmente (consulte el apéndice para ver la lista completa de paquetes). Además, el proceso interno de creación de AMI de Goldman Sachs había configurado varios archivos relacionados con AMI con permisos de solo lectura. La restauración de estos permisos a acceso completo de lectura/escritura permitió a GIR iniciar con éxito su clúster.

Empleos de Flink estancados

Durante el lanzamiento de producción inicial de GIR, GIR experimentó un problema en el que su clúster EMR falló silenciosamente y provocó que sus funciones Lambda se agotaran. En una depuración adicional, GIR descubrió que este problema estaba relacionado con un Akka quarantine-after-silence ajuste de tiempo de espera. De forma predeterminada, se configuró en 48 horas, lo que provocó que los clústeres rechazaran más trabajos después de ese tiempo. GIR encontró una solución al establecer el valor de akka.jvm-exit-on-fatal-error a false en el archivo de configuración de Flink.

Conclusión

En esta publicación, discutimos cómo el equipo de GIR en Goldman Sachs configuró un sistema usando Apache Flink en Amazon EMR para llevar a cabo el etiquetado de usuarios con varias personas, a fin de seleccionar mejor las ofertas de contenido para esos usuarios. También cubrimos algunos de los desafíos que enfrentó GIR con la configuración de su clúster EMR. Esto representa un primer paso importante para proporcionar a los usuarios de GIR una curaduría completa de contenido personalizado en función de sus perfiles y lectores individuales.

AGRADECIMIENTOS

Los autores desean agradecer a los siguientes miembros de los equipos de AWS y GIR por su estrecha colaboración y orientación en esta publicación:

  • Elizabeth Byrnes, Directora General, GIR
  • Moon Wang, Director General, GIR
  • Ankur Gurha, Vicepresidente, GIR
  • Jeremiah O'Connor, arquitecto de soluciones, AWS
  • Ley Nezifort, Asociada, GIR
  • Shruthi Venkatraman, analista, GIR

Acerca de los autores

Sakthivel balasubramaniano es vicepresidente de Goldman Sachs en Nueva York. Tiene más de 16 años de experiencia en liderazgo tecnológico y trabajó en muchos proyectos de autorización, autenticación y personalización en toda la empresa. Bala impulsa la estrategia de ingeniería de datos y acceso de clientes de la división Global Investment Research, incluida la arquitectura, el diseño y las prácticas para permitir que las líneas de negocios tomen decisiones informadas e impulsen el valor. Es un innovador y un experto en el desarrollo y la entrega de software distribuido a gran escala que resuelve problemas del mundo real, con un éxito demostrado al visualizar e implementar una amplia gama de plataformas, productos y arquitectura altamente escalables.

Víctor Gan es Analista en Goldman Sachs en Nueva York. Victor se unió a la división Global Investment Research en 2020 después de graduarse de la Universidad de Cornell y ha sido responsable de desarrollar y aprovisionar la infraestructura en la nube para los sistemas de derechos de usuario de GIR. Se centra en el aprendizaje de nuevas tecnologías y la optimización de las implementaciones de sistemas en la nube.

manjula nagineni es Arquitecto de soluciones con AWS con sede en Nueva York. Trabaja con las principales instituciones de servicios financieros, diseñando y modernizando sus aplicaciones a gran escala mientras adopta los servicios en la nube de AWS. Le apasiona diseñar cargas de trabajo de big data de forma nativa en la nube. Tiene más de 20 años de experiencia en TI en desarrollo de software, análisis y arquitectura en múltiples dominios, como finanzas, fabricación y telecomunicaciones.

 
 


Apéndice

GIR ejecutó el siguiente comando para instalar los paquetes AMI faltantes:

yum install -y libevent.x86_64 python2-botocore.noarch device-mapper-event-libs.x86_64 bind-license.noarch libwebp.x86_64 sgpio.x86_64 rsync.x86_64 perl-podlators.noarch libbasicobjects.x86_64 langtable.noarch sssd-client.x86_64 perl-Time-Local.noarch dosfstools.x86_64 attr.x86_64 perl-macros.x86_64 hwdata.x86_64 gpm-libs.x86_64 libtirpc.x86_64 device-mapper-persistent-data.x86_64 libconfig.x86_64 setserial.x86_64 rdate.x86_64 bc.x86_64 amazon-ssm-agent.x86_64 virt-what.x86_64 zip.x86_64 lvm2-libs.x86_64 python2-futures.noarch perl-threads.x86_64 dmraid-events.x86_64 bridge-utils.x86_64 mdadm.x86_64 ec2-net-utils.noarch kbd.x86_64 libtiff.x86_64 perl-File-Path.noarch quota-nls.noarch libstoragemgmt-python.noarch man-pages-overrides.x86_64 python2-rsa.noarch perl-Pod-Usage.noarch psacct.x86_64 libnl3-cli.x86_64 libstoragemgmt-python-clibs.x86_64 tcp_wrappers.x86_64 yum-utils.noarch libaio.x86_64 mtr.x86_64 teamd.x86_64 hibagent.noarch perl-PathTools.x86_64 libxml2-python.x86_64 dmraid.x86_64 pm-utils.x86_64 amazon-linux-extras-yum-plugin.noarch strace.x86_64 bzip2.x86_64 perl-libs.x86_64 kbd-legacy.noarch perl-Storable.x86_64 perl-parent.noarch bind-utils.x86_64 libverto-libevent.x86_64 ntsysv.x86_64 yum-langpacks.noarch libjpeg-turbo.x86_64 plymouth-core-libs.x86_64 perl-threads-shared.x86_64 kernel-tools.x86_64 bind-libs-lite.x86_64 screen.x86_64 perl-Text-ParseWords.noarch perl-Encode.x86_64 libcollection.x86_64 xfsdump.x86_64 perl-Getopt-Long.noarch man-pages.noarch pciutils.x86_64 python2-s3transfer.noarch plymouth-scripts.x86_64 device-mapper-event.x86_64 json-c.x86_64 pciutils-libs.x86_64 perl-Exporter.noarch libdwarf.x86_64 libpath_utils.x86_64 perl.x86_64 libpciaccess.x86_64 hunspell-en-US.noarch nfs-utils.x86_64 tcsh.x86_64 libdrm.x86_64 awscli.noarch cryptsetup.x86_64 python-colorama.noarch ec2-hibinit-agent.noarch usermode.x86_64 rpcbind.x86_64 perl-File-Temp.noarch libnl3.x86_64 generic-logos.noarch python-kitchen.noarch words.noarch kbd-misc.noarch python-docutils.noarch hunspell-en.noarch dyninst.x86_64 perl-Filter.x86_64 libnfsidmap.x86_64 kpatch-runtime.noarch python-simplejson.x86_64 time.x86_64 perl-Pod-Escapes.noarch perl-Pod-Perldoc.noarch langtable-data.noarch vim-enhanced.x86_64 bind-libs.x86_64 boost-system.x86_64 jbigkit-libs.x86_64 binutils.x86_64 wget.x86_64 libdaemon.x86_64 ed.x86_64 at.x86_64 libref_array.x86_64 libstoragemgmt.x86_64 libteam.x86_64 hunspell.x86_64 python-daemon.noarch dmidecode.x86_64 perl-Time-HiRes.x86_64 blktrace.x86_64 bash-completion.noarch lvm2.x86_64 mlocate.x86_64 aws-cfn-bootstrap.noarch plymouth.x86_64 parted.x86_64 tcpdump.x86_64 sysstat.x86_64 vim-filesystem.noarch lm_sensors-libs.x86_64 hunspell-en-GB.noarch cyrus-sasl-plain.x86_64 perl-constant.noarch libini_config.x86_64 python-lockfile.noarch perl-Socket.x86_64 nano.x86_64 setuptool.x86_64 traceroute.x86_64 unzip.x86_64 perl-Pod-Simple.noarch langtable-python.noarch jansson.x86_64 pystache.noarch keyutils.x86_64 acpid.x86_64 perl-Carp.noarch GeoIP.x86_64 python2-dateutil.noarch systemtap-runtime.x86_64 scl-utils.x86_64 python2-jmespath.noarch quota.x86_64 perl-HTTP-Tiny.noarch ec2-instance-connect.noarch vim-common.x86_64 libsss_idmap.x86_64 libsss_nss_idmap.x86_64 perl-Scalar-List-Utils.x86_64 gssproxy.x86_64 lsof.x86_64 ethtool.x86_64 boost-date-time.x86_64 python-pillow.x86_64 boost-thread.x86_64 yajl.x86_64

Fuente: https://aws.amazon.com/blogs/big-data/how-goldman-sachs-built-persona-tagging-using-apache-flink-on-amazon-emr/

punto_img

Información más reciente

punto_img