Skip to content

Commit

Permalink
code style optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
MactavishCui committed Nov 2, 2024
1 parent 0df34e4 commit defa77d
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 84 deletions.
8 changes: 4 additions & 4 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ public class TaskDTO extends AbstractStatementDTO {
private boolean useAutoCancel = true;

@ApiModelProperty(
value = "Flag indicating whether to mock sink function",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to mock sink function")
value = "Flag indicating whether to mock sink function",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction = true;

@ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.dinky.parser;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;

/**
* SqlType
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
package org.dinky.connector.mock.sink;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
Expand All @@ -11,9 +29,7 @@ public class MockDynamicTableSink implements DynamicTableSink {
private final String tableName;
private final RowType rowType;

public MockDynamicTableSink(
String tableName,
RowType rowType) {
public MockDynamicTableSink(String tableName, RowType rowType) {
this.tableName = tableName;
this.rowType = rowType;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
Expand All @@ -8,8 +29,6 @@
import java.util.Collections;
import java.util.Set;

import static org.apache.flink.configuration.ConfigOptions.key;

public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "dinky-mock";
public static final ConfigOption<String> MOCK_IDENTIFIER = key("mock-identifier")
Expand All @@ -19,8 +38,11 @@ public class MockDynamicTableSinkFactory implements DynamicTableSinkFactory {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return new MockDynamicTableSink(context.getObjectIdentifier().asSummaryString(),
(RowType) context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType().getLogicalType());
return new MockDynamicTableSink(
context.getObjectIdentifier().asSummaryString(), (RowType) context.getCatalogTable()
.getResolvedSchema()
.toPhysicalRowDataType()
.getLogicalType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.connector.mock.sink;

import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
Expand All @@ -15,12 +34,11 @@
public class MockSinkFunction extends RichSinkFunction<RowData> {
private final RowType rowType;
private final String tableIdentifier;
// when columns is in VARCHAR or STRING type, rowData will be generated to BinaryStringData, which is not serialized, as a result, SerializedListAccumulator is used here
// when columns is in VARCHAR or STRING type, rowData will be generated to BinaryStringData, which is not
// serialized, as a result, SerializedListAccumulator is used here
private final SerializedListAccumulator<Map<String, String>> rowDataList;

public MockSinkFunction(
String tableName,
RowType rowType) {
public MockSinkFunction(String tableName, RowType rowType) {
this.rowType = rowType;
this.tableIdentifier = tableName;
this.rowDataList = new SerializedListAccumulator<>();
Expand All @@ -42,4 +60,4 @@ public void invoke(RowData rowData, Context context) throws Exception {
}
rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer()));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,57 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.result;

import alluxio.shaded.client.com.google.common.collect.Lists;
import cn.hutool.core.collection.ListUtil;
import lombok.extern.slf4j.Slf4j;
import org.dinky.assertion.Asserts;
import org.dinky.job.JobHandler;
import org.dinky.utils.JsonUtils;

import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.dinky.assertion.Asserts;
import org.dinky.job.JobHandler;
import org.dinky.utils.JsonUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

@Slf4j
import alluxio.shaded.client.com.google.common.collect.Lists;
import cn.hutool.core.collection.ListUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MockResultBuilder extends AbstractResultBuilder implements ResultBuilder {
private final Integer maxRowNum;
private final boolean isAutoCancel;
private final String MOCK_RESULT_TABLE_IDENTIFIER = "dinkySinkResultTableIdentifier";
private final String MOCK_RESULT_COLUMN_IDENTIFIER = "dinkySinkResultColumnIdentifier";


public MockResultBuilder(String id, Integer maxRowNum, boolean isAutoCancel) {
this.id = id;
this.maxRowNum = maxRowNum;
Expand All @@ -45,7 +71,8 @@ public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) {
while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) {
try {
Map<String, Object> accumulatorMap = jobClient.getAccumulators().get();
Map<String, Object> accumulatorMap =
jobClient.getAccumulators().get();
for (String tableIdentifier : tableIdentifierList) {
Object accumulatorObject = accumulatorMap.get(tableIdentifier);
if (accumulatorObject instanceof List) {
Expand All @@ -54,7 +81,8 @@ public IResult getResult(TableResult tableResult) {
// deserialize data from accumulator
Map<String, String> deserialize = deserializeObjFromBytes((byte[]) obj);
// update row data map
List<Map<String, String>> rowDataList = rowDataMap.getOrDefault(tableIdentifier, new ArrayList<>());
List<Map<String, String>> rowDataList =
rowDataMap.getOrDefault(tableIdentifier, new ArrayList<>());
rowDataList.add(deserialize);
rowDataMap.put(tableIdentifier, ListUtil.sub(rowDataList, 0, maxRowNum));
}
Expand Down Expand Up @@ -85,8 +113,10 @@ public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobH
}
MockSinkResult mockSinkResult = (MockSinkResult) getResult(tableResult);
// MockSinkResult -> SelectResult
SelectResult selectResult =
new SelectResult(id, convertSinkRowData2SelectRowData(mockSinkResult.getTableRowData()), generateMockResultColumns(mockSinkResult.getTableRowData()));
SelectResult selectResult = new SelectResult(
id,
convertSinkRowData2SelectRowData(mockSinkResult.getTableRowData()),
generateMockResultColumns(mockSinkResult.getTableRowData()));
selectResult.setMockSinkResult(true);
selectResult.setDestroyed(Boolean.TRUE);
try {
Expand All @@ -104,7 +134,8 @@ public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobH
* @param tableRowData row data of {@link MockSinkResult}
* @return row data of {@link SelectResult}
*/
private List<Map<String, Object>> convertSinkRowData2SelectRowData(Map<String, List<Map<String, String>>> tableRowData) {
private List<Map<String, Object>> convertSinkRowData2SelectRowData(
Map<String, List<Map<String, String>>> tableRowData) {
List<Map<String, Object>> resultRowData = new ArrayList<>();
for (Map.Entry<String, List<Map<String, String>>> entry : tableRowData.entrySet()) {
String tableIdentifier = entry.getKey();
Expand All @@ -129,7 +160,8 @@ private List<Map<String, Object>> convertSinkRowData2SelectRowData(Map<String, L
* @param tableIdentifierList table identifier
* @return true if all tables has caught enough rows
*/
private boolean isAllSinkFinished(int maxRowNum, Map<String, List<Map<String, String>>> rowData, List<String> tableIdentifierList) {
private boolean isAllSinkFinished(
int maxRowNum, Map<String, List<Map<String, String>>> rowData, List<String> tableIdentifierList) {
if (tableIdentifierList.size() > rowData.size()) {
return false;
}
Expand Down Expand Up @@ -162,8 +194,8 @@ private LinkedHashSet<String> generateMockResultColumns(Map<String, List<Map<Str
}

private static Map<String, String> deserializeObjFromBytes(byte[] byteArr) throws IOException {
MapSerializer<String, String> mapSerializer = new MapSerializer<>(new StringSerializer(), new StringSerializer());
MapSerializer<String, String> mapSerializer =
new MapSerializer<>(new StringSerializer(), new StringSerializer());
return mapSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(byteArr)));
}

}
}
32 changes: 24 additions & 8 deletions dinky-core/src/main/java/org/dinky/data/result/MockSinkResult.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.result;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

@Slf4j
@Setter
@Getter
Expand All @@ -20,9 +39,7 @@ public class MockSinkResult extends AbstractResult implements IResult {
private boolean truncationFlag = false;
private boolean isDestroyed;

public MockSinkResult(
String taskId,
Map<String, List<Map<String, String>>> tableRowData) {
public MockSinkResult(String taskId, Map<String, List<Map<String, String>>> tableRowData) {
this.taskId = taskId;
this.tableRowData = tableRowData;
}
Expand All @@ -46,5 +63,4 @@ public static MockSinkResult buildFailed() {
public String getJobId() {
return this.taskId;
}

}
Loading

0 comments on commit defa77d

Please sign in to comment.