Skip to main content

PublishKafkaRecord_1_0

描述:

通过Apache Kafka 1.0版本的生成API来将输入数据流内容作为单独的记录消息发送。 数据流的内容可通过记录读取器获得, 与获取消息的Kafka组件ConsumeKafkaRecord_1_0对应

标签:

Apache, Kafka, Record, csv, json, avro, logs, Put, Send, Message, PubSub, 1.0, 记录, 发送

参数:

如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.

名字默认值允许值列表描述
Kafka Brokerslocalhost:9092由英文逗号分隔的Kafka Brokers列表,其基本格式为 <host>:<port>

支持表达式语言 (仅支持变量)
主题名要发布的Kafka主题名

支持表达式语言 (支持流属性和变量)
记录读取服务控制器服务API:
RecordReaderFactory
实现:
ParquetReader
GrokReader
SyslogReader
Syslog5424Reader
CSVReader
AvroReader
JsonPathReader
JsonTreeReader
ScriptedReader
XMLReader
用于解析输入数据流的记录读取服务
记录输出服务控制器服务API:
RecordSetWriterFactory
实现:
CSVRecordSetWriter
FreeFormTextRecordSetWriter
AvroRecordSetWriter
JsonRecordSetWriter
XMLRecordSetWriter
ParquetRecordSetWriter
ScriptedRecordSetWriter
在发送到Kafka之前用于序列化数据记录的服务
启用事务启用
- 启用
- 不启用
指定是否应该提供事务机制来与Kafka通信, 如果当发送数据有问题,且该参数设置为不启用事务,则已经发送的消息将继续并被消费者消费。如果设置启用事务,则Kafka的事务机制将回滚,并保证消息不能被消费,启用事务需要。当启用事务要求<消息保证策略>设置为"重复保证"。
消息保证策略高性能
- 高性能数据流将在成功写入到Kafka节点后直接输出到成功连线,而无需等待响应。该策略提供了最好的性能,但可能导致的结果就是数据丢失。
- 单节点保证当消息被单个Kafka节点收到后,数据将输出到成功连线。该策略将比&lt;重复保证&gt;高效,但当Kafka节点崩溃将导致数据丢失。
- 重复保证除非根据主题配置将消息复制到适一定数量的Kafka节点,否则数据流将输出到失败连线
指定发送到Kafka的消息保证策略。 对应于Kafka的'acks'属性设置.
消息头属性正则匹配匹配所有数据流属性的正则表达式。任何属性名匹配的,将作为Kafka的消息头,如果不设置,将没有数据流的属性被添加到消息头中发送
消息头编码UTF-8任何匹配的消息头将作为输出流的属性输出。该参数决定用于反序列化消息头的字符编码
安全协议PLAINTEXT
- PLAINTEXTPLAINTEXT
- SSLSSL
- SASL_PLAINTEXTSASL_PLAINTEXT
- SASL_SSLSASL_SSL
用于Broker通信的协议, 对应于Kafka的 'security.protocol'属性设置
Kerberos证书服务控制器服务API:
KerberosCredentialsService
实现:
KeytabCredentialsService
指定用于Kerberos授权的证书服务提供
Kerberos服务名用于Kafka运行的Kerberos Principal名,该参数既可在Kafka的JAAS配置也可以在Kafka的配置中。对应于 Kafka的 'security.protocol'属性设置, 当<安全协议>的SASL选项被选择,否则将被忽略。

支持表达式语言 (仅支持变量)
Kerberos Principal用于连接Brokers的Kerberos principal。如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。

支持表达式语言 (仅支持变量)
Kerberos Keytab用于连接Brokers的Kerberos keytab,如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。

支持表达式语言 (仅支持变量)
SSL服务控制器服务API:
SSLContextService
实现:
StandardRestrictedSSLContextService
StandardSSLContextService
指定用于通信Kafka的SSL服务
消息键字段被用作Kafka消息的输入流中记录的字段名

支持表达式语言 (支持流属性和变量)
最大请求字节数1 MB消息的最大请求的字节数,对应于Kafka的'max.request.size'属性设置,且默认值为1 MB (1048576).
应答等待时间5 secs指定发送消息后等待响应的最大时间,如果Kafka在该时间内没有应答消息,则数据流将输出到失败连线
元数据等待时间5 sec发布者将等待获取元数据或在发送被调用时等待缓冲区写入的时间。 对应于Kafka的'max.block.ms'属性设置

支持表达式语言 (仅支持变量)
分区算法类默认(随机分配)
- 循环分配消息将按循环的方式分配分区,比如第一个消息分配到第一分区,第二个到第二分区,以此类推,然后再从第一个开始.
- 默认(随机分配)将分配随机分区
指定用于计算消息分区号的算法类。 对应于Kafka的'partitioner.class'属性设置
压缩类型不压缩
- 不压缩
- gzip
- snappy
- lz4
用于指定所有生产者生成的数据的压缩编码

动态参数:

名字描述
The name of a Kafka configuration property.The value of a given Kafka configuration property.在加载提供的配置之后,这些属性将被添加到Kafka的配置中,如果动态属性是已设置的属性,则其值将被忽略并记录警告消息。有关可用Kafka属性的列表,请参阅: http://kafka.apache.org/documentation.html#configuration.
不支持表达式语言

连线:

名字描述
success所有成功发送到Kafka的内容将输出到此成功连线
failure任何不能发送到Kafka的数据流将输出到此失败连线

读取属性:

未提供。

写入属性:

名字描述
msg.count发送到Kafka的消息数. 仅成功数据流上具有该属性

状态管理:

该组件不保存状态。

限制:

该组件没有限制

输入流要求:

组件必须提供输入流。

系统资源考量:

未提供。

参考:

ConsumeKafka_1_0, ConsumeKafkaRecord_1_0, PublishKafka_1_0