From 60ab9fce2c0b0b8388e2b3c4e77b1b44089c56f9 Mon Sep 17 00:00:00 2001 From: zhouwentao <1577701412@qq.com> Date: Fri, 1 Sep 2023 21:03:13 +0800 Subject: [PATCH] updates --- pom.xml | 84 +++++++++ .../example/demokafka/KafkaApplication.java | 13 ++ .../demokafka/common/api/vo/Result.java | 177 ++++++++++++++++++ .../common/constant/CommonConstant.java | 16 ++ .../demokafka/config/SwaggerConfig.java | 48 +++++ .../demokafka/controller/TestController.java | 33 ++++ .../com/example/demokafka/model/Person.java | 22 +++ .../com/example/demokafka/util/Kafka.java | 47 +++++ .../com/example/demokafka/util/KafkaUtil.java | 54 ++++++ src/main/resources/application.properties | 2 + 10 files changed, 496 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/example/demokafka/KafkaApplication.java create mode 100644 src/main/java/com/example/demokafka/common/api/vo/Result.java create mode 100644 src/main/java/com/example/demokafka/common/constant/CommonConstant.java create mode 100644 src/main/java/com/example/demokafka/config/SwaggerConfig.java create mode 100644 src/main/java/com/example/demokafka/controller/TestController.java create mode 100644 src/main/java/com/example/demokafka/model/Person.java create mode 100644 src/main/java/com/example/demokafka/util/Kafka.java create mode 100644 src/main/java/com/example/demokafka/util/KafkaUtil.java create mode 100644 src/main/resources/application.properties diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..78c7711 --- /dev/null +++ b/pom.xml @@ -0,0 +1,84 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.0.6.RELEASE + + + com.example + demo-kafka + 0.0.1-SNAPSHOT + demo-kafka + Demo project for Spring Boot + + 1.8 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + true + + + + com.alibaba + fastjson + 1.2.83 + + + + + + + + + + + + com.spring4all + swagger-spring-boot-starter + 1.9.1.RELEASE + + + + com.github.xiaoymin + knife4j-spring-ui + 2.0.4 + + + org.apache.kafka + kafka-clients + 2.8.1 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/com/example/demokafka/KafkaApplication.java b/src/main/java/com/example/demokafka/KafkaApplication.java new file mode 100644 index 0000000..67760c2 --- /dev/null +++ b/src/main/java/com/example/demokafka/KafkaApplication.java @@ -0,0 +1,13 @@ +package com.example.demokafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaApplication.class, args); + } + +} diff --git a/src/main/java/com/example/demokafka/common/api/vo/Result.java b/src/main/java/com/example/demokafka/common/api/vo/Result.java new file mode 100644 index 0000000..1965c16 --- /dev/null +++ b/src/main/java/com/example/demokafka/common/api/vo/Result.java @@ -0,0 +1,177 @@ +package com.example.demokafka.common.api.vo; + +import com.example.demokafka.common.constant.CommonConstant; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * 接口返回数据格式 + * @author scott + * @email jeecgos@163.com + * @date 2019年1月19日 + */ +@Data +@ApiModel(value="接口返回对象", description="接口返回对象") +public class Result implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 成功标志 + */ + @ApiModelProperty(value = "成功标志") + private boolean success = true; + + /** + * 返回处理消息 + */ + @ApiModelProperty(value = "返回处理消息") + private String message = ""; + + /** + * 返回代码 + */ + @ApiModelProperty(value = "返回代码") + private Integer code = 0; + + /** + * 返回数据对象 data + */ + @ApiModelProperty(value = "返回数据对象") + private T result; + + /** + * 时间戳 + */ + @ApiModelProperty(value = "时间戳") + private long timestamp = System.currentTimeMillis(); + + public Result() { + } + + /** + * 兼容VUE3版token失效不跳转登录页面 + * @param code + * @param message + */ + public Result(Integer code, String message) { + this.code = code; + this.message = message; + } + + public Result success(String message) { + this.message = message; + this.code = CommonConstant.SC_OK_200; + this.success = true; + return this; + } + + public static Result ok() { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + return r; + } + + public static Result ok(String msg) { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + //Result OK(String msg)方法会造成兼容性问题 issues/I4IP3D + r.setResult((T) msg); + r.setMessage(msg); + return r; + } + + public static Result ok(T data) { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + r.setResult(data); + return r; + } + + public static Result OK() { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + return r; + } + + /** + * 此方法是为了兼容升级所创建 + * + * @param msg + * @param + * @return + */ + public static Result OK(String msg) { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + r.setMessage(msg); + //Result OK(String msg)方法会造成兼容性问题 issues/I4IP3D + r.setResult((T) msg); + return r; + } + + public static Result OK(T data) { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + r.setResult(data); + return r; + } + + public static Result OK(String msg, T data) { + Result r = new Result(); + r.setSuccess(true); + r.setCode(CommonConstant.SC_OK_200); + r.setMessage(msg); + r.setResult(data); + return r; + } + + public static Result error(String msg, T data) { + Result r = new Result(); + r.setSuccess(false); + r.setCode(CommonConstant.SC_INTERNAL_SERVER_ERROR_500); + r.setMessage(msg); + r.setResult(data); + return r; + } + + public static Result error(String msg) { + return error(CommonConstant.SC_INTERNAL_SERVER_ERROR_500, msg); + } + + public static Result error(int code, String msg) { + Result r = new Result(); + r.setCode(code); + r.setMessage(msg); + r.setSuccess(false); + return r; + } + + public Result error500(String message) { + this.message = message; + this.code = CommonConstant.SC_INTERNAL_SERVER_ERROR_500; + this.success = false; + return this; + } + + /** + * 无权限访问返回结果 + */ + public static Result noauth(String msg) { + return error(CommonConstant.SC_JEECG_NO_AUTHZ, msg); + } + + @JsonIgnore + private String onlTable; + +} \ No newline at end of file diff --git a/src/main/java/com/example/demokafka/common/constant/CommonConstant.java b/src/main/java/com/example/demokafka/common/constant/CommonConstant.java new file mode 100644 index 0000000..4656264 --- /dev/null +++ b/src/main/java/com/example/demokafka/common/constant/CommonConstant.java @@ -0,0 +1,16 @@ +package com.example.demokafka.common.constant; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/1 15:19 + */ +public interface CommonConstant { + /*请求成功*/ + Integer SC_OK_200=200; + Integer SC_INTERNAL_SERVER_ERROR_500=500; + /**访问权限认证未通过 510*/ + Integer SC_JEECG_NO_AUTHZ=510; + + String X_ACCESS_TOKEN="x-access-token"; +} diff --git a/src/main/java/com/example/demokafka/config/SwaggerConfig.java b/src/main/java/com/example/demokafka/config/SwaggerConfig.java new file mode 100644 index 0000000..e8c34b2 --- /dev/null +++ b/src/main/java/com/example/demokafka/config/SwaggerConfig.java @@ -0,0 +1,48 @@ +package com.example.demokafka.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.service.Contact; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +import java.util.ArrayList; + +/** + * @author 赵恒 + * @date 2023/5/12 + * Swagger 配置页面 + */ +@Configuration +@EnableSwagger2 +public class SwaggerConfig { + + @Bean + public Docket docketYichang() { + return new Docket(DocumentationType.SWAGGER_2) + .apiInfo(getApiInfo()) + .groupName("demo1") + .select().apis(RequestHandlerSelectors.basePackage("com.example")) + .build(); + } + + @Bean + public Docket docketWuhan() { + return new Docket(DocumentationType.SWAGGER_2) + .apiInfo(getApiInfo()) + .groupName("武汉") + .select().apis(RequestHandlerSelectors.basePackage("com.example")) + .build(); + } + + // String title, String description, String version, String termsOfServiceUrl, Contact contact, String license, String licenseUrl, Collection + @Bean + public ApiInfo getApiInfo() { + // Contact(String name, String url, String email) + Contact contact = new Contact("hxlc", "https://www.baotao.com", "898773575@qq.com"); + return new ApiInfo("宜昌天镜", "页面描述了宜昌天镜各个接口的模型", "1.0", "url", contact, "Apache Foundation", "http://www.baidu.com", new ArrayList<>()); + } +} diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java new file mode 100644 index 0000000..1664b21 --- /dev/null +++ b/src/main/java/com/example/demokafka/controller/TestController.java @@ -0,0 +1,33 @@ +package com.example.demokafka.controller; + +import com.alibaba.fastjson.JSONObject; +import com.example.demokafka.model.Person; +import com.example.demokafka.util.Kafka; +import io.swagger.annotations.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.web.bind.annotation.*; +import springfox.documentation.annotations.ApiIgnore; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * @author 赵恒 + * @date 2023/5/11 + */ +@RestController +@RequestMapping("/test") +@Slf4j +@Api(tags = "测试") +public class TestController { + + @GetMapping("/kafka01") + @ApiOperation(value = "kafka01") + public List kafka01() { + new Kafka().start(); + return new ArrayList<>(); + } +} diff --git a/src/main/java/com/example/demokafka/model/Person.java b/src/main/java/com/example/demokafka/model/Person.java new file mode 100644 index 0000000..59bf193 --- /dev/null +++ b/src/main/java/com/example/demokafka/model/Person.java @@ -0,0 +1,22 @@ +package com.example.demokafka.model; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author 赵恒 + * @date 2023/5/11 + */ +@Data +@ApiModel +public class Person { + @ApiModelProperty(value = "人物ID", example = "1") + private Integer id; + @ApiModelProperty(value = "人物姓名") + private String name; + private LocalDateTime birthDay; + +} diff --git a/src/main/java/com/example/demokafka/util/Kafka.java b/src/main/java/com/example/demokafka/util/Kafka.java new file mode 100644 index 0000000..47f9511 --- /dev/null +++ b/src/main/java/com/example/demokafka/util/Kafka.java @@ -0,0 +1,47 @@ +package com.example.demokafka.util; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.Arrays; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/1 14:05 + */ +public class Kafka extends Thread { + Consumer consumer116 = new KafkaConsumer(KafkaUtil.getSafeConsumerPro()); + + @Override + public void run() { + poll("sourcetopic"); + } + + public static void main(String[] args) { + new Kafka().start(); + } + + private void poll(String sourceTopicName) { + try { + consumer116.subscribe(Arrays.asList(sourceTopicName)); + while (true) { + ConsumerRecords records = consumer116.poll(Duration.ofMillis(5000)); + for (ConsumerRecord consumerRecord : records) { + String kfkContent = consumerRecord.value(); + System.out.println("Success to receive kafka :" + kfkContent); + } + } + } catch (Exception e) { + System.out.println("thread exception:" + e); + } finally { + if (consumer116 != null) { + consumer116.close(); + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/example/demokafka/util/KafkaUtil.java b/src/main/java/com/example/demokafka/util/KafkaUtil.java new file mode 100644 index 0000000..0f406be --- /dev/null +++ b/src/main/java/com/example/demokafka/util/KafkaUtil.java @@ -0,0 +1,54 @@ +package com.example.demokafka.util; + +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Properties; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/1 13:40 + */ +public class KafkaUtil { + public static String trustestore="/opt/kafka/client.trustestore.p12"; + + public static String appCode = "42142fd0jkbf4515853b7fcec64748f6"; + + public static String X_Consumer_Username = "dwVendor"; + + public static Properties getSafeProducerPro() { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553"); + properties.setProperty("acks", "all"); + properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";"); + properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); + properties.setProperty("security.protocol", "SASL_SSL"); + properties.setProperty("ssl.truststore.location", trustestore); + properties.setProperty("ssl.truststore.password", "pwd123"); + properties.setProperty("buffer.memory", "33554432"); + properties.setProperty("retries", "0"); + properties.setProperty("ssl.endpoint.identification.algorithm", ""); + return properties; + } + //keytool -keystore D:/keystore/client.trustestore.p12 -storepass pwd123 -noprompt -alias client.trustestore -import -file D:/keystore/ca.crt -storetype PKCS12 + public static Properties getSafeConsumerPro() { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553"); + properties.setProperty("group.id", "isv-kafka"); + properties.setProperty("key.deserializer", StringDeserializer.class.getName()); + properties.setProperty("value.deserializer", StringDeserializer.class.getName()); + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"e92224\" password=\"323236g6#\";"); + properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); + properties.setProperty("security.protocol", "SASL_SSL"); + properties.setProperty("ssl.truststore.location", trustestore); + properties.setProperty("ssl.truststore.password", "pwd123"); + properties.setProperty("enable.auto.commit", "true"); + properties.setProperty("auto.commit.interval.ms", "1000"); + properties.setProperty("session.timeout.ms", "30000"); + properties.setProperty("auto.offset.reset", "earliest"); + properties.setProperty("ssl.endpoint.identification.algorithm", ""); + return properties; + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..ef6ac6c --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,2 @@ +server.port=10011 +knife4j.enable=true