Skip to content

Commit

Permalink
[ISSUE #2203]💫Implement select_mapped_buffer and select_mapped_buffer…
Browse files Browse the repository at this point in the history
…_with_position method🍻 (#2204)
  • Loading branch information
mxsm authored Jan 11, 2025
1 parent ebcce97 commit be21e0c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 69 deletions.
9 changes: 7 additions & 2 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,9 @@ impl CommitLog {
Some(mmap_file) => {
let pos = offset % mapped_file_size as i64;
let mut select_mapped_buffer_result =
MappedFile::select_mapped_buffer_size(mmap_file, pos as i32, size);
mmap_file.select_mapped_buffer(pos as i32, size);
if let Some(ref mut result) = select_mapped_buffer_result {
result.mapped_file = Some(mmap_file);
result.is_in_cache = self.cold_data_check_service.is_data_in_page_cache();
}
select_mapped_buffer_result
Expand Down Expand Up @@ -1131,7 +1132,11 @@ impl CommitLog {
.find_mapped_file_by_offset(offset, return_first_on_not_found);
if let Some(mapped_file) = mapped_file {
let pos = (offset % mapped_file_size) as i32;
DefaultMappedFile::select_mapped_buffer(mapped_file, pos)
let mut result = mapped_file.select_mapped_buffer_with_position(pos);
if let Some(ref mut result) = result {
result.mapped_file = Some(mapped_file);
}
result
} else {
None
}
Expand Down
66 changes: 29 additions & 37 deletions rocketmq-store/src/log_file/mapped_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::fs::File;
use std::io;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
Expand Down Expand Up @@ -176,6 +175,35 @@ pub trait MappedFile {
/// `true` if the append operation was successful, `false` otherwise.
fn append_message_offset_length(&self, data: &[u8], offset: usize, length: usize) -> bool;

/// Selects a portion of the mapped file based on the specified size.
///
/// This method provides a way to access a specific portion of the mapped file, identified by
/// the starting position (`pos`) and the size (`size`). It is useful for reading or modifying
/// sections of the file without needing to load the entire file into memory.
///
/// # Arguments
/// * `pos` - The starting position from which the buffer is selected.
/// * `size` - The size of the buffer to select.
///
/// # Returns
/// An `Option<SelectMappedBufferResult>` containing the selected buffer if available, or `None`
/// if the specified range is not valid or the file is not available.
fn select_mapped_buffer(&self, pos: i32, size: i32) -> Option<SelectMappedBufferResult>;

/// Selects a buffer from the mapped file starting from the specified position.
///
/// Similar to `select_mapped_buffer_size`, but selects the buffer starting from `pos` to the
/// end of the file. This method is useful for operations that require processing the
/// remainder of the file from a given position.
///
/// # Arguments
/// * `pos` - The starting position from which the buffer is selected.
///
/// # Returns
/// An `Option<SelectMappedBufferResult>` containing the selected buffer if available, or `None`
/// if the starting position is not valid or the file is not available.
fn select_mapped_buffer_with_position(&self, pos: i32) -> Option<SelectMappedBufferResult>;

/// Retrieves a byte slice from the mapped file.
///
/// This method returns a byte slice starting from the specified position and of the specified
Expand Down Expand Up @@ -333,39 +361,6 @@ pub trait MappedFile {
/// The number of pages actually committed.
fn commit(&self, commit_least_pages: i32) -> i32;

/// Selects a portion of the mapped file based on the specified size.
///
/// This method provides a way to access a specific portion of the mapped file, identified by
/// the starting position (`pos`) and the size (`size`). It is useful for reading or modifying
/// sections of the file without needing to load the entire file into memory.
///
/// # Arguments
/// * `pos` - The starting position from which the buffer is selected.
/// * `size` - The size of the buffer to select.
///
/// # Returns
/// An `Option<SelectMappedBufferResult>` containing the selected buffer if available, or `None`
/// if the specified range is not valid or the file is not available.
fn select_mapped_buffer_size(
self: Arc<Self>,
pos: i32,
size: i32,
) -> Option<SelectMappedBufferResult>;

/// Selects a buffer from the mapped file starting from the specified position.
///
/// Similar to `select_mapped_buffer_size`, but selects the buffer starting from `pos` to the
/// end of the file. This method is useful for operations that require processing the
/// remainder of the file from a given position.
///
/// # Arguments
/// * `pos` - The starting position from which the buffer is selected.
///
/// # Returns
/// An `Option<SelectMappedBufferResult>` containing the selected buffer if available, or `None`
/// if the starting position is not valid or the file is not available.
fn select_mapped_buffer(self: Arc<Self>, pos: i32) -> Option<SelectMappedBufferResult>;

/// Retrieves the entire mapped byte buffer.
///
/// This method provides access to the entire byte buffer of the mapped file. It is useful for
Expand Down Expand Up @@ -619,9 +614,6 @@ pub trait MappedFileRefactor {
cb: &dyn CompactionAppendMsgCallback,
) -> AppendMessageResult;

fn commit(&self, commit_least_pages: usize) -> usize;
fn select_mapped_buffer(&self, pos: usize, size: usize) -> Option<SelectMappedBufferResult>;
fn select_mapped_buffer_with_position(&self, pos: usize) -> Option<SelectMappedBufferResult>;
fn get_mapped_byte_buffer(&self) -> &[u8];
fn slice_byte_buffer(&self) -> &[u8];
fn get_store_timestamp(&self) -> u64;
Expand Down
53 changes: 28 additions & 25 deletions rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use bytes::Bytes;
use bytes::BytesMut;
Expand Down Expand Up @@ -488,27 +487,30 @@ impl MappedFile for DefaultMappedFile {

#[inline]
fn commit(&self, commit_least_pages: i32) -> i32 {
0
unimplemented!(
// need to implement
"commit"
)
}

#[inline]
fn select_mapped_buffer_size(
self: Arc<Self>,
pos: i32,
size: i32,
) -> Option<SelectMappedBufferResult> {
fn select_mapped_buffer(&self, pos: i32, size: i32) -> Option<SelectMappedBufferResult> {
let read_position = self.get_read_position();
if pos + size <= read_position {
if MappedFile::hold(self.as_ref()) {
if MappedFile::hold(self) {
self.mapped_byte_buffer_access_count_since_last_swap
.fetch_add(1, Ordering::SeqCst);
.fetch_add(1, Ordering::AcqRel);
Some(SelectMappedBufferResult {
start_offset: self.file_from_offset + pos as u64,
size,
mapped_file: Some(self),
mapped_file: None,
is_in_cache: true,
})
} else {
warn!(
"matched, but hold failed, request pos: {}, fileFromOffset: {}",
pos, self.file_from_offset
);
None
}
} else {
Expand All @@ -521,21 +523,6 @@ impl MappedFile for DefaultMappedFile {
}
}

#[inline]
fn select_mapped_buffer(self: Arc<Self>, pos: i32) -> Option<SelectMappedBufferResult> {
let read_position = self.get_read_position();
if pos < read_position && read_position > 0 && MappedFile::hold(self.as_ref()) {
Some(SelectMappedBufferResult {
start_offset: self.get_file_from_offset() + pos as u64,
size: read_position - pos,
mapped_file: Some(self),
is_in_cache: true,
})
} else {
None
}
}

#[inline]
fn get_mapped_byte_buffer(&self) -> Bytes {
todo!()
Expand Down Expand Up @@ -714,6 +701,22 @@ impl MappedFile for DefaultMappedFile {
fn is_loaded(&self, position: i64, size: usize) -> bool {
true
}

fn select_mapped_buffer_with_position(&self, pos: i32) -> Option<SelectMappedBufferResult> {
let read_position = self.get_read_position();
if pos < read_position && pos >= 0 && MappedFile::hold(self) {
self.mapped_byte_buffer_access_count_since_last_swap
.fetch_add(1, Ordering::AcqRel);
Some(SelectMappedBufferResult {
start_offset: self.get_file_from_offset() + pos as u64,
size: read_position - pos,
mapped_file: None,
is_in_cache: true,
})
} else {
None
}
}
}

#[allow(unused_variables)]
Expand Down
18 changes: 13 additions & 5 deletions rocketmq-store/src/queue/single_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ impl ConsumeQueue {
.mapped_file_queue
.find_mapped_file_by_offset(offset, false)
{
return mapped_file.select_mapped_buffer((offset % mapped_file_size as i64) as i32);
let mut result = mapped_file
.select_mapped_buffer_with_position((offset % mapped_file_size as i64) as i32);
if let Some(ref mut result) = result {
result.mapped_file = Some(mapped_file);
}
return result;
}
}
None
Expand Down Expand Up @@ -631,11 +636,13 @@ impl ConsumeQueueTrait for ConsumeQueue {
}
let last_mapped_file = last_mapped_file.unwrap();
let max_readable_position = last_mapped_file.get_read_position();
let last_record = MappedFile::select_mapped_buffer_size(
last_mapped_file.clone(),
let mut last_record = last_mapped_file.select_mapped_buffer(
max_readable_position - CQ_STORE_UNIT_SIZE,
CQ_STORE_UNIT_SIZE,
);
if let Some(ref mut result) = last_record {
result.mapped_file = Some(last_mapped_file.clone());
}
if let Some(last_record) = last_record {
let mut bytes = last_record
.mapped_file
Expand Down Expand Up @@ -680,7 +687,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
);
return;
}
let result = MappedFile::select_mapped_buffer(mapped_file.clone(), start as i32);
let result = mapped_file.select_mapped_buffer_with_position(start as i32);
if result.is_none() {
warn!(
"[Bug] Failed to scan consume queue entries from file on correcting min \
Expand All @@ -689,7 +696,8 @@ impl ConsumeQueueTrait for ConsumeQueue {
);
return;
}
let result = result.unwrap();
let mut result = result.unwrap();
result.mapped_file = Some(mapped_file);
if result.size == 0 {
debug!(
"ConsumeQueue[topic={}, queue-id={}] contains no valid entries",
Expand Down

0 comments on commit be21e0c

Please sign in to comment.