Kafka Consumer

Description

The Kafka Consumer transform pulls streaming data from Kafka. It runs a sub-pipeline that executes according to message batch size or duration, letting you process a continuous stream of records in near-real-time.

This sub-pipeline must start with an Injector transform.

You can define how many messages to accept for processing, as well as the data formats used to stream activity data and system metrics.

You can set up this transform to collect monitored events, track user consumption of data streams, and monitor alerts.

Kafka records are stored within topics. A topic is a category to which records are published. Topics are divided into a set of logs known as partitions.

Kafka scales topic consumption by distributing partitions among a consumer group.

A consumer group is a set of consumers sharing a common group identifier.
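To make the distribution of partitions over a consumer group concrete, here is a minimal Python sketch that hands out partitions round-robin. It is an illustration only: the function name `assign_partitions` and the consumer names are hypothetical, and Kafka's actual assignment strategies are more involved than this.

```python
from collections import defaultdict


def assign_partitions(partitions, consumers):
    """Spread partitions over group members round-robin (illustrative only)."""
    members = sorted(consumers)
    if not members:
        raise ValueError("a consumer group needs at least one consumer")
    assignment = defaultdict(list)
    for index, partition in enumerate(sorted(partitions)):
        # Each partition goes to exactly one member of the group.
        assignment[members[index % len(members)]].append(partition)
    # Members that received nothing still appear, with an empty list.
    return {member: assignment[member] for member in members}


if __name__ == "__main__":
    # Hypothetical group of four consumers reading a six-partition topic.
    print(assign_partitions(range(6), ["hop-a", "hop-b", "hop-c", "hop-d"]))
```

In this sketch, a member that ends up with an empty list has nothing to read, which is why adding more consumers than partitions does not increase throughput.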

Since the Kafka Consumer transform continuously ingests streaming data, you may want to use the Abort transform in your parent or sub-pipeline to stop consuming records from Kafka for specific workflows.

For example, you can run the parent pipeline on a timed schedule, or abort the sub-pipeline if sensor data exceeds a preset range.

Options

General

Option Description

Transform name

The name for this transform.

Kafka pipeline

Specify the pipeline to execute by entering its path or clicking Browse and selecting the path.

Note: this pipeline must start with an Injector transform.

Setup

Option Description

Bootstrap servers

A comma-separated list of bootstrap servers (host:port) in the Kafka cluster.

Topics

Enter the name of each Kafka topic from which you want to consume streaming data (messages).

Consumer group

Enter the name of the consumer group that this consumer should join. If left empty, the group name defaults to Apache Hop.

When part of a consumer group, each consumer is assigned a subset of the partitions of the topics it has subscribed to, which locks those partitions for the other members of the group. Each instance of a Kafka Consumer transform runs a single consumer thread.

Batch

Use this tab to determine how many messages to consume before processing. You can specify a message count, a duration, or both.

When both are set, whichever condition is satisfied first starts the pipeline for the batch.

Option Description

Duration (ms)

Specify a time in milliseconds. This value is the amount of time the transform will spend collecting records prior to the execution of the pipeline. If set to a value of ‘0’, then Number of records triggers consumption.

Number of records

Specify a number. After every ‘X’ number of records, the specified pipeline will be executed and these ‘X’ records will be passed to the pipeline. If set to a value of ‘0’ then Duration triggers consumption.

Offset management

Choose when to commit offsets to Kafka:

  • when record read

  • when batch completed
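The two batch triggers on this tab can be pictured with the following Python sketch. It is not the transform's implementation: `collect_batch`, `poll`, and the injectable `clock` are hypothetical names chosen for illustration, and a real consumer would block inside `poll` rather than spin.

```python
import time


def collect_batch(poll, max_records=0, max_duration_ms=0, clock=time.monotonic):
    """Collect records until the record count or the duration is reached.

    poll() returns the next record, or None when nothing is available yet.
    A limit of 0 disables that trigger, as in the option descriptions.
    """
    if max_records <= 0 and max_duration_ms <= 0:
        raise ValueError("set a record count, a duration, or both")
    batch = []
    started = clock()
    while True:
        if max_records > 0 and len(batch) >= max_records:
            return batch  # the record-count trigger fired first
        if max_duration_ms > 0 and (clock() - started) * 1000 >= max_duration_ms:
            return batch  # the duration trigger fired first
        record = poll()
        if record is not None:
            batch.append(record)
```

For example, `collect_batch(source, max_records=500, max_duration_ms=1000)` would hand over a batch after 500 records or after one second, whichever comes first. The returned batch is what would be passed to the sub-pipeline.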

Fields

Option Description

Input name

The input name is received from the Kafka streams. The following are received by default:

  • key: Determines message distribution to partitions. If no key is present, messages are randomly distributed across partitions.

  • message: The individual message contained in a record. Each record consists of a key, a value, and a timestamp.

  • topic: The category to which records are published.

  • partition: An ordered sequence of records that is continuously appended to. Within a consumer group, you cannot have more active consumers than partitions; any extra consumers remain idle.

  • offset: A sequential ID number assigned by Kafka to each record. It uniquely identifies each record within the partition.

  • timestamp: The time the message is received on the server.

Output name

The Output name can be mapped to subscriber and member requirements.

Type

The Type field defines the data format for streaming the record. You must choose the same data type that was used to produce the records. This field applies to the ‘key’ and ‘message’ input names. Options include:

  • String

  • Boolean

  • Number

  • Integer

  • Binary
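The following Python sketch illustrates why the selected type has to match the type used by the producer. The byte layouts are assumptions made for the example (Integer as an 8-byte big-endian value, as Kafka's LongSerializer writes it, and Number as an 8-byte big-endian double); the function `decode` is hypothetical, and Boolean is left out because its wire format is not described here.

```python
import struct


def decode(raw: bytes, type_name: str):
    """Decode a raw key or message according to the declared type."""
    if type_name == "String":
        return raw.decode("utf-8", errors="replace")
    if type_name == "Integer":
        (value,) = struct.unpack(">q", raw)  # assumed: 8-byte big-endian integer
        return value
    if type_name == "Number":
        (value,) = struct.unpack(">d", raw)  # assumed: 8-byte big-endian double
        return value
    if type_name == "Binary":
        return raw
    raise ValueError(f"unsupported type: {type_name}")


if __name__ == "__main__":
    payload = struct.pack(">q", 42)          # producer wrote an integer
    print(decode(payload, "Integer"))        # matching type: readable value
    print(repr(decode(payload, "String")))   # mismatched type: unreadable text
```

Reading the integer payload as a String does not fail, it silently yields unusable text, while reading a short text payload as an Integer raises an error. Either way the record cannot be processed correctly.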

Result fields

Return fields from a transform in the sub-pipeline for further processing in the parent pipeline.

Options

Use this tab to configure the properties of the Kafka consumer and its connection to the brokers. A few of the most common properties have been included for your convenience. You can enter any other Kafka property as needed. For further information on these properties, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.

The options that are included by default are:

Name                      Value
auto.offset.reset         latest
ssl.key.password          (empty)
ssl.keystore.location     (empty)
ssl.keystore.password     (empty)
ssl.truststore.location   (empty)
ssl.truststore.password   (empty)
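As an aid to reading the auto.offset.reset default, the Python sketch below shows where a consumer starts reading a partition. The function `starting_offset` and its parameters are hypothetical names used for illustration; the behavior of the three values (latest, earliest, none) follows the Kafka consumer configuration.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Return the offset at which a consumer begins reading a partition."""
    if committed is not None:
        return committed  # the group already has a position: resume there
    if auto_offset_reset == "latest":
        return log_end    # skip existing records, read only new ones
    if auto_offset_reset == "earliest":
        return log_start  # read everything still retained in the partition
    raise LookupError("no committed offset and auto.offset.reset is 'none'")


if __name__ == "__main__":
    # A new consumer group on a partition holding offsets 100 to 249.
    print(starting_offset(None, 100, 250))
    print(starting_offset(None, 100, 250, auto_offset_reset="earliest"))
```

With the default of latest, a consumer group that has never committed an offset does not see records written before it started. Set the value to earliest if existing records must be processed as well.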

Error handling

The Kafka Consumer transform supports error handling, but only when the batch size is 1, so that records from the Kafka queue are processed one by one. In this case, whenever a record from the Kafka queue causes an error in the called sub-pipeline, that record is sent to the error path for further processing. The record that enters the error path is committed in Kafka, so the next records in the queue can be processed normally. Apart from the error fields added by error handling, the layout of the record flowing into the error path is the layout specified in the Fields tab.

This feature is useful whenever a record in the Kafka queue blocks the processing of further records because it generates an error during processing, and you want to remove it from the queue so that the remaining records can be processed. The failing record can be saved (for example to a file or a database table) for further investigation.
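The flow described in this section can be sketched in Python as follows. This is a simplified model, not the transform's code: `consume_one_by_one`, `process`, and the names of the error fields are hypothetical, and the "commit" is reduced to remembering the last handled offset.

```python
def consume_one_by_one(records, process):
    """Process records singly; route failures to an error path and move on.

    records is an iterable of (offset, message) pairs, and process stands in
    for the sub-pipeline. Returns the results, the error rows, and the last
    offset that was handled.
    """
    processed, errors, last_handled_offset = [], [], None
    for offset, message in records:
        try:
            processed.append(process(message))
        except Exception as exc:
            # The failing record keeps its layout and gains error fields.
            errors.append({"offset": offset, "message": message, "error": str(exc)})
        # Handled either way, so a bad record never blocks the ones behind it.
        last_handled_offset = offset
    return processed, errors, last_handled_offset


if __name__ == "__main__":
    rows = [(0, "1"), (1, "not-a-number"), (2, "3")]
    print(consume_one_by_one(rows, int))
```

In the example, the second record cannot be parsed, so it lands in the error list while the third record is still processed.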

Avro and Schema registry

This section lists the options you need to consume Avro Record values from a Kafka server. The schema of the Avro values is not sent to Kafka but to a schema registry, so you need to have one available. The options below make this work on a Confluent Cloud Kafka instance. Several parts of the software stack need authentication, hence some redundancy. We recommend storing these options in variables in your environment configuration file.

Option                                Example
schema.registry.url                   https://abcd-12345x.europe-west3.gcp.confluent.cloud
key.deserializer                      org.apache.kafka.common.serialization.StringDeserializer
value.deserializer                    io.confluent.kafka.serializers.KafkaAvroDeserializer
value.converter.schema.registry.url   https://abcd-12345x.europe-west3.gcp.confluent.cloud
sasl.jaas.config                      org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET";
security.protocol                     SASL_SSL
basic.auth.credentials.source         USER_INFO
basic.auth.user.info                  CLUSTER_API_KEY:CLUSTER_API_SECRET
schema.registry.basic.auth.user.info  SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET
sasl.mechanism                        PLAIN
client.dns.lookup                     use_all_dns_ips
acks                                  ALL
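To illustrate the recommendation of keeping these values in variables, the Python sketch below resolves `${...}` placeholders in option values from a dictionary of variables. It only models the idea: `resolve_options` is a hypothetical helper, the variable name SCHEMA_REGISTRY_URL is an assumption, and the remaining names mirror the placeholders used in the table above.

```python
from string import Template


def resolve_options(options, variables):
    """Replace ${NAME} placeholders in option values with variable values.

    Raises KeyError when a placeholder has no matching variable, so a
    missing secret is noticed before the consumer tries to connect.
    """
    return {
        name: Template(value).substitute(variables)
        for name, value in options.items()
    }


if __name__ == "__main__":
    options = {
        "schema.registry.url": "${SCHEMA_REGISTRY_URL}",
        "basic.auth.user.info": "${CLUSTER_API_KEY}:${CLUSTER_API_SECRET}",
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            'username="${CLUSTER_API_KEY}" password="${CLUSTER_API_SECRET}";'
        ),
    }
    variables = {
        "SCHEMA_REGISTRY_URL": "https://abcd-12345x.europe-west3.gcp.confluent.cloud",
        "CLUSTER_API_KEY": "example-key",        # placeholder, not a real key
        "CLUSTER_API_SECRET": "example-secret",  # placeholder, not a real secret
    }
    for name, value in resolve_options(options, variables).items():
        print(name, "=", value)
```

Keeping the key and secret in one place avoids repeating them in each of the options that need authentication.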