PublishKafkaRecord_1_0
描述:
通过Apache Kafka 1.0版本的生成API来将输入数据流内容作为单独的记录消息发送。 数据流的内容可通过记录读取器获得, 与获取消息的Kafka组件ConsumeKafkaRecord_1_0对应
标签:
Apache, Kafka, Record, csv, json, avro, logs, Put, Send, Message, PubSub, 1.0, 记录, 发送
参数:
如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.
名字 | 默认值 | 允许值列表 | 描述 |
---|---|---|---|
Kafka Brokers | localhost:9092 | 由英文逗号分隔的Kafka Brokers列表,其基本格式为 <host>:<port> 支持表达式语言 (仅支持变量) | |
主题名 | 要发布的Kafka主题名 支持表达式语言 (支持流属性和变量) | ||
记录读取服务 | 控制器服务API: RecordReaderFactory 实现: ParquetReader GrokReader SyslogReader Syslog5424Reader CSVReader AvroReader JsonPathReader JsonTreeReader ScriptedReader XMLReader | 用于解析输入数据流的记录读取服务 | |
记录输出服务 | 控制器服务API: RecordSetWriterFactory 实现: CSVRecordSetWriter FreeFormTextRecordSetWriter AvroRecordSetWriter JsonRecordSetWriter XMLRecordSetWriter ParquetRecordSetWriter ScriptedRecordSetWriter | 在发送到Kafka之前用于序列化数据记录的服务 | |
启用事务 | 启用 | - 启用 - 不启用 | 指定是否应该提供事务机制来与Kafka通信, 如果当发送数据有问题,且该参数设置为不启用事务,则已经发送的消息将继续并被消费者消费。如果设置启用事务,则Kafka的事务机制将回滚,并保证消息不能被消费,启用事务需要。当启用事务要求<消息保证策略>设置为"重复保证"。 |
消息保证策略 | 高性能 | - 高性能 - 单节点保证 - 重复保证 | 指定发送到Kafka的消息保证策略。 对应于Kafka的'acks'属性设置. |
消息头属性正则匹配 | 匹配所有数据流属性的正则表达式。任何属性名匹配的,将作为Kafka的消息头,如果不设置,将没有数据流的属性被添加到消息头中发送 | ||
消息头编码 | UTF-8 | 任何匹配的消息头将作为输出流的属性输出。该参数决定用于反序列化消息头的字符编码 | |
安全协议 | PLAINTEXT | - PLAINTEXT - SSL - SASL_PLAINTEXT - SASL_SSL | 用于Broker通信的协议, 对应于Kafka的 'security.protocol'属性设置 |
Kerberos证书服务 | 控制器服务API: KerberosCredentialsService 实现: KeytabCredentialsService | 指定用于Kerberos授权的证书服务提供 | |
Kerberos服务名 | 用于Kafka运行的Kerberos Principal名,该参数既可在Kafka的JAAS配置也可以在Kafka的配置中。对应于 Kafka的 'security.protocol'属性设置, 当<安全协议>的SASL选项被选择,否则将被忽略。 支持表达式语言 (仅支持变量) | ||
Kerberos Principal | 用于连接Brokers的Kerberos principal。如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。 支持表达式语言 (仅支持变量) | ||
Kerberos Keytab | 用于连接Brokers的Kerberos keytab,如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。 支持表达式语言 (仅支持变量) | ||
SSL服务 | 控制器服务API: SSLContextService 实现: StandardRestrictedSSLContextService StandardSSLContextService | 指定用于通信Kafka的SSL服务 | |
消息键字段 | 被用作Kafka消息的输入流中记录的字段名 支持表达式语言 (支持流属性和变量) | ||
最大请求字节数 | 1 MB | 消息的最大请求的字节数,对应于Kafka的'max.request.size'属性设置,且默认值为1 MB (1048576). | |
应答等待时间 | 5 secs | 指定发送消息后等待响应的最大时间,如果Kafka在该时间内没有应答消息,则数据流将输出到失败连线 | |
元数据等待时间 | 5 sec | 发布者将等待获取元数据或在发送被调用时等待缓冲区写入的时间。 对应于Kafka的'max.block.ms'属性设置 支持表达式语言 (仅支持变量) | |
分区算法类 | 默认(随机分配) | - 循环分配 - 默认(随机分配) | 指定用于计算消息分区号的算法类。 对应于Kafka的'partitioner.class'属性设置 |
压缩类型 | 不压缩 | - 不压缩 - gzip - snappy - lz4 | 用于指定所有生产者生成的数据的压缩编码 |
动态参数:
名字 | 值 | 描述 |
---|---|---|
The name of a Kafka configuration property. | The value of a given Kafka configuration property. | 在加载提供的配置之后,这些属性将被添加到Kafka的配置中,如果动态属性是已设置的属性,则其值将被忽略并记录警告消息。有关可用Kafka属性的列表,请参阅: http://kafka.apache.org/documentation.html#configuration. 不支持表达式语言 |
连线:
名字 | 描述 |
---|---|
success | 所有成功发送到Kafka的内容将输出到此成功连线 |
failure | 任何不能发送到Kafka的数据流将输出到此失败连线 |
读取属性:
未提供。
写入属性:
名字 | 描述 |
---|---|
msg.count | 发送到Kafka的消息数. 仅成功数据流上具有该属性 |
状态管理:
该组件不保存状态。
限制:
该组件没有限制
输入流要求:
组件必须提供输入流。
系统资源考量:
未提供。