# 业务消息集成

# 工程示例

系统应用集成示例工程打开 (opens new window)

# 概述

业务集成消息的方式和使用方式,包括消息的发送和消息接收

# 本内容你将获得

  • 消息发送集成
  • 消息事务消费处理
  • 消息幂等处理
  • 消息异常处理

# 消息使用方式

这里主要演示的消息收发管理和使用,这里主要是集成消息的业务处理方式,包括分布式消息锁, 消息幂等处理等

# 生产端发送消息

消息发送有预处理和直接发送两各方式,预处理方式是在业务完成之后,确认发送处理,直接发送 则是业务完成之后直接发送即可

# 预处理

预发送消息

MqMessageEntity dto = new MqMessageEntity() ;

dto.setTopic("kafka-test.02");
dto.setData("消息内容");

String json = JSONObject.toJSONString(dto) ;

String result = HttpRequest.post(HOST_API + "saveMessageLocal")
        .header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
        .header("messageBusinessId",messageBusinessId)
        .body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
        .execute().body();

log.debug("res = {}" , result);

完成之后,确认消息并发送

String messageId = "f9f13e563037b5dc544c62abd138dcd6" ; // 示例数据

MqMessageEntity dto = new MqMessageEntity() ;
dto.setId(messageId);

String json = JSONObject.toJSONString(dto) ;

String result = HttpRequest.post(HOST_API + "confirmAndSendMessage")
    .header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
    .header("messageBusinessId",messageBusinessId)
    .body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
    .execute().body();

log.debug("res = {}" , result);

# 直接发送

这类型的直接发送消息即可

RequestMessageDto dto = new RequestMessageDto() ;

dto.setBusinessId("业务关键字");
dto.setTopicName("消息主题");
dto.setMessageBody("消息内容");

String json = JSONObject.toJSONString(dto) ;

String result = HttpRequest.post(HOST_API + "sendMessage")
        .header("Content-Type", "application/json")//头信息,多个头信息多次调用此方法即可
        .header("messageBusinessId",messageBusinessId)
        .body(json)//等价于使用.form() 但是两者之间传递的参数类型不一样
        .execute().body();

log.debug("res = {}" , result);

# 消费端接收消息

消费端接收到消息之后,解析消息体进行业务处理,这里演示kafka接收,如下

@Component
public class KafkaSubscriber {

	private static final Logger log = LoggerFactory.getLogger(KafkaSubscriber.class);

	/**
	 * 监控通配的topic
	 *
	 * @param record
	 * @param topic
	 */
	@KafkaListener(topicPattern = "kafka-test.*")
	public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		Optional<?> kafkaMessage = Optional.ofNullable(record.value());
		if (kafkaMessage.isPresent()) {
			Object message = kafkaMessage.get();

			log.info("==>>>>>>>> record:{} , topic:{}", record, topic);
			log.info("==>>>>>>>> message:{} , topic:{}", message, topic);
		}
	}

	/**
	 * 监听某个topic的消息
	 */
	@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
	public void listen(String data) {
		log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
		// 更新消息状态

	}

}

# 消费端异常处理

消息异常处理这里是将异常抛出和写回消息中间件,示例如下:

@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
public void listen(String data) {
    log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
    try{
        // 处理业务
    }catch(Exception e){
        // 异常处理,消息写回中间件
        // 调用方法回调异常
        // 抛出异常
        thrown new RpcServiceException("消息["+data+"]处理异常:" + e)
    }

}

# 消费端加锁处理

分布式事务加锁处理的情况,针对同一业务不能并发执行时的处理,根据业务关键字进行的加锁处理, 查看分布式锁技术章节,这里集成的是redis进行锁处理, 示例如下:

@RedisMultiLocked(expression = "T(com.example.MyUtils).getNamesWithId(#p0)")
@KafkaListener(id = "id01", topics = "kafka-test-topic-02")
public void listen(String data) {
    log.info("==>>>>>>>> data :{} , topic:{}", data, "kafka-test-topic-02");
    // 更新消息状态

}

# 其它