Skip to content

Commit

Permalink
Merge branch 'refs/heads/dev' into fork/Zzm0809/Add-tag-right-click-t…
Browse files Browse the repository at this point in the history
…o-disable-function

# Conflicts:
#	dinky-web/src/pages/DataStudioNew/CenterTabContent/SqlTask.tsx
  • Loading branch information
zackyoungh committed Nov 12, 2024
2 parents 3fb87d8 + 11630fc commit 16948d8
Show file tree
Hide file tree
Showing 27 changed files with 872 additions and 70 deletions.
7 changes: 7 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ public class TaskDTO extends AbstractStatementDTO {
notes = "Flag indicating whether to use auto-canceling")
private boolean useAutoCancel = true;

@ApiModelProperty(
value = "Flag indicating whether to mock sink function",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction = true;

@ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier")
private String session;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,13 @@ public boolean success() {
history.setClusterId(clusterId);
historyService.updateById(history);

if (!job.getJobConfig().isStatementSet()) {
if (!job.isPipeline()) {
return true;
}

if (Asserts.isNullCollection(job.getJids()) || Asserts.isNullString(job.getJobManagerAddress())) {
throw new BusException("The JobID or JobManagerAddress is null. ");
if (Asserts.isNullCollection(job.getJids())) {
throw new BusException("Job ID retrieval failed, possibly due to timeout of job deployment. "
+ "Please modify the system configuration to increase the waiting time for job submission.");
}

JobInstance jobInstance = history.buildJobInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ public JobResult submitTask(TaskSubmitDto submitDto) throws Exception {
public JobResult debugTask(TaskDTO task) throws Exception {
// Debug mode need return result
task.setUseResult(true);
// Debug mode need execute
task.setStatementSet(false);
// mode check
if (GatewayType.get(task.getType()).isDeployCluster()) {
throw new BusException(Status.MODE_IS_NOT_ALLOW_SELECT.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public enum SqlType {
private static final List<SqlType> TRANS_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS);

private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS);

SqlType(String type, String regrex, SqlCategory category) {
this.type = type;
this.pattern = Pattern.compile(regrex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
Expand All @@ -104,4 +106,8 @@ public boolean match(String statement) {
public static List<SqlType> getTransSqlTypes() {
return TRANS_SQL_TYPES;
}

public boolean isPipeline() {
return PIPELINE_SQL_TYPES.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.logical.RowType;

public class MockDynamicTableSink implements DynamicTableSink {

private final String tableName;
private final RowType rowType;

public MockDynamicTableSink(String tableName, RowType rowType) {
this.tableName = tableName;
this.rowType = rowType;
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return SinkFunctionProvider.of(new MockSinkFunction(tableName, rowType));
}

@Override
public DynamicTableSink copy() {
return new MockDynamicTableSink(tableName, rowType);
}

@Override
public String asSummaryString() {
return "Dinky Sink Mock";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.Set;

public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "dinky-mock";

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return new MockDynamicTableSink(
context.getObjectIdentifier().asSummaryString(), (RowType) context.getCatalogTable()
.getResolvedSchema()
.toPhysicalRowDataType()
.getLogicalType());
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MockSinkFunction extends RichSinkFunction<RowData> {
private final RowType rowType;
private final String tableIdentifier;
// when columns is in VARCHAR or STRING type, rowData will be generated to BinaryStringData, which is not
// serialized, as a result, SerializedListAccumulator is used here
private final SerializedListAccumulator<Map<String, String>> rowDataList;

public MockSinkFunction(String tableName, RowType rowType) {
this.rowType = rowType;
this.tableIdentifier = tableName;
this.rowDataList = new SerializedListAccumulator<>();
}

@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator(tableIdentifier, rowDataList);
super.open(parameters);
}

@Override
public void invoke(RowData rowData, Context context) throws Exception {
List<String> fieldNames = rowType.getFieldNames();
Map<String, String> rowDataMap = new HashMap<>();
for (int i = 0; i < fieldNames.size(); i++) {
RowData.FieldGetter fieldGetter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData)));
}
rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer()));
}
}
Loading

0 comments on commit 16948d8

Please sign in to comment.