Skip to content

Commit

Permalink
[flink] Introduce CreateTagFromTimestampAction for flink (#4077)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Aug 28, 2024
1 parent 99db1b7 commit b8639d4
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.CreateTagFromTimestampProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Create tag from timestamp action for Flink. */
public class CreateTagFromTimestampAction extends ActionBase {
private final String table;
private final String tag;
private final Long timestamp;
private final String timeRetained;

public CreateTagFromTimestampAction(
String warehouse,
String table,
String tag,
Long timestamp,
String timeRetained,
Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
this.table = table;
this.tag = tag;
this.timestamp = timestamp;
this.timeRetained = timeRetained;
}

@Override
public void run() throws Exception {
CreateTagFromTimestampProcedure createTagFromTimestampProcedure =
new CreateTagFromTimestampProcedure();
createTagFromTimestampProcedure.withCatalog(catalog);
createTagFromTimestampProcedure.call(
new DefaultProcedureContext(env), table, tag, timestamp, timeRetained);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link CreateTagFromTimestampAction}. */
public class CreateTagFromTimestampActionFactory implements ActionFactory {
public static final String IDENTIFIER = "create_tag_from_timestamp";

private static final String TABLE = "table";

private static final String TAG = "tag";

private static final String TIMESTAMP = "timestamp";

private static final String TIME_RETAINED = "time_retained";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String table = params.get(TABLE);
String tag = params.get(TAG);
Long timestamp = Long.parseLong(params.get(TIMESTAMP));
String timeRetained = params.get(TIME_RETAINED);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

CreateTagFromTimestampAction migrateFileAction =
new CreateTagFromTimestampAction(
warehouse, table, tag, timestamp, timeRetained, catalogConfig);
return Optional.of(migrateFileAction);
}

@Override
public void printHelp() {
System.out.println("Action \"create_tag_from_timestamp\" create tag from timestamp.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" create_tag_from_timestamp --warehouse <warehouse_path> "
+ "--table <database.table_name> "
+ "--tag <tag> "
+ "--timestamp <timestamp> "
+ "[--timeRetained <duration>] "
+ "[--options <key>=<value>,<key>=<value>,...]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ org.apache.paimon.flink.action.DeleteActionFactory
org.apache.paimon.flink.action.MergeIntoActionFactory
org.apache.paimon.flink.action.RollbackToActionFactory
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link CreateTagFromTimestampAction}. */
public class CreateTagFromTimestampActionITTest extends ActionITCaseBase {
private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.STRING(), DataTypes.STRING()};

private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "v"});

@BeforeEach
public void setUp() {
init(warehouse);
}

@Test
public void testCreateTagFromTimestampAction() throws Exception {
FileStoreTable table = prepareTable();
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List<String> actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
List<String> expected;
expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-01-02]", "+I[3, 2024-01-03]");

assertThat(actual).isEqualTo(expected);
long ts = table.snapshotManager().latestSnapshot().timeMillis();
String tag = "tag_test";

createAction(
CreateTagFromTimestampAction.class,
"create_tag_from_timestamp",
"--warehouse",
warehouse,
"--table",
database + "." + tableName,
"--tag",
tag,
"--timestamp",
Long.toString(ts))
.run();

Snapshot snapshot = table.tagManager().tags().firstKey();

assertThat(table.tagManager().tagExists(tag)).isTrue();
assertThat(table.tagManager().tagCount()).isEqualTo(1);
assertThat(snapshot.timeMillis()).isEqualTo(ts);
}

private FileStoreTable prepareTable() throws Exception {
init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
new String[] {"k", "v"});
String[] pk = {"k", "v"};
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.singletonList("v"),
new ArrayList<>(Arrays.asList(pk)),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(BinaryString.fromString("1"), BinaryString.fromString("2024-01-01")));
writeData(rowData(BinaryString.fromString("2"), BinaryString.fromString("2024-01-02")));
writeData(rowData(BinaryString.fromString("3"), BinaryString.fromString("2024-01-03")));

return table;
}
}

0 comments on commit b8639d4

Please sign in to comment.