Tutorial 5: Going Spark

This section provides a tutorial on how to use Apache Spark in Eskapade. Spark works 'out of the box' in the Eskapade docker/vagrant image. For details on how to set up a custom Spark configuration, see the Spark section in the Appendix.

In this tutorial we will redo Tutorial 1, but using Spark instead of Pandas for data processing. The following paragraphs describe step by step how to run a Spark job, use existing links, and write your own links for Spark queries.

Note

To get familiar with Spark in Eskapade you can follow the exercises in tutorials/tutorial_5.py.

Running the tutorial macro

The very first step to run the tutorial Spark job is:

$ source setup.sh
$ run_eskapade.py tutorials/tutorial_5.py

Eskapade will start a Spark session, do nothing, and quit - there are no chains/links defined yet. The Spark session is created via the SparkManager which, like the DataStore, is a singleton that configures and controls Spark sessions centrally. It is activated through the magic line:

proc_mgr.service(SparkManager).spark_session
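To illustrate what this line does, here is a minimal plain-Python sketch of the service/singleton pattern behind it. All names here are simplified stand-ins, not the real Eskapade implementation: the process manager hands out one shared SparkManager instance, which lazily creates a single session.

```python
class ProcessManager:
    """Sketch of a service locator that hands out singleton services."""
    _services = {}

    @classmethod
    def service(cls, service_cls):
        # create the service on first request, reuse it on every later request
        if service_cls not in cls._services:
            cls._services[service_cls] = service_cls()
        return cls._services[service_cls]


class SparkManager:
    """Stand-in for Eskapade's SparkManager; the real one wraps a SparkSession."""
    def __init__(self):
        self._session = None

    @property
    def spark_session(self):
        # lazily create the session once; placeholder instead of
        # SparkSession.builder.getOrCreate()
        if self._session is None:
            self._session = object()
        return self._session


# every call returns the very same session object
s1 = ProcessManager.service(SparkManager).spark_session
s2 = ProcessManager.service(SparkManager).spark_session
assert s1 is s2
```

This is why any link in any chain can ask for the session without passing it around explicitly: the manager guarantees there is only one.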

Note that when the Spark session is created, the following line appears in the logs:

Adding Python modules to ZIP archive /Users/gossie/git/gitlab-nl/decision-engine/eskapade/es_python_modules.zip

This is the SparkManager that ensures all Eskapade source code is uploaded and available to the Spark cluster when running in a distributed environment.

If you see ImportError: No module named pyspark, then most likely SPARK_HOME and PYTHONPATH are not set up correctly. For details, see the Spark section in the Appendix.
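A typical fix is to export both variables before sourcing the setup script. The paths below are placeholders for your local Spark installation (not taken from this tutorial), and the name of the py4j archive differs per Spark release:

```shell
# Hypothetical install location -- adjust to your own Spark installation.
export SPARK_HOME=/opt/spark
# Make pyspark and its py4j bridge importable from Python.
export PYTHONPATH="${SPARK_HOME}/python:$(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip):${PYTHONPATH}"
```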

Reading data

Spark can read data from various sources, e.g. local disk, HDFS, and Hive tables. Eskapade provides the SparkDfReader link, which uses pyspark.sql.DataFrameReader to read flat CSV files into Spark DataFrames, RDDs, and Pandas DataFrames. To read in the tutorial data, the following link should be added to the Data chain:

reader = spark_analysis.SparkDfReader(name='Read_LA_ozone', store_key='data', read_methods=['csv'])
reader.read_meth_args['csv'] = (DATA_FILE_PATH,)
reader.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
proc_mgr.get_chain('Data').add_link(reader)
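The read_methods, read_meth_args, and read_meth_kwargs settings simply describe which methods of the underlying reader object to call, and with which arguments. A minimal plain-Python sketch of that dispatch, using a mocked reader in place of pyspark.sql.DataFrameReader and a hypothetical file path, could look like:

```python
class DfReaderLink:
    """Sketch of a reader link: records method calls, replays them at execute time."""
    def __init__(self, read_methods):
        self.read_methods = read_methods
        self.read_meth_args = {}
        self.read_meth_kwargs = {}

    def execute(self, reader):
        result = reader
        # apply each configured method in order, e.g. reader.csv(path, sep=',', ...)
        for meth in self.read_methods:
            args = self.read_meth_args.get(meth, ())
            kwargs = self.read_meth_kwargs.get(meth, {})
            result = getattr(result, meth)(*args, **kwargs)
        return result


class FakeReader:
    """Stand-in for pyspark.sql.DataFrameReader; records the call instead of reading."""
    def csv(self, path, **kwargs):
        return ('csv', path, kwargs)


link = DfReaderLink(read_methods=['csv'])
link.read_meth_args['csv'] = ('/tmp/LAozone.data',)  # hypothetical path
link.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
print(link.execute(FakeReader()))
```

This separation of configuration (in the macro) from execution (in the chain) is the same pattern used by the other Eskapade links.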

The DataStore holds a pointer to the Spark dataframe in (distributed) memory. This is different from a Pandas dataframe, where the entire dataframe is stored in the DataStore, because a Spark dataframe residing on the cluster may not fit entirely in the memory of the machine running Eskapade. This means that Spark dataframes are never written to disk in DataStore pickles!

Spark examples

Example Eskapade macros using Spark can be found in the tutorials directory, see esk601_spark_configuration.py and further.

Spark Streaming

Eskapade supports the use of Spark Streaming, as demonstrated in the word count example tutorials/esk610_spark_streaming_wordcount.py. The data is processed in (near) real-time as micro batches of RDDs, so-called discretized streaming, where the stream originates either from new incoming files or from a network connection. As with regular Spark queries, various transformations can be defined and applied in subsequent Eskapade links.

For details on Spark Streaming, see also https://spark.apache.org/docs/latest/streaming-programming-guide.html.
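The micro-batch idea can be illustrated without Spark at all. The following plain-Python sketch (not the Spark Streaming API) treats the stream as a sequence of small batches of lines, reduces each batch to per-word counts, and keeps a running total, analogous to a stateful DStream transformation:

```python
from collections import Counter

def word_count_dstream(micro_batches):
    """Simulate discretized streaming: each micro batch is reduced to word
    counts, and a running total is updated batch by batch."""
    running = Counter()
    for batch in micro_batches:
        # flatMap-like step: split each line into words; then count per batch
        batch_counts = Counter(word for line in batch for word in line.split())
        running.update(batch_counts)
        yield dict(batch_counts), dict(running)

# two micro batches of incoming lines
stream = [["Hello world"], ["Hello world", "Hello Spark"]]
for batch_counts, totals in word_count_dstream(stream):
    print(batch_counts, totals)
```

In the real word count example, Spark performs the per-batch reduction in parallel across the cluster; the structure of the computation is the same.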

File stream

The word count example using the file stream method can be run by executing in two different terminals:

terminal 1 $ for ((i=0; i<=100; i++)); do echo "Hello world" > /tmp/dummy_$(printf %05d ${i}); sleep 0.1; done
terminal 2 $ run_eskapade -c stream_type='file' $ESKAPADE/tutorials/esk610_spark_streaming_wordcount.py

Here the bash for-loop creates a new file containing 'Hello world' in the /tmp directory every 0.1 seconds. Spark Streaming picks up and processes these files, and in terminal 2 a word count of the processed data is displayed. Output is stored in $ESKAPADE/results/esk610_spark_streaming/data/v0/dstream/wordcount.

TCP stream

The word count example using the TCP stream method can be run by executing in two different terminals:

terminal 1 $ nc -lk 9999
terminal 2 $ run_eskapade -c stream_type='tcp' $ESKAPADE/tutorials/esk610_spark_streaming_wordcount.py

Here nc (netcat) streams data to port 9999, and Spark Streaming listens on this port and processes the incoming data. In terminal 1 random words can be typed (followed by enter), and in terminal 2 a word count of the processed data is displayed. Output is stored in $ESKAPADE/results/esk610_spark_streaming/data/v0/dstream/wordcount.