From cfc94eadf983efc3a9826cbabf9938d3c3ef917e Mon Sep 17 00:00:00 2001 From: fengli Date: Mon, 27 May 2024 20:54:39 +0800 Subject: [PATCH] [FLINK-35425][table-common] Support convert fresshness to cron expression in ful refresh mode --- .../table/utils/IntervalFreshnessUtils.java | 77 +++++++++++ .../utils/IntervalFreshnessUtilsTest.java | 122 +++++++++++++++++- 2 files changed, 198 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java index 5260a7b18b812c..17867d6837932d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java @@ -31,6 +31,15 @@ @Internal public class IntervalFreshnessUtils { + private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * * ? *"; + private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * * ? *"; + private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * ? *"; + private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * ? *"; + + private static final long SECOND_CRON_UPPER_BOUND = 60; + private static final long MINUTE_CRON_UPPER_BOUND = 60; + private static final long HOUR_CRON_UPPER_BOUND = 24; + private IntervalFreshnessUtils() {} @VisibleForTesting @@ -69,4 +78,72 @@ public static Duration convertFreshnessToDuration(IntervalFreshness intervalFres intervalFreshness.getTimeUnit())); } } + + /** + * This is an util method that is used to convert the freshness of materialized table to cron + * expression in full refresh mode. Since freshness and Cron expressions cannot be converted + * equivalently, there are currently only a limited number of freshness that can be converted to + * cron expression. + */ + public static String convertFreshnessToCron(IntervalFreshness intervalFreshness) { + // validate the freshness value firstly + validateIntervalFreshness(intervalFreshness); + + switch (intervalFreshness.getTimeUnit()) { + case SECOND: + return validateAndConvertCron( + intervalFreshness, + SECOND_CRON_UPPER_BOUND, + SECOND_CRON_EXPRESSION_TEMPLATE); + case MINUTE: + return validateAndConvertCron( + intervalFreshness, + MINUTE_CRON_UPPER_BOUND, + MINUTE_CRON_EXPRESSION_TEMPLATE); + case HOUR: + return validateAndConvertCron( + intervalFreshness, HOUR_CRON_UPPER_BOUND, HOUR_CRON_EXPRESSION_TEMPLATE); + case DAY: + return validateAndConvertDayCron(intervalFreshness); + default: + throw new ValidationException( + String.format( + "Unknown freshness time unit: %s.", + intervalFreshness.getTimeUnit())); + } + } + + private static String validateAndConvertCron( + IntervalFreshness intervalFreshness, long cronUpperBound, String cronTemplate) { + long interval = Long.parseLong(intervalFreshness.getInterval()); + IntervalFreshness.TimeUnit timeUnit = intervalFreshness.getTimeUnit(); + // Freshness must be less than cronUpperBound for corresponding time unit when convert it + // to cron expression + if (interval >= cronUpperBound) { + throw new ValidationException( + String.format( + "In full refresh mode, freshness must be less than %s when the time unit is %s.", + cronUpperBound, timeUnit)); + } + // Freshness must be factors of cronUpperBound for corresponding time unit + if (cronUpperBound % interval != 0) { + throw new ValidationException( + String.format( + "In full refresh mode, only freshness that are factors of %s are currently supported when the time unit is %s.", + cronUpperBound, timeUnit)); + } + + return String.format(cronTemplate, interval); + } + + private static String validateAndConvertDayCron(IntervalFreshness intervalFreshness) { + // Since the number of days in each month is different, only one day of freshness is + // currently supported when the time unit is DAY + long interval = Long.parseLong(intervalFreshness.getInterval()); + if (interval > 1) { + throw new ValidationException( + "In full refresh mode, freshness must be 1 when the time unit is DAY."); + } + return ONE_DAY_CRON_EXPRESSION_TEMPLATE; + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java index 841d0fff8a4c21..26e0ddce56f58c 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java @@ -25,6 +25,7 @@ import java.time.Duration; +import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; import static org.apache.flink.table.utils.IntervalFreshnessUtils.validateIntervalFreshness; import static org.assertj.core.api.Assertions.assertThat; @@ -55,7 +56,7 @@ void testIllegalIntervalFreshness() { } @Test - void testConvertFreshness() { + void testConvertFreshnessToDuration() { // verify second Duration actualSecond = convertFreshnessToDuration( @@ -80,4 +81,123 @@ void testConvertFreshness() { new IntervalFreshness("3", IntervalFreshness.TimeUnit.DAY)); assertThat(actualDay).isEqualTo(Duration.ofDays(3)); } + + @Test + void testConvertSecondFreshnessToCronExpression() { + // verify illegal freshness + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "90", IntervalFreshness.TimeUnit.SECOND))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, freshness must be less than 60 when the time unit is SECOND."); + + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "32", IntervalFreshness.TimeUnit.SECOND))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is SECOND."); + + String actual1 = + convertFreshnessToCron( + new IntervalFreshness("30", IntervalFreshness.TimeUnit.SECOND)); + assertThat(actual1).isEqualTo("0/30 * * * * ? *"); + + String actual2 = + convertFreshnessToCron( + new IntervalFreshness("5", IntervalFreshness.TimeUnit.SECOND)); + assertThat(actual2).isEqualTo("0/5 * * * * ? *"); + } + + @Test + void testConvertMinuteFreshnessToCronExpression() { + // verify illegal freshness + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "90", IntervalFreshness.TimeUnit.MINUTE))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, freshness must be less than 60 when the time unit is MINUTE."); + + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "32", IntervalFreshness.TimeUnit.MINUTE))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is MINUTE."); + + String actual1 = + convertFreshnessToCron( + new IntervalFreshness("30", IntervalFreshness.TimeUnit.MINUTE)); + assertThat(actual1).isEqualTo("0 0/30 * * * ? *"); + + String actual2 = + convertFreshnessToCron( + new IntervalFreshness("5", IntervalFreshness.TimeUnit.MINUTE)); + assertThat(actual2).isEqualTo("0 0/5 * * * ? *"); + + String actual3 = + convertFreshnessToCron( + new IntervalFreshness("1", IntervalFreshness.TimeUnit.MINUTE)); + assertThat(actual3).isEqualTo("0 0/1 * * * ? *"); + } + + @Test + void testConvertHourFreshnessToCronExpression() { + // verify illegal freshness + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "24", IntervalFreshness.TimeUnit.HOUR))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, freshness must be less than 24 when the time unit is HOUR."); + + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness( + "14", IntervalFreshness.TimeUnit.HOUR))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, only freshness that are factors of 24 are currently supported when the time unit is HOUR."); + + String actual1 = + convertFreshnessToCron( + new IntervalFreshness("12", IntervalFreshness.TimeUnit.HOUR)); + assertThat(actual1).isEqualTo("0 0 0/12 * * ? *"); + + String actual2 = + convertFreshnessToCron(new IntervalFreshness("4", IntervalFreshness.TimeUnit.HOUR)); + assertThat(actual2).isEqualTo("0 0 0/4 * * ? *"); + + String actual3 = + convertFreshnessToCron(new IntervalFreshness("1", IntervalFreshness.TimeUnit.HOUR)); + assertThat(actual3).isEqualTo("0 0 0/1 * * ? *"); + } + + @Test + void testConvertDayFreshnessToCronExpression() { + // verify illegal freshness + assertThatThrownBy( + () -> + convertFreshnessToCron( + new IntervalFreshness("2", IntervalFreshness.TimeUnit.DAY))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, freshness must be 1 when the time unit is DAY."); + String actual1 = + convertFreshnessToCron(new IntervalFreshness("1", IntervalFreshness.TimeUnit.DAY)); + assertThat(actual1).isEqualTo("0 0 0 * * ? *"); + } }