From cd464421f1f8fc3cf263de9518606add0b1a3fe7 Mon Sep 17 00:00:00 2001 From: zhouwentao <1577701412@qq.com> Date: Tue, 12 Sep 2023 15:17:10 +0800 Subject: [PATCH] updates --- .../zxweb/controller/KafkaController.java | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/example/zxweb/controller/KafkaController.java b/src/main/java/com/example/zxweb/controller/KafkaController.java index fff1491..9ede275 100644 --- a/src/main/java/com/example/zxweb/controller/KafkaController.java +++ b/src/main/java/com/example/zxweb/controller/KafkaController.java @@ -16,8 +16,12 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.RetriableException; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; @@ -171,7 +175,27 @@ public class KafkaController { KafkaProducer producer = new KafkaProducer<>(kafkaUtil.getSafeProducerPro()); // 发送消息 ProducerRecord record = new ProducerRecord<>(topic,requestBody.toJSONString()); - producer.send(record); + producer.send(record, new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e == null) { + log.info("kafka send successful"); + } else { + if (e instanceof RetriableException) { + //处理可重试异常 + try { + log.error("kafka send fail Retry sending."); + Thread.sleep(3000); + //MyProducer.getInstance().send(); + } catch (InterruptedException e1) { + log.error("kafka error :", e1); + } + } else { + throw new KafkaException("kafka server message error."); + } + } + } + }); log.info("-------------------"); log.info(record.value()); // 关闭 KafkaProducer