diff --git a/pom.xml b/pom.xml index a010ba0..2412329 100644 --- a/pom.xml +++ b/pom.xml @@ -15,62 +15,62 @@ Demo project for Spring Boot 1.8 + 2.6 + + org.springframework.boot spring-boot-starter-web - - - org.projectlombok - lombok - true - + + + org.projectlombok + lombok + true + + com.alibaba fastjson 1.2.83 - - - - - - - + com.spring4all swagger-spring-boot-starter 1.9.1.RELEASE - com.github.xiaoymin knife4j-spring-ui 2.0.4 + + + org.springframework.kafka + spring-kafka + + - org.springframework.boot - spring-boot-starter-web + commons-io + commons-io + ${commons.version} - org.springframework.kafka - spring-kafka + commons-lang + commons-lang + ${commons.version} - diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..464a85a --- /dev/null +++ b/readme.md @@ -0,0 +1,12 @@ +###配置 +#### AppCode: 42142fd0jkbf4515853b7fcec64748f6 +#### X-Consumer-Username: dwVendor +#### key: dwVendor +#### Secret: fEVcb^QFB;IN$K5 +#### URL:https://10.0.10.153:2443/api/iot/v1/devices/methods + +###服务器连接 +#### IP: 10.0.10.153 +#### user: root +#### pwd: qwer1234 +# \ No newline at end of file diff --git a/src/main/java/com/example/demokafka/controller/KafkaController.java b/src/main/java/com/example/demokafka/controller/KafkaController.java new file mode 100644 index 0000000..64557ad --- /dev/null +++ b/src/main/java/com/example/demokafka/controller/KafkaController.java @@ -0,0 +1,157 @@ +package com.example.demokafka.controller; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.example.demokafka.common.api.vo.Result; +import com.example.demokafka.dto.GuangZhaoChuanGanQiDTO; +import com.example.demokafka.dto.HuanJingJianCeDTO; +import com.example.demokafka.dto.KafkaConsumerDTO; +import com.example.demokafka.dto.KongQiZhiLiangChuanGanQiDTO; +import com.example.demokafka.util.KafkaUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.*; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/5 9:57 + */ +@RestController +@RequestMapping("/kafka") +@Slf4j +@Api(tags = "Kafka接口") +public class KafkaController { + @Resource + private KafkaUtil kafkaUtil; + @GetMapping("/consumer/{topic}") + @ApiOperation(value = "consumer") + public Result getConsumerData(@PathVariable String topic) { + if (StringUtils.isEmpty(topic)) { + return Result.error("请确认topic是否正确"); + } + + List resultList = new ArrayList<>(); + Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro()); + consumer.subscribe(Collections.singletonList(topic)); + ConsumerRecords records = consumer.poll(5000); + JSONArray jsonArray=new JSONArray(); + for (ConsumerRecord consumerRecord : records) { + String kfkContent = consumerRecord.value(); + resultList.add(kfkContent); + if (!StringUtils.isEmpty(kfkContent)) { + jsonArray.add(JSONObject.parseObject(kfkContent)); + } + } + consumer.close(); + + + + long newCreateTime =0L; + + switch (topic) { + case "guangzhaochuanganqi": { + //光照传感器 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList; + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + case "kongqizhiliangchuanganqi": { + //空气质量传感器 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList; + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + case "huanjingjiance": { + //环境监测 + List jsonObjects = jsonArray.toJavaList(JSONObject.class); + List> kafkaConsumerDTOList = new ArrayList<>(); + KafkaConsumerDTO kafkaConsumerDTO; + //排除掉旧的重复aeid + Map> aeidMap = new HashMap<>(); + for (JSONObject jsonObject : jsonObjects) { + kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); + String aeid = kafkaConsumerDTO.getAeid(); + String createtimeStr = kafkaConsumerDTO.getCreatetime(); + newCreateTime = Long.parseLong(createtimeStr); + KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); + if (kafkaConsumerDTO1 == null) { + aeidMap.put(aeid, kafkaConsumerDTO); + } else { + long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); + if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { + continue; + } + if (createtime >= newCreateTime) { + continue; + } + } + aeidMap.put(aeid, kafkaConsumerDTO); + } + kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); + return Result.OK(kafkaConsumerDTOList); + } + default: + return Result.error("请确认topic是否正确"); + } + } + +} diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java deleted file mode 100644 index 8267622..0000000 --- a/src/main/java/com/example/demokafka/controller/TestController.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.example.demokafka.controller; - -import com.alibaba.fastjson.JSONObject; -import com.example.demokafka.common.api.vo.Result; -import com.example.demokafka.model.Person; -import com.example.demokafka.service.ProducerService; -import com.example.demokafka.util.Kafka; -import com.example.demokafka.util.KafkaUtil; -import io.swagger.annotations.*; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.format.annotation.DateTimeFormat; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.web.bind.annotation.*; -import springfox.documentation.annotations.ApiIgnore; - -import java.util.*; - -/** - * @author 赵恒 - * @date 2023/5/11 - */ -@RestController -@RequestMapping("/test") -@Slf4j -@Api(tags = "测试") -public class TestController { - - @Autowired - private ProducerService producerService; - @Autowired - private KafkaTemplate kafkaTemplate; - @Value("${spring.kafka.topic}") - private String topicName; - - @GetMapping("/kafka01") - @ApiOperation(value = "kafka01") - public List kafka01() { - producerService.sendMessage(topicName, "测试"); - return new ArrayList<>(); - } - - @GetMapping("/consumer/{topic}") - @ApiOperation(value = "consumer") - public Result getConsumerData(@PathVariable String topic) { - List result = new ArrayList<>(); - Consumer consumer = new KafkaConsumer(KafkaUtil.getSafeConsumerPro()); - consumer.subscribe(Arrays.asList(topic)); - ConsumerRecords records = consumer.poll(5000); - for (ConsumerRecord consumerRecord : records) { - String kfkContent = consumerRecord.value(); - result.add(kfkContent); - } - consumer.close(); - return Result.ok(result); - } -} diff --git a/src/main/java/com/example/demokafka/dto/GuangZhaoChuanGanQiDTO.java b/src/main/java/com/example/demokafka/dto/GuangZhaoChuanGanQiDTO.java new file mode 100644 index 0000000..8777aa1 --- /dev/null +++ b/src/main/java/com/example/demokafka/dto/GuangZhaoChuanGanQiDTO.java @@ -0,0 +1,23 @@ +package com.example.demokafka.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @Description 光照传感器返回对象 + * @Author ZhouWenTao + * @Date 2023/9/5 16:18 + */ +@Data +@ApiModel(value="光照传感器返回对象", description="光照传感器返回对象") +public class GuangZhaoChuanGanQiDTO { + @ApiModelProperty(value = "灯控设备id") + private String devid; + @ApiModelProperty(value = "Zigbee信号强度") + private String device_lqi; + @ApiModelProperty(value = "光照度") + private String lux; + @ApiModelProperty(value = "电池电量情况") + private String battery_percent; +} diff --git a/src/main/java/com/example/demokafka/dto/HuanJingJianCeDTO.java b/src/main/java/com/example/demokafka/dto/HuanJingJianCeDTO.java new file mode 100644 index 0000000..6337d1e --- /dev/null +++ b/src/main/java/com/example/demokafka/dto/HuanJingJianCeDTO.java @@ -0,0 +1,27 @@ +package com.example.demokafka.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description 环境监测返回对象 + * @Author ZhouWenTao + * @Date 2023/9/5 14:34 + */ +@Data +@ApiModel(value="环境监测返回对象", description="环境监测返回对象") +public class HuanJingJianCeDTO implements Serializable { + @ApiModelProperty(value = "环境监测设备id") + private String devid; + @ApiModelProperty(value = "电池电量情况") + private String battery_percent; + @ApiModelProperty(value = "温度(除以100为 ℃,如3350除以100为33.5℃。)") + private String temperature; + @ApiModelProperty(value = "湿度(除以100为 %rh)") + private String humidity; + @ApiModelProperty(value = "Zigbee信号强度") + private String device_lqi; +} diff --git a/src/main/java/com/example/demokafka/dto/KafkaConsumerDTO.java b/src/main/java/com/example/demokafka/dto/KafkaConsumerDTO.java new file mode 100644 index 0000000..2209422 --- /dev/null +++ b/src/main/java/com/example/demokafka/dto/KafkaConsumerDTO.java @@ -0,0 +1,64 @@ +package com.example.demokafka.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description kafka返回数据 + * @Author ZhouWenTao + * @Date 2023/9/5 14:19 + */ +@Data +@ApiModel(value="Kafka返回对象", description="Kafka返回对象") +public class KafkaConsumerDTO { + private static final long serialVersionUID = 1L; + + /** + * 设备序列号 + */ + @ApiModelProperty(value = "设备序列号") + private String aeid = ""; + /** + * 设备创建者 + */ + @ApiModelProperty(value = "设备创建者") + private String creator; + /** + * 创建时间 + */ + @ApiModelProperty(value = "创建时间") + private String createtime; + + /** + * 事件类型 + */ + @ApiModelProperty(value = "事件类型") + private String eventtype; + + /** + * 设备类型编号 + */ + @ApiModelProperty(value = "设备类型编号") + private String modelno; + + /** + * 事件名 + */ + @ApiModelProperty(value = "事件名") + private String eventname; + + /** + * 是否是重要的设备 + */ + @ApiModelProperty(value = "是否是重要的设备") + private String isvip; + + /** + * 返回数据对象 data + */ + @ApiModelProperty(value = "返回数据对象") + private T content; +} diff --git a/src/main/java/com/example/demokafka/dto/KongQiZhiLiangChuanGanQiDTO.java b/src/main/java/com/example/demokafka/dto/KongQiZhiLiangChuanGanQiDTO.java new file mode 100644 index 0000000..c125631 --- /dev/null +++ b/src/main/java/com/example/demokafka/dto/KongQiZhiLiangChuanGanQiDTO.java @@ -0,0 +1,37 @@ +package com.example.demokafka.dto; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description 空气质量传感器 + * @Author ZhouWenTao + * @Date 2023/9/5 17:01 + */ +@Data +@ApiModel(value="空气质量传感器返回对象", description="空气质量传感器返回对象") +public class KongQiZhiLiangChuanGanQiDTO implements Serializable { + @ApiModelProperty(value = "灯控设备id") + private String devid; + @ApiModelProperty(value = "Zigbee信号强度") + private String device_lqi; + @ApiModelProperty(value = "光照度") + private String lux; + @ApiModelProperty(value = "湿度(除以100为 %rh)") + private String humidity; + @ApiModelProperty(value = "温度(除以100为 ℃,如3350除以100为33.5℃。)") + private String temperature; + @ApiModelProperty(value = "PM2.5浓度(1ug/m³)") + private String pm25; + @ApiModelProperty(value = "二氧化碳浓度(ppm)") + private String co2; + @ApiModelProperty(value = "甲醛浓度(10ug/m³)") + private String ch2o; + @ApiModelProperty(value = "空气中挥发的有机化合物含量(10ug/m³)") + private String voc; + @ApiModelProperty(value = "电压(电池电压值, 100mV为单位。 上报30的话就是 30*100mV=3V。基于此判断低电量。3.3V正常,2.7以下为异常。)") + private String volt; +} diff --git a/src/main/java/com/example/demokafka/model/Person.java b/src/main/java/com/example/demokafka/model/Person.java deleted file mode 100644 index 59bf193..0000000 --- a/src/main/java/com/example/demokafka/model/Person.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.example.demokafka.model; - -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; -import lombok.Data; - -import java.time.LocalDateTime; - -/** - * @author 赵恒 - * @date 2023/5/11 - */ -@Data -@ApiModel -public class Person { - @ApiModelProperty(value = "人物ID", example = "1") - private Integer id; - @ApiModelProperty(value = "人物姓名") - private String name; - private LocalDateTime birthDay; - -} diff --git a/src/main/java/com/example/demokafka/util/FastJsonUtils.java b/src/main/java/com/example/demokafka/util/FastJsonUtils.java new file mode 100644 index 0000000..00f08af --- /dev/null +++ b/src/main/java/com/example/demokafka/util/FastJsonUtils.java @@ -0,0 +1,42 @@ +package com.example.demokafka.util; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/5 15:34 + */ +public class FastJsonUtils { + /** + * 将 fastjson的JSONArray转化为泛型列表 + * @param jsonArray 源数据 + * @param clz 泛型类 + * @param 泛型 + * @return list + */ + public static List convertJSONArrayToTypeList(JSONArray jsonArray, Class clz){ + if (CollectionUtils.isEmpty(jsonArray)){ + return new ArrayList(); + } + List result = new ArrayList(jsonArray.size()); + jsonArray.forEach(element->{ + // 基础类型不可以转化为JSONObject,需要特殊处理 + if (element instanceof String + || element instanceof Number + || element instanceof Boolean + ){ + result.add((T)element); + }else { + T t = JSONObject.toJavaObject((JSONObject)element, clz); + result.add(t); + } + }); + return result; + } +} diff --git a/src/main/java/com/example/demokafka/util/Kafka.java b/src/main/java/com/example/demokafka/util/Kafka.java index b5f9c24..4455d9e 100644 --- a/src/main/java/com/example/demokafka/util/Kafka.java +++ b/src/main/java/com/example/demokafka/util/Kafka.java @@ -1,3 +1,4 @@ +/* package com.example.demokafka.util; import org.apache.kafka.clients.consumer.Consumer; @@ -8,11 +9,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; +*/ /** * @Description * @Author ZhouWenTao * @Date 2023/9/1 14:05 - */ + *//* + public class Kafka extends Thread { Consumer consumer116 = new KafkaConsumer(KafkaUtil.getSafeConsumerPro()); @@ -26,10 +29,10 @@ public class Kafka extends Thread { } private void poll(String sourceTopicName) { - /*try { + try { consumer116.subscribe(Arrays.asList(sourceTopicName)); while (true) { - ConsumerRecords records = consumer116.poll(Duration.ofMillis(5000)); + ConsumerRecords records = consumer116.poll(5000); for (ConsumerRecord consumerRecord : records) { String kfkContent = consumerRecord.value(); System.out.println("Success to receive kafka :" + kfkContent); @@ -41,7 +44,7 @@ public class Kafka extends Thread { if (consumer116 != null) { consumer116.close(); } - }*/ + } } -} \ No newline at end of file +}*/ diff --git a/src/main/java/com/example/demokafka/util/KafkaUtil.java b/src/main/java/com/example/demokafka/util/KafkaUtil.java index fa5d44c..88e4445 100644 --- a/src/main/java/com/example/demokafka/util/KafkaUtil.java +++ b/src/main/java/com/example/demokafka/util/KafkaUtil.java @@ -1,6 +1,9 @@ package com.example.demokafka.util; +import org.apache.commons.lang.StringUtils; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; import java.util.Properties; @@ -9,11 +12,14 @@ import java.util.Properties; * @Author ZhouWenTao * @Date 2023/9/1 13:40 */ +@Component public class KafkaUtil { public static String trustestore="/opt/kafka/client.trustestore.p12"; - public static String appCode = "42142fd0jkbf4515853b7fcec64748f6"; - + @Value("${appCode}") + public String appCode; + @Value("${secret}") + public String secret; public static String X_Consumer_Username = "dwVendor"; public static Properties getSafeProducerPro() { @@ -33,17 +39,23 @@ public class KafkaUtil { return properties; } //keytool -keystore D:/keystore/client.trustestore.p12 -storepass pwd123 -noprompt -alias client.trustestore -import -file D:/keystore/ca.crt -storetype PKCS12 - public static Properties getSafeConsumerPro() { + public Properties getSafeConsumerPro() { Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552"); + properties.setProperty("group.id", "isv-kafka"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); -// properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";"); -// properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); -// properties.setProperty("security.protocol", "SASL_SSL"); -// properties.setProperty("ssl.truststore.location", trustestore); -// properties.setProperty("ssl.truststore.password", "pwd123"); + if (StringUtils.isNotBlank(appCode)) { + properties.setProperty("bootstrap.servers", "10.0.10.153:29553,10.0.10.153:29554"); + properties.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",appCode,secret)); + properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); + properties.setProperty("security.protocol", "SASL_SSL"); + properties.setProperty("ssl.truststore.location", trustestore); + properties.setProperty("ssl.truststore.password", secret); + }else{ + properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552"); + } + properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("session.timeout.ms", "30000"); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ac74d59..ee816be 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,7 +8,7 @@ spring: #??????? topic: sourcetopic producer: - bootstrap-servers: 10.0.10.153:29551 + bootstrap-servers: 10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553,10.0.10.153:29554 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 @@ -20,4 +20,8 @@ spring: receive.buffer.bytes: 32768 #??32KB???socket????????????-1?????????? send.buffer.bytes: 131072 #??128KB???socket???????????-1?????????? request.timeout.ms: 10000 #??30000ms?????????????? - transaction.timeout.ms: 5000 \ No newline at end of file + transaction.timeout.ms: 5000 + +key: dwVendor +secret: fEVcb^QFB;IN$K5 +appCode: 42142fd0jkbf4515853b7fcec64748f6