Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Aug 21, 2024
2 parents 9077190 + aac9060 commit 272d4ef
Show file tree
Hide file tree
Showing 47 changed files with 11,828 additions and 19 deletions.
2 changes: 1 addition & 1 deletion flex/engines/graph_db/runtime/common/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ const std::shared_ptr<IContextColumn> Context::get(int alias) const {
return head;
}
assert(static_cast<size_t>(alias) < columns.size());
assert(columns[alias] != nullptr);
// return nullptr if the column is not set
return columns[alias];
}

Expand Down
8 changes: 8 additions & 0 deletions flex/engines/graph_db/runtime/common/operators/intersect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ static Context intersect_impl(std::vector<Context>&& ctxs, int key) {
return idx_col1.get_value(a) < idx_col1.get_value(b);
});
std::vector<size_t> shuffle_offsets;
std::vector<size_t> shuffle_offsets_1;
size_t idx0 = 0, idx1 = 0;
while (idx0 < idx_col0.size() && idx1 < idx_col1.size()) {
if (idx_col0.get_value(offsets0[idx0]) <
Expand All @@ -151,6 +152,7 @@ static Context intersect_impl(std::vector<Context>&& ctxs, int key) {
auto v1 = vlist1.get_vertex(offsets1[idx1]);
if (v0 == v1) {
shuffle_offsets.push_back(offsets0[idx0]);
shuffle_offsets_1.push_back(offsets1[idx1]);
} else if (v0 < v1) {
break;
} else {
Expand All @@ -164,7 +166,13 @@ static Context intersect_impl(std::vector<Context>&& ctxs, int key) {
}

ctxs[0].reshuffle(shuffle_offsets);
ctxs[1].reshuffle(shuffle_offsets_1);
ctxs[0].pop_idx_col();
for (size_t i = 0; i < ctxs[1].col_num(); ++i) {
if (i >= ctxs[0].col_num() || ctxs[0].get(i) == nullptr) {
ctxs[0].set(i, ctxs[1].get(i));
}
}
return ctxs[0];
}
}
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/common/huge_container/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
10 changes: 10 additions & 0 deletions interactive_engine/executor/common/huge_container/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "huge_container"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libc = "0.2"
lazy_static = "1.4"
144 changes: 144 additions & 0 deletions interactive_engine/executor/common/huge_container/src/huge_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::fmt;
use std::ops;

pub struct HugeVec<T> {
ptr: *mut T,
cap: usize,
len: usize,
}

impl<T> HugeVec<T> {
pub fn new() -> Self {
Self { ptr: std::ptr::null_mut(), cap: 0, len: 0 }
}

pub fn with_capacity(capacity: usize) -> Self {
let cap_in_bytes = capacity * std::mem::size_of::<T>();
let ptr = crate::hugepage_alloc(cap_in_bytes) as *mut T;
Self { ptr, cap: capacity, len: 0 }
}

pub fn len(&self) -> usize {
self.len
}

pub fn capacity(&self) -> usize {
self.cap
}

pub fn reserve(&mut self, additional: usize) {
let new_cap = self.cap + additional;
let new_cap_in_bytes = new_cap * std::mem::size_of::<T>();
let new_ptr = crate::hugepage_alloc(new_cap_in_bytes) as *mut T;

if self.len > 0 {
unsafe {
std::ptr::copy_nonoverlapping(self.ptr, new_ptr, self.len);
}
}
if self.cap > 0 {
crate::hugepage_dealloc(self.ptr as *mut u8, self.cap * std::mem::size_of::<T>());
}

self.ptr = new_ptr;
self.cap = new_cap;
}

pub fn as_ptr(&self) -> *const T {
self.ptr
}

pub fn as_mut_ptr(&mut self) -> *mut T {
self.ptr
}

pub fn push(&mut self, value: T) {
if self.len == self.cap {
self.reserve(1);
}

unsafe {
self.ptr.add(self.len).write(value);
}

self.len += 1;
}

pub fn clear(&mut self) {
unsafe { std::ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len)) }
self.len = 0;
}

pub fn resize(&mut self, new_len: usize, value: T)
where
T: Clone,
{
if new_len > self.len {
if new_len > self.cap {
self.reserve(new_len - self.len);
}

for i in self.len..new_len {
unsafe {
self.ptr.add(i).write(value.clone());
}
}
} else {
unsafe {
std::ptr::drop_in_place(std::slice::from_raw_parts_mut(
self.ptr.add(new_len),
self.len - new_len,
));
}
}

self.len = new_len;
}
}

impl<T> Drop for HugeVec<T> {
fn drop(&mut self) {
self.clear();
if self.cap > 0 {
crate::hugepage_dealloc(self.ptr as *mut u8, self.cap * std::mem::size_of::<T>());
}
}
}

impl<T> ops::Deref for HugeVec<T> {
type Target = [T];

fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<T> ops::DerefMut for HugeVec<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}

impl<T: fmt::Debug> fmt::Debug for HugeVec<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

unsafe impl<T> Sync for HugeVec<T> {}
unsafe impl<T> Send for HugeVec<T> {}
57 changes: 57 additions & 0 deletions interactive_engine/executor/common/huge_container/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
#[cfg(target_os = "linux")]
mod linux_hugepages;
#[cfg(target_os = "linux")]
use linux_hugepages::hugepage_alloc;
#[cfg(target_os = "linux")]
use linux_hugepages::hugepage_dealloc;

#[cfg(not(target_os = "linux"))]
mod notlinux_hugepages;
#[cfg(not(target_os = "linux"))]
use notlinux_hugepages::hugepage_alloc;
#[cfg(not(target_os = "linux"))]
use notlinux_hugepages::hugepage_dealloc;

mod huge_vec;

pub use huge_vec::HugeVec;

pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);

let mut vec = HugeVec::<i32>::new();
vec.push(1);
vec.push(2);
vec.push(3);

assert_eq!(vec.len(), 3);
assert_eq!(vec[0], 1);
assert_eq!(vec[1], 2);
assert_eq!(vec[2], 3);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use lazy_static::lazy_static;
use std::{
fs::File,
io::{self, BufRead, BufReader},
};

fn get_hugepage_size() -> io::Result<usize> {
let file = File::open("/proc/meminfo")?;
let reader = BufReader::new(file);

for line in reader.lines() {
let line = line?;
if line.starts_with("Hugepagesize:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(size_kb) = parts[1].parse::<usize>() {
match parts[2] {
"kB" => return Ok(size_kb * 1024),
"MB" => return Ok(size_kb * 1024 * 1024),
"GB" => return Ok(size_kb * 1024 * 1024 * 1024),
_ => {}
}
}
}
}
}

Err(io::Error::new(io::ErrorKind::NotFound, "Hugepagesize info not found"))
}

lazy_static! {
static ref HUGE_PAGE_SIZE: usize = get_hugepage_size().unwrap();
}

fn align_to(size: usize, align: usize) -> usize {
(size + align - 1) & !(align - 1)
}

pub(crate) fn hugepage_alloc(size: usize) -> *mut u8 {
let len = align_to(size, *HUGE_PAGE_SIZE);
let p = unsafe {
libc::mmap(
std::ptr::null_mut(),
len,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_HUGETLB,
-1,
0,
)
};
p as *mut u8
}

pub(crate) fn hugepage_dealloc(ptr: *mut u8, size: usize) {
let len = align_to(size, *HUGE_PAGE_SIZE);
let ret = unsafe { libc::munmap(ptr as *mut libc::c_void, len) };
if ret != 0 {
panic!("hugepage deallocation failed, {} - {} -> {}", ret, size, len);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
pub(crate) fn hugepage_alloc(size: usize) -> *mut u8 {
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
-1,
0,
)
};

if ptr == libc::MAP_FAILED {
panic!("hugepage allocation failed");
}

ptr as *mut u8
}

pub(crate) fn hugepage_dealloc(ptr: *mut u8, size: usize) {
let ret = unsafe { libc::munmap(ptr as *mut libc::c_void, size) };
if ret != 0 {
panic!("hugepage deallocation failed, {}", ret);
}
}
3 changes: 2 additions & 1 deletion interactive_engine/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ members = [
"mcsr",
"global_query",
"groot",
"exp_store"
"exp_store",
"bmcsr"
]

[profile.release]
Expand Down
Loading

0 comments on commit 272d4ef

Please sign in to comment.