diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java index c7452bb..0ee2191 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java @@ -6,12 +6,12 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.support.config.SupportThreadPoolConfig; -import com.java3y.austin.support.utils.KafkaUtils; import com.java3y.austin.support.utils.RedisUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Arrays; @@ -31,7 +31,7 @@ public class NightShieldLazyPendingHandler { private static final String NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY = "night_shield_send"; @Autowired - private KafkaUtils kafkaUtils; + private KafkaTemplate kafkaTemplate; @Value("${austin.business.topic.name}") private String topicName; @Autowired @@ -48,7 +48,7 @@ public class NightShieldLazyPendingHandler { String taskInfo = redisUtils.lPop(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY); if (StrUtil.isNotBlank(taskInfo)) { try { - kafkaUtils.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class)) + kafkaTemplate.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class)) , new SerializerFeature[]{SerializerFeature.WriteClassName})); } catch (Exception e) { log.error("nightShieldLazyJob send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e), taskInfo); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index fda58c7..e95ccda 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -4,24 +4,18 @@ import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; -import com.google.common.eventbus.EventBus; -import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.mq.SendMqService; -import com.java3y.austin.support.mq.eventbus.EventBusListener; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; -import com.java3y.austin.support.utils.KafkaUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.List; - /** * @author 3y * 将消息发送到MQ diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java deleted file mode 100644 index 193f1ee..0000000 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.java3y.austin.support.utils; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -/** - * @author 3y - * @date 2022/2/16 - * Kafka工具类 - */ -@Component -@Slf4j -public class KafkaUtils { - - @Autowired - private KafkaTemplate kafkaTemplate; - - /** - * 发送kafka消息 - * - * @param topicName - * @param jsonMessage - */ - public void send(String topicName, String jsonMessage) { - kafkaTemplate.send(topicName, jsonMessage); - } - -} diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java index 055f065..1b542d1 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java @@ -6,9 +6,11 @@ import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.LogParam; +import com.java3y.austin.support.constans.MessageQueuePipeline; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** @@ -20,8 +22,11 @@ import org.springframework.stereotype.Component; @Component public class LogUtils extends CustomLogListener { + @Value("${austin-mq-pipeline}") + private String mqPipeline; + @Autowired - private KafkaUtils kafkaUtils; + private KafkaTemplate kafkaTemplate; @Value("${austin.business.log.topic.name}") private String topicName; @@ -49,13 +54,15 @@ public class LogUtils extends CustomLogListener { anchorInfo.setTimestamp(System.currentTimeMillis()); String message = JSON.toJSONString(anchorInfo); log.info(message); - - try { - kafkaUtils.send(topicName, message); - } catch (Exception e) { - log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) - , JSON.toJSONString(anchorInfo)); + if (MessageQueuePipeline.KAFKA.equals(mqPipeline)) { + try { + kafkaTemplate.send(topicName, message); + } catch (Exception e) { + log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) + , JSON.toJSONString(anchorInfo)); + } } + } /**