Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. Nous utilisons également Spark pour traiter des tâches intensives telles que l’extension de segment multi-appareils ou la transformation de Parquet en SSTables pour charger des données dans Cassandra.

En travaillant avec Spark, nous atteignons régulièrement les limites des ressources de nos clusters en termes de mémoire, de disque ou de CPU. Un scale-out ne fait que repousser le problème, nous devons donc nous salir les mains.

Voici une collection de bonnes pratiques et de conseils d’optimisation pour Spark 2.2.0 afin d’obtenir de meilleures performances et un code Spark plus propre, couvrant:

  • Comment tirer parti de Tungsten,
  • Analyse du plan d’exécution,
  • Gestion des données (mise en cache, diffusion),
  • Optimisations liées au Cloud (y compris S3).

Mise à jour 07/12/2018, voir aussi la deuxième partie couvrant les astuces de dépannage et la gestion des sources de données externes.

C’est du bon sens, mais la meilleure façon d’améliorer les performances du code est d’embrasser les forces de Spark. L’un d’eux est le tungstène.

Standard depuis la version 1.5, Tungsten est un composant Spark SQL qui offre des performances accrues en réécrivant les opérations Spark en bytecode, au moment de l’exécution. Tungsten supprime les fonctions virtuelles et tire parti des performances proches du métal nu en se concentrant sur l’efficacité du processeur et de la mémoire.

Pour tirer le meilleur parti de Tungsten, nous prêtons attention aux points suivants:

Utilisez des structures d’ensembles de données plutôt que des cadres de données

Pour vous assurer que notre code bénéficiera autant que possible des optimisations de Tungsten, nous utilisons l’API d’ensembles de données par défaut avec Scala (au lieu de RDD).

Dataset apporte le meilleur des deux mondes avec un mélange de transformations relationnelles (DataFrame) et fonctionnelles (RDD). Cette API est la plus à jour et ajoute une sécurité de type avec une meilleure gestion des erreurs et des tests unitaires beaucoup plus lisibles.

Cependant, il est livré avec un compromis car les fonctions de carte et de filtre fonctionnent moins bien avec cette API. Frameless est une solution prometteuse pour s’attaquer à cette limitation.

Évitez autant que possible les Fonctions définies par l’utilisateur (UDF)

L’utilisation d’un UDF implique une désérialisation pour traiter les données en Scala classique puis les resérialiser. Les UDF peuvent être remplacés par des fonctions SQL Spark, il y en a déjà beaucoup et de nouvelles sont régulièrement ajoutées.

Éviter les UDF peut ne pas générer d’améliorations instantanées, mais au moins cela évitera de futurs problèmes de performances, si le code change. De plus, en utilisant des fonctions SQL Spark intégrées, nous réduisons nos efforts de test car tout est effectué du côté de Spark. Ces fonctions sont conçues par des experts de la JVM, de sorte que les UDF ne sont pas susceptibles d’obtenir de meilleures performances.

Par exemple, le code suivant peut être remplacé par la fonction coalesce intégrée:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

Lorsqu’il n’y a pas de remplacement intégré, il est toujours possible d’implémenter et d’étendre la classe d’expression de Catalyst (l’optimiseur SQL de Spark). Cela fonctionnera bien avec la génération de code. Pour plus de détails, Chris Fregly en a parlé ici (voir diapositive 56). En faisant cela, nous accédons directement au format Tungsten, il résout le problème de sérialisation et les performances des bosses.

Évitez les fonctions d’agrégat définies par l’utilisateur (UDAFs)

Une UDAF génère des opérations de trigregate qui sont nettement plus lentes que HashAggregate. Par exemple, ce que nous faisons au lieu d’écrire un UDAF qui calcule une médiane utilise un équivalent intégré (quantile 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

La fonction approxQuantile utilise une variante de l’algorithme de Greenwald-Khanna. Dans notre cas, il a fini par être 10 fois plus rapide que l’UDAF équivalent.

Évitez les UDFS ou les UDAFs qui exécutent plus d’une chose

Les principes de l’artisanat logiciel s’appliquent évidemment lors de l’écriture de choses Big Data (faites une chose et faites-le bien). En divisant les UDF, nous pouvons utiliser des fonctions intégrées pour une partie du code résultant. Cela simplifie également considérablement les tests.

2- Regardez sous le capot

Analyser le plan d’exécution de Spark est un moyen facile de repérer les améliorations potentielles. Ce plan est composé d’étapes, qui sont les unités physiques d’exécution dans Spark. Lorsque nous refactorisons notre code, la première chose que nous recherchons est un nombre anormal d’étapes. Un plan suspect peut nécessiter 10 étapes au lieu de 2-3 pour une opération de jointure de base entre deux trames de données.

Dans Spark et plus généralement dans l’informatique distribuée, l’envoi de données sur le réseau (alias Shuffle dans Spark) est l’action la plus coûteuse. Les brassages sont coûteux car ils impliquent des E / S de disque, la sérialisation des données et des E / S réseau. Ils sont nécessaires pour des opérations comme Join ou groupBy et se produisent entre les étapes.

Compte tenu de cela, réduire le nombre d’étapes est un moyen évident d’optimiser un travail. Nous utilisons le.commande explain (true) pour afficher le plan d’exécution détaillant toutes les étapes (étapes) impliquées pour un travail. Voici un exemple:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

Un exemple DAG très simple — Crédits d’image

figcaption>

L’optimisation repose beaucoup sur notre connaissance des données et de leur traitement (incl. logique métier). L’une des limites de l’optimisation SQL de Spark avec Catalyst est qu’elle utilise des règles « mécaniques » pour optimiser le plan d’exécution (en 2.2.0).

Comme beaucoup d’autres, nous attendions un moteur d’optimisation basé sur les coûts au-delà de la sélection des jointures de diffusion. Il semble maintenant disponible en 2.3.0, nous devrons regarder cela.

3- Connaissez vos données et gérez-les efficacement

Nous avons vu comment améliorer les performances des tâches en examinant le plan d’exécution, mais il existe également de nombreuses améliorations possibles du côté des données.

Ensembles de données hautement déséquilibrés

Pour vérifier rapidement si tout va bien, nous examinons la durée d’exécution de chaque tâche et recherchons des temps de processus hétérogènes. Si l’une des tâches est nettement plus lente que les autres, cela prolongera la durée globale du travail et gaspillera les ressources des exécuteurs les plus rapides.

Il est assez facile de vérifier la durée minimale, maximale et médiane dans l’interface utilisateur Spark. Voici un exemple équilibré :

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.