Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Sep 15, 2023
1 parent 6bae5a9 commit 2ef94e0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,7 @@ public List<String> listProcedures(String dbName)
*/
public Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
Procedure procedure = ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName());
if (procedure == null) {
throw new ProcedureNotExistException(name, procedurePath);
}

return procedure;
return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName())
.orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;
Expand Down Expand Up @@ -86,8 +87,8 @@ public String[] call(
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse("Compact job");
if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING) {
env.executeAsync(name);
return new String[] {"Running"};
JobClient jobClient = env.executeAsync(name);
return new String[] {"JobID=" + jobClient.getJobID()};
} else {
env.execute(name);
return new String[] {"Success"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@

import org.apache.flink.table.procedures.Procedure;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Utility methods for {@link Procedure}. */
public class ProcedureUtil {
Expand All @@ -44,13 +43,12 @@ public static List<String> listProcedures() {
return Collections.unmodifiableList(SYSTEM_PROCEDURES);
}

@Nullable
public static Procedure getProcedure(Catalog catalog, String procedureName) {
public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
switch (procedureName) {
case CompactActionFactory.IDENTIFIER:
return new CompactProcedure(catalog.warehouse(), catalog.options());
return Optional.of(new CompactProcedure(catalog.warehouse(), catalog.options()));
default:
return null;
return Optional.empty();
}
}
}

0 comments on commit 2ef94e0

Please sign in to comment.