diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5401833 --- /dev/null +++ b/.gitignore @@ -0,0 +1,92 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + +# OS X Bullshit +.DS_Store diff --git a/LICENSE b/LICENSE index 261eeb9..8dada3e 100644 --- a/LICENSE +++ b/LICENSE @@ -178,7 +178,7 @@ APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" + boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ebcaa28 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# critsapi +Python library for interfacing with the CRITs API and raw MongoDB diff --git a/critsapi/__init__.py b/critsapi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/critsapi/critsapi.py b/critsapi/critsapi.py new file mode 100644 index 0000000..f537405 --- /dev/null +++ b/critsapi/critsapi.py @@ -0,0 +1,786 @@ +import datetime +import json +import logging +import os +import requests + +from critsapi.exceptions import CRITsOperationalError +from critsapi.exceptions import CRITsInvalidTypeError +from critsapi.vocabulary import IndicatorThreatTypes as itt +from critsapi.vocabulary import IndicatorAttackTypes as iat + +log = logging.getLogger() + + +class CRITsAPI(): + + def __init__(self, api_url='', api_key='', username='', verify=True, + proxies={}): + self.url = api_url + if self.url[-1] == '/': + self.url = self.url[:-1] + self.api_key = api_key + self.username = username + self.verify = verify + self.proxies = proxies + + def get_object(self, obj_id, obj_type): + type_trans = self._type_translation(obj_type) + get_url = '{}/{}/{}/'.format(self.url, type_trans, obj_id) + params = { + 'username': self.username, + 'api_key': self.api_key, + } + r = requests.get(get_url, params=params, proxies=self.proxies, + verify=self.verify) + if r.status_code == 200: + return json.loads(r.text) + else: + print('Status code returned for query {}, ' + 'was: {}'.format(get_url, r.status_code)) + return None + + def add_indicator(self, + value, + itype, + source='', + reference='', + method='', + campaign=None, + confidence=None, + bucket_list=[], + ticket='', + add_domain=True, + add_relationship=True, + indicator_confidence='unknown', + indicator_impact='unknown', + threat_type=itt.UNKNOWN, + attack_type=iat.UNKNOWN, + description=''): + """ + Add an indicator to CRITs + + Args: + value: The indicator itself + itype: The overall indicator type. See your CRITs vocabulary + source: Source of the information + reference: A reference where more information can be found + method: The method for adding this indicator + campaign: If the indicator has a campaign, add it here + confidence: The confidence this indicator belongs to the given + campaign + bucket_list: Bucket list items for this indicator + ticket: A ticket associated with this indicator + add_domain: If the indicator is a domain, it will automatically + add a domain TLO object. + add_relationship: If add_domain is True, this will create a + relationship between the indicator and domain TLOs + indicator_confidence: The confidence of the indicator + indicator_impact: The impact of the indicator + threat_type: The threat type of the indicator + attack_type: the attack type of the indicator + description: A description of this indicator + Returns: + JSON object for the indicator or None if it failed. + """ + # Time to upload these indicators + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': '', + 'campaign': campaign, + 'confidence': confidence, + 'bucket_list': ','.join(bucket_list), + 'ticket': ticket, + 'add_domain': True, + 'add_relationship': True, + 'indicator_confidence': indicator_confidence, + 'indicator_impact': indicator_impact, + 'type': itype, + 'threat_type': threat_type, + 'attack_type': attack_type, + 'value': value, + 'description': description, + } + + r = requests.post("{0}/indicators/".format(self.url), data=data, + verify=self.verify, proxies=self.proxies) + if r.status_code == 200: + log.debug("Indicator uploaded successfully - {}".format(value)) + ind = json.loads(r.text) + return ind + + return None + + def add_event(self, + source, + reference, + event_title, + event_type, + method='', + description='', + bucket_list=[], + campaign='', + confidence='', + date=None): + """ + Adds an event. If the event name already exists, it will return that + event instead. + + Args: + source: Source of the information + reference: A reference where more information can be found + event_title: The title of the event + event_type: The type of event. See your CRITs vocabulary. + method: The method for obtaining the event. + description: A text description of the event. + bucket_list: A list of bucket list items to add + campaign: An associated campaign + confidence: The campaign confidence + date: A datetime.datetime object of when the event occurred. + Returns: + A JSON event object or None if there was an error. + """ + # Check to see if the event already exists + events = self.get_events(event_title) + if events is not None: + if events['meta']['total_count'] == 1: + return events['objects'][0] + if events['meta']['total_count'] > 1: + log.error('Multiple events found while trying to add the event' + ': {}'.format(event_title)) + return None + # Now we can create the event + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': method, + 'campaign': campaign, + 'confidence': confidence, + 'description': description, + 'event_type': event_type, + 'date': date, + 'title': event_title, + 'bucket_list': ','.join(bucket_list), + } + + r = requests.post('{}/events/'.format(self.url), data=data, + verify=self.verify, proxies=self.proxies) + if r.status_code == 200: + log.debug('Event created: {}'.format(event_title)) + json_obj = json.loads(r.text) + if 'id' not in json_obj: + log.error('Error adding event. id not returned.') + return None + return json_obj + else: + log.error('Event creation failed with status code: ' + '{}'.format(r.status_code)) + return None + + def add_sample_file(self, + sample_path, + source, + reference, + method='', + file_format='raw', + file_password='', + sample_name='', + campaign='', + confidence='', + description='', + bucket_list=[]): + """ + Adds a file sample. For meta data only use add_sample_meta. + + Args: + sample_path: The path on disk of the sample to upload + source: Source of the information + reference: A reference where more information can be found + method: The method for obtaining the sample. + file_format: Must be raw, zip, or rar. + file_password: The password of a zip or rar archived sample + sample_name: Specify a filename for the sample rather than using + the name on disk + campaign: An associated campaign + confidence: The campaign confidence + description: A text description of the sample + bucket_list: A list of bucket list items to add + Returns: + A JSON sample object or None if there was an error. + """ + if os.path.isfile(sample_path): + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': method, + 'filetype': file_format, + 'upload_type': 'file', + 'campaign': campaign, + 'confidence': confidence, + 'description': description, + 'bucket_list': ','.join(bucket_list), + } + if sample_name != '': + data['filename'] = sample_name + with open(sample_path, 'rb') as fdata: + if file_password: + data['password'] = file_password + r = requests.post('{0}/samples/'.format(self.url), + data=data, + files={'filedata': fdata}, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + return result_data + else: + log.error('Error with status code {0} and message ' + '{1}'.format(r.status_code, r.text)) + return None + + def add_sample_meta(self, + source, + reference, + method='', + filename='', + md5='', + sha1='', + sha256='', + size='', + mimetype='', + campaign='', + confidence='', + description='', + bucket_list=[]): + """ + Adds a metadata sample. To add an actual file, use add_sample_file. + + Args: + source: Source of the information + reference: A reference where more information can be found + method: The method for obtaining the sample. + filename: The name of the file. + md5: An MD5 hash of the file. + sha1: SHA1 hash of the file. + sha256: SHA256 hash of the file. + size: size of the file. + mimetype: The mimetype of the file. + campaign: An associated campaign + confidence: The campaign confidence + bucket_list: A list of bucket list items to add + upload_type: Either 'file' or 'meta' + Returns: + A JSON sample object or None if there was an error. + """ + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': method, + 'filename': filename, + 'md5': md5, + 'sha1': sha1, + 'sha256': sha256, + 'size': size, + 'mimetype': mimetype, + 'upload_type': 'meta', + 'campaign': campaign, + 'confidence': confidence, + 'bucket_list': ','.join(bucket_list), + } + r = requests.post('{0}/samples/'.format(self.url), + data=data, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + return result_data + else: + log.error('Error with status code {0} and message ' + '{1}'.format(r.status_code, r.text)) + return None + + def add_email(self, + email_path, + source, + reference, + method='', + upload_type='raw', + campaign='', + confidence='', + description='', + bucket_list=[], + password=''): + """ + Add an email object to CRITs. Only RAW, MSG, and EML are supported + currently. + + Args: + email_path: The path on disk of the email. + source: Source of the information + reference: A reference where more information can be found + method: The method for obtaining the email. + upload_type: 'raw', 'eml', or 'msg' + campaign: An associated campaign + confidence: The campaign confidence + description: A description of the email + bucket_list: A list of bucket list items to add + password: A password for a 'msg' type. + Returns: + A JSON email object from CRITs or None if there was an error. + """ + if not os.path.isfile(email_path): + log.error('{} is not a file'.format(email_path)) + return None + with open(email_path, 'rb') as fdata: + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': method, + 'upload_type': upload_type, + 'campaign': campaign, + 'confidence': confidence, + 'bucket_list': bucket_list, + 'description': description, + } + if password: + data['password'] = password + r = requests.post("{0}/emails/".format(self.url), + data=data, + files={'filedata': fdata}, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + return result_data + else: + print('Error with status code {0} and message ' + '{1}'.format(r.status_code, r.text)) + return None + + def add_backdoor(self, + backdoor_name, + source, + reference, + method='', + aliases=[], + version='', + campaign='', + confidence='', + description='', + bucket_list=[]): + """ + Add a backdoor object to CRITs. + + Args: + backdoor_name: The primary name of the backdoor + source: Source of the information + reference: A reference where more information can be found + method: The method for obtaining the backdoor information. + aliases: List of aliases for the backdoor. + version: Version + campaign: An associated campaign + confidence: The campaign confidence + description: A description of the email + bucket_list: A list of bucket list items to add + """ + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': method, + 'name': backdoor_name, + 'aliases': ','.join(aliases), + 'version': version, + 'campaign': campaign, + 'confidence': confidence, + 'bucket_list': bucket_list, + 'description': description, + } + r = requests.post('{0}/backdoors/'.format(self.url), + data=data, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + return result_data + else: + log.error('Error with status code {0} and message ' + '{1}'.format(r.status_code, r.text)) + return None + + def add_profile_point(self, + value, + source='', + reference='', + method='', + ticket='', + campaign=None, + confidence=None, + bucket_list=[]): + """ + Add an indicator to CRITs + + Args: + value: The profile point itself + source: Source of the information + reference: A reference where more information can be found + method: The method for adding this indicator + campaign: If the indicator has a campaign, add it here + confidence: The confidence this indicator belongs to the given + campaign + bucket_list: Bucket list items for this indicator + ticket: A ticket associated with this indicator + Returns: + JSON object for the indicator or None if it failed. + """ + # Time to upload these indicators + data = { + 'api_key': self.api_key, + 'username': self.username, + 'source': source, + 'reference': reference, + 'method': '', + 'campaign': campaign, + 'confidence': confidence, + 'bucket_list': ','.join(bucket_list), + 'ticket': ticket, + 'value': value, + } + + r = requests.post("{0}/profile_points/".format(self.url), data=data, + verify=self.verify, proxies=self.proxies) + if r.status_code == 200: + log.debug("Profile Point uploaded successfully - {}".format(value)) + pp = json.loads(r.text) + return pp + + return None + + def get_events(self, event_title, regex=False): + """ + Search for events with the provided title + + Args: + event_title: The title of the event + Returns: + An event JSON object returned from the server with the following: + { + "meta":{ + "limit": 20, "next": null, "offset": 0, + "previous": null, "total_count": 3 + }, + "objects": [{}, {}, etc] + } + or None if an error occurred. + """ + regex_val = 0 + if regex: + regex_val = 1 + r = requests.get('{0}/events/?api_key={1}&username={2}&c-title=' + '{3}®ex={4}'.format(self.url, self.api_key, + self.username, event_title, + regex_val), verify=self.verify) + if r.status_code == 200: + json_obj = json.loads(r.text) + return json_obj + else: + log.error('Non-200 status code from get_event: ' + '{}'.format(r.status_code)) + return None + + def get_samples(self, md5='', sha1='', sha256=''): + """ + Searches for a sample in CRITs. Currently only hashes allowed. + + Args: + md5: md5sum + sha1: sha1sum + sha256: sha256sum + Returns: + JSON response or None if not found + """ + params = {'api_key': self.api_key, 'username': self.username} + if md5: + params['c-md5'] = md5 + if sha1: + params['c-sha1'] = sha1 + if sha256: + params['c-sha256'] = sha256 + r = requests.get('{0}/samples/'.format(self.url), + params=params, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + if 'meta' in result_data: + if 'total_count' in result_data['meta']: + if result_data['meta']['total_count'] > 0: + return result_data + else: + log.error('Non-200 status code: {}'.format(r.status_code)) + return None + + def get_backdoors(self, name): + """ + Searches a backdoor given the name. Returns multiple results + + Args: + name: The name of the backdoor. This can be an alias. + Returns: + Returns a JSON object contain one or more backdoor results or + None if not found. + """ + params = {} + params['or'] = 1 + params['c-name'] = name + params['c-aliases__in'] = name + r = requests.get('{0}/backdoors/'.format(self.url), + params=params, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + if 'meta' in result_data: + if 'total_count' in result_data['meta']: + if result_data['meta']['total_count'] > 0: + return result_data + else: + log.error('Non-200 status code: {}'.format(r.status_code)) + return None + + def get_backdoor(self, name, version=''): + """ + Searches for the backdoor based on name and version. + + Args: + name: The name of the backdoor. This can be an alias. + version: The version. + Returns: + Returns a JSON object contain one or more backdoor results or + None if not found. + """ + params = {} + params['or'] = 1 + params['c-name'] = name + params['c-aliases__in'] = name + r = requests.get('{0}/backdoors/'.format(self.url), + params=params, + verify=self.verify, + proxies=self.proxies) + if r.status_code == 200: + result_data = json.loads(r.text) + if 'meta' not in result_data: + return None + if 'total_count' not in result_data['meta']: + return None + if result_data['meta']['total_count'] <= 0: + return None + if 'objects' not in result_data: + return None + for backdoor in result_data['objects']: + if 'version' in backdoor: + if backdoor['version'] == version: + return backdoor + else: + log.error('Non-200 status code: {}'.format(r.status_code)) + return None + + def has_relationship(self, left_id, left_type, right_id, right_type, + rel_type='Related To'): + """ + Checks if the two objects are related + + Args: + left_id: The CRITs ID of the first indicator + left_type: The CRITs TLO type of the first indicator + right_id: The CRITs ID of the second indicator + right_type: The CRITs TLO type of the second indicator + rel_type: The relationships type ("Related To", etc) + Returns: + True or False if the relationship exists or not. + """ + data = self.get_object(left_id, left_type) + if not data: + raise CRITsOperationalError('Crits Object not found with id {}' + 'and type {}'.format(left_id, + left_type)) + if 'relationships' not in data: + return False + for relationship in data['relationships']: + if relationship['relationship'] != rel_type: + continue + if relationship['value'] != right_id: + continue + if relationship['type'] != right_type: + continue + return True + return False + + def forge_relationship(self, left_id, left_type, right_id, right_type, + rel_type='Related To', rel_date=None, + rel_confidence='high', rel_reason=''): + """ + Forges a relationship between two TLOs. + + Args: + left_id: The CRITs ID of the first indicator + left_type: The CRITs TLO type of the first indicator + right_id: The CRITs ID of the second indicator + right_type: The CRITs TLO type of the second indicator + rel_type: The relationships type ("Related To", etc) + rel_date: datetime.datetime object for the date of the + relationship. If left blank, it will be datetime.datetime.now() + rel_confidence: The relationship confidence (high, medium, low) + rel_reason: Reason for the relationship. + Returns: + True if the relationship was created. False otherwise. + """ + if not rel_date: + rel_date = datetime.datetime.now() + type_trans = self._type_translation(left_type) + submit_url = '{}/{}/{}/'.format(self.url, type_trans, left_id) + + params = { + 'api_key': self.api_key, + 'username': self.username, + } + + data = { + 'action': 'forge_relationship', + 'right_type': right_type, + 'right_id': right_id, + 'rel_type': rel_type, + 'rel_date': rel_date, + 'rel_confidence': rel_confidence, + 'rel_reason': rel_reason + } + + r = requests.patch(submit_url, params=params, data=data, + proxies=self.proxies, verify=self.verify) + if r.status_code == 200: + log.debug('Relationship built successfully: {0} <-> ' + '{1}'.format(left_id, right_id)) + return True + else: + log.error('Error with status code {0} and message {1} between ' + 'these indicators: {2} <-> ' + '{3}'.format(r.status_code, r.text, left_id, right_id)) + return False + + def status_update(self, crits_id, crits_type, status): + """ + Update the status of the TLO. By default, the options are: + - New + - In Progress + - Analyzed + - Deprecated + + Args: + crits_id: The object id of the TLO + crits_type: The type of TLO. This must be 'Indicator', '' + status: The status to change. + Returns: + True if the status was updated. False otherwise. + Raises: + CRITsInvalidTypeError + """ + obj_type = self._type_translation(crits_type) + patch_url = "{0}/{1}/{2}/".format(self.url, obj_type, crits_id) + params = { + 'api_key': self.api_key, + 'username': self.username, + } + + data = { + 'action': 'status_update', + 'value': status, + } + + r = requests.patch(patch_url, params=params, data=data, + verify=self.verify, proxies=self.proxies) + if r.status_code == 200: + log.debug('Object {} set to {}'.format(crits_id, status)) + return True + else: + log.error('Attempted to set object id {} to ' + 'Informational, but did not receive a ' + '200'.format(crits_id)) + log.error('Error message was: {}'.format(r.text)) + return False + + def source_add_update(self, crits_id, crits_type, source, + action_type='add', method='', reference='', + date=None): + """ + date must be in the format "%Y-%m-%d %H:%M:%S.%f" + """ + type_trans = self._type_translation(crits_type) + submit_url = '{}/{}/{}/'.format(self.url, type_trans, crits_id) + + if date is None: + date = datetime.datetime.now() + date = datetime.datetime.strftime(date, '%Y-%m-%d %H:%M:%S.%f') + + params = { + 'api_key': self.api_key, + 'username': self.username, + } + + data = { + 'action': 'source_add_update', + 'action_type': action_type, + 'source': source, + 'method': method, + 'reference': reference, + 'date': date + } + + r = requests.patch(submit_url, params=params, data=json.dumps(data), + proxies=self.proxies, verify=self.verify) + if r.status_code == 200: + log.debug('Source {0} added successfully to {1} ' + '{2}'.format(source, crits_type, crits_id)) + return True + else: + log.error('Error with status code {0} and message {1} for ' + 'type {2} and id {3} and source ' + '{4}'.format(r.status_code, r.text, crits_type, + crits_id, source)) + return False + + def _type_translation(self, str_type): + """ + Internal method to translate the named CRITs TLO type to a URL + specific string. + """ + if str_type == 'Indicator': + return 'indicators' + if str_type == 'Domain': + return 'domains' + if str_type == 'IP': + return 'ips' + if str_type == 'Sample': + return 'samples' + if str_type == 'Event': + return 'events' + if str_type == 'Actor': + return 'actors' + if str_type == 'Email': + return 'emails' + if str_type == 'Backdoor': + return 'backdoors' + + raise CRITsInvalidTypeError('Invalid object type specified: ' + '{}'.format(str_type)) diff --git a/critsapi/critsdbapi.py b/critsapi/critsdbapi.py new file mode 100644 index 0000000..8ab6724 --- /dev/null +++ b/critsapi/critsdbapi.py @@ -0,0 +1,225 @@ +import datetime +import logging + +from bson.objectid import ObjectId +from pymongo import MongoClient + +log = logging.getLogger() + + +class CRITsDBAPI(): + """ + Interface to the raw CRITs mongodb backend. This is typically much faster + than using the provided web API. + + Most queries require a "collection" variable. The is a string of the + mongodb collection for the TLO in CRITs. It must follow the specific mongo + collection name for the corresponding TLO. The following are acceptable: + - indicators + - sample + - events + - backdoors + - exploits + - domains + - ips + """ + + def __init__(self, + mongo_uri='', + mongo_host='localhost', + mongo_port=27017, + mongo_user='', + mongo_pass='', + db_name='crits'): + """ + Create our CRITsDBAPI object. You may specify a full mongodb uri or + the arguments individually. + + Args: + mongo_uri: A full mongo uri in the form of: + mongodb://user:pass@mongo_server:port + mongo_host: The server name/ip where the mongo db is hosted + mongo_port: The port listening for connections + mongo_user: Mongo username (if using) + mongo_pass: Password for the user (if using) + db_name: The name of the CRITs database. + """ + # If the user provided a URI, we will use that. Otherwise we will build + # a URI from the other arguments. + if mongo_uri != '': + self.mongo_uri = mongo_uri + else: + # Build the authentication portion. Simple authentication only for + # now. + auth_str = '' + if mongo_user != '': + auth_str = mongo_user + if mongo_pass != '' and mongo_user != '': + auth_str = auth_str + ':' + mongo_pass + if auth_str != '': + auth_str = auth_str + '@' + # Build the URI + self.mongo_uri = 'mongodb://{}{}:{}'.format(auth_str, mongo_host, + mongo_port) + self.db_name = db_name + self.client = None + self.db = None + + def connect(self): + """ + Starts the mongodb connection. Must be called before anything else + will work. + """ + self.client = MongoClient(self.mongo_uri) + self.db = self.client[self.db_name] + + def find(self, collection, query): + """ + Search a collection for the query provided. Just a raw interface to + mongo to do any query you want. + + Args: + collection: The db collection. See main class documentation. + query: A mongo find query. + Returns: + pymongo Cursor object with the results. + """ + obj = getattr(self.db, collection) + result = obj.find(query) + return result + + def find_all(self, collection): + """ + Search a collection for all available items. + + Args: + collection: The db collection. See main class documentation. + Returns: + List of all items in the collection. + """ + obj = getattr(self.db, collection) + result = obj.find() + return result + + def find_one(self, collection, query): + """ + Search a collection for the query provided and return one result. Just + a raw interface to mongo to do any query you want. + + Args: + collection: The db collection. See main class documentation. + query: A mongo find query. + Returns: + pymongo Cursor object with the results. + """ + obj = getattr(self.db, collection) + result = obj.find_one(query) + return result + + def find_distinct(self, collection, key): + """ + Search a collection for the distinct key values provided. + + Args: + collection: The db collection. See main class documentation. + key: The name of the key to find distinct values. For example with + the indicators collection, the key could be "type". + Returns: + List of distinct values. + """ + obj = getattr(self.db, collection) + result = obj.distinct(key) + return result + + def add_embedded_campaign(self, id, collection, campaign, confidence, + analyst, date, description): + """ + Adds an embedded campaign to the TLO. + + Args: + id: the CRITs object id of the TLO + collection: The db collection. See main class documentation. + campaign: The campaign to assign. + confidence: The campaign confidence + analyst: The analyst making the assignment + date: The date of the assignment + description: A description + Returns: + The resulting mongo object + """ + if type(id) is not ObjectId: + id = ObjectId(id) + # TODO: Make sure the object does not already have the campaign + # Return if it does. Add it if it doesn't + obj = getattr(self.db, collection) + result = obj.find({'_id': id, 'campaign.name': campaign}) + if result.count() > 0: + return + else: + log.debug('Adding campaign to set: {}'.format(campaign)) + campaign_obj = { + 'analyst': analyst, + 'confidence': confidence, + 'date': date, + 'description': description, + 'name': campaign + } + result = obj.update( + {'_id': id}, + {'$push': {'campaign': campaign_obj}} + ) + return result + + def remove_bucket_list_item(self, id, collection, item): + """ + Removes an item from the bucket list + + Args: + id: the CRITs object id of the TLO + collection: The db collection. See main class documentation. + item: the bucket list item to remove + Returns: + The mongodb result + """ + if type(id) is not ObjectId: + id = ObjectId(id) + obj = getattr(self.db, collection) + result = obj.update( + {'_id': id}, + {'$pull': {'bucket_list': item}} + ) + return result + + def add_bucket_list_item(self, id, collection, item): + """ + Adds an item to the bucket list + + Args: + id: the CRITs object id of the TLO + collection: The db collection. See main class documentation. + item: the bucket list item to add + Returns: + The mongodb result + """ + if type(id) is not ObjectId: + id = ObjectId(id) + obj = getattr(self.db, collection) + result = obj.update( + {'_id': id}, + {'$addToSet': {'bucket_list': item}} + ) + return result + + def get_campaign_name_list(self): + """ + Returns a list of all valid campaign names + + Returns: + List of strings containing all valid campaign names + """ + campaigns = self.find('campaigns', {}) + campaign_names = [] + for campaign in campaigns: + if 'name' in campaign: + campaign_names.append(campaign['name']) + return campaign_names diff --git a/critsapi/exceptions.py b/critsapi/exceptions.py new file mode 100644 index 0000000..8be472e --- /dev/null +++ b/critsapi/exceptions.py @@ -0,0 +1,8 @@ +class CRITsOperationalError(Exception): + """ Critical error oh shiiiii""" + pass + + +class CRITsInvalidTypeError(Exception): + """Raised when an invalid TLO type is specified""" + pass diff --git a/critsapi/vocabulary.py b/critsapi/vocabulary.py new file mode 100644 index 0000000..5009999 --- /dev/null +++ b/critsapi/vocabulary.py @@ -0,0 +1,173 @@ +class IndicatorTypes(): + """ + Vocabulary for Indicator Types. + """ + ADJUST_TOKEN = "Adjust Token" + API_KEY = "API Key" + AS_NUMBER = "AS Number" + AS_NAME = "AS Name" + BANK_ACCOUNT = "Bank account" + BITCOIN_ACCOUNT = "Bitcoin account" + CERTIFICATE_FINGERPRINT = "Certificate Fingerprint" + CERTIFICATE_NAME = "Certificate Name" + CHECKSUM_CRC16 = "Checksum CRC16" + CMD_LINE = "Command Line" + COMPANY_NAME = "Company name" + COOKIE_NAME = "Cookie Name" + COUNTRY = "Country" + CRX = "CRX" + DEBUG_PATH = "Debug Path" + DEBUG_STRING = "Debug String" + DEST_PORT = "Destination Port" + DEVICE_IO = "Device IO" + DOC_FROM_URL = "Document from URL" + DOMAIN = "Domain" + EMAIL_BOUNDARY = "Email Boundary" + EMAIL_ADDRESS = "Email Address" + EMAIL_FROM = "Email Address From" + EMAIL_HEADER_FIELD = "Email Header Field" + EMAIL_HELO = "Email HELO" + EMAIL_MESSAGE_ID = "Email Message ID" + EMAIL_ORIGINATING_IP = "Email Originating IP" + EMAIL_REPLY_TO = "Email Reply-To" + EMAIL_SENDER = "Email Address Sender" + EMAIL_SUBJECT = "Email Subject" + EMAIL_X_MAILER = "Email X-Mailer" + EMAIL_X_ORIGINATING_IP = "Email X-Originating IP" + FILE_CREATED = "File Created" + FILE_DELETED = "File Deleted" + FILE_MOVED = "File Moved" + FILE_NAME = "File Name" + FILE_OPENED = "File Opened" + FILE_PATH = "File Path" + FILE_READ = "File Read" + FILE_WRITTEN = "File Written" + GET_PARAM = "GET Parameter" + HEX_STRING = "HEX String" + HTML_ID = "HTML ID" + HTTP_REQUEST = "HTTP Request" + HTTP_RESP_CODE = "HTTP Response Code" + IMPHASH = "IMPHASH" + IPV4_ADDRESS = "IPv4 Address" + IPV4_SUBNET = "IPv4 Subnet" + IPV6_ADDRESS = "IPv6 Address" + IPV6_SUBNET = "IPv6 Subnet" + LATITUDE = "Latitude" + LAUNCH_AGENT = "Launch Agent" + LOCATION = "Location" + LONGITUDE = "Longitude" + MAC_ADDRESS = "MAC Address" + MALWARE_NAME = "Malware Name" + MD5 = "MD5" + MEMORY_ALLOC = "Memory Alloc" + MEMORY_PROTECT = "Memory Protect" + MEMORY_READ = "Memory Read" + MEMORY_WRITTEN = "Memory Written" + MUTANT_CREATED = "Mutant Created" + MUTEX = "Mutex" + NAME_SERVER = "Name Server" + OTHER_FILE_OP = "Other File Operation" + PASSWORD = "Password" + PASSWORD_SALT = "Password Salt" + PAYLOAD_DATA = "Payload Data" + PAYLOAD_TYPE = "Payload Type" + PIPE = "Pipe" + POST_DATA = "POST Data" + PROCESS_NAME = "Process Name" + PROTOCOL = "Protocol" + REFERER = "Referer" + REFERER_OF_REFERER = "Referer of Referer" + REGISTRAR = "Registrar" + REGISTRY_KEY = "Registry Key" + REG_KEY_CREATED = "Registry Key Created" + REG_KEY_DELETED = "Registry Key Deleted" + REG_KEY_ENUMERATED = "Registry Key Enumerated" + REG_KEY_MONITORED = "Registry Key Monitored" + REG_KEY_OPENED = "Registry Key Opened" + REG_KEY_VALUE_CREATED = "Registry Key Value Created" + REG_KEY_VALUE_DELETED = "Registry Key Value Deleted" + REG_KEY_VALUE_MODIFIED = "Registry Key Value Modified" + REG_KEY_VALUE_QUERIED = "Registry Key Value Queried" + SERVICE_NAME = "Service Name" + SHA1 = "SHA1" + SHA256 = "SHA256" + SMS_ORIGIN = "SMS Origin" + SOURCE_PORT = "Source Port" + SSDEEP = "SSDEEP" + TELEPHONE = "Telephone" + TIME_CREATED = "Time Created" + TIME_UPDATED = "Time Updated" + TRACKING_ID = "Tracking ID" + TS_END = "TS End" + TS_START = "TS Start" + URI = "URI" + USER_AGENT = "User Agent" + USER_ID = "User ID" + VICTIM_IP = "Victim IP" + VOLUME_QUERIED = "Volume Queried" + WEBSTORAGE_KEY = "Webstorage Key" + WEB_PAYLOAD = "Web Payload" + WHOIS_NAME = "WHOIS Name" + WHOIS_ADDR1 = "WHOIS Address 1" + WHOIS_ADDR2 = "WHOIS Address 2" + WHOIS_REGISTRANT_EMAIL_ADDRESS = "WHOIS Registrant Email Address" + WHOIS_TELEPHONE = "WHOIS Telephone" + XPI = "XPI" + + +class IndicatorThreatTypes(): + """ + Vocabulary for Indicator Threat Types. + """ + BAD_ACTOR = "Bad Actor" + COMPROMISED_CREDENTIAL = "Compromised Credential" + COMMAND_EXEC = "Command Exec" + MALICIOUS_AD = "Malicious Ad" + MALICIOUS_CONTENT = "Malicious Content" + MALICIOUS_DOMAIN = "Malicious Domain" + MALICIOUS_INJECT = "Malicious Inject" + MALICIOUS_IP = "Malicious IP" + MALICIOUS_URL = "Malicious URL" + MALICIOUS_URLCHUNK = "Malicious URL Chunk" + MALWARE_ARTIFACTS = "Malware Artifacts" + MALWARE_SAMPLE = "Malware Sample" + MALWARE_VICTIM = "Malware Victim" + PROXY_IP = "Proxy IP" + SINKHOLE_EVENT = "Sinkhole Event" + SMS_SPAM = "SMS Spam" + UNKNOWN = "Unknown" + VICTIM_IP_USAGE = "Victim IP Usage" + WEB_REQUEST = "Web Request" + WHITELIST_DOMAIN = "Whitelist Domain" + WHITELIST_IP = "Whitelist IP" + WHITELIST_URL = "Whitelist URL" + + +class IndicatorAttackTypes(): + """ + Vocabulary for Indicator Attack Types. + """ + ACCESS_TOKEN_THEFT = "Access Token Theft" + BRUTE_FORCE = "Brute Force" + CLICKJACKING = "Clickjacking" + EMAIL_SPAM = "Email Spam" + FAKE_ACCOUNTS = "Fake Accounts" + IP_INFRINGEMENT = "IP Infringement" + MALICIOUS_APP = "Malicious App" + MALWARE = "Malware" + PHISHING = "Phishing" + SELF_XSS = "Self XSS" + SHARE_BAITING = "Share Baiting" + TARGETED = "Targeted" + UNKNOWN = "Unknown" + + +class IndicatorCI(): + """ + Vocabulary for Indicator CI. + """ + UNKNOWN = "unknown" + BENIGN = "benign" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eae0f08 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +pymongo diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..7e53181 --- /dev/null +++ b/setup.py @@ -0,0 +1,114 @@ +"""A setuptools based setup module. + +See: +https://packaging.python.org/en/latest/distributing.html +https://github.com/pypa/sampleproject +""" + +# Always prefer setuptools over distutils +from setuptools import setup +# To use a consistent encoding +from codecs import open +from os import path + +here = path.abspath(path.dirname(__file__)) + +# Get the long description from the README file +with open(path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = f.read() + +setup( + name='critsapi', + + # Versions should comply with PEP440. For a discussion on single-sourcing + # the version across setup.py and the project code, see + # https://packaging.python.org/en/latest/single_source_version.html + version='0.1.0', + + description='Library to interface with the CRITs API and raw MongoDB', + long_description=long_description, + + # The project's main homepage. + url='https://github.com/IntegralDefense/critsapi', + + # Author details + author='Matthew Wilson', + author_email='automationator@runbox.com', + + # Choose your license + license='Apache 2.0', + + # See https://pypi.python.org/pypi?%3Aaction=list_classifiers + classifiers=[ + # How mature is this project? Common values are + # 3 - Alpha + # 4 - Beta + # 5 - Production/Stable + 'Development Status :: 3 - Alpha', + + # Indicate who your project is intended for + 'Intended Audience :: Developers', + 'Topic :: Software Development :: Build Tools', + + # Pick your license as you wish (should match "license" above) + 'License :: OSI Approved :: Apache Software License', + + # Specify the Python versions you support here. In particular, ensure + # that you indicate whether you support Python 2, Python 3 or both. + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + ], + + # What does your project relate to? + keywords='critsapi critsdbapi crits', + + # You can just specify the packages manually here if your project is + # simple. Or you can use find_packages(). + packages=['critsapi'], + + # Alternatively, if you want to distribute just a my_module.py, uncomment + # this: + # py_modules=["my_module"], + + # List run-time dependencies here. These will be installed by pip when + # your project is installed. For an analysis of "install_requires" vs pip's + # requirements files see: + # https://packaging.python.org/en/latest/requirements.html + install_requires=['requests', 'pymongo'], + + # List additional groups of dependencies here (e.g. development + # dependencies). You can install these using the following syntax, + # for example: + # $ pip install -e .[dev,test] + extras_require={ + #'dev': ['check-manifest'], + #'test': ['coverage'], + }, + + # If there are data files included in your packages that need to be + # installed, specify them here. If using Python 2.6 or less, then these + # have to be included in MANIFEST.in as well. + package_data={ + #'sample': ['package_data.dat'], + }, + + # Although 'package_data' is the preferred approach, in some case you may + # need to place data files outside of your packages. See: + # http://docs.python.org/3.4/distutils/setupscript.html#installing-additional-files # noqa + # In this case, 'data_file' will be installed into '/my_data' + #data_files=[('my_data', ['data/data_file'])], + + # To provide executable scripts, use entry points in preference to the + # "scripts" keyword. Entry points provide cross-platform support and allow + # pip to create the appropriate form of executable for the target platform. + #entry_points={ + # 'console_scripts': [ + # 'sample=sample:main', + # ], + #}, +)