parent
7c1feb607d
commit
0de73b0da1
@ -0,0 +1,15 @@
|
||||
package com.java3y.austin.constant;
|
||||
|
||||
|
||||
/**
|
||||
* 基础的常量信息
|
||||
*/
|
||||
public class AustinConstant {
|
||||
|
||||
/**
|
||||
* boolean转换
|
||||
*/
|
||||
public final static Integer TRUE = 1;
|
||||
public final static Integer FALSE = 0;
|
||||
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
/**
|
||||
* 发送内容的模型
|
||||
* (不同的渠道会有不同的消息体)
|
||||
*/
|
||||
public class ContentModel {
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
public class EmailContentModel extends ContentModel {
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
public class ImContentModel extends ContentModel {
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
public class MiniProgramContentModel extends ContentModel {
|
||||
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
public class OfficialAccountsContentModel extends ContentModel {
|
||||
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package com.java3y.austin.dto;
|
||||
|
||||
public class PushContentModel extends ContentModel {
|
||||
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package com.java3y.austin;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.java3y.austin.config.TaskInfoParseConfig;
|
||||
import com.java3y.austin.domain.TaskInfo;
|
||||
import com.java3y.austin.handler.SmsHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* 消费MQ的消息
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Receiver {
|
||||
|
||||
@Autowired
|
||||
private SmsHandler smsHandler;
|
||||
|
||||
@KafkaListener(topics = {"austin"}, groupId = "sms")
|
||||
public void consumer(ConsumerRecord<?, String> consumerRecord) {
|
||||
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
|
||||
if (kafkaMessage.isPresent()) {
|
||||
List<TaskInfo> lists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
|
||||
for (TaskInfo taskInfo : lists) {
|
||||
smsHandler.doHandler(taskInfo);
|
||||
}
|
||||
log.info("receiver message:{}", JSON.toJSONString(lists));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.java3y.austin.pojo;
|
||||
package com.java3y.austin.domain;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
@ -1,17 +1,131 @@
|
||||
package com.java3y.austin.action;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.java3y.austin.constant.AustinConstant;
|
||||
import com.java3y.austin.dto.ContentModel;
|
||||
import com.java3y.austin.dto.SmsContentModel;
|
||||
import com.java3y.austin.enums.ChannelType;
|
||||
import com.java3y.austin.enums.RespStatusEnum;
|
||||
import com.java3y.austin.dao.MessageTemplateDao;
|
||||
import com.java3y.austin.domain.MessageParam;
|
||||
import com.java3y.austin.domain.MessageTemplate;
|
||||
import com.java3y.austin.domain.SendTaskModel;
|
||||
import com.java3y.austin.pipeline.BusinessProcess;
|
||||
import com.java3y.austin.pipeline.ProcessContext;
|
||||
import com.java3y.austin.domain.TaskInfo;
|
||||
import com.java3y.austin.utils.ContentHolderUtil;
|
||||
import com.java3y.austin.utils.TaskInfoUtils;
|
||||
import com.java3y.austin.vo.BasicResultVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* @date 2021/11/22
|
||||
* @description 拼装参数
|
||||
*/
|
||||
@Slf4j
|
||||
public class AssembleAction implements BusinessProcess {
|
||||
|
||||
@Autowired
|
||||
private MessageTemplateDao messageTemplateDao;
|
||||
|
||||
@Override
|
||||
public void process(ProcessContext context) {
|
||||
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
|
||||
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
|
||||
|
||||
try {
|
||||
Optional<MessageTemplate> messageTemplate = messageTemplateDao.findById(messageTemplateId);
|
||||
if (!messageTemplate.isPresent() || messageTemplate.get().getIsDeleted().equals(AustinConstant.TRUE)) {
|
||||
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND));
|
||||
return;
|
||||
}
|
||||
|
||||
List<TaskInfo> taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get());
|
||||
sendTaskModel.setTaskInfo(taskInfos);
|
||||
} catch (Exception e) {
|
||||
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
|
||||
log.error("assemble task fail! templateId:{}, e:{}", messageTemplateId, Throwables.getStackTraceAsString(e));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 组装 TaskInfo 任务消息
|
||||
*
|
||||
* @param sendTaskModel
|
||||
* @param messageTemplate
|
||||
*/
|
||||
private List<TaskInfo> assembleTaskInfo(SendTaskModel sendTaskModel, MessageTemplate messageTemplate) {
|
||||
List<MessageParam> messageParamList = sendTaskModel.getMessageParamList();
|
||||
List<TaskInfo> taskInfoList = new ArrayList<>();
|
||||
|
||||
for (MessageParam messageParam : messageParamList) {
|
||||
TaskInfo taskInfo = TaskInfo.builder()
|
||||
.messageTemplateId(messageTemplate.getId())
|
||||
.businessId(TaskInfoUtils.generateBusinessId(messageTemplate.getId(), messageTemplate.getTemplateType()))
|
||||
.receiver(new HashSet<>(Arrays.asList(messageParam.getReceiver().split(String.valueOf(StrUtil.C_COMMA)))))
|
||||
.idType(messageTemplate.getIdType())
|
||||
.sendChannel(messageTemplate.getSendChannel())
|
||||
.templateType(messageTemplate.getTemplateType())
|
||||
.msgType(messageTemplate.getMsgType())
|
||||
.sendAccount(messageTemplate.getSendAccount())
|
||||
.contentModel(getContentModelValue(messageTemplate, messageParam))
|
||||
.deduplicationTime(messageTemplate.getDeduplicationTime())
|
||||
.isNightShield(messageTemplate.getIsNightShield()).build();
|
||||
|
||||
taskInfoList.add(taskInfo);
|
||||
}
|
||||
|
||||
return taskInfoList;
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取 contentModel,替换占位符信息
|
||||
*/
|
||||
private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {
|
||||
Integer sendChannel = messageTemplate.getSendChannel();
|
||||
Map<String, String> variables = messageParam.getVariables();
|
||||
JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());
|
||||
Class contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);
|
||||
|
||||
|
||||
/**
|
||||
* 反射获取得到不同的渠道对应的值
|
||||
*/
|
||||
Field[] fields = ReflectUtil.getFields(contentModelClass);
|
||||
ContentModel contentModel = (ContentModel) ReflectUtil.newInstance(contentModelClass);
|
||||
for (Field field : fields) {
|
||||
String originValue = jsonObject.getString(field.getName());
|
||||
|
||||
if (StrUtil.isNotBlank(originValue)) {
|
||||
String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
|
||||
ReflectUtil.setFieldValue(contentModel, field, resultValue);
|
||||
}
|
||||
}
|
||||
|
||||
return contentModel;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
MessageTemplate messageTemplate = MessageTemplate.builder().sendChannel(ChannelType.SMS.getCode()).msgContent("{\"url\":\"www.baidu.com/{$urlParam}\",\"content\":\"{$contentValue}\"}").build();
|
||||
HashMap<String, String> map = new HashMap<>();
|
||||
map.put("urlParam", "2222");
|
||||
map.put("contentValue", "3333");
|
||||
MessageParam messageParam = new MessageParam().setVariables(map);
|
||||
|
||||
ContentModel contentModelValue = getContentModelValue(messageTemplate, messageParam);
|
||||
System.out.println(JSON.toJSONString(contentModelValue));
|
||||
}
|
||||
}
|
||||
|
@ -1,17 +0,0 @@
|
||||
package com.java3y.austin.action;
|
||||
|
||||
import com.java3y.austin.domain.SendTaskModel;
|
||||
import com.java3y.austin.pipeline.BusinessProcess;
|
||||
import com.java3y.austin.pipeline.ProcessContext;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* @date 2021/11/22
|
||||
* @description 参数校验
|
||||
*/
|
||||
public class ParamAction implements BusinessProcess {
|
||||
@Override
|
||||
public void process(ProcessContext context) {
|
||||
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package com.java3y.austin.action;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.java3y.austin.enums.RespStatusEnum;
|
||||
import com.java3y.austin.domain.MessageParam;
|
||||
import com.java3y.austin.domain.SendTaskModel;
|
||||
import com.java3y.austin.pipeline.BusinessProcess;
|
||||
import com.java3y.austin.pipeline.ProcessContext;
|
||||
import com.java3y.austin.vo.BasicResultVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* @date 2021/11/22
|
||||
* @description 前置参数校验
|
||||
*/
|
||||
@Slf4j
|
||||
public class PreParamAction implements BusinessProcess {
|
||||
|
||||
@Override
|
||||
public void process(ProcessContext context) {
|
||||
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
|
||||
|
||||
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
|
||||
List<MessageParam> messageParamList = sendTaskModel.getMessageParamList();
|
||||
|
||||
if (messageTemplateId == null || CollUtil.isEmpty(messageParamList)) {
|
||||
context.setNeedBreak(true);
|
||||
context.setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package com.java3y.austin.action;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.java3y.austin.domain.SendTaskModel;
|
||||
import com.java3y.austin.domain.TaskInfo;
|
||||
import com.java3y.austin.enums.RespStatusEnum;
|
||||
import com.java3y.austin.pipeline.BusinessProcess;
|
||||
import com.java3y.austin.pipeline.ProcessContext;
|
||||
import com.java3y.austin.pipeline.ProcessModel;
|
||||
import com.java3y.austin.vo.BasicResultVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* 将消息发送到MQ
|
||||
*/
|
||||
@Slf4j
|
||||
public class SendMqAction implements BusinessProcess {
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate kafkaTemplate;
|
||||
|
||||
@Value("${austin.topic.name}")
|
||||
private String topicName;
|
||||
|
||||
@Override
|
||||
public void process(ProcessContext context) {
|
||||
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
|
||||
try {
|
||||
kafkaTemplate.send(topicName, JSON.toJSONString(sendTaskModel.getTaskInfo(),
|
||||
new SerializerFeature[] {SerializerFeature.WriteClassName}));
|
||||
} catch (Exception e) {
|
||||
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
|
||||
log.error("send kafka fail! e:{}", Throwables.getStackTraceAsString(e));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
package com.java3y.austin.config;
|
||||
|
||||
import com.java3y.austin.action.AssembleAction;
|
||||
import com.java3y.austin.action.PreParamAction;
|
||||
import com.java3y.austin.action.SendMqAction;
|
||||
import com.java3y.austin.enums.BusinessCode;
|
||||
import com.java3y.austin.pipeline.BusinessProcess;
|
||||
import com.java3y.austin.pipeline.ProcessController;
|
||||
import com.java3y.austin.pipeline.ProcessTemplate;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class PipelineConfig {
|
||||
|
||||
|
||||
/**
|
||||
* 普通发送执行流程
|
||||
* 1. 参数校验
|
||||
* 2. 组装参数
|
||||
* 3. 发送消息至MQ
|
||||
* @return
|
||||
*/
|
||||
@Bean("commonSendTemplate")
|
||||
public ProcessTemplate commonSendTemplate() {
|
||||
ProcessTemplate processTemplate = new ProcessTemplate();
|
||||
ArrayList<BusinessProcess> processList = new ArrayList<>();
|
||||
|
||||
processList.add(preParamAction());
|
||||
processList.add(assembleAction());
|
||||
processList.add(sendMqAction());
|
||||
|
||||
processTemplate.setProcessList(processList);
|
||||
return processTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* pipeline流程控制器
|
||||
* 目前暂定只有 普通发送的流程
|
||||
* 后续扩展则加BusinessCode和ProcessTemplate
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public ProcessController processController() {
|
||||
ProcessController processController = new ProcessController();
|
||||
Map<String, ProcessTemplate> templateConfig = new HashMap<>();
|
||||
templateConfig.put(BusinessCode.COMMON_SEND.getCode(), commonSendTemplate());
|
||||
processController.setTemplateConfig(templateConfig);
|
||||
return processController;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 组装参数Action
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public AssembleAction assembleAction() {
|
||||
return new AssembleAction();
|
||||
}
|
||||
|
||||
/**
|
||||
* 参数校验Action
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public PreParamAction preParamAction() {
|
||||
return new PreParamAction();
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息至MQ的Action
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public SendMqAction sendMqAction() {
|
||||
return new SendMqAction();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package com.java3y.austin.utils;
|
||||
|
||||
import org.springframework.context.expression.MapAccessor;
|
||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||
import org.springframework.util.PropertyPlaceholderHelper;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author 3y
|
||||
* 内容占位符 替换
|
||||
*
|
||||
* austin占位符格式{$var}
|
||||
*/
|
||||
public class ContentHolderUtil {
|
||||
|
||||
// 占位符前缀
|
||||
private static final String PLACE_HOLDER_PREFIX = "{$";
|
||||
// 占位符后缀
|
||||
private static final String PLACE_HOLDER_ENDFIX = "}";
|
||||
|
||||
private static final StandardEvaluationContext evalutionContext;
|
||||
|
||||
private static PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper(
|
||||
PLACE_HOLDER_PREFIX, PLACE_HOLDER_ENDFIX);
|
||||
|
||||
static {
|
||||
evalutionContext = new StandardEvaluationContext();
|
||||
evalutionContext.addPropertyAccessor(new MapAccessor());
|
||||
}
|
||||
|
||||
public static String replacePlaceHolder(final String template, final Map<String, String> paramMap) {
|
||||
String replacedPushContent = propertyPlaceholderHelper.replacePlaceholders(template,
|
||||
new CustomPlaceholderResolver(paramMap));
|
||||
return replacedPushContent;
|
||||
}
|
||||
|
||||
private static class CustomPlaceholderResolver implements PropertyPlaceholderHelper.PlaceholderResolver {
|
||||
private Map<String, String> paramMap;
|
||||
|
||||
public CustomPlaceholderResolver(Map<String, String> paramMap) {
|
||||
super();
|
||||
this.paramMap = paramMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String resolvePlaceholder(String placeholderName) {
|
||||
String value = paramMap.get(placeholderName);
|
||||
if (null == value) {
|
||||
String errorStr = MessageFormat.format("template:{} require param:{},but not exist! paramMap:{}",
|
||||
placeholderName, paramMap.toString());
|
||||
throw new IllegalArgumentException(errorStr);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package com.java3y.austin.utils;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 生成 消息推送的URL 工具类
|
||||
*
|
||||
* @author 3y
|
||||
*/
|
||||
public class TaskInfoUtils {
|
||||
|
||||
private static int TYPE_FLAG = 1000000;
|
||||
|
||||
/**
|
||||
* 生成BusinessId
|
||||
* 模板类型+模板ID+当天日期
|
||||
* (固定16位)
|
||||
*/
|
||||
public static Long generateBusinessId(Long templateId, Integer templateType) {
|
||||
Integer today = Integer.valueOf(DateUtil.format(new Date(), "yyyyMMdd"));
|
||||
return Long.valueOf(String.format("%d%s", templateType * TYPE_FLAG + templateId, today));
|
||||
}
|
||||
|
||||
}
|
@ -1,24 +1,24 @@
|
||||
package com.java3y.austin.controller;
|
||||
|
||||
import com.java3y.austin.kafkatest.UserLogProducer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@RestController
|
||||
public class KafkaTestController {
|
||||
|
||||
@Autowired
|
||||
private UserLogProducer userLogProducer;
|
||||
|
||||
/**
|
||||
* test insert
|
||||
*/
|
||||
@GetMapping("/kafka/insert")
|
||||
public String insert(String userId) {
|
||||
userLogProducer.sendLog(userId);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
//package com.java3y.austin.controller;
|
||||
//
|
||||
//import com.java3y.austin.kafkatest.UserLogProducer;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.web.bind.annotation.GetMapping;
|
||||
//import org.springframework.web.bind.annotation.RestController;
|
||||
//
|
||||
//@RestController
|
||||
//public class KafkaTestController {
|
||||
//
|
||||
// @Autowired
|
||||
// private UserLogProducer userLogProducer;
|
||||
//
|
||||
// /**
|
||||
// * test insert
|
||||
// */
|
||||
// @GetMapping("/kafka/insert")
|
||||
// public String insert(String userId) {
|
||||
// userLogProducer.sendLog(userId);
|
||||
//
|
||||
// return null;
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
Loading…
Reference in new issue