Skip to content

Response Handling

Joshua Hiller edited this page Feb 4, 2023 · 38 revisions

CrowdStrike Falcon Twitter URL

Consuming responses from the CrowdStrike API

Documentation Version Page Updated

Response types

The responses received from the CrowdStrike Falcon API will be in either binary or JSON format, with the bulk of these responses being JSON. All requests to the API resulting in an error will produce a JSON formatted response.

Passing credentials

WARNING

client_id and client_secret are keyword arguments that contain your CrowdStrike API credentials. Please note that all examples below do not hard code these values. (These values are ingested as strings.)

CrowdStrike does NOT recommend hard coding API credentials or customer identifiers within source code.

Binary responses

Binary responses are files that have been requested by the operation performed, and can have varying file formats. These return payloads are intended to be saved locally as part of your handling of the result.

Example

from falconpy import SampleUploads

falcon = SampleUploads(client_id=CLIENT_ID,
                       client_secret=CLIENT_SECRET
                       )

# File unique identifier
file_sha = "50d858e0985ecc7f60418aaf0cc5ab587f42c2570a884095a9e8ccacd0f6545c"

# Name of the file we are saving our result to. The extension should
# be aligned to match the requested file format. (This particular
# API operation example returns the result in ZIP format.)
save_file = "api_response.zip"

# We use the 'password_protected' keyword to inform the API we want
# the result bundled in a zip archive with a password applied.
response = falcon.get_sample(password_protected=True, ids=file_sha)

# We can then write the response we've received from the API to a file.
with open(save_file, 'wb') as save_to:
    save_to.write(response)

Result object expansion

Beginning in v1.1.0, FalconPy supports result object expansion. Result object expansion provides support for scenarios where a binary response is expected as the result of the operation, but HTTP status code or headers retrieval is also necessary. Result expansion is supported for all operations, but provides little advantage for operations that do not produce binary responses.

Expanded results can be requested using the expand_result keyword when calling the operation. Results are returned as a tuple, (status_code, headers, content).

Example

# Pass a boolean True to the `expand_result` keyword to request expanded results.
download_result = samples.get_sample(ids=file_sha, expand_result=True)

# We're returned a tuple (status_code, headers, content)
# Status will be in 0
print(f"Status returned: {download_result[0]}")
# Headers will be in 1
print(f"Headers returned: {download_result[1]}")
# File content will be in 2
with open(example_file, "wb") as download_file:
    download_file.write(download_result[2])

JSON responses

The bulk of the responses from the CrowdStrike Falcon API are in JSON format and have a standardized structure across all API service collections.

{
    "status_code": integer,
    "headers": {
        "Content-Encoding": "string",
        "Content-Length": "integer string",
        "Content-Type": "application/json",
        "Date": "GMT timestamp",
        "X-Cs-Region": "string",
        "X-Ratelimit-Limit": "integer string",          
        "X-Ratelimit-Remaining": "integer string"
    },
    "body": {
        "meta": {
            "query_time": float,
            "pagination": {
                "offset": integer,
                "limit": integer,
                "total": integer
            },
            "powered_by": "string",
            "trace_id": "1c142b37-fa3f-4e04-9042-12345678e8d3"
        },
        "errors": [],
        "resources": [
            Results will be returned here either as a list of strings,
            a list of integers, or a list of JSON dictionaries
        ]
    }
}

Key elements found within an API response

API responses are segregated into 3 main branches.

Branch name Description
status_code This will contain the resulting HTTP status code received from the API in integer format. These results follow standard HTTP status code syntax.
  • 200 - 299 - Successes
  • 400 - 499 - Failures
  • 500 - 599 - Server errors
headers The header branch contains details regarding the returned payload. This will include content details, the time of the request, the CrowdStrike cloud returning the response, and the overall remaining requests available within your current rate limit.
body This branch contains the primary result for the response. The body branch has three sub-branches.
  • meta - Additional metadata regarding the request. This will include overall query time, pagination details and the trace_id.
  • errors - Errors received from the API will be found in this branch and will vary based upon the type of error generated. The errors branch is always a list, with individual errors returned as objects within the list. Each error object will contain a single code and message value.
  • resources - The resources branch is always a list, and contains the successful results received from the API. These results may be a list of strings, a list of integers, or a list of dictionaries depending upon the API and the operation performed.
Example

In this example we will interact with the Hosts API to create a list of IDs. For example purposes, we're going to drop the limit down and build our list using multiple requests to the API.

# Maximum number of rows to return. Different
# operations support different limit maximums.
max_rows = 100
# Set our total to one so our loop begins
total = 1
# Start with the first record
offset = 0
# List to hold all of the IDs returned by our example
all_ids = []
# Start our loop
while offset < total:
    # We use the same integer we use to control our loop for our offset.
    response = falcon.query_devices_by_filter(sort="hostname|asc",
                                              limit=max_rows,
                                              offset=offset
                                              )
    if response["status_code"] == 200:
        # Retrieve our body branch
        result = response["body"]
        # Retrieve the value of the offset.
        offset = result["meta"]["pagination"]["offset"]
        # This will be the same every time, overrides our initial value of 1.
        total = result["meta"]["pagination"]["total"]
        # Retrieve the list of IDs returned.
        id_list = result["resources"]
        # Append this list to our running list of all IDs.
        # In a normal program, processing would occur here.
        all_ids.extend(id_list)
    else:
        # API error has occurred
        for error_result in response["body"]["errors"]:
            print(error_result["message"])
# We're done, show our entire list.
print(all_ids)

Paginating JSON responses

Depending on the API operation performed, there may be more results available than what can be returned by the API. When this occurs, metadata is present within the result to allow developers to specify the next page of results. As demonstrated in the example above, results pagination is typically handled by leveraging the value of offset and total to determine your position within the recordset (as determined by the value specified by limit or by the API maximum) and to retrieve the next batch of results. For larger data sets, CrowdStrike APIs support one of two variations of deep pagination, tokenized and timestamp.

Deep pagination leveraging tokens (Tokenized)

For scenarios where the amount of data requested exceeds limits set by the API, developers should leverage deep pagination to achieve the desired result. For most APIs, this is handled by using a string token to indicate the position within the overall recordset, and will be named after. There are a couple of API operations where this token will be stored in offset instead. Please refer to the documenation for that specific Service Collection and Operation for more detail as to which value to use. (See QueryDevicesByFilterScroll for an example of this. A sample has also been developed discussing the two variations of offset found within the Hosts Service Collection.)

Example

In this example we will interact with the QueryVulnerabilities operation within the Spotlight Vulnerabilities API. This operation provides an example of deep pagination that leverages tokenized offsets.

# Import the Spotlight Vulnerabilities Service Class
from falconpy import SpotlightVulnerabilities

# Instantiate the Service Class.
spotlight = SpotlightVulnerabilities(client_id=CLIENT_ID,
                                     client_secret=CLIENT_SECRET
                                     )
# Total number of records to retrieve per request to the QueryVulnerabilities operation. (1-400)
LIMIT = 400
# A simple filter for the query operation
FILTER = "last_seen_within:'45'"
# List to hold our retrieved IDs
id_list = []
# We set our total to one (1) so our initial loop step executes,
# this value is reset by the results of our API call.
total = 1
# We will store the returned pagination attribute 'after' here, and then
# pass it to the method on subsequent executions of the loop. We do 
# not pass a value for the after keyword on the first iteration.
position = None
# For this example our loop runs as long as our list holds less than the total results available
while len(id_list) < total:
    # Query the Spotlight Vulnerabilities API using the FILTER constant defined above.
    # Set the limit to be the value of the LIMIT constant, and our positional after token to be
    # the value of the 'position' variable.
    returned = spotlight.query_vulnerabilities(limit=LIMIT, filter=FILTER, after=position)
    if returned["status_code"] == 200:
        # Retrieve pagination detail
        page = returned["body"]["meta"]["pagination"]
        # Total records returned for the filter used
        total = page["total"]
        # The 'position' variable holds our next positional token based
        # upon the values of limit and the current `after` keywords.
        position = page["after"]
        # Extend our list by adding in the returned IDs for this iteration.
        id_list.extend(returned["body"]["resources"])
        # Display running progress
        print(f"Total: {total}\nPosition: {position}\nRetrieved so far: {len(id_list)}\n")
    else:
        # Set total to zero (0) to end the loop
        total = 0
        # Retrieve the errors branch
        errors = returned["body"]["errors"]
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")

# Print our grand total
print(f"Total IDs retrieved: {total}")
Deep pagination leveraging markers (Timestamp)

Some CrowdStrike APIs support leveraging a timestamp to indicate your position within the resultset. (Example: QueryIntelIndicatorEntities) In this scenario, the marker value is specified as part of the filter keyword and provided as a timestamp (seconds from epoch), and should be named _marker.

This basic example demonstrates pagination interaction without addressing potential performance optimizations.

# Import the datetime module so we can calculate a _marker
import datetime
# Import the Intel Service Class
from falconpy import Intel

# Instantiate the Service Class
intel = Intel(client_id=CLIENT_ID,
              client_secret=CLIENT_SECRET
              )
# Calculate our current timestamp in seconds (%s),
# we will use this value for our _marker timestamp.
current_page = datetime.datetime.now().timestamp()
# List to hold the indicators retrieved
indicators_list = []
# The maximum number of records to return from the QueryIndicatorEntities operation. (1-5000)
LIMIT = 5000
# Sort for our results. We will sort ascending using our _marker timestamp.
SORT = "_marker.asc"
# Set total to one (1) so our initial loop starts. This will get reset by the API result.
total = 1
# Start retrieving indicators until our total is zero (0).
while total > 0:
    # Retrieve a batch of indicators passing in our marker timestamp and limit
    returned = intel.query_indicator_entities(filter=f"_marker:>='{current_page}'",
                                              limit=LIMIT,
                                              sort=SORT
                                              )
    if returned["status_code"] == 200:
        # Retrieve the pagination detail for this result
        page = returned["body"]["meta"]["pagination"]
        # Based upon the timestamp within our _marker (first 10 characters),
        # a total number of available indicators is shown in the 'total' key.
        # This value will be reduced by our position from this timestamp as
        # indicated by the unique string appended to the timestamp, so as our
        # loop progresses, the total remaining will decrement. Due to the 
        # large number of indicators created per minute, this number will also
        # grow slightly while the loop progresses as these new indicators are 
        # appended to the end of the resultset we are working with.
        total = page["total"]
        # Extend our indicators list by adding in the new records retrieved
        indicators_list.extend(returned["body"]["resources"])
        # Set our _marker to be the last one returned in our list,
        # we will use this to grab the next page of results
        current_page = indicators_list[-1].get("_marker", "")
        # Display our running progress
        print(f"Retrieved: {len(indicators_list)}, Remaining: {total}, Marker: {current_page}")
    else:
        # Retrieve all errors returned from the API
        errors = returned["body"]["errors"]
        # Tell the loop to stop processing
        total = 0
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")

# Display the grand total of indicators retrieved
print(f"Total indicators retrieved: {len(indicators_list)}")
Paginating GraphQL operations

API responses from GraphQL will follow the same general JSON format detailed above, with differences starting to appear within the body branch of the response.

Example GraphQL operation response
{
    "status_code": 200,
    "headers": {
        "Server": "nginx",
        "Date": "Sat, 04 Feb 2023 17:52:51 GMT",
        "Content-Type": "application/json; charset=utf-8",
        "Content-Length": "348",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Encoding": "gzip",
        "Etag": "REDACTED",
        "Expires": "Sat, 04 Feb 2023 17:52:50 GMT",
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=15724800; includeSubDomains, max-age=31536000; includeSubDomains",
        "X-Appliance-Date": "2023-02-04T17:52:51+00:00",
        "X-Appliance-Id": "REDACTED",
        "X-Content-Type-Options": "nosniff",
        "X-Cs-Region": "us-1",
        "X-Cs-Traceid": "REDACTED",
        "X-Dns-Prefetch-Control": "off",
        "X-Download-Options": "noopen",
        "X-Frame-Options": "SAMEORIGIN",
        "X-Powered-By": "Express",
        "X-Preempt-Version": "5.40.43656",
        "X-Ratelimit-Limit": "6000",
        "X-Ratelimit-Remaining": "5980, 5995",
        "X-Xss-Protection": "1; mode=block"
    },
    "body": {
        "data": {
            "entities": {
                "nodes": [
                    {
                        "primaryDisplayName": "example-host1",
                        "secondaryDisplayName": "example.com\\example-host1",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    },
                    {
                        "primaryDisplayName": "example-host2",
                        "secondaryDisplayName": "example.com\\example-host2",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    }
                ],
                "pageInfo": {
                    "hasNextPage": true,
                    "endCursor": "PAGINATION_TOKEN_STRING"
                }
            }
        },
        "extensions": {
            "runTime": 57,
            "remainingPoints": 499995,
            "reset": 9997,
            "consumedPoints": 2
        }
    }
}
GraphQL pagination example

For this example, pagination details will be found in the pageInfo branch (nested within the entities branch of the main data branch).

# Import the IdentityProtection Service Class
from falconpy import IdentityProtection

# Create a constant that contains our base query
IDP_QUERY = """
query ($after: Cursor) {
  entities(types: [USER], archived: false, learned: false, first: 5, after: $after) {
    nodes {
      primaryDisplayName
      secondaryDisplayName
      accounts {
        ... on ActiveDirectoryAccountDescriptor {
          domain
        }
      }
    }
    pageInfo {
      hasNextPage
      endCursor
    }
  }
}
"""
# Create an instance of the IdentityProtection Service Class, providing our
# credentials in the process. Do NOT hard code credentials.
idp = IdentityProtection(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Boolean to handle the processing of our loop
running = True
# List containing our returned results
returned_nodes = []
# Our variables dictionary that will store our pagination token
variables = None
# Start our loop
while running:
    # Provide a status update to the user
    print("Requesting a page of results")
    # Query for a batch of results, providing the variables dictionary.
    # For the first call, this value will be null.
    result = idp.graphql(query=IDP_QUERY, variables=variables)
    # Check for successful API response
    if result["status_code"] != 200:
        # Bad response, quit the process
        raise SystemExit("API error, check query contents.")
    # If results were returned
    if result["body"]["data"].get("entities"):
        # No nodes were returned for this iteration
        if "nodes" in result["body"]["data"]["entities"]:
            # Extend our list of returned results with the contents of this batch
            returned_nodes.extend(result["body"]["data"]["entities"]["nodes"])
            # Grab the page info branch so we can retrieve our pagination detail
            page_info = result["body"]["data"]["entities"]["pageInfo"]
            # If the token is present
            if page_info["hasNextPage"]:
                # Add this value to our variables dictionary for the next iteration
                variables = {
                    "after": page_info["endCursor"]
                }
            else:
                # Break the loop
                running = False
        else:
            # Break the loop
            running = False
    else:
        # No results were found
        raise SystemExit("No results returned.")

# Inform the user of our returned results
for node in returned_nodes:
    print(node["primaryDisplayName"])

CrowdStrike Falcon

Clone this wiki locally