Writeback removed - performance improvements
Danilo Poccia authored and committed Nov 29, 2013
1 parent 76dd5ce commit 843dfba
Showing 2 changed files with 71 additions and 2,205 deletions.
yas3fs: 111 changes (71 additions & 40 deletions)
@@ -518,7 +518,7 @@ class YAS3FS(LoggingMixIn, Operations):
def __init__(self, options):
# Some constants
### self.http_listen_path_length = 30
self.download_running = True
self.running = True
self.check_status_interval = 5.0 # Seconds, no need to configure that

# Initialization
@@ -741,7 +741,7 @@ class YAS3FS(LoggingMixIn, Operations):
# Cleanup for unmount
logger.info('File system unmount...')

self.download_running = False
self.running = False

if self.http_listen_thread:
self.httpd.shutdown() # To stop HTTP listen thread
@@ -924,7 +924,7 @@ class YAS3FS(LoggingMixIn, Operations):
logger.info("num_entries, mem_size, disk_size, download_queue, prefetch_queue: %i, %i, %i, %i, %i"
% (num_entries, mem_size, disk_size, self.download_queue.qsize(), self.prefetch_queue.qsize()))

if self.download_running:
if self.running:
for i in self.download_threads.keys():
if not self.download_threads[i].is_alive():
logger.debug("Download thread restarted!")
@@ -992,7 +992,8 @@ class YAS3FS(LoggingMixIn, Operations):
(parent_path, dir) = os.path.split(path)
logger.debug("parent_path '%s'" % (parent_path))
with self.cache.get_lock(path):
dirs = self.cache.get(parent_path, 'readdir')
# dirs = self.cache.get(parent_path, 'readdir')
dirs = self.readdir(parent_path)
if dirs != None and dirs.count(dir) > 0:
dirs.remove(dir)

@@ -1017,16 +1018,27 @@
if key:
logger.debug("get_key from cache '%s'" % (path))
return key
logger.debug("get_key from S3 #1 '%s'" % (path))
key = self.s3_bucket.get_key(self.join_prefix(path))
if not key and path != '/':
full_path = path + '/'
logger.debug("get_key from S3 #2 '%s' '%s'" % (path, full_path))
key = self.s3_bucket.get_key(self.join_prefix(full_path))
if key:
logger.debug("get_key to cache '%s'" % (path))
self.cache.set(path, 'key', key)
look_on_S3 = False
if path == '/':
look_on_S3 = True
else:
(parent_path, file) = os.path.split(path)
dirs = self.readdir(parent_path)
if file in dirs: # We know it can be found on S3
look_on_S3 = True
if look_on_S3:
logger.debug("get_key from S3 #1 '%s'" % (path))
key = self.s3_bucket.get_key(self.join_prefix(path))
if not key and path != '/':
full_path = path + '/'
logger.debug("get_key from S3 #2 '%s' '%s'" % (path, full_path))
key = self.s3_bucket.get_key(self.join_prefix(full_path))
if key:
logger.debug("get_key to cache '%s'" % (path))
self.cache.set(path, 'key', key)
else:
logger.debug("get_key not on S3 '%s'" % (path))
if not key:
logger.debug("get_key no '%s'" % (path))
return key
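The reworked get_key avoids a round trip to S3 for paths that cannot exist: except for the root, it first checks the parent directory listing and only queries the bucket when the entry is actually listed there, falling back to the trailing-slash directory key as before. A rough standalone sketch of the idea (hypothetical parent_listing/fetch_from_s3 callables and a plain dict cache, not the yas3fs API):

import os

def lookup_key(path, parent_listing, fetch_from_s3, cache):
    """Look up the S3 key for path, skipping S3 when the parent listing rules it out."""
    key = cache.get(path)
    if key:
        return key
    if path != '/':
        parent, name = os.path.split(path)
        if name not in parent_listing(parent):
            return None                      # known not to exist: no S3 request at all
    key = fetch_from_s3(path)
    if not key and path != '/':
        key = fetch_from_s3(path + '/')      # directories are stored with a trailing '/'
    if key:
        cache[path] = key
    return key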

@@ -1114,7 +1126,7 @@ class YAS3FS(LoggingMixIn, Operations):
if not (metadata_name == 'attr' and k == 'st_size')]) # For the size use the key.size
key.metadata[metadata_name] = s
if (not data) or (data and (not data.has('change'))):
logger.debug("writing metadata '%s' '%s'" % (path, key))
logger.debug("writing metadata '%s' '%s' S3" % (path, key))
md = key.metadata
md['Content-Type'] = key.content_type # Otherwise we lose the Content-Type with S3 Copy
key.copy(key.bucket.name, key.name, md, preserve_acl=False) # Do I need to preserve ACL?
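Only the log line changes here, but the surrounding context shows how metadata is persisted: the key is copied onto itself with the new metadata, carrying Content-Type along explicitly because the copy would otherwise drop it. A small illustrative helper built on the same boto Key.copy call used above (the rewrite_metadata name is an assumption, not a yas3fs method):

def rewrite_metadata(key, metadata):
    """Update an object's metadata in place with an S3 self-copy (no body re-upload)."""
    md = dict(metadata)
    md['Content-Type'] = key.content_type    # the copy would otherwise reset the Content-Type
    key.copy(key.bucket.name, key.name, md, preserve_acl=False)
    return key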
@@ -1211,16 +1223,19 @@ class YAS3FS(LoggingMixIn, Operations):
full_path = path + '/'
else:
full_path = path # To manage '/' with an empty s3_prefix
if path != '/' or self.write_metadata:
k.key = self.join_prefix(full_path)
logger.debug("mkdir '%s' '%s' '%s' S3" % (path, mode, k))
k.set_contents_from_string('', headers={'Content-Type': 'application/x-directory'})
self.cache.set(path, 'key', k)
data.delete('change')
if path != '/':
self.cache.set(path, 'readdir', ['.', '..']) # the directory is empty
self.add_to_parent_readdir(path)

if path != '/' or self.write_metadata:
k.key = self.join_prefix(full_path)
logger.debug("mkdir '%s' '%s' '%s' S3" % (path, mode, k))
k.set_contents_from_string('', headers={'Content-Type': 'application/x-directory'})
data.delete('change')
if path != '/': ### Do I need this???
self.publish(['mkdir', path])

return 0

def symlink(self, path, link):
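The mkdir change above updates the local cache first (the key, an empty readdir listing, and the parent's listing) and then performs the S3 PUT of the directory marker and the 'mkdir' notification in the same call. A condensed sketch of that ordering (hypothetical callables, not the actual yas3fs methods):

import os

def make_directory(cache, parent_listing, s3_put, publish, path):
    """Cache the new directory locally first, then write the S3 marker and notify."""
    cache[path] = ['.', '..']                              # the directory starts out empty
    parent_listing(os.path.dirname(path)).append(os.path.basename(path))
    s3_put(path + '/', '', content_type='application/x-directory')
    publish(['mkdir', path])
    return 0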
@@ -1240,7 +1255,7 @@
attr['st_ctime'] = now
attr['st_size'] = 0
attr['st_mode'] = (stat.S_IFLNK | 0755)
self.cache.delete(path)
self.cache.delete(path)
self.cache.add(path)
if self.cache_on_disk > 0:
data = FSData(self.cache, 'mem', path) # New files (almost) always cache in mem - is it ok ???
@@ -1255,12 +1270,14 @@
self.write(path, link, 0)
data.close()
k.key = self.join_prefix(path)
logger.debug("symlink '%s' '%s' '%s' S3" % (path, link, k))
k.set_contents_from_string(link, headers={'Content-Type': 'application/x-symlink'})
self.cache.set(path, 'key', k)
data.delete('change')
self.add_to_parent_readdir(path)
self.publish(['symlink', path])

logger.debug("symlink '%s' '%s' '%s' S3" % (path, link, k))
k.set_contents_from_string(link, headers={'Content-Type': 'application/x-symlink'})
data.delete('change')
self.publish(['symlink', path])

return 0

def check_data(self, path):
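symlink follows the same shape: cache the key and parent listing locally, then store the link target as the object body with a Content-Type of application/x-symlink and broadcast the change. For illustration, writing such a marker object with classic boto (the library the diff itself uses) might look like this; write_symlink_marker and the default-credential connection are assumptions, not part of the commit:

from boto.s3.connection import S3Connection
from boto.s3.key import Key

def write_symlink_marker(bucket_name, key_name, target):
    """Store the link target as the object body, tagged with the symlink content type."""
    bucket = S3Connection().get_bucket(bucket_name, validate=False)
    k = Key(bucket)
    k.key = key_name
    k.set_contents_from_string(target, headers={'Content-Type': 'application/x-symlink'})
    return k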
@@ -1323,7 +1340,7 @@ class YAS3FS(LoggingMixIn, Operations):
self.download_queue.put(option_list)

def download(self, prefetch=False):
while self.download_running:
while self.running:
try:
if prefetch:
(path, start, end) = self.prefetch_queue.get(True, 1) # 1 second time-out
@@ -1469,16 +1486,22 @@ class YAS3FS(LoggingMixIn, Operations):
if not k:
logger.debug("rmdir '%s' ENOENT" % (path))
raise FuseOSError(errno.ENOENT)
full_path = self.join_prefix(path + '/')
key_list = self.s3_bucket.list(full_path) # Don't need to set a delimiter here
for l in key_list:
if l.name != full_path:
logger.debug("rmdir '%s' ENOTEMPTY" % (path))
raise FuseOSError(errno.ENOTEMPTY)
k.delete()
if len(self.readdir(path)) > 2:
logger.debug("rmdir '%s' ENOTEMPTY" % (path))
raise FuseOSError(errno.ENOTEMPTY)
#full_path = self.join_prefix(path + '/')
#key_list = self.s3_bucket.list(full_path) # Don't need to set a delimiter here
#for l in key_list:
# if l.name != full_path:
# logger.debug("rmdir '%s' ENOTEMPTY" % (path))
# raise FuseOSError(errno.ENOTEMPTY)

logger.debug("rmdir '%s' '%s' S3" % (path, k))
k.delete()
self.publish(['rmdir', path])

self.cache.reset(path) # Cache invalidation
self.remove_from_parent_readdir(path)
self.publish(['rmdir', path])
return 0

def truncate(self, path, size):
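rmdir no longer lists the bucket to decide whether a directory is empty; the readdir view is authoritative, so anything beyond '.' and '..' raises ENOTEMPTY and an empty directory costs no extra S3 LIST. A tiny sketch of that check (illustrative only, with injected callables):

import errno
import os

def remove_directory(readdir, s3_delete, publish, path):
    """Refuse to remove a non-empty directory using only the cached listing."""
    if len(readdir(path)) > 2:                  # more than '.' and '..' means not empty
        raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), path)
    s3_delete(path + '/')                       # delete the directory marker key
    publish(['rmdir', path])
    return 0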
@@ -1558,7 +1581,8 @@ class YAS3FS(LoggingMixIn, Operations):
md['Content-Type'] = key.content_type # Otherwise we lose the Content-Type with S3 Copy
key.copy(key.bucket.name, target, md, preserve_acl=False) # Do I need to preserve ACL?
key.delete()
self.publish(['rename', source_path, target_path])
self.publish(['rename', source_path, target_path])

self.remove_from_parent_readdir(path)
self.add_to_parent_readdir(new_path)

@@ -1605,10 +1629,12 @@ class YAS3FS(LoggingMixIn, Operations):
logger.debug("unlink '%s' ENOENT" % (path))
raise FuseOSError(errno.ENOENT)
if k:
logger.debug("unlink '%s' '%s' S3" % (path, k))
k.delete()
self.publish(['unlink', path])

self.cache.reset(path)
self.remove_from_parent_readdir(path)
self.publish(['unlink', path])
return 0

def create(self, path, mode, fi=None):
@@ -1737,20 +1763,25 @@ class YAS3FS(LoggingMixIn, Operations):
old_size = 0
else:
old_size = k.size

written = False
if self.multipart_num > 0:
full_size = attr['st_size']
if full_size > self.multipart_size:
k = self.multipart_upload(k.name, data, full_size,
headers={'Content-Type': type}, metadata=k.metadata)
k = self.get_key(path, cache=False)
logger.debug("flush '%s' '%s' '%s' '%s' S3" % (path, fh, k, type))
new_k = self.multipart_upload(k.name, data, full_size,
headers={'Content-Type': type}, metadata=k.metadata)
new_k = self.get_key(path, cache=False)
etag = new_k.etag[1:-1]
written = True
if not written:
logger.debug("flush '%s' '%s' '%s' '%s' S3" % (path, fh, k, type))
k.set_contents_from_file(data.content, headers={'Content-Type': type})
data.update_etag(k.etag[1:-1])
etag = k.etag[1:-1]
data.update_etag(etag)
data.delete('change')
self.publish(['flush', path, k.etag[1:-1]])
self.publish(['flush', path, etag])

return 0

def multipart_upload(self, key_path, data, full_size, headers, metadata):
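flush now tracks the resulting ETag explicitly: files above the multipart threshold go through multipart_upload and the ETag is re-read from the fresh key, smaller files are written with a single set_contents_from_file, and the same etag value feeds data.update_etag and the 'flush' notification. A simplified sketch of that branching (hypothetical helper callables, not the real upload code):

def flush_file(path, size, multipart_threshold, multipart_upload, simple_upload, publish):
    """Choose the upload strategy by size and publish the resulting ETag."""
    if multipart_threshold and size > multipart_threshold:
        etag = multipart_upload(path)        # ETag re-read from the key after reassembly
    else:
        etag = simple_upload(path)           # single PUT for smaller files
    publish(['flush', path, etag])
    return 0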

0 comments on commit 843dfba
