Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coverage badge generation, additional unit tests #41

Merged
merged 13 commits into from
Jan 11, 2021
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
![PyPI - Status](https://img.shields.io/pypi/status/crowdstrike-falconpy)<br/>
![PyPI](https://img.shields.io/pypi/v/crowdstrike-falconpy) ![PyPI - Downloads](https://img.shields.io/pypi/dm/crowdstrike-falconpy) ![CI Tests](https://github.com/CrowdStrike/falconpy/workflows/Python%20package/badge.svg)<br/>
![PyPI - Implementation](https://img.shields.io/pypi/implementation/crowdstrike-falconpy) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/crowdstrike-falconpy) ![PyPI - Wheel](https://img.shields.io/pypi/wheel/crowdstrike-falconpy)<br/>
![PyPI - Status](https://img.shields.io/pypi/status/crowdstrike-falconpy) ![PyPI - Implementation](https://img.shields.io/pypi/implementation/crowdstrike-falconpy) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/crowdstrike-falconpy) ![PyPI - Wheel](https://img.shields.io/pypi/wheel/crowdstrike-falconpy)<br/>
![PyPI](https://img.shields.io/pypi/v/crowdstrike-falconpy) ![PyPI - Downloads](https://img.shields.io/pypi/dm/crowdstrike-falconpy) ![CI Tests](https://github.com/CrowdStrike/falconpy/workflows/Python%20package/badge.svg) ![CI Test Coverage](https://raw.githubusercontent.com/CrowdStrike/falconpy/main/tests/coverage.svg)<br/>
![Twitter URL](https://img.shields.io/twitter/url?label=Follow%20%40CrowdStrike&style=social&url=https%3A%2F%2Ftwitter.com%2FCrowdStrike)<br/>

# FalconPy
Expand Down Expand Up @@ -34,7 +33,7 @@ $ python -m pip uninstall crowdstrike-falconpy
| CrowdStrike Sensor Policy Management API | [./src/falconpy/sensor_update_policy.py](./src/falconpy/sensor_update_policy.py) |
| [CrowdStrike Custom Indicators of Compromose (IOCs) APIs](https://falcon.crowdstrike.com/support/documentation/88/custom-ioc-apis) | [./src/falconpy/iocs.py](./src/falconpy/iocs.py) |
| [CrowdStrike Detections APIs](https://falcon.crowdstrike.com/support/documentation/85/detection-and-prevention-policies-apis) | [./src/falconpy/detects.py](./src/falconpy/detects.py) |
| [CrowdStrike Event Streams API](https://falcon.crowdstrike.com/support/documentation/89/event-streams-apis)| [./serices/event_streams.py](./src/falconpy/event_streams.py) |
| [CrowdStrike Event Streams API](https://falcon.crowdstrike.com/support/documentation/89/event-streams-apis)| [./src/falconpy/event_streams.py](./src/falconpy/event_streams.py) |
| [CrowdStrike Falcon Horizon APIs](https://falcon.crowdstrike.com/support/documentation/137/falcon-horizon-apis) | *Coming Soon* |
| [CrowdStrike Falon X APIs](https://falcon.crowdstrike.com/support/documentation/92/falcon-x-apis) | *Coming Soon* |
| [CrowdStrike Firewall Management API](https://falcon.crowdstrike.com/support/documentation/107/falcon-firewall-management-apis) | [./src/falconpy/firewall_management.py](./src/falconpy/firewall_management.py) |
Expand Down
21 changes: 21 additions & 0 deletions tests/coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 23 additions & 0 deletions tests/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ def serviceAuth(self):
else:
return False

def failServiceAuth(self):

self.authorization = FalconAuth.OAuth2(creds={
'client_id': "BadClientID",
'client_secret': "BadClientSecret"
})
self.authorization.base_url = "nowhere"
try:
self.token = self.authorization.token()['body']['access_token']
except:
self.token = False

self.authorization.revoke(self.token)

if self.token:
return False
else:
return True


def serviceRevoke(self):
try:
result = self.authorization.revoke(token=self.token)["status_code"]
Expand All @@ -106,5 +126,8 @@ def test_serviceRevoke(self):
self.serviceAuth()
assert self.serviceRevoke() == True

def test_failServiceAuth(self):
assert self.failServiceAuth() == True



102 changes: 92 additions & 10 deletions tests/test_cloud_connect_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@
auth = Authorization.TestAuthorization()
auth.serviceAuth()
falcon = FalconAWS.Cloud_Connect_AWS(access_token=auth.token)
AllowedResponses = [200, 429] #Adding rate-limiting as an allowed response for now

AllowedResponses = [200, 201, 429] #Adding rate-limiting as an allowed response for now
accountPayload = {
"resources": [
{
# "cloudtrail_bucket_owner_id": cloudtrail_bucket_owner_id,
# "cloudtrail_bucket_region": cloudtrail_bucket_region,
# "external_id": external_id,
# "iam_role_arn": iam_role_arn,
# "id": local_account,
"rate_limit_reqs": 0,
"rate_limit_time": 0
}
]
}
class TestCloudConnectAWS:
def serviceCCAWS_GetAWSSettings(self):
if falcon.GetAWSSettings()["status_code"] in AllowedResponses:
Expand All @@ -26,19 +38,44 @@ def serviceCCAWS_GetAWSSettings(self):
return False

def serviceCCAWS_QueryAWSAccounts(self):
if falcon.QueryAWSAccounts()["status_code"] in AllowedResponses:
if falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] in AllowedResponses:
return True
else:
return False

@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceCCAWS_GetAWSAccounts(self):
if falcon.GetAWSAccounts(ids=falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
return True
else:
return False

@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceCCAWS_AccountUpdate(self):
account = falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]
accountPayload["resources"][0]["cloudtrail_bucket_owner_id"] = account["cloudtrail_bucket_owner_id"]
accountPayload["resources"][0]["cloudtrail_bucket_region"] = account["cloudtrail_bucket_region"]
orig_external_id = account["external_id"]
accountPayload["resources"][0]["external_id"] = "UnitTesting"
accountPayload["resources"][0]["iam_role_arn"] = account["iam_role_arn"]
accountPayload["resources"][0]["id"] = account["id"]
if falcon.UpdateAWSAccounts(body=accountPayload)["status_code"] in AllowedResponses:
accountPayload["resources"][0]["external_id"] = orig_external_id
return True
else:
accountPayload["resources"][0]["external_id"] = orig_external_id
return False

def serviceCCAWS_AccountDelete(self):
if falcon.DeleteAWSAccounts(ids=accountPayload["resources"][0]["id"])["status_code"] in AllowedResponses:
return True
else:
return False

def serviceCCAWS_AccountRegister(self):
if falcon.ProvisionAWSAccounts(body=accountPayload)["status_code"] in AllowedResponses:
return True
else:
return False

def serviceCCAWS_VerifyAWSAccountAccess(self):
if falcon.VerifyAWSAccountAccess(ids=falcon.QueryAWSAccounts(parameters={"limit":1})["body"]["resources"][0]["id"])["status_code"] in AllowedResponses:
return True
Expand All @@ -51,20 +88,65 @@ def serviceCCAWS_QueryAWSAccountsForIDs(self):
else:
return False

def serviceCCAWS_GenerateErrors(self):
# Garf the base_url so we force 500s for each method to cover all remaining code paths
falcon.base_url = "nowhere"
errorChecks = True
if falcon.QueryAWSAccounts()["status_code"] != 500:
errorChecks = False
if falcon.QueryAWSAccountsForIDs()["status_code"] != 500:
errorChecks = False
if falcon.GetAWSSettings()["status_code"] != 500:
errorChecks = False
if falcon.GetAWSAccounts(ids="1234567890")["status_code"] != 500:
errorChecks = False
if falcon.UpdateAWSAccounts(body={})["status_code"] != 500:
errorChecks = False
if falcon.DeleteAWSAccounts(ids="1234567890")["status_code"] != 500:
errorChecks = False
if falcon.ProvisionAWSAccounts(body={})["status_code"] != 500:
errorChecks = False
if falcon.CreateOrUpdateAWSSettings(body={})["status_code"] != 500:
errorChecks = False
if falcon.VerifyAWSAccountAccess(ids="1234567890", body={})["status_code"] != 500:
errorChecks = False

return errorChecks

def test_GetAWSSettings(self):
assert self.serviceCCAWS_GetAWSSettings() == True

def test_QueryAWSAccounts(self):
assert self.serviceCCAWS_QueryAWSAccounts() == True


@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetAWSAccounts(self):
assert self.serviceCCAWS_GetAWSAccounts() == True


@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_VerifyAWSAccountAccess(self):
assert self.serviceCCAWS_VerifyAWSAccountAccess() == True


@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
@pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
def test_AccountUpdate(self):
assert self.serviceCCAWS_AccountUpdate() == True

@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
@pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
def test_AccountDelete(self):
assert self.serviceCCAWS_AccountDelete() == True

def test_QueryAWSAccountsForIDs(self):
assert self.serviceCCAWS_QueryAWSAccountsForIDs() == True

@pytest.mark.skipif(falcon.QueryAWSAccounts(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
@pytest.mark.skipif(sys.version_info.minor < 9, reason="Frequency reduced due to potential race condition")
def test_AccountRegister(self):
assert self.serviceCCAWS_AccountRegister() == True

def test_Logout(self):
assert auth.serviceRevoke() == True

def test_logout(self):
assert auth.serviceRevoke() == True
def test_Errors(self):
assert self.serviceCCAWS_GenerateErrors() == True
30 changes: 23 additions & 7 deletions tests/test_detects.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,46 @@ def serviceDetects_QueryDetects(self):
else:
return False

@pytest.mark.skipif(falcon.QueryDetects(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDetects_GetDetectSummaries(self):
if falcon.GetDetectSummaries(body={"ids":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
return True
else:
return False

# def serviceDetects_GetAggregateDetects(self):
# auth, falcon = self.authenticate()
# if falcon.GetAggregateDetects(body={"ids":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
# auth.serviceRevoke()
# print(falcon.QueryDetects(parameters={"limit":1}))
# print(falcon.GetAggregateDetects(body=[{"ranges":"ranges'{}'".format(falcon.QueryDetects(parameters={"limit":1})["body"]["resources"][0])}]))
# if falcon.GetAggregateDetects(body={"id":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
# return True
# else:
# auth.serviceRevoke()
# return False

def serviceDetects_GenerateErrors(self):
falcon.base_url = "nowhere"
errorChecks = True
if falcon.QueryDetects()["status_code"] != 500:
errorChecks = False
if falcon.GetDetectSummaries(body={})["status_code"] != 500:
errorChecks = False
if falcon.GetAggregateDetects(body={})["status_code"] != 500:
errorChecks = False
if falcon.UpdateDetectsByIdsV2(body={})["status_code"] != 500:
errorChecks = False

return errorChecks

def test_QueryDetects(self):
assert self.serviceDetects_QueryDetects() == True


@pytest.mark.skipif(falcon.QueryDetects(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_GetDetectSummaries(self):
assert self.serviceDetects_GetDetectSummaries() == True

# def test_GetAggregateDetects(self):
# assert self.serviceDetects_GetAggregateDetects() == True

def test_logout(self):
assert auth.serviceRevoke() == True
assert auth.serviceRevoke() == True

def test_Errors(self):
assert self.serviceDetects_GenerateErrors() == True
39 changes: 32 additions & 7 deletions tests/test_device_control_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ def serviceDeviceControlPolicies_queryDeviceControlPolicies(self):
else:
return False

@pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDeviceControlPolicies_queryDeviceControlPolicyMembers(self):
if falcon.queryDeviceControlPolicyMembers(parameters={"id": falcon.queryDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0]})["status_code"] in AllowedResponses:
return True
else:
return False

@pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")

def serviceDeviceControlPolicies_getDeviceControlPolicies(self):
if falcon.getDeviceControlPolicies(ids=falcon.queryDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0])["status_code"] in AllowedResponses:
return True
Expand All @@ -46,27 +45,53 @@ def serviceDeviceControlPolicies_queryCombinedDeviceControlPolicies(self):
else:
return False

@pytest.mark.skipif(falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def serviceDeviceControlPolicies_queryCombinedDeviceControlPolicyMembers(self):
if falcon.queryCombinedDeviceControlPolicyMembers(parameters={"id": falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["body"]["resources"][0]["id"]})["status_code"] in AllowedResponses:
return True
else:
return False

def serviceDeviceControlPolicies_GenerateErrors(self):
falcon.base_url = "nowhere"
errorChecks = True
commandList = [
["queryCombinedDeviceControlPolicyMembers",""],
["queryCombinedDeviceControlPolicies",""],
["performDeviceControlPoliciesAction","body={}, parameters={}"],
["setDeviceControlPoliciesPrecedence", "body={}"],
["getDeviceControlPolicies","ids='12345678'"],
["createDeviceControlPolicies","body={}"],
["deleteDeviceControlPolicies","ids='12345678'"],
["updateDeviceControlPolicies","body={}"],
["queryDeviceControlPolicyMembers",""],
["queryDeviceControlPolicies",""]
]
for cmd in commandList:
if eval("falcon.{}({})['status_code']".format(cmd[0],cmd[1])) != 500:
errorChecks = False

return errorChecks

def test_queryDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_queryDeviceControlPolicies() == True


@pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryDeviceControlPolicyMembers(self):
assert self.serviceDeviceControlPolicies_queryDeviceControlPolicyMembers() == True

@pytest.mark.skipif(falcon.queryDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_getDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_getDeviceControlPolicies() == True

def test_queryCombinedDeviceControlPolicies(self):
assert self.serviceDeviceControlPolicies_queryCombinedDeviceControlPolicies() == True


@pytest.mark.skipif(falcon.queryCombinedDeviceControlPolicies(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
def test_queryCombinedDeviceControlPolicyMembers(self):
assert self.serviceDeviceControlPolicies_queryCombinedDeviceControlPolicyMembers() == True

def test_logout(self):
assert auth.serviceRevoke() == True
def test_Logout(self):
assert auth.serviceRevoke() == True

def test_Errors(self):
assert self.serviceDeviceControlPolicies_GenerateErrors() == True
19 changes: 16 additions & 3 deletions tests/test_event_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def serviceStream_listAvailableStreamsOAuth2(self):
else:
return False

@pytest.mark.skipif(falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})["status_code"] == 429, reason="API rate limit reached")
def serviceStream_refreshActiveStreamSession(self):
avail = falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})
t1 = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
Expand All @@ -39,12 +38,26 @@ def serviceStream_refreshActiveStreamSession(self):
return True
else:
return False

def serviceStream_GenerateErrors(self):
falcon.base_url = "nowhere"
errorChecks = True
if falcon.listAvailableStreamsOAuth2(parameters={})["status_code"] != 500:
errorChecks = False
if falcon.refreshActiveStreamSession(parameters={}, partition=0)["status_code"] != 500:
errorChecks = False

return errorChecks

def test_listAvailableStreamsOAuth2(self):
assert self.serviceStream_listAvailableStreamsOAuth2() == True

#@pytest.mark.skipif(falcon.listAvailableStreamsOAuth2(parameters={"appId":"pytest-event_streams-unit-test"})["status_code"] == 429, reason="API rate limit reached")
# def test_refreshActiveStreamSession(self):
# assert self.serviceStream_refreshActiveStreamSession() == True

def test_logout(self):
assert auth.serviceRevoke() == True
def test_Logout(self):
assert auth.serviceRevoke() == True

def test_Errors(self):
assert self.serviceStream_GenerateErrors() == True
Loading