Apache Arrow Flight Data Stream
Overview
Apache Arrow Flight provides high-performance, low-latency RPC-based streaming using the Arrow columnar format over gRPC.
This implementation allows Hop to exchange data with Python (and other Arrow Flight clients) without writing intermediate files.
Configuration
The following properties are available when creating an Arrow Flight Data Stream:
| Property | Description |
|---|---|
| Name | The unique name of this Data Stream. This name is used as the path on the Flight server. |
| Description | Optional description of the stream. |
| Static Schema | The expected Arrow schema for this stream. When data is sent to the Hop Flight server, the incoming schema is checked against this static schema. If they do not match exactly, an error is thrown. |
| Batch Size | Hint for the number of rows per batch (default: 10,000). |
Important Behavior

- The stream name is simply the name of the Data Stream metadata element.
- Schema validation is strict: the client must send data using exactly the same schema defined in the Static Schema field.
- The Flight server must be started separately using the `hop arrow flight-server` command.
Python
Generate an Arrow stream file
    import pyarrow as pa
    import pyarrow.ipc as ipc
    from datetime import datetime

    schema = pa.schema([
        ("id", pa.int64()),
        ("name", pa.string()),
        ("value", pa.float64()),
        ("timestamp", pa.timestamp("ms")),
        ("active", pa.bool_())
    ])

    # Generate 10 batches of 500 rows (5,000 rows in total)
    number_of_batches = 10
    batch_size = 500

    with open("from_python.arrow", "wb") as f:
        # Write data in the Arrow IPC streaming format.
        # This can be read back by Apache Hop using the Arrow File Stream
        # data stream plugin type.
        with ipc.new_stream(f, schema) as writer:
            for i in range(number_of_batches):
                start = i * batch_size
                batch = pa.RecordBatch.from_pydict({
                    "id": list(range(start + 1, start + batch_size + 1)),
                    "name": [f"row_{j}" for j in range(start, start + batch_size)],
                    "value": [j * 1.23 for j in range(start, start + batch_size)],
                    "timestamp": [datetime.now() for _ in range(batch_size)],
                    "active": [j % 2 == 0 for j in range(start, start + batch_size)]
                }, schema=schema)
                writer.write_batch(batch)

    print("✅ Wrote from_python.arrow in Arrow IPC streaming format")

This file can then be read using the Data Stream Input transform and an Apache Arrow file stream data stream type.
Read a stream file
The following example reads and prints all rows from the specified stream file:
    import pyarrow as pa
    import pyarrow.ipc as ipc

    file_path = "/path/to/stream.arrow"

    # Best practice: use memory mapping for performance
    with pa.memory_map(file_path, "r") as source:
        reader = ipc.open_stream(source)
        schema = reader.schema
        print("Schema:", schema)

        # Read everything into one Table (simple & efficient for most cases)
        table = pa.Table.from_batches(reader)  # or list(reader), then from_batches
        print(f"Total rows: {len(table)}")

        # Convert to pandas (or Polars) if needed
        df = table.to_pandas()
        for index, row in df.iterrows():
            print(*row.values)

You can generate the .arrow stream file with the Data Stream Output transform and the Apache Arrow file stream data stream type.