main
周文涛 2 years ago
parent cd464421f1
commit c729f60980

@ -51,6 +51,27 @@ public class KafkaController {
return Result.OK(test); return Result.OK(test);
} }
@GetMapping("/consumer2/{topic}")
public Result<?> consumer2(@PathVariable String topic){
if (StringUtils.isEmpty(topic)) {
return Result.error("请确认topic是否正确");
}
List<String> resultList = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro());
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(5000);
JSONArray jsonArray=new JSONArray();
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
resultList.add(kfkContent);
if (!StringUtils.isEmpty(kfkContent)) {
jsonArray.add(JSONObject.parseObject(kfkContent));
}
}
consumer.close();
return Result.OK(jsonArray);
}
@GetMapping("/consumer/{topic}") @GetMapping("/consumer/{topic}")
@ApiOperation(value = "consumer") @ApiOperation(value = "consumer")
public Result<?> getConsumerData(@PathVariable String topic) { public Result<?> getConsumerData(@PathVariable String topic) {

Loading…
Cancel
Save