Response Handling
The responses received from the CrowdStrike Falcon API will be in either binary or JSON format, with the bulk of these responses being JSON. All requests to the API resulting in an error will produce a JSON formatted response.
WARNING
`client_id` and `client_secret` are keyword arguments that contain your CrowdStrike API credentials. Please note that none of the examples below hard code these values. (These values are ingested as strings.) CrowdStrike does NOT recommend hard coding API credentials or customer identifiers within source code.
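One way to keep credentials out of source code is to read them from the environment at runtime. The following is a minimal sketch, not a FalconPy feature: the helper `load_credentials` and the variable names `FALCON_CLIENT_ID` and `FALCON_CLIENT_SECRET` are assumptions chosen for illustration.

```python
import os


def load_credentials(environ=os.environ):
    """Return (client_id, client_secret) read from environment variables.

    The variable names used here are an assumption made for this sketch;
    substitute whatever names your deployment defines.
    """
    try:
        return environ["FALCON_CLIENT_ID"], environ["FALCON_CLIENT_SECRET"]
    except KeyError as missing:
        # Fail early with a clear message instead of sending empty credentials.
        raise RuntimeError(f"Missing environment variable: {missing}") from None
```

The returned values could then be assigned with `CLIENT_ID, CLIENT_SECRET = load_credentials()` before any Service Class is created.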
Binary responses are files that have been requested by the operation performed, and can have varying file formats. These return payloads are intended to be saved locally as part of your handling of the result.
from falconpy import SampleUploads
falcon = SampleUploads(client_id=CLIENT_ID,
                       client_secret=CLIENT_SECRET
                       )
# File unique identifier
file_sha = "50d858e0985ecc7f60418aaf0cc5ab587f42c2570a884095a9e8ccacd0f6545c"
# Name of the file we are saving our result to. The extension should
# be aligned to match the requested file format. (This particular
# API operation example returns the result in ZIP format.)
save_file = "api_response.zip"
# We use the 'password_protected' keyword to inform the API we want
# the result bundled in a zip archive with a password applied.
response = falcon.get_sample(password_protected=True, ids=file_sha)
# We can then write the response we've received from the API to a file.
with open(save_file, 'wb') as save_to:
    save_to.write(response)
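Because every failed request produces a JSON response, the value returned by a download operation is not guaranteed to be a file payload. The sketch below checks the type before writing. It assumes a successful download arrives as `bytes` and a failure arrives as the dictionary used for JSON responses; the helper name `save_binary_response` is hypothetical.

```python
def save_binary_response(response, path):
    """Write a binary payload to disk, or return the API error messages.

    Returns an empty list when the file was written successfully.
    """
    if isinstance(response, bytes):
        with open(path, "wb") as save_to:
            save_to.write(response)
        return []
    # Not binary, so treat the value as a JSON error response.
    return [err.get("message", "") for err in response["body"]["errors"]]
```

A caller could print the returned messages and skip any further processing of the file when the list is not empty.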
Beginning in v1.1.0, FalconPy supports result object expansion. Result object expansion provides support for scenarios where a binary response is expected as the result of the operation, but HTTP status code or headers retrieval is also necessary. Result expansion is supported for all operations, but provides little advantage for operations that do not produce binary responses.
Expanded results can be requested using the `expand_result` keyword when calling the operation. Results are returned as a tuple, (`status_code`, `headers`, `content`).
# Pass a boolean True to the `expand_result` keyword to request expanded results.
download_result = falcon.get_sample(ids=file_sha, expand_result=True)
# We're returned a tuple (status_code, headers, content)
# Status will be in 0
print(f"Status returned: {download_result[0]}")
# Headers will be in 1
print(f"Headers returned: {download_result[1]}")
# File content will be in 2
with open(save_file, "wb") as download_file:
    download_file.write(download_result[2])
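Indexing the tuple by position works, but unpacking it into names can make the status check harder to forget. This is a small sketch under the tuple layout described above; the helper `unpack_expanded` is hypothetical, and treating any 2xx status as success is an assumption of the sketch.

```python
def unpack_expanded(result):
    """Split an expanded result tuple into named parts."""
    status_code, headers, content = result
    return {
        # Assumption: any 2xx status code counts as a successful download.
        "ok": 200 <= status_code < 300,
        "status_code": status_code,
        "headers": headers,
        "content": content,
    }
```

Only when `ok` is true would the `content` value be written to a file.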
The bulk of the responses from the CrowdStrike Falcon API are in JSON format and have a standardized structure across all API service collections.
{
    "status_code": integer,
    "headers": {
        "Content-Encoding": "string",
        "Content-Length": "integer string",
        "Content-Type": "application/json",
        "Date": "GMT timestamp",
        "X-Cs-Region": "string",
        "X-Ratelimit-Limit": "integer string",
        "X-Ratelimit-Remaining": "integer string"
    },
    "body": {
        "meta": {
            "query_time": float,
            "pagination": {
                "offset": integer,
                "limit": integer,
                "total": integer
            },
            "powered_by": "string",
            "trace_id": "1c142b37-fa3f-4e04-9042-12345678e8d3"
        },
        "errors": [],
        "resources": [
            Results will be returned here either as a list of strings,
            a list of integers, or a list of JSON dictionaries
        ]
    }
}
API responses are divided into three main branches.
Branch name | Description |
---|---|
status_code | This will contain the resulting HTTP status code received from the API in integer format. These results follow standard HTTP status code syntax. |
headers | The header branch contains details regarding the returned payload. This will include content details, the time of the request, the CrowdStrike cloud returning the response, and the overall remaining requests available within your current rate limit. |
body | This branch contains the primary result for the response. The body branch has three sub-branches: meta, errors, and resources. |
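To illustrate how the three branches fit together, the sketch below condenses a response dictionary into a few commonly inspected values. The helper `summarize_response` is hypothetical; the key names it reads are the ones shown in the response structure above.

```python
def summarize_response(response):
    """Collect the most commonly inspected values from a JSON response."""
    headers = response.get("headers", {})
    body = response.get("body", {})
    return {
        "status_code": response["status_code"],
        # Header values arrive as strings, so this is left as a string.
        "rate_limit_remaining": headers.get("X-Ratelimit-Remaining"),
        "trace_id": body.get("meta", {}).get("trace_id"),
        "errors": [err.get("message") for err in body.get("errors") or []],
        "resource_count": len(body.get("resources") or []),
    }


# Made-up response used only to exercise the helper.
example = {
    "status_code": 200,
    "headers": {"X-Ratelimit-Limit": "6000", "X-Ratelimit-Remaining": "5995"},
    "body": {
        "meta": {"query_time": 0.01, "trace_id": "1c142b37-fa3f-4e04-9042-12345678e8d3"},
        "errors": [],
        "resources": ["id-1", "id-2"],
    },
}
print(summarize_response(example))
```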
In this example we will interact with the Hosts API to create a list of IDs. For example purposes, we're going to drop the limit down and build our list using multiple requests to the API.
# Import the Hosts Service Class
from falconpy import Hosts
# Instantiate the Service Class.
falcon = Hosts(client_id=CLIENT_ID,
               client_secret=CLIENT_SECRET
               )
# Maximum number of rows to return. Different
# operations support different limit maximums.
max_rows = 100
# Set our total to one so our loop begins
total = 1
# Start with the first record
offset = 0
# List to hold all of the IDs returned by our example
all_ids = []
# Start our loop
while offset < total:
    # We use the same integer we use to control our loop for our offset.
    response = falcon.query_devices_by_filter(sort="hostname|asc",
                                              limit=max_rows,
                                              offset=offset
                                              )
    if response["status_code"] == 200:
        # Retrieve our body branch
        result = response["body"]
        # Retrieve the value of the offset.
        offset = result["meta"]["pagination"]["offset"]
        # This will be the same every time, overrides our initial value of 1.
        total = result["meta"]["pagination"]["total"]
        # Retrieve the list of IDs returned.
        id_list = result["resources"]
        # Append this list to our running list of all IDs.
        # In a normal program, processing would occur here.
        all_ids.extend(id_list)
    else:
        # API error has occurred
        for error_result in response["body"]["errors"]:
            print(error_result["message"])
        # Stop looping, otherwise the same failing request would repeat forever.
        break
# We're done, show our entire list.
print(all_ids)
Depending on the API operation performed, there may be more results available than what can be returned by the API. When this occurs, metadata is present within the result to allow developers to specify the next page of results. As demonstrated in the example above, results pagination is typically handled by leveraging the value of `offset` and `total` to determine your position within the recordset (as determined by the value specified by `limit` or by the API maximum) and to retrieve the next batch of results. For larger data sets, CrowdStrike APIs support one of two variations of deep pagination, tokenized and timestamp.
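The offset and total bookkeeping described above can be wrapped in a reusable generator. The following is a sketch rather than part of FalconPy: `paginate_by_offset` and `fetch_page` are hypothetical names, and `fake_fetch` is a stand-in that serves made-up IDs so the block runs without API access. Note one deliberate variation: the sketch advances the offset by the number of records received instead of reading the offset back from the response.

```python
def paginate_by_offset(fetch_page, limit=100):
    """Yield every record from an operation that pages by integer offset.

    fetch_page is a hypothetical callable that accepts `limit` and `offset`
    keywords and returns a response dictionary in the standard JSON format.
    """
    offset = 0
    total = 1  # Placeholder so the loop starts; replaced by the API's total.
    while offset < total:
        response = fetch_page(limit=limit, offset=offset)
        if response["status_code"] != 200:
            # Stop on any API error instead of retrying the same request.
            break
        body = response["body"]
        records = body["resources"]
        if not records:
            # An empty page means there is nothing left to read.
            break
        yield from records
        total = body["meta"]["pagination"]["total"]
        # Assumption: advance by the number of records received.
        offset += len(records)


def fake_fetch(limit, offset):
    """Stand-in for an API call, serving slices of 250 made-up IDs."""
    data = [f"id-{n}" for n in range(250)]
    page = data[offset:offset + limit]
    return {
        "status_code": 200,
        "body": {
            "meta": {"pagination": {"offset": offset + len(page),
                                    "limit": limit,
                                    "total": len(data)}},
            "errors": [],
            "resources": page,
        },
    }


all_ids = list(paginate_by_offset(fake_fetch, limit=100))
print(len(all_ids))
```

With FalconPy, a method such as `falcon.query_devices_by_filter` could be passed as `fetch_page`, since the example above calls it with the same `limit` and `offset` keywords.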
For scenarios where the amount of data requested exceeds limits set by the API, developers should leverage deep pagination to achieve the desired result. For most APIs, this is handled by using a string token to indicate the position within the overall recordset, and this token is named `after`. There are a couple of API operations where this token will be stored in `offset` instead. Please refer to the documentation for that specific Service Collection and Operation for more detail as to which value to use. (See QueryDevicesByFilterScroll for an example of this. A sample has also been developed discussing the two variations of offset found within the Hosts Service Collection.)
In this example we will interact with the QueryVulnerabilities operation within the Spotlight Vulnerabilities API. This operation provides an example of deep pagination that leverages tokenized offsets.
# Import the Spotlight Vulnerabilities Service Class
from falconpy import SpotlightVulnerabilities
# Instantiate the Service Class.
spotlight = SpotlightVulnerabilities(client_id=CLIENT_ID,
                                     client_secret=CLIENT_SECRET
                                     )
# Total number of records to retrieve per request to the QueryVulnerabilities operation. (1-400)
LIMIT = 400
# A simple filter for the query operation
FILTER = "last_seen_within:'45'"
# List to hold our retrieved IDs
id_list = []
# We set our total to one (1) so our initial loop step executes,
# this value is reset by the results of our API call.
total = 1
# We will store the returned pagination attribute 'after' here, and then
# pass it to the method on subsequent executions of the loop. We do
# not pass a value for the after keyword on the first iteration.
position = None
# For this example our loop runs as long as our list holds less than the total results available
while len(id_list) < total:
    # Query the Spotlight Vulnerabilities API using the FILTER constant defined above.
    # Set the limit to be the value of the LIMIT constant, and our positional after token to be
    # the value of the 'position' variable.
    returned = spotlight.query_vulnerabilities(limit=LIMIT, filter=FILTER, after=position)
    if returned["status_code"] == 200:
        # Retrieve pagination detail
        page = returned["body"]["meta"]["pagination"]
        # Total records returned for the filter used
        total = page["total"]
        # The 'position' variable holds our next positional token based
        # upon the values of limit and the current `after` keywords.
        position = page["after"]
        # Extend our list by adding in the returned IDs for this iteration.
        id_list.extend(returned["body"]["resources"])
        # Display running progress
        print(f"Total: {total}\nPosition: {position}\nRetrieved so far: {len(id_list)}\n")
    else:
        # Set total to zero (0) to end the loop
        total = 0
        # Retrieve the errors branch
        errors = returned["body"]["errors"]
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")
# Print our grand total (the length of our list, as total is zeroed on error)
print(f"Total IDs retrieved: {len(id_list)}")
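The token handling in the loop above can also be expressed as a generator that hides the `after` bookkeeping from the caller. This is a sketch under stated assumptions, not FalconPy functionality: `paginate_by_token` and `fetch_page` are hypothetical names, an empty or missing `after` value is assumed to mark the last page, and `fake_fetch` is a stand-in that invents its own tokens so the block runs offline.

```python
def paginate_by_token(fetch_page, limit=400):
    """Yield every record from an operation that pages with an `after` token.

    fetch_page is a hypothetical callable that accepts `limit` and `after`
    keywords and returns a response dictionary in the standard JSON format.
    """
    after = None  # No token is sent on the first request.
    while True:
        response = fetch_page(limit=limit, after=after)
        if response["status_code"] != 200:
            # Stop on any API error.
            break
        body = response["body"]
        records = body["resources"]
        yield from records
        after = body["meta"]["pagination"].get("after")
        # Assumption: a missing or empty token, or an empty page, ends the run.
        if not after or not records:
            break


def fake_fetch(limit, after=None):
    """Stand-in for an API call; tokens are simply stringified positions."""
    data = [f"vuln-{n}" for n in range(7)]
    start = int(after) if after else 0
    page = data[start:start + limit]
    more = start + limit < len(data)
    return {
        "status_code": 200,
        "body": {
            "meta": {"pagination": {"after": str(start + limit) if more else "",
                                    "limit": limit,
                                    "total": len(data)}},
            "errors": [],
            "resources": page,
        },
    }


retrieved = list(paginate_by_token(fake_fetch, limit=3))
print(len(retrieved))
```

To use this with the Spotlight example, `functools.partial(spotlight.query_vulnerabilities, filter=FILTER)` could serve as `fetch_page`, since that method is called above with `limit`, `filter`, and `after` keywords.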
Some CrowdStrike APIs support leveraging a timestamp to indicate your position within the result set (for example, QueryIntelIndicatorEntities). In this scenario, the marker value is specified as part of the `filter` keyword, is provided as a timestamp (seconds since the epoch), and should be named `_marker`.
This basic example demonstrates pagination interaction without addressing potential performance optimizations.
# Import the datetime module so we can calculate a _marker
import datetime
# Import the Intel Service Class
from falconpy import Intel
# Instantiate the Service Class
intel = Intel(client_id=CLIENT_ID,
              client_secret=CLIENT_SECRET
              )
# Calculate our current timestamp in seconds since the epoch,
# we will use this value for our _marker timestamp.
current_page = datetime.datetime.now().timestamp()
# List to hold the indicators retrieved
indicators_list = []
# The maximum number of records to return from the QueryIntelIndicatorEntities operation. (1-5000)
LIMIT = 5000
# Sort for our results. We will sort ascending using our _marker timestamp.
SORT = "_marker.asc"
# Set total to one (1) so our initial loop starts. This will get reset by the API result.
total = 1
# Start retrieving indicators until our total is zero (0).
while total > 0:
    # Retrieve a batch of indicators passing in our marker timestamp and limit
    returned = intel.query_indicator_entities(filter=f"_marker:>='{current_page}'",
                                              limit=LIMIT,
                                              sort=SORT
                                              )
    if returned["status_code"] == 200:
        # Retrieve the pagination detail for this result
        page = returned["body"]["meta"]["pagination"]
        # Based upon the timestamp within our _marker (first 10 characters),
        # a total number of available indicators is shown in the 'total' key.
        # This value will be reduced by our position from this timestamp as
        # indicated by the unique string appended to the timestamp, so as our
        # loop progresses, the total remaining will decrement. Due to the
        # large number of indicators created per minute, this number will also
        # grow slightly while the loop progresses as these new indicators are
        # appended to the end of the resultset we are working with.
        total = page["total"]
        # Retrieve the batch of indicators returned by this request
        batch = returned["body"]["resources"]
        if not batch:
            # Nothing was returned, so there is no new _marker to read. Stop processing.
            break
        # Extend our indicators list by adding in the new records retrieved
        indicators_list.extend(batch)
        # Set our _marker to be the last one returned in our list,
        # we will use this to grab the next page of results
        current_page = indicators_list[-1].get("_marker", "")
        # Display our running progress
        print(f"Retrieved: {len(indicators_list)}, Remaining: {total}, Marker: {current_page}")
    else:
        # Retrieve all errors returned from the API
        errors = returned["body"]["errors"]
        # Tell the loop to stop processing
        total = 0
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")
# Display the grand total of indicators retrieved
print(f"Total indicators retrieved: {len(indicators_list)}")
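The two marker-specific steps in the example, building the filter and choosing the next marker, can be isolated into small functions that are easy to test without API access. This is an illustrative sketch; `marker_filter` and `next_marker` are hypothetical helpers, and the sample marker strings in the usage lines are made up.

```python
def marker_filter(marker):
    """Build the filter string that resumes from the given _marker value."""
    return f"_marker:>='{marker}'"


def next_marker(batch, current):
    """Return the _marker of the last record in the batch.

    Falls back to the current marker when the batch is empty or the last
    record carries no _marker, so the caller never indexes an empty list.
    """
    if not batch:
        return current
    return batch[-1].get("_marker", current)


# Made-up records used only to exercise the helpers.
sample_batch = [{"_marker": "1700000000aaa"}, {"_marker": "1700000001bbb"}]
print(marker_filter(next_marker(sample_batch, "1700000000")))
```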
API responses from GraphQL will follow the same general JSON format detailed above, with differences starting to appear within the `body` branch of the response.
{
    "status_code": 200,
    "headers": {
        "Server": "nginx",
        "Date": "Sat, 04 Feb 2023 17:52:51 GMT",
        "Content-Type": "application/json; charset=utf-8",
        "Content-Length": "348",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Encoding": "gzip",
        "Etag": "REDACTED",
        "Expires": "Sat, 04 Feb 2023 17:52:50 GMT",
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=15724800; includeSubDomains, max-age=31536000; includeSubDomains",
        "X-Appliance-Date": "2023-02-04T17:52:51+00:00",
        "X-Appliance-Id": "REDACTED",
        "X-Content-Type-Options": "nosniff",
        "X-Cs-Region": "us-1",
        "X-Cs-Traceid": "REDACTED",
        "X-Dns-Prefetch-Control": "off",
        "X-Download-Options": "noopen",
        "X-Frame-Options": "SAMEORIGIN",
        "X-Powered-By": "Express",
        "X-Preempt-Version": "5.40.43656",
        "X-Ratelimit-Limit": "6000",
        "X-Ratelimit-Remaining": "5980, 5995",
        "X-Xss-Protection": "1; mode=block"
    },
    "body": {
        "data": {
            "entities": {
                "nodes": [
                    {
                        "primaryDisplayName": "example-host1",
                        "secondaryDisplayName": "example.com\\example-host1",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    },
                    {
                        "primaryDisplayName": "example-host2",
                        "secondaryDisplayName": "example.com\\example-host2",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    }
                ],
                "pageInfo": {
                    "hasNextPage": true,
                    "endCursor": "eyJfaWQiOiIwODc4OTdlZi0xYTExLTQ3MjUtYjVkZi0yM2NlMDU5Y2ZmM2MifQ=="
                }
            }
        },
        "extensions": {
            "runTime": 57,
            "remainingPoints": 499995,
            "reset": 9997,
            "consumedPoints": 2
        }
    }
}
Pagination details will be found in the `pageInfo` branch (nested within the `entities` branch of the main `data` branch).
# Import the IdentityProtection Service Class
from falconpy import IdentityProtection
# Create a constant that contains our base query
IDP_QUERY = """
query ($after: Cursor) {
  entities(types: [USER], archived: false, learned: false, first: 5, after: $after) {
    nodes {
      primaryDisplayName
      secondaryDisplayName
      accounts {
        ... on ActiveDirectoryAccountDescriptor {
          domain
        }
      }
    }
    pageInfo {
      hasNextPage
      endCursor
    }
  }
}
"""
# Create an instance of the IdentityProtection Service Class, providing our
# credentials in the process. Do NOT hard code credentials.
idp = IdentityProtection(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Boolean to handle the processing of our loop
running = True
# List containing our returned results
returned_nodes = []
# Our variables dictionary that will store our pagination token
variables = None
# Start our loop
while running:
    # Provide a status update to the user
    print("Requesting a page of results")
    # Query for a batch of results, providing the variables dictionary.
    # For the first call, this value will be None.
    result = idp.graphql(query=IDP_QUERY, variables=variables)
    # Check for successful API response
    if result["status_code"] != 200:
        # Bad response, quit the process
        raise SystemExit("API error, check query contents.")
    # If results were returned
    if result["body"]["data"].get("entities"):
        # If nodes were returned for this iteration
        if "nodes" in result["body"]["data"]["entities"]:
            # Extend our list of returned results with the contents of this batch
            returned_nodes.extend(result["body"]["data"]["entities"]["nodes"])
            # Grab the page info branch so we can retrieve our pagination detail
            page_info = result["body"]["data"]["entities"]["pageInfo"]
            # If another page of results is available
            if page_info["hasNextPage"]:
                # Add the cursor to our variables dictionary for the next iteration
                variables = {
                    "after": page_info["endCursor"]
                }
            else:
                # Break the loop
                running = False
        else:
            # No nodes were returned, break the loop
            running = False
    else:
        # No results were found
        raise SystemExit("No results returned.")
# Inform the user of our returned results
for node in returned_nodes:
    print(node["primaryDisplayName"])
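The `pageInfo` handling inside the loop above can be factored into one function that answers a single question: what variables should the next request carry? The sketch below is hypothetical (`next_variables` is not part of FalconPy) and assumes the response layout shown earlier, tolerating a missing or empty `data` branch.

```python
def next_variables(result):
    """Return the GraphQL variables for the next page, or None when finished."""
    data = result["body"].get("data") or {}
    entities = data.get("entities") or {}
    page_info = entities.get("pageInfo") or {}
    if page_info.get("hasNextPage"):
        # The cursor of the last node becomes the `after` value of the next query.
        return {"after": page_info["endCursor"]}
    return None


# Made-up response fragment used only to exercise the helper.
example = {
    "status_code": 200,
    "body": {
        "data": {
            "entities": {
                "nodes": [{"primaryDisplayName": "example-host1"}],
                "pageInfo": {"hasNextPage": True, "endCursor": "abc123"},
            }
        }
    },
}
print(next_variables(example))
```

A loop built on this helper would keep requesting pages until the function returns `None`.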