Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stats): split off transmission to RawSender, implement batching and queueing support, add streamlined prefix and suffix support #6267

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
1 change: 0 additions & 1 deletion contrib/devtools/copyright_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
'src/crypto/*',
'src/ctpl_stl.h',
'src/reverse_iterator.h',
'src/statsd_client.cpp',
'src/test/fuzz/FuzzedDataProvider.h',
'src/tinyformat.h',
'src/bench/nanobench.h',
Expand Down
25 changes: 25 additions & 0 deletions doc/release-notes-6267.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Statistics
-----------

### New Features

- The Statsd client now supports queueing and batching messages, reducing the number of packets and the rate at which
they are sent to the Statsd daemon.

- The maximum size of each batch of messages (default, 1KiB) can be adjusted using `-statsbatchsize` (in bytes),
and the interval at which queued messages are sent to the daemon (default, 1 second) can be adjusted using
`-statsduration` (in milliseconds).
- `-statsduration` has no bearing on `-statsperiod`, which dictates how frequently some stats are _collected_.

### Deprecations

- `-statsenabled` has been deprecated and enablement will now be implied by the presence of `-statshost`. `-statsenabled`
will be removed in a future release.

- `-statshostname` has been deprecated and replaced with `-statssuffix`, as the latter better represents the
argument's purpose. They behave identically to each other. `-statshostname` will be removed in a future
release.

- `-statsns` has been deprecated and replaced with `-statsprefix`, as the latter better represents the
argument's purpose. `-statsprefix`, unlike `-statsns`, will enforce the usage of a delimiter between the prefix
and key. `-statsns` will be removed in a future release.
6 changes: 4 additions & 2 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ BITCOIN_CORE_H = \
spork.h \
stacktraces.h \
streams.h \
statsd_client.h \
stats/client.h \
stats/rawsender.h \
support/allocators/mt_pooled_secure.h \
support/allocators/pool.h \
support/allocators/pooled_secure.h \
Expand Down Expand Up @@ -526,7 +527,8 @@ libbitcoin_server_a_SOURCES = \
script/sigcache.cpp \
shutdown.cpp \
spork.cpp \
statsd_client.cpp \
stats/client.cpp \
stats/rawsender.cpp \
timedata.cpp \
torcontrol.cpp \
txdb.cpp \
Expand Down
22 changes: 11 additions & 11 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
#include <llmq/snapshot.h>
#include <llmq/signing_shares.h>

#include <statsd_client.h>
#include <stats/client.h>

#include <algorithm>
#include <condition_variable>
Expand Down Expand Up @@ -771,12 +771,16 @@ void SetupServerArgs(ArgsManager& argsman)
argsman.AddArg("-rpcworkqueue=<n>", strprintf("Set the depth of the work queue to service RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);

argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statsenabled");
argsman.AddArg("-statsbatchsize=<bytes>", strprintf("Specify the size of each batch of stats messages (default: %d)", DEFAULT_STATSD_BATCH_SIZE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsduration=<ms>", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshost=<ip>", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshostname=<ip>", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statshostname");
argsman.AddArg("-statsport=<port>", strprintf("Specify statsd port (default: %u)", DEFAULT_STATSD_PORT), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsns=<ns>", strprintf("Specify additional namespace prefix (default: %s)", DEFAULT_STATSD_NAMESPACE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statsns");
argsman.AddArg("-statsperiod=<seconds>", strprintf("Specify the number of seconds between periodic measurements (default: %d)", DEFAULT_STATSD_PERIOD), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsprefix=<string>", strprintf("Specify an optional string prepended to every stats key (default: %s)", DEFAULT_STATSD_PREFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statssuffix=<string>", strprintf("Specify an optional string appended to every stats key (default: %s)", DEFAULT_STATSD_SUFFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
#if HAVE_DECL_FORK
argsman.AddArg("-daemon", strprintf("Run in the background as a daemon and accept commands (default: %d)", DEFAULT_DAEMON), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS);
argsman.AddArg("-daemonwait", strprintf("Wait for initialization to be finished before exiting. This implies -daemon (default: %d)", DEFAULT_DAEMONWAIT), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS);
Expand Down Expand Up @@ -836,7 +840,7 @@ static void StartupNotify(const ArgsManager& args)

static void PeriodicStats(ArgsManager& args, ChainstateManager& chainman, const CTxMemPool& mempool)
{
assert(args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));
assert(::g_stats_client->active());
CCoinsStats stats{CoinStatsHashType::NONE};
chainman.ActiveChainstate().ForceFlushStateToDisk();
if (WITH_LOCK(cs_main, return GetUTXOStats(&chainman.ActiveChainstate().CoinsDB(), std::ref(chainman.m_blockman), stats, RpcInterruptionPoint, chainman.ActiveChain().Tip()))) {
Expand Down Expand Up @@ -1539,11 +1543,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// We need to initialize g_stats_client early as currently, g_stats_client is called
// regardless of whether transmitting stats are desirable or not and if
// g_stats_client isn't present when that attempt is made, the client will crash.
::g_stats_client = std::make_unique<statsd::StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));
::g_stats_client = InitStatsClient(args);

{

Expand Down Expand Up @@ -2274,7 +2274,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
#endif // ENABLE_WALLET
}

if (args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) {
if (::g_stats_client->active()) {
int nStatsPeriod = std::min(std::max((int)args.GetArg("-statsperiod", DEFAULT_STATSD_PERIOD), MIN_STATSD_PERIOD), MAX_STATSD_PERIOD);
node.scheduler->scheduleEvery(std::bind(&PeriodicStats, std::ref(*node.args), std::ref(chainman), std::cref(*node.mempool)), std::chrono::seconds{nStatsPeriod});
}
Expand Down
6 changes: 3 additions & 3 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#include <coinjoin/coinjoin.h>
#include <evo/deterministicmns.h>

#include <statsd_client.h>
#include <stats/client.h>

#ifdef WIN32
#include <string.h>
Expand Down Expand Up @@ -1688,7 +1688,7 @@ void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync)

void CConnman::CalculateNumConnectionsChangedStats()
{
if (!gArgs.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) {
if (!::g_stats_client->active()) {
return;
}

Expand Down Expand Up @@ -4145,7 +4145,7 @@ void CConnman::RecordBytesRecv(uint64_t bytes)
{
nTotalBytesRecv += bytes;
::g_stats_client->count("bandwidth.bytesReceived", bytes, 0.1f);
::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f);
::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv.load(), 0.01f);
}

void CConnman::RecordBytesSent(uint64_t bytes)
Expand Down
2 changes: 1 addition & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
#include <llmq/signing_shares.h>
#include <llmq/snapshot.h>

#include <statsd_client.h>
#include <stats/client.h>

/** Maximum number of in-flight objects from a peer */
static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100;
Expand Down
179 changes: 179 additions & 0 deletions src/stats/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <stats/client.h>

#include <util/system.h>

#include <cmath>
#include <cstdio>
#include <random>

namespace {
// Everything in here has internal linkage by virtue of the unnamed namespace,
// so no additional `static` qualifier is needed.

/** Sample rates closer than this to 0 or 1 are treated as exactly 0 or 1 */
constexpr float EPSILON = 0.0001f;

/** Separator placed between two complete Statsd messages in a batch */
constexpr char STATSD_MSG_DELIMITER = '\n';
/** Separator placed between namespace components of a Statsd key */
constexpr char STATSD_NS_DELIMITER = '.';

/** Statsd type tag for a counter message */
constexpr char STATSD_METRIC_COUNT[] = "c";
/** Statsd type tag for a gauge message */
constexpr char STATSD_METRIC_GAUGE[] = "g";
/** Statsd type tag for a timing message */
constexpr char STATSD_METRIC_TIMING[] = "ms";
} // anonymous namespace

/** Global stats client instance. Starts out null; it is expected to be populated with the result of InitStatsClient(). */
std::unique_ptr<StatsdClient> g_stats_client;

/**
 * Build a StatsdClient from the node's arguments.
 *
 * Enablement: the client is enabled when `-statshost` is non-empty, or when the
 * deprecated `-statsenabled` is set (in which case DEFAULT_STATSD_HOST is used
 * if no host was given).
 *
 * Prefix/suffix: `-statsprefix` / `-statssuffix` take precedence and are
 * sanitized (one leading and one trailing key delimiter is stripped). When they
 * are empty the deprecated `-statsns` / `-statshostname` values are used
 * verbatim. A non-empty sanitized prefix gets a trailing delimiter appended.
 *
 * @param[in] args  Argument manager to read the `-stats*` options from.
 * @return A newly constructed client; it may be inactive if stats are disabled.
 */
std::unique_ptr<StatsdClient> InitStatsClient(const ArgsManager& args)
{
    auto is_enabled = args.GetBoolArg("-statsenabled", /*fDefault=*/false);
    auto host = args.GetArg("-statshost", /*strDefault=*/"");

    if (is_enabled && host.empty()) {
        // Stats are enabled but host has not been specified, then use
        // default host. This is to preserve old behavior.
        host = DEFAULT_STATSD_HOST;
    } else if (!host.empty()) {
        // Host is specified but stats are not explicitly enabled. Assume
        // that if a host has been specified, we want stats enabled. This
        // is new behaviour and will substitute old behaviour in a future
        // release.
        is_enabled = true;
    }

    auto sanitize_string = [](std::string& str) {
        // Remove key delimiters from the front and back as they're added back.
        //
        // Emptiness is checked separately for each end: a value made of a
        // single delimiter becomes empty once its front is erased, and calling
        // back() on an empty string is undefined behaviour.
        if (!str.empty() && str.front() == STATSD_NS_DELIMITER) str.erase(str.begin());
        if (!str.empty() && str.back() == STATSD_NS_DELIMITER) str.pop_back();
    };

    // Get our prefix and suffix and if we get nothing, try again with the
    // deprecated argument. If we still get nothing, that's fine, they're optional.
    auto prefix = args.GetArg("-statsprefix", DEFAULT_STATSD_PREFIX);
    if (prefix.empty()) {
        prefix = args.GetArg("-statsns", DEFAULT_STATSD_PREFIX);
    } else {
        // We restrict sanitization logic to our newly added arguments to
        // prevent breaking changes.
        sanitize_string(prefix);
        // We need to add the delimiter here for backwards compatibility with
        // the deprecated argument. Skip it when sanitization left nothing
        // behind, otherwise every key would start with a bare delimiter.
        //
        // TODO: Move this step into the constructor when removing deprecated
        //       args support
        if (!prefix.empty()) {
            prefix += STATSD_NS_DELIMITER;
        }
    }

    auto suffix = args.GetArg("-statssuffix", DEFAULT_STATSD_SUFFIX);
    if (suffix.empty()) {
        suffix = args.GetArg("-statshostname", DEFAULT_STATSD_SUFFIX);
    } else {
        // We restrict sanitization logic to our newly added arguments to
        // prevent breaking changes.
        sanitize_string(suffix);
    }

    // NOTE(review): the numeric arguments below are passed on without range
    // checks; the constructor takes the port as uint16_t and the batch size and
    // duration as uint64_t, so out-of-range or negative user input would be
    // converted implicitly. Confirm whether validation is wanted here.
    return std::make_unique<StatsdClient>(
        host,
        args.GetArg("-statsport", DEFAULT_STATSD_PORT),
        args.GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE),
        args.GetArg("-statsduration", DEFAULT_STATSD_DURATION),
        prefix,
        suffix,
        is_enabled
    );
}

/**
 * Construct the client and, if enabled, the underlying RawSender.
 *
 * The prefix is stored as given; a non-empty suffix is stored with a leading
 * key delimiter. When `enabled` is false, or the sender reports an error during
 * construction, no sender is kept and the reason is logged.
 */
StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms,
                           const std::string& prefix, const std::string& suffix, bool enabled) :
    m_prefix{prefix},
    m_suffix{suffix.empty() ? suffix : STATSD_NS_DELIMITER + suffix}
{
    if (enabled) {
        std::optional<std::string> init_error;
        m_sender = std::make_unique<RawSender>(host, port,
                                               std::make_pair(batch_size, static_cast<uint8_t>(STATSD_MSG_DELIMITER)),
                                               interval_ms, init_error);
        if (!init_error.has_value()) {
            LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port);
        } else {
            LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", init_error.value());
            // Drop the half-initialized sender
            m_sender.reset();
        }
    } else {
        LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n");
    }
}

/** Decrease the counter `key` by one; forwards to count() with a delta of -1. */
bool StatsdClient::dec(const std::string& key, float sample_rate)
{
    return count(key, /*delta=*/-1, sample_rate);
}

/** Increase the counter `key` by one; forwards to count() with a delta of 1. */
bool StatsdClient::inc(const std::string& key, float sample_rate)
{
    return count(key, /*delta=*/1, sample_rate);
}

bool StatsdClient::count(const std::string& key, int64_t delta, float sample_rate)
{
return send(key, delta, STATSD_METRIC_COUNT, sample_rate);
}

bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, STATSD_METRIC_GAUGE, sample_rate);
}

bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate)
{
return send(key, value, STATSD_METRIC_GAUGE, sample_rate);
}

bool StatsdClient::timing(const std::string& key, uint64_t ms, float sample_rate)
{
return send(key, ms, STATSD_METRIC_TIMING, sample_rate);
}

/**
 * Format one Statsd message and hand it to the sender.
 *
 * The message has the form `<prefix><key><suffix>:<value>|<type>` and, when the
 * message is sampled, a trailing `|@<sample_rate>`.
 *
 * @param[in] key          Metric key, placed between the stored prefix and suffix.
 * @param[in] value        Metric value; must be of arithmetic type.
 * @param[in] type         Statsd type tag (e.g. "c", "g", "ms").
 * @param[in] sample_rate  Probability of sending, clamped to [0, 1]. Values
 *                         within EPSILON of 0 or 1 are treated as never/always.
 * @return false if there is no sender or the sender reported an error; true
 *         otherwise, including when the message was deliberately sampled out.
 */
template <typename T1>
bool StatsdClient::send(const std::string& key, T1 value, const std::string& type, float sample_rate)
{
    static_assert(std::is_arithmetic<T1>::value, "Must specialize to an arithmetic type");

    if (!m_sender) {
        return false;
    }

    // Determine if we should send the message at all but claim that we did even if we don't
    sample_rate = std::clamp(sample_rate, 0.f, 1.f);
    bool always_send = std::fabs(sample_rate - 1.f) < EPSILON;
    bool never_send = std::fabs(sample_rate) < EPSILON;
    if (never_send || (!always_send &&
        WITH_LOCK(cs, return sample_rate < std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand)))) {
        return true;
    }

    // Construct the message and if our message isn't always-send, report the sample rate
    RawMessage msg{strprintf("%s%s%s:%f|%s", m_prefix, key, m_suffix, value, type)};
    if (!always_send) {
        // Four decimals match the resolution of EPSILON. With only two, any
        // accepted rate below 0.005 would be reported as "@0.00", which the
        // receiving end cannot use to scale the sample back up.
        msg += strprintf("|@%.4f", sample_rate);
    }

    // Send it and report an error if we encounter one
    if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
        LogPrintf("ERROR: %s.\n", error_opt.value());
        return false;
    }

    return true;
}

// Explicit instantiations of send(): the template is defined in this source
// file, so each value type it is meant to be used with is instantiated here
// (double plus the 32- and 64-bit signed and unsigned integers).
template bool StatsdClient::send(const std::string& key, double value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, int32_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, int64_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, uint32_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, uint64_t value, const std::string& type, float sample_rate);
Loading
Loading