Apache Arrow Flight Data Stream

Overview

Apache Arrow Flight provides high-performance, low-latency RPC-based streaming using the Arrow columnar format over gRPC.

This implementation allows Hop to exchange data with Python (and other Arrow Flight clients) without writing intermediate files.

Configuration

The following properties are available when creating an Arrow Flight Data Stream:

  • Name: The unique name of this Data Stream. This name is used as the path on the Flight server (FlightDescriptor.for_path(name)).

  • Description: Optional description of the stream.

  • Static Schema: The expected Arrow schema for this stream. When data is sent to the Hop Flight server, the incoming schema is checked against this static schema. If the schemas do not match exactly, an error is thrown.

  • Batch Size: A hint for the number of rows per batch (default: 10,000).

Important Behavior

  • The stream name is simply the name of the Data Stream metadata element.

  • Schema validation is strict: the client must send data using exactly the same schema defined in the Static Schema field.

  • The Flight server must be started separately using the hop arrow flight-server command.

Python

Generate an Arrow stream file

import pyarrow as pa
import pyarrow.ipc as ipc
import pandas as pd
from datetime import datetime

schema = pa.schema([
    ("id", pa.int64()),
    ("name", pa.string()),
    ("value", pa.float64()),
    ("timestamp", pa.timestamp("ms")),
    ("active", pa.bool_())
])

# Generate 10 batches of 500 rows (5,000 rows in total)
number_of_batches = 10
batch_size = 500

with open("from_python.arrow", "wb") as f:
    #
    # Write data in an Arrow File Stream format.
    # This can be read back by Apache Hop using the Arrow File Stream data stream plugin type.
    #
    with ipc.new_stream(f, schema) as writer:
        for i in range(number_of_batches):
            batch = pa.RecordBatch.from_pydict({
                "id": list(range(i*batch_size+1, i*batch_size + batch_size+1)),
                "name": [f"row_{j}" for j in range(i*batch_size, i*batch_size + batch_size)],
                "value": [j * 1.23 for j in range(i*batch_size, i*batch_size + batch_size)],
                "timestamp": [datetime.now() for _ in range(batch_size)],
                "active": [j % 2 == 0 for j in range(i*batch_size, i*batch_size + batch_size)]
            }, schema=schema)
            writer.write_batch(batch)

print("✅ Wrote from_python.arrow in Arrow IPC Streaming Format")

This file can then be read using the Data Stream Input transform and an Apache Arrow file stream data stream type.

Read a stream file

The following example reads and prints all rows from the specified stream file:

import pyarrow as pa
import pyarrow.ipc as ipc

file_path = "/path/to/stream.arrow"

# Best practice: use memory mapping for performance
with pa.memory_map(file_path, 'r') as source:
    reader = ipc.open_stream(source)

    schema = reader.schema
    print("Schema:", schema)

    # Read everything into one Table (simple and efficient for most cases)
    table = reader.read_all()    # equivalently: pa.Table.from_batches(list(reader))
    print(f"Total rows: {len(table)}")

    # Convert to pandas (or Polars) if needed
    df = table.to_pandas()

    # Print every row; the column names depend on the schema of the stream file
    for _, row in df.iterrows():
        print(row.to_dict())

You can generate the .arrow stream file with the Data Stream Output transform and the Apache Arrow file stream data stream type.