-
Notifications
You must be signed in to change notification settings - Fork 0
/
SocketPool.cs
206 lines (187 loc) · 7.38 KB
/
SocketPool.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen
//Permission is hereby granted, free of charge, to any person
//obtaining a copy of this software and associated documentation
//files (the "Software"), to deal in the Software without
//restriction, including without limitation the rights to use,
//copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the
//Software is furnished to do so, subject to the following
//conditions:
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
//OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
//HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
//WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
//FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
//OTHER DEALINGS IN THE SOFTWARE.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Threading;
namespace BeIT.MemCached {
/// <summary>
/// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for
/// acquiring or returning PooledSockets.
/// </summary>
[DebuggerDisplay("[ Host: {Host} ]")]
internal class SocketPool {
private static LogAdapter logger = LogAdapter.GetLogger(typeof(SocketPool));
/// <summary>
/// If the host stops responding, we mark it as dead for this amount of seconds,
/// and we double this for each consecutive failed retry. If the host comes alive
/// again, we reset this to 1 again.
/// </summary>
private int deadEndPointSecondsUntilRetry = 1;
private const int maxDeadEndPointSecondsUntilRetry = 60*10; //10 minutes
private ServerPool owner;
private IPEndPoint endPoint;
private Queue<PooledSocket> queue;
//Debug variables and properties
private int newsockets = 0;
private int failednewsockets = 0;
private int reusedsockets = 0;
private int deadsocketsinpool = 0;
private int deadsocketsonreturn = 0;
private int dirtysocketsonreturn = 0;
private int acquired = 0;
public int NewSockets { get { return newsockets; } }
public int FailedNewSockets { get { return failednewsockets; } }
public int ReusedSockets { get { return reusedsockets; } }
public int DeadSocketsInPool { get { return deadsocketsinpool; } }
public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } }
public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } }
public int Acquired { get { return acquired; } }
public int Poolsize { get { return queue.Count; } }
//Public variables and properties
public readonly string Host;
private bool isEndPointDead = false;
public bool IsEndPointDead { get { return isEndPointDead; } }
private DateTime deadEndPointRetryTime;
public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } }
internal SocketPool(ServerPool owner, string host) {
Host = host;
this.owner = owner;
endPoint = getEndPoint(host);
queue = new Queue<PooledSocket>();
}
/// <summary>
/// This method parses the given string into an IPEndPoint.
/// If the string is malformed in some way, or if the host cannot be resolved, this method will throw an exception.
/// </summary>
private static IPEndPoint getEndPoint(string host) {
//Parse port, default to 11211.
int port = 11211;
if(host.Contains(":")) {
string[] split = host.Split(new char[] { ':' });
if(!Int32.TryParse(split[1], out port)) {
throw new ArgumentException("Unable to parse host: " + host);
}
host = split[0];
}
//Parse host string.
IPAddress address;
if(IPAddress.TryParse(host, out address)) {
//host string successfully resolved as an IP address.
} else {
//See if we can resolve it as a hostname
try {
address = Dns.GetHostEntry(host).AddressList[0];
} catch(Exception e) {
throw new ArgumentException("Unable to resolve host: " + host, e);
}
}
return new IPEndPoint(address, port);
}
/// <summary>
/// Gets a socket from the pool.
/// If there are no free sockets, a new one will be created. If something goes
/// wrong while creating the new socket, this pool's endpoint will be marked as dead
/// and all subsequent calls to this method will return null until the retry interval
/// has passed.
/// </summary>
internal PooledSocket Acquire() {
//Do we have free sockets in the pool?
//if so - return the first working one.
//if not - create a new one.
Interlocked.Increment(ref acquired);
lock(queue) {
while(queue.Count > 0) {
PooledSocket socket = queue.Dequeue();
if(socket != null && socket.IsAlive) {
Interlocked.Increment(ref reusedsockets);
return socket;
}
Interlocked.Increment(ref deadsocketsinpool);
}
}
Interlocked.Increment(ref newsockets);
//If we know the endpoint is dead, check if it is time for a retry, otherwise return null.
if (isEndPointDead) {
if (DateTime.Now > deadEndPointRetryTime) {
//Retry
isEndPointDead = false;
} else {
//Still dead
return null;
}
}
//Try to create a new socket. On failure, mark endpoint as dead and return null.
try {
PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout);
//Reset retry timer on success.
deadEndPointSecondsUntilRetry = 1;
return socket;
}
catch (Exception e) {
Interlocked.Increment(ref failednewsockets);
logger.Error("Error connecting to: " + endPoint.Address, e);
//Mark endpoint as dead
isEndPointDead = true;
//Retry in 2 minutes
deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry);
if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry) {
deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time
}
return null;
}
}
/// <summary>
/// Returns a socket to the pool.
/// If the socket is dead, it will be destroyed.
/// If there are more than MaxPoolSize sockets in the pool, it will be destroyed.
/// If there are less than MinPoolSize sockets in the pool, it will always be put back.
/// If there are something inbetween those values, the age of the socket is checked.
/// If it is older than the SocketRrecycleAge, it is destroyed, otherwise it will be
/// put back in the pool.
/// </summary>
internal void Return(PooledSocket socket) {
//If the socket is dead, destroy it.
if (!socket.IsAlive) {
Interlocked.Increment(ref deadsocketsonreturn);
socket.Close();
} else {
//Clean up socket
if (socket.Reset()) {
Interlocked.Increment(ref dirtysocketsonreturn);
}
//Check pool size.
if (queue.Count >= owner.MaxPoolSize) {
//If the pool is full, destroy the socket.
socket.Close();
} else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge) {
//If we have more than the minimum amount of sockets, but less than the max, and the socket is older than the recycle age, we destroy it.
socket.Close();
} else {
//Put the socket back in the pool.
lock (queue) {
queue.Enqueue(socket);
}
}
}
}
}
}