main
周文涛 2 years ago
parent 93835b1e28
commit 468de48d4b

@ -58,11 +58,18 @@
<version>2.0.4</version>
</dependency>
<!--Kafka-->
<dependency>
<!--<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- commons -->
<dependency>
<groupId>commons-io</groupId>

@ -1,70 +0,0 @@
package com.example.zxweb.service;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;
import java.util.concurrent.ExecutionException;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/2 12:07
*/
public interface ProducerService {
/**
*
* @param topic
* @param data
* @throws ExecutionException
* @throws InterruptedException
*/
void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;
/**
*
* @param topic
* @param data
*/
void sendMessage(String topic, String data);
/**
*
* @param record
*/
void sendMessage(ProducerRecord<String, String> record);
/**
* Message
* @param message
*/
void sendMessage(Message<String> message);
/**
* key
* @param topic
* @param key
* @param data
*/
void sendMessage(String topic, String key, String data);
/**
* key
* @param topic
* @param partition
* @param key
* @param data
*/
void sendMessage(String topic, Integer partition, String key, String data);
/**
* key
* @param topic
* @param partition
* @param timestamp
* @param key
* @param data
*/
void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}

@ -1,117 +0,0 @@
package com.example.zxweb.service.impl;
import com.example.zxweb.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/2 12:09
*/
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
@Override
public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
}
@Override
public void sendMessage(String topic, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
}
@Override
public void sendMessage(ProducerRecord<String, String> record) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(Message<String> message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(String topic, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
}
@Override
public void sendMessage(String topic, Integer partition, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
}

@ -63,7 +63,7 @@ public class KafkaUtil {
properties.setProperty("ssl.truststore.location", trustestore);
properties.setProperty("ssl.truststore.password", "pwd123");
}else{
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552");
properties.setProperty("bootstrap.servers", "10.0.10.153:29551,10.0.10.153:29552,10.0.10.153:29553");
}
properties.setProperty("enable.auto.commit", "true");

@ -20,7 +20,7 @@ spring:
receive.buffer.bytes: 32768 #??32KB???socket????????????-1??????????
send.buffer.bytes: 131072 #??128KB???socket???????????-1??????????
request.timeout.ms: 10000 #??30000ms??????????????
transaction.timeout.ms: 5000
transaction.timeout.ms: 10000
# thymeleaf 页面配置
thymeleaf:
cache: false

Loading…
Cancel
Save