Kafka Consumer
Description
This transform pulls streaming data from Kafka. The Kafka Consumer step runs a sub-pipeline that executes according to message batch size or duration, letting you process a continuous stream of records in near-real-time. This sub-pipeline must start with an Injector transform.
You can define the number of messages to accept for processing, as well as the specific data formats to stream activity data and system metrics. You can set up this step to collect monitored events, track user consumption of data streams, and monitor alerts.
Kafka records are stored within topics, and consist of a category to which the records are published. Topics are divided into a set of logs known as partitions. Kafka scales topic consumption by distributing partitions among a consumer group. A consumer group is a set of consumers sharing a common group identifier.
Since the Kafka Consumer transform continuously ingests streaming data, you may want to use the Abort transform in your parent or sub-pipeline to stop consuming records from Kafka for specific workflows. For example, you can run the parent pipeline on a timed schedule, or abort the sub-pipeline if sensor data exceeds a preset range.
Options
General
Option | Description |
---|---|
Transform name | the name for this transform |
Kafka pipeline | Specify your pipeline to execute by entering its path or clicking Browse and selecting the path. Note that this pipeline must start with an Injector transform. |
Setup
Option | Description |
---|---|
Bootstrap servers | comma separated list of bootstrap servers in a Kafka cluster |
Topics | Enter the name of each Kafka topic from which you want to consume streaming data (messages). |
Consumer group | Enter the name of the group of which you want this consumer to be a member. Each Kafka Consumer transform will start a single thread for consuming. When part of a consumer group, each consumer is assigned a subset of the partitions from topics it has subscribed to, which locks those partitions. Each instance of a Kafka Consumer transform will only run a single consumer thread. |
Batch
Use this tab to determine how many messages to consume before processing. You can specify message count and/or a specific amount of time.
While either option will trigger consumption, the first satisfied option will start the pipeline for the batch. |
Option | Description |
---|---|
Duration (ms) | Specify a time in milliseconds. This value is the amount of time the transform will spend collecting records prior to the execution of the pipeline. If set to a value of ‘0’, then Number of records triggers consumption. |
Number of records | Specify a number. After every ‘X’ number of records, the specified pipeline will be executed and these ‘X’ records will be passed to the pipeline. If set to a value of ‘0’ then Duration triggers consumption. |
Offset management | Choose when to commit
|
Fields
Option | Description |
---|---|
Input name | The input name is received from the Kafka streams. The following are received by default:
|
Output name | The Output name can be mapped to subscriber and member requirements. |
Type | The Type field defines the data format for streaming the record. You must choose the same data type that produced the records. This field applies to the ‘key’ and ‘message’ input names. Options include:
|
Result fields
Return fields from a transform in the sub pipeline for further processing in this pipeline.
Options
Use this tab to configure the property formats of the Kafka consumer broker sources. A few of the most common property formats have been included for your convenience. You can enter any desired Kafka property. For further information on these input names, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.
The options that are included by default are:
NName | Value |
---|---|
auto.offset.reset | latest |
ssl.key.password | |
ssl.keystore.location | |
ssl.keystore.password | |
ssl.truststore.location | |
ssl.truststore.password |