From d4397582369ebbaaa125d265c75cf454b39b9927 Mon Sep 17 00:00:00 2001 From: zhouwentao <1577701412@qq.com> Date: Thu, 21 Sep 2023 18:23:12 +0800 Subject: [PATCH] updates --- .../zxweb/controller/KafkaController.java | 118 ++++++++++++++++-- .../com/example/zxweb/utils/KafkaUtil.java | 4 +- 2 files changed, 109 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/example/zxweb/controller/KafkaController.java b/src/main/java/com/example/zxweb/controller/KafkaController.java index 5e63d80..797f248 100644 --- a/src/main/java/com/example/zxweb/controller/KafkaController.java +++ b/src/main/java/com/example/zxweb/controller/KafkaController.java @@ -40,8 +40,6 @@ import java.util.*; public class KafkaController { @Resource private KafkaUtil kafkaUtil; - - @GetMapping("/test") public Result test(){ String test = (String) CacheManager.get("test"); @@ -51,13 +49,17 @@ public class KafkaController { return Result.OK(test); } - @GetMapping("/consumer2/{topic}") - public Result consumer2(@PathVariable String topic){ + /* + * 不鉴权 + **/ + @GetMapping("/consumer/{topic}") + @ApiOperation(value = "consumer") + public Result getConsumerData(@PathVariable String topic) { if (StringUtils.isEmpty(topic)) { return Result.error("请确认topic是否正确"); } List resultList = new ArrayList<>(); - Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro()); + Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(false)); consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords records = consumer.poll(5000); JSONArray jsonArray=new JSONArray(); @@ -69,18 +71,110 @@ public class KafkaController { } } consumer.close(); - return Result.OK(jsonArray); + long newCreateTime =0L; + switch (topic) { + case "guangzhaochuanganqi": { + //光照传感器 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList; + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + case "kongqizhiliangchuanganqi": { + //空气质量传感器 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList; + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + case "huanjingjiance": { + //环境监测 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList = new ArrayList<>(); + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + default: + return Result.error("请确认topic是否正确"); + } } - @GetMapping("/consumer/{topic}") - @ApiOperation(value = "consumer") - public Result getConsumerData(@PathVariable String topic) { + /* + * 鉴权 + **/ + @GetMapping("/consumer2/{topic}") + public Result getConsumerData2(@PathVariable String topic) { if (StringUtils.isEmpty(topic)) { return Result.error("请确认topic是否正确"); } - List resultList = new ArrayList<>(); - Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro()); + Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(true)); consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords records = consumer.poll(5000); JSONArray jsonArray=new JSONArray(); @@ -186,6 +280,8 @@ public class KafkaController { } } + + @PostMapping("/producer") @ApiOperation(value = "producer") public Result postProducerData(@RequestBody JSONObject requestBody){ diff --git a/src/main/java/com/example/zxweb/utils/KafkaUtil.java b/src/main/java/com/example/zxweb/utils/KafkaUtil.java index 8ae7700..ac81a30 100644 --- a/src/main/java/com/example/zxweb/utils/KafkaUtil.java +++ b/src/main/java/com/example/zxweb/utils/KafkaUtil.java @@ -49,12 +49,12 @@ public class KafkaUtil { //keytool -keystore D:/keystore/client.trustestore.p12 -storepass fEVcb^QFB;IN$K5 -noprompt -alias client.trustestore -import -file D:/keystore/ca.crt -storetype PKCS12 //keytool -keystore D:/keystore/client.trustestore.p12 -storepass pwd123 -noprompt -alias client.trustestore -import -file D:/keystore/ca.crt -storetype PKCS12 - public Properties getSafeConsumerPro() { + public Properties getSafeConsumerPro(boolean sslFlag) { Properties properties = new Properties(); properties.setProperty("group.id", "isv-kafka"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); - if (false) { + if (sslFlag) { System.out.println("使用鉴权"); properties.setProperty("bootstrap.servers", "10.0.10.153:29553,10.0.10.153:29554"); properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";");