Skip to main content

RocketMQ配置示例

目前平台尚未提供直接支持 RocketMQ 的组件,研发团队已在规划开发;现阶段可通过脚本方式实现消息的推送和消费。本文档仅供参考,待相关组件开发完成后再行更新。

一、PublishRocketMQ

  • ExecuteScript
@Grab(group='org.apache.rocketmq', module='rocketmq-client', version='5.0.0-ALPHA')
@Grab(group='org.apache.rocketmq', module='rocketmq-acl', version='5.0.0-ALPHA')
@Grab(group='org.apache.rocketmq', module='rocketmq-common', version='5.0.0-ALPHA')
@Grab(group='org.apache.rocketmq', module='rocketmq-remoting', version='5.0.0-ALPHA')
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* { 依赖包 }
* https://repo1.maven.org/maven2/org/apache/rocketmq/rocketmq-remoting/5.0.0-ALPHA/rocketmq-remoting-5.0.0-ALPHA.jar
* https://repo1.maven.org/maven2/org/apache/rocketmq/rocketmq-common/5.0.0-ALPHA/rocketmq-common-5.0.0-ALPHA.jar
* https://repo1.maven.org/maven2/org/apache/rocketmq/rocketmq-acl/5.0.0-ALPHA/rocketmq-acl-5.0.0-ALPHA.jar
* https://repo1.maven.org/maven2/org/apache/rocketmq/rocketmq-client/5.0.0-ALPHA/rocketmq-client-5.0.0-ALPHA.jar
*/

// ExecuteScript body: publishes the content of one FlowFile to RocketMQ and
// records the send result as FlowFile attributes.

// Take the next FlowFile; there is nothing to do when none is queued.
def flowFile = session.get()
if (flowFile == null) {
    return
}

// Connection settings are resolved through expression language, so they must
// be resolvable as ${group}, ${namesrvAddr}, ${topic}, ${msgTag},
// ${accessKey} and ${secretKey} in the evaluation scope.
def group = context.newPropertyValue('${group}').evaluateAttributeExpressions().getValue()
def namesrvAddr = context.newPropertyValue('${namesrvAddr}').evaluateAttributeExpressions().getValue()
def topic = context.newPropertyValue('${topic}').evaluateAttributeExpressions().getValue()
def msgTag = context.newPropertyValue('${msgTag}').evaluateAttributeExpressions().getValue()
def accessKey = context.newPropertyValue('${accessKey}').evaluateAttributeExpressions().getValue()
def secretKey = context.newPropertyValue('${secretKey}').evaluateAttributeExpressions().getValue()

// Filled in by the write callback once the broker has acknowledged the send.
def msgId = ''
def sendStatus = ''
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        // 1. Create the producer for the configured group with message
        //    tracing enabled. To disable tracing, use instead:
        //    new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(accessKey, secretKey))
        DefaultMQProducer producer = new DefaultMQProducer(group, getAclRPCHook(accessKey, secretKey), true, null)
        // Required when using cloud message tracing; may be left unset when
        // tracing is disabled.
        producer.setAccessChannel(AccessChannel.CLOUD)
        // RocketMQ name server endpoint.
        producer.setNamesrvAddr(namesrvAddr)
        try {
            // 2. Start the producer client.
            producer.start()
            // 3. Send the message.
            Message msg = new Message(topic, msgTag, text.getBytes(RemotingHelper.DEFAULT_CHARSET))
            SendResult sendResult = producer.send(msg)
            msgId = sendResult.getMsgId()
            sendStatus = sendResult.getSendStatus().name()
        } finally {
            // A producer is created per FlowFile, so it must always be shut
            // down; otherwise every execution leaks client resources.
            producer.shutdown()
        }
        // Pass the original content through unchanged.
        outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    flowFile = session.putAttribute(flowFile, 'message_id', msgId)
    flowFile = session.putAttribute(flowFile, 'sendStatus', sendStatus)
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
} catch (Exception e) {
    // Sending failed. Consider whether a retry is needed: the message can be
    // re-sent, or persisted for later compensation.
    log.error("Send mq message failed:", e)
    // Fall back to the exception's string form when it carries no message, so
    // the attribute value is never null.
    flowFile = session.putAttribute(flowFile, "send.error", e.getMessage() ?: e.toString())
    session.transfer(flowFile, REL_FAILURE)
    session.commit()
}


/**
 * Builds the RPC hook that carries the ACL credentials.
 *
 * @param accessKey ACL access key
 * @param secretKey ACL secret key
 * @return an AclClientRPCHook wrapping the given credentials
 */
private static RPCHook getAclRPCHook(String accessKey,String secretKey) {
    SessionCredentials credentials = new SessionCredentials(accessKey, secretKey)
    return new AclClientRPCHook(credentials)
}

二、ConsumeRocketMQ

  • InvokeScriptedProcessor
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnRemoved;

import java.util.List;
import org.apache.nifi.logging.ComponentLog

// 依赖包
// commons-codec-1.15.jar
// fastjson-1.2.76.jar
// netty-all-4.1.65.Final.jar
// rocketmq-acl-5.0.0-ALPHA.jar
// rocketmq-client-5.0.0-ALPHA.jar
// rocketmq-common-5.0.0-ALPHA.jar
// rocketmq-logging-5.0.0-ALPHA.jar
// rocketmq-remoting-5.0.0-ALPHA.jar

/**
 * Scripted processor that starts a RocketMQ push consumer on the first
 * trigger and writes every delivered batch of messages into one new FlowFile
 * (one message body per line) routed to the "success" relationship.
 *
 * Connection settings are resolved through expression language as ${group},
 * ${namesrvAddr}, ${topic}, ${accessKey} and ${secretKey}.
 */
class ReceiverProcessor implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
    def REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that were failed to processed").build();
    def ComponentLog log;
    // Non-null only while a consumer has been started successfully and has not
    // been shut down yet.
    private static DefaultMQPushConsumer consumer;

    /** Captures the component logger supplied by the initialization context. */
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    /** @return the "success" and "failure" relationships */
    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS, REL_FAILURE] as Set
    }

    /**
     * Creates and starts the push consumer if none is running. Once a
     * consumer is running this method does nothing; FlowFiles are produced
     * from the consumer's listener callback.
     *
     * @param context used to evaluate the connection settings
     * @param sessionFactory used by the listener to open a session per batch
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        if (consumer != null) {
            return
        }

        def group = context.newPropertyValue('${group}').evaluateAttributeExpressions().getValue()
        def namesrvAddr = context.newPropertyValue('${namesrvAddr}').evaluateAttributeExpressions().getValue()
        def topic = context.newPropertyValue('${topic}').evaluateAttributeExpressions().getValue()
        def accessKey = context.newPropertyValue('${accessKey}').evaluateAttributeExpressions().getValue()
        def secretKey = context.newPropertyValue('${secretKey}').evaluateAttributeExpressions().getValue()

        // Built in a local first: the shared field is only assigned after
        // start() succeeded, so a failed start is retried on the next trigger.
        DefaultMQPushConsumer newConsumer = null
        try {
            // Create the consumer for the configured group with message
            // tracing enabled. To disable tracing, use instead:
            // new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(accessKey, secretKey), new AllocateMessageQueueAveragely())
            newConsumer = new DefaultMQPushConsumer(group, getAclRPCHook(accessKey, secretKey), new AllocateMessageQueueAveragely(), true, null)

            // RocketMQ name server endpoint.
            newConsumer.setNamesrvAddr(namesrvAddr)
            // Required when using cloud message tracing; may be left unset
            // when tracing is disabled.
            newConsumer.setAccessChannel(AccessChannel.CLOUD)
            // Subscribe to every message of the topic (all tags).
            newConsumer.subscribe(topic, "*")

            // Register the message listener.
            newConsumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext consumeContext) {
                    // Declared outside the try block so the error path can
                    // roll the session back.
                    def session = null
                    try {
                        session = sessionFactory.createSession()
                        FlowFile flowFile = session.create()

                        // Concatenate the batch, one message body per line.
                        StringBuilder payload = new StringBuilder()
                        for (MessageExt msg : msgs) {
                            // Decode explicitly as UTF-8 instead of relying
                            // on the platform default charset.
                            payload.append(new String(msg.getBody(), 'UTF-8')).append("\n")
                        }
                        flowFile = session.write(flowFile, { outputStream ->
                            outputStream.write(payload.toString().getBytes('utf-8'))
                        } as OutputStreamCallback)

                        session.transfer(flowFile, REL_SUCCESS)
                        session.commit()
                        // The batch is committed; acknowledge it so it is not
                        // delivered again.
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                    } catch (Exception e) {
                        log.error('Something went wrong', e)
                        // Discard any partial work; the batch is requested
                        // again via RECONSUME_LATER, so no failure FlowFile
                        // is emitted for it.
                        if (session != null) {
                            session.rollback()
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER
                    }
                }
            })

            // Start the consumer client.
            newConsumer.start()
            consumer = newConsumer
        } catch (e) {
            log.error('Something went wrong', e)
            // Release whatever the half-initialized consumer may hold.
            if (newConsumer != null) {
                try {
                    newConsumer.shutdown()
                } catch (Exception shutdownError) {
                    log.warn('Failed to shut down consumer after start failure', shutdownError)
                }
            }
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {

    }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return null }

    @Override
    String getIdentifier() { return null }

    /** Shuts the consumer down; annotated as the on-removed hook. */
    @OnRemoved
    void stop() {
        shutdownConsumer()
    }

    /** Shuts the consumer down; annotated as the on-stopped hook. */
    @OnStopped
    void stop1() {
        shutdownConsumer()
    }

    /**
     * Shuts down the running consumer, if any, and clears the shared field so
     * the next trigger builds a fresh consumer. Safe to call repeatedly and
     * when no consumer was ever started.
     */
    private static void shutdownConsumer() {
        def current = consumer
        consumer = null
        if (current != null) {
            current.shutdown()
        }
    }

    /**
     * Builds the RPC hook that carries the ACL credentials.
     *
     * @param accessKey ACL access key
     * @param secretKey ACL secret key
     */
    private RPCHook getAclRPCHook(String accessKey,String secretKey) {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

}

// Bind a ReceiverProcessor instance to the script variable `processor`
// (presumably the name InvokeScriptedProcessor looks up; confirm against the NiFi docs).
processor = new ReceiverProcessor()

三、流程模板

参见:模板文件

四、RocketMQ 环境

# Docker Compose environment for RocketMQ: name server, broker and dashboard
# on one shared bridge network.
version: '3.2'

networks:
  integrated-dev-net:
    driver: bridge

services:
  # RocketMQ
  # Service for nameserver
  namesrv:
    image: apacherocketmq/rocketmq
    networks:
      - integrated-dev-net
    container_name: "rocketmq-namesrv-bizd"
    restart: always
    ports:
      - 9876:9876
    environment:
      - JAVA_OPT_EXT=-server -Xms256m -Xmx256m -Xmn256m
    volumes:
      - /data/integrated/rocketmq/data/namesrv/logs:/root/logs
    command: sh mqnamesrv

  # Service for broker
  broker:
    image: apacherocketmq/rocketmq
    networks:
      - integrated-dev-net
    container_name: "rocketmq-broker-bizd"
    restart: always
    links:
      - namesrv
    depends_on:
      - namesrv
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=namesrv:9876
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m
    volumes:
      - /data/integrated/rocketmq/data/broker/logs:/home/rocketmq/logs
      - /data/integrated/rocketmq/data/broker/store:/home/rocketmq/store
      - /data/integrated/rocketmq/etc/broker/broker.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf

  # Service for rocketmq-dashboard
  dashboard:
    image: apacherocketmq/rocketmq-dashboard
    networks:
      - integrated-dev-net
    container_name: "rocketmq-dashboard-bizd"
    restart: always
    ports:
      - 8081:8080
    links:
      - namesrv
    depends_on:
      - namesrv
    environment:
      - NAMESRV_ADDR=namesrv:9876