-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpipe_read.py
70 lines (47 loc) · 1.94 KB
/
pipe_read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""This script demonstrates how to read from a named pipe with a timeout.
The use case is handling situations where the producer on the other end of the
pipe has hung or gone away or is simply taking too long to respond.
To try it out, create a new named pipe at your shell.
$ mkfifo my-demo-pipe
Run this script against that pipe with no timeout specified.
$ python pipe_read.py my-demo-pipe
It will wait for data to be written to the pipe. You can do that with the
echo command.
$ echo 'Hello, pipe!' > my-demo-pipe
Next, try this script with a timeout.
$ python pipe_read.py my-demo-pipe --timeout=3.14
It will exit after ~3.14 seconds unless you send some data through the pipe.
In the case of a timeout, the open system call is interrupted by a SIGALRM that
is generated by the setitimer system call.
"""
import argparse
import signal
import sys
# Passed as the `interval` argument of signal.setitimer in main(): an interval
# of 0 means the timer fires once and is not re-armed.
_DONT_REPEAT = 0
# Passed as the `seconds` argument of signal.setitimer in main(): a value of 0
# clears any timer that is currently pending.
_CLEAR_TIMEOUT = 0
def main():
    """Read everything from the named pipe given on the command line and print it.

    When a non-zero ``--timeout`` is supplied, a one-shot real-time interval
    timer is armed before the pipe is opened. If the timer expires while
    ``open`` is still blocked, SIGALRM is delivered and the handler exits the
    process with status 1. The timer is cleared as soon as ``open`` returns,
    so the timeout covers the open only, not the subsequent read.
    """
    # Handle CLI stuff.
    args = parse_args()

    if args.timeout:
        # Install the handler *before* arming the timer. With the opposite
        # order, a very short timeout could fire while the default SIGALRM
        # disposition (terminate the process) is still in effect.
        signal.signal(signal.SIGALRM, lambda sig, frame: sys.exit(1))
        signal.setitimer(signal.ITIMER_REAL, args.timeout, _DONT_REPEAT)

    # Try and read from the named pipe.
    # Opening a FIFO for reading blocks until some writer opens the other
    # end. If a timeout was specified the call can be interrupted by SIGALRM.
    with open(args.pipe) as f:
        # Clear the timer now that we were able to open the file.
        signal.setitimer(signal.ITIMER_REAL, _CLEAR_TIMEOUT, _DONT_REPEAT)
        # Call form of print: valid on Python 3 and, for a single argument,
        # equivalent to the print statement on Python 2.
        print(f.read().strip())
def parse_args():
    """Parse this script's command-line arguments from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with two attributes:
        ``pipe`` -- the positional path of the named pipe to read from;
        ``timeout`` -- the value of ``-t``/``--timeout`` converted to float,
        or the default ``0`` when the option is not given.
    """
    cli = argparse.ArgumentParser(description="Read from a named pipe.")

    # Required positional argument: which pipe to read.
    cli.add_argument('pipe', metavar="PIPE", help="The named pipe to read from.")

    # Optional timeout in seconds; 0 (the default) is treated as "no timeout"
    # by the caller's truthiness check.
    cli.add_argument(
        '-t',
        '--timeout',
        type=float,
        default=0,
        help="How long to wait before timing out the read.",
    )

    namespace = cli.parse_args()
    return namespace
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()