-
Notifications
You must be signed in to change notification settings - Fork 25
/
redlock.rb
90 lines (82 loc) · 2.4 KB
/
redlock.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
require 'securerandom'

require 'redis'
# Distributed lock spread over several independent Redis servers.
#
# A lock is considered acquired when the same random token could be SET (with
# NX and a millisecond expiry) on a majority of the configured servers and
# some validity time is still left once acquisition time and clock drift have
# been subtracted from the TTL.
class Redlock
  # Number of acquisition attempts made by #lock unless changed via #set_retry.
  DefaultRetryCount = 3
  # Upper bound, in milliseconds, of the random pause between attempts.
  DefaultRetryDelay = 200
  # Fraction of the TTL subtracted from the validity time to allow for drift.
  ClockDriftFactor = 0.01
  # Lua script that deletes the key only while it still holds our token, so a
  # client never removes a lock that is held under a different token.
  UnlockScript = '
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end'.freeze

  # Creates one Redis connection per URL. The quorum is a strict majority of
  # the given servers.
  #
  # server_urls - one Redis URL per server taking part in the lock.
  def initialize(*server_urls)
    @servers = server_urls.map { |url| Redis.new(url: url) }
    @quorum = server_urls.length / 2 + 1
    @retry_count = DefaultRetryCount
    @retry_delay = DefaultRetryDelay
  end

  # Overrides the retry policy used by #lock.
  #
  # count - maximum number of acquisition attempts.
  # delay - upper bound (milliseconds) of the random pause between attempts.
  def set_retry(count, delay)
    @retry_count = count
    @retry_delay = delay
  end

  # Tries to take the lock on a single server.
  #
  # Uses the public Redis#set API with NX/PX rather than reaching into the
  # connection object, whose accessor differs between redis-rb versions.
  #
  # Returns a truthy value when the key was set, a falsy value when it already
  # existed, and false when the server raised (e.g. it is unreachable).
  def lock_instance(redis, resource, val, ttl)
    redis.set(resource, val, nx: true, px: ttl)
  rescue StandardError
    false
  end

  # Releases the lock on a single server, but only if it still holds +val+.
  #
  # Returns the script result, or nil when the server raised: unlocking is a
  # best-effort attempt, the key expires on its own anyway.
  def unlock_instance(redis, resource, val)
    redis.eval(UnlockScript, [resource], [val])
  rescue StandardError
    nil
  end

  # Returns a fresh random token (40 hex characters, 20 random bytes) that
  # identifies one lock acquisition.
  def get_unique_lock_id
    SecureRandom.hex(20)
  end

  # Tries to acquire +resource+ for +ttl+ milliseconds, making up to
  # @retry_count attempts.
  #
  # Returns a Hash with :validity (milliseconds the lock can still be relied
  # on), :resource and :val (the token to pass back to #unlock) on success, or
  # false when no attempt reached the quorum in time.
  def lock(resource, ttl)
    val = get_unique_lock_id
    # Add 2 milliseconds to the drift to account for Redis expires
    # precision, which is 1 millisecond, plus 1 millisecond min drift
    # for small TTLs.
    drift = (ttl * ClockDriftFactor).to_i + 2

    @retry_count.times do |attempt|
      start_time = monotonic_ms
      # The block runs for every server, so each one gets a SET attempt.
      acquired = @servers.count { |server| lock_instance(server, resource, val, ttl) }
      validity_time = ttl - (monotonic_ms - start_time) - drift

      if acquired >= @quorum && validity_time > 0
        return {
          validity: validity_time,
          resource: resource,
          val: val
        }
      end

      # Not enough servers (or too slow): release whatever we did get.
      @servers.each { |server| unlock_instance(server, resource, val) }

      # Wait a random delay before retrying; pointless after the last attempt.
      sleep(rand(@retry_delay).to_f / 1000) if attempt < @retry_count - 1
    end

    false
  end

  # Releases a lock previously returned by #lock on every server.
  #
  # lock - the Hash returned by #lock (uses its :resource and :val entries).
  def unlock(lock)
    @servers.each do |server|
      unlock_instance(server, lock[:resource], lock[:val])
    end
  end

  private

  # Current monotonic clock reading in whole milliseconds. Unlike wall-clock
  # time it cannot jump when the system time is adjusted, so elapsed-time
  # arithmetic stays valid.
  def monotonic_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end