Merge updates 'usnistgov/integration' into rel/1.3.X (file locking fix)
RayPlante committed Apr 15, 2020
2 parents c9192d6 + 641294d commit f7179a8
Showing 2 changed files with 321 additions and 23 deletions.
168 changes: 147 additions & 21 deletions python/nistoar/pdr/utils.py
@@ -2,7 +2,7 @@
Utility functions useful across the pdr package
"""
from collections import OrderedDict, Mapping
-import hashlib, json, re, shutil, os, time, subprocess, logging
+import hashlib, json, re, shutil, os, time, subprocess, logging, threading
try:
import fcntl
except ImportError:
@@ -27,6 +27,146 @@ def blab(log, msg, *args, **kwargs):
"""
log.log(BLAB, msg, *args, **kwargs)

class LockedFile(object):
"""
An object representing a file in a locked state. The file is locked against
simultaneous accesses across both threads and processes.
The easiest way to use this class is via the with statement. For example,
to read a file with a shared lock (many reads, no writes):
.. code-block:: python
with LockedFile(filename) as fd:
data = json.load(fd)
    And to write a file with an exclusive write lock (no other simultaneous reads
or writes):
.. code-block:: python
with LockedFile(filename, 'w') as fd:
json.dump(data, fd)
An example of its use without the with statement might be:
.. code-block:: python
lkdfile = LockedFile(filename)
fd = lkdfile.open()
data = json.load(fd)
lkdfile.close() # do not call fd.close() !!!
lkdfile.mode = 'w'
with lkdfile as fd:
json.dump(data, fd)
"""
_thread_locks = {}
_class_lock = threading.RLock()

class _ThreadLock(object):
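        # a minimal reader-writer lock: sh_lock is held from the first shared
        # acquisition until the last shared release, so a writer waiting in
        # acquire_exclusive() blocks until all readers are done; holding
        # ex_lock then keeps new readers (and other writers) out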
_reader_count = 0
def __init__(self):
self.ex_lock = threading.Lock()
self.sh_lock = threading.Lock()
def acquire_shared(self):
with self.ex_lock:
if not self._reader_count:
self.sh_lock.acquire()
self._reader_count += 1
def release_shared(self):
with self.ex_lock:
if self._reader_count > 0:
self._reader_count -= 1
if self._reader_count <= 0:
self.sh_lock.release()
def acquire_exclusive(self):
with self.sh_lock:
self.ex_lock.acquire()
def release_exclusive(self):
self.ex_lock.release()

@classmethod
def _get_thread_lock_for(cls, filepath):
filepath = os.path.abspath(filepath)
with cls._class_lock:
if filepath not in cls._thread_locks:
cls._thread_locks[filepath] = cls._ThreadLock()
return cls._thread_locks[filepath]

def __init__(self, filename, mode='r'):
self.mode = mode
self._fname = filename
self._thread_lock = self._get_thread_lock_for(filename)
self._writing = None
self._fo = None

@property
def fo(self):
"""
the open file object or None if the file is not currently open
"""
return self._fo

def _acquire_thread_lock(self):
if self._writing:
self._thread_lock.acquire_exclusive()
else:
self._thread_lock.acquire_shared()
def _release_thread_lock(self):
if self._writing:
self._thread_lock.release_exclusive()
else:
self._thread_lock.release_shared()

def open(self, mode=None):
"""
        Open the file so that it is appropriately locked. If mode is not
provided, the mode will be the value set when this object was
created.
"""
if self._fo:
raise StateException(self._fname+": file is already open")
if mode:
self.mode = mode

self._writing = 'a' in self.mode or 'w' in self.mode or '+' in self.mode
self._acquire_thread_lock()
try:
self._fo = open(self._fname, self.mode)
        except:
            self._release_thread_lock()
            if self._fo:
                try:
                    self._fo.close()
                except:
                    pass
                self._fo = None
            self._writing = None
            raise

if fcntl:
lock_type = (self._writing and fcntl.LOCK_EX) or fcntl.LOCK_SH
fcntl.lockf(self.fo, lock_type)
return self.fo

def close(self):
if not self._fo:
return
try:
self._fo.close()
finally:
self._fo = None
self._release_thread_lock()
self._writing = None

def __enter__(self):
return self.open()

    def __exit__(self, e1, e2, e3):
        self.close()
        return False

def __del__(self):
if self._fo:
self.close()

def read_nerd(nerdfile):
"""
read the JSON-formatted NERDm metadata in the given file
@@ -69,22 +209,11 @@ def read_json(jsonfile, nolock=False):
the file contents
:raise ValueError: if JSON format errors are detected.
"""
-    with open(jsonfile) as fd:
-        if fcntl and not nolock:
-            fcntl.lockf(fd, fcntl.LOCK_SH)
-            blab(log, "Acquired shared lock for reading: "+jsonfile)
-        data = fd.read()
+    with LockedFile(jsonfile) as fd:
+        blab(log, "Acquired shared lock for reading: "+jsonfile)
+        out = json.load(fd, object_pairs_hook=OrderedDict)
    blab(log, "released SH")
-    if not data:
-        # this is an unfortunate hack for multithreaded reading/writing
-        time.sleep(0.02)
-        with open(jsonfile) as fd:
-            if fcntl and not nolock:
-                fcntl.lockf(fd, fcntl.LOCK_SH)
-                blab(log, "(Re)Acquired shared lock for reading: "+jsonfile)
-            data = fd.read()
-        blab(log, "released SH")
-    return json.loads(data, object_pairs_hook=OrderedDict)
+    return out

def write_json(jsdata, destfile, indent=4, nolock=False):
"""
Expand All @@ -99,14 +228,11 @@ def write_json(jsdata, destfile, indent=4, nolock=False):
data without a lock
"""
try:
-        with open(destfile, 'a') as fd:
-            if fcntl and not nolock:
-                fcntl.lockf(fd, fcntl.LOCK_EX)
-                blab(log, "Acquired exclusive lock for writing: "+destfile)
+        with LockedFile(destfile, 'a') as fd:
+            blab(log, "Acquired exclusive lock for writing: "+destfile)
fd.truncate(0)
json.dump(jsdata, fd, indent=indent, separators=(',', ': '))
blab(log, "released EX")

except Exception, ex:
raise StateException("{0}: Failed to write JSON data to file: {1}"
.format(destfile, str(ex)), cause=ex)
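For orientation, a minimal usage sketch of the new LockedFile class (not taken from this commit; the file name and helper functions below are illustrative, and it assumes the nistoar.pdr package is importable):

    import json, threading
    from nistoar.pdr.utils import LockedFile

    datafile = "config.json"              # illustrative file name

    def read_config():
        # shared lock: concurrent readers may hold it simultaneously
        with LockedFile(datafile, 'r') as fd:
            return json.load(fd)

    def write_config(data):
        # exclusive lock: open for append, then truncate once the lock is held
        with LockedFile(datafile, 'a') as fd:
            fd.truncate(0)
            json.dump(data, fd)

    write_config({"count": 0})
    reader = threading.Thread(target=read_config)
    reader.start()
    write_config({"count": 1})
    reader.join()

Opening in append mode and truncating only after the lock is acquired mirrors write_json() above and avoids clobbering the file before the exclusive lock is actually held.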
176 changes: 174 additions & 2 deletions python/tests/nistoar/pdr/test_utils.py
@@ -1,12 +1,13 @@
-import os, sys, pdb, json, subprocess
+import os, sys, pdb, json, subprocess, threading, time
import unittest as test

from nistoar.testing import *
import nistoar.pdr.utils as utils

testdir = os.path.dirname(os.path.abspath(__file__))
testdatadir = os.path.join(testdir, 'data')
-testdatadir2 = os.path.join(testdir, 'preserv', 'data', 'simplesip')
+testdatadir3 = os.path.join(testdir, 'preserv', 'data')
+testdatadir2 = os.path.join(testdatadir3, 'simplesip')

class TestMimeTypeLoading(test.TestCase):

@@ -134,7 +135,178 @@ def test_rmfile(self):
utils.rmtree(top)
self.assertTrue(os.path.exists(root))
self.assertFalse(os.path.exists(top))


class TestLockedFile(test.TestCase):

class OtherThread(threading.Thread):
def __init__(self, func, pause=0.05):
threading.Thread.__init__(self)
self.f = func
self.pause = pause
def run(self):
if self.f:
time.sleep(self.pause)
self.f('o')

def lockedop(self, who, mode='r', sleep=0.5):
with utils.LockedFile(self.lfile, mode) as lockdfile:
self.rfd.write(who+'a')
time.sleep(sleep)
self.rfd.write(who+'r')

def setUp(self):
self.tf = Tempfiles()
self.lfile = self.tf("test.txt")
self.rfile = self.tf("result.txt")
self.rfd = None

def tearDown(self):
self.tf.clean()

def test_shared_reads(self):
def f(who):
self.lockedop(who, 'r')
t = self.OtherThread(f)
with open(self.rfile,'w') as self.rfd:
t.start()
f('t')
t.join()
with open(self.rfile) as self.rfd:
data = self.rfd.read()

self.assertEqual(data, "taoatror")

def test_exclusive_writes1(self):
def f(who):
self.lockedop(who, 'w')
t = self.OtherThread(f)
with open(self.rfile,'w') as self.rfd:
t.start()
f('t')
t.join()
with open(self.rfile) as self.rfd:
data = self.rfd.read()

self.assertEqual(data, "tatroaor")

def test_exclusive_writes2(self):
def f(who):
self.lockedop(who, 'w')
t = self.OtherThread(f)
with open(self.rfile,'w') as self.rfd:
t.start()
self.lockedop('t', 'r')
t.join()
with open(self.rfile) as self.rfd:
data = self.rfd.read()

self.assertEqual(data, "tatroaor")

def test_exclusive_writes3(self):
def f(who):
self.lockedop(who, 'r')
t = self.OtherThread(f)
with open(self.rfile,'w') as self.rfd:
t.start()
self.lockedop('t', 'w')
t.join()
with open(self.rfile) as self.rfd:
data = self.rfd.read()

self.assertEqual(data, "tatroaor")

class TestJsonIO(test.TestCase):
# this class focuses on testing the locking of JSON file IO

testdata = os.path.join(testdatadir3,
"3A1EE2F169DD3B8CE0531A570681DB5D1491.json")

def setUp(self):
self.tf = Tempfiles()
self.jfile = self.tf("data.json")

def tearDown(self):
self.tf.clean()

class OtherThread(threading.Thread):
def __init__(self, func, pause=0.05):
threading.Thread.__init__(self)
self.f = func
self.pause = pause
def run(self):
if self.f:
time.sleep(self.pause)
self.f()

    def write_test_data(self):
        # copy the reference test data into the temporary test file
        with open(self.testdata) as fd:
            data = json.load(fd)
        with open(self.jfile, 'w') as fd:
            json.dump(data, fd)

def test_writes(self):
# this is not a definitive test that the use of LockedFile is working
data = utils.read_json(self.testdata)
data['foo'] = 'bar'
def f():
utils.write_json(data, self.jfile)
t = self.OtherThread(f)

data2 = dict(data)
data2['foo'] = 'BAR'

t.start()
utils.write_json(data2, self.jfile)
t.join()

        # success in these two lines indicates that the file was not corrupted
data = utils.read_json(self.jfile)
self.assertIn('@id', data)

# success in this test indicates that writing happened in the expected
        # order; failure means that the test function is not testing what we
        # expected.
self.assertEqual(data['foo'], 'bar')

def test_readwrite(self):
# this is not a definitive test that the use of LockedFile is working
data = utils.read_json(self.testdata)
with open(self.jfile,'w') as fd:
json.dump(data, fd)
data['foo'] = 'bar'
def f():
utils.write_json(data, self.jfile)
t = self.OtherThread(f)

t.start()
td = utils.read_json(self.jfile)
t.join()

self.assertIn('@id', td)
self.assertNotIn('foo', td)
td = utils.read_json(self.jfile)
self.assertIn('@id', td)
self.assertEqual(td['foo'], 'bar')

def test_writeread(self):
# this is not a definitive test that the use of LockedFile is working
data = utils.read_json(self.testdata)
with open(self.jfile,'w') as fd:
json.dump(data, fd)
data['foo'] = 'bar'
self.td = None
def f():
self.td = utils.read_json(self.jfile)
t = self.OtherThread(f)

t.start()
utils.write_json(data, self.jfile)
t.join()

self.assertIn('@id', self.td)
self.assertEqual(self.td['foo'], 'bar')




if __name__ == '__main__':
test.main()
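The test module can also be run directly (assuming the nistoar packages and the tests' data directories are on the Python path), for example:

    python python/tests/nistoar/pdr/test_utils.py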
