diff --git a/INSTALL.md b/INSTALL.md index 4decd99..d0515ce 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -183,10 +183,10 @@ docker ps docker exec -it kafka sh ``` -创建一个topic(这里我的**topicName**就叫austinTopic,你们可以改成自己的) +创建一个topic(这里我的**topicName**就叫austinBusiness,你们可以改成自己的) ``` -$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinTopic --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1 +$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinBusiness --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1 ``` 查看刚创建的topic信息: @@ -344,16 +344,16 @@ global: scrape_configs: - job_name: 'prometheus'   static_configs: -   - targets: ['ip:9090'] // TODO ip自己写 +   - targets: ['ip:9090'] - job_name: 'cadvisor'   static_configs: -   - targets: ['ip:8899'] // TODO ip自己写 +   - targets: ['ip:8899'] - job_name: 'node'   static_configs: -   - targets: ['ip:9100'] // TODO ip自己写 +   - targets: ['ip:9100'] ``` -(**这里要注意端口,按自己配置的来**) +(**这里要注意端口,按自己配置的来,ip也要填写为自己的**) 把这份`prometheus.yml`的配置往`/etc/prometheus/prometheus.yml` 路径下**复制**一份。随后在目录下`docker-compose up -d`启动,于是我们就可以分别访问: diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java index 5456eb1..9c0bbc4 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java @@ -33,6 +33,9 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi @Autowired private RedisUtils redisUtils; + @Autowired + private LogUtils logUtils; + @Override public void deduplication(DeduplicationParam param) { @@ -62,7 +65,7 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi // 剔除符合去重条件的用户 if (CollUtil.isNotEmpty(filterReceiver)) { taskInfo.getReceiver().removeAll(filterReceiver); - LogUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build()); + logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build()); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java b/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java index 2668555..776afed 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java @@ -9,6 +9,7 @@ import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.support.utils.LogUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** @@ -22,6 +23,10 @@ public class DiscardMessageService { @ApolloConfig("boss.austin") private Config config; + @Autowired + private LogUtils logUtils; + + /** * 丢弃消息,配置在apollo * @param taskInfo @@ -33,7 +38,7 @@ public class DiscardMessageService { AustinConstant.APOLLO_DEFAULT_VALUE_JSON_ARRAY)); if (array.contains(String.valueOf(taskInfo.getMessageTemplateId()))) { - LogUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build()); + logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build()); return true; } return false; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java index 0edf0da..ec16778 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java @@ -23,6 +23,9 @@ public abstract class BaseHandler implements Handler { @Autowired private HandlerHolder handlerHolder; + @Autowired + private LogUtils logUtils; + /** * 初始化渠道与Handler的映射关系 @@ -35,10 +38,10 @@ public abstract class BaseHandler implements Handler { @Override public void doHandler(TaskInfo taskInfo) { if (handler(taskInfo)) { - LogUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); return; } - LogUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } /** diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index c68f6c5..1eb9b0d 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -38,6 +38,9 @@ public class Receiver { @Autowired private TaskPendingHolder taskPendingHolder; + @Autowired + private LogUtils logUtils; + @KafkaListener(topics = "#{'${austin.business.topic.name}'}") public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); @@ -51,7 +54,7 @@ public class Receiver { */ if (topicGroupId.equals(messageGroupId)) { for (TaskInfo taskInfo : taskInfoLists) { - LogUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); + logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); Task task = context.getBean(Task.class).setTaskInfo(taskInfo); taskPendingHolder.route(topicGroupId).execute(task); } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index 4f667be..e441e03 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -9,10 +9,10 @@ import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; +import com.java3y.austin.support.utils.KafkaUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; /** * @author 3y @@ -22,7 +22,7 @@ import org.springframework.kafka.core.KafkaTemplate; public class SendMqAction implements BusinessProcess { @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaUtils kafkaUtils; @Value("${austin.business.topic.name}") private String topicName; @@ -30,14 +30,14 @@ public class SendMqAction implements BusinessProcess { @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel(); + String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); + try { - kafkaTemplate.send(topicName, JSON.toJSONString(sendTaskModel.getTaskInfo(), - new SerializerFeature[] {SerializerFeature.WriteClassName})); + kafkaUtils.send(topicName, message); } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); - } } } diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml index be9e53a..81f43fc 100644 --- a/austin-stream/pom.xml +++ b/austin-stream/pom.xml @@ -38,12 +38,6 @@ ${flink.version} - - org.projectlombok - lombok - - - com.java3y.austin austin-support diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java index 85f896b..c72c114 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java @@ -1,5 +1,6 @@ package com.java3y.austin.stream; +import com.java3y.austin.stream.constants.AustinFlinkConstant; import com.java3y.austin.stream.utils.FlinkUtils; import com.java3y.austin.stream.utils.SpringContextUtils; import lombok.extern.slf4j.Slf4j; @@ -8,7 +9,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.springframework.context.ApplicationContext; /** * flink启动类 @@ -20,29 +20,28 @@ public class AustinBootStrap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + SpringContextUtils.loadContext(AustinFlinkConstant.SPRING_CONFIG_PATH); - String topicName = "austinTopicV2"; - String groupId = "austinTopicV23"; - ApplicationContext applicationContext = SpringContextUtils.loadContext("classpath*:austin-spring.xml"); - FlinkUtils flinkUtils = applicationContext.getBean(FlinkUtils.class); - KafkaSource kafkaConsumer = flinkUtils.getKafkaConsumer(topicName, groupId); + /** + * 1.获取KafkaConsumer + */ + KafkaSource kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class).getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER); + DataStreamSource kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME); - DataStreamSource kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource"); + + /** + * 2. 数据转换处理 + */ + + /** + * 3. 将实时数据多维度写入Redis(已实现),离线数据写入hive(未实现) + */ kafkaSource.addSink(new SinkFunction() { @Override public void invoke(String value, Context context) throws Exception { log.error("kafka value:{}", value); } }); - - - - // DataStream stream = envBatchPendingThread -// .addSource(new AustinSource()) -// .name("transactions"); -// -// stream.addSink(new AustinSink()); - env.execute("AustinBootStrap"); } diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java new file mode 100644 index 0000000..e19af36 --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -0,0 +1,27 @@ +package com.java3y.austin.stream.constants; + +public class AustinFlinkConstant { + + /** + * Kafka 配置信息 + * TODO 使用前需要把broker配置 + */ + public static final String GROUP_ID = "austinLogGroup"; + public static final String TOPIC_NAME = "austinLog"; + public static final String BROKER = "ip:port"; + + + /** + * spring配置文件路径 + */ + public static final String SPRING_CONFIG_PATH = "classpath*:austin-spring.xml"; + + + /** + * Flink流程常量 + */ + public static final String SOURCE_NAME = "austin_kafka_source"; + public static final String FUNCTION_NAME = "austin_transfer"; + public static final String SINK_NAME = "austin_sink"; + +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java index 583db1a..7e0e3b1 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java @@ -5,6 +5,9 @@ import com.java3y.austin.common.domain.AnchorInfo; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +/** + * mock + */ @Slf4j public class AustinSink extends RichSinkFunction { diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java b/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java deleted file mode 100644 index 1427727..0000000 --- a/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.java3y.austin.stream.source; - -import com.java3y.austin.common.domain.AnchorInfo; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; - -import java.util.ArrayList; -import java.util.List; - -/** - * 数据源 mock - * - * @author 3y - */ -public class AustinSource extends RichSourceFunction { - @Override - public void run(SourceContext sourceContext) throws Exception { - List anchorInfoList = new ArrayList<>(); - - for (int i = 0; i < 10; i++) { - anchorInfoList.add(AnchorInfo.builder() - .state(10).businessId(333L) - .timestamp(System.currentTimeMillis()).build()); - - } - for (AnchorInfo anchorInfo : anchorInfoList) { - sourceContext.collect(anchorInfo); - } - } - - @Override - public void cancel() { - - } -} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java index d605a0c..76e71c0 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java @@ -1,5 +1,6 @@ package com.java3y.austin.stream.utils; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -9,17 +10,18 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni * * @author 3y */ +@Slf4j public class FlinkUtils { - /** * 获取kafkaConsumer + * * @param topicName * @param groupId * @return */ - public KafkaSource getKafkaConsumer(String topicName, String groupId) { + public KafkaSource getKafkaConsumer(String topicName, String groupId, String broker) { KafkaSource source = KafkaSource.builder() - .setBootstrapServers("ip:port") + .setBootstrapServers(broker) .setTopics(topicName) .setGroupId(groupId) .setStartingOffsets(OffsetsInitializer.earliest()) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java index 0ce863f..9f26e5c 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java @@ -1,5 +1,6 @@ package com.java3y.austin.stream.utils; +import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -13,32 +14,35 @@ import java.util.List; * 获取SpringContext对象 */ @Slf4j -public class SpringContextUtils { +public class SpringContextUtils { private static ApplicationContext context; - + /** + * XML配置 + */ private static List xmlPath = new ArrayList<>(); - public static ApplicationContext loadContext(String path) { return loadContext(new String[]{path}); } + /** + * 通过spring.xml文件配置将信息 装载 context + * + * @param paths + * @return + */ public static synchronized ApplicationContext loadContext(String[] paths) { if (null != paths && paths.length > 0) { - //筛选新增 List newPaths = new ArrayList<>(); for (String path : paths) { if (!xmlPath.contains(path)) { - log.info("ApplicationContextFactory add new path {}", path); newPaths.add(path); - } else { - log.info("ApplicationContextFactory already load path {}", path); } } - if (!newPaths.isEmpty()) { + if (CollUtil.isNotEmpty(newPaths)) { String[] array = new String[newPaths.size()]; - for (int i=0; i