From 0ec35de23a7ae2a0e12fb6ec5be3f5d5196ee8dd Mon Sep 17 00:00:00 2001 From: Xujian Duan <50550370+DarvenDuan@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:08:31 +0800 Subject: [PATCH 1/4] [bug](s3) fix S3 file system gets absolute path (#44965) ### What problem does this PR solve? Issue Number: close https://github.com/apache/doris/issues/44902 Related PR: #xxx ### Problem Summary If we create an S3 resource for cooldown data storage and its s3.root.path has a leading slash (/), such as /root, the remote S3 data files will not be deleted when we drop the cooldown tablet from Doris. ### Reason Doris gets the paths of all files of a tablet and then deletes those files. The AWS `S3Client:ListObjectsV2` uses an object prefix to list the object files, but if the prefix has a leading slash (/), `S3Client:ListObjectsV2` returns an empty result. ### Solution Use _prefix instead of _root_path in S3FileSystem when building the absolute path. _prefix is normalized in the constructor: its leading and trailing '/' are removed. ### test case 1. Create an S3 resource whose s3.root.path has a leading '/' and a storage policy based on it: ``` CREATE RESOURCE "test_resource" PROPERTIES ( "type" = "s3", "s3.endpoint" = "xxx", "s3.region" = "xxx", "s3.bucket" = "xxx", "s3.root.path" = "/tmp", "s3.access_key" = "xx", "s3.secret_key" = "xx" ); CREATE STORAGE POLICY test_policy PROPERTIES ( "storage_resource" = "test_resource", "cooldown_ttl" = "1" ) ``` 2. Create a table, set its storage_policy to `test_policy`, and insert test data: ``` CREATE TABLE `test_table` ( `k` bigint, `v` bigint ) ENGINE=OLAP DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "storage_policy" = "test_policy" ); insert into test_table values (1,2),(2,3); ``` 3. Wait for the rowset to cool down to S3. 4. Truncate the test table: ``` truncate table test_table force; ``` 5. If we get a log like ` delete remote rowsets of tablet. root_path="/tmp", tablet_id=xxx` in be.INFO, the remote tablet files should have been deleted. 
### Release note None ### Check List (For Author) - Test - [x] Regression test - [x] Unit Test - [x] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [x] No. - [ ] Yes. - Does this need documentation? - [x] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/io/fs/s3_file_system.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 61967a63e44379..f6efa5053324ff 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -121,7 +121,7 @@ class S3FileSystem final : public RemoteFileSystem { abs_path = path; } else { // path with no schema - abs_path = _root_path / path; + abs_path = _prefix / path; } return Status::OK(); } From 8c5307e40d340446c765836bf4635fa5c65b477a Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Mon, 9 Dec 2024 17:51:16 +0800 Subject: [PATCH 2/4] [fix](meta-service) Avoid rowset meta exceeds 2G result in protobuf fatal (#44780) Rowset meta may exceed the 2GB limit in the case of a table with an extremely wide variant type. Check the size before serializing. 
--- cloud/src/meta-service/meta_service.cpp | 18 ++++- cloud/test/txn_lazy_commit_test.cpp | 90 ++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 9da5750d8d83f9..33c109d19bcc31 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1328,15 +1328,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, while (it->has_next()) { auto [k, v] = it->next(); - auto rs = response->add_rowset_meta(); + auto* rs = response->add_rowset_meta(); + auto byte_size = rs->ByteSizeLong(); + TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size); + if (byte_size + v.size() > std::numeric_limits::max()) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format( + "rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}", + hex(k), byte_size); + LOG(WARNING) << msg; + return; + } if (!rs->ParseFromArray(v.data(), v.size())) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "malformed rowset meta, unable to deserialize"; + msg = "malformed rowset meta, unable to serialize"; LOG(WARNING) << msg << " key=" << hex(k); return; } ++num_rowsets; - if (!it->has_next()) key0 = k; + if (!it->has_next()) { + key0 = k; + } } key0.push_back('\x00'); // Update to next smallest key for iteration } while (it->more()); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 9a7679f3dd9e23..0f284508a3f34e 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -25,7 +25,9 @@ #include #include +#include #include +#include #include #include #include @@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { ASSERT_TRUE(abort_timeout_txn_hit); ASSERT_EQ(txn_id, txn_info_pb.txn_id()); } -} // namespace doris::cloud \ No newline at end of file + +TEST(TxnLazyCommitTest, 
RowsetMetaSizeExceedTest) { + auto txn_kv = get_mem_txn_kv(); + + int64_t db_id = 5252025; + int64_t table_id = 35201043384; + int64_t index_id = 256439; + int64_t partition_id = 732536259; + + auto meta_service = get_meta_service(txn_kv, true); + int64_t tablet_id = 25910248; + + { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id); + } + { + int tmp_txn_id = 0; + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label_32ae213dasg3"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + tmp_txn_id = res.txn_id(); + ASSERT_GT(res.txn_id(), 0); + } + { + auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(tmp_txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + } + + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) { + auto* byte_size = try_any_cast(args[0]); + *byte_size = std::numeric_limits::max(); + ++(*byte_size); + }); + + sp->enable_processing(); + { + brpc::Controller cntl; + GetRowsetRequest req; + 
req.set_cloud_unique_id("test_cloud_unique_id"); + auto* tablet_idx = req.mutable_idx(); + tablet_idx->set_table_id(table_id); + tablet_idx->set_index_id(index_id); + tablet_idx->set_partition_id(partition_id); + tablet_idx->set_tablet_id(tablet_id); + req.set_start_version(0); + req.set_end_version(-1); + req.set_cumulative_compaction_cnt(0); + req.set_base_compaction_cnt(0); + req.set_cumulative_point(2); + + GetRowsetResponse res; + meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR); + } +} + +} // namespace doris::cloud From 0a0c50253649f8a5dc1f377162df7c9549347b36 Mon Sep 17 00:00:00 2001 From: Socrates Date: Mon, 9 Dec 2024 18:44:24 +0800 Subject: [PATCH 3/4] [fix](be-ut) fix compile warning: declaration shadows a local variable (#45204) ### What problem does this PR solve? Problem Summary: ```text FAILED: src/vec/CMakeFiles/Vec.dir/sink/writer/vtablet_writer.cpp.o /root/doris/be/src/vec/sink/writer/vtablet_writer.cpp:1402:23: error: declaration shadows a local variable [-Werror,-Wshadow] } else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) { /root/doris/be/src/vec/sink/writer/vtablet_writer.cpp:1400:23: note: previous declaration is here } else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) { ``` --- be/src/vec/sink/writer/vtablet_writer.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 55b6845b6bc871..0de868c00b995e 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1394,13 +1394,12 @@ static Status cancel_channel_and_check_intolerable_failure(Status status, nch.cancel(err_msg); // check if index has intolerable failure - Status index_st = ich.check_intolerable_failure(); - if (!index_st.ok()) { + if 
(Status index_st = ich.check_intolerable_failure(); !index_st.ok()) { status = std::move(index_st); - } else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) { - status = std::move(st); - } else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) { - status = std::move(st); + } else if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) { + status = std::move(receive_st); + } else if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) { + status = std::move(filter_st); } return status; } From 0a73618313c3fd3dfcb377a95c97049673277ecd Mon Sep 17 00:00:00 2001 From: Mryange Date: Mon, 9 Dec 2024 19:14:53 +0800 Subject: [PATCH 4/4] [feature](udf) Support for IP types in Java UDF. (#44871) ### What problem does this PR solve? Add support for IP types (IPv4 and IPv6) in Java UDFs. Doc: https://github.com/apache/doris-website/pull/1444 --- be/src/vec/exec/jni_connector.cpp | 12 +- .../common/jni/utils/JavaUdfDataType.java | 8 + .../common/jni/utils/TypeNativeBytes.java | 16 ++ .../doris/common/jni/utils/UdfUtils.java | 6 + .../doris/common/jni/vec/ColumnType.java | 20 +++ .../doris/common/jni/vec/VectorColumn.java | 79 ++++++++++ .../java/org/apache/doris/catalog/Type.java | 4 +- .../nereids_p0/javaudf/test_javaudf_ip.out | 42 +++++ .../org/apache/doris/udf/IPV4TypeTest.java | 74 +++++++++ .../org/apache/doris/udf/IPV6TypeTest.java | 73 +++++++++ .../java/org/apache/doris/udf/MySumIP.java | 76 +++++++++ .../nereids_p0/javaudf/test_javaudf_ip.groovy | 146 ++++++++++++++++++ 12 files changed, 554 insertions(+), 2 deletions(-) create mode 100644 regression-test/data/nereids_p0/javaudf/test_javaudf_ip.out create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV4TypeTest.java create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV6TypeTest.java create mode 100644 
regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumIP.java create mode 100644 regression-test/suites/nereids_p0/javaudf/test_javaudf_ip.groovy diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 4c977b69ad6a42..a87ccf987ac7af 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -63,7 +63,9 @@ namespace doris::vectorized { M(TypeIndex::Date, ColumnVector, Int64) \ M(TypeIndex::DateV2, ColumnVector, UInt32) \ M(TypeIndex::DateTime, ColumnVector, Int64) \ - M(TypeIndex::DateTimeV2, ColumnVector, UInt64) + M(TypeIndex::DateTimeV2, ColumnVector, UInt64) \ + M(TypeIndex::IPv4, ColumnVector, IPv4) \ + M(TypeIndex::IPv6, ColumnVector, IPv6) Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; @@ -450,6 +452,10 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) { return "float"; case TYPE_DOUBLE: return "double"; + case TYPE_IPV4: + return "ipv4"; + case TYPE_IPV6: + return "ipv6"; case TYPE_VARCHAR: [[fallthrough]]; case TYPE_CHAR: @@ -534,6 +540,10 @@ std::string JniConnector::get_jni_type(const TypeDescriptor& desc) { return "float"; case TYPE_DOUBLE: return "double"; + case TYPE_IPV4: + return "ipv4"; + case TYPE_IPV6: + return "ipv6"; case TYPE_VARCHAR: { buffer << "varchar(" << desc.len << ")"; return buffer.str(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JavaUdfDataType.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JavaUdfDataType.java index 9f973543b29413..6077f713e8319d 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JavaUdfDataType.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JavaUdfDataType.java @@ -27,6 +27,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; import java.util.ArrayList; 
import java.util.HashSet; import java.util.Set; @@ -55,6 +56,9 @@ public class JavaUdfDataType { public static final JavaUdfDataType DECIMAL64 = new JavaUdfDataType("DECIMAL64", TPrimitiveType.DECIMAL64, 8); public static final JavaUdfDataType DECIMAL128 = new JavaUdfDataType("DECIMAL128", TPrimitiveType.DECIMAL128I, 16); + + public static final JavaUdfDataType IPV4 = new JavaUdfDataType("IPV4", TPrimitiveType.IPV4, 4); + public static final JavaUdfDataType IPV6 = new JavaUdfDataType("IPV6", TPrimitiveType.IPV6, 16); public static final JavaUdfDataType ARRAY_TYPE = new JavaUdfDataType("ARRAY_TYPE", TPrimitiveType.ARRAY, 0); public static final JavaUdfDataType MAP_TYPE = new JavaUdfDataType("MAP_TYPE", TPrimitiveType.MAP, 0); public static final JavaUdfDataType STRUCT_TYPE = new JavaUdfDataType("STRUCT_TYPE", TPrimitiveType.STRUCT, 0); @@ -83,6 +87,8 @@ public class JavaUdfDataType { JavaUdfDataTypeSet.add(ARRAY_TYPE); JavaUdfDataTypeSet.add(MAP_TYPE); JavaUdfDataTypeSet.add(STRUCT_TYPE); + JavaUdfDataTypeSet.add(IPV4); + JavaUdfDataTypeSet.add(IPV6); } private final String description; @@ -156,6 +162,8 @@ public static Set getCandidateTypes(Class c) { return Sets.newHashSet(JavaUdfDataType.ARRAY_TYPE, JavaUdfDataType.STRUCT_TYPE); } else if (c == java.util.HashMap.class) { return Sets.newHashSet(JavaUdfDataType.MAP_TYPE); + } else if (c == InetAddress.class) { + return Sets.newHashSet(JavaUdfDataType.IPV4, JavaUdfDataType.IPV6); } return Sets.newHashSet(JavaUdfDataType.INVALID_TYPE); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/TypeNativeBytes.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/TypeNativeBytes.java index c6c3c28d228bd7..7474ce4467d31f 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/TypeNativeBytes.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/TypeNativeBytes.java @@ 
-20,6 +20,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.math.RoundingMode; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.time.DateTimeException; import java.time.LocalDate; import java.time.LocalDateTime; @@ -58,6 +60,20 @@ public static BigInteger getBigInteger(byte[] bytes) { return new BigInteger(originalBytes); } + public static InetAddress getInetAddress(byte[] bytes) { + // Convert the byte order back if necessary + byte[] originalBytes = convertByteOrder(bytes); + try { + return InetAddress.getByAddress(originalBytes); + } catch (UnknownHostException e) { + return null; + } + } + + public static byte[] getInetAddressBytes(InetAddress v) { + return convertByteOrder(v.getAddress()); + } + public static byte[] getDecimalBytes(BigDecimal v, int scale, int size) { BigDecimal retValue = v.setScale(scale, RoundingMode.HALF_EVEN); BigInteger data = retValue.unscaledValue(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java index 2ef1956118b26a..d5f6e746132443 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java @@ -197,6 +197,12 @@ public static Pair setArgTypes(Type[] parameterTypes StructType structType = (StructType) parameterTypes[finalI]; ArrayList fields = structType.getFields(); inputArgTypes[i].setFields(fields); + } else if (parameterTypes[finalI].isIP()) { + if (parameterTypes[finalI].isIPv4()) { + inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV4); + } else { + inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV6); + } } if (res.length == 0) { return Pair.of(false, inputArgTypes); diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnType.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnType.java index 37a58075978c18..f234015c9bfc5e 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnType.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnType.java @@ -56,6 +56,8 @@ public enum Type { DECIMAL32(4), DECIMAL64(8), DECIMAL128(16), + IPV4(4), + IPV6(16), STRING(-1), ARRAY(-1), MAP(-1), @@ -155,6 +157,18 @@ public boolean isArray() { return type == Type.ARRAY; } + public boolean isIpv4() { + return type == Type.IPV4; + } + + public boolean isIpv6() { + return type == Type.IPV6; + } + + public boolean isIp() { + return isIpv4() || isIpv6(); + } + public boolean isMap() { return type == Type.MAP; } @@ -287,6 +301,12 @@ public static ColumnType parseType(String columnName, String hiveType) { case "double": type = Type.DOUBLE; break; + case "ipv4": + type = Type.IPV4; + break; + case "ipv6": + type = Type.IPV6; + break; case "datev1": type = Type.DATE; break; diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index 839027b03b61c5..940a6ce4f16481 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -28,6 +28,8 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; @@ -78,6 +80,19 @@ public class VectorColumn { // todo: support pruned struct fields private List structFieldIndex; + + public 
static final InetAddress DEFAULT_IPV4; + public static final InetAddress DEFAULT_IPV6; + + static { + try { + DEFAULT_IPV4 = InetAddress.getByName("127.0.0.1"); + DEFAULT_IPV6 = InetAddress.getByName("::1"); + } catch (UnknownHostException e) { + throw new RuntimeException("Failed to initialize default InetAddress values", e); + } + } + // Create writable column private VectorColumn(ColumnType columnType, int capacity) { this.isConst = false; @@ -376,6 +391,10 @@ public int appendNull(ColumnType.Type typeValue) { return appendLong(0); case LARGEINT: return appendBigInteger(BigInteger.ZERO); + case IPV4: + return appendInetAddress(DEFAULT_IPV4); + case IPV6: + return appendInetAddress(DEFAULT_IPV6); case FLOAT: return appendFloat(0); case DOUBLE: @@ -857,6 +876,56 @@ public BigInteger[] getBigIntegerColumn(int start, int end) { return result; } + public byte[] getInetAddressBytes(int rowId) { + int typeSize = columnType.getTypeSize(); + byte[] bytes = new byte[typeSize]; + OffHeap.copyMemory(null, data + (long) rowId * typeSize, bytes, OffHeap.BYTE_ARRAY_OFFSET, typeSize); + return bytes; + } + + public InetAddress getInetAddress(int rowId) { + return TypeNativeBytes.getInetAddress(getInetAddressBytes(rowId)); + } + + public InetAddress[] getInetAddressColumn(int start, int end) { + InetAddress[] result = new InetAddress[end - start]; + for (int i = start; i < end; ++i) { + if (!isNullAt(i)) { + result[i - start] = getInetAddress(i); + } + } + return result; + } + + public int appendInetAddress(InetAddress v) { + reserve(appendIndex + 1); + putInetAddress(appendIndex, v); + return appendIndex++; + } + + public void appendInetAddress(InetAddress[] batch, boolean isNullable) { + reserve(appendIndex + batch.length); + for (InetAddress v : batch) { + if (v == null) { + putNull(appendIndex); + if (columnType.isIpv4()) { + putInetAddress(appendIndex, DEFAULT_IPV4); + } else { + putInetAddress(appendIndex, DEFAULT_IPV6); + } + } else { + putInetAddress(appendIndex, v); + } 
+ appendIndex++; + } + } + + private void putInetAddress(int rowId, InetAddress v) { + int typeSize = columnType.getTypeSize(); + byte[] bytes = TypeNativeBytes.getInetAddressBytes(v); + OffHeap.copyMemory(bytes, OffHeap.BYTE_ARRAY_OFFSET, null, data + (long) rowId * typeSize, typeSize); + } + public int appendDecimal(BigDecimal v) { reserve(appendIndex + 1); putDecimal(appendIndex, v); @@ -1382,6 +1451,9 @@ public Object[] newObjectContainerArray(ColumnType.Type type, int size) { return new Long[size]; case LARGEINT: return new BigInteger[size]; + case IPV4: + case IPV6: + return new InetAddress[size]; case FLOAT: return new Float[size]; case DOUBLE: @@ -1431,6 +1503,10 @@ public void appendObjectColumn(Object[] batch, boolean isNullable) { case LARGEINT: appendBigInteger((BigInteger[]) batch, isNullable); break; + case IPV4: + case IPV6: + appendInetAddress((InetAddress[]) batch, isNullable); + break; case FLOAT: appendFloat((Float[]) batch, isNullable); break; @@ -1493,6 +1569,9 @@ public Object[] getObjectColumn(int start, int end) { return getLongColumn(start, end); case LARGEINT: return getBigIntegerColumn(start, end); + case IPV4: + case IPV6: + return getInetAddressColumn(start, end); case FLOAT: return getFloatColumn(start, end); case DOUBLE: diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java index 7a8dda5aabedef..7dfcfd15ebec84 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java @@ -36,6 +36,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -321,7 +322,8 @@ public abstract class Type { .put(PrimitiveType.FLOAT, Sets.newHashSet(Float.class, float.class)) .put(PrimitiveType.DOUBLE, Sets.newHashSet(Double.class, double.class)) 
.put(PrimitiveType.BIGINT, Sets.newHashSet(Long.class, long.class)) - .put(PrimitiveType.IPV4, Sets.newHashSet(Integer.class, int.class)) + .put(PrimitiveType.IPV4, Sets.newHashSet(InetAddress.class)) + .put(PrimitiveType.IPV6, Sets.newHashSet(InetAddress.class)) .put(PrimitiveType.STRING, Sets.newHashSet(String.class)) .put(PrimitiveType.DATE, DATE_SUPPORTED_JAVA_TYPE) .put(PrimitiveType.DATEV2, DATE_SUPPORTED_JAVA_TYPE) diff --git a/regression-test/data/nereids_p0/javaudf/test_javaudf_ip.out b/regression-test/data/nereids_p0/javaudf/test_javaudf_ip.out new file mode 100644 index 00000000000000..b7eb66cab9cc01 --- /dev/null +++ b/regression-test/data/nereids_p0/javaudf/test_javaudf_ip.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_ipv4_1 -- +1 0.0.0.123 /0.0.0.123 +2 0.0.12.42 /0.0.12.42 +3 0.119.130.67 /0.119.130.67 +4 \N null + +-- !select_ipv4_2 -- +1 0.0.0.123 0.0.0.123 +2 0.0.0.123 0.0.0.123 + +-- !select_ipv4_3 -- +nulludf/0.0.0.123udf/0.0.12.42udf/0.119.130.67udf + +-- !select_ipv4_4 -- +1 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"] +2 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"] +3 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"] +4 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"] + +-- !select_ipv6_1 -- +1 ::855d /0:0:0:0:0:0:0:855d +2 ::0.4.221.183 /0:0:0:0:0:0:4:ddb7 +3 ::a:7429:d0d6:6e08:9f5f /0:0:0:a:7429:d0d6:6e08:9f5f +4 \N null + +-- !select_ipv6_2 -- +3 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 2001:db8:ac10:fe01:feed:babe:cafe:f00d +4 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 2001:db8:ac10:fe01:feed:babe:cafe:f00d + +-- !select_ipv6_3 -- +nulludf/0:0:0:0:0:0:0:855dudf/0:0:0:0:0:0:4:ddb7udf/0:0:0:a:7429:d0d6:6e08:9f5fudf + +-- !select_ipv6_4 -- +1 ["::1", "::1", "::1", null, null, "::1"] +2 ["::1", "::1", "::1", null, null, "::1"] +3 ["::1", "::1", "::1", null, null, "::1"] +4 ["::1", "::1", "::1", 
null, null, "::1"] + +-- !select_ipv6_5 -- +/0:0:0:0:0:0:0:855d, /0:0:0:0:0:0:4:ddb7, /0:0:0:a:7429:d0d6:6e08:9f5f + diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV4TypeTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV4TypeTest.java new file mode 100644 index 00000000000000..65d6ea2ab1a31a --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV4TypeTest.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.udf; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; + +public class IPV4TypeTest { + // input ipv4 + public String evaluate(InetAddress x) { + if (x == null) { + return "null"; + } + return x.toString(); + } + + // output ipv4 + public InetAddress evaluate(String s) { + try { + InetAddress ipv4Address = InetAddress.getByName(s); + if (ipv4Address.getAddress().length == 4) { + return ipv4Address; + } else { + return null; + } + } catch (UnknownHostException e) { + return null; + } + } + + // input array + public String evaluate(ArrayList s) { + String ret = ""; + for (InetAddress ip : s) { + ret += evaluate(ip) + "udf"; + } + return ret; + } + + // output array + public ArrayList evaluate() { + ArrayList ret = new ArrayList(); + InetAddress DEFAULT_IPV = null; + try { + DEFAULT_IPV = InetAddress.getByName("127.0.0.1"); + } catch (UnknownHostException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + ret.add(DEFAULT_IPV); + ret.add(DEFAULT_IPV); + ret.add(DEFAULT_IPV); + ret.add(null); + ret.add(null); + ret.add(DEFAULT_IPV); + return ret; + } +} diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV6TypeTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV6TypeTest.java new file mode 100644 index 00000000000000..f4345cbab5de6e --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IPV6TypeTest.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.udf; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; + +public class IPV6TypeTest { + // input ipv6 + public String evaluate(InetAddress x) { + if (x == null) { + return "null"; + } + return x.toString(); + } + + // output ipv6 + public InetAddress evaluate(String s) { + try { + InetAddress ipv6Address = InetAddress.getByName(s); + if (ipv6Address.getAddress().length == 16) { + return ipv6Address; + } else { + return null; + } + } catch (UnknownHostException e) { + return null; + } + } + + // input array + public String evaluate(ArrayList s) { + String ret = ""; + for (InetAddress ip : s) { + ret += evaluate(ip) + "udf"; + } + return ret; + } + + // output array + public ArrayList evaluate() { + ArrayList ret = new ArrayList(); + InetAddress DEFAULT_IPV = null; + try { + DEFAULT_IPV = InetAddress.getByName("::1"); + } catch (UnknownHostException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + ret.add(DEFAULT_IPV); + ret.add(DEFAULT_IPV); + ret.add(DEFAULT_IPV); + ret.add(null); + ret.add(null); + ret.add(DEFAULT_IPV); + return ret; + } +} diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumIP.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumIP.java new file mode 100644 index 00000000000000..b57418ec04c37b --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/MySumIP.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.udf;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
+// Test UDAF: keeps InetAddress.toString() of every non-null input; the result is the sorted list joined by ", ".
+public class MySumIP {
+    public static class State {
+        public ArrayList<String> list = new ArrayList<>();
+    }
+
+    public State create() {
+        return new State();
+    }
+
+    public void destroy(State state) {
+    }
+
+    // Null inputs are skipped, so this method never stores a null entry.
+    public void add(State state, InetAddress val1) {
+        if (val1 == null)
+            return;
+        state.list.add(val1.toString());
+    }
+
+    // Wire format: the entry count as an int, then every entry through writeUTF.
+    public void serialize(State state, DataOutputStream out) throws IOException {
+        out.writeInt(state.list.size());
+        for (String str : state.list) {
+            out.writeUTF(str);
+        }
+    }
+
+    // Mirror of serialize; whatever list the state held before is replaced, not appended to.
+    public void deserialize(State state, DataInputStream in) throws IOException {
+        int size = in.readInt();
+        state.list = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            state.list.add(in.readUTF());
+        }
+    }
+
+    // Appends the entries of rhs to state; rhs itself is not modified.
+    public void merge(State state, State rhs) {
+        state.list.addAll(rhs.list);
+    }
+
+    // Sorts state.list in place (nulls last), then joins its non-null entries with ", ".
+    public String getValue(State state) {
+        state.list.sort(Comparator.nullsLast(String::compareTo));
+        return state.list.stream()
+                .filter(s -> s != null)
+                .collect(Collectors.joining(", "));
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/nereids_p0/javaudf/test_javaudf_ip.groovy b/regression-test/suites/nereids_p0/javaudf/test_javaudf_ip.groovy
new file mode 100644
index 00000000000000..aa079af16d049d
--- /dev/null
+++ b/regression-test/suites/nereids_p0/javaudf/test_javaudf_ip.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+// Java UDF/UDAF regression for the ipv4 and ipv6 column types, run with the Nereids planner forced on.
+suite("nereids_test_javaudf_ip") {
+    sql 'set enable_nereids_planner=true'
+    sql 'set enable_fallback_to_original_planner=false'
+
+    def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+    scp_udf_file_to_all_be(jarPath)
+    // ipv4: four scalar overloads bound to IPV4TypeTest (ipv4 in, ipv4 out, array in, array out).
+    try {
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv4_test1(ipv4);"""
+        sql """ CREATE FUNCTION java_udf_ipv4_test1(ipv4) RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV4TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv4_test2(string);"""
+        sql """ CREATE FUNCTION java_udf_ipv4_test2(string) RETURNS ipv4 PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV4TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv4_test3(array<ipv4>);"""
+        sql """ CREATE FUNCTION java_udf_ipv4_test3(array<ipv4>) RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV4TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv4_test4();"""
+        sql """ CREATE FUNCTION java_udf_ipv4_test4() RETURNS array<ipv4> PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV4TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+
+        sql """DROP TABLE IF EXISTS test_udf_ip;"""
+        sql """
+        CREATE TABLE test_udf_ip
+        (
+            k1 BIGINT ,
+            k4 ipv4 ,
+            k6 ipv6 ,
+            s string
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+        """
+        sql """ insert into test_udf_ip values(1,123,34141,"0.0.0.123") , (2,3114,318903,"0.0.0.123") , (3,7832131,192837891738927931231,"2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D"),(4,null,null,"2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D"); """
+        qt_select_ipv4_1 """ select k1,k4,java_udf_ipv4_test1(k4) from test_udf_ip order by k1 """
+        qt_select_ipv4_2 """ select k1,s,java_udf_ipv4_test2(s) from test_udf_ip where IS_IPV4_STRING(s) order by k1 """
+        qt_select_ipv4_3 """ select java_udf_ipv4_test3(array_sort(array_agg(k4))) from test_udf_ip """
+        qt_select_ipv4_4 """ select k1, java_udf_ipv4_test4() from test_udf_ip order by k1 """
+
+    } finally {
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv4_test1(ipv4);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv4_test2(string);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv4_test3(array<ipv4>);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv4_test4();")
+    }
+    // ipv6: the same four overloads bound to IPV6TypeTest, plus the MySumIP aggregate.
+    try {
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv6_test1(ipv6);"""
+        sql """ CREATE FUNCTION java_udf_ipv6_test1(ipv6) RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV6TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv6_test2(string);"""
+        sql """ CREATE FUNCTION java_udf_ipv6_test2(string) RETURNS ipv6 PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV6TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv6_test3(array<ipv6>);"""
+        sql """ CREATE FUNCTION java_udf_ipv6_test3(array<ipv6>) RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV6TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+        sql """ DROP FUNCTION IF EXISTS java_udf_ipv6_test4();"""
+        sql """ CREATE FUNCTION java_udf_ipv6_test4() RETURNS array<ipv6> PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IPV6TypeTest",
+            "type"="JAVA_UDF"
+        ); """
+
+        sql """DROP TABLE IF EXISTS test_udf_ip;"""
+        sql """
+        CREATE TABLE test_udf_ip
+        (
+            k1 BIGINT ,
+            k4 ipv4 ,
+            k6 ipv6 ,
+            s string
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+        """
+        sql """ insert into test_udf_ip values(1,123,34141,"0.0.0.123") , (2,3114,318903,"0.0.0.123") , (3,7832131,192837891738927931231,"2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D"),(4,null,null,"2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D"); """
+        qt_select_ipv6_1 """ select k1,k6,java_udf_ipv6_test1(k6) from test_udf_ip order by k1 """
+        qt_select_ipv6_2 """ select k1,s,java_udf_ipv6_test2(s) from test_udf_ip where IS_IPV6_STRING(s) order by k1 """
+        qt_select_ipv6_3 """ select java_udf_ipv6_test3(array_sort(array_agg(k6))) from test_udf_ip """
+        qt_select_ipv6_4 """ select k1, java_udf_ipv6_test4() from test_udf_ip order by k1 """
+
+        // Aggregate UDF over the ipv6 column. Like the scalar functions it is dropped first,
+        // so a leftover definition from an aborted run cannot make the CREATE below fail.
+        sql """ DROP FUNCTION IF EXISTS udaf_my_sum_ip(ipv6);"""
+        sql """ CREATE AGGREGATE FUNCTION udaf_my_sum_ip(ipv6) RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.MySumIP",
+            "always_nullable"="false",
+            "type"="JAVA_UDF"
+        ); """
+
+
+        qt_select_ipv6_5 """ select udaf_my_sum_ip(k6) from test_udf_ip; """
+
+    } finally {
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv6_test1(ipv6);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv6_test2(string);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv6_test3(array<ipv6>);")
+        try_sql("DROP FUNCTION IF EXISTS java_udf_ipv6_test4();")
+        try_sql("DROP FUNCTION IF EXISTS udaf_my_sum_ip(ipv6);")
+    }
+}