Runtime流程设计-13种设计模式
一、请求响应模式
- 说明:
- 需要起始使用HandleHTTPRequest,结尾使用HandleHTTPResponse组件,中间是获取数据以及组装数据的组件,来组成一个完整的会话。
- 流程图
- 配置
HandleHTTPRequest
- 监听端口
- HTTP会话服务
- 请求路径
- 配置
HandleHttpResponse
- HTTP状态码,正常 200
- HTTP会话服务 必须和配置HandleHTTPRequest 中的HTTP会话服务 一致
- 可以增加配置 Content-type,例如:Content-type:application/json;charset=UTF-8(避免中文乱码)
二、抓取和推送模式
- 说明:
- 抓取模式需要使用可以抓取数据的组件,如Get,Fetch,List*以及其他命名的组件
- 推送模式需要使用可以推送数据的组件,如Put* 或其他命名的组件
1. 数据同步流程图
- 配置
QueryDatabaseTable
- 数据库连接池
- 数据库类型
- 表名
- 选择性配置 自定义查询 或 返回字段 或 额外的WHERE子句等等
- 数据库连接池
- 配置
PutDatabaseRecord
- 记录读取器,AvroReader
- 语句类型 INSERT/UPDATE/DELETE
- 数据库连接池
- 表名
三、异步请求模式
- 说明: 需要起始使用Wait和Notiry组件配合使用,完成异步转同步的功能,为提高效率,同时并行请求多个服务,最后等待全部服务都请求完成后再处理后继步骤。
- 流程图-等待唤醒模式
- 配置
Wait
- 信号释放标识符
- 缓存服务
- 配置
Notify
- 信号释放标识符
- 缓存服务
四、拆分和合并模式
- 说明:
- 拆分模式组件有split*组件以及其他组件
- 合并模式组件有merge*组件以及其他组件
1. 流程图-Split-Defragment
- 配置
SplitContent
- 字节序列(分隔符)
- 配置
MergeContent
- 合并策略 Defragment
- 其他无需配置
2. 流程图-Bin-Packing
- 配置
MergeRecord
- 记录读取器
- 记录写入器
- 合并策略
- 关联属性名(合并规则使用) http.context.identifier
- 最小记录数
- 最大记录数
- 记录最大生命周期(超时则会强制按当前条件合并当前flow)
五、提取模式
- 说明:
- 数据提取的组件多以Extract、Evaluate命名,例如ExtractText,EvaluateJsonPath,EvaludateXPath, EvaludateXQuery
1. 流程图-ExtractText
- 配置
Extract
- Enable DOTALL Mode 处理全部
- Maximum Capture Group Length 抽取到属性的值的截取长度设置,默认1024
- 动态增加提取到属性,例如 all = .* 代表把全部都放到all.0属性中
2. 流程图-EvaluateJsonPath
- 配置
EvaluateJsonPath
- 目标 放到属性中还是放到flow中,本流程设置为属性
- 返回类型 json
- 动态增加提取到属性的JSONPath表达式,例如 newID=$.id 意思是提取FLow中第一级json中属性为id的值,放到属性为newID中
- 动态属性提取表达式可以通过$.users.user.id 来获取json多层级下属性值。
3. 流程图-EvaluateXPath
- 配置
EvaluateXPath
- 目标 放到属性中还是放到flow中,本流程设置为放到flow中
- 返回类型 json
- 动态增加提取到属性,表达式为XPath,例如 data=/RBSPMessage/Method/Items/Item/Value 意思是提取FLow中这个路径/RBSPMessage/Method/Items/Item/Value 对应的值设置到data中。
六、路由模式
- 说明:
- 路由属性RoutOnAttribute
- 路由内容RountOnContent
1. 流程图-RountOnContent
- 配置
RountOnContent
- 匹配要求(完全匹配/包含匹配),本次设置为完全匹配
- 动态增加带路由条件的输出流 allMatch =123,表示如果内容完全等于123,则flow会流向allMatch 的flow;不能完全等于123的,则会流向unmatched的flow
2. 流程图-RountOnAttribute
- 配置
RountOnAttribute
- 路由策略(路由到属性名/全匹配路由到match/任意匹配路由到match),本次设置为完全匹配
- 动态增加带路由条件的输出流 validAge = ${age:ge(18)},表示如果age大于等于18,则会流向validAge的flow,否则会流向unmatched的flow。
七、重试模式
- 说明:
- 有的组件自带retry的flow,但是如何保证重试的频次,才能保证重试的效率,仍需设计重试机制。
- 流程图
- 配置 重试组件的上一个组件
- 配置一个变量 retryTimes变量,初始化为 0 。如果不具备动态增加属性的情况,可以在重试组件的前面增加updateAttribute组件,在它上面增加属性。
- 配置 重试组件的
retry
连线到UpdateAttribute
组件 - 配置
UpdateAttribute
- retryTimes = ${retryTimes:plus(1)}
- 配置
RouteOnAttribute
- 路由策略=路由到属性名
- canRetry=${retryTimes:le(${maxRetryTimes})} 其中 ${maxRetryTimes} 是允许最大重试次数,也可以写成常量数字。
- 调度里的运行安排 设置重试的时间间隔,比如5秒,设置为 5 sec
- 输出canRetry的 flow返回到重试的组件,形成回流。
八、文件传输模式
- 说明: 上传组件使用
InvokeHTTP
,下载组件也使用InvokeHTTP
。
1. 流程图-上传
- 配置
GetFile
- 读取目录
- 文件过滤器
- 配置
InvokeHTTP
- HTTP方法: POST
- 远程URL
- Content-Type: Multipart/form-data;charset=UTF-8
- 动态增加fileName : ${filename}
2. 流程图-下载
- 配置
InvokeHTTP
- HTTP方法: GET
- 远程URL
- 动态增加fileName : ${filename}
- 配置
PutFile
- 目录
- 冲突解决策略
- 创建不存在目录
九、增量同步模式
- 说明:
- 使用
QueryDatabaseTable
可以对数据库进行增量查询
- 使用
- 流程图
- 配置
QueryDatabaseTable
- 数据库连接池
- 数据库类型
- 表名
- 最大值字段 (这个配置决定了增量查询的条件),本次流程使用ID
- 查看当前查询的最大的ID值
- 数据库连接池
十、JSON变换模式
- 说明:
- 使用
JoltTransformJSON
进行结构变换
- 使用
- 流程图
- 配置
JoltTransformJSON
- Jolt Specification
[{
"operation": "shift",
"spec": {
"users": {
"*": {
"name": "users[&1].name",
"age": "users[&1].age",
"@(2,timestamp)": "users[&1].timestamp"
}
}
}
}]
十一、格式转换模式
- 说明:
- 使用Convert*等组件来进行格式转换
1. 流程图-Avro->Json
- 配置
ConvertAvroToJSON
- 无需配置
2. 流程图-Xml->Json
- 配置
ConvertRecord
- 记录写入服务 JsonRecordSetWriter
- 无需配置
- 记录读取服务 XMLReader
- Schema访问策略:使用 'Schema内容' 属性
- Schema内容 :
- 记录写入服务 JsonRecordSetWriter
{
"namespace": "example.avro",
"type": "record",
"name": "Delivery",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
十二、循环模式
- 说明:
- 使用RouteOnAttribute和UpdateAttribute组合,设计多种多样,不拘一格。
1. 流程图-loop
- 配置
GenerateFlowFile
- 动态增加index = 0
- 动态增加max = 100
- 动态增加step = 1
- 配置
UpdateAttribute
- 动态增加 index = ${index:plus(${step})}
- 配置
RouteOnAttribute
- 动态增加 moreThanMax = ${index:gt(${max})}
- 配置unmatched 的flow 输出为continue、loop
- 配置 moreThanMax 的flow 为结束退出。
2. 流程图-loop2
- 配置
GenerateFlowFile
- 动态增加index = 1
- 动态增加max = 100
- 动态增加step = 1
- 配置
RouteOnAttribute
- 动态增加 leMax = ${index:le(${max})}
- 配置leMax 的flow 输出为continue、loop
- 配置unmatched 的flow 到exit。
- 配置
UpdateAttribute
- 动态增加 index = ${index:plus(${step})}
十三、lookup模式
- 说明:
- 使用LookupRecord或lookupAttribute组件。
- 流程图
- 配置
LookupAttribute
- Lookup Service 配置key 和value
- 动态增加 birthday = ${id} 表示取 id值是${id}的birthday数据,放到属性birthday中。
- Lookup Service 配置key 和value