master:单机限流解耦 修改部分代码逻辑

master
TOM 3 years ago
parent e127e1d9a6
commit 77c895c48a

@ -36,7 +36,7 @@ public class TaskHandlerImpl implements TaskHandler {
public void handle(Long messageTemplateId) { public void handle(Long messageTemplateId) {
MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get();
if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { if (StrUtil.isBlank(messageTemplate.getCronCrowdPath())) {
log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId); log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId);
return; return;
} }

@ -15,8 +15,8 @@ import java.util.Map;
@Service @Service
public class DeduplicationHolder { public class DeduplicationHolder {
private Map<Integer, Builder> builderHolder = new HashMap<>(4); private final Map<Integer, Builder> builderHolder = new HashMap<>(4);
private Map<Integer, DeduplicationService> serviceHolder = new HashMap<>(4); private final Map<Integer, DeduplicationService> serviceHolder = new HashMap<>(4);
public Builder selectBuilder(Integer key) { public Builder selectBuilder(Integer key) {
return builderHolder.get(key); return builderHolder.get(key);

@ -29,7 +29,7 @@ public class SimpleLimitService extends AbstractLimitService {
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) { public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录 // 获取redis记录
Map<String, String> readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size()); Map<String, String> readyPutRedisReceiver = new HashMap<>(taskInfo.getReceiver().size());
//redis数据隔离 //redis数据隔离
List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList()); List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList());
Map<String, String> inRedisValue = redisUtils.mGet(keys); Map<String, String> inRedisValue = redisUtils.mGet(keys);

@ -1,4 +1,4 @@
package com.java3y.austin.handler.flowcontrol.impl; package com.java3y.austin.handler.flowcontrol;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -7,11 +7,18 @@ import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.service.ConfigService; import com.java3y.austin.support.service.ConfigService;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -20,22 +27,26 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
@Slf4j @Slf4j
public class FlowControlServiceImpl implements FlowControlService { public class FlowControlFactory implements ApplicationContextAware {
private static final String FLOW_CONTROL_KEY = "flowControlRule"; private static final String FLOW_CONTROL_KEY = "flowControlRule";
private static final String FLOW_CONTROL_PREFIX = "flow_control_"; private static final String FLOW_CONTROL_PREFIX = "flow_control_";
private final Map<RateLimitStrategy, FlowControlService> flowControlServiceMap = new ConcurrentHashMap<>();
@Autowired @Autowired
private ConfigService config; private ConfigService config;
private ApplicationContext applicationContext;
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter(); RateLimiter rateLimiter;
Double rateInitValue = flowControlParam.getRateInitValue(); Double rateInitValue = flowControlParam.getRateInitValue();
double costTime = 0;
// 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准 // 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准
Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel()); Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel());
if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) { if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) {
@ -43,13 +54,12 @@ public class FlowControlServiceImpl implements FlowControlService {
flowControlParam.setRateInitValue(rateLimitConfig); flowControlParam.setRateInitValue(rateLimitConfig);
flowControlParam.setRateLimiter(rateLimiter); flowControlParam.setRateLimiter(rateLimiter);
} }
if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { FlowControlService flowControlService = flowControlServiceMap.get(flowControlParam.getRateLimitStrategy());
costTime = rateLimiter.acquire(1); if (Objects.isNull(flowControlService)) {
log.error("没有找到对应的单机限流策略");
return;
} }
if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { double costTime = flowControlService.flowControl(taskInfo, flowControlParam);
costTime = rateLimiter.acquire(taskInfo.getReceiver().size());
}
if (costTime > 0) { if (costTime > 0) {
log.info("consumer {} flow control time {}", log.info("consumer {} flow control time {}",
ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime);
@ -73,4 +83,17 @@ public class FlowControlServiceImpl implements FlowControlService {
} }
return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode); return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode);
} }
@PostConstruct
private void init() {
Map<String, Object> serviceMap = this.applicationContext.getBeansWithAnnotation(LocalRateLimit.class);
serviceMap.forEach((name, service) -> {
if (service instanceof FlowControlService) {
LocalRateLimit localRateLimit = AopUtils.getTargetClass(service).getAnnotation(LocalRateLimit.class);
RateLimitStrategy rateLimitStrategy = localRateLimit.rateLimitStrategy();
//通常情况下 实现的限流service与rateLimitStrategy一一对应
flowControlServiceMap.put(rateLimitStrategy, (FlowControlService) service);
}
});
}
} }

@ -15,6 +15,6 @@ public interface FlowControlService {
* @param taskInfo * @param taskInfo
* @param flowControlParam * @param flowControlParam
*/ */
void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam);
} }

@ -0,0 +1,22 @@
package com.java3y.austin.handler.flowcontrol.annotations;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Service;
/**
*
* Created by TOM
* On 2022/7/21 17:03
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface LocalRateLimit {
RateLimitStrategy rateLimitStrategy() default RateLimitStrategy.REQUEST_RATE_LIMIT;
}

@ -0,0 +1,28 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:05
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.REQUEST_RATE_LIMIT)
public class RequestRateLimitService implements FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
@Override
public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
return rateLimiter.acquire(1);
}
}

@ -0,0 +1,28 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:14
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT)
public class SendUserNumRateLimitService implements FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
@Override
public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
return rateLimiter.acquire(taskInfo.getReceiver().size());
}
}

@ -1,16 +1,13 @@
package com.java3y.austin.handler.handler; package com.java3y.austin.handler.handler;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.flowcontrol.FlowControlFactory;
import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.utils.LogUtils; import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
/** /**
* @author 3y * @author 3y
@ -22,7 +19,7 @@ public abstract class BaseHandler implements Handler {
@Autowired @Autowired
private LogUtils logUtils; private LogUtils logUtils;
@Autowired @Autowired
private FlowControlService flowControlService; private FlowControlFactory flowControlFactory;
/** /**
* Code * Code
@ -52,7 +49,7 @@ public abstract class BaseHandler implements Handler {
public void flowControl(TaskInfo taskInfo) { public void flowControl(TaskInfo taskInfo) {
// 只有子类指定了限流参数,才需要限流 // 只有子类指定了限流参数,才需要限流
if (flowControlParam != null) { if (flowControlParam != null) {
flowControlService.flowControl(taskInfo, flowControlParam); flowControlFactory.flowControl(taskInfo, flowControlParam);
} }
} }
@Override @Override

@ -50,7 +50,7 @@ public class ShieldServiceImpl implements ShieldService {
} }
if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) { if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {
redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo, redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,
new SerializerFeature[]{SerializerFeature.WriteClassName}), SerializerFeature.WriteClassName),
(DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds()); (DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());
logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
} }

Loading…
Cancel
Save