Skip to content

Commit

Permalink
[FLINK-35425][table-common] Support convert fresshness to cron expres…
Browse files Browse the repository at this point in the history
…sion in ful refresh mode
  • Loading branch information
lsyldliu committed May 27, 2024
1 parent 0d5d306 commit cfc94ea
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
@Internal
public class IntervalFreshnessUtils {

private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * * ? *";
private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * * ? *";
private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * ? *";
private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * ? *";

private static final long SECOND_CRON_UPPER_BOUND = 60;
private static final long MINUTE_CRON_UPPER_BOUND = 60;
private static final long HOUR_CRON_UPPER_BOUND = 24;

private IntervalFreshnessUtils() {}

@VisibleForTesting
Expand Down Expand Up @@ -69,4 +78,72 @@ public static Duration convertFreshnessToDuration(IntervalFreshness intervalFres
intervalFreshness.getTimeUnit()));
}
}

/**
* This is an util method that is used to convert the freshness of materialized table to cron
* expression in full refresh mode. Since freshness and Cron expressions cannot be converted
* equivalently, there are currently only a limited number of freshness that can be converted to
* cron expression.
*/
public static String convertFreshnessToCron(IntervalFreshness intervalFreshness) {
// validate the freshness value firstly
validateIntervalFreshness(intervalFreshness);

switch (intervalFreshness.getTimeUnit()) {
case SECOND:
return validateAndConvertCron(
intervalFreshness,
SECOND_CRON_UPPER_BOUND,
SECOND_CRON_EXPRESSION_TEMPLATE);
case MINUTE:
return validateAndConvertCron(
intervalFreshness,
MINUTE_CRON_UPPER_BOUND,
MINUTE_CRON_EXPRESSION_TEMPLATE);
case HOUR:
return validateAndConvertCron(
intervalFreshness, HOUR_CRON_UPPER_BOUND, HOUR_CRON_EXPRESSION_TEMPLATE);
case DAY:
return validateAndConvertDayCron(intervalFreshness);
default:
throw new ValidationException(
String.format(
"Unknown freshness time unit: %s.",
intervalFreshness.getTimeUnit()));
}
}

private static String validateAndConvertCron(
IntervalFreshness intervalFreshness, long cronUpperBound, String cronTemplate) {
long interval = Long.parseLong(intervalFreshness.getInterval());
IntervalFreshness.TimeUnit timeUnit = intervalFreshness.getTimeUnit();
// Freshness must be less than cronUpperBound for corresponding time unit when convert it
// to cron expression
if (interval >= cronUpperBound) {
throw new ValidationException(
String.format(
"In full refresh mode, freshness must be less than %s when the time unit is %s.",
cronUpperBound, timeUnit));
}
// Freshness must be factors of cronUpperBound for corresponding time unit
if (cronUpperBound % interval != 0) {
throw new ValidationException(
String.format(
"In full refresh mode, only freshness that are factors of %s are currently supported when the time unit is %s.",
cronUpperBound, timeUnit));
}

return String.format(cronTemplate, interval);
}

private static String validateAndConvertDayCron(IntervalFreshness intervalFreshness) {
// Since the number of days in each month is different, only one day of freshness is
// currently supported when the time unit is DAY
long interval = Long.parseLong(intervalFreshness.getInterval());
if (interval > 1) {
throw new ValidationException(
"In full refresh mode, freshness must be 1 when the time unit is DAY.");
}
return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Duration;

import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
import static org.apache.flink.table.utils.IntervalFreshnessUtils.validateIntervalFreshness;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -55,7 +56,7 @@ void testIllegalIntervalFreshness() {
}

@Test
void testConvertFreshness() {
void testConvertFreshnessToDuration() {
// verify second
Duration actualSecond =
convertFreshnessToDuration(
Expand All @@ -80,4 +81,123 @@ void testConvertFreshness() {
new IntervalFreshness("3", IntervalFreshness.TimeUnit.DAY));
assertThat(actualDay).isEqualTo(Duration.ofDays(3));
}

@Test
void testConvertSecondFreshnessToCronExpression() {
// verify illegal freshness
assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"90", IntervalFreshness.TimeUnit.SECOND)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 60 when the time unit is SECOND.");

assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"32", IntervalFreshness.TimeUnit.SECOND)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is SECOND.");

String actual1 =
convertFreshnessToCron(
new IntervalFreshness("30", IntervalFreshness.TimeUnit.SECOND));
assertThat(actual1).isEqualTo("0/30 * * * * ? *");

String actual2 =
convertFreshnessToCron(
new IntervalFreshness("5", IntervalFreshness.TimeUnit.SECOND));
assertThat(actual2).isEqualTo("0/5 * * * * ? *");
}

@Test
void testConvertMinuteFreshnessToCronExpression() {
// verify illegal freshness
assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"90", IntervalFreshness.TimeUnit.MINUTE)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 60 when the time unit is MINUTE.");

assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"32", IntervalFreshness.TimeUnit.MINUTE)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is MINUTE.");

String actual1 =
convertFreshnessToCron(
new IntervalFreshness("30", IntervalFreshness.TimeUnit.MINUTE));
assertThat(actual1).isEqualTo("0 0/30 * * * ? *");

String actual2 =
convertFreshnessToCron(
new IntervalFreshness("5", IntervalFreshness.TimeUnit.MINUTE));
assertThat(actual2).isEqualTo("0 0/5 * * * ? *");

String actual3 =
convertFreshnessToCron(
new IntervalFreshness("1", IntervalFreshness.TimeUnit.MINUTE));
assertThat(actual3).isEqualTo("0 0/1 * * * ? *");
}

@Test
void testConvertHourFreshnessToCronExpression() {
// verify illegal freshness
assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"24", IntervalFreshness.TimeUnit.HOUR)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 24 when the time unit is HOUR.");

assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness(
"14", IntervalFreshness.TimeUnit.HOUR)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors of 24 are currently supported when the time unit is HOUR.");

String actual1 =
convertFreshnessToCron(
new IntervalFreshness("12", IntervalFreshness.TimeUnit.HOUR));
assertThat(actual1).isEqualTo("0 0 0/12 * * ? *");

String actual2 =
convertFreshnessToCron(new IntervalFreshness("4", IntervalFreshness.TimeUnit.HOUR));
assertThat(actual2).isEqualTo("0 0 0/4 * * ? *");

String actual3 =
convertFreshnessToCron(new IntervalFreshness("1", IntervalFreshness.TimeUnit.HOUR));
assertThat(actual3).isEqualTo("0 0 0/1 * * ? *");
}

@Test
void testConvertDayFreshnessToCronExpression() {
// verify illegal freshness
assertThatThrownBy(
() ->
convertFreshnessToCron(
new IntervalFreshness("2", IntervalFreshness.TimeUnit.DAY)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be 1 when the time unit is DAY.");
String actual1 =
convertFreshnessToCron(new IntervalFreshness("1", IntervalFreshness.TimeUnit.DAY));
assertThat(actual1).isEqualTo("0 0 0 * * ? *");
}
}

0 comments on commit cfc94ea

Please sign in to comment.