From 647a9fe56ff915b01002a7b00c7a118a49e91df2 Mon Sep 17 00:00:00 2001 From: zhouwentao <1577701412@qq.com> Date: Thu, 21 Sep 2023 20:49:47 +0800 Subject: [PATCH] updates --- .../java/com/example/zxweb/controller/KafkaController.java | 4 ++-- src/main/java/com/example/zxweb/utils/KafkaUtil.java | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/example/zxweb/controller/KafkaController.java b/src/main/java/com/example/zxweb/controller/KafkaController.java index 797f248..d5684de 100644 --- a/src/main/java/com/example/zxweb/controller/KafkaController.java +++ b/src/main/java/com/example/zxweb/controller/KafkaController.java @@ -61,7 +61,7 @@ public class KafkaController { List resultList = new ArrayList<>(); Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(false)); consumer.subscribe(Collections.singletonList(topic)); - ConsumerRecords records = consumer.poll(5000); + ConsumerRecords records = consumer.poll(100); JSONArray jsonArray=new JSONArray(); for (ConsumerRecord consumerRecord : records) { String kfkContent = consumerRecord.value(); @@ -176,7 +176,7 @@ public class KafkaController { List resultList = new ArrayList<>(); Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(true)); consumer.subscribe(Collections.singletonList(topic)); - ConsumerRecords records = consumer.poll(5000); + ConsumerRecords records = consumer.poll(100); JSONArray jsonArray=new JSONArray(); for (ConsumerRecord consumerRecord : records) { String kfkContent = consumerRecord.value(); diff --git a/src/main/java/com/example/zxweb/utils/KafkaUtil.java b/src/main/java/com/example/zxweb/utils/KafkaUtil.java index ac81a30..11a3b5c 100644 --- a/src/main/java/com/example/zxweb/utils/KafkaUtil.java +++ b/src/main/java/com/example/zxweb/utils/KafkaUtil.java @@ -68,7 +68,12 @@ public class KafkaUtil { properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); - properties.setProperty("session.timeout.ms", "30000"); + properties.setProperty("session.timeout.ms", "30000");//一次session的连接超时时间 + properties.setProperty("heartbeat.interval.ms", "10000");// 心跳时间,一般为超时时间的1/3,Consumer在被判定为死亡之前,能够发送至少 3 轮的心跳请求 + + //properties.setProperty("max.poll.interval.ms", "30000");//每隔多长时间去拉取消息。合理设置预期值,尽量但间隔时间消费者处理完业务逻辑,否则就会被coordinator判定为死亡,踢出Consumer Group,进行Rebalance + //properties.setProperty("max.poll.records", "30000");//一次从拉取出来的数据条数。根据消费业务处理耗费时长合理设置,如果每次max.poll.interval.ms 设置的时间较短,可以max.poll.records设置小点儿,少拉取些,这样不会超时。 + properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("ssl.endpoint.identification.algorithm", ""); return properties;