From fd197ee43c27eeaaec81f25fb07ec4ca54450782 Mon Sep 17 00:00:00 2001 From: Dolev Farhi Date: Fri, 18 Mar 2022 23:39:24 -0400 Subject: [PATCH] refactor, include curl verification commands, conslidate http clients --- README.md | 32 ++++++++-- graphql-cop.py | 84 +++++--------------------- lib/tests/dos_alias_overloading.py | 21 +++++-- lib/tests/dos_batch.py | 23 ++++--- lib/tests/dos_directive_overloading.py | 21 +++++-- lib/tests/dos_field_duplication.py | 19 ++++-- lib/tests/info_field_suggestions.py | 20 ++++-- lib/tests/info_get_method_support.py | 18 ++++-- lib/tests/info_graphiql.py | 16 +++-- lib/tests/info_introspect.py | 18 ++++-- lib/tests/info_trace_mode.py | 28 ++++++--- lib/utils.py | 69 ++++++++++++--------- version.py | 2 +- 13 files changed, 214 insertions(+), 157 deletions(-) diff --git a/README.md b/README.md index bce145b..8175ecf 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Options: -h, --help show this help message and exit -t URL, --target=URL target url with the path -H HEADER, --header=HEADER - Append Header to the request '{"Authorizathion": + Append Header to the request '{"Authorization": "Bearer eyjt"}' -o OUTPUT_JSON, --output=OUTPUT_JSON Output results to stdout (JSON) @@ -51,19 +51,41 @@ Starting... [LOW] GraphQL Playground UI (Information Leakage) [HIGH] Alias Overloading with 100+ aliases is allowed (Denial of Service) [HIGH] Queries are allowed with 1000+ of the same repeated field (Denial of Service) +``` +Test a website, dump to a parse-able JSON output, cURL reproduction command +``` python3 main.py -t https://mywebsite.com/graphql -o json -{'Field Suggestions': {'severity': 'LOW', 'impact': 'Information Leakage', 'description': 'Field Suggestions are Enabled'}, 'Introspection': {'severity': 'HIGH', 'impact': 'Information Leakage', 'description': 'Introspection Query Enabled'}, 'Possible CSRF (GET)': {'severity': 'LOW', 'impact': 'Possible CSRF', 'description': 'HTTP GET method supported (maybe CSRF)'}, 'Alias Overloading': {'severity': 'HIGH', 'impact': 'Denial of Service', 'description': 'Alias Overloading with 100+ aliases is allowed'}, 'Field Duplication': {'severity': 'HIGH', 'impact': 'Denial of Service', 'description': 'Queries are allowed with 1000+ of the same repeated field'}, 'Directive Overloading': {'severity': 'HIGH', 'impact': 'Denial of Service', 'description': 'Multiple duplicated directives allowed in a query'}} + {'curl_verify': 'curl -X POST -H "User-Agent: graphql-cop/1.2" -H ' + '"Accept-Encoding: gzip, deflate" -H "Accept: */*" -H ' + '"Connection: keep-alive" -H "Content-Length: 33" -H ' + '"Content-Type: application/json" -d \'{"query": "query { ' + '__typename }"}\' \'http://localhost:5013/graphql\'', + 'description': 'Tracing is Enabled', + 'impact': 'Information Leakage', + 'result': False, + 'severity': 'INFO', + 'title': 'Trace Mode'}, + {'curl_verify': 'curl -X POST -H "User-Agent: graphql-cop/1.2" -H ' + '"Accept-Encoding: gzip, deflate" -H "Accept: */*" -H ' + '"Connection: keep-alive" -H "Content-Length: 64" -H ' + '"Content-Type: application/json" -d \'{"query": "query { ' + '__typename @aa@aa@aa@aa@aa@aa@aa@aa@aa@aa }"}\' ' + "'http://localhost:5013/graphql'", + 'description': 'Multiple duplicated directives allowed in a query', + 'impact': 'Denial of Service', + 'result': True, + 'severity': 'HIGH', + 'title': 'Directive Overloading'}] ``` -Test a website -Using `graphql-cop` through a Proxy (Eg: Burp Suite) and adding custom headers (Eg: Authorization): +Test a website using `graphql-cop` through a proxy (e.g. Burp Suite) with custom headers (e.g. Authorization): ``` $ python3 graphql-cop.py -t https://mywebsite.com/graphql --proxy --header '{"Authorization": "Bearer token_here"}' - GraphQL Cop 1.1 + GraphQL Cop 1.2 Security Auditor for GraphQL Dolev Farhi & Nick Aleks diff --git a/graphql-cop.py b/graphql-cop.py index 3785303..c84d586 100644 --- a/graphql-cop.py +++ b/graphql-cop.py @@ -1,10 +1,10 @@ #!/usr/env/python3 import sys +from json import loads from optparse import OptionParser from version import VERSION from config import HEADERS -from json import loads from urllib.parse import urlparse from lib.tests.info_field_suggestions import field_suggestions @@ -21,7 +21,7 @@ parser = OptionParser(usage='%prog -t http://example.com -o json') parser.add_option('-t', '--target', dest='url', help='target url with the path') -parser.add_option('-H', '--header', dest='header', help='Append Header to the request \'{"Authorizathion": "Bearer eyjt"}\'') +parser.add_option('-H', '--header', dest='header', help='Append Header to the request \'{"Authorization": "Bearer eyjt"}\'') parser.add_option('-o', '--output', dest='output_json', help='Output results to stdout (JSON)', default=False) parser.add_option('--proxy', '-x', dest='proxy', action='store_true', default=False, @@ -55,7 +55,7 @@ print("Cannot cast %s into header dictionary. Ensure the format \'{\"key\": \"value\"}\'."%(options.header)) if not urlparse(options.url).scheme: - print("Url missing scheme (http:// or https://). Ensure Url contains a scheme.") + print("URL missing scheme (http:// or https://). Ensure ULR contains some scheme.") sys.exit(1) else: url = options.url @@ -64,73 +64,19 @@ print(url, 'does not seem to be running GraphQL.') sys.exit(1) -json_output = {} - -if field_suggestions(url, proxy, HEADERS): -# Field Suggestions - json_output['Field Suggestions'] = {} - json_output['Field Suggestions']['severity'] = 'LOW' - json_output['Field Suggestions']['impact'] = 'Information Leakage' - json_output['Field Suggestions']['description'] = 'Field Suggestions are Enabled' - -if introspection(url, proxy, HEADERS): -# Introspection - json_output['Introspection'] = {} - json_output['Introspection']['severity'] = 'HIGH' - json_output['Introspection']['impact'] = 'Information Leakage' - json_output['Introspection']['description'] = 'Introspection Query Enabled' - -if detect_graphiql(url, proxy, HEADERS): -# Playground - json_output['GraphiQL Playground'] = {} - json_output['GraphiQL Playground']['severity'] = 'LOW' - json_output['GraphiQL Playground']['impact'] = 'Information Leakage' - json_output['GraphiQL Playground']['description'] = 'GraphiQL Explorer Enabled' - -if get_method_support(url, proxy, HEADERS): -# HTTP GET method support - json_output['Possible CSRF (GET)'] = {} - json_output['Possible CSRF (GET)']['severity'] = 'LOW' - json_output['Possible CSRF (GET)']['impact'] = 'Possible CSRF' - json_output['Possible CSRF (GET)']['description'] = 'HTTP GET method supported (maybe CSRF)' - -if alias_overloading(url, proxy, HEADERS): -# Alias Overloading - json_output['Alias Overloading'] = {} - json_output['Alias Overloading']['severity'] = 'HIGH' - json_output['Alias Overloading']['impact'] = 'Denial of Service' - json_output['Alias Overloading']['description'] = 'Alias Overloading with 100+ aliases is allowed' - -if batch_query(url, proxy, HEADERS): -# Batch Queries - json_output['Batch Queries'] = {} - json_output['Batch Queries']['severity'] = 'HIGH' - json_output['Batch Queries']['impact'] = 'Denial of Service' - json_output['Batch Queries']['description'] = 'Batch queries allowed with 10+ simultaneous queries)' - -if field_duplication(url, proxy, HEADERS): -# Field Duplication - json_output['Field Duplication'] = {} - json_output['Field Duplication']['severity'] = 'HIGH' - json_output['Field Duplication']['impact'] = 'Denial of Service' - json_output['Field Duplication']['description'] = 'Queries are allowed with 500 of the same repeated field' - -if trace_mode(url, proxy, HEADERS): -# Tracing mode - json_output['Tracing Mode'] = {} - json_output['Tracing Mode']['severity'] = 'INFORMATIONAL' - json_output['Tracing Mode']['impact'] = 'Information Leakage' - json_output['Tracing Mode']['description'] = 'Tracing is enabled' +tests = [field_suggestions, introspection, detect_graphiql, + get_method_support, alias_overloading, batch_query, + field_duplication, trace_mode, directive_overloading] -if directive_overloading(url, proxy, HEADERS): -# Directive Overloading - json_output['Directive Overloading'] = {} - json_output['Directive Overloading']['severity'] = 'HIGH' - json_output['Directive Overloading']['impact'] = 'Denial of Service' - json_output['Directive Overloading']['description'] = 'Multiple duplicated directives allowed in a query' +json_output = [] +for test in tests: + json_output.append(test(url, proxy, HEADERS)) + if options.output_json == 'json': - print(json_output) + from pprint import pprint + pprint(json_output) else: - for k, v in json_output.items(): - print('[{}] {} - {} ({})'.format(v['severity'], k, v['description'], v['impact'])) + for i in json_output: + print('[{}] {} - {} ({})'.format(i['severity'], i['title'], i['description'], i['impact'])) + \ No newline at end of file diff --git a/lib/tests/dos_alias_overloading.py b/lib/tests/dos_alias_overloading.py index b0fe8f5..2e576a0 100644 --- a/lib/tests/dos_alias_overloading.py +++ b/lib/tests/dos_alias_overloading.py @@ -1,21 +1,30 @@ """Alias overloading tests.""" -from lib.utils import graph_query +from lib.utils import graph_query, curlify def alias_overloading(url, proxy, headers): """Check for alias overloading.""" - result = False + res = { + 'result':False, + 'title':'Alias Overloading', + 'description':'Alias Overloading with 100+ aliases is allowed', + 'impact':'Denial of Service', + 'severity':'HIGH', + 'curl_verify':'' + } aliases = '' for i in range(0, 101): aliases += 'alias{}:__typename \n'.format(i) gql_response = graph_query(url, proxies=proxy, headers=headers, payload='query { ' + aliases + ' }') - + + res['curl_verify'] = curlify(gql_response) + try: - if gql_response['data']['alias100']: - result = True + if gql_response.json()['data']['alias100']: + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/dos_batch.py b/lib/tests/dos_batch.py index 58c69ee..b24c964 100644 --- a/lib/tests/dos_batch.py +++ b/lib/tests/dos_batch.py @@ -1,17 +1,26 @@ """Batch tests.""" -from lib.utils import graph_batch_query +from lib.utils import graph_query, curlify def batch_query(url, proxy, headers): """Check for batch queries.""" - result = False - - gql_response = graph_batch_query(url, proxies=proxy, headers=headers, payload='query { __typename }') + res = { + 'result':False, + 'title':'Array-based Query Batching', + 'description':'Batch queries allowed with 10+ simultaneous queries)', + 'impact':'Denial of Service', + 'severity':'HIGH', + 'curl_verify':'' + } + gql_response = graph_query(url, proxies=proxy, headers=headers, payload='query { __typename }', batch=True) + + res['curl_verify'] = curlify(gql_response) + try: - if len(gql_response) >= 10: - result = True + if len(gql_response.json()) >= 10: + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/dos_directive_overloading.py b/lib/tests/dos_directive_overloading.py index d4696ad..8b1eb35 100644 --- a/lib/tests/dos_directive_overloading.py +++ b/lib/tests/dos_directive_overloading.py @@ -1,17 +1,26 @@ """Directive overloading tests.""" -from lib.utils import graph_query +from lib.utils import graph_query, curlify + def directive_overloading(url, proxy, headers): """Check for directive overloading.""" - result = False + res = { + 'result':False, + 'title':'Directive Overloading', + 'description':'Multiple duplicated directives allowed in a query', + 'impact':'Denial of Service', + 'severity':'HIGH', + 'curl_verify':'' + } q = 'query { __typename @aa@aa@aa@aa@aa@aa@aa@aa@aa@aa }' gql_response = graph_query(url, proxies=proxy, headers=headers, payload=q) - + res['curl_verify'] = curlify(gql_response) + try: - if len(gql_response['errors']) == 10: - result = True + if len(gql_response.json()['errors']) == 10: + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/dos_field_duplication.py b/lib/tests/dos_field_duplication.py index 46dcb7d..587a719 100644 --- a/lib/tests/dos_field_duplication.py +++ b/lib/tests/dos_field_duplication.py @@ -1,18 +1,27 @@ """Field duplication tests.""" -from lib.utils import graph_query +from lib.utils import graph_query, curlify def field_duplication(url, proxy, headers): """Check for field duplication.""" - result = False + res = { + 'result':False, + 'title':'Field Duplication', + 'description':'Queries are allowed with 500 of the same repeated field', + 'impact':'Denial of Service', + 'severity':'HIGH', + 'curl_verify':'' + } duplicated_string = '__typename \n' * 500 q = 'query { ' + duplicated_string + '} ' gql_response = graph_query(url, proxies=proxy, headers=headers, payload=q) + res['curl_verify'] = curlify(gql_response) + try: - if gql_response['data']['__typename']: - result = True + if gql_response.json()['data']['__typename']: + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/info_field_suggestions.py b/lib/tests/info_field_suggestions.py index 701ea45..03c11d6 100644 --- a/lib/tests/info_field_suggestions.py +++ b/lib/tests/info_field_suggestions.py @@ -1,17 +1,27 @@ """Field suggestions tests.""" -from lib.utils import graph_query, get_error +from lib.utils import graph_query, get_error, curlify def field_suggestions(url, proxy, headers): """Retrieve field suggestions.""" - result = False + res = { + 'result':False, + 'title':'Field Suggestions', + 'description':'Field Suggestions are Enabled', + 'impact':'Information Leakage', + 'severity':'LOW', + 'curl_verify':'' + } q = 'query { __schema { directive } }' gql_response = graph_query(url, proxies=proxy, headers=headers, payload=q) + res['curl_verify'] = curlify(gql_response) + + try: - if 'Did you mean' in get_error(gql_response): - result = True + if 'Did you mean' in get_error(gql_response.json()): + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/info_get_method_support.py b/lib/tests/info_get_method_support.py index 23d045b..300ad1c 100644 --- a/lib/tests/info_get_method_support.py +++ b/lib/tests/info_get_method_support.py @@ -1,19 +1,27 @@ """Collect all supported methods.""" -from lib.utils import request_get +from lib.utils import request_get, curlify def get_method_support(url, proxies, headers): """Get the supported methods.""" - result = False + res = { + 'result':False, + 'title':'GET Method Query Support', + 'description':'GraphQL queries allowed using the GET method', + 'impact':'Possible Cross Site Request Forgery (CSRF)', + 'severity':'LOW', + 'curl_verify':'' + } q = '{__typename}' response = request_get(url, proxies=proxies, headers=headers, params={'query':q}) - + res['curl_verify'] = curlify(response) + try: if response and response.json()['data']['__typename']: - result = True + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/info_graphiql.py b/lib/tests/info_graphiql.py index e363589..29fd2e7 100644 --- a/lib/tests/info_graphiql.py +++ b/lib/tests/info_graphiql.py @@ -1,11 +1,18 @@ """Collect GraphiQL details.""" from urllib.parse import urlparse -from lib.utils import request_get +from lib.utils import request_get, curlify def detect_graphiql(url, proxy, headers): """Get GraphiQL.""" - result = False + res = { + 'result':False, + 'title':'GraphQL IDE', + 'description':'GraphiQL Explorer/Playground Enabled', + 'impact':'Information Leakage', + 'severity':'LOW', + 'curl_verify':'' + } heuristics = ('graphiql.min.css', 'GraphQL Playground', 'GraphiQL', 'graphql-playground') endpoints = ['/graphiql', '/playground', '/console', '/graphql'] @@ -15,11 +22,12 @@ def detect_graphiql(url, proxy, headers): for endpoint in endpoints: response = request_get(url + endpoint, proxies=proxy, headers=headers) + res['curl_verify'] = curlify(response) try: if response and any(word in response.text for word in heuristics): - result = True + res['result'] = True break except: pass - return result + return res diff --git a/lib/tests/info_introspect.py b/lib/tests/info_introspect.py index 87f44b3..06d07c3 100644 --- a/lib/tests/info_introspect.py +++ b/lib/tests/info_introspect.py @@ -1,18 +1,26 @@ """Perform introspection tests.""" -from lib.utils import graph_query +from lib.utils import graph_query, curlify def introspection(url, proxy, headers): """Run introspection.""" - result = False + res = { + 'result':False, + 'title':'Introspection', + 'description':'Introspection Query Enabled', + 'impact':'Information Leakage', + 'severity':'HIGH', + 'curl_verify':'' + } q = 'query { __schema { types { name fields { name } } } }' gql_response = graph_query(url, proxies=proxy, headers=headers, payload=q) + res['curl_verify'] = curlify(gql_response) try: - if gql_response['data']['__schema']['types']: - result = True + if gql_response.json()['data']['__schema']['types']: + res['result'] = True except: pass - return result + return res diff --git a/lib/tests/info_trace_mode.py b/lib/tests/info_trace_mode.py index 25dff0b..2accb4c 100644 --- a/lib/tests/info_trace_mode.py +++ b/lib/tests/info_trace_mode.py @@ -1,22 +1,30 @@ """Collect trace mode details.""" -from lib.utils import graph_query +from lib.utils import graph_query, curlify def trace_mode(url, proxy, headers): """Get the trace mode.""" - result = False + res = { + 'result':False, + 'title':'Trace Mode', + 'description':'Tracing is Enabled', + 'impact':'Information Leakage', + 'severity':'INFO', + 'curl_verify':'' + } q = 'query { __typename }' gql_response = graph_query(url, proxies=proxy, headers=headers, payload=q) - + res['curl_verify'] = curlify(gql_response) + try: - if gql_response.get('errors', {}).get('extensions', {}).get('tracing'): - result = True - elif gql_response.get('errors', {}).get('extensions', {}).get('exception', None): - result = True - elif 'stacktrace' in str(gql_response).lower(): - result = True + if gql_response.json().get('errors', {}).get('extensions', {}).get('tracing'): + res['result'] = True + elif gql_response.json().get('errors', {}).get('extensions', {}).get('exception', None): + res['result'] = True + elif 'stacktrace' in str(gql_response.json()).lower(): + res['result'] = True except: pass - return result + return res diff --git a/lib/utils.py b/lib/utils.py index fc2fba7..5f21a17 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1,10 +1,23 @@ """Helper parts for graphql-cop.""" import requests -#from config import HEADERS +from simplejson import JSONDecodeError from version import VERSION requests.packages.urllib3.disable_warnings() +def curlify(obj): + req = obj.request + command = "curl -X {method} -H {headers} -d '{data}' '{uri}'" + method = req.method + uri = req.url + if req.body: + data = req.body.decode('UTF-8') + else: + data = '' + headers = ['"{0}: {1}"'.format(k, v) for k, v in req.headers.items()] + headers = " -H ".join(headers) + return command.format(method=method, headers=headers, data=data, uri=uri) + def get_error(resp): """Collect the error.""" error = None @@ -14,8 +27,16 @@ def get_error(resp): pass return error -def graph_query(url, proxies, headers, operation='query', payload={}): +def graph_query(url, proxies, headers, operation='query', payload={}, batch=False): """Perform a query.""" + + if batch: + data = [] + for _ in range(10): + data.append({operation:payload}) + else: + data = {operation:payload} + try: response = requests.post(url, headers=headers, @@ -24,32 +45,13 @@ def graph_query(url, proxies, headers, operation='query', payload={}): allow_redirects=True, timeout=60, proxies=proxies, - json={operation:payload}) - return response.json() - except: + json=data) + return response + except Exception: return {} -def graph_batch_query(url, proxies, headers, operation='query', payload={}, batch=10): - """Perform a batch query.""" - try: - batch_query = [] - for _ in range(0, batch+1): - batch_query.append({operation:payload}) - - response = requests.post(url, - headers=headers, - cookies=None, - verify=False, - allow_redirects=True, - timeout=5, - proxies=proxies, - json=batch_query) - return response.json() - except: - return {} - -def request_get(url, proxies, headers, params=None): +def request_get(url, proxies, headers, params=None, data=None): """Perform requests.""" try: response = requests.get(url, @@ -59,7 +61,8 @@ def request_get(url, proxies, headers, params=None): verify=False, allow_redirects=True, proxies=proxies, - timeout=5) + timeout=5, + data=data) return response except: return None @@ -72,11 +75,19 @@ def is_graphql(url, proxies, headers): } ''' response = graph_query(url, proxies, headers, payload=query) - if response.get('data', {}).get('__typename', '') in ('Query', 'QueryRoot', 'query_root'): + + try: + response.json() + except AttributeError: + return False + except JSONDecodeError: + return False + + if response.json().get('data', {}).get('__typename', '') in ('Query', 'QueryRoot', 'query_root'): return True - elif response.get('errors') and (any('locations' in i for i in response['errors']) or (any('extensions' in i for i in response))): + elif response.json().get('errors') and (any('locations' in i for i in response['errors']) or (any('extensions' in i for i in response))): return True - elif response.get('data'): + elif response.json().get('data'): return True else: return False diff --git a/version.py b/version.py index f2ce76c..262a891 100644 --- a/version.py +++ b/version.py @@ -1,2 +1,2 @@ """Version details of graphql-cop.""" -VERSION = '1.1' +VERSION = '1.2'