forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
active_tcp_socket.cc
152 lines (134 loc) · 5.7 KB
/
active_tcp_socket.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include "source/server/active_tcp_socket.h"
#include "envoy/network/filter.h"
#include "source/common/stream_info/stream_info_impl.h"
#include "source/server/active_stream_listener_base.h"
namespace Envoy {
namespace Server {
ActiveTcpSocket::ActiveTcpSocket(ActiveStreamListenerBase& listener,
Network::ConnectionSocketPtr&& socket,
bool hand_off_restored_destination_connections)
: listener_(listener), socket_(std::move(socket)),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
iter_(accept_filters_.end()),
stream_info_(std::make_unique<StreamInfo::StreamInfoImpl>(
listener_.dispatcher().timeSource(), socket_->connectionInfoProviderSharedPtr(),
StreamInfo::FilterState::LifeSpan::Connection)) {
listener_.stats_.downstream_pre_cx_active_.inc();
}
// Releases the listener filters and undoes the accounting performed in the constructor.
ActiveTcpSocket::~ActiveTcpSocket() {
  accept_filters_.clear();
  listener_.stats_.downstream_pre_cx_active_.dec();

  // A detached (null) socket means it has been transferred to an active connection, and that
  // active connection will decrement the number of listener connections itself.
  // TODO(mattklein123): In general the way we account for the number of listener connections
  // is incredibly fragile. Revisit this by potentially merging ActiveTcpSocket and
  // ActiveTcpConnection, having a shared object which does accounting (but would require
  // another allocation, etc.).
  const bool socket_was_handed_off = (socket_ == nullptr);
  if (socket_was_handed_off) {
    return;
  }

  // The socket is still attached here, so the listener connection count is released by us.
  listener_.decNumConnections();
}
// Returns the dispatcher of the listener that owns this socket.
Event::Dispatcher& ActiveTcpSocket::dispatcher() {
  Event::Dispatcher& listener_dispatcher = listener_.dispatcher();
  return listener_dispatcher;
}
// Timer callback fired when the listener filters have not finished within the configured
// listener filter timeout. Counts the timeout, optionally proceeds to create the connection
// anyway, and then removes this socket from the listener.
void ActiveTcpSocket::onTimeout() {
  listener_.stats_.downstream_pre_cx_timeout_.inc();
  ASSERT(inserted());

  const auto timeout_ms = listener_.listener_filters_timeout_.count();
  ENVOY_LOG(debug, "listener filter times out after {} ms", timeout_ms);

  // When the listener is configured to continue on timeout, the connection is still created
  // even though the filter chain did not complete.
  if (listener_.continue_on_listener_filters_timeout_) {
    ENVOY_LOG(debug, "fallback to default listener filter");
    newConnection();
  }

  // In both cases this pending socket is done; unlink() schedules its deletion.
  unlink();
}
// Arms the listener filter timeout timer. Nothing is created when the configured timeout is
// not a positive duration.
void ActiveTcpSocket::startTimer() {
  const auto& filters_timeout = listener_.listener_filters_timeout_;
  if (!(filters_timeout.count() > 0)) {
    return;
  }

  // The callback captures `this`; unlink() disables the timer before this object is handed to
  // deferred deletion.
  timer_ = listener_.dispatcher().createTimer([this] { onTimeout(); });
  timer_->enableTimer(filters_timeout);
}
// Removes this socket from the owning listener, stops its timeout timer, emits access logs if
// no connection was ever created, and hands the removed object to the dispatcher for deferred
// deletion.
void ActiveTcpSocket::unlink() {
  // NOTE(review): `detached` is presumably the owning pointer to this very object — confirm
  // against removeSocket(). Members are still accessed through `this` below, which relies on
  // the object staying alive until deferredDelete() runs.
  auto detached = listener_.removeSocket(std::move(*this));

  if (detached->timer_ != nullptr) {
    detached->timer_->disableTimer();
  }

  // Emit logs if a connection is not established. stream_info_ may already have been moved
  // away by newConnection(), in which case there is nothing to log from here.
  const bool skip_logging = connected_ || stream_info_ == nullptr;
  if (!skip_logging) {
    ActiveStreamListenerBase::emitLogs(*listener_.config_, *stream_info_);
  }

  listener_.dispatcher().deferredDelete(std::move(detached));
}
void ActiveTcpSocket::continueFilterChain(bool success) {
if (success) {
bool no_error = true;
if (iter_ == accept_filters_.end()) {
iter_ = accept_filters_.begin();
} else {
iter_ = std::next(iter_);
}
for (; iter_ != accept_filters_.end(); iter_++) {
Network::FilterStatus status = (*iter_)->onAccept(*this);
if (status == Network::FilterStatus::StopIteration) {
// The filter is responsible for calling us again at a later time to continue the filter
// chain from the next filter.
if (!socket().ioHandle().isOpen()) {
// break the loop but should not create new connection
no_error = false;
break;
} else {
// Blocking at the filter but no error
return;
}
}
}
// Successfully ran all the accept filters.
if (no_error) {
newConnection();
} else {
// Signal the caller that no extra filter chain iteration is needed.
iter_ = accept_filters_.end();
}
}
// Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked.
if (inserted()) {
unlink();
}
}
// Forwards dynamic metadata to this socket's stream info.
//
// @param name the key the metadata is stored under.
// @param value the metadata struct to store.
void ActiveTcpSocket::setDynamicMetadata(const std::string& name,
                                         const ProtobufWkt::Struct& value) {
  // NOTE(review): stream_info_ is dereferenced unconditionally; newConnection() moves it away,
  // so this presumably must only be called before that point — confirm with callers.
  auto& info = *stream_info_;
  info.setDynamicMetadata(name, value);
}
void ActiveTcpSocket::newConnection() {
connected_ = true;
// Check if the socket may need to be redirected to another listener.
Network::BalancedConnectionHandlerOptRef new_listener;
if (hand_off_restored_destination_connections_ &&
socket_->connectionInfoProvider().localAddressRestored()) {
// Find a listener associated with the original destination address.
new_listener =
listener_.getBalancedHandlerByAddress(*socket_->connectionInfoProvider().localAddress());
}
if (new_listener.has_value()) {
// Hands off connections redirected by iptables to the listener associated with the
// original destination address. Pass 'hand_off_restored_destination_connections' as false to
// prevent further redirection.
// Leave the new listener to decide whether to execute re-balance.
// Note also that we must account for the number of connections properly across both listeners.
// TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better.
listener_.decNumConnections();
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
socket_->setDetectedTransportProtocol("raw_buffer");
}
// Reset the file events which are registered by listener filter.
// reference https://github.com/envoyproxy/envoy/issues/8925.
socket_->ioHandle().resetFileEvents();
accept_filters_.clear();
// Create a new connection on this listener.
listener_.newConnection(std::move(socket_), std::move(stream_info_));
}
}
} // namespace Server
} // namespace Envoy