diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java index 49bd953..0ba8242 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java @@ -1,11 +1,9 @@ package com.java3y.austin.handler.receiver.kafka; -import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import com.java3y.austin.support.constans.MessageQueuePipeline; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; -import com.java3y.austin.support.constans.MessageQueuePipeline; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -14,8 +12,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -40,6 +36,9 @@ public class ReceiverStart { @Autowired private ConsumerFactory consumerFactory; + @Value("${austin.nacos.enabled}") + private Boolean nacosEnabled; + /** * receiver的消费方法常量 */ @@ -60,7 +59,13 @@ public class ReceiverStart { */ @PostConstruct public void init() { - for (int i = 0; i < groupIds.size(); i++) { + int total = groupIds.size(); + if (nacosEnabled) { + // 当nacos开启时 会导致Receiver提前加载 所以这里getBean次数-1 + // nacos issue: https://github.com/nacos-group/nacos-spring-project/issues/249 + total -= 1; + } + for (int i = 0; i < total; i++) { context.getBean(Receiver.class); } } @@ -74,8 +79,7 @@ public class ReceiverStart { if (element instanceof Method) { String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName(); if (RECEIVER_METHOD_NAME.equals(name)) { - attrs.put("groupId", groupIds.get(index)); - index++; + attrs.put("groupId", groupIds.get(index++)); } } return attrs; diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index ed01385..85e80c9 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -36,6 +36,9 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.tagId.value}") private String tagId; + @Value("${austin.mq.pipeline}") + private String mqPipeline; + @Override public void process(ProcessContext context) { @@ -50,7 +53,7 @@ public class SendMqAction implements BusinessProcess { } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); - log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) + log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e) , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); } }