Skip to content

Commit

Permalink
Debugging a normal job returns multiple query result sets
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzehan committed Dec 20, 2023
1 parent 01272a4 commit cba23b4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.TaskNotDoneException;
import org.dinky.data.model.Column;
import org.dinky.data.model.DataBase;
import org.dinky.data.model.QueryData;
Expand All @@ -44,10 +45,7 @@
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -275,7 +273,6 @@ public JobResult executeCommonSql(SqlDTO sqlDTO) {
result.setEndTime(LocalDateTime.now());
return result;
}

DataBase dataBase = getById(sqlDTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
result.setSuccess(false);
Expand Down Expand Up @@ -334,24 +331,29 @@ public JobResult StreamExecuteCommonSql(SqlDTO sqlDTO) {
result.setEndTime(LocalDateTime.now());
return result;
}
List<JdbcSelectResult> jdbcSelectResults=new ArrayList<>();
try (Driver driver = Driver.build(dataBase.getDriverConfig())) {

Stream<JdbcSelectResult> jdbcSelectResultStream =
driver.StreamExecuteSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
List<JdbcSelectResult> jdbcSelectResults = jdbcSelectResultStream
.takeWhile(res -> {
if (!res.isSuccess()) {
result.setSuccess(false);
result.setError(res.getError());
result.setEndTime(LocalDateTime.now());
return false;
}
return true;
})
.collect(Collectors.toList());
if (result.getError() == null) {
result.setSuccess(true);
jdbcSelectResultStream.forEach(res->{
jdbcSelectResults.add(res);
if (!res.isSuccess()){
throw new RuntimeException();
}
});
result.setResults(jdbcSelectResults);
result.setSuccess(true);
return result;
}
catch (RuntimeException e){
if (!jdbcSelectResults.isEmpty()){
result.setError(jdbcSelectResults.get(jdbcSelectResults.size()-1).getError());
}
else {
result.setError(e.getMessage());
}
result.setSuccess(false);
result.setEndTime(LocalDateTime.now());
result.setResults(jdbcSelectResults);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ public Stream<JdbcSelectResult> StreamExecuteSql(String sql, Integer limit) {
return result;
}
}
result.success();
return result;
});
}
Expand Down

0 comments on commit cba23b4

Please sign in to comment.