main
周文涛 2 years ago
parent 1138df89da
commit 9127574316

27
.gitignore vendored

@ -0,0 +1,27 @@
# Maven #
target/
#log#
logs/
# IDEA #
.idea/
*.iml
# Eclipse #
.settings/
.metadata/
.classpath
.project
Servers/
*.class
# Mac #
.DS_Store
/.factorypath
/.checkstyle
/.scannerwork
/sonar-project.properties
bin/
/log4j.log

@ -9,9 +9,9 @@
<relativePath/> <!-- lookup parent from repository --> <relativePath/> <!-- lookup parent from repository -->
</parent> </parent>
<groupId>com.example</groupId> <groupId>com.example</groupId>
<artifactId>demo-kafka</artifactId> <artifactId>zx-java</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1</version>
<name>demo-kafka</name> <name>zx-java</name>
<description>Demo project for Spring Boot</description> <description>Demo project for Spring Boot</description>
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>

@ -1,3 +1,5 @@
#### 中青智造&中兴通讯-中国电信项目 Java服务
###配置 ###配置
#### AppCode: 42142fd0jkbf4515853b7fcec64748f6 #### AppCode: 42142fd0jkbf4515853b7fcec64748f6
#### X-Consumer-Username: dwVendor #### X-Consumer-Username: dwVendor
@ -10,3 +12,12 @@
#### user: root #### user: root
#### pwd: qwer1234 #### pwd: qwer1234
# #
### Kafka 消息间隔
####1、温湿度突变上报温度超过2° 湿度超过10%会上报)
####2、无变化数据上报是30min上报一次
####3、电量是2H上报一次
####4、光照度变化50lux上报一次
####5、光照无变化的情况下30min上报一次

@ -1,4 +1,4 @@
package com.example.demokafka; package com.example.zxweb;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;

@ -1,6 +1,6 @@
package com.example.demokafka.common.api.vo; package com.example.zxweb.common.api.vo;
import com.example.demokafka.common.constant.CommonConstant; import com.example.zxweb.common.constant.CommonConstant;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;

@ -1,4 +1,4 @@
package com.example.demokafka.common.constant; package com.example.zxweb.common.constant;
/** /**
* @Description * @Description

@ -1,4 +1,4 @@
package com.example.demokafka.config; package com.example.zxweb.config;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;

@ -1,13 +1,13 @@
package com.example.demokafka.controller; package com.example.zxweb.controller;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.example.demokafka.common.api.vo.Result; import com.example.zxweb.common.api.vo.Result;
import com.example.demokafka.dto.GuangZhaoChuanGanQiDTO; import com.example.zxweb.dto.GuangZhaoChuanGanQiDTO;
import com.example.demokafka.dto.HuanJingJianCeDTO; import com.example.zxweb.dto.HuanJingJianCeDTO;
import com.example.demokafka.dto.KafkaConsumerDTO; import com.example.zxweb.dto.KafkaConsumerDTO;
import com.example.demokafka.dto.KongQiZhiLiangChuanGanQiDTO; import com.example.zxweb.dto.KongQiZhiLiangChuanGanQiDTO;
import com.example.demokafka.util.KafkaUtil; import com.example.zxweb.utils.KafkaUtil;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -15,11 +15,10 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.*; import java.util.*;
@ -56,11 +55,7 @@ public class KafkaController {
} }
} }
consumer.close(); consumer.close();
long newCreateTime =0L; long newCreateTime =0L;
switch (topic) { switch (topic) {
case "guangzhaochuanganqi": { case "guangzhaochuanganqi": {
//光照传感器 //光照传感器
@ -154,4 +149,22 @@ public class KafkaController {
} }
} }
@PostMapping("/producer")
@ApiOperation(value = "producer")
public Result<?> postProducerData(@RequestBody JSONObject requestBody){
String topic = requestBody.getString("topic");
String key = requestBody.getString("key");
String value = requestBody.getString("value");
// 创建 KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaUtil.getSafeProducerPro());
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
// 关闭 KafkaProducer
producer.close();
return Result.OK();
}
} }

@ -1,4 +1,4 @@
package com.example.demokafka.dto; package com.example.zxweb.dto;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;

@ -1,4 +1,4 @@
package com.example.demokafka.dto; package com.example.zxweb.dto;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;

@ -1,11 +1,9 @@
package com.example.demokafka.dto; package com.example.zxweb.dto;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* @Description kafka * @Description kafka
* @Author ZhouWenTao * @Author ZhouWenTao

@ -1,4 +1,4 @@
package com.example.demokafka.dto; package com.example.zxweb.dto;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;

@ -1,4 +1,4 @@
package com.example.demokafka.service; package com.example.zxweb.service;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;

@ -1,6 +1,6 @@
package com.example.demokafka.service.impl; package com.example.zxweb.service.impl;
import com.example.demokafka.service.ProducerService; import com.example.zxweb.service.ProducerService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;

@ -1,4 +1,4 @@
package com.example.demokafka.util; package com.example.zxweb.utils;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;

@ -1,4 +1,4 @@
package com.example.demokafka.util; package com.example.zxweb.utils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
@ -22,18 +22,25 @@ public class KafkaUtil {
public String secret; public String secret;
public static String X_Consumer_Username = "dwVendor"; public static String X_Consumer_Username = "dwVendor";
public static Properties getSafeProducerPro() { public Properties getSafeProducerPro() {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553"); properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553");
properties.setProperty("acks", "all"); properties.setProperty("acks", "all");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//properties.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",appCode,secret)); if (StringUtils.isNotBlank(appCode)) {
//properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";"); System.out.println("使用鉴权");
properties.setProperty("bootstrap.servers", "10.0.10.153:29553,10.0.10.153:29554");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("security.protocol", "SASL_SSL"); properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("ssl.truststore.location", trustestore); properties.setProperty("ssl.truststore.location", trustestore);
properties.setProperty("ssl.truststore.password", "pwd123"); properties.setProperty("ssl.truststore.password", "pwd123");
}else{
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552");
}
//properties.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",appCode,secret));
//properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";");
properties.setProperty("buffer.memory", "33554432"); properties.setProperty("buffer.memory", "33554432");
properties.setProperty("retries", "0"); properties.setProperty("retries", "0");
properties.setProperty("ssl.endpoint.identification.algorithm", ""); properties.setProperty("ssl.endpoint.identification.algorithm", "");
@ -53,7 +60,7 @@ public class KafkaUtil {
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";"); properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dwVendor\" password=\"fEVcb^QFB;IN$K5\";");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("security.protocol", "SASL_SSL"); properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("ssl.truststore.location", "/opt/kafka/client.trustestore.p12"); properties.setProperty("ssl.truststore.location", trustestore);
properties.setProperty("ssl.truststore.password", "pwd123"); properties.setProperty("ssl.truststore.password", "pwd123");
}else{ }else{
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552"); properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552");
Loading…
Cancel
Save