From 2a5745f4eed8ae43cd10d90a9ecec07569ad00cd Mon Sep 17 00:00:00 2001
From: zhouwentao <1577701412@qq.com>
Date: Sat, 2 Sep 2023 12:29:38 +0800
Subject: [PATCH] updates
---
pom.xml | 13 +-
.../demokafka/controller/TestController.java | 13 +-
.../demokafka/service/ProducerService.java | 70 +++++++++++
.../service/impl/ProducerServiceImpl.java | 117 ++++++++++++++++++
.../com/example/demokafka/util/Kafka.java | 4 +-
src/main/resources/application.properties | 2 -
src/main/resources/application.yml | 23 ++++
7 files changed, 235 insertions(+), 7 deletions(-)
create mode 100644 src/main/java/com/example/demokafka/service/ProducerService.java
create mode 100644 src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java
delete mode 100644 src/main/resources/application.properties
create mode 100644 src/main/resources/application.yml
diff --git a/pom.xml b/pom.xml
index 78c7711..a010ba0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,6 @@
-
com.spring4all
swagger-spring-boot-starter
@@ -57,11 +56,21 @@
knife4j-spring-ui
2.0.4
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java
index 1664b21..658d984 100644
--- a/src/main/java/com/example/demokafka/controller/TestController.java
+++ b/src/main/java/com/example/demokafka/controller/TestController.java
@@ -2,10 +2,14 @@ package com.example.demokafka.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.demokafka.model.Person;
+import com.example.demokafka.service.ProducerService;
import com.example.demokafka.util.Kafka;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
@@ -24,10 +28,17 @@ import java.util.Map;
@Api(tags = "测试")
public class TestController {
+ @Autowired
+ private ProducerService producerService;
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+ @Value("${spring.kafka.topic}")
+ private String topicName;
+
@GetMapping("/kafka01")
@ApiOperation(value = "kafka01")
public List kafka01() {
- new Kafka().start();
+ producerService.sendMessage(topicName, "测试");
return new ArrayList<>();
}
}
diff --git a/src/main/java/com/example/demokafka/service/ProducerService.java b/src/main/java/com/example/demokafka/service/ProducerService.java
new file mode 100644
index 0000000..dd56dd7
--- /dev/null
+++ b/src/main/java/com/example/demokafka/service/ProducerService.java
@@ -0,0 +1,70 @@
+package com.example.demokafka.service;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.messaging.Message;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @Description
+ * @Author ZhouWenTao
+ * @Date 2023/9/2 12:07
+ */
+public interface ProducerService {
+
+ /**
+ * 发送同步消息
+ * @param topic
+ * @param data
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;
+
+ /**
+ * 发送普通消息
+ * @param topic
+ * @param data
+ */
+ void sendMessage(String topic, String data);
+
+ /**
+ * 发送带附加信息的消息
+ * @param record
+ */
+ void sendMessage(ProducerRecord record);
+
+ /**
+ * 发送Message消息
+ * @param message
+ */
+ void sendMessage(Message message);
+
+ /**
+ * 发送带key的消息
+ * @param topic
+ * @param key
+ * @param data
+ */
+ void sendMessage(String topic, String key, String data);
+
+ /**
+ * 发送带key和分区的消息
+ * @param topic
+ * @param partition
+ * @param key
+ * @param data
+ */
+ void sendMessage(String topic, Integer partition, String key, String data);
+
+ /**
+ * 发送有分区,当前时间,key的消息
+ * @param topic
+ * @param partition
+ * @param timestamp
+ * @param key
+ * @param data
+ */
+ void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
+}
+
diff --git a/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java b/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java
new file mode 100644
index 0000000..80a1f73
--- /dev/null
+++ b/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java
@@ -0,0 +1,117 @@
+package com.example.demokafka.service.impl;
+
+import com.example.demokafka.service.ProducerService;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @Description
+ * @Author ZhouWenTao
+ * @Date 2023/9/2 12:09
+ */
+@Service
+@RequiredArgsConstructor
+public class ProducerServiceImpl implements ProducerService {
+
+ private final KafkaTemplate kafkaTemplate;
+ private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
+
+ @Override
+ public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
+ SendResult sendResult = kafkaTemplate.send(topic, data).get();
+ RecordMetadata recordMetadata = sendResult.getRecordMetadata();
+ logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
+ }
+
+ @Override
+ public void sendMessage(String topic, String data) {
+ ListenableFuture> future = kafkaTemplate.send(topic, data);
+ future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
+ }
+
+ @Override
+ public void sendMessage(ProducerRecord record) {
+ ListenableFuture> future = kafkaTemplate.send(record);
+ future.addCallback(new ListenableFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
+ }
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
+ }
+ });
+ }
+
+ @Override
+ public void sendMessage(Message message) {
+ ListenableFuture> future = kafkaTemplate.send(message);
+ future.addCallback(new ListenableFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
+ }
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
+ }
+ });
+ }
+
+ @Override
+ public void sendMessage(String topic, String key, String data) {
+ ListenableFuture> future = kafkaTemplate.send(topic, key, data);
+ future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
+ }
+
+ @Override
+ public void sendMessage(String topic, Integer partition, String key, String data) {
+ ListenableFuture> future = kafkaTemplate.send(topic, partition, key, data);
+ future.addCallback(new ListenableFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
+ }
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
+ }
+ });
+ }
+
+ @Override
+ public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
+ ListenableFuture> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
+ future.addCallback(new ListenableFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
+ }
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
+ }
+ });
+ }
+}
+
diff --git a/src/main/java/com/example/demokafka/util/Kafka.java b/src/main/java/com/example/demokafka/util/Kafka.java
index 47f9511..b5f9c24 100644
--- a/src/main/java/com/example/demokafka/util/Kafka.java
+++ b/src/main/java/com/example/demokafka/util/Kafka.java
@@ -26,7 +26,7 @@ public class Kafka extends Thread {
}
private void poll(String sourceTopicName) {
- try {
+ /*try {
consumer116.subscribe(Arrays.asList(sourceTopicName));
while (true) {
ConsumerRecords records = consumer116.poll(Duration.ofMillis(5000));
@@ -41,7 +41,7 @@ public class Kafka extends Thread {
if (consumer116 != null) {
consumer116.close();
}
- }
+ }*/
}
}
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index ef6ac6c..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-server.port=10011
-knife4j.enable=true
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..ac74d59
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+ port: 10011
+knife4j:
+ enable: true
+
+spring:
+ kafka:
+ #???????
+ topic: sourcetopic
+ producer:
+ bootstrap-servers: 10.0.10.153:29551
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ retries: 3
+ properties:
+ retry.backoff.ms: 100 #?????????100
+ linger.ms: 0 #???0???????????????????batch???
+ max.request.size: 1048576 #??1MB??????????
+ connections.max.idle.ms: 540000 #??9???????????????
+ receive.buffer.bytes: 32768 #??32KB???socket????????????-1??????????
+ send.buffer.bytes: 131072 #??128KB???socket???????????-1??????????
+ request.timeout.ms: 10000 #??30000ms??????????????
+ transaction.timeout.ms: 5000
\ No newline at end of file