diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java index 8f1ec37..c29a1cf 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java @@ -42,10 +42,11 @@ public class Receiver { private ConsumeService consumeService; /** * 发送消息 + * * @param consumerRecord * @param topicGroupId */ - @KafkaListener(topics = "#{'${austin.business.topic.name}'}") + @KafkaListener(topics = "#{'${austin.business.topic.name}'}", containerFactory = "filterContainerFactory") public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if (kafkaMessage.isPresent()) { @@ -65,7 +66,7 @@ public class Receiver { * 撤回消息 * @param consumerRecord */ - @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}") + @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory") public void recall(ConsumerRecord consumerRecord){ Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if(kafkaMessage.isPresent()){ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java index 800a984..01887ae 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java @@ -1,17 +1,28 @@ package com.java3y.austin.handler.receiver.kafka; +import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import com.java3y.austin.support.constans.MessageQueuePipeline; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; /** * 启动消费者 @@ -21,10 +32,13 @@ import java.util.List; */ @Service @ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA) +@Slf4j public class ReceiverStart { @Autowired private ApplicationContext context; + @Autowired + private ConsumerFactory consumerFactory; /** * receiver的消费方法常量 @@ -67,4 +81,30 @@ public class ReceiverStart { return attrs; }; } + + /** + * 针对tag消息过滤 + * producer 将tag写进header里 + * @return + */ + @Bean + public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey, + @Value("${austin.business.tagId.value}") String tagIdValue) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setAckDiscarded(true); + + factory.setRecordFilterStrategy(consumerRecord -> { + if (Optional.ofNullable(consumerRecord.value()).isPresent()) { + for (Header header : consumerRecord.headers()) { + if (header.key().equals(tagIdKey) && new String(header.value()).equals(new String(tagIdValue.getBytes(StandardCharsets.UTF_8)))) { + return false; + } + } + } + //返回true将会被丢弃 + return true; + }); + return factory; + } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index e95ccda..ed01385 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -36,6 +36,7 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.tagId.value}") private String tagId; + @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 75710cd..4a54da7 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -9,6 +9,8 @@ public class AustinFlinkConstant { /** * Kafka 配置信息 * TODO 使用前配置kafka broker ip:port + * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) + * (如果想要自己监听到所有的消息,改掉groupId) */ public static final String GROUP_ID = "austinLogGroup"; public static final String TOPIC_NAME = "austinLog"; @@ -17,6 +19,7 @@ public class AustinFlinkConstant { /** * redis 配置 * TODO 使用前配置redis ip:port + * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ public static final String REDIS_IP = "ip"; public static final String REDIS_PORT = "port"; diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 96ae2e0..afc9ab8 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -93,6 +93,12 @@ com.aliyun alibaba-dingtalk-service-sdk + + + org.springframework.amqp + spring-rabbit + + \ No newline at end of file diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java index 3cf93fd..e31b81e 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -10,5 +10,6 @@ public interface MessageQueuePipeline { String EVENT_BUS = "eventBus"; String KAFKA = "kafka"; String ROCKET_MQ = "rocketMq"; + String RABBIT_MQ = "rabbitMq"; } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java new file mode 100644 index 0000000..661f4b7 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -0,0 +1,45 @@ +package com.java3y.austin.support.mq.rabbit; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + + +/** + * @Autor xzcawl + * @Date 2022/7/15 17:29 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) +public class RabbitSendMqServiceImpl implements SendMqService { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Value("${austin.rabbitmq.topic.name}") + private String confTopic; + + @Value("${austin.rabbitmq.exchange.name}") + private String exchangeName; + + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (topic.equals(confTopic)) { + rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); + } else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + } + } + + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ced6182..f966799 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -55,9 +55,24 @@ spring.kafka.consumer.auto.offset.reset=earliest spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true -# TODO austin support kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy -austin.business.tagId.key=kafka_tag_id -austin.business.tagId.value=com.java3y.austin.3y +##################### Rabbit properties ##################### +server.port=8080 +spring.application.name=cl +#RabbitMq所在服务器IP +spring.rabbitmq.host=127.0.0.1 +#连接端口号 +spring.rabbitmq.port=5672 +#用户名 +spring.rabbitmq.username=root +#用户密码 +spring.rabbitmq.password=123456 +# 开启发送确认 +spring.rabbitmq.publisher-confirm-type=correlated +# 开启发送失败退回 +spring.rabbitmq.publisher-returns=true +spring.rabbitmq.virtual-host=/ +austin.rabbitmq.topic.name=austinRabbit +austin.rabbitmq.exchange.name=austin.point ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} @@ -70,6 +85,11 @@ austin.business.recall.topic.name=austinRecall austin.business.recall.group.name=recallGroupId austin.business.log.topic.name=austinLog austin.business.graylog.ip=${austin-grayLog-ip} + +# TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy +austin.business.tagId.key=kafka_tag_id +austin.business.tagId.value=com.java3y.austin.3y + # TODO if windows os and need upload file to send message ,replace path ! austin.business.upload.crowd.path=/Users/3y/temp diff --git a/austin-web/src/main/resources/local.properties b/austin-web/src/main/resources/local.properties index 8e6f9f2..807c7f2 100644 --- a/austin-web/src/main/resources/local.properties +++ b/austin-web/src/main/resources/local.properties @@ -1,6 +1,6 @@ discardMsgIds = [] deduplicationRule = {"deduplication_10":{"num":1,"time":300},"deduplication_20":{"num":5}} -emailAccount =[{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423423@qq.com","pass":"23423432432423423","from":"234@qq.com","starttlsEnable":true,"auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"22222@163.com","pass":"23432423","from":"234324324234@163.com","starttlsEnable":false,"auth":true,"sslEnable":true}}] +emailAccount = [{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423432@qq.com","pass":"234324324","from":"123123@qq.com","starttlsEnable":"true","auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"23423423@163.com","pass":"234234324","from":"112312312@163.com","starttlsEnable":"false","auth":true,"sslEnable":true}}] smsAccount = [{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"234234","secretKey":"234324324","smsSdkAppId":"2343242","templateId":"234234","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}},{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"23423432","tpl_id":"23423432","supplierId":20,"supplierName":"云片"}}] enterpriseWechatAccount = [{"enterprise_wechat_10":{"corpId":"23423423","corpSecret":"-234324234","agentId":1000002,"token":"234234","aesKey":"23423423"}}] dingDingRobotAccount = [{"ding_ding_robot_10":{"secret":"234324324324","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b68d081f732343243242343247328b0c3003d164715d2c6c6e56"}}] diff --git a/pom.xml b/pom.xml index 49b1152..ea836cb 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,13 @@ ${flink.version} + + + org.apache.flink + flink-connector-rabbitmq + 1.15.1 + + com.github.binarywang @@ -187,6 +194,7 @@ alibaba-dingtalk-service-sdk 2.0.0 +