Kafka Producer

Description

The Kafka Producer transform publishes messages to a Kafka cluster in near real time, making them available to any number of subscribed consumers.

A Kafka Producer transform publishes a stream of records to one Kafka topic.
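For a sense of what this means at the client level, here is a minimal Java sketch using the plain Apache Kafka producer client. It is an illustration of the underlying behavior, not Hop's internal code, and the broker address, topic, key, and message are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker address; use your own bootstrap servers.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // One record published to one topic; the key may be null.
                producer.send(new ProducerRecord<>("my-topic", "my-key", "my-message"));
            }
        }
    }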

Supported Engines

Hop Engine: Supported
Spark: Maybe Supported
Flink: Maybe Supported
Dataflow: Maybe Supported

Options

Transform name: The name for this transform.

Bootstrap servers: A comma-separated list of bootstrap servers (host:port) in the Kafka cluster.

Client ID: A unique client identifier, used to set up a durable connection path to the server, to make requests, and to distinguish this client from others.

Topic: The topic (category) to which records are published.

Key Field: In Kafka, every message can be keyed. The default partitioner routes keyed messages to partitions based on a hash of the key, so all messages with the same key end up on the same partition. Messages without a key are distributed across partitions randomly. A short sketch of this behavior follows the table.

Message Field: The input field whose value is published as the message (record value) on the topic.
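To make the Key Field behavior concrete, here is a short Java sketch against the plain Kafka client. The topic, keys, and values are placeholders; the point is that the default partitioner sends records with equal keys to the same partition.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyFieldSketch {
        // Assumes an already configured KafkaProducer<String, String>,
        // as in the sketch in the Description section above.
        static void sendExamples(KafkaProducer<String, String> producer) {
            // Keyed: the default partitioner hashes "customer-42", so every
            // record with this key lands on the same partition, preserving
            // their relative order.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));
            producer.send(new ProducerRecord<>("orders", "customer-42", "order shipped"));

            // Unkeyed: with a null key, records are spread across partitions,
            // so no per-key ordering applies.
            producer.send(new ProducerRecord<>("orders", null, "audit ping"));
        }
    }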

Options Tab

Use this tab to configure the properties that are passed to the Kafka producer connection. A few of the most common properties have been included for your convenience, and you can enter any other Kafka property you need. For more information on these property names, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.

The options that are included by default are:

auto.offset.reset: latest
ssl.key.password
ssl.keystore.location
ssl.keystore.password
ssl.truststore.location
ssl.truststore.password
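For illustration, this is roughly how the ssl.* properties above map onto a plain Kafka producer configuration in Java. It is a minimal sketch: every path and password is a placeholder, the security.protocol setting is an assumption of the sketch (the TLS options only take effect with an SSL-based protocol), and in Hop you enter these properties on this tab instead of writing code.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SslConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093,broker2:9093");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "hop-kafka-producer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // TLS settings mirroring the ssl.* options listed above.
            // All paths and passwords are placeholders.
            props.put("security.protocol", "SSL");
            props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
            props.put("ssl.keystore.password", "keystore-password");
            props.put("ssl.key.password", "key-password");
            props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
            props.put("ssl.truststore.password", "truststore-password");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The producer is now configured; sends would go here.
            }
        }
    }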

Avro and Schema registry

Here are some options you need to send Avro record values to a Kafka server. The schemas of the Avro values are not sent to Kafka itself but to a schema registry, so you need to have one available. The options below make this work on a Confluent Cloud Kafka instance. Several parts of the software stack need their own authentication, hence the bit of redundancy. We recommend that you put these options in variables in your environment configuration file. A combined configuration sketch follows the table.

schema.registry.url: https://abcd-12345x.europe-west3.gcp.confluent.cloud
value.converter.schema.registry.url: https://abcd-12345x.europe-west3.gcp.confluent.cloud
auto.register.schemas: true
security.protocol: SASL_SSL
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET";
username: CLUSTER_API_KEY
password: CLUSTER_API_SECRET
sasl.mechanism: PLAIN
client.dns.lookup: use_all_dns_ips
acks: ALL
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: CLUSTER_API_KEY:CLUSTER_API_SECRET
schema.registry.basic.auth.user.info: SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET
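As a rough illustration of how these options fit together outside of Hop, here is a minimal Java sketch that publishes one Avro record through Confluent's KafkaAvroSerializer. The endpoint, topic, schema, and all API keys are placeholders, the io.confluent:kafka-avro-serializer dependency is assumed, and the sketch sets only one form of the schema registry credentials; in Hop itself you enter these options on the transform's Options tab instead.

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AvroProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder Confluent Cloud endpoint.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "abcd-12345x.europe-west3.gcp.confluent.cloud:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            // Confluent's Avro serializer (kafka-avro-serializer dependency).
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");

            // Broker authentication, mirroring the table above.
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";");
            props.put("client.dns.lookup", "use_all_dns_ips");
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            // Schema registry location and authentication, also from the table.
            // The table above additionally lists the older
            // "schema.registry.basic.auth.user.info" variant of this setting.
            props.put("schema.registry.url",
                "https://abcd-12345x.europe-west3.gcp.confluent.cloud");
            props.put("auto.register.schemas", "true");
            props.put("basic.auth.credentials.source", "USER_INFO");
            props.put("basic.auth.user.info",
                "SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET");

            // A tiny Avro schema and record, purely for illustration.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Person\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
            GenericRecord value = new GenericData.Record(schema);
            value.put("name", "hop");

            try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("people", "key-1", value));
            }
        }
    }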