Skip to content

Commit

Permalink
unwrap bulk results
Browse files Browse the repository at this point in the history
  • Loading branch information
sehnem committed Sep 20, 2023
1 parent fde2114 commit 4ff0bec
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions tap_shopify/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ShopifyStream(GraphQLStream):
ignore_objs = ["image", "metafield", "metafields", "metafieldconnection", "privateMetafield", "privateMetafields"]
_requests_session = None
denied_fields = []
stream_connections = []

@property
def url_base(self) -> str:
Expand Down Expand Up @@ -165,6 +166,12 @@ def get_fields_schema(self, fields) -> dict:
if field_name in self.ignore_objs:
continue

if type_def.get("name") and type_def["name"].endswith("Connection"):
self.stream_connections.append(dict(
name=field_name,
of_type=type_def["name"][:-10]
))

required = field["type"].get("kind") == "NON_NULL"
field_type = self.extract_field_type(type_def)

Expand Down Expand Up @@ -195,7 +202,9 @@ def schema(self) -> dict:
stream = (s for s in streams if s["tap_stream_id"] == self.name)
stream_catalog = next(stream, None)
if stream_catalog:
return stream_catalog["schema"]
metadata = next(f for f in stream_catalog["metadata"] if not f["breadcrumb"])
if not metadata["metadata"].get("selected"):
return stream_catalog["schema"]

stream_type = self.extract_gql_schema(self.gql_type)
properties = self.get_fields_schema(stream_type["fields"])
Expand All @@ -215,7 +224,7 @@ def selected_properties(self):
or field_name == self.replication_key
):
selected_properties.append(field_name)
return selected_properties
return selected_properties

@property
def gql_selected_fields(self):
Expand Down Expand Up @@ -429,9 +438,26 @@ def parse_response_bulk(self, response: requests.Response) -> Iterable[dict]:
url = self.check_status(operation_id)

output = requests.get(url, stream=True, timeout=30)

main_item = None
for line in output.iter_lines():
yield simplejson.loads(line)
line = simplejson.loads(line)
selected_connections = [c for c in self.stream_connections if c["name"] in self.selected_properties]
if "__parentId" not in line.keys():
if main_item:
yield main_item
main_item = line
for sc in selected_connections:
main_item[sc["name"]] = {}
main_item[sc["name"]]["edges"] = []
main_item["variants"] = {}
main_item["variants"]["edges"] = []
elif main_item["id"]==line["__parentId"]:
del line["__parentId"]
line_type = line["id"].split("/")[-2]
field_name = next(c["name"] for c in selected_connections if c["of_type"]==line_type)
main_item[field_name]["edges"].append(dict(node=line))
else:
pass

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
Expand Down

0 comments on commit 4ff0bec

Please sign in to comment.