From 588715496fbaebaa16dd331cbeb3f9a4170f0203 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 15 Mar 2024 22:23:06 +0800 Subject: [PATCH] [fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) --- be/src/vec/exec/jni_connector.cpp | 17 ++++++++++------- be/src/vec/exec/jni_connector.h | 3 +++ .../org/apache/doris/avro/AvroJNIScanner.java | 3 +-- .../org/apache/doris/hudi/HudiJniScanner.java | 16 +--------------- .../org/apache/doris/common/jni/JniScanner.java | 12 +++++++++--- .../apache/doris/common/jni/MockJniScanner.java | 11 +---------- .../doris/maxcompute/MaxComputeJniScanner.java | 16 +++------------- .../apache/doris/paimon/PaimonJniScanner.java | 3 +-- .../TrinoConnectorJniScanner.java | 2 -- 9 files changed, 29 insertions(+), 54 deletions(-) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 4052af6c37c8a4..0607a216e7153a 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { Status JniConnector::init( std::unordered_map* colname_to_value_range) { - _generate_predicates(colname_to_value_range); - if (_predicates_length != 0 && _predicates != nullptr) { - int64_t predicates_address = (int64_t)_predicates.get(); - // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the - // serialized predicates in java side. - _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address)); - } + // TODO: This logic need to be changed. + // See the comment of "predicates" field in JniScanner.java + + // _generate_predicates(colname_to_value_range); + // if (_predicates_length != 0 && _predicates != nullptr) { + // int64_t predicates_address = (int64_t)_predicates.get(); + // // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the + // // serialized predicates in java side. + // _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address)); + // } return Status::OK(); } diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index ed282fc0c401c0..dc21be85b77bf8 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -163,6 +163,9 @@ class JniConnector { char_ptr += s->size; } } else { + // FIXME: it can not handle decimal type correctly. + // but this logic is deprecated and not used. + // so may be deleted or fixed later. for (const CppType* v : values) { int type_len = sizeof(CppType); *reinterpret_cast(char_ptr) = type_len; diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 17a185d03ae138..dc845f43cb879e 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -19,7 +19,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.thrift.TFileType; @@ -173,7 +172,7 @@ private void initDataReader() { try { initAvroFileContext(); initFieldInspector(); - initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); + initTableInfo(requiredTypes, requiredFields, fetchSize); } catch (Exception e) { LOG.warn("Failed to init avro scanner. ", e); throw new RuntimeException(e); diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 8da006d66bd197..a284c7adcdd105 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -20,7 +20,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopUGI; @@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner { private final int fetchSize; private final String debugString; private final HoodieSplit split; - private final ScanPredicate[] predicates; private final ClassLoader classLoader; private long getRecordReaderTimeNs = 0; @@ -123,20 +121,8 @@ public HudiJniScanner(int fetchSize, Map params) { .collect(Collectors.joining("\n")); try { this.classLoader = this.getClass().getClassLoader(); - String predicatesAddressString = params.remove("push_down_predicates"); this.fetchSize = fetchSize; this.split = new HoodieSplit(params); - if (predicatesAddressString == null) { - predicates = new ScanPredicate[0]; - } else { - long predicatesAddress = Long.parseLong(predicatesAddressString); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes()); - LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } else { - predicates = new ScanPredicate[0]; - } - } } catch (Exception e) { LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e); throw e; @@ -147,7 +133,7 @@ public HudiJniScanner(int fetchSize, Map params) { public void open() throws IOException { Future avroFuture = avroReadPool.submit(() -> { Thread.currentThread().setContextClassLoader(classLoader); - initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize); + initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize); long startTime = System.nanoTime(); // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, // so use another process to kill this stuck process. diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java index 51ed837813c9a8..cb191f4b038f48 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java @@ -33,6 +33,14 @@ public abstract class JniScanner { protected VectorTable vectorTable; protected String[] fields; protected ColumnType[] types; + @Deprecated + // This predicate is from BE, but no used. + // TODO: actually, we can generate the predicate for JNI scanner in FE's planner, + // then serialize it to BE, and BE pass it to JNI scanner directly. + // NO need to use this intermediate expression, because each JNI scanner has its + // own predicate expression format. + // For example, Paimon use "PaimonScannerUtils.decodeStringToObject(paimonPredicate)" + // to deserialize the predicate string to PaimonPredicate object. protected ScanPredicate[] predicates; protected int batchSize; @@ -50,11 +58,9 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException { throw new UnsupportedOperationException(); } - protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, - int batchSize) { + protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) { this.types = requiredTypes; this.fields = requiredFields; - this.predicates = predicates; this.batchSize = batchSize; } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java index bc7561e2a23fc7..000f536e915f86 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java @@ -20,7 +20,6 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValue; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.log4j.Logger; @@ -187,15 +186,7 @@ public MockJniScanner(int batchSize, Map params) { for (int i = 0; i < types.length; i++) { columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); } - ScanPredicate[] predicates = new ScanPredicate[0]; - if (params.containsKey("push_down_predicates")) { - long predicatesAddress = Long.parseLong(params.get("push_down_predicates")); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); - LOG.info("MockJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } - } - initTableInfo(columnTypes, requiredFields, predicates, batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); } @Override diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index a87446b14b7b2c..6a441a69293c9a 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -19,7 +19,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import com.aliyun.odps.Column; import com.aliyun.odps.OdpsType; @@ -99,15 +98,7 @@ public MaxComputeJniScanner(int batchSize, Map params) { for (int i = 0; i < types.length; i++) { columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); } - ScanPredicate[] predicates = new ScanPredicate[0]; - if (params.containsKey("push_down_predicates")) { - long predicatesAddress = Long.parseLong(params.get("push_down_predicates")); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); - LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } - } - initTableInfo(columnTypes, requiredFields, predicates, batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); } public void refreshTableScan() { @@ -133,9 +124,8 @@ public String tableUniqKey() { } @Override - protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, - int batchSize) { - super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize); + protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) { + super.initTableInfo(requiredTypes, requiredFields, batchSize); readColumns = new ArrayList<>(); readColumnsToId = new HashMap<>(); for (int i = 0; i < fields.length; i++) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 69ec49e3364d49..ad45c72919074d 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -19,7 +19,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; import org.apache.doris.paimon.PaimonTableCache.TableExt; @@ -82,7 +81,7 @@ public PaimonJniScanner(int batchSize, Map params) { dbId = Long.parseLong(params.get("db_id")); tblId = Long.parseLong(params.get("tbl_id")); lastUpdateTime = Long.parseLong(params.get("last_update_time")); - initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); paimonOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) .collect(Collectors diff --git a/fe/be-java-extensions/trino-connector-scanner/src/main/java/org/apache/doris/trinoconnector/TrinoConnectorJniScanner.java b/fe/be-java-extensions/trino-connector-scanner/src/main/java/org/apache/doris/trinoconnector/TrinoConnectorJniScanner.java index 4a8afaf7b898cd..c3347775a16879 100644 --- a/fe/be-java-extensions/trino-connector-scanner/src/main/java/org/apache/doris/trinoconnector/TrinoConnectorJniScanner.java +++ b/fe/be-java-extensions/trino-connector-scanner/src/main/java/org/apache/doris/trinoconnector/TrinoConnectorJniScanner.java @@ -19,7 +19,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.trinoconnector.TrinoConnectorCache.TrinoConnectorCacheKey; import org.apache.doris.trinoconnector.TrinoConnectorCache.TrinoConnectorCacheValue; @@ -128,7 +127,6 @@ public TrinoConnectorJniScanner(int batchSize, Map params) { catalogNameString = params.get("catalog_name"); super.batchSize = batchSize; super.fields = params.get("required_fields").split(","); - super.predicates = new ScanPredicate[0]; connectorSplitString = params.get("trino_connector_split"); connectorTableHandleString = params.get("trino_connector_table_handle");