Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inherit defect related triples when operations are performed on the structure #159

Merged
merged 15 commits into from
Aug 15, 2024
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.9.10
current_version = 0.9.11
commit = True
tag = False

Expand Down
2 changes: 1 addition & 1 deletion CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ url: 'https://atomrdf.pyscal.org'
license: "MIT"
repository-code: https://github.com/pyscal/atomRDF
type: software
version: 0.9.10
version: 0.9.11
166 changes: 148 additions & 18 deletions atomrdf/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,13 @@ def _is_valid(self, input_list):

def _is_ontoterm(self, term):
return type(term).__name__ == "OntoTerm"


def _is_uriref(self, term):
return type(term).__name__ == "URIRef"

def _is_bnode(self, term):
return not term.toPython().startswith("http")

def _modify_triple(self, triple):
modified_triple = []
for term in triple:
Expand Down Expand Up @@ -1160,34 +1166,155 @@ def activity_ids(self):
"""

return [x[0] for x in self.triples((None, RDF.type, PROV.Activity))]

def iterate_graph(self, item, create_new_graph=False):

def _is_of_type(self, item, target_item):
"""
Check if an item is of a specific type

item - direct from graph, comparison only makes sense if it is URIRef
if item is node with https - direct comparison
if not - check the type of the item

target_item - URIRef or OntoTerm
"""
if not self._is_uriref(item):
return False

if self._is_bnode(item):
rdftype = self.value(item, RDF.type)
if rdftype is not None:
rdftype = rdftype.toPython()
else:
rdftype = item.toPython()

target_type = target_item.toPython()
return rdftype == target_type

def iterate_graph(self, item, create_new_graph=False, create_new_list=False,
stop_at_sample=False):
"""
Iterate through the graph starting from the given item.

Parameters
----------
item : object
The item to start the iteration from.
create_new_graph : bool, optional
If True, create a new KnowledgeGraph object to store the iteration results.
Default is False. The results are stored in `self.sgraph`.
create_new_list : bool, optional
If True, create a new list to store extracted triples, this is needed when
calling this function iteratively
stop_at_sample : bool, optional
If True, stops the iteration at the when a sample object is encountered. Default is False.
will only stop if `item` is a sample object

Returns
-------
None
"""
if isinstance(item, str):
item = URIRef(item)

if create_new_graph:
self.sgraph = KnowledgeGraph()
if not type(item).__name__ == 'URIRef':
return

if create_new_list:
self.slist = []
stop_at_sample = stop_at_sample and self._is_of_type(item, CMSO.AtomicScaleSample)

triples = list(self.triples((item, None, None)))

for triple in triples:
if not (stop_at_sample and self._is_of_type(triple[1], PROV.wasDerivedFrom)):
self.slist.append(triple)
self.iterate_graph(triple[2], stop_at_sample=stop_at_sample)

def iterate_and_create_graph(self, item, stop_at_sample=False):
self.iterate_graph(item, create_new_list=True, stop_at_sample=stop_at_sample)
triples = copy.deepcopy(self.slist)
self.slist = []
sgraph = KnowledgeGraph()
for triple in triples:
self.sgraph.graph.add(triple)
self.iterate_graph(triple[2])
sgraph.add(triple)
return sgraph

def get_sample(self, sample, no_atoms=False):
def _create_a_new_name(self, uristring):
"""
take a given uriref string name and create one in similar fashion
"""
raw = uristring.split(':')
if len(raw) > 1:
prologue = raw[0]+':'
else:
prologue = ''

raw = uristring.split('_')
if len(raw) > 1:
epilogue = '_'+"_".join(raw[1:])
else:
epilogue = ''
return f"{prologue}{uuid.uuid4()}{epilogue}"

def iterate_and_rename_triples(self, item):
self.iterate_graph(item, create_new_list=True)
triples = copy.deepcopy(self.slist)
#now we have to edit this triples, and reapply them
#for that we make a dict of all URIRef values in this graph
uri_dict = {}
for triple in triples:
if isinstance(triple[0], URIRef):
if triple[0].toPython() not in uri_dict.keys():
if self._is_bnode(triple[0]):
uri_dict[triple[0].toPython()] = self._create_a_new_name(triple[0].toPython())
else:
uri_dict[triple[0].toPython()] = triple[0].toPython()

new_triples = []
for triple in triples:
subject = triple[0]
if subject.toPython() in uri_dict.keys():
subject = URIRef(uri_dict[subject.toPython()])
predicate = triple[1]
object = triple[2]
if object.toPython() in uri_dict.keys():
object = URIRef(uri_dict[object.toPython()])
new_triples.append((subject, predicate, object))

return URIRef(uri_dict[item.toPython()]), new_triples

def copy_defects(self, sample, parent_sample):
"""
Copy defects from one sample to another
"""
parent_material = list([k[2] for k in self.triples((parent_sample, CMSO.hasMaterial, None))])[0]
parent_defects = list([x[2] for x in self.triples((parent_material, CMSO.hasDefect, None))])

material = list([k[2] for k in self.triples((sample, CMSO.hasMaterial, None))])[0]

for defect in parent_defects:
new_defect, defect_triples = self.iterate_and_rename_triples(defect)
#add the new defect to the new material
self.add((material, CMSO.hasDefect, new_defect))
#add the triples to the graph
for triple in defect_triples:
#print(triple)
self.add(triple)

#we need to add special items which are mapped to the sample directly
# now add the special props for vacancy, interstitial &substitional
for triple in self.triples(
(parent_sample, PODO.hasVacancyConcentration, None)
):
self.add((sample, triple[1], triple[2]))
#for triple in self.graph.triples(
# (parent_sample, PODO.hasNumberOfVacancies, None)
#):
# self.graph.add((self.sample, triple[1], triple[2]))
for triple in self.triples(
(parent_sample, PODO.hasImpurityConcentration, None)
):
self.add((sample, triple[1], triple[2]))
#for triple in self.graph.triples(
# (parent_sample, PODO.hasNumberOfImpurityAtoms, None)
#):
# self.graph.add((self.sample, triple[1], triple[2]))

def get_sample(self, sample, no_atoms=False, stop_at_sample=True):
"""
Get the Sample as a KnowledgeGraph

Expand All @@ -1199,6 +1326,9 @@ def get_sample(self, sample, no_atoms=False):
no_atoms: bool, optional
if True, returns the number of atoms in the sample

stop_at_sample: bool, optional
if True, stops the iteration at the when a sample object is encountered. Default is True.

Returns
-------
sgraph: :py:class:`RDFGraph`
Expand All @@ -1210,11 +1340,11 @@ def get_sample(self, sample, no_atoms=False):
if isinstance(sample, str):
sample = URIRef(sample)

self.iterate_graph(sample, create_new_graph=True)
sgraph = self.iterate_and_create_graph(sample, stop_at_sample=stop_at_sample)
if no_atoms:
na = self.sgraph.value(sample, CMSO.hasNumberOfAtoms).toPython()
return self.sgraph, na
return self.sgraph
na = sgraph.value(sample, CMSO.hasNumberOfAtoms).toPython()
return sgraph, na
return sgraph

def get_label(self, item):
label = self.graph.value(item, RDFS.label)
Expand Down
20 changes: 18 additions & 2 deletions atomrdf/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,11 +527,12 @@ def _make_grain_boundary_aimsgb(
to_primitive=primitive,
)
asestruct = AseAtomsAdaptor().get_atoms(structure=gb_struct)
sys = System.read.ase(asestruct, graph=graph, names=names, label=label)
sys = System.read.ase(asestruct, graph=None, names=names, label=label)
sys.atoms._lattice = structure
sys.atoms._lattice_constant = _declass(lattice_constant)
sys._structure_dict = sdict
sys.label = label
sys.graph = graph
sys.to_graph()
sys.add_property_mappings(lattice_constant, mapping_quantity='lattice_constant')
sys.add_property_mappings(ca_ratio, mapping_quantity='lattice_constant')
Expand Down Expand Up @@ -619,13 +620,14 @@ def _make_grain_boundary_inbuilt(
if 'repetitions' not in sdict.keys():
sdict['repetitions'] = repetitions

s = System(graph=graph, names=names)
s = System(graph=None, names=names)
s.box = box
s.atoms = atoms
s.atoms._lattice = structure
s.atoms._lattice_constant = _declass(lattice_constant)
s._structure_dict = sdict
s.label = label
s.graph = graph
s.to_graph()
s.add_property_mappings(lattice_constant, mapping_quantity='lattice_constant')

Expand Down Expand Up @@ -934,6 +936,7 @@ def repeat(self, repetitions):
new_system._structure_dict = {}
new_system._structure_dict["repetitions"] = repetitions
new_system.to_graph()
new_system.copy_defects(self.sample)
return new_system

def delete(self, ids=None, indices=None, condition=None, selection=False, copy_structure=False):
Expand Down Expand Up @@ -966,6 +969,7 @@ def delete(self, ids=None, indices=None, condition=None, selection=False, copy_s
sys = self.duplicate()
#and add this new structure to the graph
sys.to_graph()
sys.copy_defects(self.sample)
else:
sys = self

Expand Down Expand Up @@ -1182,6 +1186,7 @@ def substitute_atoms(
sys = self.duplicate()
#and add this new structure to the graph
sys.to_graph()
sys.copy_defects(self.sample)
else:
sys = self

Expand Down Expand Up @@ -1384,6 +1389,7 @@ def add_interstitial_impurities(
#sys = self.duplicate()
sys = System(source=sys.add_atoms({"positions": randpos, "species": element}))
sys.to_graph()
sys.copy_defects(self.sample)
else:
#sys = self.duplicate()
sys = System(source=self.add_atoms({"positions": randpos, "species": element}))
Expand Down Expand Up @@ -2408,6 +2414,7 @@ def rotate(self, rotation_vectors, graph=None, label=None):
else:
output_structure.label = self.label
output_structure.to_graph()
output_structure.copy_defects(self.sample)
if output_structure.graph is not None:
self.add_rotation_triples(rotation_vectors, output_structure.sample)
return output_structure
Expand Down Expand Up @@ -2465,6 +2472,7 @@ def translate(self, translation_vector,
sys = self.duplicate()
#and add this new structure to the graph
sys.to_graph()
sys.copy_defects(self.sample)
else:
sys = self

Expand Down Expand Up @@ -2516,6 +2524,7 @@ def shear(self, shear_vector,
sys = self.duplicate()
#and add this new structure to the graph
sys.to_graph()
sys.copy_defects(self.sample)
else:
sys = self

Expand Down Expand Up @@ -2553,3 +2562,10 @@ def add_shear_triples(self, translation_vector, plane, distance, ):
self.graph.add((plane_vector, CMSO.hasComponent_y, Literal(plane[1], datatype=XSD.float),))
self.graph.add((plane_vector, CMSO.hasComponent_z, Literal(plane[2], datatype=XSD.float),))
self.graph.add((activity, UNSAFECMSO.hasDistance, Literal(distance, datatype=XSD.float)))

def copy_defects(self, parent_sample):
if self.sample is None:
return
if parent_sample is None:
return
self.graph.copy_defects(self.sample, parent_sample)
42 changes: 2 additions & 40 deletions atomrdf/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,49 +207,11 @@ def _add_inherited_properties(
):
# Here we need to add inherited info: CalculatedProperties will be lost
# Defects will be inherited

if sample is None:
return

parent_material = list(
[
k[2]
for k in self.kg.triples((parent_sample, CMSO.hasMaterial, None))
]
)[0]
parent_defects = list(
[x[2] for x in self.kg.triples((parent_material, CMSO.hasDefect, None))]
)
# now for each defect we copy add this to the final sample
material = list(
[k[2] for k in self.kg.triples((sample, CMSO.hasMaterial, None))]
)[0]

for defect in parent_defects:
new_defect = URIRef(defect.toPython())
self.kg.add((material, CMSO.hasDefect, new_defect))
# now fetch all defect based info
for triple in self.kg.triples((defect, None, None)):
self.kg.add((new_defect, triple[1], triple[2]))

# now add the special props for vacancy, interstitial &substitional
for triple in self.kg.triples(
(parent_sample, PODO.hasVacancyConcentration, None)
):
self.kg.add((sample, triple[1], triple[2]))
for triple in self.kg.triples(
(parent_sample, PODO.hasNumberOfVacancies, None)
):
self.kg.add((sample, triple[1], triple[2]))
for triple in self.kg.triples(
(parent_sample, PODO.hasImpurityConcentration, None)
):
self.kg.add((sample, triple[1], triple[2]))
for triple in self.kg.triples(
(parent_sample, PODO.hasNumberOfImpurityAtoms, None)
):
self.kg.add((sample, triple[1], triple[2]))


self.kg.copy_defects(sample, parent_sample)

def _add_method(
self, job_dict,
Expand Down
Loading
Loading