Skip to content

Commit

Permalink
Merge branch 'master' into recover_binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Dec 9, 2024
2 parents 9ec28ef + 0a73618 commit a01a787
Show file tree
Hide file tree
Showing 16 changed files with 664 additions and 13 deletions.
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class S3FileSystem final : public RemoteFileSystem {
abs_path = path;
} else {
// path with no schema
abs_path = _root_path / path;
abs_path = _prefix / path;
}
return Status::OK();
}
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ namespace doris::vectorized {
M(TypeIndex::Date, ColumnVector<Int64>, Int64) \
M(TypeIndex::DateV2, ColumnVector<UInt32>, UInt32) \
M(TypeIndex::DateTime, ColumnVector<Int64>, Int64) \
M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64)
M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64) \
M(TypeIndex::IPv4, ColumnVector<IPv4>, IPv4) \
M(TypeIndex::IPv6, ColumnVector<IPv6>, IPv6)

Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
Expand Down Expand Up @@ -450,6 +452,10 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
return "float";
case TYPE_DOUBLE:
return "double";
case TYPE_IPV4:
return "ipv4";
case TYPE_IPV6:
return "ipv6";
case TYPE_VARCHAR:
[[fallthrough]];
case TYPE_CHAR:
Expand Down Expand Up @@ -534,6 +540,10 @@ std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
return "float";
case TYPE_DOUBLE:
return "double";
case TYPE_IPV4:
return "ipv4";
case TYPE_IPV6:
return "ipv6";
case TYPE_VARCHAR: {
buffer << "varchar(" << desc.len << ")";
return buffer.str();
Expand Down
11 changes: 5 additions & 6 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,13 +1394,12 @@ static Status cancel_channel_and_check_intolerable_failure(Status status,
nch.cancel(err_msg);

// check if index has intolerable failure
Status index_st = ich.check_intolerable_failure();
if (!index_st.ok()) {
if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) {
status = std::move(index_st);
} else if (Status st = ich.check_tablet_received_rows_consistency(); !st.ok()) {
status = std::move(st);
} else if (Status st = ich.check_tablet_filtered_rows_consistency(); !st.ok()) {
status = std::move(st);
} else if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) {
status = std::move(receive_st);
} else if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) {
status = std::move(filter_st);
}
return status;
}
Expand Down
18 changes: 15 additions & 3 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1328,15 +1328,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,

while (it->has_next()) {
auto [k, v] = it->next();
auto rs = response->add_rowset_meta();
auto* rs = response->add_rowset_meta();
auto byte_size = rs->ByteSizeLong();
TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size);
if (byte_size + v.size() > std::numeric_limits<int32_t>::max()) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format(
"rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}",
hex(k), byte_size);
LOG(WARNING) << msg;
return;
}
if (!rs->ParseFromArray(v.data(), v.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed rowset meta, unable to deserialize";
msg = "malformed rowset meta, unable to serialize";
LOG(WARNING) << msg << " key=" << hex(k);
return;
}
++num_rowsets;
if (!it->has_next()) key0 = k;
if (!it->has_next()) {
key0 = k;
}
}
key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
Expand Down
90 changes: 89 additions & 1 deletion cloud/test/txn_lazy_commit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <random>
#include <string>
Expand Down Expand Up @@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
ASSERT_TRUE(abort_timeout_txn_hit);
ASSERT_EQ(txn_id, txn_info_pb.txn_id());
}
} // namespace doris::cloud

TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
auto txn_kv = get_mem_txn_kv();

int64_t db_id = 5252025;
int64_t table_id = 35201043384;
int64_t index_id = 256439;
int64_t partition_id = 732536259;

auto meta_service = get_meta_service(txn_kv, true);
int64_t tablet_id = 25910248;

{
create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
tablet_id);
}
{
int tmp_txn_id = 0;
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label_32ae213dasg3");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
tmp_txn_id = res.txn_id();
ASSERT_GT(res.txn_id(), 0);
}
{
auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(tmp_txn_id);
req.set_is_2pc(false);
req.set_enable_txn_lazy_commit(true);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
}

auto* sp = SyncPoint::get_instance();
sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) {
auto* byte_size = try_any_cast<size_t*>(args[0]);
*byte_size = std::numeric_limits<int32_t>::max();
++(*byte_size);
});

sp->enable_processing();
{
brpc::Controller cntl;
GetRowsetRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
auto* tablet_idx = req.mutable_idx();
tablet_idx->set_table_id(table_id);
tablet_idx->set_index_id(index_id);
tablet_idx->set_partition_id(partition_id);
tablet_idx->set_tablet_id(tablet_id);
req.set_start_version(0);
req.set_end_version(-1);
req.set_cumulative_compaction_cnt(0);
req.set_base_compaction_cnt(0);
req.set_cumulative_point(2);

GetRowsetResponse res;
meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
&res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR);
}
}

} // namespace doris::cloud
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -55,6 +56,9 @@ public class JavaUdfDataType {
public static final JavaUdfDataType DECIMAL64 = new JavaUdfDataType("DECIMAL64", TPrimitiveType.DECIMAL64, 8);
public static final JavaUdfDataType DECIMAL128 = new JavaUdfDataType("DECIMAL128", TPrimitiveType.DECIMAL128I,
16);

public static final JavaUdfDataType IPV4 = new JavaUdfDataType("IPV4", TPrimitiveType.IPV4, 4);
public static final JavaUdfDataType IPV6 = new JavaUdfDataType("IPV6", TPrimitiveType.IPV6, 16);
public static final JavaUdfDataType ARRAY_TYPE = new JavaUdfDataType("ARRAY_TYPE", TPrimitiveType.ARRAY, 0);
public static final JavaUdfDataType MAP_TYPE = new JavaUdfDataType("MAP_TYPE", TPrimitiveType.MAP, 0);
public static final JavaUdfDataType STRUCT_TYPE = new JavaUdfDataType("STRUCT_TYPE", TPrimitiveType.STRUCT, 0);
Expand Down Expand Up @@ -83,6 +87,8 @@ public class JavaUdfDataType {
JavaUdfDataTypeSet.add(ARRAY_TYPE);
JavaUdfDataTypeSet.add(MAP_TYPE);
JavaUdfDataTypeSet.add(STRUCT_TYPE);
JavaUdfDataTypeSet.add(IPV4);
JavaUdfDataTypeSet.add(IPV6);
}

private final String description;
Expand Down Expand Up @@ -156,6 +162,8 @@ public static Set<JavaUdfDataType> getCandidateTypes(Class<?> c) {
return Sets.newHashSet(JavaUdfDataType.ARRAY_TYPE, JavaUdfDataType.STRUCT_TYPE);
} else if (c == java.util.HashMap.class) {
return Sets.newHashSet(JavaUdfDataType.MAP_TYPE);
} else if (c == InetAddress.class) {
return Sets.newHashSet(JavaUdfDataType.IPV4, JavaUdfDataType.IPV6);
}
return Sets.newHashSet(JavaUdfDataType.INVALID_TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -58,6 +60,20 @@ public static BigInteger getBigInteger(byte[] bytes) {
return new BigInteger(originalBytes);
}

public static InetAddress getInetAddress(byte[] bytes) {
// Convert the byte order back if necessary
byte[] originalBytes = convertByteOrder(bytes);
try {
return InetAddress.getByAddress(originalBytes);
} catch (UnknownHostException e) {
return null;
}
}

public static byte[] getInetAddressBytes(InetAddress v) {
return convertByteOrder(v.getAddress());
}

public static byte[] getDecimalBytes(BigDecimal v, int scale, int size) {
BigDecimal retValue = v.setScale(scale, RoundingMode.HALF_EVEN);
BigInteger data = retValue.unscaledValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ public static Pair<Boolean, JavaUdfDataType[]> setArgTypes(Type[] parameterTypes
StructType structType = (StructType) parameterTypes[finalI];
ArrayList<StructField> fields = structType.getFields();
inputArgTypes[i].setFields(fields);
} else if (parameterTypes[finalI].isIP()) {
if (parameterTypes[finalI].isIPv4()) {
inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV4);
} else {
inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV6);
}
}
if (res.length == 0) {
return Pair.of(false, inputArgTypes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public enum Type {
DECIMAL32(4),
DECIMAL64(8),
DECIMAL128(16),
IPV4(4),
IPV6(16),
STRING(-1),
ARRAY(-1),
MAP(-1),
Expand Down Expand Up @@ -155,6 +157,18 @@ public boolean isArray() {
return type == Type.ARRAY;
}

public boolean isIpv4() {
return type == Type.IPV4;
}

public boolean isIpv6() {
return type == Type.IPV6;
}

public boolean isIp() {
return isIpv4() || isIpv6();
}

public boolean isMap() {
return type == Type.MAP;
}
Expand Down Expand Up @@ -287,6 +301,12 @@ public static ColumnType parseType(String columnName, String hiveType) {
case "double":
type = Type.DOUBLE;
break;
case "ipv4":
type = Type.IPV4;
break;
case "ipv6":
type = Type.IPV6;
break;
case "datev1":
type = Type.DATE;
break;
Expand Down
Loading

0 comments on commit a01a787

Please sign in to comment.