Quickstart: Apache Spark no Kubernetes

Utilizando Apache Spark Operator no Kubernetes

Introduction

Apache Spark Operator para Kubernetes

Desde o seu lançamento em 2014 pela Google, o Kubernetes tem ganhado muita popularidade junto com o próprio Docker e desde 2016 passou a ser o de facto Container orchestrator, estabelecido como um padrão de mercado. Possuindo versões gerenciadas em todas as major Clouds [1] [2] [3] (inclusive Digital Ocean e Alibaba).

Toda essa popularidade tem atraído novas implementações e use-cases para o orquestrador, dentre eles a execução de Stateful applications incluindo bancos de dados em containers.

Qual seria a necessidade de ter um banco de dados orquestrado? Ótima pergunta. Por hoje, vamos focar na utilização do Spark Operator para executar Spark jobs em Kubernetes.

A idéia do Spark Operator surgiu em 2016, antes disso haviam apenas alguns jeitinhos, por exemplo: com Apache Zeppelin dentro do Kubernetes, ou então, mais refinado ainda criando o seu próprio Apache Spark cluster dentro do Kubernetes (exemplo do repositório oficial do Kubernetes) que usaria o Spark Standalone mode.

Porém, executar nativamente seria muito mais interessante pois poderia aproveitar o Kubernetes Scheduler para ações relacionadas à alocação dos recursos no cluster, dando mais elasticidade e uma interface mais simples para gerenciar os workloads no Apache Spark.

Considernando esses pontos o desenvolvimento do Apache Spark Operator ganhou atenção, foi mergeado e publicado na versão do Apache Spark 2.3.0 em Fevereiro de 2018.

Se você estiver interessado em ler mais sobre a proposta do Apache Spark Operator, existe um design document publicado no Google Docs.

Por que Kubernetes?

Como atualmente as empresas estão buscando se reinventar por meio da tão falada transformação digital para que possam ter competitividade e, principalmente, sobreviver diante de um mercado cada vez mais dinâmico, é comum ver abordagens que incluam Big Data, Inteligência Artificial e Cloud Computing [1] [2] [3].

Para compreender os benefícios de utilizar Cloud ao invés de On-premises no contexto de Big Data vale a pena ler o artigo da Databricks, que é a empresa fundada pelos criadores do Apache Spark.

Como nós vemos uma adoção de Cloud Computing generalizada (até por empresas que teriam condições de bancar o próprio hardware), também podemos notar que na maiorira dessas implementações de Cloud não existem clusters de Apache Hadoop já que os times de Dados (BI/Data Science/Analytics) optam cada vez mais por utilizar ferramentas como Google BigQuery ou AWS Redshift. Portanto, não faz sentido subir um Hadoop apenas para utilizar o YARN como gerenciador os recursos.

Uma alternativa é a utilização de provisionadores de clusters Hadoop como o Google DataProc ou o AWS EMR para a criação de clusters efêmeros. Apenas para nomear algumas opções.

Para entender melhor o design do Spark Operator, recomendo a leitura da documentação gerada pela equipe da GCP no GitHub.

Hora de meter a mão na massa!

Aquecendo o motor

Agora que toda a palavra já foi passada, vamos ao hands-on para mostrar a coisa acontecendo. Para isso, vamos usar:

Assim que o Apache estiver descompactado, vamos adicionar o caminho no PATH para facilitar a execução:

export PATH=${PATH}:/path/to/apache-spark-X.Y.Z/bin

Criando o “cluster” com Minikube

Agora, para ter um Kubernetes vamos iniciar um minikube com o propósito de rodar um dos exemplos disponíveis no repositório do Spark chamado SparkPi apenas para demonstração.

minikube start --cpus=2 \
    --memory=4g

Buildando a imagem Docker

Vamos utilizar o Docker daemon do Minikube para não depender de um registry externo (e só gerar lixo na VM, facilitando a limpeza depois). Para isso, o minikube tem um wrapper que facilita a nossa vida:

eval $(minikube docker-env)

Após ter configurado as variáveis de ambiente para o Docker daemon, vamos precisar de uma imagem Docker para executar os jobs. Existe um shell script no repositório do Spark para ajudar com isso. Considerando que o PATH está configurado corretamente, basta executar:

docker-image-tool.sh -m -t latest build

Obs.: O parâmetro -m aqui indica que é um build para o minikube.

Vamos pegar a via expressa para executar o SparkPi, usando o mesmo comando que seria utilizado para um cluster Spark spark-submit.

Porém, o Spark Operator dá suporte a definição de jobs no “dialeto do Kubernetes” usando CRD, aqui tem alguns exemplos - para depois.

Fire in the hole!

Cuidado com o vão entre a versão do Scala e a plataforma quando estiver parametrizando o job:

spark-submit --master k8s://https://$(minikube ip):8443 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --executor-memory 1024m \
    --conf spark.kubernetes.container.image=spark:latest \
    local:///opt/spark/examples/jars/spark-examples_2.11-X.Y.Z.jar # aqui

O que temos de novidade é:

  • --master: Aceita o prefixo k8s:// na URL, para o endpoint da API do Kubernetes, exposta pelo commando https://$(minikube ip):8443. Aliás, se estiver interessado, isso é um command substitution no shell;
  • --conf spark.kubernetes.container.image=: Configuração para a imagem Docker que será executada no Kubernetes.

Com o output:

...

19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: State changed,
new state: pod name: spark-pi-1566485909677-driver namespace: default
labels: spark-app-selector -> spark-20477e803e7648a59e9bcd37394f7f60,
spark-role -> driver pod uid: c789c4d2-27c4-45ce-ba10-539940cccb8d
creation time: 2019-08-22T14:58:30Z service account name: default
volumes: spark-local-dir-1, spark-conf-volume, default-token-tj7jn
node name: minikube start time: 2019-08-22T14:58:30Z container
images: spark:docker phase: Succeeded status:
[ContainerStatus(containerID=docker://e044d944d2ebee2855cd2b993c62025d
6406258ef247648a5902bf6ac09801cc, image=spark:docker,
imageID=docker://sha256:86649110778a10aa5d6997d1e3d556b35454e9657978f3
a87de32c21787ff82f, lastState=ContainerState(running=null,
terminated=null, waiting=null, additionalProperties={}),
name=spark-kubernetes-driver, ready=false, restartCount=0,
state=ContainerState(running=null,
terminated=ContainerStateTerminated(containerID=docker://e044d944d2ebe
e2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, exitCode=0,
finishedAt=2019-08-22T14:59:08Z, message=null, reason=Completed,
signal=null, startedAt=2019-08-22T14:58:32Z,
additionalProperties={}), waiting=null, additionalProperties={}),
additionalProperties={})]

19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: Container final
statuses: Container name: spark-kubernetes-driver Container image:
spark:docker Container state: Terminated Exit code: 0

Para ver o resultado do job (e toda a execução), podemos mandar um kubectl logs passando o nome do pod do driver como parâmetro:

kubectl logs $(kubectl get pods | grep 'spark-pi.*-driver')

Que traz o output (algumas entradas foram omitidas), parecido com:

...
19/08/22 14:59:08 INFO TaskSetManager: Finished task 1.0 in stage 0.0
(TID 1) in 52 ms on 172.17.0.7 (executor 1) (2/2)
19/08/22 14:59:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool19/08/22 14:59:08 INFO
DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in
0.957 s
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 1.040608 s Pi is roughly 3.138915694578473
19/08/22 14:59:08 INFO SparkUI: Stopped Spark web UI at
http://spark-pi-1566485909677-driver-svc.default.svc:4040
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend: Shutting
down all executors
19/08/22 14:59:08 INFO
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking
each executor to shut down
19/08/22 14:59:08 WARN ExecutorPodsWatchSnapshotSource: Kubernetes
client has been closed (this is expected if the application is
shutting down.)
19/08/22 14:59:08 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
19/08/22 14:59:08 INFO MemoryStore: MemoryStore cleared
19/08/22 14:59:08 INFO BlockManager: BlockManager stopped
19/08/22 14:59:08 INFO BlockManagerMaster: BlockManagerMaster stopped
19/08/22 14:59:08 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
19/08/22 14:59:08 INFO SparkContext: Successfully stopped SparkContext
19/08/22 14:59:08 INFO ShutdownHookManager: Shutdown hook called
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/tmp/spark-aeadc6ba-36aa-4b7e-8c74-53aa48c3c9b2
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/var/data/spark-084e8326-c8ce-4042-a2ed-75c1eb80414a/spark-ef8117bf-90
d0-4a0d-9cab-f36a7bb18910

O resultado aparece no stdout:

19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 1.040608 s Pi is roughly 3.138915694578473

Para finalizar, vamos deletar a VM que o Minikube gera, para limpar o ambiente (a menos que você queira continuar brincando com ele):

minikube delete

Últimas palavras

Espero ter dispertado bastante curiosidade e algumas ideias para ir além no desenvolvimento dos seus workloads de Big Data. Se tiver alguma dúvida ou sugestão, não deixe de postar na seção de comentários.

Matheus Cunha
Matheus Cunha
Engenheiro de Sistemas e Mágico

Apenas um amante de tecnologia empoderando empresas com computação “high-tech” para ajudar na inovação (:

comments powered by Disqus