forked from chyld/asyncio-summer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
asyncread.py
55 lines (38 loc) · 1.33 KB
/
asyncread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
import sys
from pathlib import Path
import aionotify
from aiofile import AIOFile, LineReader
async def follow(rawfn):
cleanfn = Path(rawfn).name
fnpath = Path().cwd() / cleanfn
if not fnpath.is_file():
raise Exception(reason="File {} not found".format(rawfn))
# Setup watcher for file
watcher = aionotify.Watcher()
watcher.watch(alias=rawfn, path=str(fnpath), flags=aionotify.Flags.MODIFY)
await watcher.setup(asyncio.get_event_loop())
print('Watcher created for "{}"'.format(fnpath))
# Open and read what is in file already
async with AIOFile(fnpath, mode="r", encoding="utf-8") as afp:
print("Sending lines!")
reader = LineReader(afp)
async for line in reader:
print(line, end="")
while True:
event = await watcher.get_event()
print("Got event! {}".format(event))
async for line in reader:
print(line, end="")
watcher.close()
async def main(*args):
tasks = [follow(name) for name in args]
await asyncio.gather(*tasks)
# await follow("one.txt")
if __name__ == "__main__":
args = sys.argv[1:]
args = ["one.txt", "two.txt"]
asyncio.run(main(*args))
# loop = asyncio.get_event_loop()
# asyncio.ensure_future(main(*args))
# loop.run_forever()