diff --git a/README.md b/README.md
index 17947eba3..778da0160 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,5 @@
-![PyPI - Status](https://img.shields.io/pypi/status/crowdstrike-falconpy)
-![PyPI](https://img.shields.io/pypi/v/crowdstrike-falconpy) ![PyPI - Downloads](https://img.shields.io/pypi/dm/crowdstrike-falconpy) ![CI Tests](https://github.com/CrowdStrike/falconpy/workflows/Python%20package/badge.svg)
-![PyPI - Implementation](https://img.shields.io/pypi/implementation/crowdstrike-falconpy) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/crowdstrike-falconpy) ![PyPI - Wheel](https://img.shields.io/pypi/wheel/crowdstrike-falconpy)
+![PyPI - Status](https://img.shields.io/pypi/status/crowdstrike-falconpy) ![PyPI - Implementation](https://img.shields.io/pypi/implementation/crowdstrike-falconpy) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/crowdstrike-falconpy) ![PyPI - Wheel](https://img.shields.io/pypi/wheel/crowdstrike-falconpy)
+![PyPI](https://img.shields.io/pypi/v/crowdstrike-falconpy) ![PyPI - Downloads](https://img.shields.io/pypi/dm/crowdstrike-falconpy) ![CI Tests](https://github.com/CrowdStrike/falconpy/workflows/Python%20package/badge.svg) ![CI Test Coverage](https://raw.githubusercontent.com/CrowdStrike/falconpy/main/tests/coverage.svg)
![Twitter URL](https://img.shields.io/twitter/url?label=Follow%20%40CrowdStrike&style=social&url=https%3A%2F%2Ftwitter.com%2FCrowdStrike)
# FalconPy
@@ -34,7 +33,7 @@ $ python -m pip uninstall crowdstrike-falconpy
| CrowdStrike Sensor Policy Management API | [./src/falconpy/sensor_update_policy.py](./src/falconpy/sensor_update_policy.py) |
| [CrowdStrike Custom Indicators of Compromose (IOCs) APIs](https://falcon.crowdstrike.com/support/documentation/88/custom-ioc-apis) | [./src/falconpy/iocs.py](./src/falconpy/iocs.py) |
| [CrowdStrike Detections APIs](https://falcon.crowdstrike.com/support/documentation/85/detection-and-prevention-policies-apis) | [./src/falconpy/detects.py](./src/falconpy/detects.py) |
-| [CrowdStrike Event Streams API](https://falcon.crowdstrike.com/support/documentation/89/event-streams-apis)| [./serices/event_streams.py](./src/falconpy/event_streams.py) |
+| [CrowdStrike Event Streams API](https://falcon.crowdstrike.com/support/documentation/89/event-streams-apis)| [./src/falconpy/event_streams.py](./src/falconpy/event_streams.py) |
| [CrowdStrike Falcon Horizon APIs](https://falcon.crowdstrike.com/support/documentation/137/falcon-horizon-apis) | *Coming Soon* |
| [CrowdStrike Falon X APIs](https://falcon.crowdstrike.com/support/documentation/92/falcon-x-apis) | *Coming Soon* |
| [CrowdStrike Firewall Management API](https://falcon.crowdstrike.com/support/documentation/107/falcon-firewall-management-apis) | [./src/falconpy/firewall_management.py](./src/falconpy/firewall_management.py) |
diff --git a/tests/coverage.svg b/tests/coverage.svg
new file mode 100644
index 000000000..b6c4e361f
--- /dev/null
+++ b/tests/coverage.svg
@@ -0,0 +1,21 @@
+
+
diff --git a/tests/test_authorization.py b/tests/test_authorization.py
index 6ef7f8525..d0eadb7b2 100644
--- a/tests/test_authorization.py
+++ b/tests/test_authorization.py
@@ -80,6 +80,26 @@ def serviceAuth(self):
else:
return False
+ def failServiceAuth(self):
+
+ self.authorization = FalconAuth.OAuth2(creds={
+ 'client_id': "BadClientID",
+ 'client_secret': "BadClientSecret"
+ })
+ self.authorization.base_url = "nowhere"
+ try:
+ self.token = self.authorization.token()['body']['access_token']
+ except:
+ self.token = False
+
+ self.authorization.revoke(self.token)
+
+ if self.token:
+ return False
+ else:
+ return True
+
+
def serviceRevoke(self):
try:
result = self.authorization.revoke(token=self.token)["status_code"]
@@ -106,5 +126,8 @@ def test_serviceRevoke(self):
self.serviceAuth()
assert self.serviceRevoke() == True
+ def test_failServiceAuth(self):
+ assert self.failServiceAuth() == True
+
diff --git a/tests/test_cloud_connect_aws.py b/tests/test_cloud_connect_aws.py
index 120c378cf..478e32744 100644
--- a/tests/test_cloud_connect_aws.py
+++ b/tests/test_cloud_connect_aws.py
@@ -16,8 +16,20 @@
auth = Authorization.TestAuthorization()
auth.serviceAuth()
falcon = FalconAWS.Cloud_Connect_AWS(access_token=auth.token)
-AllowedResponses = [200, 429] #Adding rate-limiting as an allowed response for now
-
+AllowedResponses = [200, 201, 429] #Adding rate-limiting as an allowed response for now
+accountPayload = {
+ "resources": [
+ {
+ # "cloudtrail_bucket_owner_id": cloudtrail_bucket_owner_id,
+ # "cloudtrail_bucket_region": cloudtrail_bucket_region,
+ # "external_id": external_id,
+ # "iam_role_arn": iam_role_arn,
+ # "id": local_account,
+ "rate_limit_reqs": 0,
+ "rate_limit_time": 0
+ }
+ ]
+ }
class TestCloudConnectAWS:
def serviceCCAWS_GetAWSSettings(self):
if falcon.GetAWSSettings()["status_code"] in AllowedResponses:
@@ -26,19 +38,44 @@ def serviceCCAWS_GetAWSSettings(self):
return False
def serviceCCAWS_QueryAWSAccounts(self):
- if falcon.QueryAWSAccounts()["status_code"] in AllowedResponses:
+ if falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceCCAWS_GetAWSAccounts(self):
if falcon.GetAWSAccounts(ids=falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ def serviceCCAWS_AccountUpdate(self):
+ account = falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]
+ accountPayload["resources"][0]["cloudtrail_bucket_owner_id"] = account["cloudtrail_bucket_owner_id"]
+ accountPayload["resources"][0]["cloudtrail_bucket_region"] = account["cloudtrail_bucket_region"]
+ orig_external_id = account["external_id"]
+ accountPayload["resources"][0]["external_id"] = "UnitTesting"
+ accountPayload["resources"][0]["iam_role_arn"] = account["iam_role_arn"]
+ accountPayload["resources"][0]["id"] = account["id"]
+ if falcon.UpdateAWSAccounts(body=accountPayload)["status_code"] in AllowedResponses:
+ accountPayload["resources"][0]["external_id"] = orig_external_id
+ return True
+ else:
+ accountPayload["resources"][0]["external_id"] = orig_external_id
+ return False
+
+ def serviceCCAWS_AccountDelete(self):
+ if falcon.DeleteAWSAccounts(ids=accountPayload["resources"][0]["id"])["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def serviceCCAWS_AccountRegister(self):
+ if falcon.ProvisionAWSAccounts(body=accountPayload)["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
def serviceCCAWS_VerifyAWSAccountAccess(self):
if falcon.VerifyAWSAccountAccess(ids=falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
return True
@@ -51,20 +88,65 @@ def serviceCCAWS_QueryAWSAccountsForIDs(self):
else:
return False
+ def serviceCCAWS_GenerateErrors(self):
+ # Garf the base_url so we force 500s for each method to cover all remaining code paths
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ if falcon.QueryAWSAccounts()["status_code"] != 500:
+ errorChecks = False
+ if falcon.QueryAWSAccountsForIDs()["status_code"] != 500:
+ errorChecks = False
+ if falcon.GetAWSSettings()["status_code"] != 500:
+ errorChecks = False
+ if falcon.GetAWSAccounts(ids="1234567890")["status_code"] != 500:
+ errorChecks = False
+ if falcon.UpdateAWSAccounts(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.DeleteAWSAccounts(ids="1234567890")["status_code"] != 500:
+ errorChecks = False
+ if falcon.ProvisionAWSAccounts(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.CreateOrUpdateAWSSettings(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.VerifyAWSAccountAccess(ids="1234567890", body={})["status_code"] != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_GetAWSSettings(self):
assert self.serviceCCAWS_GetAWSSettings() == True
def test_QueryAWSAccounts(self):
assert self.serviceCCAWS_QueryAWSAccounts() == True
-
+
+ @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetAWSAccounts(self):
assert self.serviceCCAWS_GetAWSAccounts() == True
-
+
+ @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_VerifyAWSAccountAccess(self):
assert self.serviceCCAWS_VerifyAWSAccountAccess() == True
-
+
+ @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ @pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
+ def test_AccountUpdate(self):
+ assert self.serviceCCAWS_AccountUpdate() == True
+
+ @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ @pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
+ def test_AccountDelete(self):
+ assert self.serviceCCAWS_AccountDelete() == True
+
def test_QueryAWSAccountsForIDs(self):
assert self.serviceCCAWS_QueryAWSAccountsForIDs() == True
+
+ @pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ @pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
+ def test_AccountRegister(self):
+ assert self.serviceCCAWS_AccountRegister() == True
+
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Errors(self):
+ assert self.serviceCCAWS_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_detects.py b/tests/test_detects.py
index 4ec8f1c27..0269c7bc2 100644
--- a/tests/test_detects.py
+++ b/tests/test_detects.py
@@ -26,7 +26,6 @@ def serviceDetects_QueryDetects(self):
else:
return False
- @pytest.mark.skipif(falcon.QueryDetects(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDetects_GetDetectSummaries(self):
if falcon.GetDetectSummaries(body={"ids":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
return True
@@ -34,17 +33,31 @@ def serviceDetects_GetDetectSummaries(self):
return False
# def serviceDetects_GetAggregateDetects(self):
- # auth, falcon = self.authenticate()
- # if falcon.GetAggregateDetects(body={"ids":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
- # auth.serviceRevoke()
+ # print(falcon.QueryDetects(parameters={"limit":1}))
+ # print(falcon.GetAggregateDetects(body=[{"ranges":"ranges'{}'".format(falcon.QueryDetects(parameters={"limit":1})["body"]["resources"][0])}]))
+ # if falcon.GetAggregateDetects(body={"id":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
# return True
# else:
- # auth.serviceRevoke()
# return False
+ def serviceDetects_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ if falcon.QueryDetects()["status_code"] != 500:
+ errorChecks = False
+ if falcon.GetDetectSummaries(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.GetAggregateDetects(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.UpdateDetectsByIdsV2(body={})["status_code"] != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_QueryDetects(self):
assert self.serviceDetects_QueryDetects() == True
-
+
+ @pytest.mark.skipif(falcon.QueryDetects(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetDetectSummaries(self):
assert self.serviceDetects_GetDetectSummaries() == True
@@ -52,4 +65,7 @@ def test_GetDetectSummaries(self):
# assert self.serviceDetects_GetAggregateDetects() == True
def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceDetects_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_device_control_policies.py b/tests/test_device_control_policies.py
index 308e3eaf8..708a62184 100644
--- a/tests/test_device_control_policies.py
+++ b/tests/test_device_control_policies.py
@@ -26,14 +26,13 @@ def serviceDeviceControlPolicies_queryDeviceControlPolicies(self):
else:
return False
- @pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDeviceControlPolicies_queryDeviceControlPolicyMembers(self):
if falcon.queryDeviceControlPolicyMembers(parameters={"id": falcon.queryDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceDeviceControlPolicies_getDeviceControlPolicies(self):
if falcon.getDeviceControlPolicies(ids=falcon.queryDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
@@ -46,27 +45,53 @@ def serviceDeviceControlPolicies_queryCombinedDeviceControlPolicies(self):
else:
return False
- @pytest.mark.skipif(falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDeviceControlPolicies_queryCombinedDeviceControlPolicyMembers(self):
if falcon.queryCombinedDeviceControlPolicyMembers(parameters={"id": falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0]["id"]})["status_code"] in AllowedResponses:
return True
else:
return False
+ def serviceDeviceControlPolicies_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["queryCombinedDeviceControlPolicyMembers",""],
+ ["queryCombinedDeviceControlPolicies",""],
+ ["performDeviceControlPoliciesAction","body={}, parameters={}"],
+ ["setDeviceControlPoliciesPrecedence", "body={}"],
+ ["getDeviceControlPolicies","ids='12345678'"],
+ ["createDeviceControlPolicies","body={}"],
+ ["deleteDeviceControlPolicies","ids='12345678'"],
+ ["updateDeviceControlPolicies","body={}"],
+ ["queryDeviceControlPolicyMembers",""],
+ ["queryDeviceControlPolicies",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_queryDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_queryDeviceControlPolicies() == True
-
+
+ @pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryDeviceControlPolicyMembers(self):
assert self.serviceDeviceControlPolicies_queryDeviceControlPolicyMembers() == True
+ @pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_getDeviceControlPolicies() == True
def test_queryCombinedDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_queryCombinedDeviceControlPolicies() == True
-
+
+ @pytest.mark.skipif(falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryCombinedDeviceControlPolicyMembers(self):
assert self.serviceDeviceControlPolicies_queryCombinedDeviceControlPolicyMembers() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceDeviceControlPolicies_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_event_streams.py b/tests/test_event_streams.py
index 3cf19583d..2b565d6dc 100644
--- a/tests/test_event_streams.py
+++ b/tests/test_event_streams.py
@@ -28,7 +28,6 @@ def serviceStream_listAvailableStreamsOAuth2(self):
else:
return False
- @pytest.mark.skipif(falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})["status_code"] == 429, reason="API rate limit reached")
def serviceStream_refreshActiveStreamSession(self):
avail = falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})
t1 = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
@@ -39,12 +38,26 @@ def serviceStream_refreshActiveStreamSession(self):
return True
else:
return False
+
+ def serviceStream_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ if falcon.listAvailableStreamsOAuth2(parameters={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.refreshActiveStreamSession(parameters={}, partition=0)["status_code"] != 500:
+ errorChecks = False
+
+ return errorChecks
def test_listAvailableStreamsOAuth2(self):
assert self.serviceStream_listAvailableStreamsOAuth2() == True
+ #@pytest.mark.skipif(falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})["status_code"] == 429, reason="API rate limit reached")
# def test_refreshActiveStreamSession(self):
# assert self.serviceStream_refreshActiveStreamSession() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceStream_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_falconx_sandbox.py b/tests/test_falconx_sandbox.py
index c43bc89e9..e50c071f4 100644
--- a/tests/test_falconx_sandbox.py
+++ b/tests/test_falconx_sandbox.py
@@ -32,21 +32,47 @@ def serviceFalconX_QuerySubmissions(self):
else:
return False
- @pytest.mark.skipif(falcon.QueryReports(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceFalconX_GetSummaryReports(self):
if falcon.GetSummaryReports(ids=falcon.QueryReports(parameters={"limit":1})["body"]["resources"])["status_code"] in AllowedResponses:
return True
else:
return False
+ def serviceFalconX_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["GetArtifacts", "parameters={}"],
+ ["GetSummaryReports", "ids='12345678'"],
+ # ["GetReports","body={}, parameters={}"],
+ # ["DeleteReport", "body={}"],
+ ["GetSubmissions","ids='12345678'"],
+ ["Submit","body={}"],
+ ["QueryReports",""],
+ ["QuerySubmissions",""],
+ # ["GetSampleV2","body={}"],
+ ["UploadSampleV2", "body={},parameters={}"]
+ # ["DeleteSampleV2", ""],
+ # ["QuerySampleV1", ""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_QueryReports(self):
assert self.serviceFalconX_QueryReports() == True
def test_QuerySubmissions(self):
assert self.serviceFalconX_QuerySubmissions() == True
+ @pytest.mark.skipif(falcon.QueryReports(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetSummaryReports(self):
assert self.serviceFalconX_GetSummaryReports() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceFalconX_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_firewall_management.py b/tests/test_firewall_management.py
index 522252277..1ddb57fd3 100644
--- a/tests/test_firewall_management.py
+++ b/tests/test_firewall_management.py
@@ -24,10 +24,60 @@ def serviceFirewall_query_rules(self):
else:
return False
+
+ def serviceFirewall_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ if falcon.aggregate_events(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.aggregate_policy_rules(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.aggregate_rule_groups(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.aggregate_rules(body={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_events(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_firewall_fields(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_platforms(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_policy_containers(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.update_policy_container(body={}, cs_username="BillTheCat")["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_rule_groups(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.create_rule_group(body={}, cs_username="HarryHenderson")["status_code"] != 500:
+ errorChecks = False
+ if falcon.delete_rule_groups(ids="12345678", cs_username="KyloRen")["status_code"] != 500:
+ errorChecks = False
+ if falcon.update_rule_group(body={}, cs_username="Calcifer")["status_code"] != 500:
+ errorChecks = False
+ if falcon.get_rules(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_events()["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_firewall_fields()["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_platforms()["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_policy_rules()["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_rule_groups()["status_code"] != 500:
+ errorChecks = False
+ if falcon.query_rules()["status_code"] != 500:
+ errorChecks = False
+
+ return errorChecks
+
# def test_query_rules(self):
# assert self.serviceFirewall_query_rules() == True
- def test_logout(self):
+ def test_Logout(self):
assert auth.serviceRevoke() == True
+ def test_Errors(self):
+ assert self.serviceFirewall_GenerateErrors() == True
+
#TODO: My current API key can't hit this API. Pending additional unit testing for now.
\ No newline at end of file
diff --git a/tests/test_firewall_policies.py b/tests/test_firewall_policies.py
index 35c6c97bd..5f1728309 100644
--- a/tests/test_firewall_policies.py
+++ b/tests/test_firewall_policies.py
@@ -27,6 +27,30 @@ def serviceFirewall_queryFirewallPolicies(self):
# def test_queryFirewallPolicies(self):
# assert self.serviceFirewall_queryFirewallPolicies() == True
- def test_logout(self):
+ def serviceFirewall_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["queryCombinedFirewallPolicyMembers",""],
+ ["queryCombinedFirewallPolicies",""],
+ ["performFirewallPoliciesAction","body={}, parameters={}"],
+ ["setFirewallPoliciesPrecedence","body={}"],
+ ["getFirewallPolicies","ids='12345678'"],
+ ["createFirewallPolicies","body={}"],
+ ["deleteFirewallPolicies","ids='12345678'"],
+ ["updateFirewallPolicies","body={}"],
+ ["queryFirewallPolicyMembers",""],
+ ["queryFirewallPolicies", ""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
+ def test_Logout(self):
assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceFirewall_GenerateErrors() == True
#TODO: My current API key can't hit this API. Pending additional unit testing for now.
\ No newline at end of file
diff --git a/tests/test_host_group.py b/tests/test_host_group.py
index fdd961ae0..b8f617129 100644
--- a/tests/test_host_group.py
+++ b/tests/test_host_group.py
@@ -25,14 +25,12 @@ def serviceHostGroup_queryHostGroups(self):
else:
return False
- @pytest.mark.skipif(falcon.queryHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceHostGroup_queryGroupMembers(self):
if falcon.queryGroupMembers(parameters={"limit":1,"id":falcon.queryHostGroups(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.queryHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceHostGroup_getHostGroups(self):
if falcon.getHostGroups(ids=falcon.queryHostGroups(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
@@ -45,27 +43,53 @@ def serviceHostGroup_queryCombinedHostGroups(self):
else:
return False
- @pytest.mark.skipif(falcon.queryCombinedHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceHostGroup_queryCombinedGroupMembers(self):
if falcon.queryCombinedGroupMembers(parameters={"limit":1,"id":falcon.queryCombinedHostGroups(parameters={"limit":1})["body"]["resources"][0]["id"]})["status_code"] in AllowedResponses:
return True
else:
return False
+ def serviceHostGroup_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["queryCombinedGroupMembers",""],
+ ["queryCombinedHostGroups",""],
+ ["performGroupAction","body={}, parameters={}"],
+ ["getHostGroups","ids='12345678'"],
+ ["createHostGroups","body={}"],
+ ["deleteHostGroups","ids='12345678'"],
+ ["updateHostGroups","body={}"],
+ ["queryGroupMembers",""],
+ ["queryHostGroups",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_queryHostGroups(self):
assert self.serviceHostGroup_queryHostGroups() == True
+ @pytest.mark.skipif(falcon.queryHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryGroupMembers(self):
assert self.serviceHostGroup_queryGroupMembers() == True
+ @pytest.mark.skipif(falcon.queryHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getHostGroups(self):
assert self.serviceHostGroup_getHostGroups() == True
def test_queryCombinedHostGroups(self):
assert self.serviceHostGroup_queryCombinedHostGroups() == True
-
+
+ @pytest.mark.skipif(falcon.queryCombinedHostGroups(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryCombinedGroupMembers(self):
assert self.serviceHostGroup_queryCombinedGroupMembers() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceHostGroup_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_hosts.py b/tests/test_hosts.py
index dcc45706c..a5a5d766d 100644
--- a/tests/test_hosts.py
+++ b/tests/test_hosts.py
@@ -63,6 +63,22 @@ def serviceHosts_PerformActionV2(self):
else:
return False
+ def serviceHosts_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["PerformActionV2","body={}, parameters={}"],
+ ["GetDeviceDetails","ids='12345678'"],
+ ["QueryHiddenDevices",""],
+ ["QueryDevicesByFilterScroll",""],
+ ["QueryDevicesByFilter",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_QueryHiddenDevices(self):
assert self.serviceHosts_QueryHiddenDevices() == True
@@ -79,5 +95,8 @@ def test_QueryDevicesByFilter(self):
# def test_PerformActionV2(self):
# assert self.serviceHosts_PerformActionV2() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceHosts_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_incidents.py b/tests/test_incidents.py
index 298481ce4..e18d2324d 100644
--- a/tests/test_incidents.py
+++ b/tests/test_incidents.py
@@ -38,19 +38,35 @@ def serviceIncidents_QueryIncidents(self):
else:
return False
- @pytest.mark.skipif(falcon.QueryBehaviors(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceIncidents_GetBehaviors(self):
if falcon.GetBehaviors(body={"ids":falcon.QueryBehaviors(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.QueryIncidents(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceIncidents_GetIncidents(self):
if falcon.GetIncidents(body={"ids":falcon.QueryIncidents(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
return True
else:
return False
+ def serviceIncidents_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["CrowdScore",""],
+ ["GetBehaviors","body={}"],
+ ["PerformIncidentAction","body={}"],
+ ["GetIncidents","body={}"],
+ ["QueryBehaviors",""],
+ ["QueryIncidents",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_CrowdScore(self):
assert self.serviceIncidents_CrowdScore() == True
@@ -59,12 +75,17 @@ def test_QueryBehaviors(self):
def test_QueryIncidents(self):
assert self.serviceIncidents_QueryIncidents() == True
-
+
+ @pytest.mark.skipif(falcon.QueryIncidents(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetIncidents(self):
assert self.serviceIncidents_GetIncidents() == True
-
+
+ @pytest.mark.skipif(falcon.QueryBehaviors(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetBehaviors(self):
assert self.serviceIncidents_GetBehaviors() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceIncidents_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_intel.py b/tests/test_intel.py
index 7a206737a..98808ec95 100644
--- a/tests/test_intel.py
+++ b/tests/test_intel.py
@@ -36,19 +36,18 @@ def serviceIntel_QueryIntelReportEntities(self):
else:
return False
- @pytest.mark.skipif(falcon.QueryIntelActorEntities(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceIntel_GetIntelActorEntities(self):
if falcon.GetIntelActorEntities(ids=falcon.QueryIntelActorEntities(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.QueryIntelIndicatorIds(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceIntel_GetIntelIndicatorEntities(self):
if falcon.GetIntelIndicatorEntities(body={"id": falcon.QueryIntelIndicatorIds(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.QueryIntelReportEntities(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceIntel_GetIntelReportEntities(self):
if falcon.GetIntelReportEntities(ids=falcon.QueryIntelReportEntities(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
@@ -79,6 +78,31 @@ def serviceIntel_QueryIntelRuleIds(self):
else:
return False
+ def serviceIntel_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["QueryIntelActorEntities",""],
+ ["QueryIntelIndicatorEntities",""],
+ ["QueryIntelReportEntities",""],
+ ["GetIntelActorEntities","ids='12345678'"],
+ ["GetIntelIndicatorEntities","body={}"],
+ ["GetIntelReportPDF","parameters={}"],
+ ["GetIntelReportEntities","ids='12345678'"],
+ ["GetIntelRuleFile","parameters={}"],
+ ["GetLatestIntelRuleFile","parameters={}"],
+ ["GetIntelRuleEntities", "ids='12345678'"],
+ ["QueryIntelActorIds", ""],
+ ["QueryIntelIndicatorIds", ""],
+ ["QueryIntelReportIds", ""],
+ ["QueryIntelRuleIds", "parameters={}"]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_QueryIntelActorEntities(self):
assert self.serviceIntel_QueryIntelActorEntities() == True
@@ -87,14 +111,17 @@ def test_QueryIntelIndicatorEntities(self):
def test_QueryIntelReportEntities(self):
assert self.serviceIntel_QueryIntelReportEntities() == True
-
+
+ @pytest.mark.skipif(falcon.QueryIntelActorEntities(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetIntelActorEntities(self):
assert self.serviceIntel_GetIntelActorEntities() == True
#Not working - data issue with input body payload
+ #@pytest.mark.skipif(falcon.QueryIntelIndicatorIds(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
# def test_GetIntelIndicatorEntities(self):
# assert self.serviceIntel_GetIntelIndicatorEntities() == True
-
+
+ @pytest.mark.skipif(falcon.QueryIntelReportEntities(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetIntelReportEntities(self):
assert self.serviceIntel_GetIntelReportEntities() == True
@@ -110,5 +137,8 @@ def test_QueryIntelReportIds(self):
def test_QueryIntelRuleIds(self):
assert self.serviceIntel_QueryIntelRuleIds() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceIntel_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_iocs.py b/tests/test_iocs.py
index 851abc7b1..ba95c484c 100644
--- a/tests/test_iocs.py
+++ b/tests/test_iocs.py
@@ -24,7 +24,7 @@ def serviceIOCs_QueryIOCs(self):
else:
return False
- @pytest.mark.skipif(falcon.QueryIOCs(parameters={"types":"ipv4"})["status_code"] == 429, reason="API rate limit reached")
+
def serviceIOCs_GetIOC(self):
if falcon.GetIOC(parameters={"type":"ipv4", "value":falcon.QueryIOCs(parameters={"types":"ipv4"})["body"]["resources"][0]})["status_code"] in AllowedResponses:
@@ -32,13 +32,37 @@ def serviceIOCs_GetIOC(self):
else:
return False
+ def serviceIOCs_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["DevicesCount","parameters={}"],
+ ["GetIOC","parameters={}"],
+ ["CreateIOC","body={}"],
+ ["DeleteIOC","parameters={}"],
+ ["UpdateIOC","body={}, parameters={}"],
+ ["DevicesRanOn","parameters={}"],
+ ["QueryIOCs",""],
+ ["ProcessesRanOn","parameters={}"],
+ ["entities_processes","ids='12345678'"]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_QueryIOCs(self):
assert self.serviceIOCs_QueryIOCs() == True
# Current test environment doesn't have any custom IOCs configured atm
+ #@pytest.mark.skipif(falcon.QueryIOCs(parameters={"types":"ipv4"})["status_code"] == 429, reason="API rate limit reached")
# def test_GetIOC(self):
# assert self.serviceIOCs_GetIOC() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceIOCs_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_prevention_policy.py b/tests/test_prevention_policy.py
index a08002a43..9ab6261fc 100644
--- a/tests/test_prevention_policy.py
+++ b/tests/test_prevention_policy.py
@@ -24,7 +24,6 @@ def servicePrevent_queryPreventionPolicies(self):
else:
return False
- @pytest.mark.skipif(falcon.queryPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def servicePrevent_queryPreventionPolicyMembers(self):
if falcon.queryPreventionPolicyMembers(parameters={"id":falcon.queryPreventionPolicies(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
@@ -32,7 +31,6 @@ def servicePrevent_queryPreventionPolicyMembers(self):
return False
return True
- @pytest.mark.skipif(falcon.queryPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def servicePrevent_getPreventionPolicies(self):
if falcon.getPreventionPolicies(ids=falcon.queryPreventionPolicies(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
@@ -46,7 +44,6 @@ def servicePrevent_queryCombinedPreventionPolicies(self):
else:
return False
- @pytest.mark.skipif(falcon.queryCombinedPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def servicePrevent_queryCombinedPreventionPolicyMembers(self):
if falcon.queryCombinedPreventionPolicyMembers(parameters={"id":falcon.queryCombinedPreventionPolicies(parameters={"limit":1})["body"]["resources"][0]["id"]})["status_code"] in AllowedResponses:
return True
@@ -54,20 +51,47 @@ def servicePrevent_queryCombinedPreventionPolicyMembers(self):
return False
return True
+ def servicePrevent_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["queryCombinedPreventionPolicyMembers",""],
+ ["queryCombinedPreventionPolicies",""],
+ ["performPreventionPoliciesAction","body={}, parameters={}"],
+ ["setPreventionPoliciesPrecedence","body={}"],
+ ["getPreventionPolicies","ids='12345678'"],
+ ["createPreventionPolicies","body={}"],
+ ["deletePreventionPolicies","ids='12345678'"],
+ ["updatePreventionPolicies","body={}"],
+ ["queryPreventionPolicyMembers",""],
+ ["queryPreventionPolicies",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_queryPreventionPolicies(self):
assert self.servicePrevent_queryPreventionPolicies() == True
+ @pytest.mark.skipif(falcon.queryPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryPreventionPolicyMembers(self):
assert self.servicePrevent_queryPreventionPolicyMembers() == True
-
+
+ @pytest.mark.skipif(falcon.queryPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getPreventionPolicies(self):
assert self.servicePrevent_getPreventionPolicies() == True
def test_queryCombinedPreventionPolicies(self):
assert self.servicePrevent_queryCombinedPreventionPolicies() == True
-
+
+ @pytest.mark.skipif(falcon.queryCombinedPreventionPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryCombinedPreventionPolicyMembers(self):
assert self.servicePrevent_queryCombinedPreventionPolicyMembers() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.servicePrevent_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_real_time_response.py b/tests/test_real_time_response.py
index 35f96a396..5581522d4 100644
--- a/tests/test_real_time_response.py
+++ b/tests/test_real_time_response.py
@@ -25,8 +25,43 @@ def serviceRTR_ListAllSessions(self):
else:
return False
+ def serviceRTR_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["RTR_AggregateSessions","body={}"],
+ ["BatchActiveResponderCmd","body={}"],
+ ["BatchCmd","body={}"],
+ ["BatchGetCmdStatus","parameters={}"],
+ ["BatchGetCmd","body={}"],
+ ["BatchInitSessions","body={}"],
+ ["BatchRefreshSessions","body={}"],
+ ["RTR_CheckActiveResponderCommandStatus","parameters={}"],
+ ["RTR_ExecuteActiveResponderCommand","body={}"],
+ ["RTR_CheckCommandStatus","parameters={}"],
+ ["RTR_ExecuteCommand","body={}"],
+ ["RTR_GetExtractedFileContents","parameters={}"],
+ ["RTR_ListFiles","parameters={}"],
+ ["RTR_DeleteFile","ids='12345678', parameters={}"],
+ ["RTR_ListQueuedSessions","body={}"],
+ ["RTR_DeleteQueuedSession","parameters={}"],
+ ["RTR_PulseSession","body={}"],
+ ["RTR_ListSessions","body={}"],
+ ["RTR_InitSession","body={}"],
+ ["RTR_DeleteSession","parameters={}"],
+ ["RTR_ListAllSessions",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_RTR_ListAllSessions(self):
assert self.serviceRTR_ListAllSessions() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceRTR_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_real_time_response_admin.py b/tests/test_real_time_response_admin.py
index 3be1de00e..a3ba4508d 100644
--- a/tests/test_real_time_response_admin.py
+++ b/tests/test_real_time_response_admin.py
@@ -31,11 +31,37 @@ def serviceRTR_ListScripts(self):
else:
return False
+ def serviceRTR_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["BatchAdminCmd","body={}"],
+ ["RTR_CheckAdminCommandStatus","parameters={}"],
+ ["RTR_ExecuteAdminCommand","body={}"],
+ ["RTR_GetPut_Files","ids='12345678'"],
+ ["RTR_CreatePut_Files","data={}, files=[]"],
+ ["RTR_DeletePut_Files","ids='12345678'"],
+ ["RTR_GetScripts","ids='12345678'"],
+ ["RTR_CreateScripts","data={}, files=[]"],
+ ["RTR_DeleteScripts","ids='12345678'"],
+ ["RTR_UpdateScripts","data={}, files=[]"],
+ ["RTR_ListPut_Files",""],
+ ["RTR_ListScripts",""]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_RTR_ListScripts(self):
assert self.serviceRTR_ListScripts() == True
def test_RTR_ListPut_Files(self):
assert self.serviceRTR_ListPut_Files() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceRTR_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_sensor_update_policy.py b/tests/test_sensor_update_policy.py
index bdf701808..6b67b84e8 100644
--- a/tests/test_sensor_update_policy.py
+++ b/tests/test_sensor_update_policy.py
@@ -29,14 +29,13 @@ def serviceSensorUpdate_querySensorUpdatePolicyMembers(self):
return True
else:
return False
- @pytest.mark.skipif(falcon.querySensorUpdatePolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+
def serviceSensorUpdate_getSensorUpdatePolicies(self):
if falcon.getSensorUpdatePolicies(ids=falcon.querySensorUpdatePolicies(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.querySensorUpdatePolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceSensorUpdate_getSensorUpdatePoliciesV2(self):
if falcon.getSensorUpdatePoliciesV2(ids=falcon.querySensorUpdatePolicies(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
@@ -55,6 +54,34 @@ def serviceSensorUpdate_queryCombinedSensorUpdatePolicyMembers(self):
else:
return False
+ def serviceSensorUpdate_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["querySensorUpdatePolicies",""],
+ ["querySensorUpdatePolicyMembers",""],
+ ["getSensorUpdatePolicies","ids='12345678'"],
+ ["getSensorUpdatePoliciesV2","ids='12345678'"],
+ ["queryCombinedSensorUpdatePolicies",""],
+ ["queryCombinedSensorUpdatePolicyMembers", ""],
+ ["revealUninstallToken","body={}"],
+ ["queryCombinedSensorUpdateBuilds", ""],
+ ["createSensorUpdatePolicies", "body={}"],
+ ["createSensorUpdatePoliciesV2", "body={}"],
+ ["deleteSensorUpdatePolicies", "ids='12345678'"],
+ ["updateSensorUpdatePolicies", "body={}"],
+ ["updateSensorUpdatePoliciesV2", "body={}"],
+ ["performSensorUpdatePoliciesAction", "body={},parameters={}"],
+ ["setSensorUpdatePoliciesPrecedence", "body={}"],
+ ["queryCombinedSensorUpdatePoliciesV2",""]
+ ]
+
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_querySensorUpdatePolicies(self):
assert self.serviceSensorUpdate_querySensorUpdatePolicies() == True
@@ -66,12 +93,17 @@ def test_queryCombinedSensorUpdatePolicies(self):
def test_queryCombinedSensorUpdatePolicyMembers(self):
assert self.serviceSensorUpdate_queryCombinedSensorUpdatePolicyMembers() == True
-
+
+ @pytest.mark.skipif(falcon.querySensorUpdatePolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getSensorUpdatePolicies(self):
assert self.serviceSensorUpdate_getSensorUpdatePolicies() == True
-
+
+ @pytest.mark.skipif(falcon.querySensorUpdatePolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getSensorUpdatePoliciesV2(self):
assert self.serviceSensorUpdate_getSensorUpdatePoliciesV2() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceSensorUpdate_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_spotlight_vulnerabilities.py b/tests/test_spotlight_vulnerabilities.py
index f56caf527..f07140b53 100644
--- a/tests/test_spotlight_vulnerabilities.py
+++ b/tests/test_spotlight_vulnerabilities.py
@@ -24,18 +24,31 @@ def serviceSpotlight_queryVulnerabilities(self):
else:
return False
- @pytest.mark.skipif(falcon.queryVulnerabilities(parameters={"limit":1,"filter":"created_timestamp:>'2020-01-01T00:00:01Z'"})["status_code"] == 429, reason="API rate limit reached")
def serviceSpotlight_getVulnerabilities(self):
if falcon.getVulnerabilities(ids=falcon.queryVulnerabilities(parameters={"limit":1,"filter":"created_timestamp:>'2020-01-01T00:00:01Z'"})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
else:
return False
+ def serviceSpotlight_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ if falcon.queryVulnerabilities(parameters={})["status_code"] != 500:
+ errorChecks = False
+ if falcon.getVulnerabilities(ids="12345678")["status_code"] != 500:
+ errorChecks = False
+
+ return errorChecks
+
def test_queryVulnerabilities(self):
assert self.serviceSpotlight_queryVulnerabilities() == True
-
+
+ @pytest.mark.skipif(falcon.queryVulnerabilities(parameters={"limit":1,"filter":"created_timestamp:>'2020-01-01T00:00:01Z'"})["status_code"] == 429, reason="API rate limit reached")
def test_getVulnerabilities(self):
assert self.serviceSpotlight_getVulnerabilities() == True
- def test_logout(self):
+ def test_Logout(self):
assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceSpotlight_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/test_uber_api_complete.py b/tests/test_uber_api_complete.py
new file mode 100644
index 000000000..2fbb42740
--- /dev/null
+++ b/tests/test_uber_api_complete.py
@@ -0,0 +1,204 @@
+# test_uber_api_complete.py
+# This class tests the uber class
+
+import json
+import os
+import sys
+import pytest
+import datetime
+import hashlib
+#Import our sibling src folder into the path
+sys.path.append(os.path.abspath('src'))
+# Classes to test - manually imported from our sibling folder
+from falconpy import api_complete as FalconSDK
+
+AllowedResponses = [200, 400, 415, 429, 500]
+
+if "DEBUG_API_ID" in os.environ and "DEBUG_API_SECRET" in os.environ:
+ config = {}
+ config["falcon_client_id"] = os.getenv("DEBUG_API_ID")
+ config["falcon_client_secret"] = os.getenv("DEBUG_API_SECRET")
+else:
+ cur_path = os.path.dirname(os.path.abspath(__file__))
+ if os.path.exists('%s/test.config' % cur_path):
+ with open('%s/test.config' % cur_path, 'r') as file_config:
+ config = json.loads(file_config.read())
+ else:
+ sys.exit(1)
+
+
+falcon = FalconSDK.APIHarness(
+ creds={
+ "client_id": config["falcon_client_id"],
+ "client_secret": config["falcon_client_secret"]
+ }
+)
+falcon.authenticate()
+if not falcon.authenticated:
+ sys.exit(1)
+
+class TestUber:
+ def uberCCAWS_GetAWSSettings(self):
+ if falcon.command("GetAWSSettings")["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_QueryAWSAccounts(self):
+ if falcon.command("QueryAWSAccounts", parameters={"limit":1})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_GetAWSAccounts(self):
+ if falcon.command("GetAWSAccounts", ids=falcon.command("QueryAWSAccounts", parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_VerifyAWSAccountAccess(self):
+ if falcon.command("VerifyAWSAccountAccess", ids=falcon.command("QueryAWSAccounts", parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_QueryAWSAccountsForIDs(self):
+ if falcon.command("QueryAWSAccountsForIDs", parameters={"limit":1})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_TestUploadDownload(self):
+ FILENAME="tests/testfile.png"
+ fmt='%Y-%m-%d %H:%M:%S'
+ stddate = datetime.datetime.now().strftime(fmt)
+ sdtdate = datetime.datetime.strptime(stddate, fmt)
+ sdtdate = sdtdate.timetuple()
+ jdate = sdtdate.tm_yday
+ jdate = "{}{}".format(stddate.replace("-","").replace(":","").replace(" ",""),jdate)
+ SOURCE="%s_source.png" % jdate
+ TARGET="tests/%s_target.png" % jdate
+ PAYLOAD = open(FILENAME, 'rb').read()
+ response = falcon.command('UploadSampleV3', file_name=SOURCE, data=PAYLOAD, content_type="application/octet-stream")
+ sha = response["body"]["resources"][0]["sha256"]
+ response = falcon.command("GetSampleV3", parameters={}, ids=sha)
+ open(TARGET, 'wb').write(response)
+ buf=65536
+ hash1 = hashlib.sha256()
+ with open(FILENAME, 'rb') as f:
+ while True:
+ data = f.read(buf)
+ if not data:
+ break
+ hash1.update(data)
+ hash1 = hash1.hexdigest()
+ hash2 = hashlib.sha256()
+ with open(TARGET, 'rb') as f:
+ while True:
+ data = f.read(buf)
+ if not data:
+ break
+ hash2.update(data)
+ hash2 = hash2.hexdigest()
+ if os.path.exists(TARGET):
+ os.remove(TARGET)
+ if hash1 == hash2:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_GenerateError(self):
+ if falcon.command("QueryAWSAccounts", partition=0)["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_GenerateInvalidPayload(self):
+ if falcon.command("refreshActiveStreamSession", partition=9, parameters={"action_name":"refresh_active_stream_session"})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_OverrideAndHeader(self):
+ if falcon.command(action="", override="GET,/cloud-connect-aws/combined/accounts/v1", headers={"Nothing":"Special"})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_BadCommand(self):
+ if falcon.command("IWantTheImpossible", parameters={"limit":1})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_GenerateServerError(self):
+ if falcon.command("GetAWSAccounts", ids="123", data=['Kerash!'])["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_GenerateTokenError(self):
+ falcon.base_url = "'`\""
+ if falcon.deauthenticate() == False:
+ return True
+ else:
+ return False
+
+ def uberCCAWS_BadAuthentication(self):
+ falcon = FalconSDK.APIHarness(
+ creds={
+ "client_id": "BadClientID",
+ "client_secret": "BadClientSecret"
+ }
+ )
+ if falcon.command("QueryAWSAccounts", parameters={"limit":1})["status_code"] in AllowedResponses:
+ return True
+ else:
+ return False
+
+
+ def test_GetAWSSettings(self):
+ assert self.uberCCAWS_GetAWSSettings() == True
+
+ def test_QueryAWSAccounts(self):
+ assert self.uberCCAWS_QueryAWSAccounts() == True
+
+ @pytest.mark.skipif(falcon.command("QueryAWSAccounts", parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ def test_GetAWSAccounts(self):
+ assert self.uberCCAWS_GetAWSAccounts() == True
+
+ @pytest.mark.skipif(falcon.command("QueryAWSAccounts", parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
+ def test_VerifyAWSAccountAccess(self):
+ assert self.uberCCAWS_VerifyAWSAccountAccess() == True
+
+ def test_QueryAWSAccountsForIDs(self):
+ assert self.uberCCAWS_QueryAWSAccountsForIDs() == True
+
+ def test_UploadDownload(self):
+ assert self.uberCCAWS_TestUploadDownload() == True
+
+ def test_GenerateError(self):
+ assert self.uberCCAWS_GenerateError() == True
+
+ def test_GenerateInvalidPayload(self):
+ assert self.uberCCAWS_GenerateInvalidPayload() == True
+
+ def test_BadCommand(self):
+ assert self.uberCCAWS_BadCommand() == True
+
+ def test_OverrideAndHeader(self):
+ #Also check token auto-renewal
+ falcon.token_expiration=0
+ assert self.uberCCAWS_OverrideAndHeader() == True
+
+ def test_GenerateServerError(self):
+ assert self.uberCCAWS_GenerateServerError() == True
+
+ def test_logout(self):
+ assert falcon.deauthenticate() == True
+
+ def test_GenerateTokenError(self):
+ assert self.uberCCAWS_GenerateTokenError() == True
+
+ def test_BadAuthentication(self):
+ assert self.uberCCAWS_BadAuthentication() == True
\ No newline at end of file
diff --git a/tests/test_user_management.py b/tests/test_user_management.py
index 18ee7ff03..696a2a986 100644
--- a/tests/test_user_management.py
+++ b/tests/test_user_management.py
@@ -29,21 +29,18 @@ def serviceUserManagement_RetrieveUserUUIDsByCID(self):
else:
return False
- @pytest.mark.skipif(falcon.RetrieveEmailsByCID()["status_code"] == 429, reason="API rate limit reached")
def serviceUserManagement_RetrieveUserUUID(self):
if falcon.RetrieveUserUUID(parameters={"uid": falcon.RetrieveEmailsByCID()["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.RetrieveUserUUIDsByCID()["status_code"] == 429, reason="API rate limit reached")
def serviceUserManagement_RetrieveUser(self):
if falcon.RetrieveUser(ids=falcon.RetrieveUserUUIDsByCID()["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
else:
return False
- @pytest.mark.skipif(falcon.RetrieveUserUUIDsByCID()["status_code"] == 429, reason="API rate limit reached")
def serviceUserManagement_GetUserRoleIds(self):
if falcon.GetUserRoleIds(parameters={"user_uuid":falcon.RetrieveUserUUIDsByCID()["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
@@ -56,33 +53,62 @@ def serviceUserManagement_GetAvailableRoleIds(self):
else:
return False
- @pytest.mark.skipif(falcon.GetAvailableRoleIds()["status_code"] == 429, reason="API rate limit reached")
def serviceUserManagement_GetRoles(self):
if falcon.GetRoles(ids=falcon.GetAvailableRoleIds()["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
else:
return False
+
+ def serviceUserManagement_GenerateErrors(self):
+ falcon.base_url = "nowhere"
+ errorChecks = True
+ commandList = [
+ ["GetRoles","ids='12345678'"],
+ ["GrantUserRoleIds","body={}, parameters={}"],
+ ["RevokeUserRoleIds","ids='12345678', parameters={}"],
+ ["GetAvailableRoleIds",""],
+ ["GetUserRoleIds","parameters={}"],
+ ["RetrieveUser","ids='12345678'"],
+ ["CreateUser","body={}"],
+ ["DeleteUser","parameters={}"],
+ ["UpdateUser","body={}, parameters={}"],
+ ["RetrieveEmailsByCID",""],
+ ["RetrieveUserUUIDsByCID",""],
+ ["RetrieveUserUUID","parameters={}"]
+ ]
+ for cmd in commandList:
+ if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
+ errorChecks = False
+
+ return errorChecks
def test_RetrieveEmailsByCID(self):
assert self.serviceUserManagement_RetrieveEmailsByCID() == True
def test_RetrieveUserUUIDsByCID(self):
assert self.serviceUserManagement_RetrieveUserUUIDsByCID() == True
-
+
+ @pytest.mark.skipif(falcon.RetrieveEmailsByCID()["status_code"] == 429, reason="API rate limit reached")
def test_RetrieveUserUUID(self):
assert self.serviceUserManagement_RetrieveUserUUID() == True
-
+
+ @pytest.mark.skipif(falcon.RetrieveUserUUIDsByCID()["status_code"] == 429, reason="API rate limit reached")
def test_RetrieveUser(self):
assert self.serviceUserManagement_RetrieveUser() == True
-
+
+ @pytest.mark.skipif(falcon.RetrieveUserUUIDsByCID()["status_code"] == 429, reason="API rate limit reached")
def test_GetUserRoleIds(self):
assert self.serviceUserManagement_GetUserRoleIds() == True
def test_GetAvailableRoleIds(self):
assert self.serviceUserManagement_GetAvailableRoleIds() == True
-
+
+ @pytest.mark.skipif(falcon.GetAvailableRoleIds()["status_code"] == 429, reason="API rate limit reached")
def test_GetRoles(self):
assert self.serviceUserManagement_GetRoles() == True
- def test_logout(self):
- assert auth.serviceRevoke() == True
\ No newline at end of file
+ def test_Logout(self):
+ assert auth.serviceRevoke() == True
+
+ def test_Errors(self):
+ assert self.serviceUserManagement_GenerateErrors() == True
\ No newline at end of file
diff --git a/tests/testfile.png b/tests/testfile.png
new file mode 100644
index 000000000..3495bd51a
Binary files /dev/null and b/tests/testfile.png differ