Skip to content

Commit

Permalink
Internal Change
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 705613520
  • Loading branch information
aayooush authored and copybara-github committed Dec 13, 2024
1 parent fb97d67 commit 7b546d9
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 45 deletions.
30 changes: 16 additions & 14 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ void ArrayRecordReaderBase::Initialize() {
if (state_->pool) {
max_parallelism = state_->pool->NumThreads();
if (state_->options.max_parallelism().has_value()) {
max_parallelism =
std::min(max_parallelism, state_->options.max_parallelism().value());
max_parallelism = std::min<size_t>(
max_parallelism, state_->options.max_parallelism().value());
}
}
state_->options.set_max_parallelism(max_parallelism);
Expand Down Expand Up @@ -324,16 +324,16 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords(
if (state_->chunk_offsets.empty()) {
return absl::OkStatus();
}
uint64_t num_chunk_groups =
CeilOfRatio(state_->chunk_offsets.size(), state_->chunk_group_size);
uint64_t num_chunk_groups = CeilOfRatio<size_t>(state_->chunk_offsets.size(),
state_->chunk_group_size);
const auto reader = get_backing_reader();
auto status = ParallelForWithStatus<1>(
Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status {
uint64_t chunk_idx_start = buf_idx * state_->chunk_group_size;
// inclusive index, not the conventional exclusive index.
uint64_t last_chunk_idx =
std::min((buf_idx + 1) * state_->chunk_group_size - 1,
state_->chunk_offsets.size() - 1);
std::min<size_t>((buf_idx + 1) * state_->chunk_group_size - 1,
state_->chunk_offsets.size() - 1);
uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
state_->chunk_offsets[chunk_idx_start];
AR_ENDO_JOB(
Expand Down Expand Up @@ -398,17 +398,18 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange(
begin, end, NumRecords());
}
uint64_t chunk_idx_begin = begin / state_->record_group_size;
uint64_t chunk_idx_end = CeilOfRatio(end, state_->record_group_size);
uint64_t chunk_idx_end = CeilOfRatio<size_t>(end, state_->record_group_size);
uint64_t num_chunks = chunk_idx_end - chunk_idx_begin;
uint64_t num_chunk_groups = CeilOfRatio(num_chunks, state_->chunk_group_size);
uint64_t num_chunk_groups =
CeilOfRatio<size_t>(num_chunks, state_->chunk_group_size);

const auto reader = get_backing_reader();
auto status = ParallelForWithStatus<1>(
Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status {
uint64_t chunk_idx_start =
chunk_idx_begin + buf_idx * state_->chunk_group_size;
// inclusive index, not the conventional exclusive index.
uint64_t last_chunk_idx = std::min(
uint64_t last_chunk_idx = std::min<size_t>(
chunk_idx_begin + (buf_idx + 1) * state_->chunk_group_size - 1,
chunk_idx_end - 1);
uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
Expand Down Expand Up @@ -604,7 +605,7 @@ bool ArrayRecordReaderBase::SeekRecord(uint64_t record_index) {
if (!ok()) {
return false;
}
state_->record_idx = std::min(record_index, state_->num_records);
state_->record_idx = std::min<size_t>(record_index, state_->num_records);
return true;
}

Expand Down Expand Up @@ -654,8 +655,9 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
std::vector<ChunkDecoder> decoders;
decoders.reserve(state_->chunk_group_size);
uint64_t chunk_start = buffer_idx * state_->chunk_group_size;
uint64_t chunk_end = std::min(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
uint64_t chunk_end =
std::min<size_t>(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
const auto reader = get_backing_reader();
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
Expand Down Expand Up @@ -695,8 +697,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
chunk_offsets.reserve(state_->chunk_group_size);
uint64_t chunk_start = buffer_to_add * state_->chunk_group_size;
uint64_t chunk_end =
std::min(state_->chunk_offsets.size(),
(buffer_to_add + 1) * state_->chunk_group_size);
std::min<size_t>(state_->chunk_offsets.size(),
(buffer_to_add + 1) * state_->chunk_group_size);
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
chunk_offsets.push_back(state_->chunk_offsets[chunk_idx]);
}
Expand Down
31 changes: 17 additions & 14 deletions cpp/shareable_dependency_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.

#include "cpp/shareable_dependency.h"

#include <memory>
#include <optional>
#include <utility>

Expand Down Expand Up @@ -73,13 +74,14 @@ TEST_F(ShareableDependencyTest, SanityTest) {
EXPECT_FALSE(new_main.IsUnique()); // NOLINT(bugprone-use-after-move)

absl::Notification notification;
pool_->Schedule([refobj = main.Share(), &notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->value(), 1);
const auto second_ref = refobj;
refobj->add_value(1);
});
pool_->Schedule(
[refobj = std::make_shared<DependencyShare<FooBase*>>(main.Share()),
&notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->get()->value(), 1);
refobj->get()->add_value(1);
});
EXPECT_FALSE(main.IsUnique());
notification.Notify();
auto& unique = main.WaitUntilUnique();
Expand All @@ -97,13 +99,14 @@ TEST_F(ShareableDependencyTest, SanityTestWithReset) {
EXPECT_TRUE(main.IsUnique());

absl::Notification notification;
pool_->Schedule([refobj = main.Share(), &notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->value(), 1);
const auto second_ref = refobj;
refobj->add_value(1);
});
pool_->Schedule(
[refobj = std::make_shared<DependencyShare<FooBase*>>(main.Share()),
&notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->get()->value(), 1);
refobj->get()->add_value(1);
});
EXPECT_FALSE(main.IsUnique());
notification.Notify();
auto& unique = main.WaitUntilUnique();
Expand Down
9 changes: 6 additions & 3 deletions oss/build_whl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ function main() {
write_to_bazelrc "build -c opt"
write_to_bazelrc "build --cxxopt=-std=c++17"
write_to_bazelrc "build --host_cxxopt=-std=c++17"
write_to_bazelrc "build --linkopt=\"-lrt -lm\""
write_to_bazelrc "build --experimental_repo_remote_exec"
write_to_bazelrc "build --python_path=\"${PYTHON_BIN}\""
PLATFORM="$(uname)"
if [[ "$PLATFORM" != "Darwin" ]]; then
write_to_bazelrc "build --linkopt=\"-lrt -lm\""
fi

if [ -n "${CROSSTOOL_TOP}" ]; then
write_to_bazelrc "build --crosstool_top=${CROSSTOOL_TOP}"
Expand All @@ -40,8 +43,8 @@ function main() {

export USE_BAZEL_VERSION="${BAZEL_VERSION}"
bazel clean
bazel build ...
bazel test --verbose_failures --test_output=errors ...
bazel build ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}"
bazel test --verbose_failures --test_output=errors ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}"

DEST="/tmp/array_record/all_dist"
# Create the directory, then do dirname on a non-existent file inside it to
Expand Down
46 changes: 32 additions & 14 deletions oss/runner_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ function install_and_init_pyenv {
if [[ ! -d $PYENV_ROOT ]]; then
echo "Installing pyenv.."
git clone https://github.com/pyenv/pyenv.git "$PYENV_ROOT"
pushd "$PYENV_ROOT"
git checkout "v2.4.21"
popd
export PATH="/home/kbuilder/.local/bin:$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init --path)"
fi

echo "Python setup..."
pyenv install -s "$PYENV_PYTHON_VERSION"
pyenv global "$PYENV_PYTHON_VERSION"
PYTHON=$(pyenv which python)
export PYTHON_BIN=$(pyenv which python)
}

function setup_env_vars_py310 {
function setup_env_vars_py {
# This controls the python binary to use.
PYTHON=python3.10
PYTHON_STR=python3.10
PYTHON_MAJOR_VERSION=3
PYTHON_MINOR_VERSION=10
PYTHON_MAJOR_VERSION=$1
PYTHON_MINOR_VERSION=$2
# This is for pyenv install.
PYENV_PYTHON_VERSION=3.10.13
PYENV_PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION}
PYTHON="python$PYENV_PYTHON_VERSION"
}

function update_bazel_macos {
Expand All @@ -78,7 +80,17 @@ function update_bazel_macos {
./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh --user
rm -f ./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh
# Add new bazel installation to path
PATH="/Users/kbuilder/bin:$PATH"
export PATH="/Users/kbuilder/bin:$PATH"
}

function install_ar_deps {
  # Install/upgrade the Python packages needed to build, package, and
  # publish ArrayRecord wheels, using the pyenv-selected interpreter
  # exported as PYTHON_BIN by install_and_init_pyenv.
  local -a ar_build_deps=(
    absl-py
    build
    "etils[epath]"
    setuptools
    twine
    wheel
  )
  "$PYTHON_BIN" -m pip install -U "${ar_build_deps[@]}"
}

function build_and_test_array_record_macos() {
Expand All @@ -90,13 +102,19 @@ function build_and_test_array_record_macos() {
update_bazel_macos ${BAZEL_VERSION}
bazel --version

# Set up Pyenv.
setup_env_vars_py310
install_and_init_pyenv
PYTHON_MAJOR_VERSION=3
for PYTHON_MINOR_VERSION in 10 11 12
do
# Set up Pyenv.
PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION}
echo "Creating array_record wheel for Python Version $PYTHON_VERSION"
setup_env_vars_py $PYTHON_MAJOR_VERSION $PYTHON_MINOR_VERSION
install_and_init_pyenv
install_ar_deps

# Build and test ArrayRecord.
cd ${SOURCE_DIR}
bash ${SOURCE_DIR}/oss/build_whl.sh
# Build and test ArrayRecord.
bash ${SOURCE_DIR}/oss/build_whl.sh
done

ls ${SOURCE_DIR}/all_dist/*.whl
}

0 comments on commit 7b546d9

Please sign in to comment.