Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigating partitioned_vector problems #6050

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -339,8 +339,8 @@ namespace hpx {
partitioned_vector_partition<T, Data>::get_ptr() const
{
error_code ec(throwmode::lightweight);
return hpx::get_ptr<server::partitioned_vector<T, Data>>(this->get_id())
.get(ec);
return hpx::get_ptr<server::partitioned_vector<T, Data>>(
hpx::launch::sync, this->get_id(), ec);
}

template <typename T, typename Data /*= std::vector<T> */>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2017 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -12,7 +12,6 @@
#include <hpx/assert.hpp>
#include <hpx/async_base/launch_policy.hpp>
#include <hpx/async_combinators/wait_all.hpp>
#include <hpx/async_combinators/when_all.hpp>
#include <hpx/components/client_base.hpp>
#include <hpx/components/get_ptr.hpp>
#include <hpx/distribution_policies/container_distribution_policy.hpp>
Expand Down Expand Up @@ -73,6 +72,7 @@ namespace hpx {

std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(partitions_.size());

typedef typename partitions_vector_type::const_iterator const_iterator;

Expand Down Expand Up @@ -346,6 +346,7 @@ namespace hpx {
// now initialize our data structures
std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(num_parts);

std::size_t num_part = 0;
std::size_t allocated_size = 0;
Expand Down Expand Up @@ -397,7 +398,7 @@ namespace hpx {
}
HPX_ASSERT(l == num_parts);

hpx::when_all(ptrs).get();
hpx::wait_all(ptrs);

// cache our partition size
partition_size_ = get_partition_size();
Expand Down Expand Up @@ -440,11 +441,12 @@ namespace hpx {

std::uint32_t this_locality = get_locality_id();
std::vector<future<void>> ptrs;
ptrs.reserve(rhs.partitions_.size());

partitions_vector_type partitions;
// Fixing the size of partitions to avoid race conditions between
// possible reallocations during push back and the continuation
// to set the local partition data
partitions_vector_type partitions;
partitions.resize(rhs.partitions_.size());
for (std::size_t i = 0; i != rhs.partitions_.size(); ++i)
{
Expand All @@ -461,7 +463,7 @@ namespace hpx {
}
}

hpx::when_all(ptrs).get();
hpx::wait_all(ptrs);

size_ = rhs.size_;
partition_size_ = rhs.partition_size_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Anuj R. Sharma
// Copyright (c) 2014-2016 Hartmut Kaiser
// Copyright (c) 2014-2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
3 changes: 2 additions & 1 deletion examples/throttle/spin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ int hpx_main()

for (id_type const& locality_ : localities)
{
address addr = hpx::agas::resolve(locality_).get();
address addr =
hpx::agas::resolve(hpx::launch::sync, locality_);

hpx::util::format_to(std::cout, " [{1}] {2}\n",
get_locality_id_from_gid(locality_.get_gid()),
Expand Down
1 change: 1 addition & 0 deletions libs/core/futures/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(futures_headers
hpx/futures/future.hpp
hpx/futures/future_fwd.hpp
hpx/futures/futures_factory.hpp
hpx/futures/future_or_value.hpp
hpx/futures/detail/future_data.hpp
hpx/futures/detail/future_transforms.hpp
hpx/futures/packaged_continuation.hpp
Expand Down
71 changes: 71 additions & 0 deletions libs/core/futures/include/hpx/futures/future_or_value.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/datastructures/variant.hpp>
#include <hpx/futures/future.hpp>

namespace hpx {

template <typename T>
struct future_or_value
{
future_or_value(T const& value)
: data(value)
{
}

future_or_value(T&& value) noexcept
: data(HPX_MOVE(value))
{
}

future_or_value(hpx::future<T>&& value) noexcept
: data(HPX_MOVE(value))
{
}

constexpr bool has_value() const noexcept
{
return hpx::holds_alternative<T>(data);
}
constexpr bool has_future() const noexcept
{
return hpx::holds_alternative<hpx::future<T>>(data);
}

T& get_value() &
{
return hpx::get<T>(data);
}
T const& get_value() const&
{
return hpx::get<T>(data);
}
T&& get_value() &&
{
return hpx::get<T>(HPX_MOVE(data));
}

hpx::future<T>& get_future() &
{
return hpx::get<hpx::future<T>>(data);
}
hpx::future<T> const& get_future() const&
{
return hpx::get<hpx::future<T>>(data);
}
hpx::future<T>&& get_future() &&
{
return hpx::get<hpx::future<T>>(HPX_MOVE(data));
}

private:
hpx::variant<T, hpx::future<T>> data;
};
} // namespace hpx
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ namespace hpx::serialization::detail {
++num_futures_;
}

void decrement_future_count()
{
std::lock_guard<mutex_type> l(mtx_);
HPX_ASSERT(num_futures_ > 0);
if (--num_futures_ == 0)
{
done_ = true;
}
}

void reset()
{
std::lock_guard<mutex_type> l(mtx_);
Expand Down
12 changes: 5 additions & 7 deletions libs/full/actions/include/hpx/actions/transfer_action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,14 @@ namespace hpx::actions {
{
// If this is a direct action and deferred schedule was requested,
// that is we are not the last parcel, return immediately
if (base_type::direct_execution::value)
if constexpr (base_type::direct_execution::value)
{
return;
}
else
{
// If this is not a direct action, we can safely set
// deferred_schedule to false
deferred_schedule = false;
}

// If this is not a direct action, we can safely set
// deferred_schedule to false
deferred_schedule = false;
}

schedule_thread(HPX_MOVE(target), lva, comptype, num_thread);
Expand Down
33 changes: 24 additions & 9 deletions libs/full/agas/include/hpx/agas/addressing_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <hpx/functional/function.hpp>
#include <hpx/modules/agas_base.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/futures.hpp>
#include <hpx/modules/runtime_configuration.hpp>
#include <hpx/naming_base/address.hpp>
#include <hpx/naming_base/id_type.hpp>
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace hpx { namespace agas {
void garbage_collect(error_code& ec = throws);

static std::int64_t synchronize_with_async_incref(
hpx::future<std::int64_t> fut, hpx::id_type const& id,
std::int64_t old_credit, hpx::id_type const& id,
std::int64_t compensated_credit);

server::primary_namespace& get_local_primary_namespace_service()
Expand Down Expand Up @@ -221,7 +222,7 @@ namespace hpx { namespace agas {
util::runtime_configuration& rtcfg);

naming::address resolve_full_postproc(naming::gid_type const& id,
future<primary_namespace::resolved_type> f);
primary_namespace::resolved_type const&);
bool bind_postproc(
naming::gid_type const& id, gva const& g, future<bool> f);

Expand Down Expand Up @@ -856,15 +857,17 @@ namespace hpx { namespace agas {
}

///////////////////////////////////////////////////////////////////////////
hpx::future<naming::address> resolve_async(naming::gid_type const& id);
hpx::future_or_value<naming::address> resolve_async(
naming::gid_type const& id);

hpx::future<naming::address> resolve_async(hpx::id_type const& id)
hpx::future_or_value<naming::address> resolve_async(
hpx::id_type const& id)
{
return resolve_async(id.get_gid());
}

///////////////////////////////////////////////////////////////////////////
hpx::future<hpx::id_type> get_colocation_id_async(
hpx::future_or_value<id_type> get_colocation_id_async(
hpx::id_type const& id);

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -893,10 +896,11 @@ namespace hpx { namespace agas {
return addr;
}

hpx::future<naming::address> resolve_full_async(
hpx::future_or_value<naming::address> resolve_full_async(
naming::gid_type const& id);

hpx::future<naming::address> resolve_full_async(hpx::id_type const& id)
hpx::future_or_value<naming::address> resolve_full_async(
hpx::id_type const& id)
{
return resolve_full_async(id.get_gid());
}
Expand Down Expand Up @@ -978,14 +982,25 @@ namespace hpx { namespace agas {
/// throw but returns the result code using the
/// parameter \a ec. Otherwise it throws an instance
/// of hpx#exception.
hpx::future<std::int64_t> incref_async(naming::gid_type const& gid,
hpx::future_or_value<std::int64_t> incref_async(
naming::gid_type const& gid, std::int64_t credits = 1,
hpx::id_type const& keep_alive = hpx::invalid_id);

/// \cond NOINTERN
std::int64_t incref_async_helper(naming::gid_type const& gid,
std::int64_t credits = 1,
hpx::id_type const& keep_alive = hpx::invalid_id);
/// \endcond

std::int64_t incref(naming::gid_type const& gid,
std::int64_t credits = 1, error_code& ec = throws)
{
return incref_async(gid, credits).get(ec);
auto result = incref_async(gid, credits);
if (result.has_value())
{
return HPX_MOVE(result).get_value();
}
return result.get_future().get(ec);
}

/// \brief Decrement the global reference count for the given id
Expand Down
Loading