From 1336275a318e56a5df4c5f87fe310f45df941a4e Mon Sep 17 00:00:00 2001
From: monster <60029759+MonsterChenzhuo@users.noreply.github.com>
Date: Tue, 16 Jan 2024 21:28:18 +0800
Subject: [PATCH] [core] Support tag date formatter (#2566)
---
.../generated/core_configuration.html | 6 +++
.../java/org/apache/paimon/CoreOptions.java | 34 ++++++++++++
.../apache/paimon/tag/TagPeriodHandler.java | 52 +++++++++++++++++--
.../org/apache/paimon/utils/TagManager.java | 4 --
.../paimon/table/FileStoreTableTestBase.java | 6 ---
.../paimon/tag/TagAutoCreationTest.java | 36 +++++++++++++
6 files changed, 124 insertions(+), 14 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9466817969a5..35e23e34c9e0 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -605,6 +605,12 @@
Enum |
Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
+
+ tag.period-formatter |
+ with_dashes |
+ Enum |
+ The date format for tag periods.
Possible values:- "with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'
- "without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd HH'
|
+
tag.callback.#.param |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8337db566b1b..5d9e00a57f8d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -951,6 +951,12 @@ public class CoreOptions implements Serializable {
"How long is the delay after the period ends before creating a tag."
+ " This can allow some late data to enter the Tag.");
+    public static final ConfigOption<TagPeriodFormatter> TAG_PERIOD_FORMATTER =
+            key("tag.period-formatter") // typed so options.get(...) yields the enum
+                    .enumType(TagPeriodFormatter.class)
+                    .defaultValue(TagPeriodFormatter.WITH_DASHES)
+                    .withDescription("The date format for tag periods.");
+
public static final ConfigOption TAG_NUM_RETAINED_MAX =
key("tag.num-retained-max")
.intType()
@@ -1489,6 +1495,10 @@ public Duration tagCreationDelay() {
return options.get(TAG_CREATION_DELAY);
}
+ public TagPeriodFormatter tagPeriodFormatter() {
+ return options.get(TAG_PERIOD_FORMATTER); // value of 'tag.period-formatter'
+ }
+
public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}
@@ -2067,6 +2077,30 @@ public InlineElement getDescription() {
}
}
+    /** Date layouts selectable through {@code tag.period-formatter} for period-based tag names. */
+    public enum TagPeriodFormatter implements DescribedEnum {
+        WITH_DASHES("with_dashes", "Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'"),
+        WITHOUT_DASHES("without_dashes", "Dates and hours without dashes, e.g., 'yyyyMMdd HH'");
+
+        private final String optionValue;
+        private final String helpText;
+
+        TagPeriodFormatter(String optionValue, String helpText) {
+            this.helpText = helpText;
+            this.optionValue = optionValue;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(helpText);
+        }
+
+        @Override
+        public String toString() {
+            return optionValue;
+        }
+    }
+
/** The period for tag creation. */
public enum TagCreationPeriod implements DescribedEnum {
DAILY("daily", "Generate a tag every day."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index 106b0d974ab1..fa18eebea8d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -50,6 +50,16 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);
+    DateTimeFormatter HOUR_FORMATTER_WITHOUT_DASHES =
+            new DateTimeFormatterBuilder()
+                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+                    .appendValue(MONTH_OF_YEAR, 2) // fixed width: adjacent value parsing
+                    .appendValue(DAY_OF_MONTH, 2) // so the year does not swallow MMdd
+                    .appendLiteral(" ")
+                    .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+                    .toFormatter()
+                    .withResolverStyle(ResolverStyle.LENIENT);
+
DateTimeFormatter DAY_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -60,6 +70,14 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);
+    DateTimeFormatter DAY_FORMATTER_WITHOUT_DASHES =
+            new DateTimeFormatterBuilder()
+                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+                    .appendValue(MONTH_OF_YEAR, 2) // fixed width: adjacent value parsing
+                    .appendValue(DAY_OF_MONTH, 2) // so the year does not swallow MMdd
+                    .toFormatter()
+                    .withResolverStyle(ResolverStyle.LENIENT);
+
void validateDelay(Duration delay);
LocalDateTime tagToTime(String tag);
@@ -111,6 +129,12 @@ public LocalDateTime nextTagTime(LocalDateTime time) {
/** Hourly {@link TagPeriodHandler}. */
class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
+        private final CoreOptions.TagPeriodFormatter formatter; // chosen tag-name layout
+
+        public HourlyTagPeriodHandler(CoreOptions.TagPeriodFormatter formatter) {
+            this.formatter = formatter;
+        }
+
static final Duration ONE_PERIOD = Duration.ofHours(1);
@Override
@@ -120,13 +144,26 @@ protected Duration onePeriod() {
@Override
protected DateTimeFormatter formatter() {
- return HOUR_FORMATTER;
+            switch (formatter) { // layout requested through CoreOptions.TagPeriodFormatter
+                case WITH_DASHES:
+                    return HOUR_FORMATTER;
+                case WITHOUT_DASHES:
+                    return HOUR_FORMATTER_WITHOUT_DASHES;
+                default: // name the rejected value so a misconfiguration is diagnosable
+                    throw new IllegalArgumentException("Unsupported date format: " + formatter);
+            }
}
}
/** Daily {@link TagPeriodHandler}. */
class DailyTagPeriodHandler extends BaseTagPeriodHandler {
+        private final CoreOptions.TagPeriodFormatter formatter; // chosen tag-name layout
+
+        public DailyTagPeriodHandler(CoreOptions.TagPeriodFormatter formatter) {
+            this.formatter = formatter;
+        }
+
static final Duration ONE_PERIOD = Duration.ofDays(1);
@Override
@@ -136,7 +173,14 @@ protected Duration onePeriod() {
@Override
protected DateTimeFormatter formatter() {
- return DAY_FORMATTER;
+            switch (formatter) { // layout requested through CoreOptions.TagPeriodFormatter
+                case WITH_DASHES:
+                    return DAY_FORMATTER;
+                case WITHOUT_DASHES:
+                    return DAY_FORMATTER_WITHOUT_DASHES;
+                default: // name the rejected value so a misconfiguration is diagnosable
+                    throw new IllegalArgumentException("Unsupported date format: " + formatter);
+            }
}
@Override
@@ -165,9 +209,9 @@ protected DateTimeFormatter formatter() {
static TagPeriodHandler create(CoreOptions options) {
switch (options.tagCreationPeriod()) {
case DAILY:
- return new DailyTagPeriodHandler();
+ return new DailyTagPeriodHandler(options.tagPeriodFormatter());
case HOURLY:
- return new HourlyTagPeriodHandler();
+ return new HourlyTagPeriodHandler(options.tagPeriodFormatter());
case TWO_HOURS:
return new TwoHoursTagPeriodHandler();
default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a83e201d5042..f011328b49c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -70,10 +70,6 @@ public Path tagPath(String tagName) {
public void createTag(Snapshot snapshot, String tagName, List callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
- checkArgument(
- !tagName.chars().allMatch(Character::isDigit),
- "Tag name cannot be pure numeric string but is '%s'.",
- tagName);
Path newTagPath = tagPath(tagName);
try {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index abb07083af74..f8e864e8ce26 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -927,12 +927,6 @@ public void testUnsupportedTagName() throws Exception {
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
String.format("Tag name '%s' is blank", "")));
-
- assertThatThrownBy(() -> table.createTag("10", 1))
- .satisfies(
- AssertionUtils.anyCauseMatches(
- IllegalArgumentException.class,
- "Tag name cannot be pure numeric string but is '10'."));
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 3b753093b857..9b0233ef4c4b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -20,6 +20,7 @@
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.CoreOptions.TagCreationPeriod;
+import org.apache.paimon.CoreOptions.TagPeriodFormatter;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
@@ -40,6 +41,7 @@
import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for tag automatic creation. */
@@ -246,6 +248,40 @@ public void testSavepointTag() {
.containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
}
+    @Test
+    public void testTagDatePeriodFormatter() {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+        options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+        // Watermark inside 2023-07-18: the completed day is tagged as yyyyMMdd.
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(tagManager.tags().values()).containsOnly("20230717");
+        // One day later exactly one more tag must exist, so assert the full set.
+        commit.commit(new ManifestCommittable(1, utcMills("2023-07-19T12:12:00")));
+        assertThat(tagManager.tags().values()).containsOnly("20230717", "20230718");
+    }
+
+    @Test
+    public void testTagHourlyPeriodFormatter() {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+        // Watermark inside hour 12: the completed hour is tagged as "yyyyMMdd HH".
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(tagManager.tags().values()).containsOnly("20230718 11");
+        // One hour later exactly one more tag must exist, so assert the full set.
+        commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T13:13:00")));
+        assertThat(tagManager.tags().values()).containsOnly("20230718 11", "20230718 12");
+    }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())