mq 插拔式 设计

master
3y 3 years ago
parent 444eb05c4e
commit c46717d3ea

@ -0,0 +1,37 @@
package com.java3y.austin.handler.receiver.eventbus;
import com.google.common.eventbus.Subscribe;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.mq.eventbus.EventBusListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author 3y
*/
@Component
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS)
public class EventBusReceiver implements EventBusListener {
@Autowired
private ConsumeService consumeService;
@Override
@Subscribe
public void consume(List<TaskInfo> lists) {
consumeService.consume2Send(lists);
}
@Override
@Subscribe
public void recall(MessageTemplate messageTemplate) {
consumeService.consume2recall(messageTemplate);
}
}

@ -1,4 +1,4 @@
package com.java3y.austin.handler.receiver;
package com.java3y.austin.handler.receiver.kafka;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
@ -9,13 +9,16 @@ import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.handler.HandlerHolder;
import com.java3y.austin.handler.pending.Task;
import com.java3y.austin.handler.pending.TaskPendingHolder;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
@ -33,21 +36,10 @@ import java.util.Optional;
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class Receiver {
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall";
@Autowired
private ApplicationContext context;
@Autowired
private TaskPendingHolder taskPendingHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private HandlerHolder handlerHolder;
private ConsumeService consumeService;
/**
*
* @param consumerRecord
@ -60,16 +52,11 @@ public class Receiver {
List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
/**
*
*/
if (topicGroupId.equals(messageGroupId)) {
for (TaskInfo taskInfo : taskInfoLists) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
consumeService.consume2Send(taskInfoLists);
}
}
}
@ -83,8 +70,7 @@ public class Receiver {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){
MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class);
logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build());
handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate);
consumeService.consume2recall(messageTemplate);
}
}
}

@ -1,7 +1,9 @@
package com.java3y.austin.handler.receiver;
package com.java3y.austin.handler.receiver.kafka;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
@ -18,7 +20,9 @@ import java.util.List;
* @date 2021/12/4
*/
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class ReceiverStart {
@Autowired
private ApplicationContext context;

@ -0,0 +1,32 @@
package com.java3y.austin.handler.receiver.service;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.domain.MessageTemplate;
import java.util.List;
/**
*
*
* @author 3y
*/
public interface ConsumeService {
/**
* MQ
*
* @param taskInfoLists
*/
void consume2Send(List<TaskInfo> taskInfoLists);
/**
* MQ
*
* @param messageTemplate
*/
void consume2recall(MessageTemplate messageTemplate);
}

@ -0,0 +1,56 @@
package com.java3y.austin.handler.receiver.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.handler.HandlerHolder;
import com.java3y.austin.handler.pending.Task;
import com.java3y.austin.handler.pending.TaskPendingHolder;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author 3y
*/
@Service
public class ConsumeServiceImpl implements ConsumeService {
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall";
@Autowired
private ApplicationContext context;
@Autowired
private TaskPendingHolder taskPendingHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private HandlerHolder handlerHolder;
@Override
public void consume2Send(List<TaskInfo> taskInfoLists) {
String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
for (TaskInfo taskInfo : taskInfoLists) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
}
@Override
public void consume2recall(MessageTemplate messageTemplate) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build());
handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate);
}
}

@ -4,10 +4,14 @@ import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.mq.SendMqService;
import com.java3y.austin.support.mq.eventbus.EventBusListener;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import com.java3y.austin.support.utils.KafkaUtils;
@ -16,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author 3y
* MQ
@ -24,14 +30,17 @@ import org.springframework.stereotype.Service;
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired
private KafkaUtils kafkaUtils;
private SendMqService sendMqService;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.recall.topic.name}")
private String austinRecall;
@Value("${austin.business.tagId.value}")
private String tagId;
@Override
public void process(ProcessContext<SendTaskModel> context) {
@ -39,10 +48,10 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
try {
if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(sendMessageTopic, message);
sendMqService.send(sendMessageTopic, message, tagId);
} else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(austinRecall, message);
sendMqService.send(austinRecall, message, tagId);
}
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
@ -50,4 +59,5 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
}
}
}

@ -0,0 +1,14 @@
package com.java3y.austin.support.constans;
/**
*
*
* @author 3y
*/
public interface MessageQueuePipeline {
String EVENT_BUS = "eventBus";
String KAFKA = "kafka";
String ROCKET_MQ = "rocketMq";
}

@ -0,0 +1,27 @@
package com.java3y.austin.support.mq;
/**
* @author 3y
*
*/
public interface SendMqService {
/**
*
*
* @param topic
* @param jsonValue
* @param tagId
*/
void send(String topic, String jsonValue, String tagId);
/**
*
*
* @param topic
* @param jsonValue
*/
void send(String topic, String jsonValue);
}

@ -0,0 +1,27 @@
package com.java3y.austin.support.mq.eventbus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.domain.MessageTemplate;
import java.util.List;
/**
* @author 3y
*
*/
public interface EventBusListener {
/**
*
* @param lists
*/
void consume(List<TaskInfo> lists);
/**
*
* @param messageTemplate
*/
void recall(MessageTemplate messageTemplate);
}

@ -0,0 +1,52 @@
package com.java3y.austin.support.mq.eventbus;
import com.alibaba.fastjson.JSON;
import com.google.common.eventbus.EventBus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* @author 3y
* EventBus
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS)
public class EventBusSendMqServiceImpl implements SendMqService {
private EventBus eventBus = new EventBus();
@Autowired
private EventBusListener eventBusListener;
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
/**
* tagId
* @param topic
* @param jsonValue
* @param tagId
*/
@Override
public void send(String topic, String jsonValue, String tagId) {
eventBus.register(eventBusListener);
if (topic.equals(sendTopic)) {
eventBus.post(JSON.parseArray(jsonValue, TaskInfo.class));
} else if (topic.equals(recallTopic)) {
eventBus.post(JSON.parseObject(jsonValue, MessageTemplate.class));
}
}
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}

@ -0,0 +1,50 @@
package com.java3y.austin.support.mq.kafka;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
/**
* @author 3y
* kafka
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class KafkaSendMqServiceImpl implements SendMqService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${austin.business.tagId.key}")
private String tagIdKey;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (StrUtil.isNotBlank(tagId)) {
List<Header> headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8)));
kafkaTemplate.send(new ProducerRecord(topic, null, null, null, jsonValue, headers));
} else {
kafkaTemplate.send(topic, jsonValue);
}
}
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}

@ -2,21 +2,27 @@
# TODO please replace 【must】 config value
# TODO please replace 【must】 config value
# todo [database] ip/port/username/password 【must】
austin-database-ip=
austin-database-port=
austin-database-username=
austin-database-password=
# todo [kafka] ip/port【must】
austin-kafka-ip=
austin-kafka-port=
# todo [redis] ip/port/password【must】
austin-redis-ip=
austin-redis-port=
austin-redis-password=
# TODO kafka/eventbus
austin-mq-pipeline=eventbus
# todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】
austin-kafka-ip=
austin-kafka-port=
# todo [xxl-job] switch/ip/port/【optional】
xxl-job.enabled=false
austin-xxl-job-ip=127.0.0.1
@ -49,6 +55,10 @@ spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
# TODO austin support kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy
austin.business.tagId.key=kafka_tag_id
austin.business.tagId.value=com.java3y.austin.3y
##################### redis properties #####################
spring.redis.host=${austin-redis-ip}
spring.redis.port=${austin-redis-port}

Loading…
Cancel
Save