diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandler.java new file mode 100644 index 00000000000000..bb10e3c50fd5c1 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.refresh.RefreshHandler; + +import java.io.Serializable; +import java.util.Objects; + +/** A {@link RefreshHandler} instance for embedded workflow scheduler. */ +@PublicEvolving +public class EmbeddedRefreshHandler implements RefreshHandler, Serializable { + + private static final long serialVersionUID = 1L; + + private final String workflowName; + private final String workflowGroup; + + public EmbeddedRefreshHandler(String workflowName, String workflowGroup) { + this.workflowName = workflowName; + this.workflowGroup = workflowGroup; + } + + @Override + public String asSummaryString() { + return String.format( + "{\n workflowName: %s,\n workflowGroup: %s\n}", workflowName, workflowGroup); + } + + public String getWorkflowName() { + return workflowName; + } + + public String getWorkflowGroup() { + return workflowGroup; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EmbeddedRefreshHandler that = (EmbeddedRefreshHandler) o; + return Objects.equals(workflowName, that.workflowName) + && Objects.equals(workflowGroup, that.workflowGroup); + } + + @Override + public int hashCode() { + return Objects.hash(workflowName, workflowGroup); + } + + @Override + public String toString() { + return "EmbeddedRefreshHandler{" + + "workflowName='" + + workflowName + + '\'' + + ", workflowGroup='" + + workflowGroup + + '\'' + + '}'; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerSerializer.java new file mode 100644 index 00000000000000..818c702dc0e3da --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.refresh.RefreshHandlerSerializer; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** {@link RefreshHandlerSerializer} for {@link EmbeddedRefreshHandler}. */ +@PublicEvolving +public class EmbeddedRefreshHandlerSerializer + implements RefreshHandlerSerializer { + + public static final EmbeddedRefreshHandlerSerializer INSTANCE = + new EmbeddedRefreshHandlerSerializer(); + + @Override + public byte[] serialize(EmbeddedRefreshHandler refreshHandler) throws IOException { + return InstantiationUtil.serializeObject(refreshHandler); + } + + @Override + public EmbeddedRefreshHandler deserialize(byte[] serializedBytes, ClassLoader cl) + throws IOException, ClassNotFoundException { + return InstantiationUtil.deserializeObject(serializedBytes, cl); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowScheduler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowScheduler.java new file mode 100644 index 00000000000000..98e1c15d72dc08 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowScheduler.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowRequestBody; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowResponseBody; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.EmbeddedSchedulerWorkflowRequestBody; +import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler; +import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow; +import org.apache.flink.table.workflow.CreateRefreshWorkflow; +import org.apache.flink.table.workflow.DeleteRefreshWorkflow; +import org.apache.flink.table.workflow.ModifyRefreshWorkflow; +import org.apache.flink.table.workflow.ResumeRefreshWorkflow; +import org.apache.flink.table.workflow.SuspendRefreshWorkflow; +import org.apache.flink.table.workflow.WorkflowException; +import org.apache.flink.table.workflow.WorkflowScheduler; +import org.apache.flink.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * A workflow scheduler plugin implementation for {@link EmbeddedQuartzScheduler}. It is used to + * create, modify refresh workflow for materialized table. + */ +@PublicEvolving +public class EmbeddedWorkflowScheduler implements WorkflowScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedWorkflowScheduler.class); + + private final Configuration configuration; + private final String restAddress; + private final int port; + + private RestClient restClient; + + public EmbeddedWorkflowScheduler(Configuration configuration) { + this.configuration = configuration; + this.restAddress = configuration.get(RestOptions.ADDRESS); + this.port = configuration.get(RestOptions.PORT); + } + + @Override + public void open() throws WorkflowException { + try { + restClient = new RestClient(configuration, Executors.directExecutor()); + } catch (Exception e) { + throw new WorkflowException( + "Could not create RestClient to connect to embedded scheduler.", e); + } + } + + @Override + public void close() throws WorkflowException { + restClient.closeAsync(); + } + + @Override + public EmbeddedRefreshHandlerSerializer getRefreshHandlerSerializer() { + return EmbeddedRefreshHandlerSerializer.INSTANCE; + } + + @Override + public EmbeddedRefreshHandler createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) + throws WorkflowException { + if (createRefreshWorkflow instanceof CreatePeriodicRefreshWorkflow) { + CreatePeriodicRefreshWorkflow periodicRefreshWorkflow = + (CreatePeriodicRefreshWorkflow) createRefreshWorkflow; + ObjectIdentifier materializedTableIdentifier = + periodicRefreshWorkflow.getMaterializedTableIdentifier(); + CreateEmbeddedSchedulerWorkflowRequestBody requestBody = + new CreateEmbeddedSchedulerWorkflowRequestBody( + materializedTableIdentifier.asSerializableString(), + periodicRefreshWorkflow.getCronExpression(), + periodicRefreshWorkflow.getDynamicOptions(), + periodicRefreshWorkflow.getExecutionConfig(), + null, + periodicRefreshWorkflow.getRestEndpointUrl()); + CreateEmbeddedSchedulerWorkflowResponseBody responseBody; + try { + responseBody = + restClient + .sendRequest( + restAddress, + port, + CreateEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestBody) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error( + "Failed to create periodic refresh workflow for materialized table {}.", + materializedTableIdentifier, + e); + throw new WorkflowException( + String.format( + "Failed to create periodic refresh workflow for materialized table %s.", + materializedTableIdentifier), + e); + } + + return new EmbeddedRefreshHandler( + responseBody.getWorkflowName(), responseBody.getWorkflowGroup()); + } else { + LOG.error( + "Unsupported create refresh workflow type {}.", + createRefreshWorkflow.getClass().getSimpleName()); + throw new WorkflowException( + String.format( + "Unsupported create refresh workflow type %s.", + createRefreshWorkflow.getClass().getSimpleName())); + } + } + + @Override + public void modifyRefreshWorkflow( + ModifyRefreshWorkflow modifyRefreshWorkflow) + throws WorkflowException { + EmbeddedRefreshHandler embeddedRefreshHandler = modifyRefreshWorkflow.getRefreshHandler(); + EmbeddedSchedulerWorkflowRequestBody requestBody = + new EmbeddedSchedulerWorkflowRequestBody( + embeddedRefreshHandler.getWorkflowName(), + embeddedRefreshHandler.getWorkflowGroup()); + if (modifyRefreshWorkflow instanceof SuspendRefreshWorkflow) { + try { + restClient + .sendRequest( + restAddress, + port, + SuspendEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestBody) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error( + "Failed to suspend refresh workflow {}.", + embeddedRefreshHandler.asSummaryString(), + e); + throw new WorkflowException( + String.format( + "Failed to suspend refresh workflow %s.", + embeddedRefreshHandler.asSummaryString()), + e); + } + } else if (modifyRefreshWorkflow instanceof ResumeRefreshWorkflow) { + try { + restClient + .sendRequest( + restAddress, + port, + ResumeEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestBody) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error( + "Failed to resume refresh workflow {}.", + embeddedRefreshHandler.asSummaryString(), + e); + throw new WorkflowException( + String.format( + "Failed to resume refresh workflow %s.", + embeddedRefreshHandler.asSummaryString()), + e); + } + } else { + LOG.error( + "Unsupported modify refresh workflow type {}.", + modifyRefreshWorkflow.getClass().getSimpleName()); + throw new WorkflowException( + String.format( + "Unsupported modify refresh workflow type %s.", + modifyRefreshWorkflow.getClass().getSimpleName())); + } + } + + @Override + public void deleteRefreshWorkflow( + DeleteRefreshWorkflow deleteRefreshWorkflow) + throws WorkflowException { + EmbeddedRefreshHandler embeddedRefreshHandler = deleteRefreshWorkflow.getRefreshHandler(); + EmbeddedSchedulerWorkflowRequestBody requestBody = + new EmbeddedSchedulerWorkflowRequestBody( + embeddedRefreshHandler.getWorkflowName(), + embeddedRefreshHandler.getWorkflowGroup()); + try { + restClient + .sendRequest( + restAddress, + port, + DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestBody) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error( + "Failed to delete refresh workflow {}.", + embeddedRefreshHandler.asSummaryString(), + e); + throw new WorkflowException( + String.format( + "Failed to delete refresh workflow %s.", + embeddedRefreshHandler.asSummaryString()), + e); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowSchedulerFactory.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowSchedulerFactory.java new file mode 100644 index 00000000000000..caad49ad8f27f0 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/EmbeddedWorkflowSchedulerFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.factories.WorkflowSchedulerFactory; +import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory; +import org.apache.flink.table.workflow.WorkflowScheduler; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig; +import static org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.rebuildRestEndpointOptions; + +/** The {@link WorkflowSchedulerFactory} to create the {@link EmbeddedWorkflowScheduler}. */ +@PublicEvolving +public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory { + + public static final String IDENTIFIER = "embedded"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public WorkflowScheduler createWorkflowScheduler(Context context) { + Map flinkConfigMap = context.getConfiguration().toMap(); + Configuration configuration = Configuration.fromMap(flinkConfigMap); + Map restEndpointConfigMap = + getEndpointConfig(configuration, SqlGatewayRestEndpointFactory.IDENTIFIER); + // Use the SqlGateway rest endpoint config + Configuration restConfig = + rebuildRestEndpointOptions(restEndpointConfigMap, flinkConfigMap); + return new EmbeddedWorkflowScheduler(restConfig); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index db56ab2ccd1e3f..95154f085dbae8 100644 --- a/flink-table/flink-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-sql-gateway/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory +org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java index fb27468392a293..6bb22879eaeed6 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java @@ -46,7 +46,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** The base class for Rest API IT test. */ -abstract class RestAPIITCaseBase { +public abstract class RestAPIITCaseBase { @RegisterExtension @Order(1) @@ -58,10 +58,10 @@ abstract class RestAPIITCaseBase { new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration); @Nullable private static TestingRestClient restClient = null; - @Nullable private static String targetAddress = null; + @Nullable protected static String targetAddress = null; @Nullable private static SqlGatewayRestEndpoint sqlGatewayRestEndpoint = null; - private static int port = 0; + protected static int port = 0; @BeforeAll static void start() throws Exception { diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerTest.java new file mode 100644 index 00000000000000..df34b608d421e8 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedRefreshHandlerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link EmbeddedRefreshHandler} and {@link EmbeddedRefreshHandlerSerializer}. */ +public class EmbeddedRefreshHandlerTest { + + @Test + void testSerDe() throws Exception { + EmbeddedRefreshHandler expected = new EmbeddedRefreshHandler("a", "b"); + + byte[] serBytes = EmbeddedRefreshHandlerSerializer.INSTANCE.serialize(expected); + EmbeddedRefreshHandler actual = + EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize( + serBytes, this.getClass().getClassLoader()); + + assertThat(actual).isEqualTo(expected); + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedSchedulerRelatedITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedSchedulerRelatedITCase.java new file mode 100644 index 00000000000000..1917578b837a2e --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/workflow/EmbeddedSchedulerRelatedITCase.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.workflow; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.util.RestClientException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil; +import org.apache.flink.table.gateway.rest.RestAPIITCaseBase; +import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHeaders; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowRequestBody; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowResponseBody; +import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.EmbeddedSchedulerWorkflowRequestBody; +import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow; +import org.apache.flink.table.workflow.CreateRefreshWorkflow; +import org.apache.flink.table.workflow.DeleteRefreshWorkflow; +import org.apache.flink.table.workflow.ModifyRefreshWorkflow; +import org.apache.flink.table.workflow.ResumeRefreshWorkflow; +import org.apache.flink.table.workflow.SuspendRefreshWorkflow; +import org.apache.flink.table.workflow.WorkflowException; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; +import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE; +import static org.apache.flink.table.gateway.workflow.scheduler.QuartzSchedulerUtils.QUARTZ_JOB_GROUP; +import static org.apache.flink.table.gateway.workflow.scheduler.QuartzSchedulerUtils.QUARTZ_JOB_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in embedded + * scheduler cases. + */ +public class EmbeddedSchedulerRelatedITCase extends RestAPIITCaseBase { + + private static final EmbeddedSchedulerWorkflowRequestBody nonExistsWorkflow = + new EmbeddedSchedulerWorkflowRequestBody("non-exists", QUARTZ_JOB_GROUP); + + private static final ObjectIdentifier materializedTableIdentifier = + ObjectIdentifier.of("cat", "db", "t1"); + private static final String descriptionStatement = + String.format( + "ALTER MATERIALIZED TABLE %s REFRESH", + materializedTableIdentifier.asSerializableString()); + private static final String cronExpression = "0 0/1 * * * ?"; + + private static final EmbeddedRefreshHandler nonExistsHandler = + new EmbeddedRefreshHandler("non-exits", QUARTZ_JOB_GROUP); + + private CreateEmbeddedSchedulerWorkflowRequestBody createRequestBody; + private CreatePeriodicRefreshWorkflow createPeriodicWorkflow; + + private EmbeddedWorkflowScheduler embeddedWorkflowScheduler; + + @BeforeEach + void setup() throws Exception { + String gatewayRestEndpointURL = String.format("http://%s:%s", targetAddress, port); + createRequestBody = + new CreateEmbeddedSchedulerWorkflowRequestBody( + materializedTableIdentifier.asSerializableString(), + cronExpression, + null, + null, + null, + gatewayRestEndpointURL); + createPeriodicWorkflow = + new CreatePeriodicRefreshWorkflow( + materializedTableIdentifier, + descriptionStatement, + cronExpression, + null, + null, + gatewayRestEndpointURL); + + Configuration configuration = new Configuration(); + configuration.set(WORKFLOW_SCHEDULER_TYPE, "embedded"); + configuration.setString("sql-gateway.endpoint.rest.address", targetAddress); + configuration.setString("sql-gateway.endpoint.rest.port", String.valueOf(port)); + + embeddedWorkflowScheduler = + (EmbeddedWorkflowScheduler) + WorkflowSchedulerFactoryUtil.createWorkflowScheduler( + configuration, + EmbeddedSchedulerRelatedITCase.class.getClassLoader()); + embeddedWorkflowScheduler.open(); + } + + @AfterEach + void cleanup() throws Exception { + embeddedWorkflowScheduler.close(); + } + + @Test + void testCreateWorkflow() throws Exception { + CreateEmbeddedSchedulerWorkflowResponseBody createResponse = + sendRequest( + CreateEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + createRequestBody) + .get(5, TimeUnit.SECONDS); + + assertThat(createResponse.getWorkflowName()) + .isEqualTo( + QUARTZ_JOB_PREFIX + + "_" + + materializedTableIdentifier.asSerializableString()); + assertThat(createResponse.getWorkflowGroup()).isEqualTo(QUARTZ_JOB_GROUP); + + // create the workflow repeatedly + CompletableFuture repeatedCreateFuture = + sendRequest( + CreateEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + createRequestBody); + + assertThatFuture(repeatedCreateFuture) + .failsWithin(5, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RestClientException.class) + .withMessageContaining( + "Materialized table `cat`.`db`.`t1` quartz schedule job already exist, job info: default_group.quartz_job_`cat`.`db`.`t1`.") + .satisfies( + e -> + assertThat( + ((RestClientException) e.getCause()) + .getHttpResponseStatus()) + .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + + // delete the schedule job + EmbeddedSchedulerWorkflowRequestBody deleteRequestBody = + new EmbeddedSchedulerWorkflowRequestBody( + createResponse.getWorkflowName(), createResponse.getWorkflowGroup()); + CompletableFuture deleteFuture = + sendRequest( + DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + deleteRequestBody); + assertThatFuture(deleteFuture).succeedsWithin(5, TimeUnit.SECONDS); + } + + @Test + void testSuspendNonExistsWorkflow() throws Exception { + CompletableFuture suspendFuture = + sendRequest( + SuspendEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + nonExistsWorkflow); + + assertThatFuture(suspendFuture) + .failsWithin(5, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RestClientException.class) + .withMessageContaining( + "Failed to suspend a non-existent quartz schedule job: default_group.non-exists") + .satisfies( + e -> + assertThat( + ((RestClientException) e.getCause()) + .getHttpResponseStatus()) + .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + + @Test + void testResumeNonExistsWorkflow() throws Exception { + CompletableFuture suspendFuture = + sendRequest( + ResumeEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + nonExistsWorkflow); + + assertThatFuture(suspendFuture) + .failsWithin(5, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RestClientException.class) + .withMessageContaining( + "Failed to resume a non-existent quartz schedule job: default_group.non-exists") + .satisfies( + e -> + assertThat( + ((RestClientException) e.getCause()) + .getHttpResponseStatus()) + .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + + @Test + void testDeleteNonExistsWorkflow() throws Exception { + CompletableFuture suspendFuture = + sendRequest( + DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + nonExistsWorkflow); + + assertThatFuture(suspendFuture) + .failsWithin(5, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RestClientException.class) + .withMessageContaining( + "Failed to delete a non-existent quartz schedule job: default_group.non-exists.") + .satisfies( + e -> + assertThat( + ((RestClientException) e.getCause()) + .getHttpResponseStatus()) + .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + + @Test + void testCreateWorkflowByWorkflowSchedulerInterface() throws Exception { + // create workflow + EmbeddedRefreshHandler actual = + embeddedWorkflowScheduler.createRefreshWorkflow(createPeriodicWorkflow); + + EmbeddedRefreshHandler expected = + new EmbeddedRefreshHandler( + QUARTZ_JOB_PREFIX + + "_" + + materializedTableIdentifier.asSerializableString(), + QUARTZ_JOB_GROUP); + assertThat(actual).isEqualTo(expected); + + // create workflow repeatedly + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.createRefreshWorkflow( + createPeriodicWorkflow)) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Failed to create periodic refresh workflow for materialized table `cat`.`db`.`t1`."); + + // suspend, just to verify suspend function can work + SuspendRefreshWorkflow suspendRefreshWorkflow = + new SuspendRefreshWorkflow<>(actual); + embeddedWorkflowScheduler.modifyRefreshWorkflow(suspendRefreshWorkflow); + + // resume, just to verify suspend function can work + ResumeRefreshWorkflow resumeRefreshWorkflow = + new ResumeRefreshWorkflow<>(actual); + embeddedWorkflowScheduler.modifyRefreshWorkflow(resumeRefreshWorkflow); + + // delete, just to verify suspend function can work + DeleteRefreshWorkflow deleteRefreshWorkflow = + new DeleteRefreshWorkflow<>(actual); + embeddedWorkflowScheduler.deleteRefreshWorkflow(deleteRefreshWorkflow); + } + + @Test + void testCreateWorkflowWithUnsupportedTypeByWorkflowSchedulerInterface() { + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.createRefreshWorkflow( + new UnsupportedCreateRefreshWorkflow())) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Unsupported create refresh workflow type UnsupportedCreateRefreshWorkflow."); + } + + @Test + void testModifyWorkflowWithUnsupportedTypeByWorkflowSchedulerInterface() { + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.modifyRefreshWorkflow( + new UnsupportedModifyRefreshWorkflow())) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Unsupported modify refresh workflow type UnsupportedModifyRefreshWorkflow."); + } + + @Test + void testNonExistsWorkflowByWorkflowSchedulerInterface() { + // suspend case + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.modifyRefreshWorkflow( + new SuspendRefreshWorkflow<>(nonExistsHandler))) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Failed to suspend refresh workflow {\n" + + " workflowName: non-exits,\n" + + " workflowGroup: default_group\n" + + "}."); + + // resume case + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.modifyRefreshWorkflow( + new ResumeRefreshWorkflow<>(nonExistsHandler))) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Failed to resume refresh workflow {\n" + + " workflowName: non-exits,\n" + + " workflowGroup: default_group\n" + + "}."); + + // delete case + assertThatThrownBy( + () -> + embeddedWorkflowScheduler.deleteRefreshWorkflow( + new DeleteRefreshWorkflow<>(nonExistsHandler))) + .isInstanceOf(WorkflowException.class) + .hasMessage( + "Failed to delete refresh workflow {\n" + + " workflowName: non-exits,\n" + + " workflowGroup: default_group\n" + + "}."); + } + + /** Just used for test. */ + private static class UnsupportedCreateRefreshWorkflow implements CreateRefreshWorkflow {} + + /** Just used for test. */ + private static class UnsupportedModifyRefreshWorkflow + implements ModifyRefreshWorkflow { + + @Override + public EmbeddedRefreshHandler getRefreshHandler() { + return new EmbeddedRefreshHandler("a", "b"); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java index 994647c0284942..2798f4051e4640 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java @@ -29,6 +29,8 @@ @Internal public class ContinuousRefreshHandler implements RefreshHandler, Serializable { + private static final long serialVersionUID = 1L; + // TODO: add clusterId for yarn and k8s resource manager private final String executionTarget; private final String jobId; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java index 5155d5265ec130..6f892e48a2e92b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java @@ -19,13 +19,16 @@ package org.apache.flink.table.workflow; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.FlinkException; /** * A workflow-related operation exception to materialized table, including create, suspend, resume, * drop workflow operation, etc. */ @PublicEvolving -public class WorkflowException extends Exception { +public class WorkflowException extends FlinkException { + + private static final long serialVersionUID = 1L; public WorkflowException(String message) { super(message);