Running the Apache Beam samples with Apache Spark

Get Spark

Check the Supported Versions on the Getting Started With Beam page to find the latest supported Spark version for your Hop version.

For example, for Hop 1.2, the latest currently supported version is 3.1.2.

Download your selected Spark version and unzip to a convenient location.

Apache Spark Download
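
For example, you could download and unpack Spark from the command line. A minimal sketch, assuming version 3.1.2 and the Apache archive mirror; check the download page for the current links:

# Download Spark 3.1.2 pre-built for Hadoop 3.2 (URL assumed; verify on the download page).
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# Unpack to a convenient location.
tar -xzf spark-3.1.2-bin-hadoop3.2.tgz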

Start your local Spark single-node cluster

To keep things as simple as possible, we’ll run a local single-node Spark cluster.

First, we need to start our local master. This can be done with a single command from the folder where you unzipped Spark:

<SPARK_FOLDER>/sbin/start-master.sh

Your output should look similar to the one below:

starting org.apache.spark.deploy.master.Master, logging to <PATH>/spark-3.1.2-bin-hadoop3.2/logs/spark-<USER>-org.apache.spark.deploy.master.Master-1-<HOSTNAME>.out

You should now be able to access the Spark Master’s web ui at http://localhost:8080.
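
If port 8080 is already taken on your machine, you can pass a different web ui port to the start script; 8081 below is just an example:

# Start the master with an alternative web ui port (8081 is an arbitrary choice).
<SPARK_FOLDER>/sbin/start-master.sh --webui-port 8081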

Copy the master’s url from the master’s page header, e.g. spark://localhost.localdomain:7077.

Apache Spark - Master web ui
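
If you closed the web ui, the same URL is also printed in the master’s log file (the log path here is the example from the start-master.sh output above):

# Look up the master URL in the master's log file.
grep "Starting Spark master at" <PATH>/spark-3.1.2-bin-hadoop3.2/logs/spark-<USER>-org.apache.spark.deploy.master.Master-1-<HOSTNAME>.out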

With the master in place, we can start a worker (formerly called a slave). Similar to the master, this is a single command that takes the master’s URL you just copied:

sbin/start-worker.sh spark://localhost.localdomain:7077

Your output should look similar to the one below:

starting org.apache.spark.deploy.worker.Worker, logging to <PATH>/spark-3.1.2-bin-hadoop3.2/logs/spark-<USER>-org.apache.spark.deploy.worker.Worker-1-<HOSTNAME>.out
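
To check that both processes are up, refresh the master’s web ui (the worker should now be listed under 'Workers') or list the running Java processes with jps, which ships with the JDK:

jps
# Example output; the process ids will differ on your machine:
# 12345 Master
# 67890 Worker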

Run sample pipeline with Spark Submit

Since Spark doesn’t support remote execution, we’ll run one of the sample pipelines through Spark Submit.

Use a command like the one below to pass all the information required by spark-submit.

bin/spark-submit \
  --master spark://localhost.localdomain:7077 \
  --class org.apache.hop.beam.run.MainBeam \
  --driver-java-options '-DPROJECT_HOME=<PATH>/hop/config/projects/samples' \
  /opt/spark/hop-fat-jar.jar \
  <PATH>/hop/config/projects/samples/beam/pipelines/input-process-output.hpl \
  /opt/spark/hop-metadata.json \
  Spark

In this case, the fat jar and metadata export files were saved to /opt/spark. The last argument, Spark, is the name of the Spark pipeline run configuration in the samples project. Replace these values with the ones for your environment and run the command.
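
If you run this more than once, it can help to wrap the command in a small script so the environment-specific values live in one place. A minimal sketch, assuming the same file locations as the example above:

#!/bin/bash
# Environment-specific values; adjust these for your setup.
SPARK_HOME=<SPARK_FOLDER>
MASTER_URL=spark://localhost.localdomain:7077
PROJECT_HOME=<PATH>/hop/config/projects/samples
FAT_JAR=/opt/spark/hop-fat-jar.jar
METADATA_JSON=/opt/spark/hop-metadata.json
PIPELINE=$PROJECT_HOME/beam/pipelines/input-process-output.hpl
RUN_CONFIG=Spark

"$SPARK_HOME"/bin/spark-submit \
  --master "$MASTER_URL" \
  --class org.apache.hop.beam.run.MainBeam \
  --driver-java-options "-DPROJECT_HOME=$PROJECT_HOME" \
  "$FAT_JAR" \
  "$PIPELINE" \
  "$METADATA_JSON" \
  "$RUN_CONFIG"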

You should see verbose logging output similar to the output below:

22/02/10 21:19:00 WARN Utils: Your hostname, <HOSTNAME> resolves to a loopback address: 127.0.1.1; using 192.168.86.44 instead (on interface wlp0s20f3)
22/02/10 21:19:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/10 21:19:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Argument 1 : Pipeline filename (.hpl)   : <PATH>/hop/config/projects/samples/beam/pipelines/input-process-output.hpl
Argument 2 : Metadata filename (.json)  : /opt/spark/hop-metadata.json
Argument 3 : Pipeline run configuration : Spark
>>>>>> Initializing Hop...
log4j:WARN No appenders could be found for logger (org.apache.commons.vfs2.impl.DefaultFileSystemManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>>>>>> Loading pipeline metadata
>>>>>> Building Apache Beam Pipeline...
>>>>>> Found Beam Input transform plugin class loader
>>>>>> Pipeline executing starting...
2022/02/10 21:19:06 - General - Created Apache Beam pipeline with name 'input-process-output'
2022/02/10 21:19:06 - General - Handled transform (INPUT) : Customers
2022/02/10 21:19:06 - General - Handled generic transform (TRANSFORM) : Only CA, gets data from 1 previous transform(s), targets=0, infos=0
2022/02/10 21:19:07 - General - Handled generic transform (TRANSFORM) : Limit fields, re-order, gets data from 1 previous transform(s), targets=0, infos=0
2022/02/10 21:19:07 - General - Handled transform (OUTPUT) : input-process-output, gets data from Limit fields, re-order
2022/02/10 21:19:07 - General - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Spark'
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/02/10 21:19:08 INFO SparkContext: Running Spark version 3.1.2
22/02/10 21:19:08 INFO ResourceUtils: ==============================================================
22/02/10 21:19:08 INFO ResourceUtils: No custom resources configured for spark.driver.
22/02/10 21:19:08 INFO ResourceUtils: ==============================================================
##
##

Lots of output omitted.

##
##
22/02/10 21:19:20 INFO SparkContext: Successfully stopped SparkContext
2022/02/10 21:19:20 - General - Beam pipeline execution has finished.
>>>>>> Execution finished...
22/02/10 21:19:20 INFO ShutdownHookManager: Shutdown hook called
22/02/10 21:19:20 INFO ShutdownHookManager: Deleting directory /tmp/spark-42c62ef7-2bea-4d0b-aac0-6882f55b611a
22/02/10 21:19:20 INFO ShutdownHookManager: Deleting directory /tmp/spark-c75691d8-6a0d-4d07-ac22-31f98a834940

After your pipeline finishes and the spark-submit command ends, your Spark Master's web ui will show a new entry in the 'Finished Applications' list. While an application runs, you can follow it in the 'Running Applications' list and drill down into its execution details.
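
If you prefer the command line, the standalone master also serves its status as JSON on the web ui port (assuming the default 8080); completed applications should show up in its output:

# Query the master's JSON status endpoint (default web ui port assumed).
curl http://localhost:8080/json/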

Finished pipeline over Apache Hop and Apache Beam in Apache Spark
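
When you’re done experimenting, stop the cluster with the matching stop scripts:

<SPARK_FOLDER>/sbin/stop-worker.sh
<SPARK_FOLDER>/sbin/stop-master.sh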