Skip to content

Commit

Permalink
[flink] Add paimon procedures in flink generic catalog (apache#2651)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Jan 8, 2024
1 parent 58ee0fd commit 0aad580
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.procedure.ProcedureUtil;

import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand All @@ -36,6 +38,7 @@
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
Expand All @@ -46,6 +49,7 @@
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -477,4 +481,24 @@ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
return flink.bulkGetPartitionColumnStatistics(tablePath, partitionSpecs);
}
}

/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.17-.
*/
public List<String> listProcedures(String dbName)
throws DatabaseNotExistException, CatalogException {
if (paimon.databaseExists(dbName)) {
return ProcedureUtil.listProcedures();
}
return flink.listProcedures(dbName);
}

/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.17-.
*/
public Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
return ProcedureUtil.getProcedure(paimon.catalog(), procedurePath)
.orElse(flink.getProcedure(procedurePath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,12 @@ public void testReadPaimonSystemTable() {

assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));
}

@Test
public void testReadPaimonAllProcedures() {
List<Row> result = sql("SHOW PROCEDURES");

assertThat(result)
.contains(Row.of("compact"), Row.of("merge_into"), Row.of("migrate_table"));
}
}

0 comments on commit 0aad580

Please sign in to comment.