parent
c0b1d78e23
commit
7eff106d0b
@ -0,0 +1,157 @@
|
|||||||
|
package com.example.demokafka.controller;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONArray;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.example.demokafka.common.api.vo.Result;
|
||||||
|
import com.example.demokafka.dto.GuangZhaoChuanGanQiDTO;
|
||||||
|
import com.example.demokafka.dto.HuanJingJianCeDTO;
|
||||||
|
import com.example.demokafka.dto.KafkaConsumerDTO;
|
||||||
|
import com.example.demokafka.dto.KongQiZhiLiangChuanGanQiDTO;
|
||||||
|
import com.example.demokafka.util.KafkaUtil;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.springframework.util.StringUtils;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Description
|
||||||
|
* @Author ZhouWenTao
|
||||||
|
* @Date 2023/9/5 9:57
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/kafka")
|
||||||
|
@Slf4j
|
||||||
|
@Api(tags = "Kafka接口")
|
||||||
|
public class KafkaController {
|
||||||
|
@Resource
|
||||||
|
private KafkaUtil kafkaUtil;
|
||||||
|
@GetMapping("/consumer/{topic}")
|
||||||
|
@ApiOperation(value = "consumer")
|
||||||
|
public Result<?> getConsumerData(@PathVariable String topic) {
|
||||||
|
if (StringUtils.isEmpty(topic)) {
|
||||||
|
return Result.error("请确认topic是否正确");
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> resultList = new ArrayList<>();
|
||||||
|
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro());
|
||||||
|
consumer.subscribe(Collections.singletonList(topic));
|
||||||
|
ConsumerRecords<String, String> records = consumer.poll(5000);
|
||||||
|
JSONArray jsonArray=new JSONArray();
|
||||||
|
for (ConsumerRecord<String, String> consumerRecord : records) {
|
||||||
|
String kfkContent = consumerRecord.value();
|
||||||
|
resultList.add(kfkContent);
|
||||||
|
if (!StringUtils.isEmpty(kfkContent)) {
|
||||||
|
jsonArray.add(JSONObject.parseObject(kfkContent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
long newCreateTime =0L;
|
||||||
|
|
||||||
|
switch (topic) {
|
||||||
|
case "guangzhaochuanganqi": {
|
||||||
|
//光照传感器
|
||||||
|
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
|
||||||
|
List<KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> kafkaConsumerDTOList;
|
||||||
|
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO;
|
||||||
|
//排除掉旧的重复aeid
|
||||||
|
Map<String, KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> aeidMap = new HashMap<>();
|
||||||
|
for (JSONObject jsonObject : jsonObjects) {
|
||||||
|
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
|
||||||
|
String aeid = kafkaConsumerDTO.getAeid();
|
||||||
|
String createtimeStr = kafkaConsumerDTO.getCreatetime();
|
||||||
|
newCreateTime = Long.parseLong(createtimeStr);
|
||||||
|
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
|
||||||
|
if (kafkaConsumerDTO1 == null) {
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
} else {
|
||||||
|
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
|
||||||
|
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (createtime >= newCreateTime) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
}
|
||||||
|
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
|
||||||
|
return Result.OK(kafkaConsumerDTOList);
|
||||||
|
}
|
||||||
|
case "kongqizhiliangchuanganqi": {
|
||||||
|
//空气质量传感器
|
||||||
|
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
|
||||||
|
List<KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> kafkaConsumerDTOList;
|
||||||
|
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO;
|
||||||
|
//排除掉旧的重复aeid
|
||||||
|
Map<String, KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> aeidMap = new HashMap<>();
|
||||||
|
for (JSONObject jsonObject : jsonObjects) {
|
||||||
|
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
|
||||||
|
String aeid = kafkaConsumerDTO.getAeid();
|
||||||
|
String createtimeStr = kafkaConsumerDTO.getCreatetime();
|
||||||
|
newCreateTime = Long.parseLong(createtimeStr);
|
||||||
|
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
|
||||||
|
if (kafkaConsumerDTO1 == null) {
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
} else {
|
||||||
|
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
|
||||||
|
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (createtime >= newCreateTime) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
}
|
||||||
|
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
|
||||||
|
return Result.OK(kafkaConsumerDTOList);
|
||||||
|
}
|
||||||
|
case "huanjingjiance": {
|
||||||
|
//环境监测
|
||||||
|
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
|
||||||
|
List<KafkaConsumerDTO<HuanJingJianCeDTO>> kafkaConsumerDTOList = new ArrayList<>();
|
||||||
|
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO;
|
||||||
|
//排除掉旧的重复aeid
|
||||||
|
Map<String, KafkaConsumerDTO<HuanJingJianCeDTO>> aeidMap = new HashMap<>();
|
||||||
|
for (JSONObject jsonObject : jsonObjects) {
|
||||||
|
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
|
||||||
|
String aeid = kafkaConsumerDTO.getAeid();
|
||||||
|
String createtimeStr = kafkaConsumerDTO.getCreatetime();
|
||||||
|
newCreateTime = Long.parseLong(createtimeStr);
|
||||||
|
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
|
||||||
|
if (kafkaConsumerDTO1 == null) {
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
} else {
|
||||||
|
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
|
||||||
|
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (createtime >= newCreateTime) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aeidMap.put(aeid, kafkaConsumerDTO);
|
||||||
|
}
|
||||||
|
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
|
||||||
|
return Result.OK(kafkaConsumerDTOList);
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return Result.error("请确认topic是否正确");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,62 +0,0 @@
|
|||||||
package com.example.demokafka.controller;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import com.example.demokafka.common.api.vo.Result;
|
|
||||||
import com.example.demokafka.model.Person;
|
|
||||||
import com.example.demokafka.service.ProducerService;
|
|
||||||
import com.example.demokafka.util.Kafka;
|
|
||||||
import com.example.demokafka.util.KafkaUtil;
|
|
||||||
import io.swagger.annotations.*;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.kafka.clients.consumer.Consumer;
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.format.annotation.DateTimeFormat;
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
|
||||||
import org.springframework.web.bind.annotation.*;
|
|
||||||
import springfox.documentation.annotations.ApiIgnore;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author 赵恒
|
|
||||||
* @date 2023/5/11
|
|
||||||
*/
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/test")
|
|
||||||
@Slf4j
|
|
||||||
@Api(tags = "测试")
|
|
||||||
public class TestController {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ProducerService producerService;
|
|
||||||
@Autowired
|
|
||||||
private KafkaTemplate<String, String> kafkaTemplate;
|
|
||||||
@Value("${spring.kafka.topic}")
|
|
||||||
private String topicName;
|
|
||||||
|
|
||||||
@GetMapping("/kafka01")
|
|
||||||
@ApiOperation(value = "kafka01")
|
|
||||||
public List<Person> kafka01() {
|
|
||||||
producerService.sendMessage(topicName, "测试");
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
@GetMapping("/consumer/{topic}")
|
|
||||||
@ApiOperation(value = "consumer")
|
|
||||||
public Result getConsumerData(@PathVariable String topic) {
|
|
||||||
List<String> result = new ArrayList<>();
|
|
||||||
Consumer<String, String> consumer = new KafkaConsumer<String, String>(KafkaUtil.getSafeConsumerPro());
|
|
||||||
consumer.subscribe(Arrays.asList(topic));
|
|
||||||
ConsumerRecords<String, String> records = consumer.poll(5000);
|
|
||||||
for (ConsumerRecord<String, String> consumerRecord : records) {
|
|
||||||
String kfkContent = consumerRecord.value();
|
|
||||||
result.add(kfkContent);
|
|
||||||
}
|
|
||||||
consumer.close();
|
|
||||||
return Result.ok(result);
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,23 @@
|
|||||||
|
package com.example.demokafka.dto;
|
||||||
|
|
||||||
|
import io.swagger.annotations.ApiModel;
|
||||||
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Description 光照传感器返回对象
|
||||||
|
* @Author ZhouWenTao
|
||||||
|
* @Date 2023/9/5 16:18
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@ApiModel(value="光照传感器返回对象", description="光照传感器返回对象")
|
||||||
|
public class GuangZhaoChuanGanQiDTO {
|
||||||
|
@ApiModelProperty(value = "灯控设备id")
|
||||||
|
private String devid;
|
||||||
|
@ApiModelProperty(value = "Zigbee信号强度")
|
||||||
|
private String device_lqi;
|
||||||
|
@ApiModelProperty(value = "光照度")
|
||||||
|
private String lux;
|
||||||
|
@ApiModelProperty(value = "电池电量情况")
|
||||||
|
private String battery_percent;
|
||||||
|
}
|
@ -0,0 +1,64 @@
|
|||||||
|
package com.example.demokafka.dto;
|
||||||
|
|
||||||
|
import io.swagger.annotations.ApiModel;
|
||||||
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Description kafka返回数据
|
||||||
|
* @Author ZhouWenTao
|
||||||
|
* @Date 2023/9/5 14:19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@ApiModel(value="Kafka返回对象", description="Kafka返回对象")
|
||||||
|
public class KafkaConsumerDTO<T> {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设备序列号
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "设备序列号")
|
||||||
|
private String aeid = "";
|
||||||
|
/**
|
||||||
|
* 设备创建者
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "设备创建者")
|
||||||
|
private String creator;
|
||||||
|
/**
|
||||||
|
* 创建时间
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "创建时间")
|
||||||
|
private String createtime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件类型
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "事件类型")
|
||||||
|
private String eventtype;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设备类型编号
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "设备类型编号")
|
||||||
|
private String modelno;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件名
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "事件名")
|
||||||
|
private String eventname;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否是重要的设备
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "是否是重要的设备")
|
||||||
|
private String isvip;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回数据对象 data
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "返回数据对象")
|
||||||
|
private T content;
|
||||||
|
}
|
@ -1,22 +0,0 @@
|
|||||||
package com.example.demokafka.model;
|
|
||||||
|
|
||||||
import io.swagger.annotations.ApiModel;
|
|
||||||
import io.swagger.annotations.ApiModelProperty;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author 赵恒
|
|
||||||
* @date 2023/5/11
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@ApiModel
|
|
||||||
public class Person {
|
|
||||||
@ApiModelProperty(value = "人物ID", example = "1")
|
|
||||||
private Integer id;
|
|
||||||
@ApiModelProperty(value = "人物姓名")
|
|
||||||
private String name;
|
|
||||||
private LocalDateTime birthDay;
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in new issue