消费者属性用户/密码修改;增加消费者接口

main
Christian Alfoni 2 years ago
parent 2a5745f4ee
commit 2c54981e44

@ -1,11 +1,17 @@
package com.example.demokafka.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.demokafka.common.api.vo.Result;
import com.example.demokafka.model.Person;
import com.example.demokafka.service.ProducerService;
import com.example.demokafka.util.Kafka;
import com.example.demokafka.util.KafkaUtil;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.format.annotation.DateTimeFormat;
@ -13,10 +19,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* @author
@ -41,4 +44,19 @@ public class TestController {
producerService.sendMessage(topicName, "测试");
return new ArrayList<>();
}
@GetMapping("/consumer/{topic}")
@ApiOperation(value = "consumer")
public Result getConsumerData(@PathVariable String topic) {
List<String> result = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<String, String>(KafkaUtil.getSafeConsumerPro());
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
result.add(kfkContent);
}
consumer.close();
return Result.ok(result);
}
}

@ -39,7 +39,7 @@ public class KafkaUtil {
properties.setProperty("group.id", "isv-kafka");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("ssl.truststore.location", trustestore);
@ -51,4 +51,4 @@ public class KafkaUtil {
properties.setProperty("ssl.endpoint.identification.algorithm", "");
return properties;
}
}
}

Loading…
Cancel
Save