From 3734a884fcaf984d179113a57b15aa6cb38f9c33 Mon Sep 17 00:00:00 2001 From: Jameson Lopp Date: Sun, 24 Dec 2017 15:03:47 +0100 Subject: [PATCH 1/2] add batching support to statsd client --- src/statsd_client.cpp | 55 +++++++++++++++++++++++++++++++++++++------ src/statsd_client.h | 19 +++++++++++---- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/src/statsd_client.cpp b/src/statsd_client.cpp index 803bfdd6e68d3..39d8dc43992f0 100644 --- a/src/statsd_client.cpp +++ b/src/statsd_client.cpp @@ -35,11 +35,9 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include #include "statsd_client.h" #include "util.h" -#include using namespace std; namespace statsd { @@ -74,16 +72,41 @@ struct _StatsdClientData { char errmsg[1024]; }; -StatsdClient::StatsdClient(const string& host, int port, const string& ns) +StatsdClient::StatsdClient(const string& host, + int port, + const string& ns) { d = new _StatsdClientData; d->sock = -1; config(host, port, ns); srandom(time(NULL)); + + pthread_spin_init(&batching_spin_lock_, PTHREAD_PROCESS_PRIVATE); + batching_thread_ = std::thread([this] { + while (!exit_) { + std::deque staged_message_queue; + + pthread_spin_lock(&batching_spin_lock_); + batching_message_queue_.swap(staged_message_queue); + pthread_spin_unlock(&batching_spin_lock_); + + while(!staged_message_queue.empty()) { + send_to_daemon(staged_message_queue.front()); + staged_message_queue.pop_front(); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + }); } StatsdClient::~StatsdClient() { + exit_ = true; + batching_thread_.join(); + pthread_spin_destroy(&batching_spin_lock_); + + // close socket if (d->sock >= 0) { close(d->sock); @@ -130,8 +153,10 @@ int StatsdClient::init() ret = getaddrinfo(d->host.c_str(), NULL, &hints, &result); if ( ret ) { + close(d->sock); + d->sock = -1; snprintf(d->errmsg, sizeof(d->errmsg), - "getaddrinfo fail, error=%d, msg=%s", ret, gai_strerror(ret) ); + "getaddrinfo fail, error=%d, msg=%s", ret, gai_strerror(ret) ); return -2; } struct sockaddr_in* host_addr = (struct sockaddr_in*)result->ai_addr; @@ -204,12 +229,12 @@ int StatsdClient::send(string key, size_t value, const string &type, float sampl if ( fequal( sample_rate, 1.0 ) ) { snprintf(buf, sizeof(buf), "%s%s:%zd|%s", - d->ns.c_str(), key.c_str(), value, type.c_str()); + d->ns.c_str(), key.c_str(), value, type.c_str()); } else { snprintf(buf, sizeof(buf), "%s%s:%zd|%s|@%.2f", - d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate); + d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate); } return send(buf); @@ -244,6 +269,20 @@ int StatsdClient::sendDouble(string key, double value, const string &type, float int StatsdClient::send(const string &message) { + pthread_spin_lock(&batching_spin_lock_); + if (batching_message_queue_.empty() || + batching_message_queue_.back().length() > max_batching_size) { + batching_message_queue_.push_back(message); + } else { + (*batching_message_queue_.rbegin()).append("\n").append(message); + } + pthread_spin_unlock(&batching_spin_lock_); + + return 0; +} + + +int StatsdClient::send_to_daemon(const string &message) { int ret = init(); if ( ret ) { @@ -252,9 +291,10 @@ int StatsdClient::send(const string &message) ret = sendto(d->sock, message.data(), message.size(), 0, (struct sockaddr *) &d->server, sizeof(d->server)); if ( ret == -1) { snprintf(d->errmsg, sizeof(d->errmsg), - "sendto server fail, host=%s:%d, err=%m", d->host.c_str(), d->port); + "sendto server fail, host=%s:%d, err=%m", d->host.c_str(), d->port); return -1; } + return 0; } @@ -265,3 +305,4 @@ const char* StatsdClient::errmsg() } + diff --git a/src/statsd_client.h b/src/statsd_client.h index c748267df1eea..01f27791076cf 100644 --- a/src/statsd_client.h +++ b/src/statsd_client.h @@ -5,7 +5,11 @@ #include #include #include +#include #include +#include +#include +#include namespace statsd { @@ -20,6 +24,7 @@ class StatsdClient { // you can config at anytime; client will use new address (useful for Singleton) void config(const std::string& host, int port, const std::string& ns = ""); const char* errmsg(); + int send_to_daemon(const std::string &); public: int inc(const std::string& key, float sample_rate = 1.0); @@ -40,18 +45,24 @@ class StatsdClient { * type = "c", "g" or "ms" */ int send(std::string key, size_t value, - const std::string& type, float sample_rate); - int sendDouble(std::string key, double value, - const std::string& type, float sample_rate); + const std::string& type, float sample_rate); + int sendDouble(std::string key, double value, + const std::string& type, float sample_rate); protected: int init(); void cleanup(std::string& key); protected: struct _StatsdClientData* d; + + bool exit_; + pthread_spinlock_t batching_spin_lock_; + std::thread batching_thread_; + std::deque batching_message_queue_; + const uint64_t max_batching_size = 32768; }; }; // end namespace -#endif +#endif \ No newline at end of file From abede32275b0044e9c1a7366add303398331c73e Mon Sep 17 00:00:00 2001 From: Jameson Lopp Date: Sun, 24 Dec 2017 15:03:47 +0100 Subject: [PATCH 2/2] add batching support to statsd client --- src/statsd_client.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/statsd_client.cpp b/src/statsd_client.cpp index 39d8dc43992f0..656f2b3a8656d 100644 --- a/src/statsd_client.cpp +++ b/src/statsd_client.cpp @@ -80,6 +80,7 @@ StatsdClient::StatsdClient(const string& host, d->sock = -1; config(host, port, ns); srandom(time(NULL)); + exit_ = false; pthread_spin_init(&batching_spin_lock_, PTHREAD_PROCESS_PRIVATE); batching_thread_ = std::thread([this] {