package com.example.zxweb.controller; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.example.zxweb.common.api.vo.Result; import com.example.zxweb.dto.GuangZhaoChuanGanQiDTO; import com.example.zxweb.dto.HuanJingJianCeDTO; import com.example.zxweb.dto.KafkaConsumerDTO; import com.example.zxweb.dto.KongQiZhiLiangChuanGanQiDTO; import com.example.zxweb.utils.CacheManager; import com.example.zxweb.utils.KafkaUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.RetriableException; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.*; /** * @Description * @Author ZhouWenTao * @Date 2023/9/5 9:57 */ @RestController @RequestMapping("/kafka") @Slf4j @Api(tags = "Kafka接口") public class KafkaController { @Resource private KafkaUtil kafkaUtil; @GetMapping("/test") public Result test(){ String test = (String) CacheManager.get("test"); if (StringUtils.isEmpty(test)) { CacheManager.put("test","张三"+new Date().getTime()); } return Result.OK(test); } /* * 不鉴权 **/ @GetMapping("/consumer/{topic}") @ApiOperation(value = "consumer") public Result getConsumerData(@PathVariable String topic) { if (StringUtils.isEmpty(topic)) { return Result.error("请确认topic是否正确"); } List resultList = new ArrayList<>(); Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(false)); consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords records = consumer.poll(5000); JSONArray jsonArray=new JSONArray(); for (ConsumerRecord consumerRecord : records) { String kfkContent = consumerRecord.value(); resultList.add(kfkContent); if (!StringUtils.isEmpty(kfkContent)) { jsonArray.add(JSONObject.parseObject(kfkContent)); } } consumer.close(); long newCreateTime =0L; switch (topic) { case "guangzhaochuanganqi": { //光照传感器 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList; KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } case "kongqizhiliangchuanganqi": { //空气质量传感器 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList; KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } case "huanjingjiance": { //环境监测 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList = new ArrayList<>(); KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } default: return Result.error("请确认topic是否正确"); } } /* * 鉴权 **/ @GetMapping("/consumer2/{topic}") public Result getConsumerData2(@PathVariable String topic) { if (StringUtils.isEmpty(topic)) { return Result.error("请确认topic是否正确"); } List resultList = new ArrayList<>(); Consumer consumer = new KafkaConsumer<>(kafkaUtil.getSafeConsumerPro(true)); consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords records = consumer.poll(5000); JSONArray jsonArray=new JSONArray(); for (ConsumerRecord consumerRecord : records) { String kfkContent = consumerRecord.value(); resultList.add(kfkContent); if (!StringUtils.isEmpty(kfkContent)) { jsonArray.add(JSONObject.parseObject(kfkContent)); } } consumer.close(); long newCreateTime =0L; switch (topic) { case "guangzhaochuanganqi": { //光照传感器 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList; KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } case "kongqizhiliangchuanganqi": { //空气质量传感器 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList; KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } case "huanjingjiance": { //环境监测 List jsonObjects = jsonArray.toJavaList(JSONObject.class); List> kafkaConsumerDTOList = new ArrayList<>(); KafkaConsumerDTO kafkaConsumerDTO; //排除掉旧的重复aeid Map> aeidMap = new HashMap<>(); for (JSONObject jsonObject : jsonObjects) { kafkaConsumerDTO = jsonObject.toJavaObject(KafkaConsumerDTO.class); String aeid = kafkaConsumerDTO.getAeid(); String createtimeStr = kafkaConsumerDTO.getCreatetime(); newCreateTime = Long.parseLong(createtimeStr); KafkaConsumerDTO kafkaConsumerDTO1 = aeidMap.get(kafkaConsumerDTO.getAeid()); if (kafkaConsumerDTO1 == null) { aeidMap.put(aeid, kafkaConsumerDTO); } else { long createtime = Long.parseLong(kafkaConsumerDTO1.getCreatetime() == null ? "0" : kafkaConsumerDTO1.getCreatetime()); if (StringUtils.isEmpty(kafkaConsumerDTO.getCreatetime())) { continue; } if (createtime >= newCreateTime) { continue; } } aeidMap.put(aeid, kafkaConsumerDTO); } kafkaConsumerDTOList = new ArrayList<>(aeidMap.values()); return Result.OK(kafkaConsumerDTOList); } default: return Result.error("请确认topic是否正确"); } } @PostMapping("/producer") @ApiOperation(value = "producer") public Result postProducerData(@RequestBody JSONObject requestBody){ String topic = requestBody.getString("topic"); // 创建 KafkaProducer KafkaProducer producer = new KafkaProducer<>(kafkaUtil.getSafeProducerPro()); // 发送消息 ProducerRecord record = new ProducerRecord<>(topic,requestBody.toJSONString()); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { log.info("kafka send successful"); } else { if (e instanceof RetriableException) { //处理可重试异常 try { log.error("kafka send fail Retry sending."); Thread.sleep(3000); //MyProducer.getInstance().send(); } catch (InterruptedException e1) { log.error("kafka error :", e1); } } else { throw new KafkaException("kafka server message error."); } } } }); log.info("-------------------"); log.info(record.value()); // 关闭 KafkaProducer producer.close(); return Result.OK(); } }