Skip to main content

ConsumeKafkaRecord_0_10

描述:

通过Apache Kafka 0.10.x版本的消费API来消费消息. 与发送消息的Kafka组件 PublishKafkaRecord_1_0 相对应。请注意,该组件假定所有从分区中接收到的消息记录都具有相同的Schema。 如果任何从Kafka中拉取的消息不能被配置的记录读取服务或输出服务解析,则消息将被单独写入不同的数据流中,并通过'parse.failure'连线输出。否则将输出到成功连线,并在单个的数据流中包含多个单独的消息。 同时,消息记录数'record.count'属性也将添加到数据流上输出。如果两个消息具有不同的Schema,或者具有不同消息头值的消息,则一定不会放到一个数据流输出。

标签:

Kafka, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume, 0.10.x, 记录,消费,主题, 获取

参数:

如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.

名字默认值允许值列表描述
Kafka Brokerslocalhost:9092由英文逗号分隔的Kafka Brokers列表,其基本格式为 <host>:<port>

支持表达式语言 (仅支持变量)
主题名消息消费的Kafka主题名。 多个可用英文逗号分隔

支持表达式语言 (仅支持变量)
主题名格式名字列表
- 名字列表一个主题名或由英文逗号分隔的多个主题名
- 正则匹配主题使用Java正则表达式语法匹配
指定主题名的格式,是名字列表还是正则匹配
记录读取服务控制器服务API:
RecordReaderFactory
实现:
ParquetReader
GrokReader
SyslogReader
Syslog5424Reader
CSVReader
AvroReader
JsonPathReader
JsonTreeReader
ScriptedReader
XMLReader
用于解析输入数据流的记录读取服务
记录输出服务控制器服务API:
RecordSetWriterFactory
实现:
CSVRecordSetWriter
FreeFormTextRecordSetWriter
AvroRecordSetWriter
JsonRecordSetWriter
XMLRecordSetWriter
ParquetRecordSetWriter
ScriptedRecordSetWriter
在发送到Kafka之前用于序列化数据记录的服务
安全协议PLAINTEXT
- PLAINTEXTPLAINTEXT
- SSLSSL
- SASL_PLAINTEXTSASL_PLAINTEXT
- SASL_SSLSASL_SSL
用于Broker通信的协议, 对应于Kafka的 'security.protocol'属性设置
Kerberos证书服务控制器服务API:
KerberosCredentialsService
实现:
KeytabCredentialsService
指定用于Kerberos授权的证书服务提供
Kerberos服务名用于Kafka运行的Kerberos Principal名,该参数既可在Kafka的JAAS配置也可以在Kafka的配置中。对应于 Kafka的 'security.protocol'属性设置, 当<安全协议>的SASL选项被选择,否则将被忽略。

支持表达式语言 (仅支持变量)
Kerberos Principal用于连接Brokers的Kerberos principal。如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。

支持表达式语言 (仅支持变量)
Kerberos Keytab用于连接Brokers的Kerberos keytab,如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。

支持表达式语言 (仅支持变量)
SSL服务控制器服务API:
SSLContextService
实现:
StandardRestrictedSSLContextService
StandardSSLContextService
指定用于通信Kafka的SSL服务
组ID在相同消费组内标识消费者的组ID, 对应于Kafka的'group.id'属性设置
Offset resetlatest
- earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none主题各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
提供选择管理当没有初始Offset或者当前Offset不存在(比如数据已经删除). 对应于Kafka的'auto.offset.reset'属性设置
最大Poll记录数10000指定用于Kafka返回单个poll的最大记录数
提交时间1 secs指定通过Offset必须提交的最大时间,该值影响Offset的提交频率。 提交Offset通常会减少吞吐量的增加,但也会在提交之间重新平衡或JVM重新启动时增加潜在数据复制的窗口。该设置同时也关系到<最大Poll记录数>和<消息界定符>设置。当使用消息界定功能后,将会拥有比不使用时多得多的未提交消息

动态参数:

名字描述
The name of a Kafka configuration property.The value of a given Kafka configuration property.在加载提供的配置之后,这些属性将被添加到Kafka的配置中,如果动态属性是已设置的属性,则其值将被忽略并记录警告消息。有关可用Kafka属性的列表,请参阅: http://kafka.apache.org/documentation.html#configuration.
不支持表达式语言

连线:

名字描述
success成功接收消费消息的数据将输出到此成功连线, 依赖于消息处理策略,是每个消息一个数据流输出,还是每个消息数据流依据主题和分区来分组的数据流输出。
parse.failure如果消息不能被配置的记录读取服务解析,消息内容将输出到该失败连线

读取属性:

未提供。

写入属性:

名字描述
record.count接收的记录数
mime.type记录输出服务提供的的MIME类型
kafka.partition消息的主题分区
kafka.topic消息的主题

状态管理:

该组件不保存状态。

限制:

该组件没有限制

输入流要求:

组件禁止提供输入流。

系统资源考量:

未提供。

参考:

ConsumeKafka_0_10, PublishKafka_0_10, PublishKafkaRecord_0_10