Skip to main content

EnforceOrder

描述:

强制将相同数据组的数据流按照一定的期望顺序。尽管PriorityAttributePrioritizer可以被用来确保数据流连线的处理级别顺序。而依赖于错误处理、分支以及流程设计,也可能导致数据流顺序问题。该组件可用于强制原始数据流的顺序。 [重要] 为了让该组件生效, FirstInFirstOutPrioritizer 应该在每个下游连线中使用,直到数据流的物理顺序通过诸如MergeContent的组件来得以固定

标签:

sort, order, 排序, 顺序

参数:

如下列表中,必填参数则标识为加粗. 其他未加粗参数,则表示为可选参数。表中同时提到参数默认值设置, 并且 参数还支持 表达式语言.

名字默认值允许值列表描述
组标识${filename}可以有多个排序组。 该参数用于决定数据流属于哪个组。该参数将被每个输入数据流计算。 如果计算结果为空,则会路由到失败连线

支持表达式语言 (支持流属性和变量)
次序属性指定用于强制数据流分组排序的属性名。如果输入数据流没有此属性,或值不是整数,则路由到失败连线
初始次序0当组的第一个数据流到达时,将计算初始目标次序并存储其状态。之后,组件将开始跟踪目标次序,并存储其状态。如果该参数为表达式,当计算结果不为整数,则会路由到失败连线,并且初始次序将保持未知,直到有其他数据流提供有效的初始次序。

支持表达式语言 (支持流属性和变量)
最大次序如果指定,任何大于该次序的数据流将输出到失败连线。对于给定的组,此属性只计算一次。在计算最大次序之后,它将被持久化在状态管理中,并用于属于同一组的其他数据流。 如果该参数为表达式,当计算结果不为整数,则会路由到失败连线,并且最大次序将保持未知,直到有其他数据流提供有效的最大次序。

支持表达式语言 (支持流属性和变量)
批量数1000组件执行时可处理的最大数据流数。
等待超时10 min指定等待数据流将被路由到“overtook”连线的时长
不活动超时30 min指定非活动组的状态从存储的状态中清除的时长。如果在指定的时间内没有看到任何新传入数据流,则组被确定为不活动。该参数必须大于《等待超时》 。如果数据流在其组被清除后仍旧延迟到达,它将被视为一个全新的组,但永远不会与原次序匹配,因为预期的前一个数据流已经不存在。数据流最终将超时等待并路由到“overtook”连线。为了避免这种情况,组状态应该保持足够长的时间,但是,较短的时长将有助于再次重用相同的组标识

连线:

名字描述
overtook等待前面的数据流超过《等待超时》并超过这些数据流,将被路由到此连线
skipped数据流的次序小于当前(意味着太晚到达)并被忽略的数据流路由到此连线
success具有匹配次序数量的数据流将输出到此成功连线
wait没有匹配次序的数据流将输出到此连线
failure数据流没有相应属性或计算失败将输出到此失败连线

读取属性:

未提供。

写入属性:

名字描述
EnforceOrder.startedAt通过此组件的所有数据流都将具有此属性。该值用于确定等待超时
EnforceOrder.result通过此组件的所有流文件都将具有此属性,指示将其路由到哪个连线
EnforceOrder.detail当数据路由到'failure' 或 'skipped' 连线时,该属性指示失败细节信息
EnforceOrder.expectedOrder当数据路由到'wait' 或 'skipped' 连线时,指示处理数据流时的预期次序的属性。

状态管理:

范围描述
LOCAL组件为每个次序组使用如下状态: <groupId>.target表示等待下一个到达的次序。当匹配次序的数据流到达时,或者因为《等待超时》,超过了等待的数据流时,目标次序将更新(FlowFile.order + 1). <groupId>.max为组的最大次序。 <groupId>.updatedAt为上次更新组次序时的时间戳。一旦一个组被确定为不活动,这些管理状态将被自动删除, 查看参数 《不活动超时》了解更多。

限制:

该组件没有限制

输入流要求:

组件必须提供输入流。

系统资源考量:

未提供。