Apache Spark

Eskapade supports the use of Apache Spark for parallel processing of large data volumes. Jobs can run on a single laptop using Spark libraries as well as on a Spark/Hadoop cluster in combination with YARN. This section describes how to set up and configure Spark for use with Eskapade. For examples of running Spark jobs with Eskapade, see the Spark tutorial.

Note

Eskapade supports both batch and real-time streaming (micro batch) processing with Apache Spark.

Requirements

A default working setup of the Apache Spark libraries is included in both the Eskapade docker and vagrant image (see section Installation). For installation of Spark libraries in a custom setup, please refer to the Spark documentation.

NB: not all combinations of Spark and Python versions work properly together.

Spark installation

The environment variables SPARK_HOME and PYTHONPATH need to be set: SPARK_HOME points to the location of the Spark installation, and PYTHONPATH includes the Python libraries of Spark and its py4j dependency. In the Eskapade docker, for example, they are set to:

$ echo $SPARK_HOME
/opt/spark/pro/
$ echo $PYTHONPATH
/opt/spark/pro/python:/opt/spark/pro/python/lib/py4j-0.10.4-src.zip:...
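
As a quick check of a custom setup, both variables can be inspected from Python and the pyspark package imported; a minimal sketch (the paths in the comments are just the docker example above):

import os

print(os.environ.get('SPARK_HOME'))   # e.g. /opt/spark/pro/
print(os.environ.get('PYTHONPATH'))   # should include Spark's python directory and the py4j zip

import pyspark                        # only succeeds if PYTHONPATH is set correctly
print(pyspark.__version__)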

Configuration

The Spark configuration can be set using three different methods:

  1. environment variables
  2. an Eskapade macro (preferred)
  3. an Eskapade link

This is demonstrated in the following tutorial macro:

$ run_eskapade tutorials/esk601_spark_configuration.py

The methods are described in the sections below. For a description of the available configuration settings, see Spark Configuration. If configuration settings do not seem to be picked up correctly, please check the Notes at the end of this section.

Environment variables

Configuration for Spark jobs can be set through the PYSPARK_SUBMIT_ARGS environment variable, e.g.:

PYSPARK_SUBMIT_ARGS="--master local[4] --num-executors 1 --executor-cores 4 --executor-memory 4g pyspark-shell"

The Spark session started from the macro then picks up these settings:

sm = proc_mgr.service(SparkManager)     # obtain the SparkManager service
sm.spark_session                        # accessing the property starts the Spark session
sm.spark_context.setLogLevel('INFO')
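
The variable can also be set from inside the macro itself, provided this is done before the Spark session is created, since it is only read when the JVM is launched; a sketch with the same example settings:

import os

# must run before sm.spark_session is accessed for the first time
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--master local[4] --num-executors 1 '
    '--executor-cores 4 --executor-memory 4g pyspark-shell'
)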

Eskapade macro (preferred)

This method allows settings to be specified per macro, i.e. per analysis, and is therefore the preferred way to keep track of analysis-specific settings.

Configuration for Spark jobs can be set through the SparkConf class that holds a list of key/value pairs with configuration settings, e.g.:

import pyspark

conf = pyspark.conf.SparkConf()
conf.setAppName(settings['analysisName'])
conf.setAll([('spark.driver.host', '127.0.0.1'), ('spark.master', 'local[2]')])

The Spark session is then started from the macro with the specified settings:

sm = proc_mgr.service(SparkManager)     # obtain the SparkManager service
sm.spark_conf = conf                    # hand the configuration to the SparkManager
sm.spark_session                        # accessing the property starts the Spark session
sm.spark_context.setLogLevel('INFO')
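
To verify that the settings were applied, the effective configuration can be read back from the running context; a small sketch, reusing the sm object from the snippet above:

sc = sm.spark_context
print(sc.getConf().get('spark.master'))    # expected: local[2]
print(sc.getConf().get('spark.app.name'))  # expected: the analysis name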

Parameters

The most important parameters to tune for optimal performance (the sketch after this list shows the corresponding SparkConf keys):

  • num-executors
  • executor-cores
  • executor-memory
  • driver-memory
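
Each of these spark-submit options has a SparkConf equivalent, so they can also be set from a macro; a sketch with example values that are not tuned for any particular cluster (spark.executor.instances is only relevant when running on YARN):

import pyspark

conf = pyspark.conf.SparkConf()
conf.setAll([
    ('spark.executor.instances', '1'),  # num-executors (YARN only)
    ('spark.executor.cores', '4'),      # executor-cores
    ('spark.executor.memory', '4g'),    # executor-memory
    ('spark.driver.memory', '2g'),      # driver-memory; see the Notes below for client mode
])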

Dynamic allocation

Since version 2.1, Spark allows for dynamic resource allocation. This requires the following settings:

  • spark.dynamicAllocation.enabled=true
  • spark.shuffle.service.enabled=true

Depending on the deployment mode (standalone, YARN, Mesos), an additional shuffle service needs to be set up. See the Spark documentation for details.
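
The two keys can be set like any other configuration setting; a minimal SparkConf sketch:

import pyspark

conf = pyspark.conf.SparkConf()
conf.setAll([
    ('spark.dynamicAllocation.enabled', 'true'),
    ('spark.shuffle.service.enabled', 'true'),
])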

Logging

The logging level of Spark can be controlled in two ways:

  1. through $SPARK_HOME/conf/log4j.properties:
log4j.logger.org.apache.spark.api.python.PythonGatewayServer=INFO
  2. through the SparkContext in Python:
proc_mgr.service(SparkManager).spark_context.setLogLevel('INFO')

NB: the Python loggers can be controlled through:

import logging
print(logging.Logger.manager.loggerDict)  # dict of all registered loggers
logging.getLogger('py4j').setLevel('INFO')
logging.getLogger('py4j.java_gateway').setLevel('INFO')

However, not all Spark-related loggers are available here, as they are Java-based.

Notes

There are a few pitfalls w.r.t. setting up Spark correctly:

  1. If the environment variable PYSPARK_SUBMIT_ARGS is defined, its settings may override those specified in the macro/link. This can be prevented by unsetting the variable:
$ unset PYSPARK_SUBMIT_ARGS

or in the macro:

import os
# raises a KeyError if the variable is not set; os.environ.pop('PYSPARK_SUBMIT_ARGS', None) removes it unconditionally
del os.environ['PYSPARK_SUBMIT_ARGS']

The former will clear the variable from the shell session, whereas the latter will only clear it in the Python session.

  2. In client mode not all driver options set via SparkConf are picked up at job submission because the JVM has already been started. Those settings should therefore be passed through the SPARK_OPTS environment variable, instead of using SparkConf in an Eskapade macro or link:
SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info --driver-memory 2g"
  3. In case a Spark machine is not connected to a network, setting the SPARK_LOCAL_HOSTNAME environment variable or the spark.driver.host key in SparkConf to the value localhost may fix DNS resolution timeouts which prevent Spark from starting jobs.
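
Both workarounds can be applied from Python; a minimal sketch (only one of the two is needed, and the environment variable must be set before the Spark session is created):

import os
import pyspark

# option 1: environment variable
os.environ['SPARK_LOCAL_HOSTNAME'] = 'localhost'

# option 2: the equivalent SparkConf key
conf = pyspark.conf.SparkConf()
conf.set('spark.driver.host', 'localhost')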