From 4587c97511b7f5ec740c4abddab5f9698b8fc4bb Mon Sep 17 00:00:00 2001 From: "sky.huang" Date: Wed, 14 Jun 2023 11:46:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7=20Flink=20=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E4=B8=BA=201.17.1=20&=20=E8=A7=A3=E5=86=B3=20Kafka=20=E4=BE=9D?= =?UTF-8?q?=E8=B5=96=E5=86=B2=E7=AA=81=E9=97=AE=E9=A2=98=20&=20Redis=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20=E5=93=A8=E5=85=B5=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- austin-data-house/pom.xml | 4 ++-- austin-stream/pom.xml | 11 +++++----- .../stream/constants/AustinFlinkConstant.java | 13 +++++++++++- .../stream/utils/LettuceRedisUtils.java | 17 ++++++++++++---- pom.xml | 20 ++++++++++++------- 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/austin-data-house/pom.xml b/austin-data-house/pom.xml index 30b5381..dba1fab 100644 --- a/austin-data-house/pom.xml +++ b/austin-data-house/pom.xml @@ -23,13 +23,13 @@ org.apache.flink flink-connector-hive_2.12 - 1.16.0 + ${flink.version} provided org.apache.flink flink-table-api-java-bridge - 1.16.0 + ${flink.version} provided diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml index 3bc7137..f995bf6 100644 --- a/austin-stream/pom.xml +++ b/austin-stream/pom.xml @@ -15,26 +15,27 @@ org.apache.flink - flink-walkthrough-common_${scala.binary.version} + flink-walkthrough-common ${flink.version} org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided org.apache.flink - flink-clients_${scala.binary.version} + flink-clients ${flink.version} provided + org.apache.flink - flink-connector-kafka_${scala.binary.version} + flink-connector-kafka ${flink.version} @@ -85,7 +86,7 @@ META-INF/spring.schemas + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> com.java3y.austin.stream.AustinBootStrap diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 7bf2262..d02afae 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -22,10 +22,21 @@ public class AustinFlinkConstant { * TODO 使用前配置redis ip:port * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ + + public static final String REDIS_MODE_SENTINEL = "SENTINEL"; + + public static final String REDIS_MODE_SINGLE = "SINGLE"; + + public static final String REDIS_MODE = REDIS_MODE_SENTINEL; + public static final String REDIS_IP = "austin-redis"; - public static final String REDIS_PORT = "6379"; + + public static final Integer REDIS_PORT = 6379; + public static final String REDIS_PASSWORD = "austin"; + public static final String MASTER_ID = "mymaster"; + /** * Flink流程常量 diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java index 0c64cd2..7c9269a 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java @@ -1,5 +1,6 @@ package com.java3y.austin.stream.utils; +import cn.hutool.core.util.StrUtil; import com.java3y.austin.stream.callback.RedisPipelineCallBack; import com.java3y.austin.stream.constants.AustinFlinkConstant; import io.lettuce.core.LettuceFutures; @@ -26,10 +27,18 @@ public class LettuceRedisUtils { private static RedisClient redisClient; static { - RedisURI redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP) - .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT)) - .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) - .build(); + RedisURI redisUri = null; + if (StrUtil.equals(AustinFlinkConstant.REDIS_MODE_SENTINEL, AustinFlinkConstant.REDIS_MODE)) { + redisUri = RedisURI.Builder.sentinel(AustinFlinkConstant.REDIS_IP, AustinFlinkConstant.MASTER_ID) + .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) + .build(); + } else if (StrUtil.equals(AustinFlinkConstant.REDIS_MODE_SINGLE, AustinFlinkConstant.REDIS_MODE)) { + redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP) + .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT)) + .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) + .build(); + } + redisClient = RedisClient.create(redisUri); } diff --git a/pom.xml b/pom.xml index 7692650..80e2a69 100644 --- a/pom.xml +++ b/pom.xml @@ -32,9 +32,8 @@ 1.8 UTF-8 - 1.14.3 + 1.17.1 1.8 - 2.11 ${target.java.version} ${target.java.version} 2.17.1 @@ -54,7 +53,7 @@ cn.hutool hutool-all - 5.7.15 + 5.8.16 @@ -135,21 +134,28 @@ 2.3.0 + + + org.apache.kafka + kafka-clients + 3.2.3 + + org.apache.flink - flink-walkthrough-common_${scala.binary.version} + flink-walkthrough-common ${flink.version} org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided org.apache.flink - flink-clients_${scala.binary.version} + flink-clients ${flink.version} provided @@ -157,7 +163,7 @@ org.apache.flink - flink-connector-kafka_${scala.binary.version} + flink-connector-kafka ${flink.version}