Skip to content

Commit

Permalink
[core] Introduce PurgeFilesProcedure to purge table (#4717)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Dec 16, 2024
1 parent 0ccdf25 commit 5309ccc
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 0 deletions.
21 changes: 21 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,27 @@ All available procedures are listed below.
CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
</td>
</tr>
<tr>
<td>purge_files</td>
<td>
-- for Flink 1.18<br/>
-- clear table with purge files directly.<br/>
CALL sys.purge_files('identifier')<br/><br/>
-- for Flink 1.19 and later<br/>
-- clear table with purge files directly.<br/>
CALL sys.purge_files(`table` => 'default.T')<br/><br/>
</td>
<td>
            Clear a table by directly purging its files. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
</td>
<td>
-- for Flink 1.18<br/>
      CALL sys.purge_files('default.T')<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.purge_files(`table` => 'default.T')
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
Expand Down
10 changes: 10 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ This section introduce all available spark procedures about paimon.
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)<br/><br/>
</td>
</tr>
<tr>
<td>purge_files</td>
<td>
         Clear a table by directly purging its files. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
</td>
<td>
CALL sys.purge_files(table => 'default.T')<br/><br/>
</td>
</tr>
<tr>
<td>migrate_database</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
import java.util.Arrays;

/** A procedure to purge files for a table. */
public class PurgeFilesProcedure extends ProcedureBase {
    public static final String IDENTIFIER = "purge_files";

    public String[] call(ProcedureContext procedureContext, String tableId)
            throws Catalog.TableNotExistException {
        FileStoreTable storeTable =
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
        FileIO io = storeTable.fileIO();
        Path location = storeTable.snapshotManager().tablePath();
        try {
            // Remove every entry under the table directory whose name does not
            // contain "schema", so the table definition itself is preserved.
            Arrays.stream(io.listStatus(location))
                    .map(status -> status.getPath())
                    .filter(path -> !path.getName().contains("schema"))
                    .forEach(
                            path -> {
                                try {
                                    // recursive delete of the file or directory
                                    io.delete(path, true);
                                } catch (IOException e) {
                                    throw new RuntimeException(e);
                                }
                            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new String[] {
            String.format("Success purge files with table: %s.", storeTable.name())
        };
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
import java.util.Arrays;

/**
 * A procedure to purge files for a table. Usage:
 *
 * <pre><code>
 * -- clear the table by purging its files directly.
 * CALL sys.purge_files(`table` => 'tableId')
 * </code></pre>
 */
public class PurgeFilesProcedure extends ProcedureBase {

    public static final String IDENTIFIER = "purge_files";

    /**
     * Deletes everything under the table's directory except schema files, so the table
     * definition is kept while all data, manifest and snapshot files are removed.
     *
     * @param procedureContext the Flink procedure context (unused)
     * @param tableId the target table identifier, e.g. {@code default.T}; cannot be empty
     * @return a single-element array with a success message
     * @throws Catalog.TableNotExistException if the table cannot be found in the catalog
     */
    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
    public String[] call(ProcedureContext procedureContext, String tableId)
            throws Catalog.TableNotExistException {
        Table table = catalog.getTable(Identifier.fromString(tableId));
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        FileIO fileIO = fileStoreTable.fileIO();
        Path tablePath = fileStoreTable.snapshotManager().tablePath();
        try {
            Arrays.stream(fileIO.listStatus(tablePath))
                    // keep schema files so the table remains usable after the purge
                    .filter(f -> !f.getPath().getName().contains("schema"))
                    .forEach(
                            fileStatus -> {
                                try {
                                    fileIO.delete(fileStatus.getPath(), true);
                                } catch (IOException e) {
                                    throw new RuntimeException(e);
                                }
                            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new String[] {
            String.format("Success purge files with table: %s.", fileStoreTable.name())
        };
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
org.apache.paimon.flink.procedure.PurgeFilesProcedure
org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.CatalogITCaseBase;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** IT Case for {@link PurgeFilesProcedure}. */
public class PurgeFilesProcedureITCase extends CatalogITCaseBase {

    @Test
    public void testPurgeFiles() throws Exception {
        // Single-bucket primary-key table keeps the data layout simple.
        sql(
                "CREATE TABLE T (id INT, name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED)"
                        + " WITH ('bucket'='1')");

        sql("INSERT INTO T VALUES (1, 'a')");
        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));

        // Purging deletes the table's data and snapshot files, so the table reads as empty.
        sql("INSERT INTO T VALUES (1, 'a')");
        sql("CALL sys.purge_files(`table` => 'default.T')");
        assertThat(sql("select * from `T`")).containsExactly();

        // Schema files are kept by the purge, so the table accepts new writes afterwards.
        sql("INSERT INTO T VALUES (2, 'a')");
        assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.PurgeFilesProcedure;
import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
Expand Down Expand Up @@ -74,6 +75,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder);
procedureBuilders.put("rollback_to_watermark", RollbackToWatermarkProcedure::builder);
procedureBuilders.put("purge_files", PurgeFilesProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to purge files for a table. */
public class PurgeFilesProcedure extends BaseProcedure {

    private static final ProcedureParameter[] PARAMETERS =
            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};

    private static final StructType OUTPUT_TYPE =
            new StructType(
                    new StructField[] {
                        new StructField("result", StringType, true, Metadata.empty())
                    });

    private PurgeFilesProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier ident = toIdentifier(args.getString(0), PARAMETERS[0].name());

        return modifyPaimonTable(
                ident,
                table -> {
                    FileStoreTable storeTable = (FileStoreTable) table;
                    FileIO io = storeTable.fileIO();
                    Path location = storeTable.snapshotManager().tablePath();
                    try {
                        // Remove everything under the table directory whose name does
                        // not contain "schema"; the table definition is preserved.
                        Arrays.stream(io.listStatus(location))
                                .map(status -> status.getPath())
                                .filter(path -> !path.getName().contains("schema"))
                                .forEach(
                                        path -> {
                                            try {
                                                // recursive delete
                                                io.delete(path, true);
                                            } catch (IOException e) {
                                                throw new RuntimeException(e);
                                            }
                                        });
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    String message =
                            String.format(
                                    "Success purge files with table: %s.", storeTable.name());
                    return new InternalRow[] {newInternalRow(UTF8String.fromString(message))};
                });
    }

    public static ProcedureBuilder builder() {
        return new BaseProcedure.Builder<PurgeFilesProcedure>() {
            @Override
            public PurgeFilesProcedure doBuild() {
                return new PurgeFilesProcedure(tableCatalog());
            }
        };
    }

    @Override
    public String description() {
        return "PurgeFilesProcedure";
    }
}
Loading

0 comments on commit 5309ccc

Please sign in to comment.