Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. Wir verwenden Spark auch für die Verarbeitung intensiver Jobs wie geräteübergreifende Segmenterweiterung oder Parkett-zu-SSTables-Transformation zum Laden von Daten in Cassandra.

Bei der Arbeit mit Spark stoßen wir regelmäßig an die Grenzen der Ressourcen unserer Cluster in Bezug auf Speicher, Festplatte oder CPU. Ein Scale-Out schiebt das Problem nur zurück, sodass wir uns die Hände schmutzig machen müssen.

Hier finden Sie eine Sammlung von Best Practices und Optimierungstipps für Spark 2.2.0, um eine bessere Leistung und einen saubereren Spark-Code zu erzielen:

  • Wie man Tungsten nutzt,
  • Ausführungsplananalyse,
  • Datenmanagement (Caching, Broadcasting),
  • Cloud-bezogene Optimierungen (einschließlich S3).

Update 07/12/2018, siehe auch den zweiten Teil über Tricks zur Fehlerbehebung und Verwaltung externer Datenquellen.

Es ist gesunder Menschenverstand, aber der beste Weg, die Code-Performance zu verbessern, besteht darin, die Stärken von Spark zu nutzen. Einer davon ist Wolfram.

Standard seit Version 1.5 ist Tungsten eine Spark SQL-Komponente, die durch das Umschreiben von Spark-Operationen in Bytecode zur Laufzeit eine höhere Leistung bietet. Tungsten unterdrückt virtuelle Funktionen und nutzt die Bare-Metal-Leistung, indem es sich auf die CPU- und Speichereffizienz konzentriert.

Um das Beste aus Tungsten herauszuholen, achten wir auf Folgendes:

Verwenden Sie Dataset-Strukturen anstelle von DataFrames

Um sicherzustellen, dass unser Code so viel wie möglich von Tungsten-Optimierungen profitiert, verwenden wir die Standard-Dataset-API mit Scala (anstelle von RDD).

Dataset bietet das Beste aus beiden Welten mit einer Mischung aus relationalen (DataFrame) und funktionalen (RDD) Transformationen. Diese API ist die aktuellste und bietet Typsicherheit sowie eine bessere Fehlerbehandlung und weitaus besser lesbare Komponententests.

Es gibt jedoch einen Kompromiss, da Karten- und Filterfunktionen mit dieser API schlechter funktionieren. Frameless ist eine vielversprechende Lösung, um diese Einschränkung anzugehen.

Vermeiden Sie benutzerdefinierte Funktionen (UDFs) so weit wie möglich

Die Verwendung einer UDF impliziert eine Deserialisierung, um die Daten in Classic Scala zu verarbeiten und dann erneut zu serialisieren. UDFs können durch Spark SQL-Funktionen ersetzt werden, von denen es bereits viele gibt und regelmäßig neue hinzugefügt werden.

Das Vermeiden von UDFs führt möglicherweise nicht zu sofortigen Verbesserungen, verhindert jedoch zumindest zukünftige Leistungsprobleme, falls sich der Code ändert. Durch die Verwendung der integrierten Spark SQL-Funktionen reduzieren wir unseren Testaufwand, da alles auf der Seite von Spark ausgeführt wird. Diese Funktionen wurden von JVM-Experten entwickelt, sodass UDFs wahrscheinlich keine bessere Leistung erzielen.

Der folgende Code kann beispielsweise durch die integrierte Coalesce-Funktion ersetzt werden:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

Wenn kein integrierter Ersatz vorhanden ist, ist es weiterhin möglich, die Expression-Klasse von Catalyst (Spark SQL Optimizer) zu implementieren und zu erweitern. Es wird gut mit der Codegenerierung spielen. Für weitere Details hat Chris Fregly hier darüber gesprochen (siehe Folie 56). Auf diese Weise greifen wir direkt auf das Format zu, es löst das Serialisierungsproblem und verbessert die Leistung.

Vermeiden Sie benutzerdefinierte Aggregatfunktionen (UDAFs)

Ein UDAF generiert SortAggregate-Operationen, die deutlich langsamer sind als HashAggregate. Anstatt beispielsweise einen UDAF zu schreiben, der einen Median berechnet, verwenden wir ein integriertes Äquivalent (Quantil 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

Die approxQuantile-Funktion verwendet eine Variation des Greenwald-Khanna-Algorithmus. In unserem Fall war es 10-mal schneller als das äquivalente UDAF.

Vermeiden Sie UDFs oder UDAFs, die mehr als eine Sache ausführen

Beim Schreiben von Big Data-Daten gelten offensichtlich die Prinzipien der Software-Handwerkskunst (machen Sie eine Sache und machen Sie es gut). Durch das Aufteilen von UDFs können wir integrierte Funktionen für einen Teil des resultierenden Codes verwenden. Es vereinfacht auch das Testen erheblich.

2- Schauen Sie unter die Haube

Die Analyse des Ausführungsplans von Spark ist eine einfache Möglichkeit, potenzielle Verbesserungen zu erkennen. Dieser Plan besteht aus Stufen, die die physischen Einheiten der Ausführung in Spark sind. Wenn wir unseren Code umgestalten, suchen wir zuerst nach einer abnormalen Anzahl von Stufen. Ein verdächtiger Plan kann einer sein, der 10 Stufen anstelle von 2-3 für eine grundlegende Verknüpfungsoperation zwischen zwei Datenrahmen erfordert.

In Spark und allgemeiner im verteilten Computing ist das Senden von Daten über das Netzwerk (auch bekannt als Shuffle in Spark) die teuerste Aktion. Shuffles sind teuer, da sie Festplatten-E / A, Datenserialisierung und Netzwerk-E / A umfassen.

In Anbetracht dessen ist die Reduzierung der Anzahl der Stufen eine offensichtliche Möglichkeit, einen Job zu optimieren. Wir verwenden die .explain(true) Befehl zum Anzeigen des Ausführungsplans mit allen Schritten (Phasen), die für einen Job erforderlich sind. Hier ein Beispiel:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

Ein sehr einfaches DAG—Beispiel – Bildnachweis

Die Optimierung hängt sehr stark von unserem Wissen über die Daten und deren Verarbeitung (inkl. Geschäftslogik). Eine der Grenzen der Spark SQL-Optimierung mit Catalyst besteht darin, dass „mechanische“ Regeln verwendet werden, um den Ausführungsplan zu optimieren (in 2.2.0).

Wie viele andere warteten wir auf eine kostenbasierte Optimierungs-Engine, die über die Broadcast-Join-Auswahl hinausgeht. Es scheint jetzt in 2.3.0 verfügbar zu sein, das müssen wir uns ansehen.

3 – Kennen Sie Ihre Daten und verwalten Sie sie effizient

Wir haben gesehen, wie Sie die Arbeitsleistung verbessern können, indem Sie sich den Ausführungsplan ansehen, aber es gibt auch viele mögliche Verbesserungen auf der Datenseite.

Stark unausgeglichene Datensätze

Um schnell zu überprüfen, ob alles in Ordnung ist, überprüfen wir die Ausführungsdauer jeder Aufgabe und suchen nach der heterogenen Prozesszeit. Wenn eine der Aufgaben wesentlich langsamer ist als die anderen, verlängert sich die Gesamtauftragsdauer und die Ressourcen der schnellsten Ausführenden werden verschwendet.

Es ist ziemlich einfach, die minimale, maximale und mittlere Dauer in der Spark-Benutzeroberfläche zu überprüfen. Hier ein ausgewogenes Beispiel:

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.