diff --git a/pom.xml b/pom.xml index 78c7711..a010ba0 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,6 @@ - com.spring4all swagger-spring-boot-starter @@ -57,11 +56,21 @@ knife4j-spring-ui 2.0.4 + + + org.springframework.boot + spring-boot-starter-web + + org.springframework.kafka + spring-kafka + + + diff --git a/src/main/java/com/example/demokafka/controller/TestController.java b/src/main/java/com/example/demokafka/controller/TestController.java index 1664b21..658d984 100644 --- a/src/main/java/com/example/demokafka/controller/TestController.java +++ b/src/main/java/com/example/demokafka/controller/TestController.java @@ -2,10 +2,14 @@ package com.example.demokafka.controller; import com.alibaba.fastjson.JSONObject; import com.example.demokafka.model.Person; +import com.example.demokafka.service.ProducerService; import com.example.demokafka.util.Kafka; import io.swagger.annotations.*; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import springfox.documentation.annotations.ApiIgnore; @@ -24,10 +28,17 @@ import java.util.Map; @Api(tags = "测试") public class TestController { + @Autowired + private ProducerService producerService; + @Autowired + private KafkaTemplate kafkaTemplate; + @Value("${spring.kafka.topic}") + private String topicName; + @GetMapping("/kafka01") @ApiOperation(value = "kafka01") public List kafka01() { - new Kafka().start(); + producerService.sendMessage(topicName, "测试"); return new ArrayList<>(); } } diff --git a/src/main/java/com/example/demokafka/service/ProducerService.java b/src/main/java/com/example/demokafka/service/ProducerService.java new file mode 100644 index 0000000..dd56dd7 --- /dev/null +++ b/src/main/java/com/example/demokafka/service/ProducerService.java @@ -0,0 +1,70 @@ +package com.example.demokafka.service; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.messaging.Message; + +import java.util.concurrent.ExecutionException; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/2 12:07 + */ +public interface ProducerService { + + /** + * 发送同步消息 + * @param topic + * @param data + * @throws ExecutionException + * @throws InterruptedException + */ + void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException; + + /** + * 发送普通消息 + * @param topic + * @param data + */ + void sendMessage(String topic, String data); + + /** + * 发送带附加信息的消息 + * @param record + */ + void sendMessage(ProducerRecord record); + + /** + * 发送Message消息 + * @param message + */ + void sendMessage(Message message); + + /** + * 发送带key的消息 + * @param topic + * @param key + * @param data + */ + void sendMessage(String topic, String key, String data); + + /** + * 发送带key和分区的消息 + * @param topic + * @param partition + * @param key + * @param data + */ + void sendMessage(String topic, Integer partition, String key, String data); + + /** + * 发送有分区,当前时间,key的消息 + * @param topic + * @param partition + * @param timestamp + * @param key + * @param data + */ + void sendMessage(String topic, Integer partition, Long timestamp, String key, String data); +} + diff --git a/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java b/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java new file mode 100644 index 0000000..80a1f73 --- /dev/null +++ b/src/main/java/com/example/demokafka/service/impl/ProducerServiceImpl.java @@ -0,0 +1,117 @@ +package com.example.demokafka.service.impl; + +import com.example.demokafka.service.ProducerService; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.util.concurrent.ExecutionException; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/9/2 12:09 + */ +@Service +@RequiredArgsConstructor +public class ProducerServiceImpl implements ProducerService { + + private final KafkaTemplate kafkaTemplate; + private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class); + + @Override + public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException { + SendResult sendResult = kafkaTemplate.send(topic, data).get(); + RecordMetadata recordMetadata = sendResult.getRecordMetadata(); + logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic()); + } + + @Override + public void sendMessage(String topic, String data) { + ListenableFuture> future = kafkaTemplate.send(topic, data); + future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage())); + } + + @Override + public void sendMessage(ProducerRecord record) { + ListenableFuture> future = kafkaTemplate.send(record); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult sendResult) { + RecordMetadata metadata = sendResult.getRecordMetadata(); + logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); + } + }); + } + + @Override + public void sendMessage(Message message) { + ListenableFuture> future = kafkaTemplate.send(message); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult sendResult) { + RecordMetadata metadata = sendResult.getRecordMetadata(); + logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); + } + }); + } + + @Override + public void sendMessage(String topic, String key, String data) { + ListenableFuture> future = kafkaTemplate.send(topic, key, data); + future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage())); + } + + @Override + public void sendMessage(String topic, Integer partition, String key, String data) { + ListenableFuture> future = kafkaTemplate.send(topic, partition, key, data); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult sendResult) { + RecordMetadata metadata = sendResult.getRecordMetadata(); + logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); + } + }); + } + + @Override + public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) { + ListenableFuture> future = kafkaTemplate.send(topic, partition, timestamp, key, data); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult sendResult) { + RecordMetadata metadata = sendResult.getRecordMetadata(); + logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); + } + }); + } +} + diff --git a/src/main/java/com/example/demokafka/util/Kafka.java b/src/main/java/com/example/demokafka/util/Kafka.java index 47f9511..b5f9c24 100644 --- a/src/main/java/com/example/demokafka/util/Kafka.java +++ b/src/main/java/com/example/demokafka/util/Kafka.java @@ -26,7 +26,7 @@ public class Kafka extends Thread { } private void poll(String sourceTopicName) { - try { + /*try { consumer116.subscribe(Arrays.asList(sourceTopicName)); while (true) { ConsumerRecords records = consumer116.poll(Duration.ofMillis(5000)); @@ -41,7 +41,7 @@ public class Kafka extends Thread { if (consumer116 != null) { consumer116.close(); } - } + }*/ } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index ef6ac6c..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1,2 +0,0 @@ -server.port=10011 -knife4j.enable=true diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..ac74d59 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,23 @@ +server: + port: 10011 +knife4j: + enable: true + +spring: + kafka: + #??????? + topic: sourcetopic + producer: + bootstrap-servers: 10.0.10.153:29551 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + retries: 3 + properties: + retry.backoff.ms: 100 #?????????100 + linger.ms: 0 #???0???????????????????batch??? + max.request.size: 1048576 #??1MB?????????? + connections.max.idle.ms: 540000 #??9??????????????? + receive.buffer.bytes: 32768 #??32KB???socket????????????-1?????????? + send.buffer.bytes: 131072 #??128KB???socket???????????-1?????????? + request.timeout.ms: 10000 #??30000ms?????????????? + transaction.timeout.ms: 5000 \ No newline at end of file