From 0dad49081b585e1ead7837e3206d806a7a27b656 Mon Sep 17 00:00:00 2001 From: 3y Date: Tue, 5 Apr 2022 22:39:03 +0800 Subject: [PATCH 1/9] =?UTF-8?q?vip=E6=90=AD=E5=BB=BA=E5=A5=BDredis/kafka/m?= =?UTF-8?q?ysql/apollo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/java3y/austin/AustinApplication.java | 2 +- .../src/main/resources/application.properties | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java index 7aa37a3..f562187 100644 --- a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java +++ b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java @@ -11,7 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class AustinApplication { public static void main(String[] args) { // TODO apollo的ip/port【must】 - System.setProperty("apollo.config-service", "http://ip:7000"); + System.setProperty("apollo.config-service", "http://austin.apollo.config:5001"); SpringApplication.run(AustinApplication.class, args); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 20c1b3c..a5dd3b1 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -3,19 +3,19 @@ # TODO please replace 【must】 config value # todo [database] ip/port/username/password 【must】 -austin-database-ip= -austin-database-port= -austin-database-username= -austin-database-password= +austin-database-ip=austin.mysql +austin-database-port=5004 +austin-database-username=root +austin-database-password=root123_A # todo [kafka] ip/port【must】 -austin-kafka-ip= -austin-kafka-port= +austin-kafka-ip=austin.kafka +austin-kafka-port=9092 # todo [redis] ip/port/password【must】 -austin-redis-ip= -austin-redis-port= -austin-redis-password= +austin-redis-ip=austin.redis +austin-redis-port=5003 +austin-redis-password=austin # todo [xxl-job] ip/port【optional】 austin-xxl-job-ip=127.0.0.1 From ea5ab979edf667b4c3e102ca27d21fa03d279fca Mon Sep 17 00:00:00 2001 From: 3y Date: Wed, 6 Apr 2022 20:05:39 +0800 Subject: [PATCH 2/9] =?UTF-8?q?vip=20=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/stream/constants/AustinFlinkConstant.java | 8 +++++--- austin-web/src/main/resources/application.properties | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 75710cd..5498a67 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -9,17 +9,19 @@ public class AustinFlinkConstant { /** * Kafka 配置信息 * TODO 使用前配置kafka broker ip:port + * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ public static final String GROUP_ID = "austinLogGroup"; public static final String TOPIC_NAME = "austinLog"; - public static final String BROKER = "ip:port"; + public static final String BROKER = "austin.flink"; /** * redis 配置 * TODO 使用前配置redis ip:port + * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ - public static final String REDIS_IP = "ip"; - public static final String REDIS_PORT = "port"; + public static final String REDIS_IP = "austin.redis"; + public static final String REDIS_PORT = "5003"; public static final String REDIS_PASSWORD = "austin"; diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index a5dd3b1..103494b 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -22,7 +22,7 @@ austin-xxl-job-ip=127.0.0.1 austin-xxl-job-port=6767 # todo [grayLog] ip【optional】 -austin-grayLog-ip=127.0.0.1 +austin-grayLog-ip=austin.graylog ##################### system properties ##################### server.shutdown=graceful From 876f79d2c600e06c32232a29a914ce34bf9f8c97 Mon Sep 17 00:00:00 2001 From: 3y Date: Tue, 28 Jun 2022 20:40:19 +0800 Subject: [PATCH 3/9] =?UTF-8?q?vip=20=E6=94=AF=E6=8C=81=20kafka=20tag=20?= =?UTF-8?q?=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/handler/receiver/Receiver.java | 6 ++- .../handler/receiver/ReceiverStart.java | 40 +++++++++++++++++++ .../service/api/impl/action/SendMqAction.java | 7 +++- .../austin/support/utils/KafkaUtils.java | 30 +++++++++++++- .../src/main/resources/application.properties | 5 +++ .../src/main/resources/local.properties | 2 +- 6 files changed, 84 insertions(+), 6 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index 29465ea..425f8c3 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -13,6 +13,7 @@ import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -50,10 +51,11 @@ public class Receiver { /** * 发送消息 + * * @param consumerRecord * @param topicGroupId */ - @KafkaListener(topics = "#{'${austin.business.topic.name}'}") + @KafkaListener(topics = "#{'${austin.business.topic.name}'}", containerFactory = "filterContainerFactory") public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if (kafkaMessage.isPresent()) { @@ -78,7 +80,7 @@ public class Receiver { * 撤回消息 * @param consumerRecord */ - @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}") + @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory") public void recall(ConsumerRecord consumerRecord){ Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if(kafkaMessage.isPresent()){ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java index a7f02a6..754fb43 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java @@ -1,15 +1,26 @@ package com.java3y.austin.handler.receiver; +import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; /** * 启动消费者 @@ -18,9 +29,12 @@ import java.util.List; * @date 2021/12/4 */ @Service +@Slf4j public class ReceiverStart { @Autowired private ApplicationContext context; + @Autowired + private ConsumerFactory consumerFactory; /** * receiver的消费方法常量 @@ -63,4 +77,30 @@ public class ReceiverStart { return attrs; }; } + + /** + * 针对tag消息过滤 + * producer 将tag写进header里 + * @return + */ + @Bean + public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey, + @Value("${austin.business.tagId.value}") String tagIdValue) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setAckDiscarded(true); + + factory.setRecordFilterStrategy(consumerRecord -> { + if (Optional.ofNullable(consumerRecord.value()).isPresent()) { + for (Header header : consumerRecord.headers()) { + if (header.key().equals(tagIdKey) && new String(header.value()).equals(new String(tagIdValue.getBytes(StandardCharsets.UTF_8)))) { + return false; + } + } + } + //返回true将会被丢弃 + return true; + }); + return factory; + } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index 7711a7e..d82d37c 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -33,16 +33,19 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.recall.topic.name}") private String austinRecall; + @Value("${austin.business.tagId.value}") + private String tagId; + @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); try { if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(sendMessageTopic, message); + kafkaUtils.send(sendMessageTopic, message, tagId); } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(austinRecall, message); + kafkaUtils.send(austinRecall, message, tagId); } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java index 193f1ee..b2a1587 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java @@ -1,10 +1,19 @@ package com.java3y.austin.support.utils; +import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + /** * @author 3y * @date 2022/2/16 @@ -16,6 +25,8 @@ public class KafkaUtils { @Autowired private KafkaTemplate kafkaTemplate; + @Value("${austin.business.tagId.key}") + private String tagIdKey; /** * 发送kafka消息 @@ -24,7 +35,24 @@ public class KafkaUtils { * @param jsonMessage */ public void send(String topicName, String jsonMessage) { - kafkaTemplate.send(topicName, jsonMessage); + kafkaTemplate.send(topicName, jsonMessage, null); } + /** + * 发送kafka消息 + * 支持tag过滤 + * + * @param topicName + * @param jsonMessage + * @param tagId + */ + public void send(String topicName, String jsonMessage, String tagId) { + if (StrUtil.isNotBlank(tagId)) { + List
headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8))); + kafkaTemplate.send(new ProducerRecord(topicName, null, null, null, jsonMessage, headers)); + } else { + kafkaTemplate.send(topicName, jsonMessage); + } + + } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index b3cdaa0..1e7dbc6 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -60,6 +60,11 @@ austin.business.recall.topic.name=austinRecall austin.business.recall.group.name=recallGroupId austin.business.log.topic.name=austinLog austin.business.graylog.ip=${austin-grayLog-ip} + +# TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy +austin.business.tagId.key=kafka_tag_id +austin.business.tagId.value=com.java3y.austin.3y + # TODO if windows os and need upload file to send message ,replace path ! austin.business.upload.crowd.path=/Users/3y/temp diff --git a/austin-web/src/main/resources/local.properties b/austin-web/src/main/resources/local.properties index 8e6f9f2..807c7f2 100644 --- a/austin-web/src/main/resources/local.properties +++ b/austin-web/src/main/resources/local.properties @@ -1,6 +1,6 @@ discardMsgIds = [] deduplicationRule = {"deduplication_10":{"num":1,"time":300},"deduplication_20":{"num":5}} -emailAccount =[{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423423@qq.com","pass":"23423432432423423","from":"234@qq.com","starttlsEnable":true,"auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"22222@163.com","pass":"23432423","from":"234324324234@163.com","starttlsEnable":false,"auth":true,"sslEnable":true}}] +emailAccount = [{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423432@qq.com","pass":"234324324","from":"123123@qq.com","starttlsEnable":"true","auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"23423423@163.com","pass":"234234324","from":"112312312@163.com","starttlsEnable":"false","auth":true,"sslEnable":true}}] smsAccount = [{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"234234","secretKey":"234324324","smsSdkAppId":"2343242","templateId":"234234","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}},{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"23423432","tpl_id":"23423432","supplierId":20,"supplierName":"云片"}}] enterpriseWechatAccount = [{"enterprise_wechat_10":{"corpId":"23423423","corpSecret":"-234324234","agentId":1000002,"token":"234234","aesKey":"23423423"}}] dingDingRobotAccount = [{"ding_ding_robot_10":{"secret":"234324324324","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b68d081f732343243242343247328b0c3003d164715d2c6c6e56"}}] From 47d9e75d392efacd2adf82302cec1917c7852c2b Mon Sep 17 00:00:00 2001 From: 3y Date: Thu, 7 Jul 2022 20:30:37 +0800 Subject: [PATCH 4/9] =?UTF-8?q?vip:=E4=BF=AE=E5=A4=8D=E9=9D=9Ekafka=20tag?= =?UTF-8?q?=E8=BF=87=E6=BB=A4=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java3y/austin/stream/constants/AustinFlinkConstant.java | 2 +- .../main/java/com/java3y/austin/support/utils/KafkaUtils.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 5498a67..48b34e5 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -13,7 +13,7 @@ public class AustinFlinkConstant { */ public static final String GROUP_ID = "austinLogGroup"; public static final String TOPIC_NAME = "austinLog"; - public static final String BROKER = "austin.flink"; + public static final String BROKER = "austin.kafka"; /** * redis 配置 diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java index b2a1587..4550491 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java @@ -29,13 +29,13 @@ public class KafkaUtils { private String tagIdKey; /** - * 发送kafka消息 + * 发送kafka消息(不支持tag过滤) * * @param topicName * @param jsonMessage */ public void send(String topicName, String jsonMessage) { - kafkaTemplate.send(topicName, jsonMessage, null); + send(topicName, jsonMessage, null); } /** From bd481fad8012bfffd402e79e812d95ded3ee99ad Mon Sep 17 00:00:00 2001 From: 3y Date: Thu, 7 Jul 2022 20:32:36 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E6=9B=B4=E6=94=B9=20vip=20=E5=86=99?= =?UTF-8?q?=E5=85=A5=20kafka=20=E5=85=A8=E9=93=BE=E8=B7=AF=E8=BF=BD?= =?UTF-8?q?=E8=B8=AA=E7=9A=84topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/java3y/austin/stream/constants/AustinFlinkConstant.java | 2 +- austin-web/src/main/resources/application.properties | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 48b34e5..a6b95f3 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -12,7 +12,7 @@ public class AustinFlinkConstant { * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ public static final String GROUP_ID = "austinLogGroup"; - public static final String TOPIC_NAME = "austinLog"; + public static final String TOPIC_NAME = "austinTraceLog"; public static final String BROKER = "austin.kafka"; /** diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 1e7dbc6..ecf9a99 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -58,7 +58,7 @@ spring.redis.password=${austin-redis-password} austin.business.topic.name=austinBusiness austin.business.recall.topic.name=austinRecall austin.business.recall.group.name=recallGroupId -austin.business.log.topic.name=austinLog +austin.business.log.topic.name=austinTraceLog austin.business.graylog.ip=${austin-grayLog-ip} # TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy From 0099f9c1b781082348082390360e6bec0ce4a6c4 Mon Sep 17 00:00:00 2001 From: 3y Date: Thu, 7 Jul 2022 20:42:13 +0800 Subject: [PATCH 6/9] add comment --- .../com/java3y/austin/stream/constants/AustinFlinkConstant.java | 1 + 1 file changed, 1 insertion(+) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index a6b95f3..b41f5e6 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -10,6 +10,7 @@ public class AustinFlinkConstant { * Kafka 配置信息 * TODO 使用前配置kafka broker ip:port * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) + * (如果想要自己监听到所有的消息,改掉groupId) */ public static final String GROUP_ID = "austinLogGroup"; public static final String TOPIC_NAME = "austinTraceLog"; From 0f6f66bd9e05dd22e2e1932fd939e0cd342509bb Mon Sep 17 00:00:00 2001 From: 3y Date: Mon, 11 Jul 2022 20:14:05 +0800 Subject: [PATCH 7/9] update vip --- austin-web/src/main/resources/application.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index f9950da..e795ca0 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -14,7 +14,7 @@ austin-redis-port=5003 austin-redis-password=austin # TODO kafka/eventbus -austin-mq-pipeline=eventbus +austin-mq-pipeline=kafka # todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】 austin-kafka-ip=austin.kafka From f6b6e23866a1c2474fa64123ce4556117ae34fe9 Mon Sep 17 00:00:00 2001 From: xzcawl Date: Fri, 15 Jul 2022 17:36:28 +0800 Subject: [PATCH 8/9] add rabbitMq send --- austin-support/pom.xml | 6 +++ .../constans/MessageQueuePipeline.java | 1 + .../mq/rabbit/RabbitSendMqServiceImpl.java | 45 +++++++++++++++++++ .../src/main/resources/application.properties | 19 ++++++++ pom.xml | 8 ++++ 5 files changed, 79 insertions(+) create mode 100644 austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 96ae2e0..afc9ab8 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -93,6 +93,12 @@ com.aliyun alibaba-dingtalk-service-sdk + + + org.springframework.amqp + spring-rabbit + + \ No newline at end of file diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java index 3cf93fd..e31b81e 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -10,5 +10,6 @@ public interface MessageQueuePipeline { String EVENT_BUS = "eventBus"; String KAFKA = "kafka"; String ROCKET_MQ = "rocketMq"; + String RABBIT_MQ = "rabbitMq"; } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java new file mode 100644 index 0000000..661f4b7 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -0,0 +1,45 @@ +package com.java3y.austin.support.mq.rabbit; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + + +/** + * @Autor xzcawl + * @Date 2022/7/15 17:29 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) +public class RabbitSendMqServiceImpl implements SendMqService { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Value("${austin.rabbitmq.topic.name}") + private String confTopic; + + @Value("${austin.rabbitmq.exchange.name}") + private String exchangeName; + + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (topic.equals(confTopic)) { + rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); + } else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + } + } + + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index e795ca0..5c32455 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -53,6 +53,25 @@ spring.kafka.consumer.auto.offset.reset=earliest spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true +##################### Rabbit properties ##################### +server.port=8080 +spring.application.name=cl +#RabbitMq所在服务器IP +spring.rabbitmq.host=127.0.0.1 +#连接端口号 +spring.rabbitmq.port=5672 +#用户名 +spring.rabbitmq.username=root +#用户密码 +spring.rabbitmq.password=123456 +# 开启发送确认 +spring.rabbitmq.publisher-confirm-type=correlated +# 开启发送失败退回 +spring.rabbitmq.publisher-returns=true +spring.rabbitmq.virtual-host=/ +austin.rabbitmq.topic.name=austinRabbit +austin.rabbitmq.exchange.name=austin.point + ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} spring.redis.port=${austin-redis-port} diff --git a/pom.xml b/pom.xml index 49b1152..ea836cb 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,13 @@ ${flink.version} + + + org.apache.flink + flink-connector-rabbitmq + 1.15.1 + + com.github.binarywang @@ -187,6 +194,7 @@ alibaba-dingtalk-service-sdk 2.0.0 + From 95236d69e307f8f933fc431eadfd8804f543ef7a Mon Sep 17 00:00:00 2001 From: dominicpoi Date: Sun, 17 Jul 2022 00:46:50 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rocketmq=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/RocketMqBizReceiver.java | 43 +++++++++++++++++++ .../rocketmq/RocketMqRecallReceiver.java | 41 ++++++++++++++++++ austin-support/pom.xml | 5 +++ .../rocketmq/RocketMqSendMqServiceImpl.java | 39 +++++++++++++++++ .../src/main/resources/application.properties | 10 +++++ pom.xml | 6 +++ 6 files changed, 144 insertions(+) create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java create mode 100644 austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java new file mode 100644 index 0000000..92e4efb --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java @@ -0,0 +1,43 @@ +package com.java3y.austin.handler.receiver.rocketmq; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/16 + */ +@Component +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +@RocketMQMessageListener(topic = "${austin.business.topic.name}", + consumerGroup = "${austin-rocketmq-biz-consumer-group}", + selectorType = SelectorType.TAG, + selectorExpression = "${austin.business.tagId.value}" +) +public class RocketMqBizReceiver implements RocketMQListener { + + @Autowired + private ConsumeService consumeService; + + @Override + public void onMessage(String message) { + if (StringUtils.isBlank(message)) { + return; + } + List taskInfoLists = JSON.parseArray(message, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java new file mode 100644 index 0000000..aebacd0 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java @@ -0,0 +1,41 @@ +package com.java3y.austin.handler.receiver.rocketmq; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.domain.MessageTemplate; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/16 + */ +@Component +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +@RocketMQMessageListener(topic = "${austin.business.recall.topic.name}", + consumerGroup = "${austin-rocketmq-recall-consumer-group}", + selectorType = SelectorType.TAG, + selectorExpression = "${austin.business.tagId.value}" +) +public class RocketMqRecallReceiver implements RocketMQListener { + + @Autowired + private ConsumeService consumeService; + + @Override + public void onMessage(String message) { + if (StringUtils.isBlank(message)) { + return; + } + MessageTemplate messageTemplate = JSON.parseObject(message, MessageTemplate.class); + consumeService.consume2recall(messageTemplate); + } +} diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 96ae2e0..3f9a4ec 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -63,6 +63,11 @@ spring-kafka + + org.apache.rocketmq + rocketmq-spring-boot-starter + + org.springframework.boot spring-boot-starter-data-redis diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java new file mode 100644 index 0000000..3749ee9 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java @@ -0,0 +1,39 @@ +package com.java3y.austin.support.mq.rocketmq; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/15 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +public class RocketMqSendMqServiceImpl implements SendMqService { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (StringUtils.isNotBlank(tagId)) { + topic = topic + ":" + tagId; + } + send(topic, jsonValue); + } + + @Override + public void send(String topic, String jsonValue) { + rocketMQTemplate.send(topic, MessageBuilder.withPayload(jsonValue).build()); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index e795ca0..784a328 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -20,6 +20,12 @@ austin-mq-pipeline=kafka austin-kafka-ip=austin.kafka austin-kafka-port=9092 +# todo [rocketmq] 【optional】, if austin-mq-pipeline=rocketMq【must】 +austin-rocketmq-nameserver-ip=127.0.0.1 +austin-rocketmq-nameserver-port=9876 +austin-rocketmq-producer-group=unique-producer-group +austin-rocketmq-biz-consumer-group=unique-biz-consumer-group +austin-rocketmq-recall-consumer-group=unique-recall-consumer-group # todo [xxl-job] switch/ip/port/【optional】 xxl-job.enabled=false @@ -53,6 +59,10 @@ spring.kafka.consumer.auto.offset.reset=earliest spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true +##################### rocketmq properties ##################### +rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port} +rocketmq.producer.group=${austin-rocketmq-producer-group} + ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} spring.redis.port=${austin-redis-port} diff --git a/pom.xml b/pom.xml index 49b1152..89b008c 100644 --- a/pom.xml +++ b/pom.xml @@ -187,6 +187,12 @@ alibaba-dingtalk-service-sdk 2.0.0 + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.2 +