From 845904ff3955c315c3daec5a1dac0966607d0bbf Mon Sep 17 00:00:00 2001 From: 3y Date: Tue, 15 Feb 2022 21:20:19 +0800 Subject: [PATCH] flink init --- austin-stream/pom.xml | 100 ++++++++++++++++++ .../java3y/austin/stream/AustinBootStrap.java | 44 ++++++++ .../java3y/austin/stream/sink/AustinSink.java | 17 +++ .../austin/stream/source/AustinSource.java | 34 ++++++ .../austin/stream/utils/FlinkUtils.java | 36 +++++++ .../stream/utils/SpringContextUtils.java | 31 ++++++ pom.xml | 34 ++++++ 7 files changed, 296 insertions(+) create mode 100644 austin-stream/pom.xml create mode 100644 austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java create mode 100644 austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java create mode 100644 austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java create mode 100644 austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java create mode 100644 austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml new file mode 100644 index 0000000..d2ae418 --- /dev/null +++ b/austin-stream/pom.xml @@ -0,0 +1,100 @@ + + + + austin + com.java3y.austin + 0.0.1-SNAPSHOT + + 4.0.0 + + austin-stream + + + + + org.apache.flink + flink-walkthrough-common_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + + + + org.projectlombok + lombok + + + + com.java3y.austin + austin-common + 0.0.1-SNAPSHOT + + + com.alibaba + fastjson + + + + org.springframework.boot + spring-boot-starter + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + META-INF/spring.handlers + + + META-INF/spring.factories + + + META-INF/spring.schemas + + + + com.java3y.austin.stream.AustinBootStrap + + + + + + + + + \ No newline at end of file diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java new file mode 100644 index 0000000..31c8216 --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java @@ -0,0 +1,44 @@ +package com.java3y.austin.stream; + +import com.java3y.austin.stream.utils.FlinkUtils; +import com.java3y.austin.stream.utils.SpringContextUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * flink启动类 + * + * @author 3y + */ +@Slf4j +public class AustinBootStrap { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + String topicName = "austinTopicV2"; + String groupId = "austinTopicV23"; + + KafkaSource kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class) + .getKafkaConsumer(topicName, groupId); + DataStreamSource kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource"); + kafkaSource.addSink(new SinkFunction() { + @Override + public void invoke(String value, Context context) throws Exception { + log.error("kafka value:{}", value); + } + }); +// DataStream stream = envBatchPendingThread +// .addSource(new AustinSource()) +// .name("transactions"); +// +// stream.addSink(new AustinSink()); + + env.execute("AustinBootStrap"); + } + +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java new file mode 100644 index 0000000..583db1a --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java @@ -0,0 +1,17 @@ +package com.java3y.austin.stream.sink; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.AnchorInfo; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +@Slf4j +public class AustinSink extends RichSinkFunction { + + @Override + public void invoke(AnchorInfo value, Context context) throws Exception { + + log.error("sink consume value:{}", JSON.toJSONString(value)); + + } +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java b/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java new file mode 100644 index 0000000..1427727 --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java @@ -0,0 +1,34 @@ +package com.java3y.austin.stream.source; + +import com.java3y.austin.common.domain.AnchorInfo; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.util.ArrayList; +import java.util.List; + +/** + * 数据源 mock + * + * @author 3y + */ +public class AustinSource extends RichSourceFunction { + @Override + public void run(SourceContext sourceContext) throws Exception { + List anchorInfoList = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + anchorInfoList.add(AnchorInfo.builder() + .state(10).businessId(333L) + .timestamp(System.currentTimeMillis()).build()); + + } + for (AnchorInfo anchorInfo : anchorInfoList) { + sourceContext.collect(anchorInfo); + } + } + + @Override + public void cancel() { + + } +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java new file mode 100644 index 0000000..a8dcdbf --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java @@ -0,0 +1,36 @@ +package com.java3y.austin.stream.utils; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * flink工具类 + * + * @author 3y + */ +@Component +public class FlinkUtils { + + @Value("${spring.kafka.bootstrap-servers}") + private String broker; + + /** + * 获取kafkaConsumer + * @param topicName + * @param groupId + * @return + */ + public KafkaSource getKafkaConsumer(String topicName, String groupId) { + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(broker) + .setTopics(topicName) + .setGroupId(groupId) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new SimpleStringSchema()) + .build(); + return source; + } +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java new file mode 100644 index 0000000..fae8e4e --- /dev/null +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java @@ -0,0 +1,31 @@ +package com.java3y.austin.stream.utils; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +/** + * @author 3y + * @date 2022/2/15 + * 获取SpringContext对象 + */ +public class SpringContextUtils implements ApplicationContextAware { + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + applicationContext = applicationContext; + } + + + /** + * 根据bean的class来查找对象 + * + * @param clazz + * @return + */ + public static T getBean(Class clazz) { + return applicationContext.getBean(clazz); + } + +} diff --git a/pom.xml b/pom.xml index 7fed6ac..48ee4a4 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ austin-support austin-handler austin-cron + austin-stream @@ -29,6 +30,13 @@ 1.8 + UTF-8 + 1.14.3 + 1.8 + 2.11 + ${target.java.version} + ${target.java.version} + 2.17.1 @@ -118,6 +126,32 @@ 2.3.0 + + + org.apache.flink + flink-walkthrough-common_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + provided + + + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + ${flink.version} + +