# 业务消息集成
# 工程示例
系统应用集成示例工程打开 (opens new window)
# 概述
业务集成消息的方式和使用方式,包括消息的发送和消息接收
# 本内容你将获得
- 消息发送集成
- 消息事务消费处理
- 消息幂等处理
- 消息异常处理
# 消息使用方式
这里主要演示的消息收发管理和使用,这里主要是集成消息的业务处理方式,包括分布式消息锁, 消息幂等处理等
# 生产端发送消息
消息发送有预处理和直接发送两各方式,预处理方式是在业务完成之后,确认发送处理,直接发送 则是业务完成之后直接发送即可
# 预处理
预发送消息
MqMessageEntity dto = new MqMessageEntity() ;
dto.setTopic("kafka-test.02");
dto.setData("消息内容");
String json = JSONObject.toJSONString(dto) ;
String result = HttpRequest.post(HOST_API + "saveMessageLocal")
.header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
.header("messageBusinessId",messageBusinessId)
.body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
.execute().body();
log.debug("res = {}" , result);
完成之后,确认消息并发送
String messageId = "f9f13e563037b5dc544c62abd138dcd6" ; // 示例数据
MqMessageEntity dto = new MqMessageEntity() ;
dto.setId(messageId);
String json = JSONObject.toJSONString(dto) ;
String result = HttpRequest.post(HOST_API + "confirmAndSendMessage")
.header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
.header("messageBusinessId",messageBusinessId)
.body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
.execute().body();
log.debug("res = {}" , result);
# 直接发送
这类型的直接发送消息即可
RequestMessageDto dto = new RequestMessageDto() ;
dto.setBusinessId("业务关键字");
dto.setTopicName("消息主题");
dto.setMessageBody("消息内容");
String json = JSONObject.toJSONString(dto) ;
String result = HttpRequest.post(HOST_API + "sendMessage")
.header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
.header("messageBusinessId",messageBusinessId)
.body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
.execute().body();
log.debug("res = {}" , result);
# 消费端接收消息
消费端接收到消息之后,解析消息体进行业务处理,这里演示kafka接收,如下
@Component
public class KafkaSubscriber {
private static final Logger log = LoggerFactory.getLogger(KafkaSubscriber.class);
/**
* 监控通配的topic
*
* @param record
* @param topic
*/
@KafkaListener(topicPattern = "kafka-test.*")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("==>>>>>>>> record:{} , topic:{}", record, topic);
log.info("==>>>>>>>> message:{} , topic:{}", message, topic);
}
}
/**
* 监听某个topic的消息
*/
@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
public void listen(String data) {
log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
// 更新消息状态
}
}
# 消费端异常处理
消息异常处理这里是将异常抛出和写回消息中间件,示例如下:
@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
public void listen(String data) {
log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
try{
// 处理业务
}catch(Exception e){
// 异常处理,消息写回中间件
// 调用方法回调异常
// 抛出异常
thrown new RpcServiceException("消息["+data+"]处理异常:" + e)
}
}
# 消费端加锁处理
分布式事务加锁处理的情况,针对同一业务不能并发执行时的处理,根据业务关键字进行的加锁处理, 查看分布式锁技术章节,这里集成的是redis进行锁处理, 示例如下:
@RedisMultiLocked(expression = "T(com.example.MyUtils).getNamesWithId(#p0)")
@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
public void listen(String data) {
log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
// 更新消息状态
}
# 其它
- 无