Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. Utilizziamo Spark anche per l’elaborazione di lavori intensivi come l’estensione del segmento cross-device o la trasformazione di Parquet to SSTables per il caricamento dei dati in Cassandra.

Lavorando con Spark raggiungiamo regolarmente i limiti delle risorse dei nostri cluster in termini di memoria, disco o CPU. Una scala-out spinge solo indietro il problema, quindi dobbiamo sporcarci le mani.

Ecco una raccolta di best practice e suggerimenti di ottimizzazione per Spark 2.2.0 per ottenere prestazioni migliori e un codice Spark più pulito, che copre:

  • Come sfruttare Tungsten,
  • Analisi del piano di esecuzione,
  • Gestione dei dati (caching, broadcasting),
  • Ottimizzazioni legate al cloud (incluso S3).

Aggiornamento 07/12/2018, vedere anche la seconda parte che copre trucchi risoluzione dei problemi e la gestione delle origini dati esterne.

È buon senso, ma il modo migliore per migliorare le prestazioni del codice è quello di abbracciare i punti di forza di Spark. Uno di questi è il tungsteno.

Standard dalla versione 1.5, Tungsten è un componente SQL Spark che fornisce prestazioni migliorate riscrivendo le operazioni Spark in bytecode, in fase di runtime. Tungsten sopprime le funzioni virtuali e sfrutta vicino alle prestazioni bare metal, concentrandosi sui lavori CPU e l’efficienza della memoria.

Per ottenere il massimo da Tungsten prestiamo attenzione a quanto segue:

Usa le strutture del Dataset piuttosto che i dataframe

Per assicurarci che il nostro codice tragga il massimo beneficio dalle ottimizzazioni di Tungsten usiamo l’API Dataset predefinita con Scala (invece di RDD).

Dataset offre il meglio di entrambi i mondi con un mix di trasformazioni relazionali (DataFrame) e funzionali (RDD). Questa API è la più aggiornata e aggiunge sicurezza del tipo insieme a una migliore gestione degli errori e test unitari molto più leggibili.

Tuttavia, viene fornito con un compromesso in quanto le funzioni di mappa e filtro sono più povere con questa API. Frameless è una soluzione promettente per affrontare questa limitazione.

Evita il più possibile le funzioni definite dall’utente (UDF)

L’utilizzo di un UDF implica la deserializzazione per elaborare i dati in Scala classica e quindi reserializzarli. Le UDF possono essere sostituite dalle funzioni Spark SQL, ce ne sono già molte e ne vengono regolarmente aggiunte di nuove.

Evitare UDFs potrebbe non generare miglioramenti istantanei, ma almeno eviterà problemi di prestazioni future, se il codice dovesse cambiare. Inoltre, utilizzando le funzioni SQL Spark integrate, riduciamo il nostro sforzo di test poiché tutto viene eseguito sul lato di Spark. Queste funzioni sono progettate da esperti JVM in modo UDF non sono suscettibili di ottenere prestazioni migliori.

Ad esempio, il seguente codice può essere sostituito dalla funzione coalesce integrata:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

Quando non c’è una sostituzione integrata, è ancora possibile implementare ed estendere la classe di espressioni Catalyst (SQL optimizer di Spark). Giocherà bene con la generazione del codice. Per maggiori dettagli, Chris Fregly ne ha parlato qui (vedi slide 56). In questo modo accediamo direttamente formato tungsteno, risolve il problema di serializzazione e urti prestazioni.

Evita le funzioni aggregate definite dall’utente (UDAF)

Un UDAF genera operazioni di SortAggregate che sono significativamente più lente di HashAggregate. Ad esempio, quello che facciamo invece di scrivere un UDAF che calcola una mediana sta usando un equivalente incorporato (quantile 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

La funzione approxQuantile utilizza una variazione dell’algoritmo di Greenwald-Khanna. Nel nostro caso, ha finito per essere 10 volte più veloce dell’UDAF equivalente.

Evita UDF o UDAF che eseguono più di una cosa

I principi di artigianalità del software si applicano ovviamente quando si scrivono cose di big data (fai una cosa e fallo bene). Dividendo UDF siamo in grado di utilizzare funzioni integrate per una parte del codice risultante. Inoltre semplifica notevolmente i test.

2 – Guarda sotto il cofano

Analizzare il piano di esecuzione di Spark è un modo semplice per individuare potenziali miglioramenti. Questo piano è composto da fasi, che sono le unità fisiche di esecuzione in Spark. Quando rifattorizziamo il nostro codice, la prima cosa che cerchiamo è un numero anormale di stadi. Un piano sospetto può essere uno che richiede 10 fasi invece di 2-3 per un’operazione di join di base tra due dataframe.

In Spark e più in generale nel calcolo distribuito, l’invio di dati sulla rete (aka Shuffle in Spark) è l’azione più costosa. Gli shuffle sono costosi poiché coinvolgono I / O del disco, la serializzazione dei dati e l’I/O di rete.Sono necessari per operazioni come Join o groupBy e avvengono tra le fasi.

Considerando questo, ridurre il numero di fasi è un modo ovvio per ottimizzare un lavoro. Usiamo il .comando explain (true) per mostrare il piano di esecuzione in dettaglio tutti i passaggi (fasi) coinvolti per un lavoro. Ecco un esempio:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

molto semplice DAG esempio — Image credits

L’ottimizzazione si basa molto sulla nostra conoscenza dei dati e la loro elaborazione (incl. logica aziendale). Uno dei limiti dell’ottimizzazione SQL Spark con Catalyst è che utilizza regole “meccaniche” per ottimizzare il piano di esecuzione (in 2.2.0).

Come molti altri, stavamo aspettando un motore di ottimizzazione basato sui costi oltre alla selezione di join broadcast. Ora sembra disponibile in 2.3.0, dovremo guardare a questo.

3 – Conosci i tuoi dati e gestiscili in modo efficiente

Abbiamo visto come migliorare le prestazioni del lavoro esaminando il piano di esecuzione, ma ci sono anche molti possibili miglioramenti sul lato dati.

Set di dati altamente squilibrati

Per verificare rapidamente se tutto è ok, esaminiamo la durata di esecuzione di ogni attività e cerchiamo il tempo di processo eterogeneo. Se una delle attività è significativamente più lenta delle altre, estenderà la durata complessiva del lavoro e sprecherà le risorse degli esecutori più veloci.

È abbastanza facile controllare la durata minima, massima e mediana in Spark UI. Ecco un esempio equilibrato:

Lascia un commento

Il tuo indirizzo email non sarà pubblicato.