# 分布式消息使用
此处为 kafka 消息的使用及教程,消息使用有一定的场景限制,比如分布式事务,还有日志等
# 添加依赖
初始化依赖
<dependency>
<groupId>com.alinesno.cloud.compoment.kafka</groupId>
<artifactId>alinesno-cloud-component-kafka-producer</artifactId>
<version>${最新版本}</version>
</dependency>
消息注解
@EnableKafka
@SpringBootApplication
public class BaseBootWebApplication {
public static void main(String[] args) {
SpringApplication.run(BaseBootWebApplication.class, args);
}
}
# 消息使用
发送消息
// 添加依赖
@Autowired
private KafkaProducer kafkaProducer ;
kafkaProducer.blockSendMessage("消息主题", "消息内容");
手动提交消息
@Component
class AskListener {
@KafkaListener(topicPattern = "test-01", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(ConsumerRecord<?, ?> record, @Payload String data ,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
Acknowledgment ack) {
log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("message =topic:" + topic + ", " + message);
}
ack.acknowledge();
}
}
自动提交消息
@Component
class AutoListener {
@KafkaListener(topicPattern = "test")
public void listen(ConsumerRecord<?, ?> record, @Payload String data ,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("message =topic:" + topic + ", " + message);
}
}
}