Skip to main content

JoinEnrichment

描述:

将来自两个不同流文件的记录连接在一起,其中一个流文件,“original”包含任意记录,第二个流文件,“enrichment”包含额外的数据,应该用来充实第一个流文件。有关如何配置这个处理器以及它要实现的不同用例的更多信息,请参阅附加详细信息。

标签:

fork, join, enrich, json, record, sql, wrap, recordpath, merge, combine, streams

参数:

如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.

名字默认值允许值列表描述
Original读取服务控制器服务API:
RecordReaderFactory
实现:
ParquetReader
GrokReader
SyslogReader
Syslog5424Reader
CSVReader
AvroReader
JsonPathReader
JsonTreeReader
ScriptedReader
XMLReader
指定用于解析传入数据和确定数据结构的服务
Enrichment读取服务控制器服务API:
RecordReaderFactory
实现:
ParquetReader
GrokReader
SyslogReader
Syslog5424Reader
CSVReader
AvroReader
JsonPathReader
JsonTreeReader
ScriptedReader
XMLReader
指定用于解析传入数据和确定数据结构的服务。用于写入结果的Record Writer。如果将Record Writer配置为继承记录的Schema,那么它将继承的schema将是“Original”的schema和“Enrichment”的schema的合并结果。
写入服务控制器服务API:
RecordSetWriterFactory
实现:
CSVRecordSetWriter
FreeFormTextRecordSetWriter
AvroRecordSetWriter
JsonRecordSetWriter
XMLRecordSetWriter
ParquetRecordSetWriter
ScriptedRecordSetWriter
指定用于将结果写入数据流的服务
联接策略包装器
- 包装器输出是一个包含两个字段的Record:'original'和'enrichment'字段。(1) 'original':包含来自原始流文件的记录;(2)'enrichment'包含来自enrichment标识的流文件的记录。记录将根据它们在FlowFile中的索引进行关联。如果一个FlowFile的记录比另一个多,则使用空值。
- SQL输出是通过计算一个SQL SELECT语句得到的,该语句允许两个表:'original'和'enrichment'。这允许使用SQL JOIN语句来关联两个FlowFile的Records,因此在FlowFile中, Record的索引并不重要。
- 插入丰富字段通过将enrichment标识的Record的所有字段放入来自原始流文件的相应记录中,记录与原始流文件连接在一起。记录将根据它们在FlowFile中的索引进行关联。
指定如何将两个流文件连接为单个流文件。当设置为SQL时,SQL需要设置该值,才能作用生效。当设置为“插入丰富字段”时,“插入记录路径”需要设置该值,才能作用生效。
SQLSELECT original., enrichment.

FROM original LEFT OUTER JOIN enrichment ON original.id = enrichment.id| |要计算的SQL SELECT语句。可以提供表达式语言,但这样做可能会导致性能较差。因为这个处理器同时处理两个flowfile,所以理解属性将如何被引用也很重要。如果两个FlowFile都有一个名称相同但值不同的属性,表达式语言将解析为“enrichment”FlowFile提供的值。

支持表达式语言 (支持流属性和变量)| | 默认的小数精度 | 10 | |当DECIMAL/NUMBER值被写为“DECIMAL”Avro逻辑类型时,需要特定的“精度”表示可用数字的数量。通常,精度是由列数据类型定义或数据库引擎默认值定义的。然而,某些数据库引擎可以返回未定义的精度(0)。'Default Decimal Precision'在写入那些未定义的精度数字时使用。

支持表达式语言 (支持流属性和变量)| | 默认十进位制 | 0 | |当DECIMAL/NUMBER值被写为“DECIMAL”Avro逻辑类型时,需要一个特定的“刻度”,表示可用的十进制数字的数量。通常,规模由列数据类型定义或数据库引擎默认值定义。然而,当返回未定义的精度(0)时,某些数据库引擎的规模也可能是不确定的。当写入那些未定义的数字时,使用默认十进制刻度。如果一个值的小数多于指定的比例,那么该值将被四舍五入,例如,1.53在比例为0时为2,在比例为1时为1.5。

支持表达式语言 (支持流属性和变量)| | 插入记录的路径 | | |指定在'original'记录中'enrichment'的字段应该插入的位置。请注意,如果RecordPath不指向原始记录中的任何现有字段,则不会插入enrichment内容。

支持表达式语言 (支持流属性和变量)| | 最大Bin数 | 10000 | |Specifies the maximum number of bins that can be held in memory at any one time| | Timeout | 10 min | |Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.|

连线:

名字描述
timeout如果一个传入的流文件(即'original'流文件或'enrichment'流文件)到达这个处理器,但另一个没有在配置的超时时间内到达,则到达的流文件被路由到此关系。
joined生成的流文件是original流文件和enrichment流文件的记录连接在一起,将被路由到此关系
failure如果“enrichment”和“original”的流文件都到达组件处理,但在连接记录时发生了失败,这两个流文件将被路由到此关系。
original传入的两个流文件('original'和'enrichment')都将被路由到这个关系。也就是说,这是这两个flowfile的“原始”版本。

读取属性:

未提供。

写入属性:

名字描述
mime.type将mime.type属性设置为写入服务指定的MIME类型
record.countFlowFile中的记录数量

状态管理:

该组件不保存状态。

限制:

该组件没有限制

输入流要求:

组件必须提供输入流。

系统资源考量:

资源描述
MEMORYThis Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns.

参考:

ForkEnrichment