Skip to content

Commit

Permalink
Trigger reload on deleted files (#198)
Browse files Browse the repository at this point in the history
* Ignore more environment files

* Detect file deletion

* Ignore fewer environment files

* Add failing test for multiple tasks

* Track modification times for each task
  • Loading branch information
bhrutledge authored and lepture committed May 7, 2019
1 parent f80cb3a commit 86f7134
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pip-log.txt
.coverage
.tox
.env/
venv/

docs/_build
example/style.css
Expand Down
69 changes: 58 additions & 11 deletions livereload/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@


class Watcher(object):
"""A file watcher registery."""
"""A file watcher registry."""
def __init__(self):
self._tasks = {}
self._mtimes = {}

# modification time of filepaths for each task,
# before and after checking for changes
self._task_mtimes = {}
self._new_mtimes = {}

# setting changes
self._changes = []
Expand Down Expand Up @@ -65,6 +69,7 @@ def watch(self, path, func=None, delay=0, ignore=None):
'func': func,
'delay': delay,
'ignore': ignore,
'mtimes': {},
}

def start(self, callback):
Expand All @@ -73,7 +78,10 @@ def start(self, callback):
return False

def examine(self):
"""Check if there are changes, if true, run the given task."""
"""Check if there are changes. If so, run the given task.
Returns a tuple of modified filepath and reload delay.
"""
if self._changes:
return self._changes.pop()

Expand All @@ -82,6 +90,7 @@ def examine(self):
delays = set()
for path in self._tasks:
item = self._tasks[path]
self._task_mtimes = item['mtimes']
if self.is_changed(path, item['ignore']):
func = item['func']
delay = item['delay']
Expand All @@ -102,13 +111,49 @@ def examine(self):
return self.filepath, delay

def is_changed(self, path, ignore=None):
"""Check if any filepaths have been added, modified, or removed.
Updates filepath modification times in self._task_mtimes.
"""
self._new_mtimes = {}
changed = False

if os.path.isfile(path):
return self.is_file_changed(path, ignore)
changed = self.is_file_changed(path, ignore)
elif os.path.isdir(path):
return self.is_folder_changed(path, ignore)
return self.is_glob_changed(path, ignore)
changed = self.is_folder_changed(path, ignore)
else:
changed = self.is_glob_changed(path, ignore)

if not changed:
changed = self.is_file_removed()

self._task_mtimes.update(self._new_mtimes)
return changed

def is_file_removed(self):
"""Check if any filepaths have been removed since last check.
Deletes removed paths from self._task_mtimes.
Sets self.filepath to one of the removed paths.
"""
removed_paths = set(self._task_mtimes) - set(self._new_mtimes)
if not removed_paths:
return False

for path in removed_paths:
self._task_mtimes.pop(path)
# self.filepath seems purely informational, so setting one
# of several removed files seems sufficient
self.filepath = path
return True

def is_file_changed(self, path, ignore=None):
"""Check if filepath has been added or modified since last check.
Updates filepath modification times in self._new_mtimes.
Sets self.filepath to changed path.
"""
if not os.path.isfile(path):
return False

Expand All @@ -120,20 +165,21 @@ def is_file_changed(self, path, ignore=None):

mtime = os.path.getmtime(path)

if path not in self._mtimes:
self._mtimes[path] = mtime
if path not in self._task_mtimes:
self._new_mtimes[path] = mtime
self.filepath = path
return mtime > self._start

if self._mtimes[path] != mtime:
self._mtimes[path] = mtime
if self._task_mtimes[path] != mtime:
self._new_mtimes[path] = mtime
self.filepath = path
return True

self._mtimes[path] = mtime
self._new_mtimes[path] = mtime
return False

def is_folder_changed(self, path, ignore=None):
"""Check if directory path has any changed filepaths."""
for root, dirs, files in os.walk(path, followlinks=True):
for d in self.ignored_dirs:
if d in dirs:
Expand All @@ -145,6 +191,7 @@ def is_folder_changed(self, path, ignore=None):
return False

def is_glob_changed(self, path, ignore=None):
"""Check if glob path has any changed filepaths."""
for f in glob.glob(path):
if self.is_file_changed(f, ignore):
return True
Expand Down
66 changes: 61 additions & 5 deletions tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,27 @@ def test_watch_dir(self):
assert watcher.is_changed(tmpdir) is False

# sleep 1 second so that mtime will be different
# TODO: This doesn't seem necessary; test passes without it
time.sleep(1)

with open(os.path.join(tmpdir, 'foo'), 'w') as f:
filepath = os.path.join(tmpdir, 'foo')

with open(filepath, 'w') as f:
f.write('')

assert watcher.is_changed(tmpdir)
assert watcher.is_changed(tmpdir) is False

os.remove(filepath)
assert watcher.is_changed(tmpdir)
assert watcher.is_changed(tmpdir) is False

def test_watch_file(self):
watcher = Watcher()
watcher.count = 0

# sleep 1 second so that mtime will be different
# TODO: This doesn't seem necessary; test passes without it
time.sleep(1)

filepath = os.path.join(tmpdir, 'foo')
Expand All @@ -56,17 +64,25 @@ def add_count():

watcher.watch(filepath, add_count)
assert watcher.is_changed(filepath)
assert watcher.is_changed(filepath) is False

# sleep 1 second so that mtime will be different
# TODO: This doesn't seem necessary; test passes without it
time.sleep(1)

with open(filepath, 'w') as f:
f.write('')

rv = watcher.examine()
assert rv[0] == os.path.abspath(filepath)
abs_filepath = os.path.abspath(filepath)
assert watcher.examine() == (abs_filepath, None)
assert watcher.examine() == (None, None)
assert watcher.count == 1

os.remove(filepath)
assert watcher.examine() == (abs_filepath, None)
assert watcher.examine() == (None, None)
assert watcher.count == 2

def test_watch_glob(self):
watcher = Watcher()
watcher.watch(tmpdir + '/*')
Expand All @@ -82,8 +98,13 @@ def test_watch_glob(self):
with open(filepath, 'w') as f:
f.write('')

rv = watcher.examine()
assert rv[0] == os.path.abspath(filepath)
abs_filepath = os.path.abspath(filepath)
assert watcher.examine() == (abs_filepath, None)
assert watcher.examine() == (None, None)

os.remove(filepath)
assert watcher.examine() == (abs_filepath, None)
assert watcher.examine() == (None, None)

def test_watch_ignore(self):
watcher = Watcher()
Expand All @@ -94,3 +115,38 @@ def test_watch_ignore(self):
f.write('')

assert watcher.examine() == (None, None)

def test_watch_multiple_dirs(self):
first_dir = os.path.join(tmpdir, 'first')
second_dir = os.path.join(tmpdir, 'second')

watcher = Watcher()

os.mkdir(first_dir)
watcher.watch(first_dir)
assert watcher.examine() == (None, None)

first_path = os.path.join(first_dir, 'foo')
with open(first_path, 'w') as f:
f.write('')
assert watcher.examine() == (first_path, None)
assert watcher.examine() == (None, None)

os.mkdir(second_dir)
watcher.watch(second_dir)
assert watcher.examine() == (None, None)

second_path = os.path.join(second_dir, 'bar')
with open(second_path, 'w') as f:
f.write('')
assert watcher.examine() == (second_path, None)
assert watcher.examine() == (None, None)

with open(first_path, 'a') as f:
f.write('foo')
assert watcher.examine() == (first_path, None)
assert watcher.examine() == (None, None)

os.remove(second_path)
assert watcher.examine() == (second_path, None)
assert watcher.examine() == (None, None)

0 comments on commit 86f7134

Please sign in to comment.