Run workflows and pipelines from Apache Airflow

Introduction

Apache Airflow is an open-source workflow management platform for data engineering pipelines.

Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. Tasks and dependencies are defined in Python and then Airflow manages the scheduling and execution. DAGs can be run either on a defined schedule (e.g. hourly or daily) or based on external event triggers (e.g. a file appearing in an AWS S3 bucket). DAGs can often be written in one Python file.
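
As a minimal illustration of that structure, the sketch below defines a DAG with two placeholder tasks on a daily schedule; the DAG id, task ids and start date are arbitrary examples and not part of the Hop setup described later.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal sketch of a scheduled DAG: two placeholder tasks, run once a day.
with DAG(
    'minimal-example',                 # arbitrary example DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',        # or None to rely on external triggers only
    catchup=False,
) as dag:
    extract = DummyOperator(task_id='extract')
    load = DummyOperator(task_id='load')

    extract >> load                    # 'load' runs only after 'extract' succeeds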

Apache Hop workflows and pipelines can be run from Airflow through the DockerOperator. Alternatively, the BashOperator can be used to call Hop Run directly.
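
As a rough sketch of the BashOperator alternative, the task below assumes Apache Hop is installed at /opt/hop on the Airflow worker (an assumed path) and that the project and environment are already registered in that local Hop configuration; verify the hop-run options against your Hop version.

from airflow.operators.bash_operator import BashOperator

# Sketch only: /opt/hop is an assumed installation path, and the project and
# environment names are assumed to match your local Hop configuration.
hop_run = BashOperator(
    task_id='sample-pipeline-bash',
    bash_command=(
        'cd /opt/hop && '
        './hop-run.sh '
        # single quotes keep ${PROJECT_HOME} out of shell expansion so Hop resolves it
        "--file='${PROJECT_HOME}/etl/sample-pipeline.hpl' "
        '--project=hop-airflow-sample '
        '--environment=env-hop-airflow-sample '  # environment name as registered in Hop
        '--runconfig=local '
        '--level=Basic'
    ),
)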

Sample DAG

The Airflow DockerOperator runs your Hop workflow or pipeline in a Docker container, using the Apache Hop Docker image.

Check the Docker docs for more information on how to run Apache Hop workflows and pipelines with Docker. Check Projects and environments for more information and best practices to set up your project.

In the example below, we’ll run a sample pipeline. The project and environment folders are mounted into the container as volumes (LOCAL_PATH_TO_PROJECT_FOLDER and LOCAL_PATH_TO_ENV_FOLDER).

Since your Airflow workflows will probably do more than just run a pipeline (e.g. perform a git clone or git pull first), two DummyOperators (start and end) were added to the sample.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.dummy_operator import DummyOperator
from docker.types import Mount

default_args = {
    'owner'                : 'airflow',
    'description'          : 'sample-pipeline',
    'depends_on_past'      : False,
    'start_date'           : datetime(2022, 1, 1),
    'email_on_failure'     : False,
    'email_on_retry'       : False,
    'retries'              : 1,
    'retry_delay'          : timedelta(minutes=5)
}

with DAG('sample-pipeline', default_args=default_args, schedule_interval=None, catchup=False, is_paused_upon_creation=False) as dag:
    start_dag = DummyOperator(
        task_id='start_dag'
        )
    end_dag = DummyOperator(
        task_id='end_dag'
        )
    hop = DockerOperator(
        task_id='sample-pipeline',
        # Use the Apache Hop Docker image. Add your tag here using the default apache/hop:<TAG> syntax.
        image='apache/hop',
        api_version='auto',
        auto_remove=True,
        environment= {
            'HOP_RUN_PARAMETERS': 'INPUT_DIR=<YOUR_INPUT_PATH>',
            'HOP_LOG_LEVEL': 'Basic',
            'HOP_FILE_PATH': '${PROJECT_HOME}/etl/sample-pipeline.hpl',
            'HOP_PROJECT_DIRECTORY': '/project',
            'HOP_PROJECT_NAME': 'hop-airflow-sample',
            'HOP_ENVIRONMENT_NAME': 'env-hop-airflow-sample.json',
            'HOP_ENVIRONMENT_CONFIG_FILE_NAME_PATHS': '/project-config/env-hop-airflow-sample.json',
            'HOP_RUN_CONFIG': 'local'
        },
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        mounts=[
            Mount(source='<LOCAL_PATH_TO_PROJECT_FOLDER>', target='/project', type='bind'),
            Mount(source='<LOCAL_PATH_TO_ENV_FOLDER>', target='/project-config', type='bind')
        ],
        force_pull=False
        )
    start_dag >> hop >> end_dag

After you deploy this DAG to your Airflow dags folder (e.g. as hop-airflow-sample.py), it will be picked up by Apache Airflow and will be ready to run.
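
To confirm that Airflow parses the file without errors before triggering it, a quick DagBag check can help; the dags folder path below is an assumed example, adjust it to your installation.

from airflow.models import DagBag

# Parse the dags folder and report any problems found in the DAG files.
dag_bag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

print(dag_bag.import_errors)               # any import/parse errors per DAG file
print('sample-pipeline' in dag_bag.dags)   # True once the DAG has been picked up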

Check the Airflow logs for the sample-pipeline task for the full Hop logs of the pipeline execution.

Apache Airflow - Hop DAG run