forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.cc
127 lines (105 loc) · 4.63 KB
/
server.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "test/integration/server.h"
#include <string>
#include "envoy/http/header_map.h"
#include "common/filesystem/filesystem_impl.h"
#include "common/local_info/local_info_impl.h"
#include "common/network/utility.h"
#include "common/stats/thread_local_store.h"
#include "common/thread_local/thread_local_impl.h"
#include "server/hot_restart_nop_impl.h"
#include "test/integration/integration.h"
#include "test/integration/utility.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/server/mocks.h"
#include "test/test_common/environment.h"
#include "gtest/gtest.h"
namespace Envoy {
/**
 * Factory for an IntegrationTestServer that is already started.
 *
 * Constructs the server from `time_system` and `config_path`, then calls start() with the
 * remaining arguments before handing ownership back to the caller.
 *
 * @param config_path path of the configuration the server is constructed with.
 * @param version IP version forwarded to start().
 * @param pre_worker_start_test_steps optional hook forwarded to start(); may be empty.
 * @param deterministic forwarded to start(); selects the mock random generator on the
 *        server thread when true.
 * @param time_system time system the server is constructed with.
 * @return the started server.
 */
IntegrationTestServerPtr
IntegrationTestServer::create(const std::string& config_path,
                              const Network::Address::IpVersion version,
                              std::function<void()> pre_worker_start_test_steps,
                              bool deterministic, Event::TestTimeSystem& time_system) {
  IntegrationTestServerPtr test_server(new IntegrationTestServer(time_system, config_path));
  test_server->start(version, pre_worker_start_test_steps, deterministic);
  return test_server;
}
/**
 * Launches the server on a dedicated thread and waits until its initial listeners are up.
 *
 * Must only be called once: the thread handle is asserted to be unset on entry.
 *
 * @param version IP version handed to threadRoutine().
 * @param pre_worker_start_test_steps optional hook run on the calling thread right after the
 *        server thread has been created; skipped when empty.
 * @param deterministic handed to threadRoutine().
 */
void IntegrationTestServer::start(const Network::Address::IpVersion version,
                                  std::function<void()> pre_worker_start_test_steps,
                                  bool deterministic) {
  ENVOY_LOG(info, "starting integration test server");
  ASSERT(!thread_);

  // The server lives entirely on its own thread; threadRoutine() is that thread's entry point.
  auto server_thread_main = [this, version, deterministic]() {
    threadRoutine(version, deterministic);
  };
  thread_.reset(new Thread::Thread(server_thread_main));

  // Anything that has to happen before the workers start (e.g., xDS pre-init) runs here.
  if (pre_worker_start_test_steps) {
    pre_worker_start_test_steps();
  }

  // Block until the server thread has created the server and published how many initial
  // listeners we need to wait for.
  server_set_.waitReady();

  // Then block until every one of those listeners is reported as added on a worker. Once the
  // count hits zero the server is ready for the test to use.
  Thread::LockGuard listeners_lock(listeners_mutex_);
  while (pending_listeners_ != 0) {
    // CondVar::wait won't throw, so waiting with the guard held is safe.
    listeners_cv_.wait(listeners_mutex_);
  }
  ENVOY_LOG(info, "listener wait complete");
}
/**
 * Shuts the server down and joins the server thread.
 *
 * When an admin address has been recorded, a POST to /quitquitquit is issued against it and
 * the response is expected to be complete with status "200". The server thread is joined
 * afterwards in all cases.
 */
IntegrationTestServer::~IntegrationTestServer() {
  ENVOY_LOG(info, "stopping integration test server");

  if (admin_address_) {
    BufferingStreamDecoderPtr quit_response = IntegrationUtil::makeSingleRequest(
        admin_address_, "POST", "/quitquitquit", "", Http::CodecClient::Type::HTTP1);
    EXPECT_TRUE(quit_response->complete());
    const char* status_code = quit_response->headers().Status()->value().c_str();
    EXPECT_STREQ("200", status_code);
  }

  // Wait for threadRoutine() to return before the object goes away.
  thread_->join();
}
/**
 * Hook invoked when a listener has been added on a worker.
 *
 * Runs the optional user callback first (outside the listeners lock), then decrements the
 * pending-listener count if it is still positive and wakes a waiter on the listeners
 * condition variable (start() waits on it).
 */
void IntegrationTestServer::onWorkerListenerAdded() {
  if (on_worker_listener_added_cb_) {
    on_worker_listener_added_cb_();
  }

  Thread::LockGuard listeners_lock(listeners_mutex_);
  const bool still_waiting = pending_listeners_ > 0;
  if (still_waiting) {
    // Never decrement past zero: listeners added after startup must not disturb the counter.
    --pending_listeners_;
    listeners_cv_.notifyOne();
  }
}
/**
 * Hook invoked when a listener has been removed from a worker. Forwards to the optional
 * user callback; does nothing when no callback has been set.
 */
void IntegrationTestServer::onWorkerListenerRemoved() {
  if (!on_worker_listener_removed_cb_) {
    return;
  }
  on_worker_listener_removed_cb_();
}
/**
 * Entry point of the server thread: builds the server and its dependencies, publishes the
 * state start() waits on, runs the server, and tears everything down once run() returns.
 *
 * All dependencies are locals of this function, so they live exactly as long as the server
 * thread. Their declaration order is significant because it fixes their destruction order.
 *
 * @param version IP version used for the test options and for picking the local address.
 * @param deterministic when true a NiceMock random generator is used instead of the real one.
 */
void IntegrationTestServer::threadRoutine(const Network::Address::IpVersion version,
                                          bool deterministic) {
  Server::TestOptionsImpl test_options(config_path_, version);
  Server::HotRestartNopImpl hot_restart;
  Thread::MutexBasicLockable server_lock;
  ThreadLocal::InstanceImpl tls_instance;
  Stats::HeapStatDataAllocator heap_allocator;
  Stats::StatsOptionsImpl store_options;
  Stats::ThreadLocalStoreImpl thread_local_store(store_options, heap_allocator);
  // Expose the stats store through the member pointer for as long as this routine is running.
  stat_store_ = &thread_local_store;

  Runtime::RandomGeneratorPtr rng;
  if (!deterministic) {
    rng = std::make_unique<Runtime::RandomGeneratorImpl>();
  } else {
    rng = std::make_unique<testing::NiceMock<Runtime::MockRandomGenerator>>();
  }

  // `*this` is passed twice: this object fills two of the server's constructor arguments.
  server_.reset(new Server::InstanceImpl(test_options, time_system_,
                                         Network::Utility::getLocalAddress(version), *this,
                                         hot_restart, thread_local_store, server_lock, *this,
                                         std::move(rng), tls_instance));

  // Record how many listeners start() has to wait for before declaring the server ready.
  pending_listeners_ = server_->listenerManager().listeners().size();
  ENVOY_LOG(info, "waiting for {} test server listeners", pending_listeners_);

  // Assigning to a shared_ptr that another thread reads is technically thread unsafe. It is
  // fine here because server_set_ is signalled right after, and the only reader on the main
  // test thread is ~IntegrationTestServer, so the two cannot race.
  admin_address_ = server_->admin().socket().localAddress();
  server_set_.setReady();

  // Run the server. The destructor requests shutdown via the admin endpoint and then joins
  // this thread, so control is expected to come back here once the server has quit.
  server_->run();

  // Destroy the server before the locals it was built from go out of scope, and drop the
  // pointer to the soon-to-be-destroyed stats store.
  server_.reset();
  stat_store_ = nullptr;
}
/**
 * Builds a new TestOptionsImpl from the contents of the file at config_path_.
 *
 * The file is read in full and passed as the second constructor argument, with an empty
 * string as the first argument and the same IP version as this object.
 *
 * @return the newly constructed options object.
 */
Server::TestOptionsImpl Server::TestOptionsImpl::asConfigYaml() {
  const auto yaml_contents = Filesystem::fileReadToEnd(config_path_);
  return TestOptionsImpl("", yaml_contents, local_address_ip_version_);
}
} // namespace Envoy