diff --git a/python/python/raphtory/__init__.pyi b/python/python/raphtory/__init__.pyi index 7a6ad44fc5..d000b539d3 100644 --- a/python/python/raphtory/__init__.pyi +++ b/python/python/raphtory/__init__.pyi @@ -1235,6 +1235,25 @@ class Graph(GraphView): path (str): The path to the cache file """ + def create_node( + self, + timestamp: TimeInput, + id: str | int, + properties: Optional[PropInput] = None, + node_type: Optional[str] = None, + ) -> MutableNode: + """ + Creates a new node with the given id and properties to the graph. It fails if the node already exists. + + Arguments: + timestamp (TimeInput): The timestamp of the node. + id (str|int): The id of the node. + properties (PropInput, optional): The properties of the node. + node_type (str, optional): The optional string which will be used as a node type + Returns: + MutableNode: The created node + """ + @staticmethod def deserialise(bytes: bytes): """ @@ -3506,6 +3525,26 @@ class PersistentGraph(GraphView): path (str): The path to the cache file """ + def create_node( + self, + timestamp: TimeInput, + id: str | int, + properties: dict = None, + node_type: str = None, + ): + """ + Creates a new node with the given id and properties to the graph. It fails if the node already exists. + + Arguments: + timestamp (TimeInput): The timestamp of the node. + id (str | int): The id of the node. + properties (dict): The properties of the node. + node_type (str) : The optional string which will be used as a node type + + Returns: + MutableNode + """ + def delete_edge( self, timestamp: int, src: str | int, dst: str | int, layer: str = None ): diff --git a/python/python/raphtory/graphql/__init__.pyi b/python/python/raphtory/graphql/__init__.pyi index 28827d4877..c66635a92c 100644 --- a/python/python/raphtory/graphql/__init__.pyi +++ b/python/python/raphtory/graphql/__init__.pyi @@ -329,9 +329,6 @@ class RemoteEdgeAddition(object): """Create and return a new object. See help(type) for accurate signature.""" class RemoteGraph(object): - def __new__(self, path, client) -> RemoteGraph: - """Create and return a new object. See help(type) for accurate signature.""" - def add_constant_properties(self, properties: dict): """ Adds constant properties to the remote graph. @@ -406,6 +403,25 @@ class RemoteGraph(object): properties (dict): The temporal properties of the graph. """ + def create_node( + self, + timestamp: int | str | datetime, + id: str | int, + properties: Optional[dict] = None, + node_type: Optional[str] = None, + ): + """ + Create a new node with the given id and properties to the remote graph and fail if the node already exists. + + Arguments: + timestamp (int|str|datetime): The timestamp of the node. + id (str|int): The id of the node. + properties (dict, optional): The properties of the node. + node_type (str, optional): The optional string which will be used as a node type + Returns: + RemoteNode + """ + def delete_edge( self, timestamp: int, diff --git a/python/tests/graphql/edit_graph/test_graphql.py b/python/tests/graphql/edit_graph/test_graphql.py index 28b36f959d..a75d6259f0 100644 --- a/python/tests/graphql/edit_graph/test_graphql.py +++ b/python/tests/graphql/edit_graph/test_graphql.py @@ -21,8 +21,8 @@ def test_encode_graph(): encoded = encode_graph(g) assert ( - encoded - == "EgxaCgoIX2RlZmF1bHQSDBIKCghfZGVmYXVsdBoFCgNiZW4aCQoFaGFtemEYARoLCgdoYWFyb29uGAIiAhABIgYIAhABGAEiBBACGAIqAhoAKgQSAhABKgQSAhADKgIKACoGEgQIARABKgYSBAgBEAIqBAoCCAEqBhIECAIQAioGEgQIAhADKgQKAggCKgQ6AhABKgIyACoIOgYIARACGAEqBDICCAEqCDoGCAIQAxgCKgQyAggC" + encoded + == "EgxaCgoIX2RlZmF1bHQSDBIKCghfZGVmYXVsdBoFCgNiZW4aCQoFaGFtemEYARoLCgdoYWFyb29uGAIiAhABIgYIAhABGAEiBBACGAIqAhoAKgQSAhABKgQSAhADKgIKACoGEgQIARABKgYSBAgBEAIqBAoCCAEqBhIECAIQAioGEgQIAhADKgQKAggCKgQ6AhABKgIyACoIOgYIARACGAEqBDICCAEqCDoGCAIQAxgCKgQyAggC" ) @@ -42,8 +42,8 @@ def test_wrong_url(): with pytest.raises(Exception) as excinfo: client = RaphtoryClient("http://broken_url.com") assert ( - str(excinfo.value) - == "Could not connect to the given server - no response --error sending request for url (http://broken_url.com/)" + str(excinfo.value) + == "Could not connect to the given server - no response --error sending request for url (http://broken_url.com/)" ) @@ -393,16 +393,14 @@ def test_create_node(): query_nodes = """{graph(path: "g") {nodes {list {name}}}}""" assert client.query(query_nodes) == { - "graph": { - "nodes": { - "list": [{"name": "ben"}, {"name": "shivam"}] - } - } + "graph": {"nodes": {"list": [{"name": "ben"}, {"name": "shivam"}]}} } create_node_query = """{updateGraph(path: "g") { createNode(time: 0, name: "oogway") { success } }}""" - assert client.query(create_node_query) == {"updateGraph": {"createNode": {"success": True}}} + assert client.query(create_node_query) == { + "updateGraph": {"createNode": {"success": True}} + } assert client.query(query_nodes) == { "graph": { "nodes": { @@ -428,11 +426,7 @@ def test_create_node_using_client(): query_nodes = """{graph(path: "g") {nodes {list {name}}}}""" assert client.query(query_nodes) == { - "graph": { - "nodes": { - "list": [{"name": "ben"}, {"name": "shivam"}] - } - } + "graph": {"nodes": {"list": [{"name": "ben"}, {"name": "shivam"}]}} } remote_graph = client.remote_graph(path="g") @@ -460,23 +454,56 @@ def test_create_node_using_client_with_properties(): client = RaphtoryClient("http://localhost:1737") client.send_graph(path="g", graph=g) - query_nodes = """{graph(path: "g") {nodes {list {name, properties { keys }}}}}""" + query_nodes = ( + """{graph(path: "g") {nodes {list {name, properties { keys }}}}}""" + ) assert client.query(query_nodes) == { "graph": { "nodes": { - "list": [{"name": "ben", 'properties': {'keys': []}}, {"name": "shivam", 'properties': {'keys': []}}] + "list": [ + {"name": "ben", "properties": {"keys": []}}, + {"name": "shivam", "properties": {"keys": []}}, + ] } } } remote_graph = client.remote_graph(path="g") - remote_graph.create_node(timestamp=0, id="oogway", properties={"prop1": 60, "prop2": 31.3, "prop3": "abc123", "prop4": True, "prop5": [1, 2, 3]}) - nodes = json.loads(json.dumps(client.query(query_nodes)))['graph']['nodes']['list'] - node_oogway = next(node for node in nodes if node['name'] == 'oogway') - assert sorted(node_oogway['properties']['keys']) == ['prop1', 'prop2', 'prop3', 'prop4', 'prop5'] + remote_graph.create_node( + timestamp=0, + id="oogway", + properties={ + "prop1": 60, + "prop2": 31.3, + "prop3": "abc123", + "prop4": True, + "prop5": [1, 2, 3], + }, + ) + nodes = json.loads(json.dumps(client.query(query_nodes)))["graph"]["nodes"][ + "list" + ] + node_oogway = next(node for node in nodes if node["name"] == "oogway") + assert sorted(node_oogway["properties"]["keys"]) == [ + "prop1", + "prop2", + "prop3", + "prop4", + "prop5", + ] with pytest.raises(Exception) as excinfo: - remote_graph.create_node(timestamp=0, id="oogway", properties={"prop1": 60, "prop2": 31.3, "prop3": "abc123", "prop4": True, "prop5": [1, 2, 3]}) + remote_graph.create_node( + timestamp=0, + id="oogway", + properties={ + "prop1": 60, + "prop2": 31.3, + "prop3": "abc123", + "prop4": True, + "prop5": [1, 2, 3], + }, + ) assert "Node already exists" in str(excinfo.value) @@ -494,20 +521,57 @@ def test_create_node_using_client_with_properties_node_type(): assert client.query(query_nodes) == { "graph": { "nodes": { - "list": [{"name": "ben", 'nodeType': None, 'properties': {'keys': []}}, {"name": "shivam", 'nodeType': None, 'properties': {'keys': []}}] + "list": [ + {"name": "ben", "nodeType": None, "properties": {"keys": []}}, + { + "name": "shivam", + "nodeType": None, + "properties": {"keys": []}, + }, + ] } } } remote_graph = client.remote_graph(path="g") - remote_graph.create_node(timestamp=0, id="oogway", properties={"prop1": 60, "prop2": 31.3, "prop3": "abc123", "prop4": True, "prop5": [1, 2, 3]}, node_type="master") - nodes = json.loads(json.dumps(client.query(query_nodes)))['graph']['nodes']['list'] - node_oogway = next(node for node in nodes if node['name'] == 'oogway') - assert node_oogway['nodeType'] == 'master' - assert sorted(node_oogway['properties']['keys']) == ['prop1', 'prop2', 'prop3', 'prop4', 'prop5'] + remote_graph.create_node( + timestamp=0, + id="oogway", + properties={ + "prop1": 60, + "prop2": 31.3, + "prop3": "abc123", + "prop4": True, + "prop5": [1, 2, 3], + }, + node_type="master", + ) + nodes = json.loads(json.dumps(client.query(query_nodes)))["graph"]["nodes"][ + "list" + ] + node_oogway = next(node for node in nodes if node["name"] == "oogway") + assert node_oogway["nodeType"] == "master" + assert sorted(node_oogway["properties"]["keys"]) == [ + "prop1", + "prop2", + "prop3", + "prop4", + "prop5", + ] with pytest.raises(Exception) as excinfo: - remote_graph.create_node(timestamp=0, id="oogway", properties={"prop1": 60, "prop2": 31.3, "prop3": "abc123", "prop4": True, "prop5": [1, 2, 3]}, node_type="master") + remote_graph.create_node( + timestamp=0, + id="oogway", + properties={ + "prop1": 60, + "prop2": 31.3, + "prop3": "abc123", + "prop4": True, + "prop5": [1, 2, 3], + }, + node_type="master", + ) assert "Node already exists" in str(excinfo.value) @@ -525,7 +589,10 @@ def test_create_node_using_client_with_node_type(): assert client.query(query_nodes) == { "graph": { "nodes": { - "list": [{"name": "ben", 'nodeType': None}, {"name": "shivam", 'nodeType': None}] + "list": [ + {"name": "ben", "nodeType": None}, + {"name": "shivam", "nodeType": None}, + ] } } } @@ -535,7 +602,11 @@ def test_create_node_using_client_with_node_type(): assert client.query(query_nodes) == { "graph": { "nodes": { - "list": [{"name": "ben", 'nodeType': None}, {"name": "shivam", 'nodeType': None}, {"name": "oogway", 'nodeType': "master"}] + "list": [ + {"name": "ben", "nodeType": None}, + {"name": "shivam", "nodeType": None}, + {"name": "oogway", "nodeType": "master"}, + ] } } } diff --git a/python/tests/test_graphdb/test_graphdb.py b/python/tests/test_graphdb/test_graphdb.py index eb1c43b6bd..bed24790ca 100644 --- a/python/tests/test_graphdb/test_graphdb.py +++ b/python/tests/test_graphdb/test_graphdb.py @@ -353,8 +353,8 @@ def test_getitem(): @with_disk_graph def check(g): assert ( - g.node(1).properties.temporal.get("cost") - == g.node(1).properties.temporal["cost"] + g.node(1).properties.temporal.get("cost") + == g.node(1).properties.temporal["cost"] ) check(g) @@ -607,7 +607,7 @@ def time_history_test(time, key, value): assert g.at(time).node(1).properties.temporal.get(key) is None assert g.at(time).nodes.properties.temporal.get(key) is None assert ( - g.at(time).nodes.out_neighbours.properties.temporal.get(key) is None + g.at(time).nodes.out_neighbours.properties.temporal.get(key) is None ) else: assert g.at(time).node(1).properties.temporal.get(key).items() == value @@ -812,22 +812,22 @@ def no_static_property_test(key, value): assert sorted(g.node(1).properties.temporal.keys()) == expected_names_no_static assert sorted(g.nodes.properties.temporal.keys()) == expected_names_no_static assert ( - sorted(g.nodes.out_neighbours.properties.temporal.keys()) - == expected_names_no_static + sorted(g.nodes.out_neighbours.properties.temporal.keys()) + == expected_names_no_static ) expected_names_no_static_at_1 = sorted(["prop 4", "prop 1", "prop 3"]) assert ( - sorted(g.at(1).node(1).properties.temporal.keys()) - == expected_names_no_static_at_1 + sorted(g.at(1).node(1).properties.temporal.keys()) + == expected_names_no_static_at_1 ) assert ( - sorted(g.at(1).nodes.properties.temporal.keys()) - == expected_names_no_static_at_1 + sorted(g.at(1).nodes.properties.temporal.keys()) + == expected_names_no_static_at_1 ) assert ( - sorted(g.at(1).nodes.out_neighbours.properties.temporal.keys()) - == expected_names_no_static_at_1 + sorted(g.at(1).nodes.out_neighbours.properties.temporal.keys()) + == expected_names_no_static_at_1 ) # testing has_property @@ -1325,11 +1325,11 @@ def test_constant_property_update(): def updates(v): v.update_constant_properties({"prop1": "value1", "prop2": 123}) assert ( - v.properties.get("prop1") == "value1" and v.properties.get("prop2") == 123 + v.properties.get("prop1") == "value1" and v.properties.get("prop2") == 123 ) v.update_constant_properties({"prop1": "value2", "prop2": 345}) assert ( - v.properties.get("prop1") == "value2" and v.properties.get("prop2") == 345 + v.properties.get("prop1") == "value2" and v.properties.get("prop2") == 345 ) v.add_constant_properties({"name": "value1"}) @@ -1666,18 +1666,18 @@ def check(g): assert g.exclude_layer("layer2").count_edges() == 4 with pytest.raises( - Exception, - match=re.escape( - "Invalid layer: test_layer. Valid layers: _default, layer1, layer2" - ), + Exception, + match=re.escape( + "Invalid layer: test_layer. Valid layers: _default, layer1, layer2" + ), ): g.layers(["test_layer"]) with pytest.raises( - Exception, - match=re.escape( - "Invalid layer: test_layer. Valid layers: _default, layer1, layer2" - ), + Exception, + match=re.escape( + "Invalid layer: test_layer. Valid layers: _default, layer1, layer2" + ), ): g.edge(1, 2).layers(["test_layer"]) @@ -1754,20 +1754,20 @@ def test_layer_name(): assert str(e.value) == error_msg assert [ - list(iterator) for iterator in g.nodes.neighbours.edges.explode().layer_name - ] == [ - ["_default", "awesome layer"], - ["_default", "awesome layer"], - ["_default", "awesome layer"], - ] + list(iterator) for iterator in g.nodes.neighbours.edges.explode().layer_name + ] == [ + ["_default", "awesome layer"], + ["_default", "awesome layer"], + ["_default", "awesome layer"], + ] assert [ - list(iterator) - for iterator in g.nodes.neighbours.edges.explode_layers().layer_name - ] == [ - ["_default", "awesome layer"], - ["_default", "awesome layer"], - ["_default", "awesome layer"], - ] + list(iterator) + for iterator in g.nodes.neighbours.edges.explode_layers().layer_name + ] == [ + ["_default", "awesome layer"], + ["_default", "awesome layer"], + ["_default", "awesome layer"], + ] def test_time(): @@ -1801,12 +1801,12 @@ def check(g): # assert str(e.value) == error_msg assert [ - list(iterator) for iterator in g.nodes.neighbours.edges.explode().time - ] == [ - [0, 0, 1], - [0, 0, 1], - [0, 0, 1], - ] + list(iterator) for iterator in g.nodes.neighbours.edges.explode().time + ] == [ + [0, 0, 1], + [0, 0, 1], + [0, 0, 1], + ] check(g) @@ -2371,8 +2371,8 @@ def test_weird_windows(): @with_disk_graph def check(g): with pytest.raises( - Exception, - match="'ddd' is not a valid datetime, valid formats are RFC3339, RFC2822, %Y-%m-%d, %Y-%m-%dT%H:%M:%S%.3f, %Y-%m-%dT%H:%M:%S%, %Y-%m-%d %H:%M:%S%.3f and %Y-%m-%d %H:%M:%S%", + Exception, + match="'ddd' is not a valid datetime, valid formats are RFC3339, RFC2822, %Y-%m-%d, %Y-%m-%dT%H:%M:%S%.3f, %Y-%m-%dT%H:%M:%S%, %Y-%m-%d %H:%M:%S%.3f and %Y-%m-%d %H:%M:%S%", ): g.window("ddd", "aaa") @@ -2569,9 +2569,9 @@ def check(g): assert g.nodes.type_filter(["a"]).neighbours.type_filter( ["c"] ).name.collect() == [ - [], - ["5"], - ] + [], + ["5"], + ] assert g.nodes.type_filter(["a"]).neighbours.type_filter([]).name.collect() == [ [], [], @@ -2582,9 +2582,9 @@ def check(g): assert g.nodes.type_filter(["a"]).neighbours.type_filter( ["d"] ).name.collect() == [ - [], - [], - ] + [], + [], + ] assert g.nodes.type_filter(["a"]).neighbours.neighbours.name.collect() == [ ["1", "3", "4"], ["1", "3", "4", "4", "6"], @@ -2639,20 +2639,20 @@ def check(g): for edge in edges: time_nested.append(edge.time) assert [ - item - for sublist in g.nodes.edges.explode().time.collect() - for item in sublist - ] == time_nested + item + for sublist in g.nodes.edges.explode().time.collect() + for item in sublist + ] == time_nested date_time_nested = [] for edges in g.nodes.edges.explode(): for edge in edges: date_time_nested.append(edge.date_time) assert [ - item - for sublist in g.nodes.edges.explode().date_time.collect() - for item in sublist - ] == date_time_nested + item + for sublist in g.nodes.edges.explode().date_time.collect() + for item in sublist + ] == date_time_nested check(g) @@ -2966,44 +2966,44 @@ def check(g): assert len(index.fuzzy_search_nodes("name:habza", levenshtein_distance=1)) == 1 assert ( - len( - index.fuzzy_search_nodes( - "name:haa", levenshtein_distance=1, prefix=True - ) + len( + index.fuzzy_search_nodes( + "name:haa", levenshtein_distance=1, prefix=True ) - == 2 + ) + == 2 ) assert ( - len( - index.fuzzy_search_nodes( - "value_str:abc123", levenshtein_distance=2, prefix=True - ) + len( + index.fuzzy_search_nodes( + "value_str:abc123", levenshtein_distance=2, prefix=True ) - == 2 + ) + == 2 ) assert ( - len( - index.fuzzy_search_nodes( - "value_str:dsss312", levenshtein_distance=2, prefix=False - ) + len( + index.fuzzy_search_nodes( + "value_str:dsss312", levenshtein_distance=2, prefix=False ) - == 1 + ) + == 1 ) assert len(index.fuzzy_search_edges("from:bon", levenshtein_distance=1)) == 2 assert ( - len( - index.fuzzy_search_edges("from:bo", levenshtein_distance=1, prefix=True) - ) - == 2 + len( + index.fuzzy_search_edges("from:bo", levenshtein_distance=1, prefix=True) + ) + == 2 ) assert ( - len( - index.fuzzy_search_edges( - "from:eon", levenshtein_distance=2, prefix=True - ) + len( + index.fuzzy_search_edges( + "from:eon", levenshtein_distance=2, prefix=True ) - == 2 + ) + == 2 ) check(g) @@ -3011,26 +3011,38 @@ def check(g): def test_create_node_graph(): g = Graph() - g.create_node(1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}) + g.create_node( + 1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"} + ) node = g.node("shivam") assert node.name == "shivam" assert node.properties == {"value": 60, "value_f": 31.3, "value_str": "abc123"} with pytest.raises(Exception) as excinfo: - g.create_node(1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}) + g.create_node( + 1, + "shivam", + properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}, + ) assert "Node already exists" in str(excinfo.value) def test_create_node_graph_with_deletion(): g = PersistentGraph() - g.create_node(1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}) + g.create_node( + 1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"} + ) node = g.node("shivam") assert node.name == "shivam" assert node.properties == {"value": 60, "value_f": 31.3, "value_str": "abc123"} with pytest.raises(Exception) as excinfo: - g.create_node(1, "shivam", properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}) + g.create_node( + 1, + "shivam", + properties={"value": 60, "value_f": 31.3, "value_str": "abc123"}, + ) assert "Node already exists" in str(excinfo.value) @@ -3047,6 +3059,7 @@ def datadir(tmpdir, request): raise e return tmpdir + # def currently_broken_fuzzy_search(): #TODO: Fix fuzzy searching for properties # g = Graph() # g.add_edge(2,"haaroon","hamza", properties={"value":60,"value_f":31.3,"value_str":"abc123"}) diff --git a/python/tests/test_graphdb/test_graphdb_imports.py b/python/tests/test_graphdb/test_graphdb_imports.py index 1ecd5c2dab..1fd00fa44a 100644 --- a/python/tests/test_graphdb/test_graphdb_imports.py +++ b/python/tests/test_graphdb/test_graphdb_imports.py @@ -287,3 +287,31 @@ def test_import_edges_as_force(): z = gg.node("Z") assert z.name == "Z" assert z.history().tolist() == [2, 3] + + +def test_import_edges(): + g = Graph() + g.add_node(1, 1) + g.add_node(1, 2) + g.add_node(1, 3) + g.add_edge(1, 4, 5) + g.add_edge(1, 6, 7) + g.add_edge(1, 8, 9) + g2 = Graph() + g2.import_edges(g.edges) + assert g2.count_edges() == 3 + assert g.edges.id == g2.edges.id + + +def test_import_edges_iterator(): + g = Graph() + g.add_node(1, 1) + g.add_node(1, 2) + g.add_node(1, 3) + g.add_edge(1, 4, 5) + g.add_edge(1, 6, 7) + g.add_edge(1, 8, 9) + g2 = Graph() + g2.import_edges(iter(g.edges)) + assert g2.count_edges() == 3 + assert g.edges.id == g2.edges.id diff --git a/raphtory-api/src/core/storage/timeindex.rs b/raphtory-api/src/core/storage/timeindex.rs index 7b6d214dd4..c96e0002ca 100644 --- a/raphtory-api/src/core/storage/timeindex.rs +++ b/raphtory-api/src/core/storage/timeindex.rs @@ -23,6 +23,62 @@ pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static { fn new(t: i64, s: usize) -> Self; } +pub trait TimeIndexIntoOps: Sized { + type IndexType: AsTime; + type RangeType: TimeIndexIntoOps; + + fn into_range(self, w: Range) -> Self::RangeType; + + fn into_range_t(self, w: Range) -> Self::RangeType { + self.into_range(Self::IndexType::range(w)) + } + + fn into_iter(self) -> impl Iterator + Send; + + fn into_iter_t(self) -> impl Iterator + Send { + self.into_iter().map(|time| time.t()) + } +} + +pub trait TimeIndexOps: Send + Sync { + type IndexType: AsTime; + type RangeType<'a>: TimeIndexOps + 'a + where + Self: 'a; + + fn active(&self, w: Range) -> bool; + + fn active_t(&self, w: Range) -> bool { + self.active(Self::IndexType::range(w)) + } + + fn range(&self, w: Range) -> Self::RangeType<'_>; + + fn range_t(&self, w: Range) -> Self::RangeType<'_> { + self.range(Self::IndexType::range(w)) + } + + fn first_t(&self) -> Option { + self.first().map(|ti| ti.t()) + } + + fn first(&self) -> Option; + + fn last_t(&self) -> Option { + self.last().map(|ti| ti.t()) + } + + fn last(&self) -> Option; + + fn iter(&self) -> Box + Send + '_>; + + fn iter_t(&self) -> Box + Send + '_> { + Box::new(self.iter().map(|time| time.t())) + } + + fn len(&self) -> usize; +} + impl From for TimeIndexEntry { fn from(value: i64) -> Self { Self::start(value) diff --git a/raphtory-api/src/iter.rs b/raphtory-api/src/iter.rs new file mode 100644 index 0000000000..840687fa0e --- /dev/null +++ b/raphtory-api/src/iter.rs @@ -0,0 +1,12 @@ +pub type BoxedIter = Box + Send>; +pub type BoxedLIter<'a, T> = Box + Send + 'a>; + +pub trait IntoDynBoxed<'a, T> { + fn into_dyn_boxed(self) -> BoxedLIter<'a, T>; +} + +impl<'a, T, I: Iterator + Send + 'a> IntoDynBoxed<'a, T> for I { + fn into_dyn_boxed(self) -> BoxedLIter<'a, T> { + Box::new(self) + } +} diff --git a/raphtory-api/src/lib.rs b/raphtory-api/src/lib.rs index c90b7d8bb9..a5bb02383c 100644 --- a/raphtory-api/src/lib.rs +++ b/raphtory-api/src/lib.rs @@ -3,3 +3,5 @@ pub mod compute; pub mod core; #[cfg(feature = "python")] pub mod python; + +pub mod iter; diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index a980874c63..3312b2e005 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -54,7 +54,7 @@ impl EdgeListTableProvider { .as_ref() .layer(layer_id) .edges_storage() - .time() + .time_col() .values() .len(); @@ -186,7 +186,7 @@ fn produce_record_batch( let layer = graph.as_ref().layer(layer_id); let edges = layer.edges_storage(); - let chunked_lists_ts = edges.time(); + let chunked_lists_ts = edges.time_col(); let offsets = chunked_lists_ts.offsets(); // FIXME: potentially implement into_iter_chunks() for chunked arrays to avoid having to collect these chunks, if it turns out to be a problem let time_values_chunks = chunked_lists_ts @@ -257,7 +257,7 @@ fn produce_record_batch( let column_ids = layer .edges_storage() - .data_type() + .prop_dtypes() .iter() .enumerate() .skip(1) // first one is supposed to be time diff --git a/raphtory-cypher/src/transpiler/mod.rs b/raphtory-cypher/src/transpiler/mod.rs index 2b129d4847..ea9099edc1 100644 --- a/raphtory-cypher/src/transpiler/mod.rs +++ b/raphtory-cypher/src/transpiler/mod.rs @@ -262,7 +262,7 @@ fn scan_edges_as_sql_cte( // TODO: this needs to match the schema from EdgeListTableProvider fn full_layer_fields(graph: &DiskGraphStorage, layer_id: usize) -> Option { - let dt = graph.as_ref().layer(layer_id).edges_props_data_type(); + let dt = graph.as_ref().layer(layer_id).edges_props_data_type()?; let arr_dt: arrow_schema::DataType = dt.clone().into(); match arr_dt { arrow_schema::DataType::Struct(fields) => { diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index b21b95e78b..246d42176e 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -40,7 +40,7 @@ quad-rand = { workspace = true } serde_json = { workspace = true } ouroboros = { workspace = true } either = { workspace = true } -kdam = { workspace = true } +kdam = { workspace = true, optional = true} bytemuck = { workspace = true } tracing = { workspace = true } diff --git a/raphtory/src/core/entities/properties/graph_meta.rs b/raphtory/src/core/entities/properties/graph_meta.rs index e40837ff47..57abbcf780 100644 --- a/raphtory/src/core/entities/properties/graph_meta.rs +++ b/raphtory/src/core/entities/properties/graph_meta.rs @@ -11,7 +11,9 @@ use raphtory_api::core::storage::{ FxDashMap, }; use serde::{Deserialize, Serialize}; -use std::ops::{Deref, DerefMut}; +#[cfg(feature = "proto")] +use std::ops::Deref; +use std::ops::DerefMut; #[derive(Serialize, Deserialize, Debug)] pub struct GraphMeta { diff --git a/raphtory/src/core/mod.rs b/raphtory/src/core/mod.rs index 52611058f1..58205c3010 100644 --- a/raphtory/src/core/mod.rs +++ b/raphtory/src/core/mod.rs @@ -32,7 +32,6 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; use raphtory_api::core::storage::arc_str::ArcStr; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::{ cmp::Ordering, collections::HashMap, @@ -171,39 +170,6 @@ impl PartialOrd for Prop { } impl Prop { - pub fn to_json(&self) -> Value { - match self { - Prop::Str(value) => Value::String(value.to_string()), - Prop::U8(value) => Value::Number((*value).into()), - Prop::U16(value) => Value::Number((*value).into()), - Prop::I32(value) => Value::Number((*value).into()), - Prop::I64(value) => Value::Number((*value).into()), - Prop::U32(value) => Value::Number((*value).into()), - Prop::U64(value) => Value::Number((*value).into()), - Prop::F32(value) => Value::Number(serde_json::Number::from_f64(*value as f64).unwrap()), - Prop::F64(value) => Value::Number(serde_json::Number::from_f64(*value).unwrap()), - Prop::Bool(value) => Value::Bool(*value), - Prop::List(value) => { - let vec: Vec = value.iter().map(|v| v.to_json()).collect(); - Value::Array(vec) - } - Prop::Map(value) => { - let map: serde_json::Map = value - .iter() - .map(|(k, v)| (k.to_string(), v.to_json())) - .collect(); - Value::Object(map) - } - Prop::DTime(value) => Value::String(value.to_string()), - Prop::NDTime(value) => Value::String(value.to_string()), - Prop::Graph(_) => Value::String("Graph cannot be converted to JSON".to_string()), - Prop::PersistentGraph(_) => { - Value::String("Persistent Graph cannot be converted to JSON".to_string()) - } - Prop::Document(DocumentInput { content, .. }) => Value::String(content.to_owned()), // TODO: return Value::Object ?? - } - } - pub fn dtype(&self) -> PropType { match self { Prop::Str(_) => PropType::Str, diff --git a/raphtory/src/core/storage/timeindex.rs b/raphtory/src/core/storage/timeindex.rs index fe58e28693..e79e71c247 100644 --- a/raphtory/src/core/storage/timeindex.rs +++ b/raphtory/src/core/storage/timeindex.rs @@ -281,62 +281,6 @@ impl<'a, T: AsTime, Ops: TimeIndexOps, V: AsRef> + Send } } -pub trait TimeIndexOps: Send + Sync { - type IndexType: AsTime; - type RangeType<'a>: TimeIndexOps + 'a - where - Self: 'a; - - fn active(&self, w: Range) -> bool; - - fn active_t(&self, w: Range) -> bool { - self.active(Self::IndexType::range(w)) - } - - fn range(&self, w: Range) -> Self::RangeType<'_>; - - fn range_t(&self, w: Range) -> Self::RangeType<'_> { - self.range(Self::IndexType::range(w)) - } - - fn first_t(&self) -> Option { - self.first().map(|ti| ti.t()) - } - - fn first(&self) -> Option; - - fn last_t(&self) -> Option { - self.last().map(|ti| ti.t()) - } - - fn last(&self) -> Option; - - fn iter(&self) -> Box + Send + '_>; - - fn iter_t(&self) -> Box + Send + '_> { - Box::new(self.iter().map(|time| time.t())) - } - - fn len(&self) -> usize; -} - -pub trait TimeIndexIntoOps: Sized { - type IndexType: AsTime; - type RangeType: TimeIndexIntoOps; - - fn into_range(self, w: Range) -> Self::RangeType; - - fn into_range_t(self, w: Range) -> Self::RangeType { - self.into_range(Self::IndexType::range(w)) - } - - fn into_iter(self) -> impl Iterator + Send; - - fn into_iter_t(self) -> impl Iterator + Send { - self.into_iter().map(|time| time.t()) - } -} - impl TimeIndexOps for TimeIndex { type IndexType = T; type RangeType<'a> diff --git a/raphtory/src/db/api/storage/graph/nodes/node_entry.rs b/raphtory/src/db/api/storage/graph/nodes/node_entry.rs index efc5c11c8d..690aaf3d57 100644 --- a/raphtory/src/db/api/storage/graph/nodes/node_entry.rs +++ b/raphtory/src/db/api/storage/graph/nodes/node_entry.rs @@ -69,7 +69,7 @@ impl<'b> NodeStorageEntry<'b> { self, layers: &LayerIds, dir: Direction, - ) -> impl Iterator + 'b { + ) -> impl Iterator + use<'b, '_> { match self { NodeStorageEntry::Mem(entry) => { StorageVariants::Mem(GenLockedIter::from(entry, |entry| { diff --git a/raphtory/src/db/api/view/mod.rs b/raphtory/src/db/api/view/mod.rs index 8456aa491a..dff00df179 100644 --- a/raphtory/src/db/api/view/mod.rs +++ b/raphtory/src/db/api/view/mod.rs @@ -27,15 +27,4 @@ pub use node_property_filter::NodePropertyFilterOps; pub use reset_filter::*; pub use time::*; -pub type BoxedIter = Box + Send>; -pub type BoxedLIter<'a, T> = Box + Send + 'a>; - -pub trait IntoDynBoxed<'a, T> { - fn into_dyn_boxed(self) -> BoxedLIter<'a, T>; -} - -impl<'a, T, I: Iterator + Send + 'a> IntoDynBoxed<'a, T> for I { - fn into_dyn_boxed(self) -> BoxedLIter<'a, T> { - Box::new(self) - } -} +pub use raphtory_api::iter::{BoxedIter, BoxedLIter, IntoDynBoxed}; diff --git a/raphtory/src/db/graph/graph.rs b/raphtory/src/db/graph/graph.rs index 5796e1c3e1..6f96e35d07 100644 --- a/raphtory/src/db/graph/graph.rs +++ b/raphtory/src/db/graph/graph.rs @@ -611,73 +611,6 @@ mod db_tests { .all(|&(_, src, dst)| g.edge(src, dst).is_some()) } - #[test] - fn prop_json_test() { - let g = Graph::new(); - let _ = g.add_node(0, "A", NO_PROPS, None).unwrap(); - let _ = g.add_node(0, "B", NO_PROPS, None).unwrap(); - let e = g.add_edge(0, "A", "B", NO_PROPS, None).unwrap(); - e.add_constant_properties(vec![("aprop".to_string(), Prop::Bool(true))], None) - .unwrap(); - let ee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERA")).unwrap(); - ee.add_constant_properties( - vec![("aprop".to_string(), Prop::Bool(false))], - Some("LAYERA"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("aprop") - .unwrap() - .to_json(); - let json_as_map = json_res.as_object().unwrap(); - assert_eq!(json_as_map.len(), 2); - assert_eq!(json_as_map.get("LAYERA"), Some(&Value::Bool(false))); - assert_eq!(json_as_map.get("_default"), Some(&Value::Bool(true))); - - let eee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERB")).unwrap(); - let v: Vec = vec![Prop::Bool(true), Prop::Bool(false), Prop::U64(0)]; - eee.add_constant_properties( - vec![("bprop".to_string(), Prop::List(Arc::new(v)))], - Some("LAYERB"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("bprop") - .unwrap() - .to_json(); - let list_res = json_res.as_object().unwrap().get("LAYERB").unwrap(); - assert_eq!(list_res.as_array().unwrap().len(), 3); - - let eeee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERC")).unwrap(); - let v: HashMap = HashMap::from([ - (ArcStr::from("H".to_string()), Prop::Bool(false)), - (ArcStr::from("Y".to_string()), Prop::U64(0)), - ]); - eeee.add_constant_properties( - vec![("mymap".to_string(), Prop::Map(Arc::new(v)))], - Some("LAYERC"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("mymap") - .unwrap() - .to_json(); - let map_res = json_res.as_object().unwrap().get("LAYERC").unwrap(); - assert_eq!(map_res.as_object().unwrap().len(), 2); - } - #[test] fn import_from_another_graph() { let g = Graph::new(); diff --git a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs index d8e8388e03..f0c124ae72 100644 --- a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs @@ -9,7 +9,6 @@ use crate::{ tprop_storage_ops::TPropOps, variants::layer_variants::LayerVariants, }, - disk_graph::graph_impl::tprops::read_tprop_column, }; use pometry_storage::{edge::Edge, tprops::DiskTProp}; use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, storage::timeindex::TimeIndexEntry}; @@ -110,11 +109,12 @@ impl<'a> EdgeStorageOps<'a> for Edge<'a> { fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + Sync + 'a { self.graph() .localize_edge_prop_id(layer_id, prop_id) - .and_then(|local_id| { - self.temporal_prop_layer_inner(layer_id, local_id) - .map(|field| (field, local_id)) + .map(|prop_id| { + self.graph() + .layer(layer_id) + .edges_storage() + .prop(self.eid(), prop_id) }) - .and_then(|(field, prop_id)| read_tprop_column(self, prop_id, layer_id, field)) .unwrap_or(DiskTProp::empty()) } diff --git a/raphtory/src/disk_graph/graph_impl/mod.rs b/raphtory/src/disk_graph/graph_impl/mod.rs index 7b9dd344bc..6a909aa574 100644 --- a/raphtory/src/disk_graph/graph_impl/mod.rs +++ b/raphtory/src/disk_graph/graph_impl/mod.rs @@ -5,7 +5,6 @@ use crate::{core::utils::errors::GraphError, disk_graph::DiskGraphStorage, prelu mod edge_storage_ops; mod interop; pub mod prop_conversion; -mod time_index_into_ops; pub mod tprops; #[derive(Debug)] diff --git a/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs b/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs deleted file mode 100644 index 74f35a4995..0000000000 --- a/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs +++ /dev/null @@ -1,176 +0,0 @@ -use crate::{ - core::storage::timeindex::{TimeIndexIntoOps, TimeIndexOps}, - db::api::view::IntoDynBoxed, -}; -use pometry_storage::{ - prelude::{ArrayOps, BaseArrayOps}, - timestamps::TimeStamps, -}; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use std::ops::Range; - -impl<'a> TimeIndexIntoOps for TimeStamps<'a, TimeIndexEntry> { - type IndexType = TimeIndexEntry; - - type RangeType = Self; - - fn into_range(self, w: Range) -> Self { - let start = self.position(&w.start); - let end = self.position(&w.end); - let (timestamps, sec_index) = self.into_inner(); - TimeStamps::new( - timestamps.sliced(start..end), - sec_index.map(|sec_index| sec_index.sliced(start..end)), - ) - } - - #[allow(refining_impl_trait)] - fn into_iter(self) -> impl Iterator + Send + 'static { - let (timestamps, sec_index) = self.into_inner(); - let sec_iter: Box + Send> = sec_index - .map(|v| v.into_owned().map(|i| i as usize).into_dyn_boxed()) - .unwrap_or(self.timestamps().range().clone().into_dyn_boxed()); - timestamps - .into_owned() - .zip(sec_iter) - .map(|(t, s)| TimeIndexEntry(t, s)) - } -} -impl<'a> TimeIndexIntoOps for TimeStamps<'a, i64> { - type IndexType = i64; - - type RangeType = Self; - - fn into_range(self, w: Range) -> Self { - let start = self.timestamps().partition_point(|i| i < w.start); - let end = self.timestamps().partition_point(|i| i < w.end); - let (timestamps, _) = self.into_inner(); - TimeStamps::new(timestamps.sliced(start..end), None) - } - fn into_iter(self) -> impl Iterator + Send { - let (timestamps, _) = self.into_inner(); - timestamps - } -} - -impl<'a> TimeIndexOps for TimeStamps<'a, TimeIndexEntry> { - type IndexType = TimeIndexEntry; - type RangeType<'b> - = TimeStamps<'b, TimeIndexEntry> - where - Self: 'b; - - fn len(&self) -> usize { - self.timestamps().len() - } - - fn active(&self, w: Range) -> bool { - let i = self.position(&w.start); - i < self.timestamps().len() && self.get(i) < w.end - } - - fn range(&self, w: Range) -> Self::RangeType<'_> { - let start = self.position(&w.start); - let end = self.position(&w.end); - TimeStamps::new( - self.timestamps().sliced(start..end), - self.sec_index() - .map(|sec_index| sec_index.sliced(start..end)), - ) - } - - fn first_t(&self) -> Option { - (self.timestamps().len() > 0).then(|| self.timestamps().get(0)) - } - - fn first(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let t = self.timestamps().get(0); - let sec = self.sec_index().as_ref().map(|arr| arr.get(0)).unwrap_or(0); - - Some(TimeIndexEntry::new(t, sec as usize)) - } - - fn last_t(&self) -> Option { - (self.timestamps().len() > 0).then(|| self.timestamps().get(self.timestamps().len() - 1)) - } - - fn last(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let last_idx = self.timestamps().len() - 1; - - let t = self.timestamps().get(last_idx); - let sec = self - .sec_index() - .as_ref() - .map(|arr| arr.get(last_idx)) - .unwrap_or(0); - - Some(TimeIndexEntry::new(t, sec as usize)) - } - - fn iter(&self) -> Box + Send + 'a> { - let sec_iter = self - .sec_index() - .map(|v| v.map(|i| i as usize).into_dyn_boxed()) - .unwrap_or(self.timestamps().range().clone().into_dyn_boxed()); - Box::new( - self.timestamps() - .into_iter() - .zip(sec_iter) - .map(|(t, s)| TimeIndexEntry(t, s)), - ) - } -} -impl<'a> TimeIndexOps for TimeStamps<'a, i64> { - type IndexType = i64; - type RangeType<'b> - = TimeStamps<'b, i64> - where - Self: 'b; - - fn len(&self) -> usize { - self.timestamps().len() - } - fn active(&self, w: Range) -> bool { - let i = self.timestamps().insertion_point(w.start); - i < self.timestamps().len() && self.timestamps().get(i) < w.end - } - - fn range(&self, w: Range) -> Self::RangeType<'_> { - let start = self.timestamps().partition_point(|i| i < w.start); - let end = self.timestamps().partition_point(|i| i < w.end); - TimeStamps::new( - self.timestamps().sliced(start..end), - self.sec_index() - .map(|sec_index| sec_index.sliced(start..end)), - ) - } - - fn first(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let t = self.timestamps().get(0); - Some(t) - } - - fn last(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - - let last_idx = self.timestamps().len() - 1; - - let t = self.timestamps().get(last_idx); - Some(t) - } - - fn iter(&self) -> Box + Send + '_> { - Box::new(self.timestamps().into_iter()) - } -} diff --git a/raphtory/src/disk_graph/graph_impl/tprops.rs b/raphtory/src/disk_graph/graph_impl/tprops.rs index 492360e9d3..9591c98174 100644 --- a/raphtory/src/disk_graph/graph_impl/tprops.rs +++ b/raphtory/src/disk_graph/graph_impl/tprops.rs @@ -1,8 +1,5 @@ use crate::{ - arrow2::{ - datatypes::{ArrowDataType as DataType, Field}, - types::{NativeType, Offset}, - }, + arrow2::types::{NativeType, Offset}, core::storage::timeindex::TimeIndexIntoOps, db::api::{storage::graph::tprop_storage_ops::TPropOps, view::IntoDynBoxed}, prelude::Prop, @@ -10,9 +7,7 @@ use crate::{ use polars_arrow::array::Array; use pometry_storage::{ chunked_array::{bool_col::ChunkedBoolCol, col::ChunkedPrimitiveCol, utf8_col::StringCol}, - edge::Edge, prelude::{ArrayOps, BaseArrayOps}, - timestamps::TimeStamps, tprops::{DiskTProp, EmptyTProp, TPropColumn}, }; use raphtory_api::core::storage::timeindex::TimeIndexEntry; @@ -187,55 +182,6 @@ impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndex } } -fn new_tprop_column( - edge: Edge, - id: usize, - layer_id: usize, -) -> Option, TimeIndexEntry>> -where - Prop: From, -{ - let props = edge.prop_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(TPropColumn::new(props, timestamps)) -} - -pub fn read_tprop_column( - edge: Edge, - id: usize, - layer_id: usize, - field: Field, -) -> Option> { - match field.data_type() { - DataType::Boolean => { - let props = edge.prop_bool_values(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Bool(TPropColumn::new(props, timestamps))) - } - DataType::Int64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64), - DataType::Int32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I32), - DataType::UInt32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::U32), - DataType::UInt64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::U64), - DataType::Float32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::F32), - DataType::Float64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::F64), - DataType::Utf8 => { - let props = edge.prop_str_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Str32(TPropColumn::new(props, timestamps))) - } - DataType::LargeUtf8 => { - let props = edge.prop_str_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Str64(TPropColumn::new(props, timestamps))) - } - DataType::Date64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64), - DataType::Timestamp(_, _) => { - new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64) - } - _ => todo!(), - } -} - impl<'a> TPropOps<'a> for EmptyTProp { fn last_before(&self, _t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> { None diff --git a/raphtory/src/disk_graph/storage_interface/node.rs b/raphtory/src/disk_graph/storage_interface/node.rs index cf99884f03..29706e57a2 100644 --- a/raphtory/src/disk_graph/storage_interface/node.rs +++ b/raphtory/src/disk_graph/storage_interface/node.rs @@ -15,7 +15,7 @@ use crate::{ }; use itertools::Itertools; use polars_arrow::datatypes::ArrowDataType; -use pometry_storage::{graph::TemporalGraph, timestamps::TimeStamps, GidRef}; +use pometry_storage::{graph::TemporalGraph, timestamps::TimeStamps, tprops::DiskTProp, GidRef}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::{borrow::Cow, iter, sync::Arc}; @@ -35,11 +35,14 @@ impl<'a> DiskNode<'a> { } } - pub fn temporal_node_prop_ids(self) -> Box + 'a> { - match &self.graph.node_properties().temporal_props { - Some(props) => Box::new(props.prop_dtypes().iter().enumerate().map(|(i, _)| i)), - None => Box::new(std::iter::empty()), - } + pub fn temporal_node_prop_ids(self) -> impl Iterator + 'a { + self.graph + .prop_mapping() + .nodes() + .into_iter() + .enumerate() + .filter(|(_, exists)| exists.is_some()) + .map(|(id, _)| id) } pub(crate) fn new(graph: &'a TemporalGraph, vid: VID) -> Self { @@ -176,13 +179,14 @@ impl<'a> DiskNode<'a> { .collect::>(), }; - if let Some(props) = &self.graph.node_properties().temporal_props { + for props in self.graph.node_properties().temporal_props() { let timestamps = props.timestamps::(self.vid); if timestamps.len() > 0 { let ts = timestamps.times(); additions.push(ts); } } + NodeAdditions::Col(additions) } } @@ -239,11 +243,16 @@ impl<'a> NodeStorageOps<'a> for DiskNode<'a> { fn tprop(self, prop_id: usize) -> impl TPropOps<'a> { self.graph - .node_properties() - .temporal_props - .as_ref() - .unwrap() - .prop(self.vid, prop_id) + .prop_mapping() + .localise_node_prop_id(prop_id) + .and_then(|(layer, local_prop_id)| { + self.graph + .node_properties() + .temporal_props() + .get(layer) + .map(|t_props| t_props.prop(self.vid, local_prop_id)) + }) + .unwrap_or(DiskTProp::empty()) } fn prop(self, prop_id: usize) -> Option { @@ -268,7 +277,7 @@ impl<'a> NodeStorageOps<'a> for DiskNode<'a> { self, layers: &LayerIds, dir: Direction, - ) -> Box + Send + 'a> { + ) -> impl Iterator + Send + 'a { //FIXME: something is capturing the &LayerIds lifetime when using impl Iterator Box::new(match dir { Direction::OUT => DirectionVariants::Out(self.out_edges(layers)), diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 80ec595780..1a54ddf88c 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -15,6 +15,7 @@ use crate::{ serialise::incremental::InternalCache, }; use bytemuck::checked::cast_slice_mut; +#[cfg(feature = "python")] use kdam::{Bar, BarBuilder, BarExt}; use raphtory_api::{ atomic_extra::atomic_usize_from_mut_slice, @@ -27,6 +28,7 @@ use raphtory_api::{ use rayon::prelude::*; use std::{collections::HashMap, sync::atomic::Ordering}; +#[cfg(feature = "python")] fn build_progress_bar(des: String, num_rows: usize) -> Result { BarBuilder::default() .desc(des) @@ -414,6 +416,7 @@ pub(crate) fn load_edge_deletions_from_df< None }; let layer_index = layer_index.transpose()?; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading edge deletions".to_string(), df_view.num_rows)?; let mut start_idx = graph.reserve_event_ids(df_view.num_rows)?; @@ -436,6 +439,7 @@ pub(crate) fn load_edge_deletions_from_df< graph.delete_edge((time, start_idx + idx), src, dst, layer)?; Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); start_idx += df.len(); } @@ -480,6 +484,7 @@ pub(crate) fn load_node_props_from_df< .collect::, GraphError>>()?, None => vec![], }; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading node properties".to_string(), df_view.num_rows)?; for chunk in df_view.chunks { let df = chunk?; @@ -512,6 +517,7 @@ pub(crate) fn load_node_props_from_df< } Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); } Ok(()) @@ -543,6 +549,7 @@ pub(crate) fn load_edges_props_from_df< None }; let layer_index = layer_index.transpose()?; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading edge properties".to_string(), df_view.num_rows)?; let shared_constant_properties = match shared_constant_properties { None => { @@ -596,6 +603,7 @@ pub(crate) fn load_edges_props_from_df< } Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); } Ok(()) diff --git a/raphtory/src/python/graph/disk_graph.rs b/raphtory/src/python/graph/disk_graph.rs index 70b9f7b084..4cb1f9fa64 100644 --- a/raphtory/src/python/graph/disk_graph.rs +++ b/raphtory/src/python/graph/disk_graph.rs @@ -14,7 +14,7 @@ use crate::{ python::{graph::graph::PyGraph, types::repr::StructReprBuilder}, }; use itertools::Itertools; -use pometry_storage::graph::load_node_const_properties; +use pometry_storage::graph::{load_node_const_properties, TemporalGraph}; use pyo3::{exceptions::PyRuntimeError, prelude::*, pybacked::PyBackedStr, types::PyDict}; use std::{ ops::Deref, @@ -262,6 +262,19 @@ impl PyDiskGraph { Self::load_from_dir(self.graph_dir().to_path_buf()) } + #[pyo3(signature = (location, chunk_size=20_000_000))] + pub fn append_node_temporal_properties( + &self, + location: &str, + chunk_size: usize, + ) -> Result { + let path = PathBuf::from_str(location).unwrap(); + let chunks = read_struct_arrays(&path, None)?; + let mut graph = TemporalGraph::new(self.graph.inner().graph_dir())?; + graph.load_temporal_node_props_from_chunks(chunks, chunk_size, false)?; + Self::load_from_dir(self.graph_dir().to_path_buf()) + } + /// Merge this graph with another `DiskGraph`. Note that both graphs should have nodes that are /// sorted by their global ids or the resulting graph will be nonsense! fn merge_by_sorted_gids( diff --git a/raphtory/src/python/graph/graph.rs b/raphtory/src/python/graph/graph.rs index 5ae9103343..8729bd2cff 100644 --- a/raphtory/src/python/graph/graph.rs +++ b/raphtory/src/python/graph/graph.rs @@ -17,6 +17,7 @@ use crate::{ edge::PyEdge, graph_with_deletions::PyPersistentGraph, io::pandas_loaders::*, node::PyNode, views::graph_view::PyGraphView, }, + types::iterable::FromIterable, utils::{PyNodeRef, PyTime}, }, serialise::{StableDecode, StableEncode}, @@ -362,7 +363,7 @@ impl PyGraph { /// Raises: /// GraphError: If the operation fails. #[pyo3(signature = (nodes, force = false))] - pub fn import_nodes(&self, nodes: Vec, force: bool) -> Result<(), GraphError> { + pub fn import_nodes(&self, nodes: FromIterable, force: bool) -> Result<(), GraphError> { let node_views = nodes.iter().map(|node| &node.node); self.graph.import_nodes(node_views, force) } @@ -456,7 +457,7 @@ impl PyGraph { /// Raises: /// GraphError: If the operation fails. #[pyo3(signature = (edges, force = false))] - pub fn import_edges(&self, edges: Vec, force: bool) -> Result<(), GraphError> { + pub fn import_edges(&self, edges: FromIterable, force: bool) -> Result<(), GraphError> { let edge_views = edges.iter().map(|edge| &edge.edge); self.graph.import_edges(edge_views, force) } diff --git a/raphtory/src/python/packages/vectors.rs b/raphtory/src/python/packages/vectors.rs index 84604d9bbc..40fd2c4d18 100644 --- a/raphtory/src/python/packages/vectors.rs +++ b/raphtory/src/python/packages/vectors.rs @@ -1,12 +1,6 @@ use crate::{ - core::{ - utils::{errors::GraphError, time::IntoTime}, - DocumentInput, Lifespan, Prop, - }, - db::api::{ - properties::{internal::PropertiesOps, Properties}, - view::{MaterializedGraph, StaticGraphViewOps}, - }, + core::utils::{errors::GraphError, time::IntoTime}, + db::api::view::{MaterializedGraph, StaticGraphViewOps}, prelude::{EdgeViewOps, GraphViewOps, NodeViewOps}, python::{ graph::{edge::PyEdge, node::PyNode, views::graph_view::PyGraphView}, @@ -21,7 +15,6 @@ use crate::{ Document, Embedding, EmbeddingFunction, EmbeddingResult, }, }; -use chrono::DateTime; use futures_util::future::BoxFuture; use itertools::Itertools; use pyo3::{ @@ -70,17 +63,6 @@ impl<'source> FromPyObject<'source> for PyQuery { } } -fn format_time(millis: i64) -> String { - if millis == 0 { - "unknown time".to_owned() - } else { - match DateTime::from_timestamp_millis(millis) { - Some(time) => time.naive_utc().format("%Y-%m-%d %H:%M:%S").to_string(), - None => "unknown time".to_owned(), - } - } -} - impl PyDocument { pub fn extract_rust_document(&self, py: Python) -> Result { if let (Some(entity), Some(embedding)) = (&self.entity, &self.embedding) { @@ -173,68 +155,6 @@ pub fn into_py_document( } } -/// This funtions ignores the time history of temporal props if their type is Document and they have a life different than Lifespan::Inherited -fn get_documents_from_props( - properties: Properties

, - name: &str, -) -> Box> { - let prop = properties.temporal().get(name); - - match prop { - Some(prop) => { - let props = prop.into_iter(); - let docs = props - .map(|(time, prop)| prop_to_docs(&prop, Lifespan::event(time)).collect_vec()) - .flatten(); - Box::new(docs) - } - None => match properties.get(name) { - Some(prop) => Box::new( - prop_to_docs(&prop, Lifespan::Inherited) - .collect_vec() - .into_iter(), - ), - _ => Box::new(std::iter::empty()), - }, - } -} - -impl Lifespan { - fn overwrite_inherited(&self, default_lifespan: Lifespan) -> Self { - match self { - Lifespan::Inherited => default_lifespan, - other => other.clone(), - } - } -} - -fn prop_to_docs( - prop: &Prop, - default_lifespan: Lifespan, -) -> Box + '_> { - match prop { - Prop::List(docs) => Box::new( - docs.iter() - .map(move |prop| prop_to_docs(prop, default_lifespan)) - .flatten(), - ), - Prop::Map(doc_map) => Box::new( - doc_map - .values() - .map(move |prop| prop_to_docs(prop, default_lifespan)) - .flatten(), - ), - Prop::Document(document) => Box::new(std::iter::once(DocumentInput { - life: document.life.overwrite_inherited(default_lifespan), - ..document.clone() - })), - prop => Box::new(std::iter::once(DocumentInput { - content: prop.to_string(), - life: default_lifespan, - })), - } -} - #[pymethods] impl PyGraphView { /// Create a VectorisedGraph from the current graph diff --git a/raphtory/src/python/types/iterable.rs b/raphtory/src/python/types/iterable.rs index 9f73ffea8b..9ac5b6fa9c 100644 --- a/raphtory/src/python/types/iterable.rs +++ b/raphtory/src/python/types/iterable.rs @@ -2,8 +2,12 @@ use crate::{ db::api::view::BoxedIter, python::types::repr::{iterator_repr, Repr}, }; -use pyo3::{IntoPy, PyObject}; -use std::{marker::PhantomData, sync::Arc}; +use pyo3::prelude::*; +use std::{ + marker::PhantomData, + ops::{Deref, DerefMut}, + sync::Arc, +}; pub struct Iterable + From + Repr> { pub name: &'static str, @@ -113,3 +117,31 @@ impl + From + Repr> Repr for NestedIterable(Vec); +impl Deref for FromIterable { + type Target = [T]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for FromIterable { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'py, T: FromPyObject<'py>> FromPyObject<'py> for FromIterable { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let len = ob.len().unwrap_or(0); + let mut vec = Vec::::with_capacity(len); + { + for value in ob.iter()? { + vec.push(value?.extract()?) + } + } + Ok(Self(vec)) + } +} diff --git a/raphtory/src/serialise/incremental.rs b/raphtory/src/serialise/incremental.rs index f5a92558c0..5e17929084 100644 --- a/raphtory/src/serialise/incremental.rs +++ b/raphtory/src/serialise/incremental.rs @@ -232,7 +232,7 @@ impl GraphWriter { } } -pub(crate) trait InternalCache { +pub trait InternalCache { /// Initialise the cache by pointing it at a proto file. /// Future updates will be appended to the cache. fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError>; diff --git a/scripts/activate_private_storage.py b/scripts/activate_private_storage.py index 319c5fcd09..bea1370ebe 100755 --- a/scripts/activate_private_storage.py +++ b/scripts/activate_private_storage.py @@ -14,7 +14,7 @@ if "#[private-storage]" in line: next_line = lines[i + 1] if next_line.strip().startswith("#") and "pometry-storage" in next_line: - lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + lines[i + 1] = re.sub(r"#\s*", "", next_line, count=1) if "#[public-storage]" in line: next_line = lines[i + 1] if next_line.strip().startswith("pometry-storage"):