add comment && ThreadPoolConfig && AfterParamCheckAction

master
3y 4 years ago
parent 1f54934806
commit d111618c96

@ -3,6 +3,7 @@ package com.java3y.austin.constant;
/**
*
* @author 3y
*/
public class AustinConstant {
@ -12,4 +13,10 @@ public class AustinConstant {
public final static Integer TRUE = 1;
public final static Integer FALSE = 0;
/**
*
*/
public final static String YYYYMMDD = "yyyyMMdd";
}

@ -3,6 +3,7 @@ package com.java3y.austin.dto;
/**
*
* ()
* @author 3y
*/
public class ContentModel {
}

@ -1,5 +1,8 @@
package com.java3y.austin.dto;
/**
* @author 3y
*/
public class EmailContentModel extends ContentModel {

@ -1,5 +1,8 @@
package com.java3y.austin.dto;
/**
* @author 3y
*/
public class ImContentModel extends ContentModel {

@ -1,5 +1,8 @@
package com.java3y.austin.dto;
/**
* @author 3y
*/
public class MiniProgramContentModel extends ContentModel {
}

@ -1,5 +1,8 @@
package com.java3y.austin.dto;
/**
* @author 3y
*/
public class OfficialAccountsContentModel extends ContentModel {
}

@ -1,5 +1,9 @@
package com.java3y.austin.dto;
/**
* @author 3y
*/
public class PushContentModel extends ContentModel {
}

@ -8,11 +8,14 @@ import lombok.ToString;
/**
*
* @author 3y
*/
@Getter
@ToString
@AllArgsConstructor
public enum ChannelType {
IM(10, "IM(站内信)", ImContentModel.class),
PUSH(20, "push(通知栏)", PushContentModel.class),
SMS(30, "sms(短信)", SmsContentModel.class),
@ -21,8 +24,13 @@ public enum ChannelType {
MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class),
;
/** 编码值 */
private Integer code;
/** 描述 */
private String description;
/** 内容模型Class */
private Class contentModelClass;

@ -7,6 +7,7 @@ import lombok.ToString;
/**
* ID
* @author 3y
*/
@Getter
@ToString

@ -6,6 +6,7 @@ import lombok.ToString;
/**
*
* @author 3y
*/
@Getter
@ToString

@ -4,6 +4,10 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
*
* @author 3y
*/
@Getter
@ToString
@AllArgsConstructor

@ -4,6 +4,10 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
*
* @author 3y
*/
@Getter
@ToString
@AllArgsConstructor

@ -1,12 +1,12 @@
package com.java3y.austin;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.config.TaskInfoParseConfig;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.handler.SmsHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ -24,7 +24,7 @@ public class Receiver {
@Autowired
private SmsHandler smsHandler;
@KafkaListener(topics = {"austin"}, groupId = "sms")
@KafkaListener(topics = "#{'${austin.topic.name}'}", groupId = "austin")
public void consumer(ConsumerRecord<?, String> consumerRecord) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {

@ -0,0 +1,52 @@
package com.java3y.austin.config;
import cn.hutool.core.thread.ExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.*;
/**
* 线
* @author 3y
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("smsThreadPool")
public static ExecutorService getSmsThreadPool() {
ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create()
.setCorePoolSize(4)
.setMaxPoolSize(4)
.setKeepAliveTime(60)
.setWorkQueue(new LinkedBlockingQueue<>(1000))
.setHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
})
.build();
return threadPoolExecutor;
}
@Bean("emailThreadPoll")
public static ExecutorService getEmailThreadPool() {
ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create()
.setCorePoolSize(2)
.setMaxPoolSize(2)
.setKeepAliveTime(60)
.setWorkQueue(new LinkedBlockingQueue<>(1000))
.setHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
})
.build();
return threadPoolExecutor;
}
}

@ -2,12 +2,19 @@ package com.java3y.austin.handler;
import com.java3y.austin.domain.TaskInfo;
/**
* @author 3y
* handler
*/
public interface Handler {
boolean doHandler(TaskInfo TaskInfo);
/**
* handler
*
* @param taskInfo
* @return
*/
boolean doHandler(TaskInfo taskInfo);
}

@ -15,43 +15,49 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author
* @author 3y
*/
@Component
public class SmsHandler implements Handler {
@Autowired
private SmsRecordDao smsRecordDao;
@Autowired
private SmsRecordDao smsRecordDao;
@Autowired
private SmsScript smsScript;
@Autowired
private SmsScript smsScript;
@Override
public boolean doHandler(TaskInfo taskInfo) {
@Override
public boolean doHandler(TaskInfo taskInfo) {
SmsParam smsParam = SmsParam.builder()
.phones(taskInfo.getReceiver())
.content(getSmsContent(taskInfo))
.messageTemplateId(taskInfo.getMessageTemplateId())
.supplierId(10)
.supplierName("腾讯云通知类消息渠道").build();
List<SmsRecord> recordList = smsScript.send(smsParam);
SmsContentModel smsContentModel = (SmsContentModel) taskInfo.getContentModel();
if (!CollUtil.isEmpty(recordList)) {
smsRecordDao.saveAll(recordList);
return true;
}
String resultContent;
if (StrUtil.isNotBlank(smsContentModel.getUrl())) {
resultContent = smsContentModel.getContent() + " " + smsContentModel.getUrl();
} else {
resultContent = smsContentModel.getContent();
return false;
}
SmsParam smsParam = SmsParam.builder()
.phones(taskInfo.getReceiver())
.content(resultContent)
.messageTemplateId(taskInfo.getMessageTemplateId())
.supplierId(10)
.supplierName("腾讯云通知类消息渠道").build();
List<SmsRecord> recordList = smsScript.send(smsParam);
if (CollUtil.isEmpty(recordList)) {
return false;
/**
*
* <p>
* PS:
* PS: TD退
*/
private String getSmsContent(TaskInfo taskInfo) {
SmsContentModel smsContentModel = (SmsContentModel) taskInfo.getContentModel();
if (StrUtil.isNotBlank(smsContentModel.getUrl())) {
return smsContentModel.getContent() + " " + smsContentModel.getUrl();
} else {
return smsContentModel.getContent();
}
}
smsRecordDao.saveAll(recordList);
return true;
}
}

@ -5,10 +5,15 @@ import com.java3y.austin.domain.SmsParam;
import java.util.List;
/**
*
* @author 3y
*/
public interface SmsScript {
/**
*
* @param smsParam
* @return
*/

@ -5,6 +5,7 @@ import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.constant.AustinConstant;
import com.java3y.austin.domain.SmsRecord;
import com.java3y.austin.enums.SmsStatus;
import com.java3y.austin.domain.SmsParam;
@ -65,9 +66,7 @@ public class TencentSmsScript implements SmsScript {
public List<SmsRecord> send(SmsParam smsParam) {
try {
SmsClient client = init();
SendSmsRequest request = assembleReq(smsParam);
SendSmsResponse response = client.SendSms(request);
return assembleSmsRecord(smsParam,response);
@ -85,13 +84,14 @@ public class TencentSmsScript implements SmsScript {
}
List<SmsRecord> smsRecordList = new ArrayList<>();
for (SendStatus sendStatus : response.getSendStatusSet()) {
// 腾讯返回的电话号有前缀,这里取巧直接翻转获取手机号
String phone = new StringBuilder(new StringBuilder(sendStatus.getPhoneNumber())
.reverse().substring(0, PHONE_NUM)).reverse().toString();
SmsRecord smsRecord = SmsRecord.builder()
.sendDate(Integer.valueOf(DateUtil.format(new Date(), "yyyyMMdd")))
.sendDate(Integer.valueOf(DateUtil.format(new Date(), AustinConstant.YYYYMMDD)))
.messageTemplateId(smsParam.getMessageTemplateId())
.phone(Long.valueOf(phone))
.supplierId(smsParam.getSupplierId())

@ -0,0 +1,79 @@
package com.java3y.austin.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.domain.SendTaskModel;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.enums.ChannelType;
import com.java3y.austin.enums.IdType;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.pipeline.BusinessProcess;
import com.java3y.austin.pipeline.ProcessContext;
import com.java3y.austin.vo.BasicResultVO;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author 3y
*
*
*/
@Slf4j
public class AfterParamCheckAction implements BusinessProcess {
public static final String PHONE_REGEX_EXP = "^((13[0-9])|(14[5,7,9])|(15[0-3,5-9])|(166)|(17[0-9])|(18[0-9])|(19[1,8,9]))\\d{8}$";
@Override
public void process(ProcessContext context) {
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
List<TaskInfo> taskInfo = sendTaskModel.getTaskInfo();
// 1. 过滤掉不合法的手机号
filterIllegalPhoneNum(taskInfo);
// 2.
if (CollUtil.isEmpty(taskInfo)) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
return;
}
}
/**
*
* @param taskInfo
*/
private void filterIllegalPhoneNum(List<TaskInfo> taskInfo) {
Integer idType = taskInfo.get(0).getIdType();
Integer sendChannel = taskInfo.get(0).getSendChannel();
if (IdType.PHONE.getCode().equals(idType) && ChannelType.SMS.getCode().equals(sendChannel)) {
Iterator<TaskInfo> iterator = taskInfo.iterator();
// 利用正则找出不合法的手机号
while (iterator.hasNext()) {
TaskInfo task = iterator.next();
Set<String> illegalPhone = task.getReceiver().stream()
.filter(phone -> !ReUtil.isMatch(PHONE_REGEX_EXP, phone))
.collect(Collectors.toSet());
if (CollUtil.isNotEmpty(illegalPhone)) {
task.getReceiver().removeAll(illegalPhone);
log.error("{} find illegal phone!{}", task.getMessageTemplateId(), JSON.toJSONString(illegalPhone));
}
if (CollUtil.isEmpty(task.getReceiver())) {
iterator.remove();
}
}
}
}
}

@ -69,6 +69,7 @@ public class AssembleAction implements BusinessProcess {
List<TaskInfo> taskInfoList = new ArrayList<>();
for (MessageParam messageParam : messageParamList) {
TaskInfo taskInfo = TaskInfo.builder()
.messageTemplateId(messageTemplate.getId())
.businessId(TaskInfoUtils.generateBusinessId(messageTemplate.getId(), messageTemplate.getTemplateType()))
@ -91,18 +92,21 @@ public class AssembleAction implements BusinessProcess {
/**
* contentModel,
* contentModelmsgContent
*/
private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {
// 得到真正的ContentModel 类型
Integer sendChannel = messageTemplate.getSendChannel();
Class contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);
// 得到模板的 msgContent 和 入参
Map<String, String> variables = messageParam.getVariables();
JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());
Class contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);
/**
*
*/
// 通过反射 组装出 contentModel
Field[] fields = ReflectUtil.getFields(contentModelClass);
ContentModel contentModel = (ContentModel) ReflectUtil.newInstance(contentModelClass);
for (Field field : fields) {
@ -116,16 +120,4 @@ public class AssembleAction implements BusinessProcess {
return contentModel;
}
public static void main(String[] args) {
MessageTemplate messageTemplate = MessageTemplate.builder().sendChannel(ChannelType.SMS.getCode()).msgContent("{\"url\":\"www.baidu.com/{$urlParam}\",\"content\":\"{$contentValue}\"}").build();
HashMap<String, String> map = new HashMap<>();
map.put("urlParam", "2222");
map.put("contentValue", "3333");
MessageParam messageParam = new MessageParam().setVariables(map);
ContentModel contentModelValue = getContentModelValue(messageTemplate, messageParam);
System.out.println(JSON.toJSONString(contentModelValue));
}
}

@ -1,6 +1,8 @@
package com.java3y.austin.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.domain.MessageParam;
import com.java3y.austin.domain.SendTaskModel;
@ -10,6 +12,7 @@ import com.java3y.austin.vo.BasicResultVO;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author 3y
@ -17,7 +20,7 @@ import java.util.List;
* @description
*/
@Slf4j
public class PreParamAction implements BusinessProcess {
public class PreParamCheckAction implements BusinessProcess {
@Override
public void process(ProcessContext context) {
@ -26,9 +29,21 @@ public class PreParamAction implements BusinessProcess {
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
List<MessageParam> messageParamList = sendTaskModel.getMessageParamList();
// 没有传入 消息模板Id 和 messageParam
if (messageTemplateId == null || CollUtil.isEmpty(messageParamList)) {
context.setNeedBreak(true);
context.setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
return;
}
// 过滤接收者为null的messageParam
List<MessageParam> resultMessageParamList = messageParamList.stream()
.filter(messageParam -> !StrUtil.isBlank(messageParam.getReceiver()))
.collect(Collectors.toList());
if (CollUtil.isEmpty(resultMessageParamList)) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
return;
}
sendTaskModel.setMessageParamList(resultMessageParamList);
}
}

@ -36,7 +36,8 @@ public class SendMqAction implements BusinessProcess {
new SerializerFeature[] {SerializerFeature.WriteClassName}));
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send kafka fail! e:{}", Throwables.getStackTraceAsString(e));
log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
, JSON.toJSONString(sendTaskModel.getTaskInfo().get(0)));
}
}
}

@ -1,7 +1,8 @@
package com.java3y.austin.config;
import com.java3y.austin.action.AfterParamCheckAction;
import com.java3y.austin.action.AssembleAction;
import com.java3y.austin.action.PreParamAction;
import com.java3y.austin.action.PreParamCheckAction;
import com.java3y.austin.action.SendMqAction;
import com.java3y.austin.enums.BusinessCode;
import com.java3y.austin.pipeline.BusinessProcess;
@ -14,6 +15,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* apipipeline
* @author 3y
*/
@Configuration
public class PipelineConfig {
@ -23,6 +28,7 @@ public class PipelineConfig {
* 1.
* 2.
* 3. MQ
*
* @return
*/
@Bean("commonSendTemplate")
@ -30,8 +36,9 @@ public class PipelineConfig {
ProcessTemplate processTemplate = new ProcessTemplate();
ArrayList<BusinessProcess> processList = new ArrayList<>();
processList.add(preParamAction());
processList.add(preParamCheckAction());
processList.add(assembleAction());
processList.add(afterParamCheckAction());
processList.add(sendMqAction());
processTemplate.setProcessList(processList);
@ -42,12 +49,13 @@ public class PipelineConfig {
* pipeline
*
* BusinessCodeProcessTemplate
*
* @return
*/
@Bean
public ProcessController processController() {
ProcessController processController = new ProcessController();
Map<String, ProcessTemplate> templateConfig = new HashMap<>();
Map<String, ProcessTemplate> templateConfig = new HashMap<>(4);
templateConfig.put(BusinessCode.COMMON_SEND.getCode(), commonSendTemplate());
processController.setTemplateConfig(templateConfig);
return processController;
@ -56,6 +64,7 @@ public class PipelineConfig {
/**
* Action
*
* @return
*/
@Bean
@ -64,16 +73,28 @@ public class PipelineConfig {
}
/**
* Action
* Action
*
* @return
*/
@Bean
public PreParamAction preParamAction() {
return new PreParamAction();
public PreParamCheckAction preParamCheckAction() {
return new PreParamCheckAction();
}
/**
* Action
*
* @return
*/
@Bean
public AfterParamCheckAction afterParamCheckAction() {
return new AfterParamCheckAction();
}
/**
* MQAction
*
* @return
*/
@Bean

@ -1,31 +0,0 @@
package com.java3y.austin.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
* @author 3y
* @date 2021/11/22
*
*/
@Getter
@ToString
@AllArgsConstructor
public enum RequestType {
SINGLE(10, "请求接口为 single 类型"),
BATCH(20, "请求接口为 batch 类型");
/**
* code
*/
private Integer code;
/**
*
*/
private String description;
}

@ -8,6 +8,7 @@ import java.util.Map;
/**
*
* single
* @author 3y
*/
@Data
@Accessors(chain = true)

@ -13,8 +13,10 @@ import lombok.ToString;
@AllArgsConstructor
public enum BusinessCode {
/** 普通发送流程 */
COMMON_SEND("send", "普通发送"),
/** 撤回流程 */
RECALL("recall", "撤回消息");

@ -3,6 +3,10 @@ package com.java3y.austin.dao;
import com.java3y.austin.domain.MessageTemplate;
import org.springframework.data.repository.CrudRepository;
/**
* Dao
* @author 3y
*/
public interface MessageTemplateDao extends CrudRepository<MessageTemplate, Long> {
}

@ -3,6 +3,11 @@ package com.java3y.austin.dao;
import com.java3y.austin.domain.SmsRecord;
import org.springframework.data.repository.CrudRepository;
/**
* Dao
* @author 3y
*
*/
public interface SmsRecordDao extends CrudRepository<SmsRecord, Long> {

@ -17,8 +17,9 @@ import javax.persistence.Id;
@AllArgsConstructor
@Entity
/**
* @autor 3y
*
* DO
* @author 3y
*/
public class MessageTemplate {

@ -17,6 +17,7 @@ import javax.persistence.Id;
@Entity
/**
*
* @author 3y
*/
public class SmsRecord {

@ -1,19 +0,0 @@
package com.java3y.austin.pipeline;
import lombok.Builder;
/**
* @author 3y
* @date 2021/11/22
* @cription
*/
@Builder
public class ProcessResponse {
/** 返回值编码 */
private final String code;
/** 返回值描述 */
private final String description;
}

@ -15,19 +15,24 @@ import java.util.Map;
*/
public class ContentHolderUtil {
// 占位符前缀
/**
*
*/
private static final String PLACE_HOLDER_PREFIX = "{$";
// 占位符后缀
/**
*
*/
private static final String PLACE_HOLDER_ENDFIX = "}";
private static final StandardEvaluationContext evalutionContext;
private static final StandardEvaluationContext EVALUTION_CONTEXT;
private static PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper(
PLACE_HOLDER_PREFIX, PLACE_HOLDER_ENDFIX);
static {
evalutionContext = new StandardEvaluationContext();
evalutionContext.addPropertyAccessor(new MapAccessor());
EVALUTION_CONTEXT = new StandardEvaluationContext();
EVALUTION_CONTEXT.addPropertyAccessor(new MapAccessor());
}
public static String replacePlaceHolder(final String template, final Map<String, String> paramMap) {

@ -1,6 +1,7 @@
package com.java3y.austin.utils;
import cn.hutool.core.date.DateUtil;
import com.java3y.austin.constant.AustinConstant;
import java.util.Date;
@ -19,7 +20,7 @@ public class TaskInfoUtils {
* (16)
*/
public static Long generateBusinessId(Long templateId, Integer templateType) {
Integer today = Integer.valueOf(DateUtil.format(new Date(), "yyyyMMdd"));
Integer today = Integer.valueOf(DateUtil.format(new Date(), AustinConstant.YYYYMMDD));
return Long.valueOf(String.format("%d%s", templateType * TYPE_FLAG + templateId, today));
}

@ -4,6 +4,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author 3y
*/
@SpringBootApplication
public class AustinApplication {
public static void main(String[] args) {

@ -1,24 +0,0 @@
//package com.java3y.austin.controller;
//
//import com.java3y.austin.kafkatest.UserLogProducer;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.web.bind.annotation.GetMapping;
//import org.springframework.web.bind.annotation.RestController;
//
//@RestController
//public class KafkaTestController {
//
// @Autowired
// private UserLogProducer userLogProducer;
//
// /**
// * test insert
// */
// @GetMapping("/kafka/insert")
// public String insert(String userId) {
// userLogProducer.sendLog(userId);
//
// return null;
// }
//
//}

@ -11,7 +11,9 @@ import org.springframework.web.bind.annotation.RestController;
/**
*
* MessageTemplateController
*
* @author 3y
*/
@RestController
public class MessageTemplateController {

@ -28,12 +28,12 @@ public class SendController {
/**
*
* messageTemplate Id 1
* {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":20,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":10,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"}
* {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":30,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":10,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"}
*
*/
// 文案参数
Map<String, String> variables = new HashMap<>();
Map<String, String> variables = new HashMap<>(8);
variables.put("contentValue", "6666");
MessageParam messageParam = new MessageParam().setReceiver(phone).setVariables(variables);

@ -1,32 +0,0 @@
package com.java3y.austin.controller;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.dao.SmsRecordDao;
import com.java3y.austin.domain.MessageTemplate;
import com.java3y.austin.domain.SmsRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
public class SmsRecordController {
@Autowired
private SmsRecordDao smsRecordDao;
/**
* test insert
*/
@GetMapping("/insert")
public String insert(Integer phone) {
return null;
}
/**
* test query
*/
@GetMapping("/query")
public String query() {
return null;
}
}

@ -1,18 +0,0 @@
package com.java3y.austin.kafkatest;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* @Author 18011618
* @Description
* @Date 14:42 2018/7/20
* @Modify By
*/
@Data
@Accessors(chain = true)
public class UserLog {
private String username;
private String userid;
private String state;
}

@ -1,30 +0,0 @@
package com.java3y.austin.kafkatest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author 18011618
* @Description
* @Date 14:50 2018/7/20
* @Modify By
*/
@Component
@Slf4j
public class UserLogConsumer {
@KafkaListener(topics = {"austin"},groupId = "austinGroup1")
public void consumer(ConsumerRecord<?,?> consumerRecord){
//判断是否为null
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
log.info(">>>>>>>>>> record =" + kafkaMessage);
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
}
}

@ -1,29 +0,0 @@
package com.java3y.austin.kafkatest;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* @Author 18011618
* @Description
* @Date 14:43 2018/7/20
* @Modify By
*/
@Component
public class UserLogProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
*
* @param userid
*/
public void sendLog(String userid){
UserLog userLog = new UserLog();
userLog.setUsername("jhp").setUserid(userid).setState("0");
System.err.println("发送用户日志数据:"+userLog);
kafkaTemplate.send("austin", JSON.toJSONString(userLog));
}
}
Loading…
Cancel
Save