Skip to content

Commit

Permalink
[fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (apache#32253)
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman authored Mar 15, 2024
1 parent 1db78fe commit 5887154
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 54 deletions.
17 changes: 10 additions & 7 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {

Status JniConnector::init(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
// TODO: This logic needs to be changed.
// See the comment on the "predicates" field in JniScanner.java.
//
// The predicate push-down below is intentionally disabled: serializing the
// predicates (notably decimal values) for the Java side was broken and led
// to a BE crash (apache#32253), so the "push_down_predicates" scanner param
// is no longer emitted and `colname_to_value_range` is currently unused here.
//
// _generate_predicates(colname_to_value_range);
// if (_predicates_length != 0 && _predicates != nullptr) {
// int64_t predicates_address = (int64_t)_predicates.get();
// // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
// // serialized predicates in java side.
// _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
// }
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class JniConnector {
char_ptr += s->size;
}
} else {
// FIXME: this cannot handle the decimal type correctly,
// but this logic is deprecated and not used,
// so it may be deleted or fixed later.
for (const CppType* v : values) {
int type_len = sizeof(CppType);
*reinterpret_cast<int*>(char_ptr) = type_len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.thrift.TFileType;

Expand Down Expand Up @@ -173,7 +172,7 @@ private void initDataReader() {
try {
initAvroFileContext();
initFieldInspector();
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
initTableInfo(requiredTypes, requiredFields, fetchSize);
} catch (Exception e) {
LOG.warn("Failed to init avro scanner. ", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;

Expand Down Expand Up @@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner {
private final int fetchSize;
private final String debugString;
private final HoodieSplit split;
private final ScanPredicate[] predicates;
private final ClassLoader classLoader;

private long getRecordReaderTimeNs = 0;
Expand Down Expand Up @@ -123,20 +121,8 @@ public HudiJniScanner(int fetchSize, Map<String, String> params) {
.collect(Collectors.joining("\n"));
try {
this.classLoader = this.getClass().getClassLoader();
String predicatesAddressString = params.remove("push_down_predicates");
this.fetchSize = fetchSize;
this.split = new HoodieSplit(params);
if (predicatesAddressString == null) {
predicates = new ScanPredicate[0];
} else {
long predicatesAddress = Long.parseLong(predicatesAddressString);
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
} else {
predicates = new ScanPredicate[0];
}
}
} catch (Exception e) {
LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e);
throw e;
Expand All @@ -147,7 +133,7 @@ public HudiJniScanner(int fetchSize, Map<String, String> params) {
public void open() throws IOException {
Future<?> avroFuture = avroReadPool.submit(() -> {
Thread.currentThread().setContextClassLoader(classLoader);
initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize);
long startTime = System.nanoTime();
// RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
// so use another process to kill this stuck process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public abstract class JniScanner {
protected VectorTable vectorTable;
protected String[] fields;
protected ColumnType[] types;
@Deprecated
// This predicate is from BE, but not used.
// TODO: actually, we can generate the predicate for JNI scanner in FE's planner,
// then serialize it to BE, and BE pass it to JNI scanner directly.
// NO need to use this intermediate expression, because each JNI scanner has its
// own predicate expression format.
// For example, Paimon use "PaimonScannerUtils.decodeStringToObject(paimonPredicate)"
// to deserialize the predicate string to PaimonPredicate object.
protected ScanPredicate[] predicates;
protected int batchSize;

Expand All @@ -50,11 +58,9 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}

protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
int batchSize) {
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
this.types = requiredTypes;
this.fields = requiredFields;
this.predicates = predicates;
this.batchSize = batchSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.doris.common.jni.vec.ScanPredicate;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -187,15 +186,7 @@ public MockJniScanner(int batchSize, Map<String, String> params) {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
ScanPredicate[] predicates = new ScanPredicate[0];
if (params.containsKey("push_down_predicates")) {
long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
LOG.info("MockJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
}
}
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
Expand Down Expand Up @@ -99,15 +98,7 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
ScanPredicate[] predicates = new ScanPredicate[0];
if (params.containsKey("push_down_predicates")) {
long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
}
}
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
}

public void refreshTableScan() {
Expand All @@ -133,9 +124,8 @@ public String tableUniqKey() {
}

@Override
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
int batchSize) {
super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
super.initTableInfo(requiredTypes, requiredFields, batchSize);
readColumns = new ArrayList<>();
readColumnsToId = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;
Expand Down Expand Up @@ -82,7 +81,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
dbId = Long.parseLong(params.get("db_id"));
tblId = Long.parseLong(params.get("tbl_id"));
lastUpdateTime = Long.parseLong(params.get("last_update_time"));
initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
paimonOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
.collect(Collectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.trinoconnector.TrinoConnectorCache.TrinoConnectorCacheKey;
import org.apache.doris.trinoconnector.TrinoConnectorCache.TrinoConnectorCacheValue;
Expand Down Expand Up @@ -128,7 +127,6 @@ public TrinoConnectorJniScanner(int batchSize, Map<String, String> params) {
catalogNameString = params.get("catalog_name");
super.batchSize = batchSize;
super.fields = params.get("required_fields").split(",");
super.predicates = new ScanPredicate[0];

connectorSplitString = params.get("trino_connector_split");
connectorTableHandleString = params.get("trino_connector_table_handle");
Expand Down

0 comments on commit 5887154

Please sign in to comment.