You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

322 lines
16 KiB

package com.example.zxweb.controller;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.example.zxweb.common.api.vo.Result;
import com.example.zxweb.dto.GuangZhaoChuanGanQiDTO;
import com.example.zxweb.dto.HuanJingJianCeDTO;
import com.example.zxweb.dto.KafkaConsumerDTO;
import com.example.zxweb.dto.KongQiZhiLiangChuanGanQiDTO;
import com.example.zxweb.utils.CacheManager;
import com.example.zxweb.utils.KafkaUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.*;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/5 9:57
*/
@RestController
@RequestMapping("/kafka")
@Slf4j
@Api(tags = "Kafka接口")
public class KafkaController {
@Resource
private KafkaUtil kafkaUtil;
@GetMapping("/test")
public Result<?> test(){
String test = (String) CacheManager.get("test");
if (StringUtils.isEmpty(test)) {
CacheManager.put("test","张三"+new Date().getTime());
}
return Result.OK(test);
}
/*
* 不鉴权
**/
@GetMapping("/consumer/{topic}")
@ApiOperation(value = "consumer")
public Result<?> getConsumerData(@PathVariable String topic) {
if (StringUtils.isEmpty(topic)) {
return Result.error("请确认topic是否正确");
}
List<String> resultList = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(false));
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(100);
JSONArray jsonArray=new JSONArray();
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
System.out.println(kfkContent);
resultList.add(kfkContent);
if (!StringUtils.isEmpty(kfkContent)) {
jsonArray.add(JSONObject.parseObject(kfkContent));
}
}
consumer.close();
long newCreateTime =0L;
switch (topic) {
case "guangzhaochuanganqi": {
//光照传感器
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> kafkaConsumerDTOList;
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
case "kongqizhiliangchuanganqi": {
//空气质量传感器
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> kafkaConsumerDTOList;
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
case "huanjingjiance": {
//环境监测
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<HuanJingJianCeDTO>> kafkaConsumerDTOList = new ArrayList<>();
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<HuanJingJianCeDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
default:
return Result.error("请确认topic是否正确");
}
}
/*
* 鉴权
**/
@GetMapping("/consumer2/{topic}")
public Result<?> getConsumerData2(@PathVariable String topic) {
if (StringUtils.isEmpty(topic)) {
return Result.error("请确认topic是否正确");
}
List<String> resultList = new ArrayList<>();
Consumer<String, String> consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(true));
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(100);
JSONArray jsonArray=new JSONArray();
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
resultList.add(kfkContent);
if (!StringUtils.isEmpty(kfkContent)) {
jsonArray.add(JSONObject.parseObject(kfkContent));
}
}
consumer.close();
long newCreateTime =0L;
switch (topic) {
case "guangzhaochuanganqi": {
//光照传感器
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> kafkaConsumerDTOList;
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<GuangZhaoChuanGanQiDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<GuangZhaoChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
case "kongqizhiliangchuanganqi": {
//空气质量传感器
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> kafkaConsumerDTOList;
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<KongQiZhiLiangChuanGanQiDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
case "huanjingjiance": {
//环境监测
List<JSONObject> jsonObjects = jsonArray.toJavaList(JSONObject.class);
List<KafkaConsumerDTO<HuanJingJianCeDTO>> kafkaConsumerDTOList = new ArrayList<>();
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO;
//排除掉旧的重复aeid
Map<String, KafkaConsumerDTO<HuanJingJianCeDTO>> aeidMap = new HashMap<>();
for (JSONObject jsonObject : jsonObjects) {
kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class);
String aeid = kafkaConsumerDTO.getAeid();
String createtimeStr = kafkaConsumerDTO.getCreatetime();
newCreateTime = Long.parseLong(createtimeStr);
KafkaConsumerDTO<HuanJingJianCeDTO> kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid());
if (kafkaConsumerDTO1 == null) {
aeidMap.put(aeid, kafkaConsumerDTO);
} else {
long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime());
if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) {
continue;
}
if (createtime >= newCreateTime) {
continue;
}
}
aeidMap.put(aeid, kafkaConsumerDTO);
}
kafkaConsumerDTOList = new ArrayList<>(aeidMap.values());
return Result.OK(kafkaConsumerDTOList);
}
default:
return Result.error("请确认topic是否正确");
}
}
@PostMapping("/producer")
@ApiOperation(value = "producer")
public Result<?> postProducerData(@RequestBody JSONObject requestBody){
String topic = requestBody.getString("topic");
// 创建 KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaUtil.getSafeProducerPro());
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic,requestBody.toJSONString());
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("kafka send successful");
} else {
if (e instanceof RetriableException) {
//处理可重试异常
try {
log.error("kafka send fail Retry sending.");
Thread.sleep(3000);
//MyProducer.getInstance().send();
} catch (InterruptedException e1) {
log.error("kafka error :", e1);
}
} else {
throw new KafkaException("kafka server message error.");
}
}
}
});
log.info("-------------------");
log.info(record.value());
// 关闭 KafkaProducer
producer.close();
return Result.OK();
}
}