Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. Nós também usamos faísca para o processamento de trabalhos intensivos como a extensão de segmento de dispositivo cruzado ou Parquet para a transformação de SSTables para o carregamento de dados em Cassandra.trabalhando com faísca, alcançamos regularmente os limites dos recursos dos nossos clusters em termos de memória, disco ou CPU. Uma escala só empurra a questão para trás, então temos que sujar as mãos.

Aqui está uma coleção de melhores práticas e dicas de otimização para Spark 2.2.0 para alcançar melhor desempenho e código de faísca mais limpo, cobrindo:

  • como alavancar tungstênio,
  • Análise do plano de execução,
  • Gestão de dados (cache, radiodifusão),
  • otimizações relacionadas com a nuvem (incluindo S3).

Update 07/12/2018, see also the second part covering troubleshooting tricks and external data source management.

é senso comum, mas a melhor maneira de melhorar o desempenho do código é abraçar os pontos fortes da Spark. Um deles é tungsténio.

Standard since version 1.5, Tungsten is a Spark SQL component that provides increased performance by rewriting Spark operations in bytecode, at runtime. Tungstênio suprime funções virtuais e alavancagens perto do desempenho em metal nua, centrando-se em trabalhos CPU e eficiência de memória.

para tirar o máximo partido do tungsténio prestamos atenção ao seguinte:

utilize estruturas de Conjuntos de dados em vez de DataFrames

para se certificar de que o nosso código irá beneficiar tanto quanto possível das optimizações de tungsténio usamos a API padrão Dataset com Scala (em vez de RDD).

Dataset traz o melhor de ambos os mundos com uma mistura de transformações relacionais (DataFrame) e funcionais (RDD). Esta API é a mais atualizada e adiciona segurança de tipo, juntamente com um melhor manuseio de erros e testes de unidade muito mais legíveis.

no entanto, ele vem com um tradeoff como funções de mapa e filtro executar mais pobre com esta API. A Frameless é uma solução promissora para enfrentar esta limitação.

evite funções definidas pelo Usuário (UDFs) tanto quanto possível

usando um UDF implica deserialização para processar os dados em Scala clássica e, em seguida, reserializá-lo. UDFs pode ser substituído por funções Spark SQL, já existem muitos deles e novos são regularmente adicionados.

evitar UDFs pode não gerar melhorias instantâneas, mas pelo menos irá evitar problemas de desempenho futuros, caso o código mude. Além disso, ao usar funções de faísca embutidas SQL reduzimos o nosso esforço de teste como tudo é realizado do lado da Spark. Estas funções são concebidas por peritos da JVM, pelo que as UDFs não são susceptíveis de obter um melhor desempenho.por exemplo, o seguinte código pode ser substituído pela função coalesce incorporada:

def currency = udf(
(currencySub: String, currencyParent: String) ⇒
Option(currencyParent) match {
case Some(curr) ⇒ curr
case _ ⇒ currencySub
}
)

quando não há substituição incorporada, ainda é possível implementar e estender a classe de expressão do catalisador (optimizador SQL da faísca). Ele vai jogar bem com a geração de código. Para mais detalhes, Chris Fregly falou sobre isso aqui (ver slide 56). Ao fazer isso, acessamos diretamente o formato de tungstênio, ele resolve o problema de serialização e desempenho bumps.

evitar funções agregadas definidas pelo Utilizador (UDAFs)

a UDAF gera operações Sortagregadas que são significativamente mais lentas do que a HashAggregate. Por exemplo, o que fazemos em vez de escrever um UDAF que calcula uma mediana é usando um equivalente embutido (quantile 0,5):

df.stat.approxQuantile("value”, Array(0.5), 0)

a função approxQuantile usa uma variação do algoritmo de Greenwald-Khanna. No nosso caso, acabou por ser 10 vezes mais rápido que o UDAF equivalente.

evite UDFs ou UDAFs que executam mais do que uma coisa

princípios de artesanato de Software obviamente se aplicam ao escrever coisas de dados grandes (Faça uma coisa e faça bem). Ao dividir UDFs somos capazes de usar funções embutidas para uma parte do código resultante. Também simplifica muito os testes.

2 – olhar sob o capô

analisar o plano de execução da faísca é uma maneira fácil de detectar potenciais melhorias. Este plano é composto por etapas, que são as unidades físicas de execução em centelha. Quando reformulamos o nosso código, a primeira coisa que procuramos é um número anormal de etapas. Um plano suspeito pode ser um que requer 10 estágios em vez de 2-3 para uma operação básica de junção entre dois DataFrames.

em Spark e mais geralmente em computação distribuída, enviar dados através da rede (T. c. p.Shuffle in Spark) é a ação mais cara. Shuffles são caros uma vez que envolvem disco I/O, serialização de dados e rede I/O. Eles são necessários para operações como Join ou groupBy e acontecem entre as fases.considerando isso, reduzir o número de etapas é uma forma óbvia de otimizar um trabalho. Nós usamos o .explique (verdadeiro) comando para mostrar o plano de execução detalhando todas as etapas (etapas) envolvidas para um trabalho. Aqui está um exemplo:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.

muito simples DAG exemplo — créditos de Imagem

A otimização se baseia muito em ambos os nosso conhecimento dos dados e seu processamento (incl. negocio). Um dos limites da otimização SQL Spark com catalisador é que ele usa regras “mecânicas” para otimizar o plano de execução (em 2.2.0).como muitos outros, estávamos à espera de um motor de otimização baseado em custos além da seleção de junção de transmissão. Agora Parece Disponível em 2.3.0, vamos ter que olhar para isso.

3 – conhecer os seus dados e geri-los eficientemente

vimos como melhorar o desempenho do trabalho, olhando para o plano de execução, mas também há uma abundância de possíveis melhorias no lado dos dados.

Conjuntos de dados altamente desequilibrados

para verificar rapidamente se tudo está bem, vamos rever a duração de execução de cada tarefa e procurar tempo de processo heterogêneo. Se uma das tarefas for significativamente mais lenta do que as outras, estenderá a duração global do trabalho e desperdiçará os recursos dos executores mais rápidos.

é bastante fácil verificar a duração mínima, máxima e mediana na ignição. Aqui está um exemplo equilibrado:

Deixe uma resposta

O seu endereço de email não será publicado.