package com.java3y.austin.action; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; import com.java3y.austin.domain.SendTaskModel; import com.java3y.austin.enums.RespStatusEnum; import com.java3y.austin.pipeline.BusinessProcess; import com.java3y.austin.pipeline.ProcessContext; import com.java3y.austin.vo.BasicResultVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; /** * @author 3y * 将消息发送到MQ */ @Slf4j public class SendMqAction implements BusinessProcess { @Autowired private KafkaTemplate kafkaTemplate; @Value("${austin.business.topic.name}") private String topicName; @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel(); try { kafkaTemplate.send(topicName, JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[] {SerializerFeature.WriteClassName})); } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) , JSON.toJSONString(sendTaskModel.getTaskInfo().get(0))); } } }