main
周文涛 2 years ago
parent 60ab9fce2c
commit 2a5745f4ee

@ -45,7 +45,6 @@
<!-- <version>2.0.9</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.spring4all</groupId>
<artifactId>swagger-spring-boot-starter</artifactId>
@ -57,11 +56,21 @@
<artifactId>knife4j-spring-ui</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</dependency>-->
</dependencies>
<build>

@ -2,10 +2,14 @@ package com.example.demokafka.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.demokafka.model.Person;
import com.example.demokafka.service.ProducerService;
import com.example.demokafka.util.Kafka;
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
@ -24,10 +28,17 @@ import java.util.Map;
@Api(tags = "测试")
public class TestController {
@Autowired
private ProducerService producerService;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.topic}")
private String topicName;
@GetMapping("/kafka01")
@ApiOperation(value = "kafka01")
public List<Person> kafka01() {
new Kafka().start();
producerService.sendMessage(topicName, "测试");
return new ArrayList<>();
}
}

@ -0,0 +1,70 @@
package com.example.demokafka.service;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;
import java.util.concurrent.ExecutionException;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/2 12:07
*/
public interface ProducerService {
/**
*
* @param topic
* @param data
* @throws ExecutionException
* @throws InterruptedException
*/
void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;
/**
*
* @param topic
* @param data
*/
void sendMessage(String topic, String data);
/**
*
* @param record
*/
void sendMessage(ProducerRecord<String, String> record);
/**
* Message
* @param message
*/
void sendMessage(Message<String> message);
/**
* key
* @param topic
* @param key
* @param data
*/
void sendMessage(String topic, String key, String data);
/**
* key
* @param topic
* @param partition
* @param key
* @param data
*/
void sendMessage(String topic, Integer partition, String key, String data);
/**
* key
* @param topic
* @param partition
* @param timestamp
* @param key
* @param data
*/
void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}

@ -0,0 +1,117 @@
package com.example.demokafka.service.impl;
import com.example.demokafka.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
/**
* @Description
* @Author ZhouWenTao
* @Date 2023/9/2 12:09
*/
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
@Override
public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
}
@Override
public void sendMessage(String topic, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
}
@Override
public void sendMessage(ProducerRecord<String, String> record) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(Message<String> message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(String topic, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
}
@Override
public void sendMessage(String topic, Integer partition, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
}

@ -26,7 +26,7 @@ public class Kafka extends Thread {
}
private void poll(String sourceTopicName) {
try {
/*try {
consumer116.subscribe(Arrays.asList(sourceTopicName));
while (true) {
ConsumerRecords<String, String> records = consumer116.poll(Duration.ofMillis(5000));
@ -41,7 +41,7 @@ public class Kafka extends Thread {
if (consumer116 != null) {
consumer116.close();
}
}
}*/
}
}

@ -1,2 +0,0 @@
server.port=10011
knife4j.enable=true

@ -0,0 +1,23 @@
server:
port: 10011
knife4j:
enable: true
spring:
kafka:
#???????
topic: sourcetopic
producer:
bootstrap-servers: 10.0.10.153:29551
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
properties:
retry.backoff.ms: 100 #?????????100
linger.ms: 0 #???0???????????????????batch???
max.request.size: 1048576 #??1MB??????????
connections.max.idle.ms: 540000 #??9???????????????
receive.buffer.bytes: 32768 #??32KB???socket????????????-1??????????
send.buffer.bytes: 131072 #??128KB???socket???????????-1??????????
request.timeout.ms: 10000 #??30000ms??????????????
transaction.timeout.ms: 5000
Loading…
Cancel
Save