diff --git a/dinky-admin/pom.xml b/dinky-admin/pom.xml index 8181ec7da1..b446bbd7c6 100644 --- a/dinky-admin/pom.xml +++ b/dinky-admin/pom.xml @@ -379,6 +379,25 @@ <groupId>org.dinky</groupId> <artifactId>dinky-alert-http</artifactId> </dependency> + <dependency> + <groupId>org.signal</groupId> + <artifactId>embedded-redis</artifactId> + <version>0.9.0</version> + </dependency> + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson-spring-boot-starter</artifactId> + <exclusions> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-actuator</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> diff --git a/dinky-admin/src/main/java/org/dinky/configure/RedisConfiguration.java b/dinky-admin/src/main/java/org/dinky/configure/RedisConfiguration.java new file mode 100644 index 0000000000..8c2a0fd7f3 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/configure/RedisConfiguration.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.configure; + +import org.dinky.configure.properties.DinkyRedisProperties; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import redis.embedded.RedisServer; + +@Configuration +@EnableConfigurationProperties(DinkyRedisProperties.class) +public class RedisConfiguration { + @Bean + @ConditionalOnMissingBean + @ConditionalOnProperty(prefix = "spring.redis", name = "mode", havingValue = "embedded", matchIfMissing = true) + @Order(-1) + public RedisServer redisServer(RedisProperties redisProperties) { + RedisServer redisServer = new RedisServer(redisProperties.getPort()); + redisServer.start(); + return redisServer; + } + + @Bean + @Order(1) + public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate<String, Object> template = new RedisTemplate<>(); + // 设置Redis链接工厂对象 + template.setConnectionFactory(redisConnectionFactory); + + // 设置键的序列化方式为 String + template.setKeySerializer(new StringRedisSerializer()); + template.setHashKeySerializer(new StringRedisSerializer()); + + // 设置值的序列化方式为 JSON + template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); + template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); + + return template; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/configure/properties/DinkyRedisProperties.java b/dinky-admin/src/main/java/org/dinky/configure/properties/DinkyRedisProperties.java new file mode 100644 index 0000000000..981e111541 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/configure/properties/DinkyRedisProperties.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.configure.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import lombok.Getter; +import lombok.Setter; + +@ConfigurationProperties(prefix = "spring.redis") +@Getter +@Setter +public class DinkyRedisProperties { + private RedisRunMode mode; + + public enum RedisRunMode { + Standalone, + Embedded + } + + public boolean isEmbedded() { + return RedisRunMode.Embedded.equals(mode); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java index 2916c25fdd..b09851d3d2 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java @@ -27,8 +27,8 @@ import org.dinky.utils.JsonUtils; import org.dinky.utils.TimeUtil; -import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -89,13 +89,8 @@ private static void fetchFlinkMetrics(String v, Map<String, String> m, String[] } String metricsName = String.join(",", m.keySet()); - String urlParam = null; - try { - urlParam = String.format( - "/jobs/%s/vertices/%s/metrics?get=%s", jid, v, URLEncoder.encode(metricsName, "UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } + String urlParam = String.format( + "/jobs/%s/vertices/%s/metrics?get=%s", jid, v, URLEncoder.encode(metricsName, StandardCharsets.UTF_8)); HttpUtils.request(new ArrayList<>(Arrays.asList(urlList)), urlParam, NetConstant.READ_TIME_OUT, x -> { List<Dict> array = JsonUtils.toList(x.body(), Dict.class); diff --git a/dinky-admin/src/main/resources/application-redis.yml b/dinky-admin/src/main/resources/application-redis.yml new file mode 100644 index 0000000000..6320e2a203 --- /dev/null +++ b/dinky-admin/src/main/resources/application-redis.yml @@ -0,0 +1,43 @@ +spring: + redis: + host: 127.0.0.1 + port: 6379 + # redis operating mode,standalone\embedded + mode: embedded + database: 8 + redisson: + # Single-node configuration + singleServerConfig: + # Connection idle timeout, in milliseconds + idleConnectionTimeout: 10000 + # Connection timeout in milliseconds + connectTimeout: 10000 + # Command wait timeout, in milliseconds + timeout: 3000 + # Command Failed Retries (an error is thrown if the attempt reaches retryAttempts and the command still fails to send the command to a specified node). + # If the attempt to send within this limit is successful, timeout timing is enabled. + retryAttempts: 3 + # The command retry send interval in milliseconds + retryInterval: 1500 + # password + password: ${spring.redis.password} + # clientName + clientName: dinky-redis + # The maximum number of subscriptions for a single connection + subscriptionsPerConnection: 5 + # Node address + address: redis://${spring.redis.host}:${spring.redis.port} + # The minimum number of idle connections for publish and subscribe connections + subscriptionConnectionMinimumIdleSize: 1 + # Pub and sub connection pool size + subscriptionConnectionPoolSize: 50 + # Minimum number of idle connections + connectionMinimumIdleSize: 5 + # Connection pool size + connectionPoolSize: 10 + # Database number + database: ${spring.redis.database} + # DNS monitoring interval, in milliseconds + dnsMonitoringInterval: 5000 + # Transmission mode + transportMode: "NIO" diff --git a/dinky-admin/src/main/resources/application.yml b/dinky-admin/src/main/resources/application.yml index 0855ab1b26..404cbff885 100644 --- a/dinky-admin/src/main/resources/application.yml +++ b/dinky-admin/src/main/resources/application.yml @@ -20,6 +20,7 @@ spring: include: - jmx - flyway + - redis lifecycle: timeout-per-shutdown-phase: 30s diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java index db56f8cc2b..a8bcfd743c 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTask.java @@ -41,10 +41,8 @@ static Optional<DaemonTask> get(DaemonTaskConfig config) { static DaemonTask build(DaemonTaskConfig config) { Optional<DaemonTask> optionalDaemonTask = DaemonTask.get(config); - if (!optionalDaemonTask.isPresent()) { - throw new DaemonTaskException(Status.DAEMON_TASK_NOT_SUPPORT.getMessage() + config.getType()); - } - return optionalDaemonTask.get(); + return optionalDaemonTask.orElseThrow( + () -> new DaemonTaskException(Status.DAEMON_TASK_NOT_SUPPORT.getMessage() + config.getType())); } DaemonTask setConfig(DaemonTaskConfig config); diff --git a/pom.xml b/pom.xml index cd73757a55..7bcbf68800 100644 --- a/pom.xml +++ b/pom.xml @@ -109,6 +109,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <protobuf-java.version>2.5.0</protobuf-java.version> + <redisson.version>3.39.0</redisson.version> <reflections.version>0.10.2</reflections.version> <revision>1.2.0-rc4</revision> <sa-token.version>1.37.0</sa-token.version> @@ -734,6 +735,11 @@ <artifactId>auto-service</artifactId> <version>1.1.1</version> </dependency> + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson-spring-boot-starter</artifactId> + <version>${redisson.version}</version> + </dependency> </dependencies> </dependencyManagement>