ConsumeKafkaRecord_1_0
描述:
通过Apache Kafka 1.0版本的消费API来消费消息. 与发送消息的Kafka组件 PublishKafkaRecord_1_0 相对应。请注意,该组件假定所有从分区中接收到的消息记录都具有相同的Schema。 如果任何从Kafka中拉取的消息不能被配置的记录读取服务或输出服务解析,则消息将被单独写入不同的数据流中,并通过'parse.failure'连线输出。否则将输出到成功连线,并在单个的数据流中包含多个单独的消息。 同时,消息记录数'record.count'属性也将添加到数据流上输出。如果两个消息具有不同的Schema,或者具有不同消息头值的消息,则一定不会放到一个数据流输出。
标签:
Kafka, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume, 1.0, 记录,消费,主题, 获取
参数:
如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.
名字 | 默认值 | 允许值列表 | 描述 |
---|---|---|---|
Kafka Brokers | localhost:9092 | 由英文逗号分隔的Kafka Brokers列表,其基本格式为 <host>:<port> 支持表达式语言 (仅支持变量) | |
主题名 | 消息消费的Kafka主题名。 多个可用英文逗号分隔 支持表达式语言 (仅支持变量) | ||
主题名格式 | 名字列表 | - 名字列表 - 正则匹配 | 指定主题名的格式,是名字列表还是正则匹配 |
记录读取服务 | 控制器服务API: RecordReaderFactory 实现: ParquetReader GrokReader SyslogReader Syslog5424Reader CSVReader AvroReader JsonPathReader JsonTreeReader ScriptedReader XMLReader | 用于解析输入数据流的记录读取服务 | |
记录输出服务 | 控制器服务API: RecordSetWriterFactory 实现: CSVRecordSetWriter FreeFormTextRecordSetWriter AvroRecordSetWriter JsonRecordSetWriter XMLRecordSetWriter ParquetRecordSetWriter ScriptedRecordSetWriter | 在发送到Kafka之前用于序列化数据记录的服务 | |
事务保证 | 是 | - 是 - 否 | 指定在与Kafka通信时是否应遵守事务性保证。 如果设置否,组件将使用“read_uncomitted“的隔离级别. 也就是说,接收到消息将尽可能的被写入到Kafka,甚至生产者取消了事务处理。如果设置是,组件将在生产者的事务被取消后,不接收任何消息,但是这可能会导致一些延迟,因为消费者必须等待生产者完成其整个事务,而不是在消息可用时就进行拉取。 |
安全协议 | PLAINTEXT | - PLAINTEXT - SSL - SASL_PLAINTEXT - SASL_SSL | 用于Broker通信的协议, 对应于Kafka的 'security.protocol'属性设置 |
Kerberos证书服务 | 控制器服务API: KerberosCredentialsService 实现: KeytabCredentialsService | 指定用于Kerberos授权的证书服务提供 | |
Kerberos服务名 | 用于Kafka运行的Kerberos Principal名,该参数既可在Kafka的JAAS配置也可以在Kafka的配置中。对应于 Kafka的 'security.protocol'属性设置, 当<安全协议>的SASL选项被选择,否则将被忽略。 支持表达式语言 (仅支持变量) | ||
Kerberos Principal | 用于连接Brokers的Kerberos principal。如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。 支持表达式语言 (仅支持变量) | ||
Kerberos Keytab | 用于连接Brokers的Kerberos keytab,如果不设置,则需要通过bootstrap.conf文件中的JVM属性设置方式设置JAAS配置文件。该值将作为Kafka的'sasl.jaas.config'属性设置。 支持表达式语言 (仅支持变量) | ||
SSL服务 | 控制器服务API: SSLContextService 实现: StandardRestrictedSSLContextService StandardSSLContextService | 指定用于通信Kafka的SSL服务 | |
组ID | 在相同消费组内标识消费者的组ID, 对应于Kafka的'group.id'属性设置 | ||
Offset reset | latest | - earliest - latest - none | 提供选择管理当没有初始Offset或者当前Offset不存在(比如数据已经删除). 对应于Kafka的'auto.offset.reset'属性设置 |
消息头编码 | UTF-8 | 任何匹配的消息头将作为输出流的属性输出。该参数决定用于反序列化消息头的字符编码 | |
消息头正则匹配 | 用于匹配所有消息头的正则表达式。任何匹配该正则表达式的消息头的名字将作为输出数据流的属性,如果不设置,将没有消息头作为输出流的属性。 如果恰好有两个消息匹配的消息头一样,但有不同值,则这个两个消息必须通过不同的数据流输出。 | ||
最大Poll记录数 | 10000 | 指定用于Kafka返回单个poll的最大记录数 | |
提交时间 | 1 secs | 指定通过Offset必须提交的最大时间,该值影响Offset的提交频率。 提交Offset通常会减少吞吐量的增加,但也会在提交之间重新平衡或JVM重新启动时增加潜在数据复制的窗口。该设置同时也关系到<最大Poll记录数>和<消息界定符>设置。当使用消息界定功能后,将会拥有比不使用时多得多的未提交消息 |
动态参数:
名字 | 值 | 描述 |
---|---|---|
The name of a Kafka configuration property. | The value of a given Kafka configuration property. | 在加载提供的配置之后,这些属性将被添加到Kafka的配置中,如果动态属性是已设置的属性,则其值将被忽略并记录警告消息。有关可用Kafka属性的列表,请参阅: http://kafka.apache.org/documentation.html#configuration. 不支持表达式语言 |
连线:
名字 | 描述 |
---|---|
success | 成功接收消费消息的数据将输出到此成功连线, 依赖于消息处理策略,是每个消息一个数据流输出,还是每个消息数据流依据主题和分区来分组的数据流输出。 |
parse.failure | 如果消息不能被配置的记录读取服务解析,消息内容将输出到该失败连线 |
读取属性:
未提供。
写入属性:
名字 | 描述 |
---|---|
record.count | 接收的记录数 |
mime.type | 记录输出服务提供的的MIME类型 |
kafka.partition | 消息的主题分区 |
kafka.topic | 消息的主题 |
状态管理:
该组件不保存状态。
限制:
该组件没有限制
输入流要求:
组件禁止提供输入流。
系统资源考量:
未提供。