Beam Kafka Produce
Description
The Beam Kafka Produce transform publishes records to a Kafka cluster using the Beam execution engine.
Limitations
The main limitation of this transform is that it currently supports only String keys, and only String or Avro Record values.
Options
Option | Description |
---|---|
Transform name | Name of the transform. This name must be unique within a single pipeline. |
Bootstrap servers | A comma-separated list of Kafka broker addresses (host:port) used to bootstrap the connection to the Kafka cluster. |
The topics | The topics to publish to. |
The field to use as key | The input field whose value is used as the record key. |
The field to use as message | The input field whose value is used as the record message (value). |
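
As an illustration of the expected "Bootstrap servers" format, the following minimal sketch splits a comma-separated list into host and port pairs and rejects malformed entries. It is not part of the transform; the function name `parse_bootstrap_servers` is hypothetical, and the sketch assumes each broker is given as `host:port` (it does not handle bracketed IPv6 addresses specially).

```python
def parse_bootstrap_servers(value):
    """Split 'host1:9092,host2:9092' into a list of (host, port) tuples.

    Hypothetical helper for checking a value before pasting it into the
    'Bootstrap servers' option; assumes every entry is host:port.
    """
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    if not brokers:
        raise ValueError("no brokers given")
    return brokers
```

Running a check like this on the value before a pipeline starts can catch a missing port or a stray separator earlier than a connection timeout would.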
Avro and Schema registry
To send Avro Record values to a Kafka server you need a schema registry, because the schema of an Avro value is not sent to Kafka itself but to the registry. The options below make this work on a Confluent Cloud Kafka instance. Several parts of the software stack need to authenticate separately, which is why some credentials appear more than once. We recommend that you put these options in variables in your environment configuration file.
Option | Example |
---|---|
schema.registry.url | |
value.converter.schema.registry.url | |
auto.register.schemas | true |
security.protocol | SASL_SSL |
sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; |
username | CLUSTER_API_KEY |
password | CLUSTER_API_SECRET |
sasl.mechanism | PLAIN |
client.dns.lookup | use_all_dns_ips |
acks | ALL |
basic.auth.credentials.source | USER_INFO |
basic.auth.user.info | CLUSTER_API_KEY:CLUSTER_API_SECRET |
schema.registry.basic.auth.user.info | SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET |
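
The redundancy in the table above is easier to see when the options are generated from a small set of inputs. The following is a minimal sketch, not part of the transform: the function name `confluent_avro_options` and its parameters are hypothetical, and it assumes that `schema.registry.url` and `value.converter.schema.registry.url` point to the same registry URL. The remaining values mirror the example column of the table.

```python
def confluent_avro_options(cluster_key, cluster_secret,
                           registry_key, registry_secret, registry_url):
    """Build the option map from the table out of five inputs.

    Hypothetical helper; the option names and fixed values are copied from
    the table, the function itself is only an illustration.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{cluster_key}" password="{cluster_secret}";'
    )
    return {
        # Assumption: both registry URL options use the same endpoint.
        "schema.registry.url": registry_url,
        "value.converter.schema.registry.url": registry_url,
        "auto.register.schemas": "true",
        "security.protocol": "SASL_SSL",
        "sasl.jaas.config": jaas,
        "username": cluster_key,
        "password": cluster_secret,
        "sasl.mechanism": "PLAIN",
        "client.dns.lookup": "use_all_dns_ips",
        "acks": "ALL",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": f"{cluster_key}:{cluster_secret}",
        "schema.registry.basic.auth.user.info": f"{registry_key}:{registry_secret}",
    }
```

In practice the five inputs would come from variables in the environment configuration file rather than being hard-coded, so that the same cluster key and secret are defined once and reused wherever the table repeats them.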