Fix inaccurate creation order on Windows #346

Merged 2 commits on Apr 15, 2024
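
Background, inferred from the title and the changes below rather than stated in the diff: on Windows, time.time() typically advances only at the system timer resolution (roughly 15.6 ms), so objects created in quick succession can receive identical timestamps and their creation order cannot be recovered. A small standalone check of the clock granularity (illustrative only, not part of this PR):

    import time

    # Count distinct timestamps across many back-to-back calls.
    # On Windows, time.time() typically yields only a handful of distinct
    # values here, while time.perf_counter() advances on essentially
    # every call.
    coarse = {time.time() for _ in range(100_000)}
    fine = {time.perf_counter() for _ in range(100_000)}
    print("distinct time.time() values:", len(coarse))
    print("distinct time.perf_counter() values:", len(fine))
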
7 changes: 3 additions & 4 deletions hsds/async_lib.py
@@ -10,7 +10,6 @@
# request a copy from [email protected]. #
##############################################################################

-import time
import hashlib
import numpy as np
from aiohttp.client_exceptions import ClientError
@@ -23,7 +22,7 @@
from .util.arrayUtil import getNumElements, bytesToArray
from .util.dsetUtil import getHyperslabSelection, getFilterOps, getChunkDims, getFilters
from .util.dsetUtil import getDatasetLayoutClass, getDatasetLayout, getShapeDims

+from .util.timeUtil import getNow
from .util.storUtil import getStorKeys, putStorJSONObj, getStorJSONObj
from .util.storUtil import deleteStorObj, getStorBytes, isStorObj
from . import hsds_logger as log
@@ -383,7 +382,7 @@ async def scanRoot(app, rootid, update=False, bucket=None):
results["logical_bytes"] = 0
results["checksums"] = {} # map of objid to checksums
results["bucket"] = bucket
-results["scan_start"] = time.time()
+results["scan_start"] = getNow(app)

app["scanRoot_results"] = results
app["scanRoot_keyset"] = set()
@@ -438,7 +437,7 @@ async def scanRoot(app, rootid, update=False, bucket=None):
# free up memory used by the checksums
del results["checksums"]

-results["scan_complete"] = time.time()
+results["scan_complete"] = getNow(app)

if update:
# write .info object back to S3
1 change: 1 addition & 0 deletions hsds/basenode.py
@@ -565,6 +565,7 @@ def baseInit(node_type):
app["node_number"] = -1
app["node_type"] = node_type
app["start_time"] = int(time.time()) # seconds after epoch
+app["start_time_relative"] = time.perf_counter() # high precision time
app["register_time"] = 0
app["max_task_count"] = config.get("max_task_count")
app["storage_clients"] = {} # storage client drivers
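
The getNow helper imported throughout these files lives in hsds/util/timeUtil.py, which is not among the files shown here. Judging from the two values initialized in baseInit above, a plausible sketch of the helper (an assumption, not the actual implementation) is:

    import time

    def getNow(app):
        # Hypothetical sketch of hsds.util.timeUtil.getNow: anchor a
        # high-resolution monotonic offset to the wall-clock start time
        # captured once at startup in baseInit().
        elapsed = time.perf_counter() - app["start_time_relative"]
        return app["start_time"] + elapsed

Because perf_counter() is monotonic and high resolution on every platform, successive calls yield distinct, correctly ordered values even where time.time() would return the same reading twice.
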
4 changes: 2 additions & 2 deletions hsds/ctype_dn.py
@@ -13,7 +13,6 @@
# data node of hsds cluster
#

-import time

from aiohttp.web_exceptions import HTTPBadRequest, HTTPNotFound
from aiohttp.web_exceptions import HTTPInternalServerError
@@ -23,6 +22,7 @@
from .datanode_lib import get_obj_id, get_metadata_obj, save_metadata_obj
from .datanode_lib import delete_metadata_obj, check_metadata_obj
from .util.domainUtil import isValidBucketName
+from .util.timeUtil import getNow
from . import hsds_logger as log


@@ -121,7 +121,7 @@ async def POST_Datatype(request):
type_json = body["type"]

# ok - all set, create committed type obj
-now = time.time()
+now = getNow(app)

log.info(f"POST_datatype, typejson: {type_json}")

14 changes: 7 additions & 7 deletions hsds/datanode.py
@@ -14,7 +14,6 @@
#

import asyncio
-import time
import traceback
from aiohttp.web import run_app

@@ -25,6 +24,7 @@
from .util.httpUtil import isUnixDomainUrl, bindToSocket, getPortFromUrl
from .util.httpUtil import jsonResponse, release_http_client
from .util.storUtil import setBloscThreads, getBloscThreads
+from .util.timeUtil import getNow
from .basenode import healthCheck, baseInit
from . import hsds_logger as log
from .domain_dn import GET_Domain, PUT_Domain, DELETE_Domain, PUT_ACL
@@ -99,7 +99,7 @@ async def bucketScan(app):
short_sleep_time = float(scan_sleep_time) / 10.0
scan_wait_time = int(config.get("scan_wait_time", default=60.0)) # default to ~1min
log.info(f"scan_wait_time: {scan_wait_time}")
-last_action = time.time() # keep track of the last time any work was done
+last_action = getNow(app) # keep track of the last time any work was done

# update/initialize root object before starting node updates

@@ -111,7 +111,7 @@

root_scan_ids = app["root_scan_ids"]
root_ids = {}
-now = time.time()
+now = getNow(app)
# copy ids to a new map so we don't need to worry about
# race conditions
for root_id in root_scan_ids:
@@ -165,9 +165,9 @@
tb = traceback.format_exc()
print("traceback:", tb)

-last_action = time.time()
+last_action = getNow(app)

-now = time.time()
+now = getNow(app)
if (now - last_action) > scan_sleep_time:
sleep_time = scan_sleep_time # long nap
else:
@@ -375,10 +375,10 @@ async def preStop(request):
log.request(request)
app = request.app

-shutdown_start = time.time()
+shutdown_start = getNow(app)
log.warn(f"preStop request calling on_shutdown at {shutdown_start:.2f}")
await on_shutdown(app)
-shutdown_elapse_time = time.time() - shutdown_start
+shutdown_elapse_time = getNow(app) - shutdown_start
msg = f"shutdown took: {shutdown_elapse_time:.2f} seconds"
if shutdown_elapse_time > 2.0:
# 2.0 is the default grace period for kubernetes
31 changes: 15 additions & 16 deletions hsds/datanode_lib.py
@@ -15,7 +15,6 @@

import asyncio
import json
-import time
import numpy as np
from aiohttp.web_exceptions import HTTPGone, HTTPInternalServerError
from aiohttp.web_exceptions import HTTPNotFound, HTTPForbidden
@@ -35,7 +34,7 @@
from .util.arrayUtil import arrayToBytes, bytesToArray, jsonToArray
from .util.hdf5dtype import createDataType
from .util.rangegetUtil import ChunkLocation, chunkMunge, getHyperChunkIndex, getHyperChunkFactors

+from .util.timeUtil import getNow
from . import config
from . import hsds_logger as log
from .dset_lib import getFillValue
@@ -166,7 +165,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
del pending_s3_write_tasks[obj_id]
return None

-now = time.time()
+now = getNow(app)

last_update_time = now
if obj_id in dirty_ids:
@@ -315,7 +314,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
notify_objs[root_id] = bucket

# calculate time to do the write
-elapsed_time = time.time() - now
+elapsed_time = getNow(app) - now
log.info(f"s3 write for {obj_id} took {elapsed_time:.3f}s")
return obj_id

@@ -368,7 +367,7 @@ async def get_metadata_obj(app, obj_id, bucket=None):
store_read_timeout = float(config.get("store_read_timeout", default=2.0))
log.debug(f"store_read_timeout: {store_read_timeout}")
store_read_sleep = float(config.get("store_read_sleep_interval", default=0.1))
-while time.time() - read_start_time < store_read_timeout:
+while getNow(app) - read_start_time < store_read_timeout:
log.debug(f"waiting for pending s3 read {s3_key}, sleeping")
await asyncio.sleep(store_read_sleep)
if obj_id in meta_cache:
@@ -384,13 +383,13 @@
if not obj_json:
log.debug(f"getS3JSONObj({obj_id}, bucket={bucket})")
if obj_id not in pending_s3_read:
-pending_s3_read[obj_id] = time.time()
+pending_s3_read[obj_id] = getNow(app)
# read S3 object as JSON
try:
obj_json = await getStorJSONObj(app, s3_key, bucket=bucket)
# read complete - remove from pending map
if obj_id in pending_s3_read:
-elapsed_time = time.time() - pending_s3_read[obj_id]
+elapsed_time = getNow(app) - pending_s3_read[obj_id]
log.info(f"s3 read for {obj_id} took {elapsed_time}")
else:
log.warn(f"s3 read complete but pending object: {obj_id} not found")
@@ -461,7 +460,7 @@ async def save_metadata_obj(
meta_cache[obj_id] = obj_json

meta_cache.setDirty(obj_id)
-now = time.time()
+now = getNow(app)
log.debug(f"setting dirty_ids[{obj_id}] = ({now}, {bucket})")
if isValidUuid(obj_id) and not bucket:
log.warn(f"bucket is not defined for save_metadata_obj: {obj_id}")
@@ -1065,7 +1064,7 @@ async def get_chunk(
config.get("store_read_sleep_interval", default=0.1)
)

-while time.time() - read_start_time < store_read_timeout:
+while getNow(app) - read_start_time < store_read_timeout:
log.debug("waiting for pending s3 read, sleeping")
await asyncio.sleep(store_read_sleep_interval)
if chunk_id in chunk_cache:
@@ -1079,7 +1078,7 @@

if chunk_arr is None:
if chunk_id not in pending_s3_read:
-pending_s3_read[chunk_id] = time.time()
+pending_s3_read[chunk_id] = getNow(app)

try:
kwargs = {
@@ -1099,7 +1098,7 @@

if chunk_id in pending_s3_read:
# read complete - remove from pending map
-elapsed_time = time.time() - pending_s3_read[chunk_id]
+elapsed_time = getNow(app) - pending_s3_read[chunk_id]
log.info(f"s3 read for {chunk_id} took {elapsed_time}")
else:
msg = f"expected to find {chunk_id} in "
@@ -1192,7 +1191,7 @@ def save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=None):

# async write to S3
dirty_ids = app["dirty_ids"]
-now = time.time()
+now = getNow(app)
dirty_ids[chunk_id] = (now, bucket)


@@ -1233,7 +1232,7 @@ def callback(future):
log.error(msg)

update_count = 0
-s3sync_start = time.time()
+s3sync_start = getNow(app)

log.info(f"s3sync - processing {len(dirty_ids)} dirty_ids")
for obj_id in dirty_ids:
@@ -1335,7 +1334,7 @@ def callback(future):
async def s3syncCheck(app):
s3_sync_interval = config.get("s3_sync_interval")
s3_age_time = config.get("s3_age_time", default=1)
-last_update = time.time()
+last_update = getNow(app)
if app["node_state"] != "TERMINATING":
s3_dirty_age_to_write = config.get("s3_dirty_age_to_write", default=20)
log.debug(f"s3sync - s3_dirty_age_to_write is {s3_dirty_age_to_write}")
@@ -1373,11 +1372,11 @@ async def s3syncCheck(app):

if update_count > 0:
log.debug("s3syncCheck short sleep")
-last_update = time.time()
+last_update = getNow(app)
# give other tasks a chance to run
await asyncio.sleep(0)
else:
-last_update_delta = time.time() - last_update
+last_update_delta = getNow(app) - last_update
if last_update_delta > s3_sync_interval:
sleep_time = s3_sync_interval
else:
6 changes: 3 additions & 3 deletions hsds/domain_dn.py
@@ -13,13 +13,13 @@
# data node of hsds cluster
#

-import time
from aiohttp.web_exceptions import HTTPConflict, HTTPInternalServerError
from aiohttp.web import json_response

from .util.authUtil import getAclKeys
from .util.domainUtil import isValidDomain, getBucketForDomain
from .util.idUtil import validateInPartition
+from .util.timeUtil import getNow
from .datanode_lib import get_metadata_obj, save_metadata_obj
from .datanode_lib import delete_metadata_obj, check_metadata_obj
from . import hsds_logger as log
@@ -131,7 +131,7 @@ async def PUT_Domain(request):
log.info("no root id, creating folder")
domain_json["owner"] = body_json["owner"]
domain_json["acls"] = body_json["acls"]
-now = time.time()
+now = getNow(app)
domain_json["created"] = now
domain_json["lastModified"] = now

@@ -213,7 +213,7 @@ async def PUT_ACL(request):
acls[acl_username] = acl

# update the timestamp
-now = time.time()
+now = getNow(app)
domain_json["lastModified"] = now

# write back to S3
6 changes: 3 additions & 3 deletions hsds/domain_sn.py
@@ -16,7 +16,6 @@
import asyncio
import json
import os.path as op
-import time

from aiohttp.web_exceptions import HTTPBadRequest, HTTPForbidden, HTTPNotFound
from aiohttp.web_exceptions import HTTPGone, HTTPInternalServerError
@@ -37,6 +36,7 @@
from .util.storUtil import getStorKeys, getCompressors
from .util.boolparser import BooleanParser
from .util.globparser import globmatch
+from .util.timeUtil import getNow
from .servicenode_lib import getDomainJson, getObjectJson, getObjectIdByPath
from .servicenode_lib import getRootInfo, checkBucketAccess, doFlush, getDomainResponse
from .basenode import getVersion
@@ -901,7 +901,7 @@ async def PUT_Domain(request):
post_params = {"timestamp": 0} # have scan run immediately
if bucket:
post_params["bucket"] = bucket
-req_send_time = time.time()
+req_send_time = getNow(app)
await http_post(app, notify_req, data={}, params=post_params)

# Poll until the scan_complete time is greater than
@@ -913,7 +913,7 @@
if scan_time > req_send_time:
log.info(f"scan complete for root: {root_id}")
break
-if time.time() - req_send_time > MAX_WAIT_TIME:
+if getNow(app) - req_send_time > MAX_WAIT_TIME:
log.warn(f"scan failed to complete in {MAX_WAIT_TIME} seconds for {root_id}")
raise HTTPServiceUnavailable()
log.debug(f"do_rescan sleeping for {RESCAN_SLEEP_TIME}s")
4 changes: 2 additions & 2 deletions hsds/dset_dn.py
@@ -13,14 +13,14 @@
# data node of hsds cluster
#

-import time
from aiohttp.web_exceptions import HTTPBadRequest, HTTPNotFound, HTTPConflict
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response


from .util.idUtil import isValidUuid, validateUuid
from .util.domainUtil import isValidBucketName
+from .util.timeUtil import getNow
from .datanode_lib import get_obj_id, check_metadata_obj, get_metadata_obj
from .datanode_lib import save_metadata_obj, delete_metadata_obj
from . import hsds_logger as log
@@ -132,7 +132,7 @@ async def POST_Dataset(request):
layout = body["layout"] # client specified chunk layout

# ok - all set, create committed type obj
-now = int(time.time())
+now = getNow(app)

log.debug(f"POST_dataset typejson: {type_json}, shapejson: {shape_json}")
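
One detail specific to this hunk: the old code truncated the creation time to whole seconds with int(time.time()), so two datasets created within the same second had indistinguishable timestamps on any platform; the getNow(app) value keeps sub-second precision. A tiny illustration with hypothetical timestamps:

    # Two datasets created about 10 ms apart (hypothetical getNow(app) values)
    t1, t2 = 1713139200.103, 1713139200.113
    print(int(t1) == int(t2))  # True  -> old code collapsed the ordering
    print(t1 < t2)             # True  -> new code preserves it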
