main
周文涛 2 years ago
parent c748a29aea
commit cd464421f1

@ -16,8 +16,12 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
@ -171,7 +175,27 @@ public class KafkaController {
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaUtil.getSafeProducerPro());
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic,requestBody.toJSONString());
producer.send(record);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("kafka send successful");
} else {
if (e instanceof RetriableException) {
//处理可重试异常
try {
log.error("kafka send fail Retry sending.");
Thread.sleep(3000);
//MyProducer.getInstance().send();
} catch (InterruptedException e1) {
log.error("kafka error :", e1);
}
} else {
throw new KafkaException("kafka server message error.");
}
}
}
});
log.info("-------------------");
log.info(record.value());
// 关闭 KafkaProducer

Loading…
Cancel
Save