Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. También utilizamos Spark para procesar trabajos intensivos, como la extensión de segmentos entre dispositivos o la transformación de parquet a SSTables para cargar datos en Cassandra.

Al trabajar con Spark, alcanzamos regularmente los límites de los recursos de nuestros clústeres en términos de memoria, disco o CPU. Una ampliación de escala solo hace retroceder el problema, por lo que tenemos que ensuciarnos las manos.

Aquí hay una colección de mejores prácticas y consejos de optimización para Spark 2.2.0 para lograr un mejor rendimiento y un código Spark más limpio, que cubre:

  • Cómo aprovechar Tungsten,
  • Análisis del plan de ejecución,
  • Gestión de datos (almacenamiento en caché, difusión),
  • Optimizaciones relacionadas con la nube (incluida S3).

Actualización 07/12/2018, consulte también la segunda parte que trata sobre trucos de solución de problemas y gestión de fuentes de datos externas.

Es de sentido común, pero la mejor manera de mejorar el rendimiento del código es adoptar las fortalezas de Spark. Uno de ellos es Tungsteno.

Estándar desde la versión 1.5, Tungsten es un componente SQL de Spark que proporciona un mayor rendimiento al reescribir las operaciones de Spark en código de bytes, en tiempo de ejecución. Tungsten suprime las funciones virtuales y aprovecha el rendimiento casi de metal al centrarse en la eficiencia de la CPU y la memoria de los trabajos.

Para aprovechar al máximo Tungsten, prestamos atención a lo siguiente:

Utilizar estructuras de conjuntos de datos en lugar de marcos de datos

Para asegurarnos de que nuestro código se beneficie lo máximo posible de las optimizaciones de Tungsten, utilizamos la API de conjuntos de datos predeterminada con Scala (en lugar de RDD).

Dataset ofrece lo mejor de ambos mundos con una mezcla de transformaciones relacionales (DataFrame) y funcionales (RDD). Esta API es la más actualizada y agrega seguridad de tipo junto con un mejor manejo de errores y pruebas unitarias mucho más legibles.

Sin embargo, viene con una compensación, ya que las funciones de mapa y filtro funcionan peor con esta API. Sin marco es una solución prometedora para hacer frente a esta limitación.

Evite las Funciones definidas por el usuario (UDF) en la medida de lo posible

El uso de un UDF implica deserialización para procesar los datos en Scala clásica y luego reserializarlos. Los UDF se pueden reemplazar por funciones SQL de Spark, ya hay muchas y se agregan nuevas regularmente.

Evitar los UDF podría no generar mejoras instantáneas, pero al menos evitará problemas de rendimiento futuros, en caso de que el código cambie. Además, mediante el uso de funciones SQL de Spark integradas, reducimos nuestro esfuerzo de prueba, ya que todo se realiza del lado de Spark. Estas funciones están diseñadas por expertos en JVM, por lo que no es probable que los UDF logren un mejor rendimiento.

Por ejemplo, el siguiente código se puede reemplazar por la función coalesce integrada:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

Cuando no hay reemplazo incorporado, todavía es posible implementar y extender la clase de expresión de Catalyst (optimizador SQL de Spark). Jugará bien con la generación de código. Para más detalles, Chris Fregly habló de ello aquí (ver diapositiva 56). Al hacer esto, accedemos directamente al formato de Tungsteno, que resuelve el problema de serialización y reduce el rendimiento.

Evite las Funciones Agregadas definidas por el usuario (UDAF)

Un UDAF genera operaciones de clasificación que son significativamente más lentas que HashAggregate. Por ejemplo, lo que hacemos en lugar de escribir un UDAF que calcula una mediana es usar un equivalente incorporado (cuantil 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

La función approxQuantile utiliza una variación del algoritmo Greenwald-Khanna. En nuestro caso, terminó siendo 10 veces más rápido que el UDAF equivalente.

Evite los UDF o UDAF que realizan más de una cosa

Los principios de artesanía del software obviamente se aplican al escribir cosas de big data (haga una cosa y hágalo bien). Al dividir UDF, podemos usar funciones integradas para una parte del código resultante. También simplifica enormemente las pruebas.

2-Mira debajo del capó

Analizar el plan de ejecución de Spark es una forma fácil de detectar posibles mejoras. Este plan se compone de etapas, que son las unidades físicas de ejecución en Chispa. Cuando refactorizamos nuestro código, lo primero que buscamos es un número anormal de etapas. Un plan sospechoso puede requerir 10 etapas en lugar de 2-3 para una operación de unión básica entre dos marcos de datos.

En Spark y más generalmente en computación distribuida, enviar datos a través de la red (también conocido como Shuffle en Spark) es la acción más cara. Las mezclas son costosas, ya que implican E/S de disco, serialización de datos y E/S de red.Son necesarias para operaciones como Unirse o agruparse y ocurren entre etapas.

Teniendo en cuenta esto, reducir el número de etapas es una forma obvia de optimizar un trabajo. Usamos el .comando explicar (verdadero) para mostrar el plan de ejecución que detalla todos los pasos (etapas) involucrados en un trabajo. Aquí hay un ejemplo:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

muy simple DAG ejemplo — créditos de la Imagen

La optimización se basa mucho en nuestro conocimiento de los datos y su procesamiento (incl. lógica de negocios). Uno de los límites de la optimización SQL de Spark con Catalyst es que utiliza reglas «mecánicas» para optimizar el plan de ejecución (en 2.2.0).

Al igual que muchos otros, estábamos esperando un motor de optimización basado en costos más allá de la selección de unirse a broadcast. Ahora parece disponible en 2.3.0, tendremos que verlo.

3-Conozca sus datos y adminístrelos de manera eficiente

Hemos visto cómo mejorar el rendimiento del trabajo mirando el plan de ejecución, pero también hay muchas mejoras posibles en el lado de los datos.

Conjuntos de datos altamente desequilibrados

Para comprobar rápidamente si todo está bien, revisamos la duración de ejecución de cada tarea y buscamos el tiempo de proceso heterogéneo. Si una de las tareas es significativamente más lenta que las demás, extenderá la duración total del trabajo y desperdiciará los recursos de los ejecutores más rápidos.

Es bastante fácil comprobar la duración mínima, máxima y mediana en la interfaz de usuario de Spark. Este es un ejemplo equilibrado:

Deja una respuesta

Tu dirección de correo electrónico no será publicada.