main
周文涛 2 years ago
parent d439758236
commit 647a9fe56f

@ -61,7 +61,7 @@ public class KafkaController {
List<String> resultList = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(false));
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(5000);
ConsumerRecords<String, String> records = consumer.poll(100);
JSONArray jsonArray=new JSONArray();
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
@ -176,7 +176,7 @@ public class KafkaController {
List<String> resultList = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(true));
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(5000);
ConsumerRecords<String, String> records = consumer.poll(100);
JSONArray jsonArray=new JSONArray();
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();

@ -68,7 +68,12 @@ public class KafkaUtil {
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("session.timeout.ms", "30000");//一次session的连接超时时间
properties.setProperty("heartbeat.interval.ms", "10000");// 心跳时间一般为超时时间的1/3Consumer在被判定为死亡之前能够发送至少 3 轮的心跳请求
//properties.setProperty("max.poll.interval.ms", "30000");//每隔多长时间去拉取消息。合理设置预期值尽量但间隔时间消费者处理完业务逻辑否则就会被coordinator判定为死亡踢出Consumer Group进行Rebalance
//properties.setProperty("max.poll.records", "30000");//一次从拉取出来的数据条数。根据消费业务处理耗费时长合理设置如果每次max.poll.interval.ms 设置的时间较短可以max.poll.records设置小点儿少拉取些这样不会超时。
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
return properties;

Loading…
Cancel
Save