# 分布式消息使用

本文介绍 Kafka 消息组件的使用方法。消息的使用有一定的场景限制,例如分布式事务、日志等。

# 添加依赖

在项目的 `pom.xml` 中添加以下依赖:

<!-- Kafka producer component. Replace ${最新版本} ("latest version") with the actual released version. -->
<!-- NOTE(review): groupId spells "compoment" while artifactId spells "component"; confirm against the published coordinates. -->
<dependency>
    <groupId>com.alinesno.cloud.compoment.kafka</groupId>
	<artifactId>alinesno-cloud-component-kafka-producer</artifactId>
    <version>${最新版本}</version>
</dependency>

在启动类上添加 `@EnableKafka` 注解以启用消息监听:

/**
 * Spring Boot entry point of the application.
 *
 * <p>{@code @EnableKafka} is declared here so that {@code @KafkaListener}-annotated
 * methods in the application are picked up.
 */
@SpringBootApplication
@EnableKafka
public class BaseBootWebApplication {

    /**
     * Boots the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(BaseBootWebApplication.class, args);
    }
}

# 消息使用

发送消息

// Inject the KafkaProducer that is used below to send messages.
@Autowired
private KafkaProducer kafkaProducer ;

// Send a message: per the placeholder literals, the first argument is the topic
// and the second is the message content.
// NOTE(review): the method name suggests the call blocks until the send completes; confirm against the component.
kafkaProducer.blockSendMessage("消息主题", "消息内容");

消费消息:手动提交(处理完成后调用 `ack.acknowledge()` 确认)

/**
 * Example consumer that acknowledges every record explicitly.
 *
 * <p>Subscribes with the topic pattern {@code test-01} through the container factory bean
 * named {@code kafkaManualAckListenerContainerFactory}. That bean is defined outside this
 * snippet and is presumably configured for manual acknowledgment (confirm).
 *
 * <p>{@code log} is not declared in this snippet; the enclosing project is expected to
 * supply it (for example a logger field).
 */
@Component
class AskListener {

    /**
     * Logs the received record and then acknowledges it.
     *
     * @param record    the raw consumer record
     * @param data      the record payload as a string
     * @param key       the record key, or {@code null} when the record carries no key
     * @param partition the partition the record was received from
     * @param topic     the topic the record was received from
     * @param ts        the timestamp header of the record
     * @param ack       handle used to acknowledge the record
     */
    @KafkaListener(topicPattern = "test-01", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listen(ConsumerRecord<?, ?> record, @Payload String data,
            // Records produced without a key have no key header. A required @Header would
            // make the listener invocation fail for them, so the header is optional here.
            @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
            Acknowledgment ack) {

        log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );

        // Only log the raw value when the record actually carries one.
        Object message = record.value();
        if (message != null) {
            log.info("message =topic:" + topic + ", " + message);
        }

        // Acknowledge after the record has been handled.
        ack.acknowledge();
    }
}

消费消息:自动提交(无需手动确认)

/**
 * Example consumer that does not acknowledge records itself.
 *
 * <p>Subscribes with the topic pattern {@code test}. No {@code containerFactory} is named and
 * no {@code Acknowledgment} parameter is taken, so offset handling is left to the
 * container's configuration, which is not visible in this snippet.
 *
 * <p>{@code log} is not declared in this snippet; the enclosing project is expected to
 * supply it (for example a logger field).
 */
@Component
class AutoListener {

    /**
     * Logs the received record.
     *
     * @param record    the raw consumer record
     * @param data      the record payload as a string
     * @param key       the record key, or {@code null} when the record carries no key
     * @param partition the partition the record was received from
     * @param topic     the topic the record was received from
     * @param ts        the timestamp header of the record
     */
    @KafkaListener(topicPattern = "test")
    public void listen(ConsumerRecord<?, ?> record, @Payload String data,
            // Records produced without a key have no key header. A required @Header would
            // make the listener invocation fail for them, so the header is optional here.
            @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {

        log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );

        // Only log the raw value when the record actually carries one.
        Object message = record.value();
        if (message != null) {
            log.info("message =topic:" + topic + ", " + message);
        }
    }
}