Skip to content

Commit

Permalink
[Feature-3893][core][admin][web] Flink SQL task Support insert result…
Browse files Browse the repository at this point in the history
… preview (#3897)
  • Loading branch information
MactavishCui authored Nov 11, 2024
1 parent 0efce63 commit 40e32e0
Show file tree
Hide file tree
Showing 22 changed files with 803 additions and 24 deletions.
7 changes: 7 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ public class TaskDTO extends AbstractStatementDTO {
notes = "Flag indicating whether to use auto-canceling")
private boolean useAutoCancel = true;

@ApiModelProperty(
value = "Flag indicating whether to mock sink function",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction = true;

@ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier")
private String session;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public JobResult debugTask(TaskDTO task) throws Exception {
// Debug mode need return result
task.setUseResult(true);
// Debug mode need execute
task.setStatementSet(false);
task.setStatementSet(task.isMockSinkFunction());
// mode check
if (GatewayType.get(task.getType()).isDeployCluster()) {
throw new BusException(Status.MODE_IS_NOT_ALLOW_SELECT.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public enum SqlType {

INSERT("INSERT", "^INSERT.*", SqlCategory.DML),

MOCKED_INSERT("MOCKED_INSERT", "^MOCKED_INSERT.*", SqlCategory.DML),

DESC("DESC", "^DESC.*", SqlCategory.DDL),

DESCRIBE("DESCRIBE", "^DESCRIBE.*", SqlCategory.DDL),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.logical.RowType;

public class MockDynamicTableSink implements DynamicTableSink {

private final String tableName;
private final RowType rowType;

public MockDynamicTableSink(String tableName, RowType rowType) {
this.tableName = tableName;
this.rowType = rowType;
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return SinkFunctionProvider.of(new MockSinkFunction(tableName, rowType));
}

@Override
public DynamicTableSink copy() {
return new MockDynamicTableSink(tableName, rowType);
}

@Override
public String asSummaryString() {
return "Dinky Sink Mock";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.Set;

public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "dinky-mock";

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return new MockDynamicTableSink(
context.getObjectIdentifier().asSummaryString(), (RowType) context.getCatalogTable()
.getResolvedSchema()
.toPhysicalRowDataType()
.getLogicalType());
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MockSinkFunction extends RichSinkFunction<RowData> {
private final RowType rowType;
private final String tableIdentifier;
// when columns is in VARCHAR or STRING type, rowData will be generated to BinaryStringData, which is not
// serialized, as a result, SerializedListAccumulator is used here
private final SerializedListAccumulator<Map<String, String>> rowDataList;

public MockSinkFunction(String tableName, RowType rowType) {
this.rowType = rowType;
this.tableIdentifier = tableName;
this.rowDataList = new SerializedListAccumulator<>();
}

@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator(tableIdentifier, rowDataList);
super.open(parameters);
}

@Override
public void invoke(RowData rowData, Context context) throws Exception {
List<String> fieldNames = rowType.getFieldNames();
Map<String, String> rowDataMap = new HashMap<>();
for (int i = 0; i < fieldNames.size(); i++) {
RowData.FieldGetter fieldGetter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData)));
}
rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer()));
}
}
Loading

0 comments on commit 40e32e0

Please sign in to comment.