Skip to content

Commit

Permalink
[core] Support tag date formatter (apache#2566)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Jan 16, 2024
1 parent db0076f commit 1336275
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 14 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
</tr>
<tr>
<td><h5>tag.period-formatter</h5></td>
<td style="word-wrap: break-word;">with_dashes</td>
<td><p>Enum</p></td>
<td>The date format for tag periods.<br /><br />Possible values:<ul><li>"with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'</li><li>"without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd HH'</li></ul></td>
</tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
34 changes: 34 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,12 @@ public class CoreOptions implements Serializable {
"How long is the delay after the period ends before creating a tag."
+ " This can allow some late data to enter the Tag.");

/**
 * Option {@code tag.period-formatter}: the date format for tag periods. Takes a {@link
 * TagPeriodFormatter} value and defaults to {@link TagPeriodFormatter#WITH_DASHES}.
 */
public static final ConfigOption<TagPeriodFormatter> TAG_PERIOD_FORMATTER =
key("tag.period-formatter")
.enumType(TagPeriodFormatter.class)
.defaultValue(TagPeriodFormatter.WITH_DASHES)
.withDescription("The date format for tag periods.");

public static final ConfigOption<Integer> TAG_NUM_RETAINED_MAX =
key("tag.num-retained-max")
.intType()
Expand Down Expand Up @@ -1489,6 +1495,10 @@ public Duration tagCreationDelay() {
return options.get(TAG_CREATION_DELAY);
}

/** Returns the value configured for the {@code tag.period-formatter} option. */
public TagPeriodFormatter tagPeriodFormatter() {
    final TagPeriodFormatter configured = options.get(TAG_PERIOD_FORMATTER);
    return configured;
}

public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}
Expand Down Expand Up @@ -2067,6 +2077,30 @@ public InlineElement getDescription() {
}
}

/**
 * The period format options for tag creation.
 *
 * <p>Each constant carries the string used as its option value (returned by {@link
 * #toString()}) and a human-readable description (returned by {@link #getDescription()}).
 */
public enum TagPeriodFormatter implements DescribedEnum {
    WITH_DASHES("with_dashes", "Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'"),
    WITHOUT_DASHES("without_dashes", "Dates and hours without dashes, e.g., 'yyyyMMdd HH'");

    private final String value;
    private final String description;

    TagPeriodFormatter(String optionValue, String optionDescription) {
        value = optionValue;
        description = optionDescription;
    }

    /** Wraps the description of this constant as an inline text element. */
    @Override
    public InlineElement getDescription() {
        return text(description);
    }

    /** Returns the option value string of this constant, e.g. {@code with_dashes}. */
    @Override
    public String toString() {
        return value;
    }
}

/** The period for tag creation. */
public enum TagCreationPeriod implements DescribedEnum {
DAILY("daily", "Generate a tag every day."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);

/**
 * Hour-granularity tag format without dashes, e.g. {@code "20230718 11"}.
 *
 * <p>Month and day are appended with the fixed-width {@code appendValue(field, width)}
 * overload on purpose: only that form makes the builder reserve their digits from the
 * preceding variable-width year ("adjacent value parsing"). With {@code (2, 2,
 * SignStyle.NORMAL)} the year parser greedily consumes all eight leading digits and the
 * text produced by this formatter cannot be parsed back. Formatted output is unchanged.
 */
DateTimeFormatter HOUR_FORMATTER_WITHOUT_DASHES =
        new DateTimeFormatterBuilder()
                .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
                .appendValue(MONTH_OF_YEAR, 2)
                .appendValue(DAY_OF_MONTH, 2)
                .appendLiteral(" ")
                .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
                .toFormatter()
                .withResolverStyle(ResolverStyle.LENIENT);

DateTimeFormatter DAY_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
Expand All @@ -60,6 +70,14 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);

/**
 * Day-granularity tag format without dashes, e.g. {@code "20230717"}.
 *
 * <p>Month and day are appended with the fixed-width {@code appendValue(field, width)}
 * overload on purpose: only that form makes the builder reserve their digits from the
 * preceding variable-width year ("adjacent value parsing"). With {@code (2, 2,
 * SignStyle.NORMAL)} the year parser greedily consumes all eight digits and the text
 * produced by this formatter cannot be parsed back. Formatted output is unchanged.
 */
DateTimeFormatter DAY_FORMATTER_WITHOUT_DASHES =
        new DateTimeFormatterBuilder()
                .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
                .appendValue(MONTH_OF_YEAR, 2)
                .appendValue(DAY_OF_MONTH, 2)
                .toFormatter()
                .withResolverStyle(ResolverStyle.LENIENT);

void validateDelay(Duration delay);

LocalDateTime tagToTime(String tag);
Expand Down Expand Up @@ -111,6 +129,12 @@ public LocalDateTime nextTagTime(LocalDateTime time) {
/** Hourly {@link TagPeriodHandler}. */
class HourlyTagPeriodHandler extends BaseTagPeriodHandler {

static final Duration ONE_PERIOD = Duration.ofHours(1);

// Period format chosen by the caller; assigned once in the constructor and never changed.
private final CoreOptions.TagPeriodFormatter formatter;

/**
 * Creates an hourly handler that uses the given period format.
 *
 * @param formatter the period format used to pick this handler's date-time formatter
 */
public HourlyTagPeriodHandler(CoreOptions.TagPeriodFormatter formatter) {
    this.formatter = formatter;
}

@Override
Expand All @@ -120,13 +144,26 @@ protected Duration onePeriod() {

/**
 * Returns the hour-granularity formatter matching the configured period format.
 *
 * @throws IllegalArgumentException if the configured period format is not handled here
 */
@Override
protected DateTimeFormatter formatter() {
    // The former unconditional `return HOUR_FORMATTER;` is gone: it made the switch
    // below unreachable and ignored the configured format.
    switch (formatter) {
        case WITH_DASHES:
            return HOUR_FORMATTER;
        case WITHOUT_DASHES:
            return HOUR_FORMATTER_WITHOUT_DASHES;
        default:
            throw new IllegalArgumentException("Unsupported date format type");
    }
}
}

/** Daily {@link TagPeriodHandler}. */
class DailyTagPeriodHandler extends BaseTagPeriodHandler {

static final Duration ONE_PERIOD = Duration.ofDays(1);

// Period format chosen by the caller; assigned once in the constructor and never changed.
private final CoreOptions.TagPeriodFormatter formatter;

/**
 * Creates a daily handler that uses the given period format.
 *
 * @param formatter the period format used to pick this handler's date formatter
 */
public DailyTagPeriodHandler(CoreOptions.TagPeriodFormatter formatter) {
    this.formatter = formatter;
}

@Override
Expand All @@ -136,7 +173,14 @@ protected Duration onePeriod() {

/**
 * Returns the day-granularity formatter matching the configured period format.
 *
 * @throws IllegalArgumentException if the configured period format is not handled here
 */
@Override
protected DateTimeFormatter formatter() {
    // The former unconditional `return DAY_FORMATTER;` is gone: it made the switch
    // below unreachable and ignored the configured format.
    switch (formatter) {
        case WITH_DASHES:
            return DAY_FORMATTER;
        case WITHOUT_DASHES:
            return DAY_FORMATTER_WITHOUT_DASHES;
        default:
            throw new IllegalArgumentException("Unsupported date format type");
    }
}

@Override
Expand Down Expand Up @@ -165,9 +209,9 @@ protected DateTimeFormatter formatter() {
static TagPeriodHandler create(CoreOptions options) {
switch (options.tagCreationPeriod()) {
case DAILY:
return new DailyTagPeriodHandler();
return new DailyTagPeriodHandler(options.tagPeriodFormatter());
case HOURLY:
return new HourlyTagPeriodHandler();
return new HourlyTagPeriodHandler(options.tagPeriodFormatter());
case TWO_HOURS:
return new TwoHoursTagPeriodHandler();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ public Path tagPath(String tagName) {
public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
checkArgument(
!tagName.chars().allMatch(Character::isDigit),
"Tag name cannot be pure numeric string but is '%s'.",
tagName);

Path newTagPath = tagPath(tagName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,12 +927,6 @@ public void testUnsupportedTagName() throws Exception {
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
String.format("Tag name '%s' is blank", "")));

assertThatThrownBy(() -> table.createTag("10", 1))
.satisfies(
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
"Tag name cannot be pure numeric string but is '10'."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.CoreOptions.TagCreationPeriod;
import org.apache.paimon.CoreOptions.TagPeriodFormatter;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
Expand All @@ -40,6 +41,7 @@
import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for tag automatic creation. */
Expand Down Expand Up @@ -246,6 +248,40 @@ public void testSavepointTag() {
.containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
}

/** Daily watermark-based tags must be named without dashes when so configured. */
@Test
public void testTagDatePeriodFormatter() {
    Options options = new Options();
    options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
    options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
    options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
    FileStoreTable table = this.table.copy(options.toMap());
    TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
    TagManager tagManager = table.store().newTagManager();

    // first commit: exactly one tag, named after the previous day
    commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
    assertThat(tagManager.tags().values()).containsOnly("20230717");

    // second commit: pin the complete tag set so an unexpected extra tag fails the test
    commit.commit(new ManifestCommittable(1, utcMills("2023-07-19T12:12:00")));
    assertThat(tagManager.tags().values()).containsOnly("20230717", "20230718");
}

/** Hourly watermark-based tags must be named without dashes when so configured. */
@Test
public void testTagHourlyPeriodFormatter() {
    Options options = new Options();
    options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
    options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
    options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
    FileStoreTable table = this.table.copy(options.toMap());
    TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
    TagManager tagManager = table.store().newTagManager();

    // first commit: exactly one tag, named after the previous hour
    commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
    assertThat(tagManager.tags().values()).containsOnly("20230718 11");

    // second commit: pin the complete tag set so an unexpected extra tag fails the test
    commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T13:13:00")));
    assertThat(tagManager.tags().values()).containsOnly("20230718 11", "20230718 12");
}

private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())
Expand Down

0 comments on commit 1336275

Please sign in to comment.