Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. Używamy również Spark do przetwarzania intensywnych zadań, takich jak rozszerzenie segmentu między urządzeniami lub przekształcenie parkietu na SSTables w celu załadowania danych do Cassandra.

współpracując ze Spark regularnie docieramy do granic zasobów naszych klastrów w zakresie pamięci, dysku czy procesora. Skala-out tylko odpycha problem, więc musimy pobrudzić sobie ręce.

oto zbiór najlepszych praktyk i wskazówek optymalizacyjnych Dla Spark 2.2.0, aby uzyskać lepszą wydajność i czystszy Kod Spark, obejmujący:

  • jak wykorzystać Tungsten,
  • Analiza planu wykonania,
  • zarządzanie danymi (buforowanie, nadawanie),
  • optymalizacje związane z chmurą (w tym S3).

aktualizacja 07/12/2018, Zobacz także drugą część dotyczącą rozwiązywania problemów i zarządzania zewnętrznym źródłem danych.

to zdrowy rozsądek, ale najlepszym sposobem na poprawę wydajności kodu jest wykorzystanie mocnych stron Spark. Jednym z nich jest Wolfram.

Standard od wersji 1.5, Tungsten jest komponentem SQL Spark, który zapewnia zwiększoną wydajność poprzez przepisywanie operacji Spark w bytecode w czasie wykonywania. Tungsten tłumi wirtualne funkcje i wykorzystuje blisko gołej wydajności metalu, koncentrując się na pracy CPU i wydajności pamięci.

aby jak najlepiej wykorzystać Tungsten zwracamy uwagę na następujące kwestie:

użyj struktur Dataset zamiast ramek danych

aby upewnić się, że nasz kod skorzysta jak najwięcej z optymalizacji Tungsten używamy domyślnego API Dataset ze Scala (zamiast RDD).

Dataset przynosi najlepsze z obu światów z mieszanką relacyjnych (DataFrame) i funkcjonalnych (RDD) transformacji. Ten interfejs API jest najbardziej aktualny i dodaje bezpieczeństwo typu wraz z lepszą obsługą błędów i znacznie bardziej czytelnymi testami jednostkowymi.

jednak jest to kompromis, ponieważ funkcje map i filtrów działają gorzej z tym API. Bezramowe jest obiecującym rozwiązaniem rozwiązania tego ograniczenia.

unikaj funkcji zdefiniowanych przez użytkownika (UDFs) w jak największym stopniu

użycie UDF implikuje deserializację w celu przetworzenia danych w klasycznej Scali, a następnie ich reserializacji. UDFs można zastąpić funkcjami Spark SQL, jest ich już wiele, a nowe są regularnie dodawane.

unikanie UDFs może nie generować natychmiastowych ulepszeń, ale przynajmniej zapobiegnie przyszłym problemom z wydajnością, w przypadku zmiany kodu. Ponadto, korzystając z wbudowanych funkcji Spark SQL, zmniejszamy nasz wysiłek testowy, ponieważ wszystko odbywa się po stronie Spark. Funkcje te zostały zaprojektowane przez ekspertów JVM, więc UDFs prawdopodobnie nie osiągnie lepszej wydajności.

na przykład poniższy kod można zastąpić wbudowaną funkcją coalesce:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

gdy nie ma wbudowanego zamiennika, nadal można zaimplementować i rozszerzyć klasę wyrażenia Catalyst (Spark ’ s SQL optimizer). Będzie dobrze grać z generowaniem kodu. Po więcej szczegółów Chris Fregly opowiedział o tym tutaj (patrz slajd 56). W ten sposób uzyskujemy bezpośredni dostęp do formatu Tungsten, który rozwiązuje problem serializacji i zwiększa wydajność.

unikaj funkcji agregacji zdefiniowanych przez użytkownika (UDAFs)

UDAF generuje operacje SortAggregate, które są znacznie wolniejsze niż HashAggregate. Na przykład, to, co robimy zamiast zapisu UDAF, który oblicza medianę, używa wbudowanego odpowiednika (kwantyl 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

funkcja approxQuantile używa wariacji algorytmu Greenwalda-Khanna. W naszym przypadku okazało się, że jest 10 razy szybszy niż odpowiednik UDAF.

unikaj UDFs lub Udaf, które wykonują więcej niż jedną rzecz

Zasady kunsztu oprogramowania mają oczywiście zastosowanie podczas pisania dużych zbiorów danych (zrób jedną rzecz i zrób to dobrze). Dzieląc UDFs jesteśmy w stanie użyć wbudowanych funkcji dla jednej części wynikowego kodu. Znacznie upraszcza również testowanie.

2 – zajrzyj pod maskę

Analiza planu wykonania Spark to łatwy sposób na wykrycie potencjalnych ulepszeń. Plan ten składa się z etapów, które są fizycznymi jednostkami egzekucji w Spark. Kiedy refaktorujemy nasz kod, pierwszą rzeczą, której szukamy, jest nieprawidłowa liczba etapów. Podejrzanym planem może być taki, który wymaga 10 etapów zamiast 2-3 dla podstawowej operacji łączenia między dwoma ramkami danych.

w Spark i bardziej ogólnie w komputerach rozproszonych przesyłanie danych przez sieć (a.k.a. Shuffle w Spark) jest najdroższą czynnością. Tasowania są kosztowne, ponieważ obejmują We/Wy dysku, serializację danych i we / wy sieciowe. są one potrzebne do operacji takich jak Join lub groupBy i zdarzają się między etapami.

biorąc to pod uwagę, zmniejszenie liczby etapów jest oczywistym sposobem na optymalizację pracy. Używamy .polecenie explain (true) pokazujące plan wykonania z wyszczególnieniem wszystkich kroków (etapów) związanych z zadaniem. Oto przykład:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

bardzo prosty przykład DAG — kredyty obrazkowe

optymalizacja opiera się zarówno na naszej wiedzy o danych, jak i ich przetwarzaniu (w tym. logika biznesowa). Jednym z ograniczeń Spark SQL optimization z Catalyst Jest to, że używa reguł „mechanik” do optymalizacji planu wykonania (w 2.2.0).

podobnie jak wielu innych, czekaliśmy na silnik optymalizacji opartej na kosztach poza wyborem Przyłącz do transmisji. Teraz wydaje się dostępny w wersji 2.3.0, będziemy musieli się temu przyjrzeć.

3 – Poznaj swoje dane i zarządzaj nimi efektywnie

widzieliśmy, jak poprawić wydajność pracy, patrząc na plan wykonania, ale istnieje również wiele możliwych ulepszeń po stronie danych.

wysoce niezrównoważone zbiory danych

aby szybko sprawdzić, czy wszystko jest w porządku, sprawdzamy czas wykonania każdego zadania i szukamy heterogenicznego czasu procesu. Jeśli jedno z zadań jest znacznie wolniejsze niż inne, wydłuży to ogólny czas trwania zadania i zmarnuje zasoby najszybszych wykonawców.

w interfejsie Spark można łatwo sprawdzić min, max i medianę czasu trwania. Oto zrównoważony przykład:

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.