Apache Beam Spark Pipeline Engine
The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark version 3.1.
The Spark Runner executes Beam pipelines on top of Apache Spark, providing:
- Batch and streaming (and combined) pipelines.
- The same fault-tolerance guarantees as provided by RDDs and DStreams.
- The same security features Spark provides.
- Built-in metrics reporting using Spark’s metrics system, which reports Beam Aggregators as well.
- Native support for Beam side-inputs via Spark’s Broadcast variables.
Check the Apache Beam Spark runner docs for more information.
Options
Option | Description | Default |
---|---|---|
The Spark master | The URL of the Spark Master. This is the equivalent of setting SparkConf#setMaster(String) and can be local[x] to run locally with x cores, spark://host:port to connect to a Spark Standalone cluster, mesos://host:port to connect to a Mesos cluster, or yarn to connect to a YARN cluster. | local[4] |
Streaming: batch interval (ms) | The StreamingContext’s batchDuration - setting Spark’s batch interval. | 1000 |
Streaming: checkpoint directory | A checkpoint directory for streaming resilience, ignored in batch. For durability, a reliable filesystem such as HDFS/S3/GS is necessary. | local dir in /tmp |
Streaming: checkpoint duration (ms) | | |
Enable Metrics sink | A servlet within the existing Spark UI to serve metrics data as JSON data. | |
Streaming: maximum records per batch | The maximum records per batch interval. | |
Streaming: minimum read time (ms) | Minimum elapsed read time. | |
Bundle size | The maximum number of elements in a bundle. | |
User agent | A user agent string as per RFC2616, describing the pipeline to external services. | |
Temp location | Path for temporary files. | |
Plugins to stage (, delimited) | Comma separated list of plugins. | |
Transform plugin classes | List of transform plugin classes. | |
XP plugin classes | List of extension point plugins. | |
Streaming Hop transforms flush interval (ms) | The amount of time after which the internal buffer is sent completely over the network and emptied. | |
Hop streaming transforms buffer size | The internal buffer size to use. | |
Fat jar file location | Fat jar location. | |
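The forms accepted by the Spark master option can be illustrated with a small shell helper. This is only a sketch: the function name `describe_spark_master` is hypothetical and not part of Hop or Spark, and it recognizes only the four forms listed in the table above.

```shell
#!/bin/sh
# Hypothetical helper (not part of Hop or Spark): classify a Spark master
# value using only the forms listed in the options table.
describe_spark_master() {
  case "$1" in
    local\[*\])   echo "embedded" ;;    # local[x], e.g. local[4]
    spark://*:*)  echo "standalone" ;;  # spark://host:port
    mesos://*:*)  echo "mesos" ;;       # mesos://host:port
    yarn)         echo "yarn" ;;
    *)            echo "unknown"; return 1 ;;
  esac
}
```

For example, `describe_spark_master 'local[4]'` prints `embedded`, and an unrecognized value makes the function return a non-zero status.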
Running remotely
A pipeline can only be executed on Spark from the Spark Master, so you can start a Hop server on the master and then execute remotely, from anywhere, on the Spark master of your choice. Make sure that any referenced artifacts, such as the fat jar you want to use, are available to the Hop server.
Running with Spark Submit
You can also execute using the 'spark-submit' tool. There is a main class you can use:
org.apache.hop.beam.run.MainBeam
It accepts 3 arguments:
Argument | Description |
---|---|
1 | The filename of the pipeline to execute. |
2 | The filename of the metadata to load (JSON). You can export metadata in the Hop GUI under the tools menu (part of the Beam plugin in |
3 | The name of the pipeline run configuration to use |
Spark-submit also needs a fat jar. This can be generated in the Hop GUI under the tools menu or with the following command:
sh hop-config.sh -fj /path/to/fat.jar
Important: project configurations, environments and similar settings are not valid in the context of the Spark runtime. The Hop community still needs to work out how best to support this, and your input is welcome. In the meantime, pass variables to the JVM with the option:
--driver-java-options '-DPROJECT_HOME=/path/to/project-home'
In general, it is better not to use relative paths like ${Internal.Entry.Current.Folder} when specifying filenames for pipelines that are executed remotely. It’s usually better to pick a few root folders as variables. PROJECT_HOME is as good as any variable to use.
An example spark-submit command might look like this:
spark-submit \
--master spark://master-host:7077 \
--class org.apache.hop.beam.run.MainBeam \
--driver-java-options '-DPROJECT_HOME=/my/project/home' \
hop-0.70-fat.jar \
/my/project/home/pipeline.hpl \
metadata-export.json \
SparkRunConfig
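The pieces described above (main class, fat jar, JVM variable and the three arguments) can be assembled by a small wrapper. The following is a minimal sketch, not an official Hop tool: the function name `build_submit_command` and the order of its parameters are assumptions made for illustration. It only prints the command, so that it can be reviewed before being run.

```shell
#!/bin/sh
# Hypothetical helper (not shipped with Hop): print a spark-submit command
# for org.apache.hop.beam.run.MainBeam so it can be inspected before use.
# Parameters: master, fat jar, project home, pipeline file, metadata JSON,
# run configuration name.
build_submit_command() {
  if [ "$#" -ne 6 ]; then
    echo "usage: build_submit_command MASTER FAT_JAR PROJECT_HOME PIPELINE METADATA_JSON RUN_CONFIG" >&2
    return 1
  fi
  # MainBeam takes three arguments after the jar: the pipeline filename,
  # the metadata export and the run configuration name.
  printf '%s ' \
    spark-submit \
    --master "$1" \
    --class org.apache.hop.beam.run.MainBeam \
    --driver-java-options "'-DPROJECT_HOME=$3'" \
    "$2" "$4" "$5"
  printf '%s\n' "$6"
}
```

Calling it with the values from the example prints a single-line equivalent of that command. Paths containing spaces are not quoted by this sketch and would need extra care.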
Spark embedded
You can specify a master of local[4] to run using an embedded Spark engine. It’s primarily used for testing locally. The number 4 in the example is the desired number of threads to use when executing. You can also specify * instead of a number (local[*]) to have that determined automatically for your system.
Please note that you can get an error like the following:
Cannot assign requested address: Service 'sparkDriver' failed after 16 retries
In this case you can set the system environment variable SPARK_LOCAL_IP to 127.0.0.1.
export SPARK_LOCAL_IP="127.0.0.1"
Log4j
If you get an error about a missing log4j library please note that we don’t ship this Spark dependency because of security concerns and because it is quite old (from 2012).
Example error:
java.lang.ClassNotFoundException: org.apache.log4j.spi.Filter
When running on a Spark cluster, the library is included there. When running embedded, we advise you to add the library manually to the plugins/engines/beam/lib folder.
Scala
If you get an error about a missing Scala library, you also need to include the Scala version 2.12 libraries. You can copy these from a Spark installation (${SPARK_HOME}/jars/scala-*-2.12.10.jar) into plugins/engines/beam/lib.
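The copy step described above can be sketched as a small shell function. This is an illustration under assumptions: the function name `copy_scala_jars` is hypothetical, and its second parameter is assumed to be the Hop installation folder that contains `plugins/`.

```shell
#!/bin/sh
# Sketch: copy the Scala 2.12.10 jars from a Spark installation into the
# Beam engine library folder of a Hop installation.
# Usage: copy_scala_jars SPARK_HOME HOP_HOME
# (HOP_HOME is a hypothetical name for the folder that contains plugins/.)
copy_scala_jars() {
  spark_home="$1"
  target="$2/plugins/engines/beam/lib"
  if [ ! -d "$target" ]; then
    echo "target folder not found: $target" >&2
    return 1
  fi
  copied=0
  for jar in "$spark_home"/jars/scala-*-2.12.10.jar; do
    # If nothing matches, the pattern stays unexpanded and is not a file.
    if [ -f "$jar" ]; then
      cp "$jar" "$target/" || return 1
      copied=$((copied + 1))
    fi
  done
  # Print how many jars were copied.
  echo "$copied"
}
```

A printed count of 0 suggests that the Spark installation ships a different Scala patch version, in which case the file pattern would need to be adjusted.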
Alternatively, you can use a Direct runner to test your Beam pipeline in embedded mode.