Skip to content

Commit

Permalink
wip evaluate query
Browse files Browse the repository at this point in the history
  • Loading branch information
sehnem committed Sep 19, 2023
1 parent 9b2c74a commit 3e4d28d
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 44 deletions.
215 changes: 203 additions & 12 deletions tap_shopify/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from __future__ import annotations

import simplejson
from functools import cached_property
from inspect import stack
from typing import Any, Optional
from typing import Any, Dict, Iterable, Optional, cast

import requests

from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.helpers.jsonpath import extract_jsonpath

from http import HTTPStatus

Expand All @@ -18,6 +21,14 @@
from tap_shopify.auth import ShopifyAuthenticator
from tap_shopify.gql_queries import schema_query
from tap_shopify.paginator import ShopifyPaginator
from tap_shopify.gql_queries import query_incremental

from datetime import datetime
from time import sleep
from singer_sdk.pagination import SinglePagePaginator

from tap_shopify.exceptions import InvalidOperation, OperationFailed
from tap_shopify.gql_queries import bulk_query, bulk_query_status


def verify_recursion(func):
Expand Down Expand Up @@ -46,6 +57,7 @@ class ShopifyStream(GraphQLStream):
ignore_objs = []
_requests_session = None
nested_connections = []
denied_fields = []

@property
def url_base(self) -> str:
Expand All @@ -54,16 +66,6 @@ def url_base(self) -> str:
api_version = self.config.get("api_version")
return f"https://{store}.myshopify.com/admin/api/{api_version}/graphql.json"

@property
def authenticator(self):
"""Return a new authenticator object."""
return ShopifyAuthenticator(
self,
key="X-Shopify-Access-Token",
value=self.config["access_token"],
location="header",
)

@property
def get_new_paginator(self):
if not self.replication_key or self.config.get("bulk"):
Expand All @@ -81,6 +83,7 @@ def http_headers(self) -> dict:
"""
headers = {}
headers["Content-Type"] = "application/json"
headers["X-Shopify-Access-Token"] = self.config["access_token"]
return headers

@cached_property
Expand Down Expand Up @@ -207,6 +210,8 @@ def gql_selected_fields(self):
def denest_schema(schema):
output = ""
for key, value in schema.items():
if key in self.denied_fields:
continue
if "items" in value.keys():
value = value["items"]
if "properties" in value.keys():
Expand Down Expand Up @@ -273,4 +278,190 @@ def post_process(
if self.config["use_numeric_ids"]:
self.convert_id_fields(row)

return row
return row


def query_gql(self) -> str:
"""Set or return the GraphQL query string."""
base_query = query_incremental

query = base_query.replace("__query_name__", self.query_name)
query = query.replace("__selected_fields__", self.gql_selected_fields)
additional_args = ", " + ", ".join(self.additional_arguments)
query = query.replace("__additional_args__", additional_args)

return query

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params = {}

if next_page_token:
params.update(next_page_token)
else:
params["first"] = 1
if self.replication_key:
start_date = self.get_starting_timestamp(context)
if start_date:
date = start_date.strftime("%Y-%m-%dT%H:%M:%S")
params["filter"] = f"updated_at:>{date}"
if self.single_object_params:
params = self.single_object_params
return params

def prepare_request_payload(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
"""Prepare the data payload for the GraphQL API request."""
params = self.get_url_params(context, next_page_token)
query = self.query.lstrip()
request_data = {
"query": query,
"variables": params,
}
self.logger.debug(f"Attempting query:\n{query}")
return request_data

def parse_response_gql(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
if self.replication_key:
json_path = f"$.data.{self.query_name}.edges[*].node"
else:
json_path = f"$.data.{self.query_name}"
json_resp = response.json()

yield from extract_jsonpath(json_path, json_resp)


def query_bulk(self) -> str:
"""Set or return the GraphQL query string."""
base_query = bulk_query

query = base_query.replace("__query_name__", self.query_name)
query = query.replace("__selected_fields__", self.gql_selected_fields)
filters = f"({self.filters})" if self.filters else ""
query = query.replace("__filters__", filters)

return query

@property
def filters(self):
"""Return a dictionary of values to be used in URL parameterization."""
filters = []
if self.additional_arguments:
filters.extend(self.additional_arguments)
if self.replication_key:
start_date = self.get_starting_timestamp({})
if start_date:
date = start_date.strftime("%Y-%m-%dT%H:%M:%S")
filters.append(f'query: "updated_at:>{date}"')
return ",".join(filters)

def get_operation_status(self):
headers = self.http_headers
authenticator = self.authenticator
if authenticator:
headers.update(authenticator.auth_headers or {})

request = cast(
requests.PreparedRequest,
self.requests_session.prepare_request(
requests.Request(
method=self.rest_method,
url=self.get_url({}),
headers=headers,
json=dict(query=bulk_query_status, variables={}),
),
),
)

decorated_request = self.request_decorator(self._request)
response = decorated_request(request, {})

return response

def check_status(self, operation_id, sleep_time=10, timeout=1800):
status_jsonpath = "$.data.currentBulkOperation"
start = datetime.now().timestamp()

while datetime.now().timestamp() < (start + timeout):
status_response = self.get_operation_status()
status = next(
extract_jsonpath(status_jsonpath, input=status_response.json())
)
if status["id"] != operation_id:
raise InvalidOperation(
"The current job was not triggered by the process, "
"check if other service is using the Bulk API"
)
if status["url"]:
return status["url"]
if status["status"] == "FAILED":
raise InvalidOperation(f"Job failed: {status['errorCode']}")
sleep(sleep_time)
raise OperationFailed("Job Timeout")

def parse_response_bulk(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
operation_id_jsonpath = "$.data.bulkOperationRunQuery.bulkOperation.id"
error_jsonpath = "$.data.bulkOperationRunQuery.userErrors"
json_resp = response.json()
errors = next(extract_jsonpath(error_jsonpath, json_resp), None)
if errors:
raise InvalidOperation(simplejson.dumps(errors))
operation_id = next(
extract_jsonpath(operation_id_jsonpath, json_resp)
)

url = self.check_status(operation_id)

output = requests.get(url, stream=True, timeout=30)

for line in output.iter_lines():
yield simplejson.loads(line)

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
if self.config.get("bulk"):
return self.parse_response_bulk(response)
return self.parse_response_gql(response)

@cached_property
def query(self) -> str:
"""Set or return the GraphQL query string."""
self.evaluate_query()
if self.config.get("bulk"):
return self.query_bulk()
return self.query_gql()


def evaluate_query(self) -> dict:
query = self.query_gql().lstrip()
params = self.get_url_params(None, None)
request_data = {
"query": query,
"variables": params,
}

response = requests.request(
method=self.rest_method,
url=self.get_url({}),
params=params,
headers=self.http_headers,
json=request_data,
)

errors = response.json().get("errors")
if errors:
for error in errors:
error_code = error.get("extensions", {}).get("code")
if error_code in ["ACCESS_DENIED"]:
message = error.get("message", "")
if message.startswith("Access denied for "):
self.logger.warning(message)
self.denied_fields.append(message.split(" ")[3])
else:
raise FatalAPIError(error.get("message", ""), response)
self.evaluate_query()
13 changes: 2 additions & 11 deletions tap_shopify/client_gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,8 @@
class shopifyGqlStream(ShopifyStream):
"""shopify stream class."""

# @cached_property
def query(self) -> str:
def query_gql(self) -> str:
"""Set or return the GraphQL query string."""
# This is for supporting the single object like shop endpoint
# if not self.replication_key and not self.single_object_params:
# base_query = simple_query
# elif self.single_object_params:
# base_query = simple_query_incremental
# else:
# base_query = query_incremental

base_query = query_incremental

query = base_query.replace("__query_name__", self.query_name)
Expand Down Expand Up @@ -66,7 +57,7 @@ def prepare_request_payload(
self.logger.debug(f"Attempting query:\n{query}")
return request_data

def parse_response(self, response: requests.Response) -> Iterable[dict]:
def parse_response_gql(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
if self.replication_key:
json_path = f"$.data.{self.query_name}.edges[*].node"
Expand Down
64 changes: 43 additions & 21 deletions tap_shopify/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,49 @@
import requests
import inflection

from tap_shopify.client_bulk import shopifyBulkStream
from tap_shopify.client_gql import shopifyGqlStream


class ShopifyStream(shopifyGqlStream, shopifyBulkStream):
"""Define base based on the type GraphQL or Bulk."""

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
if self.config.get("bulk"):
return shopifyBulkStream.parse_response(self, response)
else:
return shopifyGqlStream.parse_response(self, response)

@cached_property
def query(self) -> str:
"""Set or return the GraphQL query string."""
if self.config.get("bulk"):
return shopifyBulkStream.query(self)
else:
return shopifyGqlStream.query(self)
from tap_shopify.client import ShopifyStream

# from tap_shopify.client_bulk import shopifyBulkStream
# from tap_shopify.client_gql import shopifyGqlStream


# class ShopifyStream(shopifyGqlStream, shopifyBulkStream):
# """Define base based on the type GraphQL or Bulk."""


# def parse_response(self, response: requests.Response) -> Iterable[dict]:
# """Parse the response and return an iterator of result rows."""
# if self.config.get("bulk"):
# return shopifyBulkStream.parse_response(self, response)
# else:
# return shopifyGqlStream.parse_response(self, response)

# @cached_property
# def query(self) -> str:
# """Set or return the GraphQL query string."""
# if self.config.get("bulk"):
# return shopifyBulkStream.query(self)
# else:
# return shopifyGqlStream.query(self)

# def evaluate_query(self) -> dict:
# query = shopifyGqlStream.query(self)
# params = self.get_url_params(None, None)
# query = self.query.lstrip()
# request_data = {
# "query": query,
# "variables": params,
# }

# response = requests.request(
# method=self.rest_method,
# url=self.get_url({}),
# params=params,
# headers=self.http_headers,
# json=request_data,
# )

# return response


class TapShopify(Tap):
Expand Down

0 comments on commit 3e4d28d

Please sign in to comment.