Eskapade supports the use of Apache Spark for parallel processing of large data volumes. Jobs can run on a single laptop using the Spark libraries, as well as on a Spark/Hadoop cluster in combination with YARN. This section describes how to set up and configure Spark for use with Eskapade. For examples of running Spark jobs with Eskapade, see the Spark tutorial.
Eskapade supports both batch and streaming processing with Apache Spark.
A working setup of the Apache Spark libraries is included in both the Eskapade docker and vagrant images (see section Installation). For installation of the Spark libraries in a custom setup, please refer to the Spark documentation.
The environment variables SPARK_HOME and PYTHONPATH need to be set and to point to the location of the Spark installation and to the Python libraries of Spark and py4j (dependency), respectively. In the Eskapade docker, for example, they are set to:
    $ echo $SPARK_HOME
    /opt/spark/pro/
    $ echo $PYTHONPATH
    /opt/spark/pro/python:/opt/spark/pro/python/lib/py4j-0.10.4-src.zip:...
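Before starting a Spark job, it can help to verify these variables from Python. The following is a minimal sketch using only the standard library; the helper `check_spark_env` is hypothetical (it is not part of Eskapade), and the specific checks are assumptions derived from the docker example above.

```python
import os


def check_spark_env(env=None):
    """Return a list of problems found; an empty list means all checks passed."""
    env = os.environ if env is None else env
    problems = []

    spark_home = env.get('SPARK_HOME', '')
    if not spark_home:
        problems.append('SPARK_HOME is not set')

    # Split PYTHONPATH into its entries, ignoring empty ones.
    entries = [p for p in env.get('PYTHONPATH', '').split(os.pathsep) if p]

    # Assumption: the py4j dependency shows up as an entry named 'py4j...'.
    if not any('py4j' in os.path.basename(p) for p in entries):
        problems.append('no py4j entry found on PYTHONPATH')

    # Assumption: the Spark Python libraries live below SPARK_HOME.
    root = spark_home.rstrip('/')
    if spark_home and not any(p.startswith(root) for p in entries):
        problems.append('no PYTHONPATH entry points inside SPARK_HOME')

    return problems


if __name__ == '__main__':
    for problem in check_spark_env():
        print('WARNING:', problem)
```

Passing a plain dictionary instead of `os.environ` makes the check easy to try out without touching the real environment.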
The Spark configuration can be set in two ways:
- an Eskapade macro (preferred)
- an Eskapade link
This is demonstrated in the following tutorial macro:
$ eskapade_run python/eskapade/tutorials/esk601_spark_configuration.py
Both methods are described below. For a full explanation of Spark configuration settings, see Spark Configuration. In case configuration settings seem not to be picked up correctly, please check Notes at the end of this section.
Eskapade macro (preferred)
This method allows settings to be specified per macro, i.e. per analysis, and is therefore the preferred way of bookkeeping analysis-specific settings.
The easiest way to start a Spark session is:
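    from eskapade import process_manager
    from eskapade.spark_analysis import SparkManager

    # `settings` is the macro's ConfigObject, assumed to be defined earlier in the macro
    sm = process_manager.service(SparkManager)
    spark = sm.create_session(eskapade_settings=settings)
    sc = spark.sparkContext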
The default Spark configuration file
python/eskapade/config/spark/spark.cfg will be picked up. It contains the following settings:
    [spark]
    spark.app.name=es_spark
    spark.jars.packages=org.diana-hep:histogrammar-sparksql_2.11:1.0.4
    spark.master=local[*]
    spark.driver.host=localhost
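To inspect such a file outside of Eskapade, the standard-library `configparser` module can read it. This is only a sketch of how the `[spark]` section maps onto key-value pairs; it is not a description of how Eskapade itself parses the file, and the function name `read_spark_settings` is hypothetical.

```python
import configparser

# The default settings shown above, inlined so the example is self-contained.
CFG_TEXT = """\
[spark]
spark.app.name=es_spark
spark.jars.packages=org.diana-hep:histogrammar-sparksql_2.11:1.0.4
spark.master=local[*]
spark.driver.host=localhost
"""


def read_spark_settings(text, section='spark'):
    """Parse configuration text and return one section as (key, value) pairs."""
    # Only '=' separates key and value, so the ':' in the package
    # coordinates is left untouched; interpolation is disabled as well.
    parser = configparser.ConfigParser(delimiters=('=',), interpolation=None)
    parser.optionxform = str  # keep keys exactly as written
    parser.read_string(text)
    return list(parser.items(section))


settings_list = read_spark_settings(CFG_TEXT)
for key, value in settings_list:
    print(key, '->', value)
```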
The default Spark settings can be adapted here for all macros at once. If alternative settings are only relevant for a single analysis, they can also be specified in the macro using the arguments of the
create_session method of the SparkManager:
    from eskapade import process_manager
    from eskapade.spark_analysis import SparkManager

    # settings that apply to this analysis only
    spark_settings = [('spark.app.name', 'es_spark_alt_config'), ('spark.master', 'local')]

    sm = process_manager.service(SparkManager)
    spark = sm.create_session(eskapade_settings=settings,
                              spark_settings=spark_settings,
                              config_path='/path/to/alternative/spark.cfg',
                              enable_hive_support=False,
                              include_eskapade_modules=False)
Where all arguments are optional:
- eskapade_settings: default configuration file as specified by the sparkCfgFile key in ConfigObject
- config_path: alternative path to configuration file
- spark_settings: list of key-value pairs to specify additional Spark settings
- enable_hive_support: switch to disable/enable Spark Hive support
- include_eskapade_modules: switch to include/exclude Eskapade modules in Spark job submission (e.g. for user-defined functions)
The most important parameters to play with for optimal performance:
Since version 2.1, Spark allows for dynamic resource allocation. This requires the following settings:
Depending on the mode (standalone, YARN, Mesos), an additional shuffle service needs to be set up. See the documentation for details.
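In Spark's own configuration reference, dynamic allocation is enabled with `spark.dynamicAllocation.enabled` and the external shuffle service with `spark.shuffle.service.enabled`. The sketch below collects these as key-value pairs in the form accepted by the `spark_settings` argument described earlier. The helper name and the executor bounds are illustrative assumptions, not Eskapade defaults.

```python
def dynamic_allocation_settings(min_executors=1, max_executors=4):
    """Build Spark key-value pairs that switch on dynamic allocation.

    The executor bounds are optional in Spark; the defaults used here
    are placeholders chosen for illustration only.
    """
    if min_executors > max_executors:
        raise ValueError('min_executors must not exceed max_executors')
    return [
        ('spark.dynamicAllocation.enabled', 'true'),
        ('spark.shuffle.service.enabled', 'true'),
        ('spark.dynamicAllocation.minExecutors', str(min_executors)),
        ('spark.dynamicAllocation.maxExecutors', str(max_executors)),
    ]


spark_settings = dynamic_allocation_settings(min_executors=2, max_executors=8)
for key, value in spark_settings:
    print('{}={}'.format(key, value))
```

The resulting list could then be passed as `spark_settings` when creating the session, provided the shuffle service is actually running on the cluster.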
The logging level of Spark can be controlled in two ways:
- through the SparkContext in Python:

    spark = process_manager.service(SparkManager).get_session()
    spark.sparkContext.setLogLevel('INFO')
PS: the loggers in Python can be controlled through:
    import logging
    print(logging.Logger.manager.loggerDict)  # obtain list of all registered loggers
    logging.getLogger('py4j').setLevel('INFO')
    logging.getLogger('py4j.java_gateway').setLevel('INFO')
However, not all Spark-related loggers are available here (as they are Java-based).
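When several Python-side loggers share a prefix, setting their levels one by one becomes tedious. The following sketch uses only the standard `logging` module; the helper `set_levels` is hypothetical, and it affects Python loggers only, not the Java-based ones.

```python
import logging


def set_levels(prefix, level):
    """Set `level` on every registered logger named `prefix` or `prefix.*`."""
    changed = []
    for name in sorted(logging.Logger.manager.loggerDict):
        if name == prefix or name.startswith(prefix + '.'):
            logging.getLogger(name).setLevel(level)
            changed.append(name)
    return changed


# Register the two loggers named in the text so the example runs
# even when py4j itself has not been imported yet.
logging.getLogger('py4j')
logging.getLogger('py4j.java_gateway')

changed = set_levels('py4j', logging.WARNING)
print(changed)
```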
There are a few pitfalls w.r.t. setting up Spark correctly:
1. If the environment variable
PYSPARK_SUBMIT_ARGS is defined, its settings may override those specified
in the macro/link. This can be prevented by unsetting the variable:
$ unset PYSPARK_SUBMIT_ARGS
or in the macro:
    import os
    del os.environ['PYSPARK_SUBMIT_ARGS']
The former will clear the variable from the shell session, whereas the latter will only clear it in the Python session.
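Note that `del` raises a `KeyError` when the variable is not defined. A variant that works in both cases is sketched below; the value assigned at the start is an arbitrary example, set only so that the snippet has something to remove.

```python
import os

# For demonstration only: make sure the variable exists with an example value.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[2] pyspark-shell'

# pop() with a default removes the variable if present and never raises.
previous = os.environ.pop('PYSPARK_SUBMIT_ARGS', None)
print('removed value:', previous)

# Calling it again is harmless: the variable is gone, so None is returned.
second = os.environ.pop('PYSPARK_SUBMIT_ARGS', None)
print('second call returned:', second)
```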
2. In client mode not all driver options set via
SparkConf are picked up at job submission because the JVM has
already been started. Those settings should therefore be passed through the
SPARK_OPTS environment variable,
instead of using
SparkConf in an Eskapade macro or link:
SPARK_OPTS=--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info --driver-memory 2g
3. In case a Spark machine is not connected to a network, setting the
SPARK_LOCAL_HOSTNAME environment variable or
spark.driver.host key in
SparkConf to the value
localhost may fix DNS resolution timeouts which prevent
Spark from starting jobs.
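Both options from the last note can be prepared in a macro before the Spark session is created. This is a sketch under the assumption that the environment variable must be in place before Spark's JVM is launched; the variable name `offline_settings` is illustrative.

```python
import os

# Option 1: environment variable, set only if the user has not chosen a value.
os.environ.setdefault('SPARK_LOCAL_HOSTNAME', 'localhost')

# Option 2: the equivalent Spark setting as a key-value pair, in the form
# accepted by the `spark_settings` argument of `create_session`.
offline_settings = [('spark.driver.host', 'localhost')]

print(os.environ['SPARK_LOCAL_HOSTNAME'], offline_settings)
```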