diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java index cfb419983d..213af7fd5d 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java @@ -208,6 +208,13 @@ public class TaskDTO extends AbstractStatementDTO { notes = "Flag indicating whether to use auto-canceling") private boolean useAutoCancel = true; + @ApiModelProperty( + value = "Flag indicating whether to mock sink function", + dataType = "boolean", + example = "true", + notes = "Flag indicating whether to mock sink function") + private boolean mockSinkFunction = true; + @ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier") private String session; diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java index d7813233a0..7050bb48cd 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java @@ -180,12 +180,13 @@ public boolean success() { history.setClusterId(clusterId); historyService.updateById(history); - if (!job.getJobConfig().isStatementSet()) { + if (!job.isPipeline()) { return true; } - if (Asserts.isNullCollection(job.getJids()) || Asserts.isNullString(job.getJobManagerAddress())) { - throw new BusException("The JobID or JobManagerAddress is null. "); + if (Asserts.isNullCollection(job.getJids())) { + throw new BusException("Job ID retrieval failed, possibly due to a timeout during job deployment. " + + "Please modify the system configuration to increase the waiting time for job submission."); } JobInstance jobInstance = history.buildJobInstance(); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 0d8a4c6fe0..583de12f7f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -355,8 +355,6 @@ public JobResult submitTask(TaskSubmitDto submitDto) throws Exception { public JobResult debugTask(TaskDTO task) throws Exception { // Debug mode need return result task.setUseResult(true); - // Debug mode need execute - task.setStatementSet(false); // mode check if (GatewayType.get(task.getType()).isDeployCluster()) { throw new BusException(Status.MODE_IS_NOT_ALLOW_SELECT.getMessage()); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java index 96cabd266c..c5e55d43c8 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java @@ -79,6 +79,8 @@ public enum SqlType { private static final List<SqlType> TRANS_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS); + private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS); + SqlType(String type, String regrex, SqlCategory category) { this.type = type; this.pattern = Pattern.compile(regrex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL); @@ -104,4 +106,8 @@ public boolean match(String statement) { public static List<SqlType> getTransSqlTypes() { return TRANS_SQL_TYPES; } + + public boolean isPipeline() { + return 
PIPELINE_SQL_TYPES.contains(this); + } } diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java new file mode 100644 index 0000000000..85a55ec650 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.types.logical.RowType; + +public class MockDynamicTableSink implements DynamicTableSink { + + private final String tableName; + private final RowType rowType; + + public MockDynamicTableSink(String tableName, RowType rowType) { + this.tableName = tableName; + this.rowType = rowType; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return requestedMode; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return SinkFunctionProvider.of(new MockSinkFunction(tableName, rowType)); + } + + @Override + public DynamicTableSink copy() { + return new MockDynamicTableSink(tableName, rowType); + } + + @Override + public String asSummaryString() { + return "Dinky Sink Mock"; + } +} diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java new file mode 100644 index 0000000000..cec1a53034 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.Set; + +public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory { + public static final String IDENTIFIER = "dinky-mock"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + return new MockDynamicTableSink( + context.getObjectIdentifier().asSummaryString(), (RowType) context.getCatalogTable() + .getResolvedSchema() + .toPhysicalRowDataType() + .getLogicalType()); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java new file mode 100644 index 0000000000..2bb1ecc33d --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockSinkFunction extends RichSinkFunction<RowData> { + private final RowType rowType; + private final String tableIdentifier; + // when columns are of VARCHAR or STRING type, row data is generated as BinaryStringData, which is not + // serializable; as a result, SerializedListAccumulator is used here + private final SerializedListAccumulator<Map<String, String>> rowDataList; + + public MockSinkFunction(String tableName, RowType rowType) { + this.rowType = rowType; + this.tableIdentifier = tableName; + this.rowDataList = new SerializedListAccumulator<>(); + } + + @Override + public void open(Configuration parameters) throws Exception { + getRuntimeContext().addAccumulator(tableIdentifier, rowDataList); + super.open(parameters); + } + + @Override + public void invoke(RowData rowData, Context context) throws Exception { + List<String> fieldNames = rowType.getFieldNames(); + Map<String, String> rowDataMap = new HashMap<>(); + for (int i = 0; i < fieldNames.size(); i++) { + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(rowType.getTypeAt(i), i); + rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData))); + } + rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer())); + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java new file mode 100644 index 0000000000..0b5ec2885a --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.data.result; + +import org.dinky.assertion.Asserts; +import org.dinky.job.JobHandler; +import org.dinky.utils.JsonUtils; + +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.ResolvedSchema; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import alluxio.shaded.client.com.google.common.collect.Lists; +import cn.hutool.core.collection.ListUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MockResultBuilder extends AbstractResultBuilder implements ResultBuilder { + private final Integer maxRowNum; + private final boolean isAutoCancel; + private final String MOCK_RESULT_TABLE_IDENTIFIER = "dinkySinkResultTableIdentifier"; + private final String MOCK_RESULT_COLUMN_IDENTIFIER = "dinkySinkResultColumnIdentifier"; + + public MockResultBuilder(String id, Integer maxRowNum, boolean isAutoCancel) { + this.id = id; + this.maxRowNum = maxRowNum; + this.isAutoCancel = isAutoCancel; + } + + @Override + public IResult getResult(TableResult tableResult) { + Optional<JobClient> jobClientOptional = tableResult.getJobClient(); + if (jobClientOptional.isPresent()) { + JobClient jobClient = jobClientOptional.get(); + // get table identifiers + ResolvedSchema resolvedSchema = tableResult.getResolvedSchema(); + List<String> tableIdentifierList = resolvedSchema.getColumnNames(); + // update row data map + Map<String, List<Map<String, String>>> rowDataMap = new HashMap<>(); + if (tableResult.getJobClient().isPresent()) { + while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) { + try { + Map<String, Object> accumulatorMap = + jobClient.getAccumulators().get(); + for (String tableIdentifier : tableIdentifierList) { + Object accumulatorObject = accumulatorMap.get(tableIdentifier); + if (accumulatorObject instanceof List) { + List<?> list = (List<?>) accumulatorObject; + for (Object obj : list) { + // deserialize data from accumulator + Map<String, String> deserialize = deserializeObjFromBytes((byte[]) obj); + // update row data map + List<Map<String, String>> rowDataList = + rowDataMap.getOrDefault(tableIdentifier, new ArrayList<>()); + rowDataList.add(deserialize); + rowDataMap.put(tableIdentifier, ListUtil.sub(rowDataList, 0, maxRowNum)); + } + } + } + } catch (Exception e) { + // ignore transient failures and retry until enough rows are collected + } + } + } + if (isAutoCancel) { + try { + jobClient.cancel(); + } catch (Exception e) { + log.error("Cancel job failed, jobId: {}", id); + } + } + return new MockSinkResult(id, rowDataMap); + } else { + return MockSinkResult.buildFailed(); + } + } + + @Override + public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobHandler) { + if (Objects.isNull(tableResult)) { + return MockSinkResult.buildFailed(); + } + MockSinkResult mockSinkResult = (MockSinkResult) getResult(tableResult); + // MockSinkResult -> SelectResult + SelectResult selectResult = new SelectResult( + id, + convertSinkRowData2SelectRowData(mockSinkResult.getTableRowData()), + generateMockResultColumns(mockSinkResult.getTableRowData())); + selectResult.setMockSinkResult(true); + selectResult.setDestroyed(Boolean.TRUE); + try { + ResultPool.put(selectResult); + 
jobHandler.persistResultData(Lists.newArrayList(this.id)); + } finally { + ResultPool.remove(id); + } + return selectResult; + } + + /** + * convert row data of the mocked sink result to the select result format + * + * @param tableRowData row data of {@link MockSinkResult} + * @return row data of {@link SelectResult} + */ + private List<Map<String, String>> convertSinkRowData2SelectRowData( + Map<String, List<Map<String, String>>> tableRowData) { + List<Map<String, String>> resultRowData = new ArrayList<>(); + for (Map.Entry<String, List<Map<String, String>>> entry : tableRowData.entrySet()) { + String tableIdentifier = entry.getKey(); + List<Map<String, String>> rowDataList = entry.getValue(); + for (Map<String, String> rowDataElement : rowDataList) { + Map<String, String> selectRowDataElement = new HashMap<>(); + // table name identifier + selectRowDataElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + // row data + selectRowDataElement.putAll(rowDataElement); + resultRowData.add(selectRowDataElement); + } + } + return resultRowData; + } + + /** + * check whether all sinks have finished + * + * @param maxRowNum maximum row num of each table + * @param rowData row data map, key: table name, value: row data + * @param tableIdentifierList table identifier + * @return true if all tables have collected enough rows + */ + private boolean isAllSinkFinished( + int maxRowNum, Map<String, List<Map<String, String>>> rowData, List<String> tableIdentifierList) { + if (tableIdentifierList.size() > rowData.size()) { + return false; + } + for (List<Map<String, String>> rowDataList : rowData.values()) { + if (Asserts.isNotNull(rowDataList) && rowDataList.size() < maxRowNum) { + return false; + } + } + return true; + } + + /** + * generate mock result column sets + * + * @param tableRowData row data of {@link MockSinkResult} + * @return column sets of {@link SelectResult} + */ + private LinkedHashSet<String> generateMockResultColumns(Map<String, List<Map<String, String>>> tableRowData) { + LinkedHashSet<String> resultColumn = new LinkedHashSet<>(); + for (Map.Entry<String, List<Map<String, String>>> entry : tableRowData.entrySet()) { + String tableIdentifier = entry.getKey(); + List<Map<String, String>> rowDataList = entry.getValue(); + Set<String> columns = rowDataList.get(0).keySet(); + Map<String, Object> columnElement = new HashMap<>(); + columnElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + columnElement.put(MOCK_RESULT_COLUMN_IDENTIFIER, columns); + resultColumn.add(JsonUtils.toJsonString(columnElement)); + } + return resultColumn; + } + + private static Map<String, String> deserializeObjFromBytes(byte[] byteArr) throws IOException { + MapSerializer<String, String> mapSerializer = + new MapSerializer<>(new StringSerializer(), new StringSerializer()); + return mapSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(byteArr))); + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java b/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java new file mode 100644 index 0000000000..164bf2a2cc --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.result; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Setter +@Getter +@NoArgsConstructor +public class MockSinkResult extends AbstractResult implements IResult { + + private String taskId; + private Map<String, List<Map<String, String>>> tableRowData; + private boolean truncationFlag = false; + private boolean isDestroyed; + + public MockSinkResult(String taskId, Map<String, List<Map<String, String>>> tableRowData) { + this.taskId = taskId; + this.tableRowData = tableRowData; + } + + public MockSinkResult(String taskId, boolean isDestroyed, boolean success) { + this.taskId = taskId; + this.isDestroyed = isDestroyed; + this.success = success; + this.endTime = LocalDateTime.now(); + } + + public static MockSinkResult buildSuccess(String taskId) { + return new MockSinkResult(taskId, false, true); + } + + public static MockSinkResult buildFailed() { + return new MockSinkResult(null, false, false); + } + + @Override + public String getJobId() { + return this.taskId; + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java index 8d84d3dfd3..faae72a2ac 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java @@ -38,6 +38,17 @@ static ResultBuilder build( boolean isChangeLog, boolean isAutoCancel, String timeZone) { + return build(operationType, id, maxRowNum, isChangeLog, isAutoCancel, timeZone, false); + } + + static ResultBuilder build( + SqlType operationType, + String id, + Integer maxRowNum, + boolean isChangeLog, + boolean isAutoCancel, + String timeZone, + boolean isMockSinkFunction) { switch (operationType) { case SELECT: case WITH: @@ -47,7 +58,9 @@ static ResultBuilder build( case DESCRIBE: return new ShowResultBuilder(id); case INSERT: - return new InsertResultBuilder(); + return isMockSinkFunction + ? 
new MockResultBuilder(id, maxRowNum, isAutoCancel) + : new InsertResultBuilder(); default: return new DDLResultBuilder(); } diff --git a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java index b7ea0f1aaf..d074eb68c3 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java +++ b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java @@ -53,6 +53,7 @@ public class SelectResult extends AbstractResult implements IResult { private LinkedHashSet<String> columns; private boolean isDestroyed; private boolean truncationFlag = false; + private boolean isMockSinkResult = false; public SelectResult( List<Map<String, String>> rowData, diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 1b266f4858..98c3f63cb5 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -27,6 +27,7 @@ import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.CustomTableEnvironment; import org.dinky.executor.Executor; +import org.dinky.explainer.mock.MockStatementExplainer; import org.dinky.explainer.print_table.PrintStatementExplainer; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; @@ -159,9 +160,6 @@ public JobParam pretreatStatements(String[] statements) { } else if (transSqlTypeSet.contains(operationType)) { trans.add(new StatementParam(statement, operationType)); statementList.add(statement); - if (!useStatementSet) { - break; - } } else if (operationType.equals(SqlType.EXECUTE)) { execute.add(new StatementParam(statement, operationType)); } else if (operationType.equals(SqlType.PRINT)) { @@ -182,7 +180,14 @@ public JobParam pretreatStatements(String[] statements) { statementList.add(statement); } } - return new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString()); + JobParam jobParam = + new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString()); + + MockStatementExplainer.build(executor.getCustomTableEnvironment()) + .isMockSink(jobManager.getConfig().isMockSinkFunction()) + .jobParamMock(jobParam); + + return jobParam; } private Configuration getCombinationConfig() { diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java new file mode 100644 index 0000000000..8f6f7873b9 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.explainer.mock; + +import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory; +import org.dinky.executor.CustomTableEnvironment; +import org.dinky.job.JobParam; +import org.dinky.job.StatementParam; +import org.dinky.parser.SqlType; +import org.dinky.utils.JsonUtils; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MockStatementExplainer { + + // Because calcite cannot parse flink sql ddl, a table environment is used here for flink sql ddl parsing + private final CustomTableEnvironment tableEnv; + private boolean isMockSink = false; + private final SqlParser.Config calciteConfig; + private final String DROP_TABLE_SQL_TEMPLATE = "DROP TABLE IF EXISTS {0}"; + private final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})"; + + public static MockStatementExplainer build(CustomTableEnvironment tableEnv) { + return new MockStatementExplainer(tableEnv); + } + + public MockStatementExplainer(CustomTableEnvironment tableEnv) { + this.tableEnv = tableEnv; + this.calciteConfig = SqlParser.config().withLex(Lex.JAVA); + } + + public MockStatementExplainer isMockSink(boolean isMockSink) { + this.isMockSink = isMockSink; + return this; + } + + public void jobParamMock(JobParam jobParam) { + if (isMockSink) { + mockSink(jobParam); + } + } + + /** + * The connector of insert tables will be changed to {@link MockDynamicTableSinkFactory} + * + * @param jobParam job param + */ + private void mockSink(JobParam jobParam) { + // Get table names that need to be mocked, and modify insert statement. 
Set<String> tablesNeedMock = getTableNamesNeedMockAndModifyTrans(jobParam); + // mock insert table ddl + List<StatementParam> mockedDdl = new ArrayList<>(); + for (StatementParam ddl : jobParam.getDdl()) { + List<Operation> parseOperationList = tableEnv.getParser().parse(ddl.getValue()); + for (Operation operation : parseOperationList) { + if (operation instanceof CreateTableOperation) { + CreateTableOperation createOperation = (CreateTableOperation) operation; + CatalogTable catalogTable = createOperation.getCatalogTable(); + // get table name and check if it should be mocked + String tableName = createOperation.getTableIdentifier().getObjectName(); + if (tablesNeedMock.contains(tableName)) { + // generate mock statement + mockedDdl.add( + new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE)); + } else { + mockedDdl.add(ddl); + } + } else { + // keep DDL statements that do not create a table (e.g. CREATE VIEW) unchanged + mockedDdl.add(ddl); + } + } + } + jobParam.setDdl(mockedDdl); + log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam)); + } + + /** + * get table names of insert statements; these tables will be mocked + * + * @param jobParam jobParam + * @return a hash set, which contains all insert table names + */ + private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) { + List<StatementParam> transStatements = jobParam.getTrans(); + List<StatementParam> mockedTransStatements = new ArrayList<>(); + Set<String> insertTables = new HashSet<>(); + for (StatementParam statement : transStatements) { + if (statement.getType().equals(SqlType.INSERT)) { + try { + SqlInsert sqlInsert = (SqlInsert) SqlParser.create(statement.getValue(), calciteConfig) + .parseQuery(); + insertTables.add(sqlInsert.getTargetTable().toString()); + SqlInsert mockedInsertTrans = new SqlInsert( + sqlInsert.getParserPosition(), + SqlNodeList.EMPTY, + new SqlIdentifier( + generateMockedTableIdentifier( + sqlInsert.getTargetTable().toString()), + SqlParserPos.ZERO), + sqlInsert.getSource(), + sqlInsert.getTargetColumnList()); + mockedTransStatements.add(new StatementParam( + mockedInsertTrans + .toSqlString(AnsiSqlDialect.DEFAULT) + .toString(), + SqlType.INSERT)); + } catch (Exception e) { + log.error("Statement parse error, statement: {}", statement.getValue(), e); + } + } else { + mockedTransStatements.add(statement); + } + } + jobParam.setTrans(mockedTransStatements); + return insertTables; + } + + /** + * get mocked ddl statement + * + * @param tableName table name + * @param catalogTable catalog table + * @return ddl in which the connector is changed while the other options are kept + */ + private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) { + String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'"; + // columns + Schema unresolvedSchema = catalogTable.getUnresolvedSchema(); + String columns = unresolvedSchema.getColumns().stream() + .map(column -> { + Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; + return physicalColumn.getName() + " " + physicalColumn.getDataType(); + }) + .collect(Collectors.joining(", ")); + return MessageFormat.format( + MOCK_SQL_TEMPLATE, + StringUtils.join(generateMockedTableIdentifier(tableName), "."), + columns, + mockedOption); + } + + /** + * generate table identifier with mocked prefix info + * @param tableName table name + * @return table identifier with mocked prefix info + */ + private List<String> generateMockedTableIdentifier(String tableName) { + List<String> names = new ArrayList<>(); + names.add("default_catalog"); + names.add("default_database"); + names.add("mock_sink_" + tableName); + return names; + } +} diff --git 
a/dinky-core/src/main/java/org/dinky/job/Job.java b/dinky-core/src/main/java/org/dinky/job/Job.java index e352e6d8cc..724f46aab3 100644 --- a/dinky-core/src/main/java/org/dinky/job/Job.java +++ b/dinky-core/src/main/java/org/dinky/job/Job.java @@ -54,6 +54,7 @@ public class Job { private Executor executor; private boolean useGateway; private List<String> jids; + private boolean isPipeline = true; @Getter public enum JobStatus { @@ -113,7 +114,8 @@ public JobResult getJobResult() { error, result, startTime, - endTime); + endTime, + isPipeline); } public boolean isFailed() { diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 146357da23..8718e524aa 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -183,6 +183,13 @@ public class JobConfig { notes = "Maximum number of rows") private Integer maxRowNum; + @ApiModelProperty( + value = "Flag indicating whether to mock sink function", + dataType = "boolean", + example = "true", + notes = "Flag indicating whether to mock sink function") + private boolean mockSinkFunction; + @ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration") private GatewayConfig gatewayConfig; diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index b49a2a0203..0e5d7410d9 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -108,6 +108,7 @@ public class JobManager { private boolean useGateway = false; private boolean isPlanMode = false; private boolean useStatementSet = false; + private boolean useMockSinkFunction = false; private boolean useRestAPI = false; private GatewayType runMode = GatewayType.LOCAL; private JobParam jobParam = null; @@ -214,6 +215,7 @@ public void init() { handler = JobHandler.build(); } useStatementSet = config.isStatementSet(); + useMockSinkFunction = config.isMockSinkFunction(); useRestAPI = SystemConfiguration.getInstances().isUseRestAPI(); executorConfig = config.getExecutorSetting(); executorConfig.setPlan(isPlanMode); diff --git a/dinky-core/src/main/java/org/dinky/job/JobResult.java b/dinky-core/src/main/java/org/dinky/job/JobResult.java index 6f127a6c5d..780fa65cd9 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobResult.java +++ b/dinky-core/src/main/java/org/dinky/job/JobResult.java @@ -114,6 +114,13 @@ public class JobResult { notes = "End time of job execution") private LocalDateTime endTime; + @ApiModelProperty( + value = "Flag indicating whether the job is a pipeline", + dataType = "boolean", + example = "true", + notes = "Flag indicating whether the job is a pipeline") + private boolean isPipeline; + public JobResult() {} public JobResult( @@ -127,7 +134,8 @@ public JobResult( String error, IResult result, LocalDateTime startTime, - LocalDateTime endTime) { + LocalDateTime endTime, + boolean isPipeline) { this.id = id; this.jobInstanceId = jobInstanceId; this.jobConfig = jobConfig; @@ -140,6 +148,7 @@ public JobResult( this.result = result; this.startTime = startTime; this.endTime = endTime; + this.isPipeline = isPipeline; } public void setStartTimeNow() { diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index 74cb561296..cf10e0518c 100644 --- 
a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -42,14 +42,13 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.table.api.TableResult; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; /** * JobTransBuilder - * */ public class JobTransBuilder extends JobBuilder { @@ -67,17 +66,27 @@ public void run() throws Exception { return; } - if (useStatementSet) { + if (inferStatementSet()) { handleStatementSet(); - return; + } else { + handleNonStatementSet(); } + } - handleNonStatementSet(); + private boolean inferStatementSet() { + boolean hasInsert = false; + for (StatementParam item : jobParam.getTrans()) { + if (item.getType().equals(SqlType.INSERT)) { + hasInsert = true; + break; + } + } + return hasInsert; } private void handleStatementSet() throws Exception { - List<String> inserts = collectInserts(); - + List<String> inserts = + jobParam.getTrans().stream().map(StatementParam::getValue).collect(Collectors.toList()); if (useGateway) { processWithGateway(inserts); return; @@ -93,18 +102,6 @@ private void handleNonStatementSet() throws Exception { processFirstStatement(); } - private List<String> collectInserts() { - List<String> inserts = new ArrayList<>(); - List<StatementParam> statementParams = useStatementSet - ? jobParam.getTrans() - : Collections.singletonList(jobParam.getTrans().get(0)); - for (StatementParam item : statementParams) { - - inserts.add(item.getValue()); - } - return inserts; - } - private void processWithGateway(List<String> inserts) throws Exception { jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); GatewayResult gatewayResult = submitByGateway(inserts); @@ -120,7 +117,9 @@ private void processWithoutGateway(List<String> inserts) throws Exception { } private void processSingleInsertWithGateway() throws Exception { - List<String> singleInsert = collectInserts(); + List<String> singleInsert = + Collections.singletonList(jobParam.getTrans().get(0).getValue()); + job.setPipeline(jobParam.getTrans().get(0).getType().isPipeline()); processWithGateway(singleInsert); } @@ -130,6 +129,7 @@ private void processFirstStatement() throws Exception { } // Only process the first statement when not using statement set StatementParam item = jobParam.getTrans().get(0); + job.setPipeline(item.getType().isPipeline()); jobManager.setCurrentSql(item.getValue()); processSingleStatement(item); } @@ -175,7 +175,8 @@ private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), - executor.getTimeZone()) + executor.getTimeZone(), + jobManager.getConfig().isMockSinkFunction()) .getResultWithPersistence(tableResult, jobManager.getHandler()); job.setResult(result); } diff --git a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a0142e634d..73603caf76 100644 --- a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1 +1,2 @@ org.dinky.connector.printnet.sink.PrintNetDynamicTableSinkFactory +org.dinky.connector.mock.sink.MockDynamicTableSinkFactory diff --git a/dinky-web/src/locales/en-US/pages.ts b/dinky-web/src/locales/en-US/pages.ts index 47af5710e6..49ff29b80d 
100644 --- a/dinky-web/src/locales/en-US/pages.ts +++ b/dinky-web/src/locales/en-US/pages.ts @@ -508,6 +508,8 @@ export default { 'pages.datastudio.label.execConfig.selectDatabase.tip': 'Select the database to be used', 'pages.datastudio.label.execConfig.maxrow': 'Maximum number of rows', 'pages.datastudio.label.execConfig.maxrow.tip': 'The maximum number of rows of preview data', + 'pages.datastudio.label.execConfig.mocksink': 'SinkMock', + 'pages.datastudio.label.execConfig.mocksink.tip': 'Mock the SinkFunction: results will not be written to the production environment during debugging, but they can be previewed through Dinky', 'pages.datastudio.label.jobConfig': 'Job Config', 'pages.datastudio.label.jobConfig.addConfig': 'Add Config item', 'pages.datastudio.label.jobConfig.addConfig.params': 'parameters', diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts index d82b7b82c7..beb1331867 100644 --- a/dinky-web/src/locales/zh-CN/pages.ts +++ b/dinky-web/src/locales/zh-CN/pages.ts @@ -453,6 +453,8 @@ export default { 'pages.datastudio.label.execConfig.selectDatabase.tip': '选择 Sql 语句执行的数据源', 'pages.datastudio.label.execConfig.maxrow': '最大行数', 'pages.datastudio.label.execConfig.maxrow.tip': '预览数据的最大行数', + 'pages.datastudio.label.execConfig.mocksink': '开启SinkMock', + 'pages.datastudio.label.execConfig.mocksink.tip': '将SinkFunction进行Mock,调试过程中不会向线上环境执行写入,但可以通过dinky预览Sink结果', 'pages.datastudio.label.jobConfig': '作业配置', 'pages.datastudio.label.jobConfig.addConfig': '添加配置项', 'pages.datastudio.label.jobConfig.addConfig.params': '参数', diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx index 338356afe8..1dbd17208a 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx @@ -18,7 +18,7 @@ */ import { - assert, + assert, convertMockResultToList, getCurrentData, getCurrentTab, isDataStudioTabsItemType, @@ -133,9 +133,9 @@ const Result = (props: any) => { } const consoleData = currentTabs.console; - if (consoleData.result && !isRefresh) { + if (consoleData.result && !isRefresh && !consoleData.result.mockSinkResult) { setData(consoleData.result); - } else if (consoleData.results && !isRefresh) { + } else if (consoleData.results && !isRefresh && !consoleData.result?.mockSinkResult) { setDataList(consoleData.results); } else { if (assert(current?.dialect, [DIALECT.FLINK_SQL], true, 'includes')) { @@ -162,8 +162,14 @@ const Result = (props: any) => { ); const data = tableData.data; if (tableData.success && data?.success) { - consoleData.result = data; - setData(data); + // mockSinkResult + if (data.mockSinkResult == true) { + consoleData.results = convertMockResultToList(data); + setDataList(consoleData.results); + } else { + consoleData.result = data; + setData(data); + } } else { consoleData.result = {}; setData({}); } @@ -270,7 +276,7 @@ const Result = (props: any) => { {dataList.map((data, index) => { return ( - + { }} {...SWITCH_OPTIONS()} /> + + }} + {...SWITCH_OPTIONS()} + /> ({ updateToolContentHeight: (key: number) => @@ -246,9 +246,9 @@ export const getFooterValue = (panes: any, activeKey: string): Partial { export const isEmpty = (value: any): boolean => { return !isNotEmpty(value); }; + +/** + * Convert a SelectResult that contains multiple tables + * @param data + */ +export const convertMockResultToList = (data: any): any[] => { + const rowDataResults: any[] = []; + // for each column entry of the MockResult, one element represents one table + 
data.columns.forEach((columnString: string) => { + // column information of the current table + let columnArr: string[] = []; + // row data of the current table + const rowDataArr: string[] = []; + // table name + let tableName: string = ''; + // parse the column info of the current table + const columnJsonInfo = JSON.parse(columnString); + // extract the column information + if (columnJsonInfo['dinkySinkResultColumnIdentifier']) { + columnArr = columnJsonInfo['dinkySinkResultColumnIdentifier'] + } + // extract the table name + if (columnJsonInfo['dinkySinkResultTableIdentifier']) { + tableName = columnJsonInfo['dinkySinkResultTableIdentifier']; + } + // collect the rows belonging to this table + data.rowData.forEach((rowDataElement: any) => { + if (rowDataElement.dinkySinkResultTableIdentifier == tableName) { + rowDataArr.push(rowDataElement); + } + }) + // build the result object of this table + const rowDataResult = { + tableName: tableName, columns: columnArr, rowData: rowDataArr + }; + rowDataResults.push(rowDataResult); + }); + + return rowDataResults; +}; diff --git a/dinky-web/src/pages/DataStudioNew/CenterTabContent/SqlTask.tsx b/dinky-web/src/pages/DataStudioNew/CenterTabContent/SqlTask.tsx index 17c958daad..d990f71484 100644 --- a/dinky-web/src/pages/DataStudioNew/CenterTabContent/SqlTask.tsx +++ b/dinky-web/src/pages/DataStudioNew/CenterTabContent/SqlTask.tsx @@ -422,32 +422,28 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { updateAction({ actionType: DataStudioActionType.TASK_RUN_DEBUG, params: { - taskId: params.taskId + taskId: params.taskId, + columns: res.data?.result?.columns ?? [], + rowData: res.data?.result?.rowData ?? [], } }); setCurrentState((prevState) => { return { ...prevState, - status: res.data.status === 'SUCCESS' ? 'RUNNING' : res.data.status + status: res.data.status === 'SUCCESS' ? (res.data.pipeline ? 'RUNNING' : 'SUCCESS') : res.data.status }; }); } }, [currentState, updateAction]); const handleStop = useCallback(async () => { - const result = await cancelTask( - l('pages.datastudio.editor.stop.job'), - currentState.taskId, - false - ); - if (result.success) { - setCurrentState((prevState) => { - return { - ...prevState, - status: 'CANCEL' - }; - }); - } + await cancelTask('', currentState.taskId, false); + setCurrentState((prevState) => { + return { + ...prevState, + status: 'CANCEL' + }; + }); }, [currentState.taskId]); const handleGotoDevOps = useCallback(async () => { diff --git a/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx b/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx index af9af656c7..0e3c855ae7 100644 --- a/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx +++ b/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx @@ -112,6 +112,15 @@ export default (props: { }} {...SWITCH_OPTIONS()} /> + + }} + {...SWITCH_OPTIONS()} + /> ); } diff --git a/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx b/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx index aed68315fe..40eabf00f0 100644 --- a/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx +++ b/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx @@ -63,9 +63,15 @@ export default (props: { taskId: number; action: any; dialect: string }) => { const [searchText, setSearchText] = useState(''); const [searchedColumn, setSearchedColumn] = useState(''); const searchInput = useRef(null); + useEffect(() => { - if (actionType === DataStudioActionType.TASK_PREVIEW_RESULT) { - setData({ columns: params.columns, rowData: params.rowData }); + if (actionType === 
DataStudioActionType.TASK_PREVIEW_RESULT + || actionType === DataStudioActionType.TASK_RUN_DEBUG) { + if (data.mockSinkResult == true) { + setDataList(convertMockResultToList({ columns: params.columns, rowData: params.rowData })) + } else { + setData({ columns: params.columns, rowData: params.rowData }); + } } }, [props.action]); @@ -73,6 +79,7 @@ export default (props: { taskId: number; action: any; dialect: string }) => { clearFilters(); setSearchText(''); }; + const handleSearch = ( selectedKeys: string[], confirm: (param?: FilterConfirmProps) => void, @@ -87,6 +94,42 @@ export default (props: { taskId: number; action: any; dialect: string }) => { setSearchedColumn(''); } }; + + const convertMockResultToList = (data: any): any[] => { + const rowDataResults: any[] = []; + // for each column entry of the MockResult, one element represents one table + data.columns.forEach((columnString: string) => { + // column information of the current table + let columnArr: string[] = []; + // row data of the current table + const rowDataArr: string[] = []; + // table name + let tableName: string = ''; + // parse the column info of the current table + const columnJsonInfo = JSON.parse(columnString); + // extract the column information + if (columnJsonInfo['dinkySinkResultColumnIdentifier']) { + columnArr = columnJsonInfo['dinkySinkResultColumnIdentifier'] + } + // extract the table name + if (columnJsonInfo['dinkySinkResultTableIdentifier']) { + tableName = columnJsonInfo['dinkySinkResultTableIdentifier']; + } + // collect the rows belonging to this table + data.rowData.forEach((rowDataElement: any) => { + if (rowDataElement.dinkySinkResultTableIdentifier == tableName) { + rowDataArr.push(rowDataElement); + } + }) + // build the result object of this table + const rowDataResult = { + tableName: tableName, columns: columnArr, rowData: rowDataArr + }; + rowDataResults.push(rowDataResult); + }); + return rowDataResults; + }; + const getColumnSearchProps = (dataIndex: string): ColumnType => ({ filterDropdown: ({ setSelectedKeys, selectedKeys, confirm, clearFilters }) => (
e.stopPropagation()}> @@ -154,9 +197,14 @@ export default (props: { taskId: number; action: any; dialect: string }) => { ); const data = tableData.data; if (tableData.success && data?.success) { - setData(data); + if (data.mockSinkResult == true) { + setDataList(convertMockResultToList(data)) + } else { + setData(data); + } } else { setData({}); + setDataList([]) } } @@ -193,7 +241,7 @@ export default (props: { taskId: number; action: any; dialect: string }) => { const renderFlinkSQLContent = () => { return ( <> - {!isSql(dialect) && !data.destroyed ? ( + {!isSql(dialect) ? ( @@ -201,6 +249,7 @@ export default (props: { taskId: number; action: any; dialect: string }) => { ); }; + const renderDownloadButton = () => { if (data.columns) { const _utf = '\uFEFF'; @@ -212,6 +261,7 @@ export default (props: { taskId: number; action: any; dialect: string }) => { } return undefined; }; + const renderAVA = () => { return (
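
For reviewers, a few illustrative sketches follow; they are not part of the patch. First, a minimal sketch of the DDL rewrite performed by MockStatementExplainer.getSinkMockDdlStatement. The MOCK_SQL_TEMPLATE string, the "dinky-mock" connector identifier, and the default_catalog.default_database.mock_sink_ naming all come from the diff above; the table name and columns are made-up examples.

import java.text.MessageFormat;

public class MockDdlRewriteSketch {
    // template and connector identifier copied from MockStatementExplainer / MockDynamicTableSinkFactory
    private static final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})";
    private static final String IDENTIFIER = "dinky-mock";

    public static void main(String[] args) {
        String tableName = "orders"; // illustrative sink table name
        String columns = "id INT, amount DOUBLE"; // illustrative physical columns
        String mockedOption = "'connector'='" + IDENTIFIER + "'";
        // the explainer joins [default_catalog, default_database, mock_sink_<table>] with "."
        String mockedIdentifier = "default_catalog.default_database.mock_sink_" + tableName;
        // prints: CREATE TABLE default_catalog.default_database.mock_sink_orders (id INT, amount DOUBLE) WITH ('connector'='dinky-mock')
        System.out.println(MessageFormat.format(MOCK_SQL_TEMPLATE, mockedIdentifier, columns, mockedOption));
    }
}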
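
Second, a sketch of the Calcite-based INSERT rewrite in getTableNamesNeedMockAndModifyTrans: the target table of an INSERT is swapped for the mocked identifier while the source query and column list are kept. The parser configuration and the SqlInsert construction mirror the patch; the input statement is an assumption for demonstration.

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Arrays;

public class InsertRewriteSketch {
    public static void main(String[] args) throws Exception {
        SqlParser.Config config = SqlParser.config().withLex(Lex.JAVA); // same lex setting as the patch
        SqlInsert insert = (SqlInsert)
                SqlParser.create("INSERT INTO orders SELECT * FROM src", config).parseQuery();
        // swap the target table for default_catalog.default_database.mock_sink_orders
        SqlInsert mocked = new SqlInsert(
                insert.getParserPosition(),
                SqlNodeList.EMPTY,
                new SqlIdentifier(
                        Arrays.asList("default_catalog", "default_database",
                                "mock_sink_" + insert.getTargetTable().toString()),
                        SqlParserPos.ZERO),
                insert.getSource(),
                insert.getTargetColumnList());
        System.out.println(mocked.toSqlString(AnsiSqlDialect.DEFAULT));
    }
}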
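
Third, a sketch of the serialization round-trip between MockSinkFunction (each row is buffered as a serialized Map<String, String> in an accumulator) and MockResultBuilder (the accumulator bytes are deserialized back). It uses only the Flink classes already imported by the patch; the row content is illustrative.

import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatorRoundTripSketch {
    public static void main(String[] args) throws Exception {
        MapSerializer<String, String> serializer =
                new MapSerializer<>(new StringSerializer(), new StringSerializer());

        // sink side, as in MockSinkFunction.invoke: each row is stored as serialized bytes
        SerializedListAccumulator<Map<String, String>> acc = new SerializedListAccumulator<>();
        Map<String, String> row = new HashMap<>();
        row.put("id", "1"); // illustrative row
        acc.add(row, serializer);

        // builder side, as in MockResultBuilder.deserializeObjFromBytes
        List<Map<String, String>> rows = new ArrayList<>();
        for (byte[] bytes : acc.getLocalValue()) {
            rows.add(serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes))));
        }
        System.out.println(rows); // prints [{id=1}]
    }
}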