Skip to content

Commit

Permalink
Merge #6050
Browse files Browse the repository at this point in the history
6050: Investigating partitioned_vector problems r=hkaiser a=hkaiser

- flyby: reduce the need for make_ready_future for various AGAS operations


Co-authored-by: Hartmut Kaiser <[email protected]>
  • Loading branch information
StellarBot and hkaiser committed Aug 28, 2023
2 parents 8edd9b3 + 7f61e4c commit 962288c
Show file tree
Hide file tree
Showing 30 changed files with 483 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -339,8 +339,8 @@ namespace hpx {
partitioned_vector_partition<T, Data>::get_ptr() const
{
error_code ec(throwmode::lightweight);
return hpx::get_ptr<server::partitioned_vector<T, Data>>(this->get_id())
.get(ec);
return hpx::get_ptr<server::partitioned_vector<T, Data>>(
hpx::launch::sync, this->get_id(), ec);
}

template <typename T, typename Data /*= std::vector<T> */>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -12,7 +12,6 @@
#include <hpx/assert.hpp>
#include <hpx/async_base/launch_policy.hpp>
#include <hpx/async_combinators/wait_all.hpp>
#include <hpx/async_combinators/when_all.hpp>
#include <hpx/components/client_base.hpp>
#include <hpx/components/get_ptr.hpp>
#include <hpx/distribution_policies/container_distribution_policy.hpp>
Expand Down Expand Up @@ -73,6 +72,7 @@ namespace hpx {

std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(partitions_.size());

typedef typename partitions_vector_type::const_iterator const_iterator;

Expand Down Expand Up @@ -346,6 +346,7 @@ namespace hpx {
// now initialize our data structures
std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(num_parts);

std::size_t num_part = 0;
std::size_t allocated_size = 0;
Expand Down Expand Up @@ -397,7 +398,7 @@ namespace hpx {
}
HPX_ASSERT(l == num_parts);

hpx::when_all(ptrs).get();
hpx::wait_all(ptrs);

// cache our partition size
partition_size_ = get_partition_size();
Expand Down Expand Up @@ -440,11 +441,12 @@ namespace hpx {

std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(rhs.partitions_.size());

partitions_vector_type partitions;
// Fixing the size of partitions to avoid race conditions between
// possible reallocations during push back and the continuation
// to set the local partition data
partitions_vector_type partitions;
partitions.resize(rhs.partitions_.size());
for (std::size_t i = 0; i != rhs.partitions_.size(); ++i)
{
Expand All @@ -461,7 +463,7 @@ namespace hpx {
}
}

hpx::when_all(ptrs).get();
hpx::wait_all(ptrs);

size_ = rhs.size_;
partition_size_ = rhs.partition_size_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2016 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
3 changes: 2 additions & 1 deletion examples/throttle/spin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ int hpx_main()

for (id_type const& locality_ : localities)
{
address addr = hpx::agas::resolve(locality_).get();
address addr =
hpx::agas::resolve(hpx::launch::sync, locality_);

hpx::util::format_to(std::cout, " [{1}] {2}\n",
get_locality_id_from_gid(locality_.get_gid()),
Expand Down
1 change: 1 addition & 0 deletions libs/core/futures/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(futures_headers
hpx/futures/future.hpp
hpx/futures/future_fwd.hpp
hpx/futures/futures_factory.hpp
hpx/futures/future_or_value.hpp
hpx/futures/detail/future_data.hpp
hpx/futures/detail/future_transforms.hpp
hpx/futures/packaged_continuation.hpp
Expand Down
71 changes: 71 additions & 0 deletions libs/core/futures/include/hpx/futures/future_or_value.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/datastructures/variant.hpp>
#include <hpx/futures/future.hpp>

namespace hpx {

template <typename T>
struct future_or_value
{
future_or_value(T const& value)
: data(value)
{
}

future_or_value(T&& value) noexcept
: data(HPX_MOVE(value))
{
}

future_or_value(hpx::future<T>&& value) noexcept
: data(HPX_MOVE(value))
{
}

constexpr bool has_value() const noexcept
{
return hpx::holds_alternative<T>(data);
}
constexpr bool has_future() const noexcept
{
return hpx::holds_alternative<hpx::future<T>>(data);
}

T& get_value() &
{
return hpx::get<T>(data);
}
T const& get_value() const&
{
return hpx::get<T>(data);
}
T&& get_value() &&
{
return hpx::get<T>(HPX_MOVE(data));
}

hpx::future<T>& get_future() &
{
return hpx::get<hpx::future<T>>(data);
}
hpx::future<T> const& get_future() const&
{
return hpx::get<hpx::future<T>>(data);
}
hpx::future<T>&& get_future() &&
{
return hpx::get<hpx::future<T>>(HPX_MOVE(data));
}

private:
hpx::variant<T, hpx::future<T>> data;
};
} // namespace hpx
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ namespace hpx::serialization::detail {
++num_futures_;
}

void decrement_future_count()
{
std::lock_guard<mutex_type> l(mtx_);
HPX_ASSERT(num_futures_ > 0);
if (--num_futures_ == 0)
{
done_ = true;
}
}

void reset()
{
std::lock_guard<mutex_type> l(mtx_);
Expand Down
12 changes: 5 additions & 7 deletions libs/full/actions/include/hpx/actions/transfer_action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,14 @@ namespace hpx::actions {
{
// If this is a direct action and deferred schedule was requested,
// that is we are not the last parcel, return immediately
if (base_type::direct_execution::value)
if constexpr (base_type::direct_execution::value)
{
return;
}
else
{
// If this is not a direct action, we can safely set
// deferred_schedule to false
deferred_schedule = false;
}

// If this is not a direct action, we can safely set
// deferred_schedule to false
deferred_schedule = false;
}

schedule_thread(HPX_MOVE(target), lva, comptype, num_thread);
Expand Down
33 changes: 24 additions & 9 deletions libs/full/agas/include/hpx/agas/addressing_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <hpx/functional/function.hpp>
#include <hpx/modules/agas_base.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/futures.hpp>
#include <hpx/modules/runtime_configuration.hpp>
#include <hpx/naming_base/address.hpp>
#include <hpx/naming_base/id_type.hpp>
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace hpx { namespace agas {
void garbage_collect(error_code& ec = throws);

static std::int64_t synchronize_with_async_incref(
hpx::future<std::int64_t> fut, hpx::id_type const& id,
std::int64_t old_credit, hpx::id_type const& id,
std::int64_t compensated_credit);

server::primary_namespace& get_local_primary_namespace_service()
Expand Down Expand Up @@ -221,7 +222,7 @@ namespace hpx { namespace agas {
util::runtime_configuration& rtcfg);

naming::address resolve_full_postproc(naming::gid_type const& id,
future<primary_namespace::resolved_type> f);
primary_namespace::resolved_type const&);
bool bind_postproc(
naming::gid_type const& id, gva const& g, future<bool> f);

Expand Down Expand Up @@ -856,15 +857,17 @@ namespace hpx { namespace agas {
}

///////////////////////////////////////////////////////////////////////////
hpx::future<naming::address> resolve_async(naming::gid_type const& id);
hpx::future_or_value<naming::address> resolve_async(
naming::gid_type const& id);

hpx::future<naming::address> resolve_async(hpx::id_type const& id)
hpx::future_or_value<naming::address> resolve_async(
hpx::id_type const& id)
{
return resolve_async(id.get_gid());
}

///////////////////////////////////////////////////////////////////////////
hpx::future<hpx::id_type> get_colocation_id_async(
hpx::future_or_value<id_type> get_colocation_id_async(
hpx::id_type const& id);

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -893,10 +896,11 @@ namespace hpx { namespace agas {
return addr;
}

hpx::future<naming::address> resolve_full_async(
hpx::future_or_value<naming::address> resolve_full_async(
naming::gid_type const& id);

hpx::future<naming::address> resolve_full_async(hpx::id_type const& id)
hpx::future_or_value<naming::address> resolve_full_async(
hpx::id_type const& id)
{
return resolve_full_async(id.get_gid());
}
Expand Down Expand Up @@ -978,14 +982,25 @@ namespace hpx { namespace agas {
/// throw but returns the result code using the
/// parameter \a ec. Otherwise it throws an instance
/// of hpx#exception.
hpx::future<std::int64_t> incref_async(naming::gid_type const& gid,
hpx::future_or_value<std::int64_t> incref_async(
naming::gid_type const& gid, std::int64_t credits = 1,
hpx::id_type const& keep_alive = hpx::invalid_id);

/// \cond NOINTERN
std::int64_t incref_async_helper(naming::gid_type const& gid,
std::int64_t credits = 1,
hpx::id_type const& keep_alive = hpx::invalid_id);
/// \endcond

std::int64_t incref(naming::gid_type const& gid,
std::int64_t credits = 1, error_code& ec = throws)
{
return incref_async(gid, credits).get(ec);
auto result = incref_async(gid, credits);
if (result.has_value())
{
return HPX_MOVE(result).get_value();
}
return result.get_future().get(ec);
}

/// \brief Decrement the global reference count for the given id
Expand Down
Loading

0 comments on commit 962288c

Please sign in to comment.