Skip to main content

Runtime流程设计-13种设计模式

一、请求响应模式

  • 说明:
    • 需要起始使用HandleHTTPRequest,结尾使用HandleHTTPResponse组件,中间是获取数据以及组装数据的组件,来组成一个完整的会话。
  • 流程图

  • 配置 HandleHTTPRequest
    • 监听端口
    • HTTP会话服务
    • 请求路径
  • 配置 HandleHttpResponse
    • HTTP状态码,正常 200
    • HTTP会话服务 必须和配置HandleHTTPRequest 中的HTTP会话服务 一致
    • 可以增加配置 Content-type,例如:Content-type:application/json;charset=UTF-8(避免中文乱码)

二、抓取和推送模式

  • 说明:
    • 抓取模式需要使用可以抓取数据的组件,如Get,Fetch,List*以及其他命名的组件
    • 推送模式需要使用可以推送数据的组件,如Put* 或其他命名的组件

1. 数据同步流程图

  • 配置 QueryDatabaseTable
    • 数据库连接池
    • 数据库类型
    • 表名
    • 选择性配置 自定义查询 或 返回字段 或 额外的WHERE子句等等
  • 配置 PutDatabaseRecord
    • 记录读取器,AvroReader
    • 语句类型 INSERT/UPDATE/DELETE
    • 数据库连接池
    • 表名

三、异步请求模式

  • 说明: 需要起始使用Wait和Notiry组件配合使用,完成异步转同步的功能,为提高效率,同时并行请求多个服务,最后等待全部服务都请求完成后再处理后继步骤。
  • 流程图-等待唤醒模式

  • 配置 Wait
    • 信号释放标识符
    • 缓存服务
  • 配置 Notify
    • 信号释放标识符
    • 缓存服务

四、拆分和合并模式

  • 说明:
    • 拆分模式组件有split*组件以及其他组件
    • 合并模式组件有merge*组件以及其他组件

1. 流程图-Split-Defragment

  • 配置 SplitContent
    • 字节序列(分隔符)
  • 配置 MergeContent
    • 合并策略 Defragment
    • 其他无需配置

2. 流程图-Bin-Packing

  • 配置 MergeRecord
    • 记录读取器
    • 记录写入器
    • 合并策略
    • 关联属性名(合并规则使用) http.context.identifier
    • 最小记录数
    • 最大记录数
    • 记录最大生命周期(超时则会强制按当前条件合并当前flow)

五、提取模式

  • 说明:
    • 数据提取的组件多以Extract、Evaluate命名,例如ExtractText,EvaluateJsonPath,EvaludateXPath, EvaludateXQuery

1. 流程图-ExtractText

  • 配置 Extract
    • Enable DOTALL Mode 处理全部
    • Maximum Capture Group Length 抽取到属性的值的截取长度设置,默认1024
    • 动态增加提取到属性,例如 all = .* 代表把全部都放到all.0属性中

2. 流程图-EvaluateJsonPath

  • 配置 EvaluateJsonPath
    • 目标 放到属性中还是放到flow中,本流程设置为属性
    • 返回类型 json
    • 动态增加提取到属性的JSONPath表达式,例如 newID=$.id 意思是提取FLow中第一级json中属性为id的值,放到属性为newID中
    • 动态属性提取表达式可以通过$.users.user.id 来获取json多层级下属性值。

3. 流程图-EvaluateXPath

  • 配置 EvaluateXPath
    • 目标 放到属性中还是放到flow中,本流程设置为放到flow中
    • 返回类型 json
    • 动态增加提取到属性,表达式为XPath,例如 data=/RBSPMessage/Method/Items/Item/Value 意思是提取FLow中这个路径/RBSPMessage/Method/Items/Item/Value 对应的值设置到data中。

六、路由模式

  • 说明:
    • 路由属性RoutOnAttribute
    • 路由内容RountOnContent

1. 流程图-RountOnContent

  • 配置 RountOnContent
    • 匹配要求(完全匹配/包含匹配),本次设置为完全匹配
    • 动态增加带路由条件的输出流 allMatch =123,表示如果内容完全等于123,则flow会流向allMatch 的flow;不能完全等于123的,则会流向unmatched的flow

2. 流程图-RountOnAttribute

  • 配置 RountOnAttribute
    • 路由策略(路由到属性名/全匹配路由到match/任意匹配路由到match),本次设置为完全匹配
    • 动态增加带路由条件的输出流 validAge = ${age:ge(18)},表示如果age大于等于18,则会流向validAge的flow,否则会流向unmatched的flow。

七、重试模式

  • 说明:
    • 有的组件自带retry的flow,但是如何保证重试的频次,才能保证重试的效率,仍需设计重试机制。
  • 流程图

  • 配置 重试组件的上一个组件
    • 配置一个变量 retryTimes变量,初始化为 0 。如果不具备动态增加属性的情况,可以在重试组件的前面增加updateAttribute组件,在它上面增加属性。
  • 配置 重试组件的retry连线到 UpdateAttribute组件
  • 配置 UpdateAttribute
    • retryTimes = ${retryTimes:plus(1)}
  • 配置 RouteOnAttribute
    • 路由策略=路由到属性名
    • canRetry=${retryTimes:le(${maxRetryTimes})} 其中 ${maxRetryTimes} 是允许最大重试次数,也可以写成常量数字。
    • 调度里的运行安排 设置重试的时间间隔,比如5秒,设置为 5 sec
    • 输出canRetry的 flow返回到重试的组件,形成回流。

八、文件传输模式

  • 说明: 上传组件使用 InvokeHTTP,下载组件也使用 InvokeHTTP

1. 流程图-上传

  • 配置 GetFile
    • 读取目录
    • 文件过滤器
  • 配置 InvokeHTTP
    • HTTP方法: POST
    • 远程URL
    • Content-Type: Multipart/form-data;charset=UTF-8
    • 动态增加fileName : ${filename}

2. 流程图-下载

  • 配置 InvokeHTTP
    • HTTP方法: GET
    • 远程URL
    • 动态增加fileName : ${filename}
  • 配置 PutFile
    • 目录
    • 冲突解决策略
    • 创建不存在目录

九、增量同步模式

  • 说明:
    • 使用QueryDatabaseTable可以对数据库进行增量查询
  • 流程图

  • 配置 QueryDatabaseTable
    • 数据库连接池 数据库连接池
    • 数据库类型
    • 表名
    • 最大值字段 (这个配置决定了增量查询的条件),本次流程使用ID
    • 查看当前查询的最大的ID值 增量同步

十、JSON变换模式

  • 说明:
    • 使用JoltTransformJSON进行结构变换
  • 流程图

  • 配置 JoltTransformJSON
    • Jolt Specification
[{
"operation": "shift",
"spec": {
"users": {
"*": {
"name": "users[&1].name",
"age": "users[&1].age",
"@(2,timestamp)": "users[&1].timestamp"
}
}
}
}]

十一、格式转换模式

  • 说明:
    • 使用Convert*等组件来进行格式转换

1. 流程图-Avro->Json

  • 配置 ConvertAvroToJSON
    • 无需配置

2. 流程图-Xml->Json

  • 配置 ConvertRecord
    • 记录写入服务 JsonRecordSetWriter
      • 无需配置
    • 记录读取服务 XMLReader
      • Schema访问策略:使用 'Schema内容' 属性
      • Schema内容 :
{
"namespace": "example.avro",
"type": "record",
"name": "Delivery",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}

十二、循环模式

  • 说明:
    • 使用RouteOnAttribute和UpdateAttribute组合,设计多种多样,不拘一格。

1. 流程图-loop

  • 配置 GenerateFlowFile
    • 动态增加index = 0
    • 动态增加max = 100
    • 动态增加step = 1
  • 配置 UpdateAttribute
    • 动态增加 index = ${index:plus(${step})}
  • 配置 RouteOnAttribute
    • 动态增加 moreThanMax = ${index:gt(${max})}
    • 配置unmatched 的flow 输出为continue、loop
    • 配置 moreThanMax 的flow 为结束退出。

2. 流程图-loop2

  • 配置 GenerateFlowFile
    • 动态增加index = 1
    • 动态增加max = 100
    • 动态增加step = 1
  • 配置 RouteOnAttribute
    • 动态增加 leMax = ${index:le(${max})}
    • 配置leMax 的flow 输出为continue、loop
    • 配置unmatched 的flow 到exit。
  • 配置 UpdateAttribute
    • 动态增加 index = ${index:plus(${step})}

十三、lookup模式

  • 说明:
    • 使用LookupRecord或lookupAttribute组件。
  • 流程图

  • 配置 LookupAttribute
    • Lookup Service 配置key 和value
    • 动态增加 birthday = ${id} 表示取 id值是${id}的birthday数据,放到属性birthday中。