main
周文涛 2 years ago
commit 60ab9fce2c

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.xiaoymin</groupId>-->
<!-- <artifactId>knife4j-spring-boot-starter</artifactId>-->
<!-- &lt;!&ndash;在引用时请在maven中央仓库搜索2.X最新版本号&ndash;&gt;-->
<!-- <version>2.0.9</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.spring4all</groupId>
<artifactId>swagger-spring-boot-starter</artifactId>
<version>1.9.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-ui</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,13 @@
package com.example.demokafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}

@ -0,0 +1,177 @@
package com.example.demokafka.common.api.vo;
import com.example.demokafka.common.constant.CommonConstant;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
/**
*
* @author scott
* @email jeecgos@163.com
* @date 2019119
*/
@Data
@ApiModel(value="接口返回对象", description="接口返回对象")
public class Result<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
*
*/
@ApiModelProperty(value = "成功标志")
private boolean success = true;
/**
*
*/
@ApiModelProperty(value = "返回处理消息")
private String message = "";
/**
*
*/
@ApiModelProperty(value = "返回代码")
private Integer code = 0;
/**
* data
*/
@ApiModelProperty(value = "返回数据对象")
private T result;
/**
*
*/
@ApiModelProperty(value = "时间戳")
private long timestamp = System.currentTimeMillis();
public Result() {
}
/**
* VUE3token
* @param code
* @param message
*/
public Result(Integer code, String message) {
this.code = code;
this.message = message;
}
public Result<T> success(String message) {
this.message = message;
this.code = CommonConstant.SC_OK_200;
this.success = true;
return this;
}
public static<T> Result<T> ok() {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
return r;
}
public static<T> Result<T> ok(String msg) {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
//Result OK(String msg)方法会造成兼容性问题 issues/I4IP3D
r.setResult((T) msg);
r.setMessage(msg);
return r;
}
public static<T> Result<T> ok(T data) {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
r.setResult(data);
return r;
}
public static<T> Result<T> OK() {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
return r;
}
/**
*
*
* @param msg
* @param <T>
* @return
*/
public static<T> Result<T> OK(String msg) {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
r.setMessage(msg);
//Result OK(String msg)方法会造成兼容性问题 issues/I4IP3D
r.setResult((T) msg);
return r;
}
public static<T> Result<T> OK(T data) {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
r.setResult(data);
return r;
}
public static<T> Result<T> OK(String msg, T data) {
Result<T> r = new Result<T>();
r.setSuccess(true);
r.setCode(CommonConstant.SC_OK_200);
r.setMessage(msg);
r.setResult(data);
return r;
}
public static<T> Result<T> error(String msg, T data) {
Result<T> r = new Result<T>();
r.setSuccess(false);
r.setCode(CommonConstant.SC_INTERNAL_SERVER_ERROR_500);
r.setMessage(msg);
r.setResult(data);
return r;
}
public static<T> Result<T> error(String msg) {
return error(CommonConstant.SC_INTERNAL_SERVER_ERROR_500, msg);
}
public static<T> Result<T> error(int code, String msg) {
Result<T> r = new Result<T>();
r.setCode(code);
r.setMessage(msg);
r.setSuccess(false);
return r;
}
public Result<T> error500(String message) {
this.message = message;
this.code = CommonConstant.SC_INTERNAL_SERVER_ERROR_500;
this.success = false;
return this;
}
/**
* 访
*/
public static<T> Result<T> noauth(String msg) {
return error(CommonConstant.SC_JEECG_NO_AUTHZ, msg);
}
@JsonIgnore
private String onlTable;
}

@ -0,0 +1,16 @@
package com.example.demokafka.common.constant;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/1 15:19
*/
public interface CommonConstant {
/*请求成功*/
Integer SC_OK_200=200;
Integer SC_INTERNAL_SERVER_ERROR_500=500;
/**访问权限认证未通过 510*/
Integer SC_JEECG_NO_AUTHZ=510;
String X_ACCESS_TOKEN="x-access-token";
}

@ -0,0 +1,48 @@
package com.example.demokafka.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList;
/**
* @author
* @date 2023/5/12
* Swagger
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket docketYichang() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(getApiInfo())
.groupName("demo1")
.select().apis(RequestHandlerSelectors.basePackage("com.example"))
.build();
}
@Bean
public Docket docketWuhan() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(getApiInfo())
.groupName("武汉")
.select().apis(RequestHandlerSelectors.basePackage("com.example"))
.build();
}
// String title, String description, String version, String termsOfServiceUrl, Contact contact, String license, String licenseUrl, Collection<VendorExtension>
@Bean
public ApiInfo getApiInfo() {
// Contact(String name, String url, String email)
Contact contact = new Contact("hxlc", "https://www.baotao.com", "898773575@qq.com");
return new ApiInfo("宜昌天镜", "页面描述了宜昌天镜各个接口的模型", "1.0", "url", contact, "Apache Foundation", "http://www.baidu.com", new ArrayList<>());
}
}

@ -0,0 +1,33 @@
package com.example.demokafka.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.demokafka.model.Person;
import com.example.demokafka.util.Kafka;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @author
* @date 2023/5/11
*/
@RestController
@RequestMapping("/test")
@Slf4j
@Api(tags = "测试")
public class TestController {
@GetMapping("/kafka01")
@ApiOperation(value = "kafka01")
public List<Person> kafka01() {
new Kafka().start();
return new ArrayList<>();
}
}

@ -0,0 +1,22 @@
package com.example.demokafka.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author
* @date 2023/5/11
*/
@Data
@ApiModel
public class Person {
@ApiModelProperty(value = "人物ID", example = "1")
private Integer id;
@ApiModelProperty(value = "人物姓名")
private String name;
private LocalDateTime birthDay;
}

@ -0,0 +1,47 @@
package com.example.demokafka.util;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/1 14:05
*/
public class Kafka extends Thread {
Consumer<String, String> consumer116 = new KafkaConsumer<String, String>(KafkaUtil.getSafeConsumerPro());
@Override
public void run() {
poll("sourcetopic");
}
public static void main(String[] args) {
new Kafka().start();
}
private void poll(String sourceTopicName) {
try {
consumer116.subscribe(Arrays.asList(sourceTopicName));
while (true) {
ConsumerRecords<String, String> records = consumer116.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, String> consumerRecord : records) {
String kfkContent = consumerRecord.value();
System.out.println("Success to receive kafka :" + kfkContent);
}
}
} catch (Exception e) {
System.out.println("thread exception:" + e);
} finally {
if (consumer116 != null) {
consumer116.close();
}
}
}
}

@ -0,0 +1,54 @@
package com.example.demokafka.util;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/1 13:40
*/
public class KafkaUtil {
public static String trustestore="/opt/kafka/client.trustestore.p12";
public static String appCode = "42142fd0jkbf4515853b7fcec64748f6";
public static String X_Consumer_Username = "dwVendor";
public static Properties getSafeProducerPro() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553");
properties.setProperty("acks", "all");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("ssl.truststore.location", trustestore);
properties.setProperty("ssl.truststore.password", "pwd123");
properties.setProperty("buffer.memory", "33554432");
properties.setProperty("retries", "0");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
return properties;
}
//keytool -keystore D:/keystore/client.trustestore.p12 -storepass pwd123 -noprompt -alias client.trustestore -import -file D:/keystore/ca.crt -storetype PKCS12
public static Properties getSafeConsumerPro() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553");
properties.setProperty("group.id", "isv-kafka");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("ssl.truststore.location", trustestore);
properties.setProperty("ssl.truststore.password", "pwd123");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
return properties;
}
}

@ -0,0 +1,2 @@
server.port=10011
knife4j.enable=true
Loading…
Cancel
Save