diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java index cfb419983d..213af7fd5d 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java @@ -208,6 +208,13 @@ public class TaskDTO extends AbstractStatementDTO { notes = "Flag indicating whether to use auto-canceling") private boolean useAutoCancel = true; + @ApiModelProperty( + value = "Flag indicating whether to mock sink function", + dataType = "boolean", + example = "true", + notes = "Flag indicating whether to mock sink function") + private boolean mockSinkFunction = true; + @ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier") private String session; diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 0d8a4c6fe0..fb08f9c9fd 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -356,7 +356,7 @@ public JobResult debugTask(TaskDTO task) throws Exception { // Debug mode need return result task.setUseResult(true); // Debug mode need execute - task.setStatementSet(false); + task.setStatementSet(task.isMockSinkFunction()); // mode check if (GatewayType.get(task.getType()).isDeployCluster()) { throw new BusException(Status.MODE_IS_NOT_ALLOW_SELECT.getMessage()); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java index 96cabd266c..c3053cf7eb 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java @@ -40,6 +40,8 @@ public enum SqlType { INSERT("INSERT", "^INSERT.*", SqlCategory.DML), + MOCKED_INSERT("MOCKED_INSERT", "^MOCKED_INSERT.*", SqlCategory.DML), + DESC("DESC", "^DESC.*", SqlCategory.DDL), DESCRIBE("DESCRIBE", "^DESCRIBE.*", SqlCategory.DDL), diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java new file mode 100644 index 0000000000..85a55ec650 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSink.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.types.logical.RowType; + +public class MockDynamicTableSink implements DynamicTableSink { + + private final String tableName; + private final RowType rowType; + + public MockDynamicTableSink(String tableName, RowType rowType) { + this.tableName = tableName; + this.rowType = rowType; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return requestedMode; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return SinkFunctionProvider.of(new MockSinkFunction(tableName, rowType)); + } + + @Override + public DynamicTableSink copy() { + return new MockDynamicTableSink(tableName, rowType); + } + + @Override + public String asSummaryString() { + return "Dinky Sink Mock"; + } +} diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java new file mode 100644 index 0000000000..cec1a53034 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockDynamicTableSinkFactory.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.Set; + +public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory { + public static final String IDENTIFIER = "dinky-mock"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + return new MockDynamicTableSink( + context.getObjectIdentifier().asSummaryString(), (RowType) context.getCatalogTable() + .getResolvedSchema() + .toPhysicalRowDataType() + .getLogicalType()); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java new file mode 100644 index 0000000000..2bb1ecc33d --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.connector.mock.sink; + +import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockSinkFunction extends RichSinkFunction { + private final RowType rowType; + private final String tableIdentifier; + // when columns is in VARCHAR or STRING type, rowData will be generated to BinaryStringData, which is not + // serialized, as a result, SerializedListAccumulator is used here + private final SerializedListAccumulator> rowDataList; + + public MockSinkFunction(String tableName, RowType rowType) { + this.rowType = rowType; + this.tableIdentifier = tableName; + this.rowDataList = new SerializedListAccumulator<>(); + } + + @Override + public void open(Configuration parameters) throws Exception { + getRuntimeContext().addAccumulator(tableIdentifier, rowDataList); + super.open(parameters); + } + + @Override + public void invoke(RowData rowData, Context context) throws Exception { + List fieldNames = rowType.getFieldNames(); + Map rowDataMap = new HashMap<>(); + for (int i = 0; i < fieldNames.size(); i++) { + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(rowType.getTypeAt(i), i); + rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData))); + } + rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer())); + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java new file mode 100644 index 0000000000..0b5ec2885a --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.result; + +import org.dinky.assertion.Asserts; +import org.dinky.job.JobHandler; +import org.dinky.utils.JsonUtils; + +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.ResolvedSchema; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import alluxio.shaded.client.com.google.common.collect.Lists; +import cn.hutool.core.collection.ListUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MockResultBuilder extends AbstractResultBuilder implements ResultBuilder { + private final Integer maxRowNum; + private final boolean isAutoCancel; + private final String MOCK_RESULT_TABLE_IDENTIFIER = "dinkySinkResultTableIdentifier"; + private final String MOCK_RESULT_COLUMN_IDENTIFIER = "dinkySinkResultColumnIdentifier"; + + public MockResultBuilder(String id, Integer maxRowNum, boolean isAutoCancel) { + this.id = id; + this.maxRowNum = maxRowNum; + this.isAutoCancel = isAutoCancel; + } + + @Override + public IResult getResult(TableResult tableResult) { + Optional Optional = tableResult.getJobClient(); + if (Optional.isPresent()) { + JobClient jobClient = Optional.get(); + // get table identifiers + ResolvedSchema resolvedSchema = tableResult.getResolvedSchema(); + List tableIdentifierList = resolvedSchema.getColumnNames(); + // update row data map + Map>> rowDataMap = new HashMap<>(); + if (tableResult.getJobClient().isPresent()) { + while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) { + try { + Map accumulatorMap = + jobClient.getAccumulators().get(); + for (String tableIdentifier : tableIdentifierList) { + Object accumulatorObject = accumulatorMap.get(tableIdentifier); + if (accumulatorObject instanceof List) { + List list = (List) accumulatorObject; + for (Object obj : list) { + // deserialize data from accumulator + Map deserialize = deserializeObjFromBytes((byte[]) obj); + // update row data map + List> rowDataList = + rowDataMap.getOrDefault(tableIdentifier, new ArrayList<>()); + rowDataList.add(deserialize); + rowDataMap.put(tableIdentifier, ListUtil.sub(rowDataList, 0, maxRowNum)); + } + } + } + } catch (Exception e) { + // do nothing + } + } + } + if (isAutoCancel) { + try { + jobClient.cancel(); + } catch (Exception e) { + log.error("Cancel job failed, jobId: {}", id); + } + } + return new MockSinkResult(id, rowDataMap); + } else { + return MockSinkResult.buildFailed(); + } + } + + @Override + public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobHandler) { + if (Objects.isNull(tableResult)) { + return MockSinkResult.buildFailed(); + } + MockSinkResult mockSinkResult = (MockSinkResult) getResult(tableResult); + // MockSinkResult -> SelectResult + SelectResult selectResult = new SelectResult( + id, + convertSinkRowData2SelectRowData(mockSinkResult.getTableRowData()), + generateMockResultColumns(mockSinkResult.getTableRowData())); + selectResult.setMockSinkResult(true); + selectResult.setDestroyed(Boolean.TRUE); + try { + ResultPool.put(selectResult); + jobHandler.persistResultData(Lists.newArrayList(this.id)); + } finally { + ResultPool.remove(id); + } + return selectResult; + } + + /** + * convert row data of mocked sink result to the type of select result + * + * @param tableRowData row data of {@link MockSinkResult} + * @return row data of {@link SelectResult} + */ + private List> convertSinkRowData2SelectRowData( + Map>> tableRowData) { + List> resultRowData = new ArrayList<>(); + for (Map.Entry>> entry : tableRowData.entrySet()) { + String tableIdentifier = entry.getKey(); + List> rowDataList = entry.getValue(); + for (Map rowDataElement : rowDataList) { + Map selectRowDataElement = new HashMap<>(); + // table name identifier + selectRowDataElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + // row data + selectRowDataElement.putAll(rowDataElement); + resultRowData.add(selectRowDataElement); + } + } + return resultRowData; + } + + /** + * check if all sink has finished + * + * @param maxRowNum maximum row num of each table + * @param rowData row data map, key: table name, value: row data + * @param tableIdentifierList table identifier + * @return true if all tables has caught enough rows + */ + private boolean isAllSinkFinished( + int maxRowNum, Map>> rowData, List tableIdentifierList) { + if (tableIdentifierList.size() > rowData.size()) { + return false; + } + for (List> rowDataList : rowData.values()) { + if (Asserts.isNotNull(rowDataList) && rowDataList.size() < maxRowNum) { + return false; + } + } + return true; + } + + /** + * generate mock result column sets + * + * @param tableRowData row data of (@Link MockSinkResult} + * @return column sets {@link SelectResult} + */ + private LinkedHashSet generateMockResultColumns(Map>> tableRowData) { + LinkedHashSet resultColumn = new LinkedHashSet<>(); + for (Map.Entry>> entry : tableRowData.entrySet()) { + String tableIdentifier = entry.getKey(); + List> rowDataList = entry.getValue(); + Set columns = rowDataList.get(0).keySet(); + Map columnElement = new HashMap<>(); + columnElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + columnElement.put(MOCK_RESULT_COLUMN_IDENTIFIER, columns); + resultColumn.add(JsonUtils.toJsonString(columnElement)); + } + return resultColumn; + } + + private static Map deserializeObjFromBytes(byte[] byteArr) throws IOException { + MapSerializer mapSerializer = + new MapSerializer<>(new StringSerializer(), new StringSerializer()); + return mapSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(byteArr))); + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java b/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java new file mode 100644 index 0000000000..164bf2a2cc --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.result; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Setter +@Getter +@NoArgsConstructor +public class MockSinkResult extends AbstractResult implements IResult { + + private String taskId; + private Map>> tableRowData; + private boolean truncationFlag = false; + private boolean isDestroyed; + + public MockSinkResult(String taskId, Map>> tableRowData) { + this.taskId = taskId; + this.tableRowData = tableRowData; + } + + public MockSinkResult(String taskId, boolean isDestroyed, boolean success) { + this.taskId = taskId; + this.isDestroyed = isDestroyed; + this.success = success; + this.endTime = LocalDateTime.now(); + } + + public static MockSinkResult buildSuccess(String taskId) { + return new MockSinkResult(taskId, false, true); + } + + public static MockSinkResult buildFailed() { + return new MockSinkResult(null, false, false); + } + + @Override + public String getJobId() { + return this.taskId; + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java index 8d84d3dfd3..1984cb7402 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java @@ -48,6 +48,8 @@ static ResultBuilder build( return new ShowResultBuilder(id); case INSERT: return new InsertResultBuilder(); + case MOCKED_INSERT: + return new MockResultBuilder(id, maxRowNum, isAutoCancel); default: return new DDLResultBuilder(); } diff --git a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java index b7ea0f1aaf..d074eb68c3 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java +++ b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java @@ -53,6 +53,7 @@ public class SelectResult extends AbstractResult implements IResult { private LinkedHashSet columns; private boolean isDestroyed; private boolean truncationFlag = false; + private boolean isMockSinkResult = false; public SelectResult( List> rowData, diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 1b266f4858..53f08b4e68 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -27,6 +27,7 @@ import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.CustomTableEnvironment; import org.dinky.executor.Executor; +import org.dinky.explainer.mock.MockStatementExplainer; import org.dinky.explainer.print_table.PrintStatementExplainer; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; @@ -182,7 +183,14 @@ public JobParam pretreatStatements(String[] statements) { statementList.add(statement); } } - return new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString()); + JobParam jobParam = + new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString()); + + MockStatementExplainer.build(executor.getCustomTableEnvironment()) + .isMockSink(jobManager.getConfig().isMockSinkFunction()) + .jobParamMock(jobParam); + + return jobParam; } private Configuration getCombinationConfig() { diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java new file mode 100644 index 0000000000..5dd1323856 --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.explainer.mock; + +import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory; +import org.dinky.executor.CustomTableEnvironment; +import org.dinky.job.JobParam; +import org.dinky.job.StatementParam; +import org.dinky.parser.SqlType; +import org.dinky.utils.JsonUtils; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MockStatementExplainer { + + // Because calcite cannot parse flink sql ddl, a table environment is designed here for flink sql ddl pars + private final CustomTableEnvironment tableEnv; + private boolean isMockSink = false; + private final SqlParser.Config calciteConfig; + private final String DROP_TABLE_SQL_TEMPLATE = "DROP TABLE IF EXISTS {0}"; + private final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})"; + + public static MockStatementExplainer build(CustomTableEnvironment tableEnv) { + return new MockStatementExplainer(tableEnv); + } + + public MockStatementExplainer(CustomTableEnvironment tableEnv) { + this.tableEnv = tableEnv; + this.calciteConfig = SqlParser.config().withLex(Lex.JAVA); + } + + public MockStatementExplainer isMockSink(boolean isMockSink) { + this.isMockSink = isMockSink; + return this; + } + + public void jobParamMock(JobParam jobParam) { + if (isMockSink) { + mockSink(jobParam); + } + } + + /** + * The connector of insert tables will be changed to {@link MockDynamicTableSinkFactory} + * + * @param jobParam job param + */ + private void mockSink(JobParam jobParam) { + // Based on insert statements, get table names need to be mocked, and modify insert statements' target table + Set tablesNeedMock = getTableNamesNeedMockAndModifyTrans(jobParam); + // mock insert table ddl + List mockedDdl = new ArrayList<>(); + + for (StatementParam ddl : jobParam.getDdl()) { + List parseOperationList = tableEnv.getParser().parse(ddl.getValue()); + + for (Operation operation : parseOperationList) { + if (operation instanceof CreateTableOperation) { + CreateTableOperation createOperation = (CreateTableOperation) operation; + CatalogTable catalogTable = createOperation.getCatalogTable(); + // get table name and check if it should be mocked + String tableName = createOperation.getTableIdentifier().getObjectName(); + if (tablesNeedMock.contains(tableName)) { + // drop table first + mockedDdl.add(new StatementParam( + MessageFormat.format(DROP_TABLE_SQL_TEMPLATE, generateMockedTableName(tableName)), + SqlType.DROP)); + // generate mock statement + mockedDdl.add( + new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE)); + } else { + mockedDdl.add(ddl); + } + } + } + } + jobParam.setDdl(mockedDdl); + log.info("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam)); + } + + /** + * get tables names of insert statements, these tables will be mocked + * + * @param jobParam jobParam + * @return a hash set, which contains all insert table names + */ + private Set getTableNamesNeedMockAndModifyTrans(JobParam jobParam) { + List transStatements = jobParam.getTrans(); + List mockedTransStatements = new ArrayList<>(); + Set insertTables = new HashSet<>(); + for (StatementParam statement : transStatements) { + if (statement.getType().equals(SqlType.INSERT)) { + try { + SqlInsert sqlInsert = (SqlInsert) SqlParser.create(statement.getValue(), calciteConfig) + .parseQuery(); + insertTables.add(sqlInsert.getTargetTable().toString()); + SqlInsert mockedInsertTrans = new SqlInsert( + sqlInsert.getParserPosition(), + SqlNodeList.EMPTY, + new SqlIdentifier( + generateMockedTableName( + sqlInsert.getTargetTable().toString()), + SqlParserPos.ZERO), + sqlInsert.getSource(), + sqlInsert.getTargetColumnList()); + mockedTransStatements.add(new StatementParam( + mockedInsertTrans + .toSqlString(AnsiSqlDialect.DEFAULT) + .toString(), + SqlType.INSERT)); + } catch (Exception e) { + log.error("Statement parse error, statement: {}", statement.getValue()); + } + } + } + jobParam.setTrans(mockedTransStatements); + return insertTables; + } + + /** + * get mocked ddl statement + * + * @param tableName table name + * @param catalogTable catalog table + * @return ddl that connector is changed as well as other options not changed + */ + private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) { + // options + Map optionsMap = catalogTable.getOptions(); + optionsMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER); + List withOptionList = new ArrayList<>(optionsMap.size()); + for (Map.Entry entry : optionsMap.entrySet()) { + withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'"); + } + String mockedWithOption = String.join(", ", withOptionList); + // columns + Schema unresolvedSchema = catalogTable.getUnresolvedSchema(); + String columns = unresolvedSchema.getColumns().stream() + .map(column -> { + Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; + return physicalColumn.getName() + " " + physicalColumn.getDataType(); + }) + .collect(Collectors.joining(", ")); + return MessageFormat.format(MOCK_SQL_TEMPLATE, generateMockedTableName(tableName), columns, mockedWithOption); + } + + /** + * generate table name with mocked prefix + * @param tableName table name + * @return table name with mocked prefix + */ + private String generateMockedTableName(String tableName) { + return "mock_sink_" + tableName; + } +} diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 146357da23..8718e524aa 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -183,6 +183,13 @@ public class JobConfig { notes = "Maximum number of rows") private Integer maxRowNum; + @ApiModelProperty( + value = "Flag indicating whether to mock sink function", + dataType = "boolean", + example = "true", + notes = "Flag indicating whether to mock sink function") + private boolean mockSinkFunction; + @ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration") private GatewayConfig gatewayConfig; diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index 74cb561296..f750c039f2 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -49,7 +49,6 @@ /** * JobTransBuilder - * */ public class JobTransBuilder extends JobBuilder { @@ -115,7 +114,11 @@ private void processWithoutGateway(List inserts) throws Exception { if (!inserts.isEmpty()) { jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); TableResult tableResult = executor.executeStatementSet(inserts); - updateJobWithTableResult(tableResult); + if (jobManager.getConfig().isMockSinkFunction()) { + updateJobWithTableResult(tableResult, SqlType.MOCKED_INSERT); + } else { + updateJobWithTableResult(tableResult); + } } } diff --git a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a0142e634d..73603caf76 100644 --- a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1 +1,2 @@ org.dinky.connector.printnet.sink.PrintNetDynamicTableSinkFactory +org.dinky.connector.mock.sink.MockDynamicTableSinkFactory diff --git a/dinky-web/src/locales/en-US/pages.ts b/dinky-web/src/locales/en-US/pages.ts index 47af5710e6..49ff29b80d 100644 --- a/dinky-web/src/locales/en-US/pages.ts +++ b/dinky-web/src/locales/en-US/pages.ts @@ -508,6 +508,8 @@ export default { 'pages.datastudio.label.execConfig.selectDatabase.tip': 'Select the database to be used', 'pages.datastudio.label.execConfig.maxrow': 'Maximum number of rows', 'pages.datastudio.label.execConfig.maxrow.tip': 'The maximum number of rows of preview data', + 'pages.datastudio.label.execConfig.mocksink': 'SinkMock', + 'pages.datastudio.label.execConfig.mocksink.tip': 'Mock the SinkFunction, result will not be written to the production environment during debugging, but can preview through dinky', 'pages.datastudio.label.jobConfig': 'Job Config', 'pages.datastudio.label.jobConfig.addConfig': 'Add Config item', 'pages.datastudio.label.jobConfig.addConfig.params': 'parameters', diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts index d82b7b82c7..beb1331867 100644 --- a/dinky-web/src/locales/zh-CN/pages.ts +++ b/dinky-web/src/locales/zh-CN/pages.ts @@ -453,6 +453,8 @@ export default { 'pages.datastudio.label.execConfig.selectDatabase.tip': '选择 Sql 语句执行的数据源', 'pages.datastudio.label.execConfig.maxrow': '最大行数', 'pages.datastudio.label.execConfig.maxrow.tip': '预览数据的最大行数', + 'pages.datastudio.label.execConfig.mocksink': '开启SinkMock', + 'pages.datastudio.label.execConfig.mocksink.tip': '将SinkFunction进行Mock,调试过程中不会向线上环境执行写入,但可以通过dinky预览Sink结果', 'pages.datastudio.label.jobConfig': '作业配置', 'pages.datastudio.label.jobConfig.addConfig': '添加配置项', 'pages.datastudio.label.jobConfig.addConfig.params': '参数', diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx index 338356afe8..1dbd17208a 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx @@ -18,7 +18,7 @@ */ import { - assert, + assert, convertMockResultToList, getCurrentData, getCurrentTab, isDataStudioTabsItemType, @@ -133,9 +133,9 @@ const Result = (props: any) => { } const consoleData = currentTabs.console; - if (consoleData.result && !isRefresh) { + if (consoleData.result && !isRefresh && !consoleData.result.mockSinkResult) { setData(consoleData.result); - } else if (consoleData.results && !isRefresh) { + } else if (consoleData.results && !isRefresh && !consoleData.result.mockSinkResult) { setDataList(consoleData.results); } else { if (assert(current?.dialect, [DIALECT.FLINK_SQL], true, 'includes')) { @@ -162,8 +162,14 @@ const Result = (props: any) => { ); const data = tableData.data; if (tableData.success && data?.success) { - consoleData.result = data; - setData(data); + //mockSinkResult + if (data.mockSinkResult == true) { + consoleData.results = convertMockResultToList(data); + setDataList(consoleData.results); + } else { + consoleData.result = data; + setData(data); + } } else { consoleData.result = {}; setData({}); @@ -270,7 +276,7 @@ const Result = (props: any) => { {dataList.map((data, index) => { return ( - + { }} {...SWITCH_OPTIONS()} /> + + }} + {...SWITCH_OPTIONS()} + /> ({ updateToolContentHeight: (key: number) => @@ -246,9 +246,9 @@ export const getFooterValue = (panes: any, activeKey: string): Partial { export const isEmpty = (value: any): boolean => { return !isNotEmpty(value); }; + +/** + * 转换多表的SelectResult + * @param data + */ +export const convertMockResultToList = (data: any): any [] => { + const rowDataResults: any[] = []; + // 对于每个MockResult的Column,一个元素代表一个表信息 + data.columns.forEach((columnString: string) => { + // 当前表的column信息 + let columnArr: string[] = []; + // 当前表的row data信息 + const rowDataArr: string[] = []; + // 表名 + let tableName: string = ''; + //解析当前表单信息 + const columnJsonInfo = JSON.parse(columnString); + // 提取column信息 + if (columnJsonInfo['dinkySinkResultColumnIdentifier']) { + columnArr = columnJsonInfo['dinkySinkResultColumnIdentifier'] + } + // 提取表名 + if (columnJsonInfo['dinkySinkResultTableIdentifier']) { + tableName = columnJsonInfo['dinkySinkResultTableIdentifier']; + } + // 遍历column信息 + data.rowData.forEach((rowDataElement: any) => { + if (rowDataElement.dinkySinkResultTableIdentifier == tableName) { + rowDataArr.push(rowDataElement); + } + }) + // 构建constant对象 + const rowDataResult = { + 'tableName': tableName, columns: columnArr, rowData: rowDataArr + }; + rowDataResults.push(rowDataResult); + }); + + console.log(rowDataResults) + return rowDataResults; +}; diff --git a/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx b/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx index af9af656c7..0e3c855ae7 100644 --- a/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx +++ b/dinky-web/src/pages/DataStudioNew/CenterTabContent/TaskConfig/index.tsx @@ -112,6 +112,15 @@ export default (props: { }} {...SWITCH_OPTIONS()} /> + + }} + {...SWITCH_OPTIONS()} + /> ); } diff --git a/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx b/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx index aed68315fe..303380464c 100644 --- a/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx +++ b/dinky-web/src/pages/DataStudioNew/Toolbar/Service/Result/index.tsx @@ -65,7 +65,11 @@ export default (props: { taskId: number; action: any; dialect: string }) => { const searchInput = useRef(null); useEffect(() => { if (actionType === DataStudioActionType.TASK_PREVIEW_RESULT) { - setData({ columns: params.columns, rowData: params.rowData }); + if (data.mockSinkResult == true) { + setDataList(convertMockResultToList({ columns: params.columns, rowData: params.rowData })) + } else { + setData({ columns: params.columns, rowData: params.rowData }); + } } }, [props.action]); @@ -87,6 +91,41 @@ export default (props: { taskId: number; action: any; dialect: string }) => { setSearchedColumn(''); } }; + const convertMockResultToList = (data: any): any [] => { + const rowDataResults: any[] = []; + // 对于每个MockResult的Column,一个元素代表一个表信息 + data.columns.forEach((columnString: string) => { + // 当前表的column信息 + let columnArr: string[] = []; + // 当前表的row data信息 + const rowDataArr: string[] = []; + // 表名 + let tableName: string = ''; + //解析当前表单信息 + const columnJsonInfo = JSON.parse(columnString); + // 提取column信息 + if (columnJsonInfo['dinkySinkResultColumnIdentifier']) { + columnArr = columnJsonInfo['dinkySinkResultColumnIdentifier'] + } + // 提取表名 + if (columnJsonInfo['dinkySinkResultTableIdentifier']) { + tableName = columnJsonInfo['dinkySinkResultTableIdentifier']; + } + // 遍历column信息 + data.rowData.forEach((rowDataElement: any) => { + if (rowDataElement.dinkySinkResultTableIdentifier == tableName) { + rowDataArr.push(rowDataElement); + } + }) + // 构建constant对象 + const rowDataResult = { + 'tableName': tableName, columns: columnArr, rowData: rowDataArr + }; + rowDataResults.push(rowDataResult); + }); + + return rowDataResults; + }; const getColumnSearchProps = (dataIndex: string): ColumnType => ({ filterDropdown: ({ setSelectedKeys, selectedKeys, confirm, clearFilters }) => (
e.stopPropagation()}> @@ -154,9 +193,14 @@ export default (props: { taskId: number; action: any; dialect: string }) => { ); const data = tableData.data; if (tableData.success && data?.success) { - setData(data); + if (data.mockSinkResult == true) { + setDataList(convertMockResultToList(data)) + } else { + setData(data); + } } else { setData({}); + setDataList([]) } } @@ -272,7 +316,7 @@ export default (props: { taskId: number; action: any; dialect: string }) => { {dataList.map((data, index) => { return ( - +