forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.h
320 lines (275 loc) · 11.6 KB
/
server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#pragma once
#include <chrono>
#include <cstdint>
#include <list>
#include <memory>
#include <string>
#include "envoy/server/options.h"
#include "envoy/stats/stats.h"
#include "common/common/assert.h"
#include "common/common/lock_guard.h"
#include "common/common/logger.h"
#include "common/common/thread.h"
#include "common/stats/source_impl.h"
#include "server/server.h"
#include "server/test_hooks.h"
#include "test/integration/server_stats.h"
#include "test/test_common/test_time_system.h"
#include "test/test_common/utility.h"
namespace Envoy {
namespace Server {
/**
* Integration test options.
*/
class TestOptionsImpl : public Options {
public:
TestOptionsImpl(const std::string& config_path, Network::Address::IpVersion ip_version)
: config_path_(config_path), local_address_ip_version_(ip_version),
service_cluster_name_("cluster_name"), service_node_name_("node_name"),
service_zone_("zone_name") {}
TestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
Network::Address::IpVersion ip_version)
: config_path_(config_path), config_yaml_(config_yaml), local_address_ip_version_(ip_version),
service_cluster_name_("cluster_name"), service_node_name_("node_name"),
service_zone_("zone_name") {}
// Server::Options
uint64_t baseId() const override { return 0; }
uint32_t concurrency() const override { return 1; }
const std::string& configPath() const override { return config_path_; }
const std::string& configYaml() const override { return config_yaml_; }
bool v2ConfigOnly() const override { return false; }
const std::string& adminAddressPath() const override { return admin_address_path_; }
Network::Address::IpVersion localAddressIpVersion() const override {
return local_address_ip_version_;
}
std::chrono::seconds drainTime() const override { return std::chrono::seconds(1); }
spdlog::level::level_enum logLevel() const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const std::vector<std::pair<std::string, spdlog::level::level_enum>>&
componentLogLevels() const override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const std::string& logFormat() const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
std::chrono::seconds parentShutdownTime() const override { return std::chrono::seconds(2); }
const std::string& logPath() const override { return log_path_; }
uint64_t restartEpoch() const override { return 0; }
std::chrono::milliseconds fileFlushIntervalMsec() const override {
return std::chrono::milliseconds(50);
}
Mode mode() const override { return Mode::Serve; }
const std::string& serviceClusterName() const override { return service_cluster_name_; }
const std::string& serviceNodeName() const override { return service_node_name_; }
const std::string& serviceZone() const override { return service_zone_; }
uint64_t maxStats() const override { return 16384; }
const Stats::StatsOptions& statsOptions() const override { return stats_options_; }
bool hotRestartDisabled() const override { return false; }
// asConfigYaml returns a new config that empties the configPath() and populates configYaml()
Server::TestOptionsImpl asConfigYaml();
private:
const std::string config_path_;
const std::string config_yaml_;
const std::string admin_address_path_;
const Network::Address::IpVersion local_address_ip_version_;
const std::string service_cluster_name_;
const std::string service_node_name_;
const std::string service_zone_;
Stats::StatsOptionsImpl stats_options_;
const std::string log_path_;
};
class TestDrainManager : public DrainManager {
public:
// Server::DrainManager
bool drainClose() const override { return draining_; }
void startDrainSequence(std::function<void()>) override {}
void startParentShutdownSequence() override {}
bool draining_{};
};
class TestComponentFactory : public ComponentFactory {
public:
Server::DrainManagerPtr createDrainManager(Server::Instance&) override {
return Server::DrainManagerPtr{new Server::TestDrainManager()};
}
Runtime::LoaderPtr createRuntime(Server::Instance& server,
Server::Configuration::Initial& config) override {
return Server::InstanceUtil::createRuntime(server, config);
}
};
} // namespace Server
namespace Stats {
/**
* This is a wrapper for Scopes for the TestIsolatedStoreImpl to ensure new scopes do
* not interact with the store without grabbing the lock from TestIsolatedStoreImpl.
*/
class TestScopeWrapper : public Scope {
public:
TestScopeWrapper(Thread::MutexBasicLockable& lock, ScopePtr wrapped_scope)
: lock_(lock), wrapped_scope_(std::move(wrapped_scope)) {}
ScopePtr createScope(const std::string& name) override {
Thread::LockGuard lock(lock_);
return ScopePtr{new TestScopeWrapper(lock_, wrapped_scope_->createScope(name))};
}
void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override {
Thread::LockGuard lock(lock_);
wrapped_scope_->deliverHistogramToSinks(histogram, value);
}
Counter& counter(const std::string& name) override {
Thread::LockGuard lock(lock_);
return wrapped_scope_->counter(name);
}
Gauge& gauge(const std::string& name) override {
Thread::LockGuard lock(lock_);
return wrapped_scope_->gauge(name);
}
Histogram& histogram(const std::string& name) override {
Thread::LockGuard lock(lock_);
return wrapped_scope_->histogram(name);
}
const StatsOptions& statsOptions() const override { return stats_options_; }
private:
Thread::MutexBasicLockable& lock_;
ScopePtr wrapped_scope_;
StatsOptionsImpl stats_options_;
};
/**
* This is a variant of the isolated store that has locking across all operations so that it can
* be used during the integration tests.
*/
class TestIsolatedStoreImpl : public StoreRoot {
public:
TestIsolatedStoreImpl() : source_(*this) {}
// Stats::Scope
Counter& counter(const std::string& name) override {
Thread::LockGuard lock(lock_);
return store_.counter(name);
}
ScopePtr createScope(const std::string& name) override {
Thread::LockGuard lock(lock_);
return ScopePtr{new TestScopeWrapper(lock_, store_.createScope(name))};
}
void deliverHistogramToSinks(const Histogram&, uint64_t) override {}
Gauge& gauge(const std::string& name) override {
Thread::LockGuard lock(lock_);
return store_.gauge(name);
}
Histogram& histogram(const std::string& name) override {
Thread::LockGuard lock(lock_);
return store_.histogram(name);
}
const StatsOptions& statsOptions() const override { return stats_options_; }
// Stats::Store
std::vector<CounterSharedPtr> counters() const override {
Thread::LockGuard lock(lock_);
return store_.counters();
}
std::vector<GaugeSharedPtr> gauges() const override {
Thread::LockGuard lock(lock_);
return store_.gauges();
}
std::vector<ParentHistogramSharedPtr> histograms() const override {
Thread::LockGuard lock(lock_);
return store_.histograms();
}
// Stats::StoreRoot
void addSink(Sink&) override {}
void setTagProducer(TagProducerPtr&&) override {}
void setStatsMatcher(StatsMatcherPtr&&) override {}
void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {}
void shutdownThreading() override {}
void mergeHistograms(PostMergeCb) override {}
Source& source() override { return source_; }
private:
mutable Thread::MutexBasicLockable lock_;
IsolatedStoreImpl store_;
SourceImpl source_;
StatsOptionsImpl stats_options_;
};
} // namespace Stats
class IntegrationTestServer;
typedef std::unique_ptr<IntegrationTestServer> IntegrationTestServerPtr;
/**
* Wrapper for running the real server for the purpose of integration tests.
*/
class IntegrationTestServer : Logger::Loggable<Logger::Id::testing>,
public TestHooks,
public IntegrationTestServerStats,
public Server::ComponentFactory {
public:
static IntegrationTestServerPtr create(const std::string& config_path,
const Network::Address::IpVersion version,
std::function<void()> pre_worker_start_test_steps,
bool deterministic, Event::TestTimeSystem& time_system);
~IntegrationTestServer();
Server::TestDrainManager& drainManager() { return *drain_manager_; }
Server::InstanceImpl& server() {
RELEASE_ASSERT(server_ != nullptr, "");
return *server_;
}
void setOnWorkerListenerAddedCb(std::function<void()> on_worker_listener_added) {
on_worker_listener_added_cb_ = on_worker_listener_added;
}
void setOnWorkerListenerRemovedCb(std::function<void()> on_worker_listener_removed) {
on_worker_listener_removed_cb_ = on_worker_listener_removed;
}
void start(const Network::Address::IpVersion version,
std::function<void()> pre_worker_start_test_steps, bool deterministic);
void waitForCounterGe(const std::string& name, uint64_t value) override {
while (counter(name) == nullptr || counter(name)->value() < value) {
time_system_.sleep(std::chrono::milliseconds(10));
}
}
void waitForGaugeGe(const std::string& name, uint64_t value) override {
while (gauge(name) == nullptr || gauge(name)->value() < value) {
time_system_.sleep(std::chrono::milliseconds(10));
}
}
void waitForGaugeEq(const std::string& name, uint64_t value) override {
while (gauge(name) == nullptr || gauge(name)->value() != value) {
time_system_.sleep(std::chrono::milliseconds(10));
}
}
Stats::CounterSharedPtr counter(const std::string& name) override {
// When using the thread local store, only counters() is thread safe. This also allows us
// to test if a counter exists at all versus just defaulting to zero.
return TestUtility::findCounter(*stat_store_, name);
}
Stats::GaugeSharedPtr gauge(const std::string& name) override {
// When using the thread local store, only gauges() is thread safe. This also allows us
// to test if a counter exists at all versus just defaulting to zero.
return TestUtility::findGauge(*stat_store_, name);
}
std::vector<Stats::CounterSharedPtr> counters() override { return stat_store_->counters(); }
std::vector<Stats::GaugeSharedPtr> gauges() override { return stat_store_->gauges(); }
// TestHooks
void onWorkerListenerAdded() override;
void onWorkerListenerRemoved() override;
// Server::ComponentFactory
Server::DrainManagerPtr createDrainManager(Server::Instance&) override {
drain_manager_ = new Server::TestDrainManager();
return Server::DrainManagerPtr{drain_manager_};
}
Runtime::LoaderPtr createRuntime(Server::Instance& server,
Server::Configuration::Initial& config) override {
return Server::InstanceUtil::createRuntime(server, config);
}
protected:
IntegrationTestServer(Event::TestTimeSystem& time_system, const std::string& config_path)
: time_system_(time_system), config_path_(config_path) {}
private:
/**
* Runs the real server on a thread.
*/
void threadRoutine(const Network::Address::IpVersion version, bool deterministic);
Event::TestTimeSystem& time_system_;
const std::string config_path_;
Thread::ThreadPtr thread_;
Thread::CondVar listeners_cv_;
Thread::MutexBasicLockable listeners_mutex_;
uint64_t pending_listeners_;
ConditionalInitializer server_set_;
std::unique_ptr<Server::InstanceImpl> server_;
Server::TestDrainManager* drain_manager_{};
Stats::Store* stat_store_{};
std::function<void()> on_worker_listener_added_cb_;
std::function<void()> on_worker_listener_removed_cb_;
Network::Address::InstanceConstSharedPtr admin_address_;
};
} // namespace Envoy