-
-
Notifications
You must be signed in to change notification settings - Fork 497
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fixes #1572 nng creates too many threads
This further limits some of the thread counts, but principally it offers a new runtime facility, nng_init_set_parameter(), which can be used to set certain runtime parameters on the number of threads, provided it is called before the rest of application start up. This facility is quite intentionally "undocumented", at least for now, as we want to limit our commitment to it. Still this should be helpful for applications that need to reduce the number of threads that are created.
- Loading branch information
Showing
12 changed files
with
415 additions
and
134 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# | ||
# Copyright 2023 Staysail Systems, Inc. <[email protected]> | ||
# Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
# Copyright (c) 2012 Martin Sustrik All rights reserved. | ||
# Copyright (c) 2013 GoPivotal, Inc. All rights reserved. | ||
# Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved. | ||
|
@@ -114,24 +114,32 @@ endif () | |
|
||
nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS) | ||
|
||
set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.") | ||
mark_as_advanced(NNG_RESOLV_CONCURRENCY) | ||
if (NNG_RESOLV_CONCURRENCY) | ||
add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY}) | ||
endif () | ||
mark_as_advanced(NNG_RESOLV_CONCURRENCY) | ||
|
||
set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic") | ||
mark_as_advanced(NNG_NUM_TASKQ_THREADS) | ||
if (NNG_NUM_TASKQ_THREADS) | ||
add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS}) | ||
endif () | ||
mark_as_advanced(NNG_NUM_TASKQ_THREADS) | ||
|
||
set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit") | ||
set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit") | ||
mark_as_advanced(NNG_MAX_TASKQ_THREADS) | ||
if (NNG_MAX_TASKQ_THREADS) | ||
add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS}) | ||
endif () | ||
|
||
# Expire threads. This runs the timeout handling, and having more of them | ||
# reduces contention on the common locks used for aio expiration. | ||
set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic") | ||
mark_as_advanced(NNG_NUM_EXPIRE_THREADS) | ||
if (NNG_NUM_EXPIRE_THREADS) | ||
add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS}) | ||
endif () | ||
|
||
set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit") | ||
mark_as_advanced(NNG_MAX_EXPIRE_THREADS) | ||
if (NNG_MAX_EXPIRE_THREADS) | ||
|
@@ -140,6 +148,12 @@ endif() | |
|
||
# Poller threads. These threads run the pollers. This is mostly used | ||
# on Windows right now, as the POSIX platforms use a single threaded poller. | ||
set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic") | ||
if (NNG_NUM_POLLER_THREADS) | ||
add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS}) | ||
endif () | ||
mark_as_advanced(NNG_NUM_POLLER_THREADS) | ||
|
||
set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit") | ||
mark_as_advanced(NNG_MAX_POLLER_THREADS) | ||
if (NNG_MAX_POLLER_THREADS) | ||
|
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# | ||
# Copyright 2021 Staysail Systems, Inc. <[email protected]> | ||
# Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
# | ||
# This software is supplied under the terms of the MIT License, a | ||
# copy of which should be located in the distribution where this | ||
|
@@ -78,6 +78,7 @@ nng_test(aio_test) | |
nng_test(buf_size_test) | ||
nng_test(errors_test) | ||
nng_test(id_test) | ||
nng_test(init_test) | ||
nng_test(list_test) | ||
nng_test(message_test) | ||
nng_test(reconnect_test) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
// | ||
// Copyright 2023 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2018 Capitar IT Group BV <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
|
@@ -843,18 +843,28 @@ int | |
nni_aio_sys_init(void) | ||
{ | ||
int num_thr; | ||
int max_thr; | ||
|
||
#ifndef NNG_NUM_EXPIRE_THREADS | ||
num_thr = nni_plat_ncpu(); | ||
#else | ||
num_thr = NNG_NUM_EXPIRE_THREADS; | ||
#ifndef NNG_MAX_EXPIRE_THREADS | ||
#define NNG_MAX_EXPIRE_THREADS 8 | ||
#endif | ||
#if NNG_MAX_EXPIRE_THREADS > 0 | ||
if (num_thr > NNG_MAX_EXPIRE_THREADS) { | ||
num_thr = NNG_MAX_EXPIRE_THREADS; | ||
} | ||
|
||
#ifndef NNG_NUM_EXPIRE_THREADS | ||
#define NNG_NUM_EXPIRE_THREADS nni_plat_ncpu() | ||
#endif | ||
|
||
max_thr = (int) nni_init_get_param( | ||
NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS); | ||
|
||
num_thr = (int) nni_init_get_param( | ||
NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS); | ||
|
||
if ((max_thr > 0) && (num_thr > max_thr)) { | ||
num_thr = max_thr; | ||
} | ||
if (num_thr < 0) { | ||
num_thr = 1; | ||
} | ||
nni_aio_expire_q_list = | ||
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); | ||
nni_aio_expire_q_cnt = num_thr; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
// | ||
// Copyright 2023 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2018 Capitar IT Group BV <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
|
@@ -54,10 +54,69 @@ nni_init(void) | |
return (nni_plat_init(nni_init_helper)); | ||
} | ||
|
||
// accessing the list of parameters | ||
typedef struct nni_init_param { | ||
nni_list_node node; | ||
nng_init_parameter param; | ||
uint64_t value; | ||
} nni_init_param; | ||
|
||
static nni_list nni_init_params = | ||
NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node); | ||
|
||
void | ||
nni_init_set_param(nng_init_parameter p, uint64_t value) | ||
{ | ||
if (nni_inited) { | ||
// this is paranoia -- if some library code started already | ||
// then we cannot safely change parameters, and modifying the list is not | ||
// thread safe. | ||
return; | ||
} | ||
nni_init_param *item; | ||
NNI_LIST_FOREACH (&nni_init_params, item) { | ||
if (item->param == p) { | ||
item->value = value; | ||
return; | ||
} | ||
} | ||
if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { | ||
item->param = p; | ||
item->value = value; | ||
nni_list_append(&nni_init_params, item); | ||
} | ||
} | ||
|
||
uint64_t | ||
nni_init_get_param(nng_init_parameter p, uint64_t default_value) | ||
{ | ||
nni_init_param *item; | ||
NNI_LIST_FOREACH (&nni_init_params, item) { | ||
if (item->param == p) { | ||
return (item->value); | ||
} | ||
} | ||
return (default_value); | ||
} | ||
|
||
static void | ||
nni_init_params_fini(void) | ||
{ | ||
nni_init_param *item; | ||
printf("FINI\n"); | ||
while ((item = nni_list_first(&nni_init_params)) != NULL) { | ||
printf("DOING a removal of %d", (int)item->param); | ||
nni_list_remove(&nni_init_params, item); | ||
NNI_FREE_STRUCT(item); | ||
} | ||
} | ||
|
||
void | ||
nni_fini(void) | ||
{ | ||
if (!nni_inited) { | ||
// make sure we discard parameters even if we didn't startup | ||
nni_init_params_fini(); | ||
return; | ||
} | ||
nni_sp_tran_sys_fini(); | ||
|
@@ -67,6 +126,7 @@ nni_fini(void) | |
nni_taskq_sys_fini(); | ||
nni_reap_sys_fini(); // must be before timer and aio (expire) | ||
nni_id_map_sys_fini(); | ||
nni_init_params_fini(); | ||
|
||
nni_plat_fini(); | ||
nni_inited = false; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
// | ||
// Copyright 2017 Garrett D'Amore <[email protected]> | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2017 Capitar IT Group BV <[email protected]> | ||
// Copyright 2017 Staysail Systems, Inc. <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
// copy of which should be located in the distribution where this | ||
|
@@ -23,4 +22,12 @@ int nni_init(void); | |
// that all resources used by the library are released back to the system. | ||
void nni_fini(void); | ||
|
||
// nni_init_param is used by applications (via nng_init_param) to configure | ||
// some tunable settings at runtime. It must be called before any other NNG | ||
// functions are called, in order to have any effect at all. | ||
void nni_init_set_param(nng_init_parameter, uint64_t value); | ||
|
||
// subsystems can call this to obtain a parameter value. | ||
uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value); | ||
|
||
#endif // CORE_INIT_H |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
// | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
// copy of which should be located in the distribution where this | ||
// file was obtained (LICENSE.txt). A copy of the license may also be | ||
// found online at https://opensource.org/licenses/MIT. | ||
// | ||
|
||
#include <nuts.h> | ||
#include "init.h" | ||
|
||
void | ||
test_init_param(void) | ||
{ | ||
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456); | ||
nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123); | ||
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); | ||
nng_fini(); | ||
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567); | ||
} | ||
|
||
|
||
NUTS_TESTS = { | ||
{ "init parameter", test_init_param }, | ||
{ NULL, NULL }, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
// | ||
// Copyright 2022 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2018 Capitar IT Group BV <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
|
@@ -245,20 +245,31 @@ nni_task_fini(nni_task *task) | |
int | ||
nni_taskq_sys_init(void) | ||
{ | ||
int nthrs; | ||
int num_thr; | ||
int max_thr; | ||
|
||
#ifndef NNG_NUM_TASKQ_THREADS | ||
nthrs = nni_plat_ncpu() * 2; | ||
#else | ||
nthrs = NNG_NUM_TASKQ_THREADS; | ||
#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2) | ||
#endif | ||
#if NNG_MAX_TASKQ_THREADS > 0 | ||
if (nthrs > NNG_MAX_TASKQ_THREADS) { | ||
nthrs = NNG_MAX_TASKQ_THREADS; | ||
} | ||
|
||
#ifndef NNG_MAX_TASKQ_THREADS | ||
#define NNG_MAX_TASKQ_THREADS 16 | ||
#endif | ||
|
||
return (nni_taskq_init(&nni_taskq_systq, nthrs)); | ||
max_thr = (int) nni_init_get_param( | ||
NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS); | ||
|
||
num_thr = (int) nni_init_get_param( | ||
NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS); | ||
|
||
if (num_thr > max_thr) { | ||
num_thr = max_thr; | ||
} | ||
if (num_thr < 2) { | ||
num_thr = 2; | ||
} | ||
|
||
return (nni_taskq_init(&nni_taskq_systq, num_thr)); | ||
} | ||
|
||
void | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
// | ||
// Copyright 2021 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2024 Staysail Systems, Inc. <[email protected]> | ||
// Copyright 2018 Capitar IT Group BV <[email protected]> | ||
// | ||
// This software is supplied under the terms of the MIT License, a | ||
|
@@ -25,14 +25,10 @@ | |
// for it to ensure that names can be looked up concurrently. This isn't | ||
// as elegant or scalable as a true asynchronous resolver would be, but | ||
// it has the advantage of being fairly portable, and concurrent enough for | ||
// the vast, vast majority of use cases. The total thread count can be | ||
// the vast majority of use cases. The total thread count can be | ||
// changed with this define. Note that some platforms may not have a | ||
// thread-safe getaddrinfo(). In that case they should set this to 1. | ||
|
||
#ifndef NNG_RESOLV_CONCURRENCY | ||
#define NNG_RESOLV_CONCURRENCY 4 | ||
#endif | ||
|
||
#ifndef AI_NUMERICSERV | ||
#define AI_NUMERICSERV 0 | ||
#endif | ||
|
@@ -41,7 +37,8 @@ static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; | |
static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); | ||
static bool resolv_fini = false; | ||
static nni_list resolv_aios; | ||
static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY]; | ||
static nni_thr *resolv_thrs; | ||
static int resolv_num_thr; | ||
|
||
typedef struct resolv_item resolv_item; | ||
struct resolv_item { | ||
|
@@ -450,14 +447,29 @@ nni_posix_resolv_sysinit(void) | |
resolv_fini = false; | ||
nni_aio_list_init(&resolv_aios); | ||
|
||
for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { | ||
#ifndef NNG_RESOLV_CONCURRENCY | ||
#define NNG_RESOLV_CONCURRENCY 4 | ||
#endif | ||
|
||
resolv_num_thr = nni_init_get_param( | ||
NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY); | ||
if (resolv_num_thr < 1) { | ||
resolv_num_thr = 1; | ||
} | ||
// no limit on the maximum for now | ||
resolv_thrs = nni_zalloc(sizeof (nni_thr *) * resolv_num_thr); | ||
if (resolv_thrs == NULL) { | ||
return (NNG_ENOMEM); | ||
} | ||
|
||
for (int i = 0; i < resolv_num_thr; i++) { | ||
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); | ||
if (rv != 0) { | ||
nni_posix_resolv_sysfini(); | ||
return (rv); | ||
} | ||
} | ||
for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { | ||
for (int i = 0; i < resolv_num_thr; i++) { | ||
nni_thr_run(&resolv_thrs[i]); | ||
} | ||
|
||
|
@@ -472,8 +484,11 @@ nni_posix_resolv_sysfini(void) | |
nni_cv_wake(&resolv_cv); | ||
nni_mtx_unlock(&resolv_mtx); | ||
|
||
for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { | ||
nni_thr_fini(&resolv_thrs[i]); | ||
if (resolv_thrs != NULL) { | ||
for (int i = 0; i < resolv_num_thr; i++) { | ||
nni_thr_fini(&resolv_thrs[i]); | ||
} | ||
nni_free(resolv_thrs, sizeof (nni_thr *) * resolv_num_thr); | ||
} | ||
} | ||
|
||
|
Oops, something went wrong.