From 2c54981e448b2e2feaab7673fc3d70b7277375f0 Mon Sep 17 00:00:00 2001 From: Christian Alfoni Date: Sat, 2 Sep 2023 14:12:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E5=B1=9E=E6=80=A7?= =?UTF-8?q?=E7=94=A8=E6=88=B7/=E5=AF=86=E7=A0=81=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=EF=BC=9B=E5=A2=9E=E5=8A=A0=E6=B6=88=E8=B4=B9=E8=80=85=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../demokafka/controller/TestController.java | 26 ++++++++++++++++--- .../com/example/demokafka/util/KafkaUtil.java | 4 +-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java index 658d984..8267622 100644 --- a/src/main/java/com/example/demokafka/controller/TestController.java +++ b/src/main/java/com/example/demokafka/controller/TestController.java @@ -1,11 +1,17 @@ package com.example.demokafka.controller; import com.alibaba.fastjson.JSONObject; +import com.example.demokafka.common.api.vo.Result; import com.example.demokafka.model.Person; import com.example.demokafka.service.ProducerService; import com.example.demokafka.util.Kafka; +import com.example.demokafka.util.KafkaUtil; import io.swagger.annotations.*; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.format.annotation.DateTimeFormat; @@ -13,10 +19,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import springfox.documentation.annotations.ApiIgnore; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author 赵恒 @@ -41,4 +44,19 @@ public class TestController { producerService.sendMessage(topicName, "测试"); return new ArrayList<>(); } + + @GetMapping("/consumer/{topic}") + @ApiOperation(value = "consumer") + public Result getConsumerData(@PathVariable String topic) { + List result = new ArrayList<>(); + Consumer consumer = new KafkaConsumer(KafkaUtil.getSafeConsumerPro()); + consumer.subscribe(Arrays.asList(topic)); + ConsumerRecords records = consumer.poll(5000); + for (ConsumerRecord consumerRecord : records) { + String kfkContent = consumerRecord.value(); + result.add(kfkContent); + } + consumer.close(); + return Result.ok(result); + } } diff --git a/src/main/java/com/example/demokafka/util/KafkaUtil.java b/src/main/java/com/example/demokafka/util/KafkaUtil.java index 0f406be..6f37a92 100644 --- a/src/main/java/com/example/demokafka/util/KafkaUtil.java +++ b/src/main/java/com/example/demokafka/util/KafkaUtil.java @@ -39,7 +39,7 @@ public class KafkaUtil { properties.setProperty("group.id", "isv-kafka"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); - properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";"); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("security.protocol", "SASL_SSL"); properties.setProperty("ssl.truststore.location", trustestore); @@ -51,4 +51,4 @@ public class KafkaUtil { properties.setProperty("ssl.endpoint.identification.algorithm", ""); return properties; } -} \ No newline at end of file +}