From 3e4d28de1266439d430318e5a43e88f959fd7eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josu=C3=A9=20Sehnem?= Date: Mon, 18 Sep 2023 22:06:07 -0300 Subject: [PATCH] wip evaluate query --- tap_shopify/client.py | 215 +++++++++++++++++++++++++++++++++++--- tap_shopify/client_gql.py | 13 +-- tap_shopify/tap.py | 64 ++++++++---- 3 files changed, 248 insertions(+), 44 deletions(-) diff --git a/tap_shopify/client.py b/tap_shopify/client.py index 8925537..53bbc59 100644 --- a/tap_shopify/client.py +++ b/tap_shopify/client.py @@ -2,12 +2,15 @@ from __future__ import annotations +import simplejson from functools import cached_property from inspect import stack -from typing import Any, Optional +from typing import Any, Dict, Iterable, Optional, cast + import requests from singer_sdk.exceptions import FatalAPIError, RetriableAPIError +from singer_sdk.helpers.jsonpath import extract_jsonpath from http import HTTPStatus @@ -18,6 +21,14 @@ from tap_shopify.auth import ShopifyAuthenticator from tap_shopify.gql_queries import schema_query from tap_shopify.paginator import ShopifyPaginator +from tap_shopify.gql_queries import query_incremental + +from datetime import datetime +from time import sleep +from singer_sdk.pagination import SinglePagePaginator + +from tap_shopify.exceptions import InvalidOperation, OperationFailed +from tap_shopify.gql_queries import bulk_query, bulk_query_status def verify_recursion(func): @@ -46,6 +57,7 @@ class ShopifyStream(GraphQLStream): ignore_objs = [] _requests_session = None nested_connections = [] + denied_fields = [] @property def url_base(self) -> str: @@ -54,16 +66,6 @@ def url_base(self) -> str: api_version = self.config.get("api_version") return f"https://{store}.myshopify.com/admin/api/{api_version}/graphql.json" - @property - def authenticator(self): - """Return a new authenticator object.""" - return ShopifyAuthenticator( - self, - key="X-Shopify-Access-Token", - value=self.config["access_token"], - location="header", - ) - @property def get_new_paginator(self): if not self.replication_key or self.config.get("bulk"): @@ -81,6 +83,7 @@ def http_headers(self) -> dict: """ headers = {} headers["Content-Type"] = "application/json" + headers["X-Shopify-Access-Token"] = self.config["access_token"] return headers @cached_property @@ -207,6 +210,8 @@ def gql_selected_fields(self): def denest_schema(schema): output = "" for key, value in schema.items(): + if key in self.denied_fields: + continue if "items" in value.keys(): value = value["items"] if "properties" in value.keys(): @@ -273,4 +278,190 @@ def post_process( if self.config["use_numeric_ids"]: self.convert_id_fields(row) - return row \ No newline at end of file + return row + + + def query_gql(self) -> str: + """Set or return the GraphQL query string.""" + base_query = query_incremental + + query = base_query.replace("__query_name__", self.query_name) + query = query.replace("__selected_fields__", self.gql_selected_fields) + additional_args = ", " + ", ".join(self.additional_arguments) + query = query.replace("__additional_args__", additional_args) + + return query + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in URL parameterization.""" + params = {} + + if next_page_token: + params.update(next_page_token) + else: + params["first"] = 1 + if self.replication_key: + start_date = self.get_starting_timestamp(context) + if start_date: + date = start_date.strftime("%Y-%m-%dT%H:%M:%S") + params["filter"] = f"updated_at:>{date}" + if self.single_object_params: + params = self.single_object_params + return params + + def prepare_request_payload( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Optional[dict]: + """Prepare the data payload for the GraphQL API request.""" + params = self.get_url_params(context, next_page_token) + query = self.query.lstrip() + request_data = { + "query": query, + "variables": params, + } + self.logger.debug(f"Attempting query:\n{query}") + return request_data + + def parse_response_gql(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + if self.replication_key: + json_path = f"$.data.{self.query_name}.edges[*].node" + else: + json_path = f"$.data.{self.query_name}" + json_resp = response.json() + + yield from extract_jsonpath(json_path, json_resp) + + + def query_bulk(self) -> str: + """Set or return the GraphQL query string.""" + base_query = bulk_query + + query = base_query.replace("__query_name__", self.query_name) + query = query.replace("__selected_fields__", self.gql_selected_fields) + filters = f"({self.filters})" if self.filters else "" + query = query.replace("__filters__", filters) + + return query + + @property + def filters(self): + """Return a dictionary of values to be used in URL parameterization.""" + filters = [] + if self.additional_arguments: + filters.extend(self.additional_arguments) + if self.replication_key: + start_date = self.get_starting_timestamp({}) + if start_date: + date = start_date.strftime("%Y-%m-%dT%H:%M:%S") + filters.append(f'query: "updated_at:>{date}"') + return ",".join(filters) + + def get_operation_status(self): + headers = self.http_headers + authenticator = self.authenticator + if authenticator: + headers.update(authenticator.auth_headers or {}) + + request = cast( + requests.PreparedRequest, + self.requests_session.prepare_request( + requests.Request( + method=self.rest_method, + url=self.get_url({}), + headers=headers, + json=dict(query=bulk_query_status, variables={}), + ), + ), + ) + + decorated_request = self.request_decorator(self._request) + response = decorated_request(request, {}) + + return response + + def check_status(self, operation_id, sleep_time=10, timeout=1800): + status_jsonpath = "$.data.currentBulkOperation" + start = datetime.now().timestamp() + + while datetime.now().timestamp() < (start + timeout): + status_response = self.get_operation_status() + status = next( + extract_jsonpath(status_jsonpath, input=status_response.json()) + ) + if status["id"] != operation_id: + raise InvalidOperation( + "The current job was not triggered by the process, " + "check if other service is using the Bulk API" + ) + if status["url"]: + return status["url"] + if status["status"] == "FAILED": + raise InvalidOperation(f"Job failed: {status['errorCode']}") + sleep(sleep_time) + raise OperationFailed("Job Timeout") + + def parse_response_bulk(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + operation_id_jsonpath = "$.data.bulkOperationRunQuery.bulkOperation.id" + error_jsonpath = "$.data.bulkOperationRunQuery.userErrors" + json_resp = response.json() + errors = next(extract_jsonpath(error_jsonpath, json_resp), None) + if errors: + raise InvalidOperation(simplejson.dumps(errors)) + operation_id = next( + extract_jsonpath(operation_id_jsonpath, json_resp) + ) + + url = self.check_status(operation_id) + + output = requests.get(url, stream=True, timeout=30) + + for line in output.iter_lines(): + yield simplejson.loads(line) + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + if self.config.get("bulk"): + return self.parse_response_bulk(response) + return self.parse_response_gql(response) + + @cached_property + def query(self) -> str: + """Set or return the GraphQL query string.""" + self.evaluate_query() + if self.config.get("bulk"): + return self.query_bulk() + return self.query_gql() + + + def evaluate_query(self) -> dict: + query = self.query_gql().lstrip() + params = self.get_url_params(None, None) + request_data = { + "query": query, + "variables": params, + } + + response = requests.request( + method=self.rest_method, + url=self.get_url({}), + params=params, + headers=self.http_headers, + json=request_data, + ) + + errors = response.json().get("errors") + if errors: + for error in errors: + error_code = error.get("extensions", {}).get("code") + if error_code in ["ACCESS_DENIED"]: + message = error.get("message", "") + if message.startswith("Access denied for "): + self.logger.warning(message) + self.denied_fields.append(message.split(" ")[3]) + else: + raise FatalAPIError(error.get("message", ""), response) + self.evaluate_query() \ No newline at end of file diff --git a/tap_shopify/client_gql.py b/tap_shopify/client_gql.py index 6ca49f0..4d2bd76 100644 --- a/tap_shopify/client_gql.py +++ b/tap_shopify/client_gql.py @@ -14,17 +14,8 @@ class shopifyGqlStream(ShopifyStream): """shopify stream class.""" - # @cached_property - def query(self) -> str: + def query_gql(self) -> str: """Set or return the GraphQL query string.""" - # This is for supporting the single object like shop endpoint - # if not self.replication_key and not self.single_object_params: - # base_query = simple_query - # elif self.single_object_params: - # base_query = simple_query_incremental - # else: - # base_query = query_incremental - base_query = query_incremental query = base_query.replace("__query_name__", self.query_name) @@ -66,7 +57,7 @@ def prepare_request_payload( self.logger.debug(f"Attempting query:\n{query}") return request_data - def parse_response(self, response: requests.Response) -> Iterable[dict]: + def parse_response_gql(self, response: requests.Response) -> Iterable[dict]: """Parse the response and return an iterator of result rows.""" if self.replication_key: json_path = f"$.data.{self.query_name}.edges[*].node" diff --git a/tap_shopify/tap.py b/tap_shopify/tap.py index f66aad3..7273993 100644 --- a/tap_shopify/tap.py +++ b/tap_shopify/tap.py @@ -12,27 +12,49 @@ import requests import inflection -from tap_shopify.client_bulk import shopifyBulkStream -from tap_shopify.client_gql import shopifyGqlStream - - -class ShopifyStream(shopifyGqlStream, shopifyBulkStream): - """Define base based on the type GraphQL or Bulk.""" - - def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result rows.""" - if self.config.get("bulk"): - return shopifyBulkStream.parse_response(self, response) - else: - return shopifyGqlStream.parse_response(self, response) - - @cached_property - def query(self) -> str: - """Set or return the GraphQL query string.""" - if self.config.get("bulk"): - return shopifyBulkStream.query(self) - else: - return shopifyGqlStream.query(self) +from tap_shopify.client import ShopifyStream + +# from tap_shopify.client_bulk import shopifyBulkStream +# from tap_shopify.client_gql import shopifyGqlStream + + +# class ShopifyStream(shopifyGqlStream, shopifyBulkStream): +# """Define base based on the type GraphQL or Bulk.""" + + +# def parse_response(self, response: requests.Response) -> Iterable[dict]: +# """Parse the response and return an iterator of result rows.""" +# if self.config.get("bulk"): +# return shopifyBulkStream.parse_response(self, response) +# else: +# return shopifyGqlStream.parse_response(self, response) + +# @cached_property +# def query(self) -> str: +# """Set or return the GraphQL query string.""" +# if self.config.get("bulk"): +# return shopifyBulkStream.query(self) +# else: +# return shopifyGqlStream.query(self) + + # def evaluate_query(self) -> dict: + # query = shopifyGqlStream.query(self) + # params = self.get_url_params(None, None) + # query = self.query.lstrip() + # request_data = { + # "query": query, + # "variables": params, + # } + + # response = requests.request( + # method=self.rest_method, + # url=self.get_url({}), + # params=params, + # headers=self.http_headers, + # json=request_data, + # ) + + # return response class TapShopify(Tap):