Apache Kafka作为分布式流处理平台的核心功能之一就是高效的消息传递。生产者(Producer)通过发送消息(Put)到指定主题(Topic),消费者(Consumer)则从主题中拉取消息。本文将深入解析Kafka发送消息的完整实现过程。
以下是使用Java客户端发送消息的最小化实现代码:
首先需要在项目中添加Kafka客户端依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
创建生产者并发送同步消息的核心代码:
Properties props = new Properties(); props.put(bootstrap.servers, localhost:9092); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); Producer<String, String> producer = new KafkaProducer<>(props); try { ProducerRecord<String, String> record = new ProducerRecord<>(test-topic, message-key, Hello Kafka!); RecordMetadata metadata = producer.send(record).get(); System.out.println(消息发送成功,分区: + metadata.partition()); } finally { producer.close(); }
通过回调函数实现非阻塞发送:
producer.send(record, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); } else { System.out.println(异步发送成功,偏移量: + metadata.offset()); } });
配置批量发送参数提升吞吐量:
props.put(batch.size, 16384); // 16KB批量大小 props.put(linger.ms, 5); // 等待5ms
建议实现以下异常处理机制:
提升Kafka发送性能的关键方法:
通过设置max.in.flight.requests.per.connection=1确保单分区顺序性
建议实现幂等生产者或业务去重逻辑
合理配置connection.max.idle.ms和request.timeout.ms参数