diff --git a/pom.xml b/pom.xml index a8e1b35..0a16bbb 100644 --- a/pom.xml +++ b/pom.xml @@ -58,11 +58,18 @@ 2.0.4 - + + + + org.apache.kafka + kafka-clients + 2.8.1 + commons-io diff --git a/src/main/java/com/example/zxweb/service/ProducerService.java b/src/main/java/com/example/zxweb/service/ProducerService.java deleted file mode 100644 index ca75aeb..0000000 --- a/src/main/java/com/example/zxweb/service/ProducerService.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.example.zxweb.service; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.messaging.Message; - -import java.util.concurrent.ExecutionException; - -/** - * @Description - * @Author ZhouWenTao - * @Date 2023/9/2 12:07 - */ -public interface ProducerService { - - /** - * 发送同步消息 - * @param topic - * @param data - * @throws ExecutionException - * @throws InterruptedException - */ - void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException; - - /** - * 发送普通消息 - * @param topic - * @param data - */ - void sendMessage(String topic, String data); - - /** - * 发送带附加信息的消息 - * @param record - */ - void sendMessage(ProducerRecord record); - - /** - * 发送Message消息 - * @param message - */ - void sendMessage(Message message); - - /** - * 发送带key的消息 - * @param topic - * @param key - * @param data - */ - void sendMessage(String topic, String key, String data); - - /** - * 发送带key和分区的消息 - * @param topic - * @param partition - * @param key - * @param data - */ - void sendMessage(String topic, Integer partition, String key, String data); - - /** - * 发送有分区,当前时间,key的消息 - * @param topic - * @param partition - * @param timestamp - * @param key - * @param data - */ - void sendMessage(String topic, Integer partition, Long timestamp, String key, String data); -} - diff --git a/src/main/java/com/example/zxweb/service/impl/ProducerServiceImpl.java b/src/main/java/com/example/zxweb/service/impl/ProducerServiceImpl.java deleted file mode 100644 index fe321d4..0000000 --- a/src/main/java/com/example/zxweb/service/impl/ProducerServiceImpl.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.example.zxweb.service.impl; - -import com.example.zxweb.service.ProducerService; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.messaging.Message; -import org.springframework.stereotype.Service; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; - -import java.util.concurrent.ExecutionException; - -/** - * @Description - * @Author ZhouWenTao - * @Date 2023/9/2 12:09 - */ -@Service -@RequiredArgsConstructor -public class ProducerServiceImpl implements ProducerService { - - private final KafkaTemplate kafkaTemplate; - private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class); - - @Override - public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException { - SendResult sendResult = kafkaTemplate.send(topic, data).get(); - RecordMetadata recordMetadata = sendResult.getRecordMetadata(); - logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic()); - } - - @Override - public void sendMessage(String topic, String data) { - ListenableFuture> future = kafkaTemplate.send(topic, data); - future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage())); - } - - @Override - public void sendMessage(ProducerRecord record) { - ListenableFuture> future = kafkaTemplate.send(record); - future.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); - } - - @Override - public void onSuccess(SendResult sendResult) { - RecordMetadata metadata = sendResult.getRecordMetadata(); - logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); - } - }); - } - - @Override - public void sendMessage(Message message) { - ListenableFuture> future = kafkaTemplate.send(message); - future.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); - } - - @Override - public void onSuccess(SendResult sendResult) { - RecordMetadata metadata = sendResult.getRecordMetadata(); - logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); - } - }); - } - - @Override - public void sendMessage(String topic, String key, String data) { - ListenableFuture> future = kafkaTemplate.send(topic, key, data); - future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage())); - } - - @Override - public void sendMessage(String topic, Integer partition, String key, String data) { - ListenableFuture> future = kafkaTemplate.send(topic, partition, key, data); - future.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); - } - - @Override - public void onSuccess(SendResult sendResult) { - RecordMetadata metadata = sendResult.getRecordMetadata(); - logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); - } - }); - } - - @Override - public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) { - ListenableFuture> future = kafkaTemplate.send(topic, partition, timestamp, key, data); - future.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - logger.error("发送消息失败!失败原因是:{}", throwable.getMessage()); - } - - @Override - public void onSuccess(SendResult sendResult) { - RecordMetadata metadata = sendResult.getRecordMetadata(); - logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition()); - } - }); - } -} - diff --git a/src/main/java/com/example/zxweb/utils/KafkaUtil.java b/src/main/java/com/example/zxweb/utils/KafkaUtil.java index 03c2f16..cd626b6 100644 --- a/src/main/java/com/example/zxweb/utils/KafkaUtil.java +++ b/src/main/java/com/example/zxweb/utils/KafkaUtil.java @@ -63,7 +63,7 @@ public class KafkaUtil { properties.setProperty("ssl.truststore.location", trustestore); properties.setProperty("ssl.truststore.password", "pwd123"); }else{ - properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552"); + properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553"); } properties.setProperty("enable.auto.commit", "true"); diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index 8b9b797..bde9447 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -20,7 +20,7 @@ spring: receive.buffer.bytes: 32768 #??32KB???socket????????????-1?????????? send.buffer.bytes: 131072 #??128KB???socket???????????-1?????????? request.timeout.ms: 10000 #??30000ms?????????????? - transaction.timeout.ms: 5000 + transaction.timeout.ms: 10000 # thymeleaf 页面配置 thymeleaf: cache: false