Add cdataset.glob() and optimize CMIP6 data search #215

Closed
wants to merge 10 commits
25 changes: 12 additions & 13 deletions climaf/cache.py
@@ -21,14 +21,14 @@
 import json
 from operator import itemgetter
 
+import env
 from env.environment import *
 from climaf import version
 from climaf.utils import Climaf_Cache_Error
 from climaf.classes import compare_trees, cobject, cdataset, guess_projects, allow_error_on_ds, ds
 from climaf.cmacro import crewrite
 from env.clogging import clogger
 
-currentCache = None
 cachedirs = None
 #: The place to write the index
 cacheIndexFileName = None
@@ -58,14 +58,13 @@ def setNewUniqueCache(path, raz=True):
     """
     Define PATH as the sole cache to use from now. And clear it
     """
-    global currentCache
     global cachedirs
     global cacheIndexFileName
 
     path = os.path.expanduser(path)
     cachedirs = [path]  # The list of cache directories
     cacheIndexFileName = cachedirs[0] + "/index"  # The place to write the index
-    currentCache = cachedirs[0]
+    env.environment.currentCache = cachedirs[0]
     if raz:
         craz(hideError=True)
 
@@ -111,9 +110,9 @@ def generateUniqueFileName(expression, format="nc", option="new", create_dirs=Tr
 
 def hash_to_path(vhash, format, option="new", prefix=""):
     if option == "new":
-        rep = os.sep.join([currentCache, prefix + vhash[0:2], vhash[2:]])
+        rep = os.sep.join([env.environment.currentCache, prefix + vhash[0:2], vhash[2:]])
     else:
-        rep = os.sep.join([currentCache, prefix + stringToPath(vhash[0:fileNameLength - 1], directoryNameLength)])
+        rep = os.sep.join([env.environment.currentCache, prefix + stringToPath(vhash[0:fileNameLength - 1], directoryNameLength)])
     rep = ".".join([rep, format])
     rep = os.path.expanduser(rep)
     return rep
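
Side note (not part of the diff): with the "new" layout, the first two characters of the hash become a subdirectory under the cache root. A minimal sketch of the resulting path, with a made-up cache root and hash value:

```python
# Illustration only: reproduce the "new" cache layout outside CliMAF.
import os

cache_root = "/home/user/climaf_cache"  # assumed value of env.environment.currentCache
vhash = "3fa9b2c47d"                    # assumed hash of a CRS expression

rep = os.sep.join([cache_root, vhash[0:2], vhash[2:]])  # two-char subdirectory
rep = ".".join([rep, "nc"])
print(rep)  # -> /home/user/climaf_cache/3f/a9b2c47d.nc
```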
@@ -129,7 +128,7 @@ def alternate_filename(fpath):
     # Get file format
     format = fpath.split(".")[-1]
     # Remove cache root location prefix
-    relative_fpath = fpath[len(currentCache)+1:]
+    relative_fpath = fpath[len(env.environment.currentCache)+1:]
     # Get name without slashes nor extension
     vhash = relative_fpath.replace("/", "").split(".")[0]
     #
@@ -655,8 +654,8 @@ def craz(force=False, hideError=False):
 
     """
     global crs2filename
-    cc = os.path.expanduser(currentCache)
-    if os.path.exists(currentCache) or hideError is False:
+    cc = os.path.expanduser(env.environment.currentCache)
+    if os.path.exists(env.environment.currentCache) or hideError is False:
         if force:
             os.system("chmod -R +w " + cc)
         os.system("rm -fR " + cc + "/*")
@@ -1076,7 +1075,7 @@ def load_cvalues():
     global cvalues
 
     if handle_cvalues is not False:
-        cache_file = os.sep.join([currentCache, "cvalues.json"])
+        cache_file = os.sep.join([env.environment.currentCache, "cvalues.json"])
         if os.path.exists(cache_file):
             with open(cache_file, "r") as f:
                 cvalues = json.load(f)
@@ -1090,10 +1089,10 @@ def sync_cvalues():
     global cvalues
 
     if handle_cvalues is not False:
-        if not os.path.isdir(currentCache):
-            os.makedirs(currentCache)
-        ccache = os.path.sep.join([currentCache, "cvalues.json"])
-        tmp = os.path.sep.join([currentCache, "cvalues_tmp.json"])
+        if not os.path.isdir(env.environment.currentCache):
+            os.makedirs(env.environment.currentCache)
+        ccache = os.path.sep.join([env.environment.currentCache, "cvalues.json"])
+        tmp = os.path.sep.join([env.environment.currentCache, "cvalues_tmp.json"])
         #
         try:
             # Try to get pre-existing on-disk content
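Reviewer note on the recurring change above (my reading, not stated in the diff): modules access the shared environment via `from env.environment import *`, which copies bindings at import time, so rebinding a module-level `currentCache` in one module would not be seen by the others; reading and writing the attribute `env.environment.currentCache` keeps a single shared binding. A runnable sketch of the pitfall:

```python
# Illustration only: a star-import copies the binding, so a later rebinding
# through the module attribute is not seen by the copy.
import types

shared = types.ModuleType("shared")
shared.value = 1

copied = shared.value   # what 'from shared import value' effectively does
shared.value = 2        # rebinding via the module attribute

print(copied)           # 1 -- the copied binding kept the old value
print(shared.value)     # 2 -- attribute access always sees the update
```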
100 changes: 98 additions & 2 deletions climaf/classes.py
@@ -18,9 +18,10 @@
 import six
 
 from env.environment import *
-from climaf.utils import Climaf_Classes_Error
+from climaf.utils import Climaf_Classes_Error, remove_keys_with_same_values
 from climaf.dataloc import isLocal, getlocs, selectFiles, dataloc
-from climaf.period import init_period, cperiod, merge_periods, intersect_periods_list, lastyears, firstyears
+from climaf.period import init_period, cperiod, merge_periods, intersect_periods_list,\
+    lastyears, firstyears, group_periods
 from env.clogging import clogger
 from climaf.netcdfbasics import fileHasVar, varsOfFile, timeLimits, model_id
@@ -149,6 +150,26 @@ def crs2ds(self, crs):
         for i, f in enumerate(self.facets):
             kvp[f] = fields[i]
         return cdataset(**kvp)
+
+    def cvalid(self, attribute, value=None):
+        """Set or get the list of valid values for a CliMAF dataset
+        attribute or facet (such as e.g. 'model', 'simulation' ...).
+        Useful e.g. for constraining which data files match a dataset
+        definition.
+
+        Example::
+
+          >>> cvalid('grid', ["gr", "gn", "gr1", "gr2"])
+
+        """
+        #
+        if attribute not in self.facets:
+            raise Climaf_Classes_Error("project '%s' doesn't use facet '%s'" % (self.project, attribute))
+        if value is None:
+            return self.facet_authorized_values.get(attribute, None)
+        else:
+            self.facet_authorized_values[attribute] = value
 
 
 def cdef(attribute, value=None, project=None):
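
A usage sketch for the new method (assuming the project registry `cprojects`, used throughout CliMAF, is reachable through the starred environment import; the facet values are just examples):

```python
from env.environment import cprojects  # assumed location of the project registry

# Restrict the 'grid' facet of project CMIP6 to a set of valid values ...
cprojects["CMIP6"].cvalid("grid", ["gr", "gn", "gr1", "gr2"])

# ... and read the list back (None if no list was set for this facet)
print(cprojects["CMIP6"].cvalid("grid"))  # -> ['gr', 'gn', 'gr1', 'gr2']
```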
@@ -535,6 +556,81 @@ def matches_conditions(self, conditions):
                 return False
         return True
 
+    def glob(self, what=None, periods=None, split=None):
"""Datafile exploration for a dataset which possibly has
wildcards (* and ?) in attributes/facets.

Returns info regarding matching datafile or directories:

- if WHAT = 'files' , returns a string of all data filenames

- otherwise, returns a list of facet/value dictionnaries for
matching data (or a pair, see below)

In last case, data file periods are not returned if arg
PERIODS is None and data search is optimized for the project.
In that case, the globbing is done on data directories and not
on data files, which is much faster.

If PERIODS is not None, individual data files periods are
merged among cases with same facets values

if SPLIT is not None, a pair is returned intead of the dicts list :

- first element is a dict with facets which values are the
same among all cases

- second element is the dicts list as above, but in which
facets with common values are discarded

Example :

>>> tos_data = ds(project='CMIP6', variable='tos', period='*',
table='Omon', model='CNRM*', realization='r1i1p1f*' )

>>> common_keys, varied_keys = tos_data.glob(periods=True, split=True)

>>> common_keys
{'mip': 'CMIP', 'institute': 'CNRM-CERFACS', 'experiment': 'historical',
'realization': 'r1i1p1f2', 'table': 'Omon', 'variable': 'tos',
'version': 'latest', 'period': [1850-2014], 'root': '/bdd'}

>>> varied_keys
[{'model': 'CNRM-ESM2-1' , 'grid': 'gn' },
{'model': 'CNRM-ESM2-1' , 'grid': 'gr1'},
{'model': 'CNRM-CM6-1' , 'grid': 'gn' },
{'model': 'CNRM-CM6-1' , 'grid': 'gr1'},
{'model': 'CNRM-CM6-1-HR', 'grid': 'gn' } ]

"""
+        dic = self.kvp.copy()
+        if self.alias:
+            filevar, _, _, _, filenameVar, _, conditions = self.alias
+            req_var = dic["variable"]
+            dic["variable"] = string.Template(filevar).safe_substitute(dic)
+            if filenameVar:
+                dic["filenameVar"] = filenameVar
+        clogger.debug("glob() with dic=%s" % repr(dic))
+        cases = []
+        files = selectFiles(with_periods=(periods is not None) or (what == 'files'),
+                            return_combinations=cases, **dic)
+        if what == 'files':
+            return files
+        else:
+            if periods is not None:
+                cases = group_periods(cases)
+            else:
+                # For non-optimized cases, selectFiles returns periods,
+                # but we want a uniform behaviour
+                for case in cases:
+                    case.pop('period', None)
+            if split is not None:
+                keys = remove_keys_with_same_values(cases)
+                return keys, cases
+            else:
+                return cases
+
 
     def explore(self, option='check_and_store', group_periods_on=None, operation='intersection', first=None):
         """
         Versatile datafile exploration for a dataset which possibly has wildcards (* and ? ) in
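
Finally, a usage sketch covering the three `glob()` call modes described in the docstring (the `climaf.api` entry point and the facet values mirror CliMAF's own examples, but treat them as assumptions):

```python
from climaf.api import ds  # assumed standard CliMAF entry point

tos_data = ds(project='CMIP6', variable='tos', period='*',
              table='Omon', model='CNRM*', realization='r1i1p1f*')

# 1. One string holding all matching datafile names
filenames = tos_data.glob(what='files')

# 2. One facet/value dict per matching combination, without periods
cases = tos_data.glob()

# 3. Pair (common facets, varied facets), with per-case merged periods
common, varied = tos_data.glob(periods=True, split=True)
```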