Fix/888 #92
Workflow file for this run
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: Security Scan | |
on: | |
pull_request: | |
types: [opened, synchronize, reopened] | |
permissions: | |
contents: write # Changed from read hi | |
pull-requests: write | |
issues: write | |
checks: write | |
actions: write # Added | |
security-events: write # Added | |
discussions: write # Added sssadsada | |
statuses: write # Added | |
jobs: | |
security-scan: | |
runs-on: ubuntu-latest | |
steps: | |
- uses: actions/checkout@v4 | |
- name: Set up Python | |
uses: actions/setup-python@v5 | |
with: | |
python-version: '3.10' | |
- name: Cache pip packages | |
uses: actions/cache@v4 | |
with: | |
path: ~/.cache/pip | |
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} | |
restore-keys: | | |
${{ runner.os }}-pip- | |
- name: Install dependencies | |
run: | | |
python -m pip install --upgrade pip | |
pip install bandit | |
- name: Create amir.py | |
run: | | |
cat << 'EOL' > amir.py | |
import subprocess | |
import json | |
import os | |
import re | |
import ast | |
import logging | |
from typing import List, Dict, Any | |
import bandit | |
from bandit.core import manager as bandit_manager | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
class AdvancedVulnerabilityScanner: | |
def __init__(self, file_path: str): | |
self.file_path = file_path | |
self.vulnerabilities: List[Dict[str, Any]] = [] | |
self.code_lines: List[str] = [] | |
self.ast_tree: ast.AST = None | |
self.vulnerability_db = self.load_vulnerability_db() | |
def load_vulnerability_db(self): | |
return { | |
"requests": {"2.25.0": ["CVE-2021-12345"]}, | |
"django": {"2.2.0": ["CVE-2021-67890"]} | |
} | |
def parse_file(self): | |
logging.info(f"Parsing file: {self.file_path}") | |
try: | |
with open(self.file_path, "r", encoding="utf-8") as file: | |
self.code_lines = file.readlines() | |
self.ast_tree = ast.parse("".join(self.code_lines)) | |
logging.info(f"File parsed. Total lines: {len(self.code_lines)}") | |
except Exception as e: | |
logging.error(f"Error parsing file {self.file_path}: {str(e)}") | |
raise | |
def run_bandit(self): | |
try: | |
b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type="file") | |
b_mgr.discover_files([self.file_path]) | |
b_mgr.run_tests() | |
return b_mgr.get_issue_list() | |
except Exception as e: | |
logging.error(f"Error running Bandit: {str(e)}") | |
return [] | |
def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str): | |
self.vulnerabilities.append({ | |
"category": category, | |
"description": description, | |
"line_number": line_number, | |
"severity": severity, | |
"confidence": confidence, | |
"code_context": self.code_lines[line_number-1].strip() if line_number > 0 else None | |
}) | |
if severity == "HIGH" and confidence in ["HIGH", "MEDIUM"]: | |
logging.warning(f"Critical vulnerability added: {category} at line {line_number}") | |
else: | |
logging.info(f"Vulnerability added: {category} at line {line_number}") | |
def check_high_risk_sql_injection(self): | |
sql_patterns = [ | |
r"(?i)(?:execute|cursor\.execute)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)", | |
r"(?i)(?:execute|cursor\.execute)\s*\(.*?\+.*?\)", | |
r"(?i)(?:execute|cursor\.execute)\s*\(.*?%.*?\%.*?\)" | |
] | |
for i, line in enumerate(self.code_lines): | |
for pattern in sql_patterns: | |
if re.search(pattern, line): | |
self.add_vulnerability( | |
"SQL Injection", | |
f"Critical: SQL injection vulnerability detected", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def check_hardcoded_secrets(self): | |
secret_patterns = [ | |
r"(?i)(password|secret|api_key|token)\s*=\s*[\"''][^\"'']+[\"''](?!\s*{\s*key\s*[=:]\s*|os\.environ)", | |
r"(?i)auth_token\s*=\s*[\"''][0-9a-zA-Z]+[\"'']", | |
r"(?i)api_key\s*=\s*[\"''][0-9a-zA-Z]+[\"'']" | |
] | |
safe_patterns = [ | |
r"key=\"[\w_-]+\"", | |
r"os\.environ\.get", | |
r"load_dotenv", | |
r"SECRET_KEY\s*=\s*config\.", | |
r"test|example|dummy|sample" | |
] | |
for i, line in enumerate(self.code_lines): | |
if any(re.search(safe_pat, line, re.IGNORECASE) for safe_pat in safe_patterns): | |
continue | |
for pattern in secret_patterns: | |
if match := re.search(pattern, line): | |
self.add_vulnerability( | |
"Hardcoded Secret", | |
f"Critical: Hardcoded secret detected", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def check_command_injection(self): | |
dangerous_patterns = [ | |
r"subprocess\.(?:call|run|Popen)\s*\(.*?\+.*?\)", | |
r"os\.system\s*\(.*?\+.*?\)", | |
r"subprocess\.(?:call|run|Popen)\s*\(.*?format.*?\)", | |
r"subprocess\.(?:call|run|Popen)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)" | |
] | |
safe_patterns = [ | |
r"subprocess\.run\(\[[^\]]+\],\s*check=True\)", | |
r"subprocess\.run\(\[[^\]]+\],\s*shell=False\)" | |
] | |
for i, line in enumerate(self.code_lines): | |
if any(re.search(safe_pat, line) for safe_pat in safe_patterns): | |
continue | |
for pattern in dangerous_patterns: | |
if re.search(pattern, line): | |
self.add_vulnerability( | |
"Command Injection", | |
f"Critical: Command injection vulnerability detected", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def check_dangerous_deserialization(self): | |
dangerous_patterns = [ | |
(r"pickle\.loads?\(.*?\)", "Unsafe pickle deserialization"), | |
(r"yaml\.load\((?![^)]*Loader=yaml\.SafeLoader)", "Unsafe YAML loading"), | |
(r"eval\(.*?\)", "Dangerous eval() usage") | |
] | |
for i, line in enumerate(self.code_lines): | |
for pattern, message in dangerous_patterns: | |
if re.search(pattern, line): | |
self.add_vulnerability( | |
"Dangerous Deserialization", | |
f"Critical: {message}", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def check_path_traversal(self): | |
dangerous_patterns = [ | |
(r"open\s*\([^)]*[\"''][.][.][\/\\]", "Path traversal vulnerability"), | |
(r"open\s*\([^)]*\+", "Dynamic file path manipulation"), | |
(r"os\.path\.join\s*\([^)]*\+", "Dynamic path joining") | |
] | |
for i, line in enumerate(self.code_lines): | |
for pattern, message in dangerous_patterns: | |
if re.search(pattern, line): | |
self.add_vulnerability( | |
"Path Traversal", | |
f"Critical: {message}", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def check_high_risk_ssrf(self): | |
ssrf_patterns = [ | |
r"requests\.(get|post|put|delete)\s*\([^)]*\+", | |
r"urllib\.request\.urlopen\s*\([^)]*\+", | |
r"http\.client\.HTTPConnection\s*\([^)]*\+" | |
] | |
for i, line in enumerate(self.code_lines): | |
for pattern in ssrf_patterns: | |
if re.search(pattern, line): | |
self.add_vulnerability( | |
"SSRF", | |
f"Critical: Server-Side Request Forgery vulnerability", | |
i+1, | |
"HIGH", | |
"HIGH" | |
) | |
def analyze(self): | |
try: | |
self.parse_file() | |
self.check_high_risk_sql_injection() | |
self.check_hardcoded_secrets() | |
self.check_dangerous_deserialization() | |
self.check_command_injection() | |
self.check_path_traversal() | |
self.check_high_risk_ssrf() | |
bandit_issues = self.run_bandit() | |
for issue in bandit_issues: | |
if issue.severity.lower() == "high": | |
self.add_vulnerability( | |
f"Critical Security Issue ({issue.test_id})", | |
issue.text, | |
issue.lineno, | |
"HIGH", | |
issue.confidence | |
) | |
logging.info("Security analysis completed successfully") | |
except Exception as e: | |
logging.error(f"An error occurred during analysis: {str(e)}") | |
raise | |
def generate_report(self): | |
report = f"\n🔒 Security Scan Results for {self.file_path} 🔒\n" | |
report += "=" * 50 + "\n" | |
report += f"Lines of Code Analyzed: {len(self.code_lines)}\n\n" | |
high_risk_vulns = [v for v in self.vulnerabilities | |
if v["severity"] == "HIGH" and v["confidence"] in ["HIGH", "MEDIUM"]] | |
if high_risk_vulns: | |
report += f"🚨 Found {len(high_risk_vulns)} Critical Security Issues!\n\n" | |
for vuln in high_risk_vulns: | |
report += f"CRITICAL: {vuln['category']}\n" | |
report += f"Description: {vuln['description']}\n" | |
report += f"Location: Line {vuln['line_number']}\n" | |
if vuln.get("code_context"): | |
report += f"Code: {vuln['code_context']}\n" | |
report += f"Confidence: {vuln['confidence']}\n" | |
report += "-" * 40 + "\n" | |
else: | |
report += "✅ No critical security issues detected.\n" | |
return report | |
def scan_file_or_directory(path): | |
if os.path.isfile(path): | |
scanner = AdvancedVulnerabilityScanner(path) | |
scanner.analyze() | |
return scanner.generate_report() | |
elif os.path.isdir(path): | |
full_report = "" | |
for root, dirs, files in os.walk(path): | |
for file in files: | |
if file.endswith(".py"): | |
file_path = os.path.join(root, file) | |
scanner = AdvancedVulnerabilityScanner(file_path) | |
scanner.analyze() | |
full_report += scanner.generate_report() + "\n\n" | |
return full_report | |
else: | |
return f"Error: {path} is not a valid file or directory." | |
def main(): | |
path = "." | |
report = scan_file_or_directory(path) | |
with open("security-scan-results.txt", "w") as f: | |
f.write(report) | |
if __name__ == "__main__": | |
main() | |
EOL | |
- name: Run security scan | |
run: python amir.py | |
continue-on-error: true | |
- name: Check for scan results | |
id: check_results | |
run: | | |
if [ -f security-scan-results.txt ]; then | |
echo "file_exists=true" >> $GITHUB_OUTPUT | |
if grep -q "Critical Security Issues!" security-scan-results.txt; then | |
echo "vulnerabilities_found=true" >> $GITHUB_OUTPUT | |
echo "::warning::Critical security vulnerabilities detected" | |
else | |
echo "vulnerabilities_found=false" >> $GITHUB_OUTPUT | |
fi | |
else | |
echo "file_exists=false" >> $GITHUB_OUTPUT | |
echo "::error::Security scan failed to generate results" | |
fi | |
- name: Upload scan results | |
uses: actions/upload-artifact@v4 | |
with: | |
name: security-scan-results | |
path: security-scan-results.txt | |
retention-days: 90 | |
- name: Comment PR | |
uses: actions/github-script@v7 | |
if: always() | |
with: | |
github-token: ${{ secrets.PAT_TOKEN }} | |
script: | | |
const fs = require('fs') | |
let comment = '## Security Scan Results\n\n' | |
if ('${{ steps.check_results.outputs.file_exists }}' === 'true') { | |
const scanResults = fs.readFileSync('security-scan-results.txt', 'utf8') | |
comment += '```\n' + scanResults + '\n```\n\n' | |
if ('${{ steps.check_results.outputs.vulnerabilities_found }}' === 'true') { | |
comment += '⛔ **Critical vulnerabilities detected. Please review and address these security issues before merging.**\n\n' | |
comment += '### Next Steps:\n' | |
comment += '1. Review each critical finding above and fix them according to OWASP top 10 mitigations.\n' | |
} else { | |
comment += '✅ **No critical security issues detected.**\n\n' | |
comment += 'The code has passed all critical security checks.' | |
} | |
} else { | |
comment += '⚠️ **Error: The security scan failed to complete. Please review the workflow logs for more information.**' | |
} | |
await github.rest.issues.createComment({ | |
issue_number: context.issue.number, | |
owner: context.repo.owner, | |
repo: context.repo.repo, | |
body: comment | |
}) | |
- name: Fail if critical vulnerabilities found | |
if: steps.check_results.outputs.vulnerabilities_found == 'true' | |
run: | | |
echo "::error::Critical security vulnerabilities were detected. Please review the findings and address them before merging." | |
exit 1 |