diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java index 658d984..8267622 100644 --- a/src/main/java/com/example/demokafka/controller/TestController.java +++ b/src/main/java/com/example/demokafka/controller/TestController.java @@ -1,11 +1,17 @@ package com.example.demokafka.controller; import com.alibaba.fastjson.JSONObject; +import com.example.demokafka.common.api.vo.Result; import com.example.demokafka.model.Person; import com.example.demokafka.service.ProducerService; import com.example.demokafka.util.Kafka; +import com.example.demokafka.util.KafkaUtil; import io.swagger.annotations.*; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.format.annotation.DateTimeFormat; @@ -13,10 +19,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import springfox.documentation.annotations.ApiIgnore; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author 赵恒 @@ -41,4 +44,19 @@ public class TestController { producerService.sendMessage(topicName, "测试"); return new ArrayList<>(); } + + @GetMapping("/consumer/{topic}") + @ApiOperation(value = "consumer") + public Result getConsumerData(@PathVariable String topic) { + List result = new ArrayList<>(); + Consumer consumer = new KafkaConsumer(KafkaUtil.getSafeConsumerPro()); + consumer.subscribe(Arrays.asList(topic)); + ConsumerRecords records = consumer.poll(5000); + for (ConsumerRecord consumerRecord : records) { + String kfkContent = consumerRecord.value(); + result.add(kfkContent); + } + consumer.close(); + return Result.ok(result); + } } diff --git a/src/main/java/com/example/demokafka/util/KafkaUtil.java b/src/main/java/com/example/demokafka/util/KafkaUtil.java index 0f406be..6f37a92 100644 --- a/src/main/java/com/example/demokafka/util/KafkaUtil.java +++ b/src/main/java/com/example/demokafka/util/KafkaUtil.java @@ -39,7 +39,7 @@ public class KafkaUtil { properties.setProperty("group.id", "isv-kafka"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); - properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";"); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("security.protocol", "SASL_SSL"); properties.setProperty("ssl.truststore.location", trustestore); @@ -51,4 +51,4 @@ public class KafkaUtil { properties.setProperty("ssl.endpoint.identification.algorithm", ""); return properties; } -} \ No newline at end of file +}