Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update mfa_bombing_tester.py #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions mfa_bombing_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,22 @@
from pathlib import Path
from time import time
from typing import List
import argparse
import os

import yaml
from fire import Fire
from okta import models
from okta.client import Client as OktaClient
import okta.api_response

class MFAChallenger:
def __init__(self):
with Path("config.yaml").open() as config_file:
config = yaml.safe_load(config_file)

def __init__(self, okta_domain: str, okta_token: str):
# Now the Okta domain and token come from command-line arguments
print(f"Running MFA bombing tests on org {okta_domain}. "
f"Searching for users with push notification factor configured.")
okta_config = {
'orgUrl': config['okta_domain'],
'token': config['okta_token'],
'orgUrl': okta_domain,
'token': okta_token,
}
print(f"Running MFA bombing tests on org {config['okta_domain']}."
f" Searching for users with push notification factor configured.")
self.okta_client = OktaClient(okta_config)

async def challenge_user(self, user: models.User):
Expand All @@ -47,20 +45,18 @@ async def challenge_user(self, user: models.User):
)
time_passed = time()-start_time
if round(time_passed) % 20 == 0:
print(f"Still waiting for response or timeout from user {user.profile.email}"
f" (waited {round(time_passed)} seconds)")
print(f"Still waiting for response or timeout from user {user.profile.email} "
f"(waited {round(time_passed)} seconds)")
polling_factor_response: models.VerifyUserFactorResponse
if polling_factor_response.factor_result != 'WAITING':
if polling_factor_response.factor_result == 'SUCCESS':
print(f"ALERT: User {user.profile.email} with id {user.id} approved the push notification")
return "User approved the challenge"
else:
# print(f"User {user.profile.email} didn't approve the push notification")
return "User rejected the challenge"

await sleep(1)


async def search_and_challenge_users(self):
users, resp, err = await self._get_users()
tasks = [asyncio.create_task(self.challenge_user(user)) for user in users]
Expand All @@ -81,8 +77,8 @@ async def _get_users(self):

return all_users, resp, err

def mfa_bombing_tester(output_file_path="report.csv"):
mfa_challenger = MFAChallenger()
def mfa_bombing_tester(okta_domain: str, okta_token: str, output_file_path="report.csv"):
mfa_challenger = MFAChallenger(okta_domain, okta_token)
users, results = asyncio.run(mfa_challenger.search_and_challenge_users())
with Path(output_file_path).open('w') as f:
f.write("user_id,user_email,result\n")
Expand All @@ -92,4 +88,14 @@ def mfa_bombing_tester(output_file_path="report.csv"):


if __name__ == '__main__':
Fire(mfa_bombing_tester)
# Set up argument parser to get values from command line
parser = argparse.ArgumentParser(description="MFA Bombing Tester")
parser.add_argument('--okta_domain', required=True, help="Okta Domain URL (e.g., https://your-org.okta.com)")
parser.add_argument('--okta_token', required=True, help="Okta API Token")
parser.add_argument('--output_file', default="report.csv", help="Path to save the output report (default: report.csv)")

# Parse the command line arguments
args = parser.parse_args()

# Call the function with the command-line arguments
mfa_bombing_tester(args.okta_domain, args.okta_token, args.output_file)