diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml
new file mode 100644
index 0000000..d2ae418
--- /dev/null
+++ b/austin-stream/pom.xml
@@ -0,0 +1,100 @@
+
+
+
+ austin
+ com.java3y.austin
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ austin-stream
+
+
+
+
+ org.apache.flink
+ flink-walkthrough-common_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+ com.java3y.austin
+ austin-common
+ 0.0.1-SNAPSHOT
+
+
+ com.alibaba
+ fastjson
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+
+
+ META-INF/spring.handlers
+
+
+ META-INF/spring.factories
+
+
+ META-INF/spring.schemas
+
+
+
+ com.java3y.austin.stream.AustinBootStrap
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java
new file mode 100644
index 0000000..31c8216
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java
@@ -0,0 +1,44 @@
+package com.java3y.austin.stream;
+
+import com.java3y.austin.stream.utils.FlinkUtils;
+import com.java3y.austin.stream.utils.SpringContextUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * flink启动类
+ *
+ * @author 3y
+ */
+@Slf4j
+public class AustinBootStrap {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ String topicName = "austinTopicV2";
+ String groupId = "austinTopicV23";
+
+ KafkaSource kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class)
+ .getKafkaConsumer(topicName, groupId);
+ DataStreamSource kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource");
+ kafkaSource.addSink(new SinkFunction() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ log.error("kafka value:{}", value);
+ }
+ });
+// DataStream stream = envBatchPendingThread
+// .addSource(new AustinSource())
+// .name("transactions");
+//
+// stream.addSink(new AustinSink());
+
+ env.execute("AustinBootStrap");
+ }
+
+}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
new file mode 100644
index 0000000..583db1a
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
@@ -0,0 +1,17 @@
+package com.java3y.austin.stream.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.java3y.austin.common.domain.AnchorInfo;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+@Slf4j
+public class AustinSink extends RichSinkFunction {
+
+ @Override
+ public void invoke(AnchorInfo value, Context context) throws Exception {
+
+ log.error("sink consume value:{}", JSON.toJSONString(value));
+
+ }
+}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java b/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java
new file mode 100644
index 0000000..1427727
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/source/AustinSource.java
@@ -0,0 +1,34 @@
+package com.java3y.austin.stream.source;
+
+import com.java3y.austin.common.domain.AnchorInfo;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 数据源 mock
+ *
+ * @author 3y
+ */
+public class AustinSource extends RichSourceFunction {
+ @Override
+ public void run(SourceContext sourceContext) throws Exception {
+ List anchorInfoList = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ anchorInfoList.add(AnchorInfo.builder()
+ .state(10).businessId(333L)
+ .timestamp(System.currentTimeMillis()).build());
+
+ }
+ for (AnchorInfo anchorInfo : anchorInfoList) {
+ sourceContext.collect(anchorInfo);
+ }
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java
new file mode 100644
index 0000000..a8dcdbf
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java
@@ -0,0 +1,36 @@
+package com.java3y.austin.stream.utils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * flink工具类
+ *
+ * @author 3y
+ */
+@Component
+public class FlinkUtils {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String broker;
+
+ /**
+ * 获取kafkaConsumer
+ * @param topicName
+ * @param groupId
+ * @return
+ */
+ public KafkaSource getKafkaConsumer(String topicName, String groupId) {
+ KafkaSource source = KafkaSource.builder()
+ .setBootstrapServers(broker)
+ .setTopics(topicName)
+ .setGroupId(groupId)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+ return source;
+ }
+}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java
new file mode 100644
index 0000000..fae8e4e
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java
@@ -0,0 +1,31 @@
+package com.java3y.austin.stream.utils;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * @author 3y
+ * @date 2022/2/15
+ * 获取SpringContext对象
+ */
+public class SpringContextUtils implements ApplicationContextAware {
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ applicationContext = applicationContext;
+ }
+
+
+ /**
+ * 根据bean的class来查找对象
+ *
+ * @param clazz
+ * @return
+ */
+ public static T getBean(Class clazz) {
+ return applicationContext.getBean(clazz);
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 7fed6ac..48ee4a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,7 @@
austin-support
austin-handler
austin-cron
+ austin-stream
@@ -29,6 +30,13 @@
1.8
+ UTF-8
+ 1.14.3
+ 1.8
+ 2.11
+ ${target.java.version}
+ ${target.java.version}
+ 2.17.1
@@ -118,6 +126,32 @@
2.3.0
+
+
+ org.apache.flink
+ flink-walkthrough-common_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ ${flink.version}
+
+