Lineage observation hub
Apache Hop provides an in-engine lineage observation hub for recording metadata-level observations (pipeline context, file I/O, HTTP traffic, structural hints) and delivering them asynchronously to pluggable lineage sinks. The design keeps third-party integrations (OpenLineage, Datadog, custom exporters) out of the engine: sinks are separate plugins that map Hop’s neutral events to their own formats.
The public API lives under the Java package org.apache.hop.lineage in the hop-engine module.
Goals
- Central entry point: producers call LineageHub.getInstance().emit(…) instead of talking to each backend directly.
- Safe defaults: the hub is off until HOP_LINEAGE_ENABLED is set to Y.
- Non-blocking pipelines: events are queued and dispatched on a dedicated worker thread, and sinks receive them in batches with per-sink error isolation.
- Extensibility: new backends ship as ILineageSink implementations registered via @LineageSinkPlugin.
Package layout
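| Package | Role |
|---|---|
| org.apache.hop.lineage | Root of the public API (for example LineageVariables) |
| org.apache.hop.lineage.spi | Sink service provider interface (ILineageSink) |
| org.apache.hop.lineage.xp | Extension points that call LineageHub to emit run lifecycle events and flush the hub |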
Event model
Each observation is a LineageEvent:
- Event id: unique id (UUID by default).
- Timestamp: milliseconds since the epoch.
- LineageEventKind: one of RUN_LIFECYCLE, TRANSFORM_SCHEMA, FILE_IO, HTTP_IO.
- LineageContext: who or what produced the observation.
- LineagePayload: optional typed payload; may be null if the kind and context are enough.
Payload types are plain Java types (strings, longs, enums) so sinks can serialize them without dragging engine objects across boundaries:
- FileIoLineagePayload: operation (READ, WRITE, COPY, MOVE, DELETE), source and target URIs, byte counts, success flag, message, and an optional FileIoContentSchema. The schema lists the tabular columns actually read or written (including JSONPath, YAML path, and XPath locators), plus optional structureRoots: a FileIoSchemaNode tree merged from those paths for nested file layouts. Text File Output and JSON Output attach the layout of the columns written to the file, which can be a subset of the full transform output row.
- HttpLineagePayload: CLIENT or SERVER, method, URL, status, request and response bytes, duration, success flag, message.
- RunLifecycleLineagePayload: a phase such as STARTED or FINISHED (the run lifecycle extension points also use FINISHING and FAILED), plus optional detail text.
- TransformSchemaLineagePayload: INPUT or OUTPUT plus a list of fields (name, Hop type name, length, precision). This is the boundary schema, emitted when a transform finishes. Fields usually come from runtime row metadata on the transform or its row sets. If input was never observed (for example, Abort when getRow() immediately returns null), the input schema is derived via PipelineMeta#getPrevTransformFields. If output was never written (for example, Detect empty stream receiving a non-empty input stream), the output schema is derived via PipelineMeta#getTransformFields. Events derived this way from the design graph set the context attribute transformSchemaSource to DESIGN_GRAPH.
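The structureRoots idea (merging many column locators into one nested tree) can be illustrated with a small sketch. It assumes simplified dotted JSONPath-style locators such as $.customer.address.city, with no array indexes, YAML paths, or XPath. PathNodeSketch is a hypothetical class, not FileIoSchemaNode, whose real fields are not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical tree node standing in for FileIoSchemaNode; child insertion order is kept.
final class PathNodeSketch {
  private final String name;
  private final Map<String, PathNodeSketch> children = new LinkedHashMap<>();

  PathNodeSketch(String name) {
    this.name = name;
  }

  // Merges one simplified locator ("$.a.b.c") into the tree; shared prefixes reuse existing nodes.
  void merge(String locator) {
    String body = locator.startsWith("$.") ? locator.substring(2) : locator;
    PathNodeSketch node = this;
    for (String segment : body.split("\\.")) {
      if (!segment.isEmpty()) {
        node = node.children.computeIfAbsent(segment, PathNodeSketch::new);
      }
    }
  }

  // Indented outline of the children, two spaces per nesting level.
  String render() {
    StringBuilder out = new StringBuilder();
    appendChildren(out, 0);
    return out.toString();
  }

  private void appendChildren(StringBuilder out, int depth) {
    for (PathNodeSketch child : children.values()) {
      out.append("  ".repeat(depth)).append(child.name).append('\n');
      child.appendChildren(out, depth + 1);
    }
  }
}
```

Merging $.customer.name, $.customer.address.city, and $.total yields one customer node with name and address children, plus a sibling total node.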
Convenience factory: LineageEvent.of(kind, context, payload) assigns id and current time.
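A minimal sketch of the event shape, assuming simplified stand-in types: SketchEventKind, SketchContext, SketchFileIoPayload, and SketchEvent are hypothetical names, not Hop classes, and the context is reduced to a subject plus string attributes. It shows a factory that assigns a random UUID and the current epoch-millisecond timestamp, and a payload built only from plain Java values.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for LineageEventKind.
enum SketchEventKind { RUN_LIFECYCLE, TRANSFORM_SCHEMA, FILE_IO, HTTP_IO }

// Hypothetical, simplified stand-in for LineageContext: a subject plus string attributes.
record SketchContext(String subject, Map<String, String> attributes) {
  SketchContext {
    attributes = Map.copyOf(attributes); // defensive, immutable copy
  }
}

// Marker for typed payloads (stand-in for LineagePayload).
interface SketchPayload {}

// Plain Java values only, so a sink can serialize it without touching engine objects.
record SketchFileIoPayload(
    String operation, String sourceUri, String targetUri, long bytes, boolean success, String message)
    implements SketchPayload {}

// Stand-in for LineageEvent; payload may be null when kind and context are enough.
record SketchEvent(
    String id, long timestampMillis, SketchEventKind kind, SketchContext context, SketchPayload payload) {

  // Mirrors the described factory: random UUID id plus the current time in epoch milliseconds.
  static SketchEvent of(SketchEventKind kind, SketchContext context, SketchPayload payload) {
    return new SketchEvent(
        UUID.randomUUID().toString(), System.currentTimeMillis(), kind, context, payload);
  }
}
```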
Hub behavior
LineageHub is a singleton (getInstance()). When lineage is disabled, emit returns immediately.
When enabled:
- The first emit starts a daemon worker thread (Hop-LineageHub-Dispatcher).
- Events are placed on a bounded queue. If the queue is full, the event is dropped and an error is logged (drops are counted).
- The worker builds batches of up to HOP_LINEAGE_BATCH_MAX events, waiting up to HOP_LINEAGE_BATCH_LINGER_MS for additional events before sending a partial batch.
- Each batch is passed to every loaded sink via ILineageSink.accept(List<LineageEvent>). If one sink throws, the others still receive the same batch; failures are logged.
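The queueing and batching rules above can be sketched with java.util.concurrent primitives. This is an illustration under assumptions, not the hub's code: BatchingDispatcherSketch is a hypothetical name, sinks are modeled as Consumer<List<E>>, and errors go to System.err instead of a Hop log channel. A real worker would call dispatchOnce in a loop on its daemon thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of a bounded, batching, error-isolating dispatcher.
final class BatchingDispatcherSketch<E> {
  private final BlockingQueue<E> queue;
  private final int batchMax;
  private final long lingerNanos;
  private final List<Consumer<List<E>>> sinks;
  private final AtomicLong dropped = new AtomicLong();

  BatchingDispatcherSketch(int capacity, int batchMax, long lingerMillis, List<Consumer<List<E>>> sinks) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.batchMax = batchMax;
    this.lingerNanos = TimeUnit.MILLISECONDS.toNanos(lingerMillis);
    this.sinks = List.copyOf(sinks);
  }

  // Producer side: never blocks; a full queue means the event is dropped and counted.
  boolean emit(E event) {
    if (queue.offer(event)) {
      return true;
    }
    dropped.incrementAndGet();
    System.err.println("lineage queue full, dropped events so far: " + dropped.get());
    return false;
  }

  long droppedCount() {
    return dropped.get();
  }

  // Worker side: wait for a first event, take what is already queued, then linger for more
  // until batchMax or the deadline. Returns the batch size, or 0 if nothing arrived in time.
  int dispatchOnce(long firstWaitMillis) {
    E first = poll(TimeUnit.MILLISECONDS.toNanos(firstWaitMillis));
    if (first == null) {
      return 0;
    }
    List<E> batch = new ArrayList<>(batchMax);
    batch.add(first);
    queue.drainTo(batch, batchMax - batch.size());
    long deadline = System.nanoTime() + lingerNanos;
    while (batch.size() < batchMax) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        break;
      }
      E next = poll(remaining);
      if (next == null) {
        break;
      }
      batch.add(next);
    }
    List<E> readOnly = List.copyOf(batch);
    for (Consumer<List<E>> sink : sinks) {
      try {
        sink.accept(readOnly); // every sink sees the same batch
      } catch (RuntimeException e) {
        // Isolation: one failing sink must not stop delivery to the others.
        System.err.println("lineage sink failed: " + e);
      }
    }
    return readOnly.size();
  }

  private E poll(long nanos) {
    try {
      return queue.poll(nanos, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

The producer never waits on a slow sink: the cost of overload is a counted drop, not a stalled pipeline.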
flush() blocks until all queued events (and coalesced flush markers) have been processed. flushQuietly() does the same but catches and logs errors instead of propagating them.
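One way to implement a blocking flush is a marker placed in the same FIFO queue as the events: once the worker reaches the marker, everything enqueued before it has been handled. The sketch below assumes that design. FlushableWorkerSketch is hypothetical, it records events in a list instead of calling sinks, uses an unbounded queue for brevity, and does not model the coalescing of flush markers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: flush() enqueues a marker behind all earlier events and waits for it.
final class FlushableWorkerSketch {
  private record FlushMarker(CountDownLatch reached) {}

  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); // unbounded for brevity
  private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
  private Thread worker; // guarded by this

  void emit(String event) {
    ensureWorker();
    queue.add(event);
  }

  // Returns true once every event emitted before this call has been handled, false on timeout.
  boolean flush(long timeoutMillis) {
    ensureWorker();
    FlushMarker marker = new FlushMarker(new CountDownLatch(1));
    queue.add(marker);
    try {
      return marker.reached().await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  List<String> delivered() {
    synchronized (delivered) {
      return List.copyOf(delivered);
    }
  }

  // Like the hub, start a daemon worker lazily on first use.
  private synchronized void ensureWorker() {
    if (worker == null) {
      worker = new Thread(this::runLoop, "sketch-lineage-dispatcher");
      worker.setDaemon(true);
      worker.start();
    }
  }

  private void runLoop() {
    try {
      while (true) {
        Object item = queue.take();
        if (item instanceof FlushMarker marker) {
          marker.reached().countDown(); // everything queued before the marker is done
        } else {
          delivered.add((String) item); // stands in for handing events to the sinks
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit quietly when interrupted
    }
  }
}
```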
shutdown() stops the worker and calls shutdown() on sinks. It is invoked from HopEnvironment.shutdown() and HopEnvironment.reset().
After PluginRegistry.init(), HopEnvironment calls LineageHub.getInstance().environmentReady() so sink lists can be rebuilt when the environment is re-initialized.
The queue is allocated lazily on the first emit from the resolved configuration, so HOP_LINEAGE_QUEUE_CAPACITY applies to the singleton as well as to isolated test hubs. The capacity is fixed for the lifetime of the queue; to change it, restart the JVM (or call shutdown() and emit again in tests).
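The lazy, fixed-capacity allocation described above resembles double-checked lazy initialization. A minimal sketch, assuming an IntSupplier stands in for resolving HOP_LINEAGE_QUEUE_CAPACITY from the active variables; LazyQueueHolderSketch is a hypothetical name. The capacity is read once, and only a reset (shutdown() here) lets the next access pick up a new value.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntSupplier;

// Hypothetical holder: the queue is created on first use with the capacity resolved at that moment.
final class LazyQueueHolderSketch {
  private final IntSupplier capacity; // stands in for resolving HOP_LINEAGE_QUEUE_CAPACITY
  private volatile BlockingQueue<Object> queue;

  LazyQueueHolderSketch(IntSupplier capacity) {
    this.capacity = capacity;
  }

  // Double-checked lazy initialization; later capacity changes are ignored until shutdown().
  BlockingQueue<Object> queue() {
    BlockingQueue<Object> q = queue;
    if (q == null) {
      synchronized (this) {
        q = queue;
        if (q == null) {
          q = new ArrayBlockingQueue<>(capacity.getAsInt());
          queue = q;
        }
      }
    }
    return q;
  }

  // Drops the queue so the next queue() call re-reads the capacity, as a test might do.
  synchronized void shutdown() {
    queue = null;
  }
}
```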
Run lifecycle (extension points)
The org.apache.hop.lineage.xp package registers extension points that emit RUN_LIFECYCLE events (via LineageRunLifecycleEmitter) and flush the hub where needed:
- PipelineStart (LineageHubPipelineStartXp): phase STARTED.
- PipelineFinish (LineageHubPipelineFinishXp): phase FINISHING (transforms stopped, cleanup).
- PipelineCompleted (LineageHubPipelineCompletedXp): phase FINISHED or FAILED (from the pipeline error count), then flushQuietly().
- WorkflowStart (LineageHubWorkflowStartXp): phase STARTED.
- WorkflowFinish (LineageHubWorkflowFinishXp): phase FINISHED or FAILED (from workflow or result errors), then flushQuietly().
- HopServerShutdown (LineageHubServerShutdownFlushXp): flush only.
- TransformBeforeStart (LineageHubTransformStartXp): transform phase STARTED (local pipeline engine; transform log channel plus pipeline correlation attributes).
- TransformFinished (LineageHubTransformFinishXp): transform FINISHED or FAILED from the transform error count, then TRANSFORM_SCHEMA events for the observed input and output row shapes when they are non-empty.
- WorkflowBeforeActionExecution (LineageHubWorkflowActionBeforeXp): action STARTED (workflow log channel plus action name; the template action’s log channel id is added as an attribute when present).
- WorkflowAfterActionExecution (LineageHubWorkflowActionAfterXp): action FINISHED or FAILED from the action’s Result; uses the executed action’s log channel id when available (WorkflowExecutionExtension.actionLogChannelId / actionExecutionResult).
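The completion handlers above share one rule: FAILED when errors were counted, otherwise FINISHED, followed by a flush whose failures never reach the run. A minimal sketch of that rule, assuming SketchPhase and CompletionHandlerSketch as hypothetical names; emitting is modeled as appending to a list and the flush as a Runnable. The real handlers are registered extension points that emit through LineageRunLifecycleEmitter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the run lifecycle phases named in this section.
enum SketchPhase { STARTED, FINISHING, FINISHED, FAILED }

// Hypothetical completion handler: derive the terminal phase, emit it, then flush quietly.
final class CompletionHandlerSketch {
  private final List<String> emitted = new ArrayList<>(); // stands in for hub.emit(...)
  private final Runnable flush; // stands in for a hub flush that may throw

  CompletionHandlerSketch(Runnable flush) {
    this.flush = flush;
  }

  // Any counted error makes the run FAILED; otherwise it FINISHED.
  static SketchPhase terminalPhase(long errorCount) {
    return errorCount > 0 ? SketchPhase.FAILED : SketchPhase.FINISHED;
  }

  void onCompleted(String runName, long errorCount) {
    emitted.add(runName + ":" + terminalPhase(errorCount));
    flushQuietly();
  }

  // Like flushQuietly(): a failing flush is logged, never propagated into the run.
  private void flushQuietly() {
    try {
      flush.run();
    } catch (RuntimeException e) {
      System.err.println("lineage flush failed: " + e.getMessage());
    }
  }

  List<String> emitted() {
    return List.copyOf(emitted);
  }
}
```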
Remote (RemotePipelineEngine) and Beam (BeamPipelineEngine) engines also call PipelineStart and PipelineFinish so they align with the local pipeline engine.
Beam and other engines that do not use the classic Hop thread-per-transform model typically do not fire TransformBeforeStart / TransformFinished the same way as the local Pipeline engine.
Configuration
Variables are defined on org.apache.hop.lineage.LineageVariables and appear in the variable registry (ENGINE scope) like other Hop settings.
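| Variable | Default | Meaning |
|---|---|---|
| HOP_LINEAGE_ENABLED | | Set to Y to enable the lineage hub. |
| HOP_LINEAGE_QUEUE_CAPACITY | | Bounded queue capacity. Read on the first emit and fixed for the lifetime of the queue. |
| HOP_LINEAGE_BATCH_MAX | | Maximum events per batch per sink. |
| HOP_LINEAGE_BATCH_LINGER_MS | | How long the worker waits for more events before sending a smaller batch. |
| | (empty) | Comma-separated sink plugin ids (case-insensitive). Empty means all discovered sinks are loaded. |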
LineageConfiguration.resolve() reads the active IVariables (typically after Variables.initializeFrom so system properties and Hop config apply).
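The sink-selection rule in the last table row (comma-separated, case-insensitive, empty means all) can be sketched as below. SinkSelectionSketch is hypothetical, the sink ids used with it are made up, and trimming whitespace around entries is an assumption rather than documented behavior.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for the sink-selection rule; not Hop's LineageConfiguration.
final class SinkSelectionSketch {
  private SinkSelectionSketch() {}

  // Splits a comma-separated list, trims entries (an assumption), drops blanks,
  // and lower-cases ids so that matching is case-insensitive.
  static Set<String> parseSinkIds(String raw) {
    if (raw == null || raw.isBlank()) {
      return Set.of();
    }
    return Arrays.stream(raw.split(","))
        .map(String::trim)
        .filter(id -> !id.isEmpty())
        .map(id -> id.toLowerCase(Locale.ROOT))
        .collect(Collectors.toUnmodifiableSet());
  }

  // An empty selection loads every discovered sink.
  static boolean isSelected(String pluginId, Set<String> selected) {
    return selected.isEmpty() || selected.contains(pluginId.toLowerCase(Locale.ROOT));
  }
}
```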
LineageSink plugins
Sinks implement org.apache.hop.lineage.spi.ILineageSink:
- init(IVariables variables, ILogChannel log): optional; called once before the first batch.
- accept(List<LineageEvent> events): receives a non-empty batch; should not hold references to the list after returning.
- shutdown(): optional cleanup.
The implementation class is annotated with @LineageSinkPlugin (id, optional name, description), mirroring other Hop plugin types (see Plugin Development).
The plugin type LineageSinkPluginType is registered in HopEnvironment.getStandardPluginTypes() alongside transforms, actions, execution info locations, etc. Plugins are discovered from the classpath like other native/engine plugins.
No sink implementations ship with the engine. Integrators implement ILineageSink, annotate the class with @LineageSinkPlugin(id = "…"), and package it as a Hop plugin. With no sinks loaded, the hub remains a no-op even when HOP_LINEAGE_ENABLED=Y.
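The sketch below shows the shape of a sink under stated assumptions. To compile without Hop on the classpath, SinkContractSketch is a local stand-in for ILineageSink (a Map instead of IVariables, no ILogChannel, event ids as plain strings), and the annotation appears only in a comment with a hypothetical id, counting-sink. It illustrates the accept contract: copy what the sink needs and keep no reference to the batch list.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for org.apache.hop.lineage.spi.ILineageSink (assumption: simplified signatures).
interface SinkContractSketch<E> {
  default void init(Map<String, String> variables) {} // optional, called once before the first batch

  void accept(List<E> events); // non-empty batch; do not retain the list

  default void shutdown() {} // optional cleanup
}

// Hypothetical sink. As a real Hop plugin it would implement ILineageSink and carry, e.g.:
// @LineageSinkPlugin(id = "counting-sink", name = "Counting sink", description = "Counts events")
final class CountingSinkSketch implements SinkContractSketch<String> {
  private final AtomicLong total = new AtomicLong();
  private volatile String lastEventId; // a copied value, not the batch list itself
  private volatile boolean closed;

  @Override
  public void accept(List<String> events) {
    total.addAndGet(events.size());
    lastEventId = events.get(events.size() - 1); // copy what is needed before returning
  }

  @Override
  public void shutdown() {
    closed = true;
  }

  long total() {
    return total.get();
  }

  String lastEventId() {
    return lastEventId;
  }

  boolean isClosed() {
    return closed;
  }
}
```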
Emitting events from code
Producers (pipeline/workflow hooks, VFS usage, HTTP client/server filters, transforms) should build a LineageContext (often via LineageContext.builder()), optionally attach a payload, then emit:
```java
// IVariables vars = pipeline; // or a workflow; must expose PROJECT_HOME for the portable key
LineageEvent event =
    LineageEvent.of(
        LineageEventKind.FILE_IO,
        LineageContext.builder()
            .subjectType(LineageSubjectType.ACTION)
            .hopFilename(filename)
            .hopFilenamePortableKey(
                LineagePortableFilename.portableKey(filename, vars))
            .build(),
        // operation, source URI, target URI, bytes, success, message
        new FileIoLineagePayload(
            FileIoOperation.COPY, sourceUri, targetUri, bytes, true, null));

LineageHub.getInstance().emit(event);
```

Keep call sites thin: one object construction plus emit. Heavy work belongs in sinks or in asynchronous code inside the sink implementation.
Related topics
- Plugin Development: plugin registry, BasePluginType, annotations.
- Integration testing: patterns that call HopEnvironment.init() when tests need a full plugin scan.