diff --git a/.github/workflows/security-scan.yml b/.github/workflows/security-scan.yml index ca8a36e..34c81df 100644 --- a/.github/workflows/security-scan.yml +++ b/.github/workflows/security-scan.yml @@ -3,376 +3,350 @@ name: Security Scan on: pull_request: types: [opened, synchronize, reopened] - branches: [main] permissions: - contents: write # Changed from read hi + contents: write pull-requests: write issues: write checks: write - actions: write # Added - security-events: write # Added - discussions: write # Added ss - statuses: write # Added + actions: write + security-events: write + discussions: write + statuses: write jobs: security-scan: runs-on: ubuntu-latest - if: | - github.event_name == 'pull_request' && - (github.event.pull_request.head.repo.full_name == github.repository || - github.event.pull_request.head.repo.fork) - + if: github.event_name == 'pull_request' + steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: '3.10' - - - name: Cache pip packages - uses: actions/cache@v4 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} - restore-keys: | - ${{ runner.os }}-pip- - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install bandit - - - name: Create amir.py - run: | - cat << 'EOL' > amir.py - import subprocess - import json - import os - import re - import ast - import logging - from typing import List, Dict, Any - import bandit - from bandit.core import manager as bandit_manager - - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") - - class AdvancedVulnerabilityScanner: - def __init__(self, file_path: str): - self.file_path = file_path - self.vulnerabilities: List[Dict[str, Any]] = [] - self.code_lines: List[str] = [] - self.ast_tree: ast.AST = None - self.vulnerability_db = self.load_vulnerability_db() - - def load_vulnerability_db(self): - return { - "requests": {"2.25.0": ["CVE-2021-12345"]}, - "django": {"2.2.0": ["CVE-2021-67890"]} - } - - def parse_file(self): - logging.info(f"Parsing file: {self.file_path}") - try: - with open(self.file_path, "r", encoding="utf-8") as file: - self.code_lines = file.readlines() - self.ast_tree = ast.parse("".join(self.code_lines)) - logging.info(f"File parsed. Total lines: {len(self.code_lines)}") - except Exception as e: - logging.error(f"Error parsing file {self.file_path}: {str(e)}") - raise - - def run_bandit(self): - try: - b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type="file") - b_mgr.discover_files([self.file_path]) - b_mgr.run_tests() - return b_mgr.get_issue_list() - except Exception as e: - logging.error(f"Error running Bandit: {str(e)}") - return [] - - def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str): - self.vulnerabilities.append({ - "category": category, - "description": description, - "line_number": line_number, - "severity": severity, - "confidence": confidence, - "code_context": self.code_lines[line_number-1].strip() if line_number > 0 else None - }) - if severity == "HIGH" and confidence in ["HIGH", "MEDIUM"]: - logging.warning(f"Critical vulnerability added: {category} at line {line_number}") - else: - logging.info(f"Vulnerability added: {category} at line {line_number}") - - def check_high_risk_sql_injection(self): - sql_patterns = [ - r"(?i)(?:execute|cursor\.execute)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)", - r"(?i)(?:execute|cursor\.execute)\s*\(.*?\+.*?\)", - r"(?i)(?:execute|cursor\.execute)\s*\(.*?%.*?\%.*?\)" - ] - for i, line in enumerate(self.code_lines): - for pattern in sql_patterns: - if re.search(pattern, line): - self.add_vulnerability( - "SQL Injection", - f"Critical: SQL injection vulnerability detected", - i+1, - "HIGH", - "HIGH" - ) - - def check_hardcoded_secrets(self): - secret_patterns = [ - r"(?i)(password|secret|api_key|token)\s*=\s*[\"''][^\"'']+[\"''](?!\s*{\s*key\s*[=:]\s*|os\.environ)", - r"(?i)auth_token\s*=\s*[\"''][0-9a-zA-Z]+[\"'']", - r"(?i)api_key\s*=\s*[\"''][0-9a-zA-Z]+[\"'']" - ] - - safe_patterns = [ - r"key=\"[\w_-]+\"", - r"os\.environ\.get", - r"load_dotenv", - r"SECRET_KEY\s*=\s*config\.", - r"test|example|dummy|sample" - ] - - for i, line in enumerate(self.code_lines): - if any(re.search(safe_pat, line, re.IGNORECASE) for safe_pat in safe_patterns): - continue - - for pattern in secret_patterns: - if match := re.search(pattern, line): - self.add_vulnerability( - "Hardcoded Secret", - f"Critical: Hardcoded secret detected", - i+1, - "HIGH", - "HIGH" - ) - - def check_command_injection(self): - dangerous_patterns = [ - r"subprocess\.(?:call|run|Popen)\s*\(.*?\+.*?\)", - r"os\.system\s*\(.*?\+.*?\)", - r"subprocess\.(?:call|run|Popen)\s*\(.*?format.*?\)", - r"subprocess\.(?:call|run|Popen)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)" - ] - - safe_patterns = [ - r"subprocess\.run\(\[[^\]]+\],\s*check=True\)", - r"subprocess\.run\(\[[^\]]+\],\s*shell=False\)" - ] - - for i, line in enumerate(self.code_lines): - if any(re.search(safe_pat, line) for safe_pat in safe_patterns): - continue - - for pattern in dangerous_patterns: - if re.search(pattern, line): - self.add_vulnerability( - "Command Injection", - f"Critical: Command injection vulnerability detected", - i+1, - "HIGH", - "HIGH" - ) - - def check_dangerous_deserialization(self): - dangerous_patterns = [ - (r"pickle\.loads?\(.*?\)", "Unsafe pickle deserialization"), - (r"yaml\.load\((?![^)]*Loader=yaml\.SafeLoader)", "Unsafe YAML loading"), - (r"eval\(.*?\)", "Dangerous eval() usage") - ] - - for i, line in enumerate(self.code_lines): - for pattern, message in dangerous_patterns: - if re.search(pattern, line): - self.add_vulnerability( - "Dangerous Deserialization", - f"Critical: {message}", - i+1, - "HIGH", - "HIGH" - ) - - def check_path_traversal(self): - dangerous_patterns = [ - (r"open\s*\([^)]*[\"''][.][.][\/\\]", "Path traversal vulnerability"), - (r"open\s*\([^)]*\+", "Dynamic file path manipulation"), - (r"os\.path\.join\s*\([^)]*\+", "Dynamic path joining") - ] - - for i, line in enumerate(self.code_lines): - for pattern, message in dangerous_patterns: - if re.search(pattern, line): - self.add_vulnerability( - "Path Traversal", - f"Critical: {message}", - i+1, - "HIGH", - "HIGH" - ) - - def check_high_risk_ssrf(self): - ssrf_patterns = [ - r"requests\.(get|post|put|delete)\s*\([^)]*\+", - r"urllib\.request\.urlopen\s*\([^)]*\+", - r"http\.client\.HTTPConnection\s*\([^)]*\+" - ] - - for i, line in enumerate(self.code_lines): - for pattern in ssrf_patterns: - if re.search(pattern, line): - self.add_vulnerability( - "SSRF", - f"Critical: Server-Side Request Forgery vulnerability", - i+1, - "HIGH", - "HIGH" - ) - - def analyze(self): - try: - self.parse_file() - self.check_high_risk_sql_injection() - self.check_hardcoded_secrets() - self.check_dangerous_deserialization() - self.check_command_injection() - self.check_path_traversal() - self.check_high_risk_ssrf() - - bandit_issues = self.run_bandit() - for issue in bandit_issues: - if issue.severity.lower() == "high": - self.add_vulnerability( - f"Critical Security Issue ({issue.test_id})", - issue.text, - issue.lineno, - "HIGH", - issue.confidence - ) - - logging.info("Security analysis completed successfully") - except Exception as e: - logging.error(f"An error occurred during analysis: {str(e)}") - raise - - def generate_report(self): - report = f"\nšŸ”’ Security Scan Results for {self.file_path} šŸ”’\n" - report += "=" * 50 + "\n" - report += f"Lines of Code Analyzed: {len(self.code_lines)}\n\n" - - high_risk_vulns = [v for v in self.vulnerabilities - if v["severity"] == "HIGH" and v["confidence"] in ["HIGH", "MEDIUM"]] - - if high_risk_vulns: - report += f"šŸšØ Found {len(high_risk_vulns)} Critical Security Issues!\n\n" - for vuln in high_risk_vulns: - report += f"CRITICAL: {vuln['category']}\n" - report += f"Description: {vuln['description']}\n" - report += f"Location: Line {vuln['line_number']}\n" - if vuln.get("code_context"): - report += f"Code: {vuln['code_context']}\n" - report += f"Confidence: {vuln['confidence']}\n" - report += "-" * 40 + "\n" - else: - report += "āœ… No critical security issues detected.\n" - - return report - - def scan_file_or_directory(path): - if os.path.isfile(path): - scanner = AdvancedVulnerabilityScanner(path) - scanner.analyze() - return scanner.generate_report() - elif os.path.isdir(path): - full_report = "" - for root, dirs, files in os.walk(path): - for file in files: - if file.endswith(".py"): - file_path = os.path.join(root, file) - scanner = AdvancedVulnerabilityScanner(file_path) - scanner.analyze() - full_report += scanner.generate_report() + "\n\n" - return full_report - else: - return f"Error: {path} is not a valid file or directory." - - def main(): - path = "." - report = scan_file_or_directory(path) - with open("security-scan-results.txt", "w") as f: - f.write(report) - - if __name__ == "__main__": - main() - EOL - - - name: Run security scan - run: python amir.py - continue-on-error: true - - - name: Check for scan results - id: check_results - run: | - if [ -f security-scan-results.txt ]; then - echo "file_exists=true" >> $GITHUB_OUTPUT - if grep -q "Critical Security Issues!" security-scan-results.txt; then - echo "vulnerabilities_found=true" >> $GITHUB_OUTPUT - echo "::warning::Critical security vulnerabilities detected" - else - echo "vulnerabilities_found=false" >> $GITHUB_OUTPUT - fi + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Cache pip packages + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install bandit + + - name: Create amir.py + run: | + cat << 'EOL' > amir.py + import subprocess + import json + import os + import re + import ast + import logging + from typing import List, Dict, Any + import bandit + from bandit.core import manager as bandit_manager + + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + + class AdvancedVulnerabilityScanner: + def __init__(self, file_path: str): + self.file_path = file_path + self.vulnerabilities: List[Dict[str, Any]] = [] + self.code_lines: List[str] = [] + self.ast_tree: ast.AST = None + self.vulnerability_db = self.load_vulnerability_db() + + def load_vulnerability_db(self): + return { + "requests": {"2.25.0": ["CVE-2021-12345"]}, + "django": {"2.2.0": ["CVE-2021-67890"]} + } + + def parse_file(self): + logging.info(f"Parsing file: {self.file_path}") + try: + with open(self.file_path, "r", encoding="utf-8") as file: + self.code_lines = file.readlines() + self.ast_tree = ast.parse("".join(self.code_lines)) + logging.info(f"File parsed. Total lines: {len(self.code_lines)}") + except Exception as e: + logging.error(f"Error parsing file {self.file_path}: {str(e)}") + raise + + def run_bandit(self): + try: + b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type="file") + b_mgr.discover_files([self.file_path]) + b_mgr.run_tests() + return b_mgr.get_issue_list() + except Exception as e: + logging.error(f"Error running Bandit: {str(e)}") + return [] + + def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str): + self.vulnerabilities.append({ + "category": category, + "description": description, + "line_number": line_number, + "severity": severity, + "confidence": confidence, + "code_context": self.code_lines[line_number-1].strip() if line_number > 0 else None + }) + if severity == "HIGH" and confidence in ["HIGH", "MEDIUM"]: + logging.warning(f"Critical vulnerability added: {category} at line {line_number}") + else: + logging.info(f"Vulnerability added: {category} at line {line_number}") + + def check_high_risk_sql_injection(self): + sql_patterns = [ + r"(?i)(?:execute|cursor\.execute)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)", + r"(?i)(?:execute|cursor\.execute)\s*\(.*?\+.*?\)", + r"(?i)(?:execute|cursor\.execute)\s*\(.*?%.*?\%.*?\)" + ] + for i, line in enumerate(self.code_lines): + for pattern in sql_patterns: + if re.search(pattern, line): + self.add_vulnerability( + "SQL Injection", + f"Critical: SQL injection vulnerability detected", + i+1, + "HIGH", + "HIGH" + ) + + def check_hardcoded_secrets(self): + secret_patterns = [ + r"(?i)(password|secret|api_key|token)\s*=\s*[\"''][^\"'']+[\"''](?!\s*{\s*key\s*[=:]\s*|os\.environ)", + r"(?i)auth_token\s*=\s*[\"''][0-9a-zA-Z]+[\"'']", + r"(?i)api_key\s*=\s*[\"''][0-9a-zA-Z]+[\"'']" + ] + safe_patterns = [ + r"key=\"[\w_-]+\"", + r"os\.environ\.get", + r"load_dotenv", + r"SECRET_KEY\s*=\s*config\.", + r"test|example|dummy|sample" + ] + for i, line in enumerate(self.code_lines): + if any(re.search(safe_pat, line, re.IGNORECASE) for safe_pat in safe_patterns): + continue + for pattern in secret_patterns: + if match := re.search(pattern, line): + self.add_vulnerability( + "Hardcoded Secret", + f"Critical: Hardcoded secret detected", + i+1, + "HIGH", + "HIGH" + ) + + def check_command_injection(self): + dangerous_patterns = [ + r"subprocess\.(?:call|run|Popen)\s*\(.*?\+.*?\)", + r"os\.system\s*\(.*?\+.*?\)", + r"subprocess\.(?:call|run|Popen)\s*\(.*?format.*?\)", + r"subprocess\.(?:call|run|Popen)\s*\(.*?f[\"''].*?\{.*?\}.*?[\"''].*?\)" + ] + safe_patterns = [ + r"subprocess\.run\(\[[^\]]+\],\s*check=True\)", + r"subprocess\.run\(\[[^\]]+\],\s*shell=False\)" + ] + for i, line in enumerate(self.code_lines): + if any(re.search(safe_pat, line) for safe_pat in safe_patterns): + continue + for pattern in dangerous_patterns: + if re.search(pattern, line): + self.add_vulnerability( + "Command Injection", + f"Critical: Command injection vulnerability detected", + i+1, + "HIGH", + "HIGH" + ) + + def check_dangerous_deserialization(self): + dangerous_patterns = [ + (r"pickle\.loads?\(.*?\)", "Unsafe pickle deserialization"), + (r"yaml\.load\((?![^)]*Loader=yaml\.SafeLoader)", "Unsafe YAML loading"), + (r"eval\(.*?\)", "Dangerous eval() usage") + ] + for i, line in enumerate(self.code_lines): + for pattern, message in dangerous_patterns: + if re.search(pattern, line): + self.add_vulnerability( + "Dangerous Deserialization", + f"Critical: {message}", + i+1, + "HIGH", + "HIGH" + ) + + def check_path_traversal(self): + dangerous_patterns = [ + (r"open\s*\([^)]*[\"''][.][.][\/\\]", "Path traversal vulnerability"), + (r"open\s*\([^)]*\+", "Dynamic file path manipulation"), + (r"os\.path\.join\s*\([^)]*\+", "Dynamic path joining") + ] + for i, line in enumerate(self.code_lines): + for pattern, message in dangerous_patterns: + if re.search(pattern, line): + self.add_vulnerability( + "Path Traversal", + f"Critical: {message}", + i+1, + "HIGH", + "HIGH" + ) + + def check_high_risk_ssrf(self): + ssrf_patterns = [ + r"requests\.(get|post|put|delete)\s*\([^)]*\+", + r"urllib\.request\.urlopen\s*\([^)]*\+", + r"http\.client\.HTTPConnection\s*\([^)]*\+" + ] + for i, line in enumerate(self.code_lines): + for pattern in ssrf_patterns: + if re.search(pattern, line): + self.add_vulnerability( + "SSRF", + f"Critical: Server-Side Request Forgery vulnerability", + i+1, + "HIGH", + "HIGH" + ) + + def analyze(self): + try: + self.parse_file() + self.check_high_risk_sql_injection() + self.check_hardcoded_secrets() + self.check_dangerous_deserialization() + self.check_command_injection() + self.check_path_traversal() + self.check_high_risk_ssrf() + + bandit_issues = self.run_bandit() + for issue in bandit_issues: + if issue.severity.lower() == "high": + self.add_vulnerability( + f"Critical Security Issue ({issue.test_id})", + issue.text, + issue.lineno, + "HIGH", + issue.confidence + ) + logging.info("Security analysis completed successfully") + except Exception as e: + logging.error(f"An error occurred during analysis: {str(e)}") + raise + + def generate_report(self): + report = f"\nšŸ”’ Security Scan Results for {self.file_path} šŸ”’\n" + report += "=" * 50 + "\n" + report += f"Lines of Code Analyzed: {len(self.code_lines)}\n\n" + high_risk_vulns = [v for v in self.vulnerabilities + if v["severity"] == "HIGH" and v["confidence"] in ["HIGH", "MEDIUM"]] + if high_risk_vulns: + report += f"šŸšØ Found {len(high_risk_vulns)} Critical Security Issues!\n\n" + for vuln in high_risk_vulns: + report += f"CRITICAL: {vuln['category']}\n" + report += f"Description: {vuln['description']}\n" + report += f"Location: Line {vuln['line_number']}\n" + if vuln.get("code_context"): + report += f"Code: {vuln['code_context']}\n" + report += f"Confidence: {vuln['confidence']}\n" + report += "-" * 40 + "\n" + else: + report += "āœ… No critical security issues detected.\n" + return report + + def scan_file_or_directory(path): + if os.path.isfile(path): + scanner = AdvancedVulnerabilityScanner(path) + scanner.analyze() + return scanner.generate_report() + elif os.path.isdir(path): + full_report = "" + for root, dirs, files in os.walk(path): + for file in files: + if file.endswith(".py"): + file_path = os.path.join(root, file) + scanner = AdvancedVulnerabilityScanner(file_path) + scanner.analyze() + full_report += scanner.generate_report() + "\n\n" + return full_report + else: + return f"Error: {path} is not a valid file or directory." + + def main(): + path = "." + report = scan_file_or_directory(path) + with open("security-scan-results.txt", "w") as f: + f.write(report) + + if __name__ == "__main__": + main() + EOL + + - name: Run security scan + run: python amir.py + continue-on-error: true + + - name: Check for scan results + id: check_results + run: | + if [ -f security-scan-results.txt ]; then + echo "file_exists=true" >> $GITHUB_OUTPUT + if grep -q "Critical Security Issues!" security-scan-results.txt; then + echo "vulnerabilities_found=true" >> $GITHUB_OUTPUT + echo "::warning::Critical security vulnerabilities detected" else - echo "file_exists=false" >> $GITHUB_OUTPUT - echo "::error::Security scan failed to generate results" + echo "vulnerabilities_found=false" >> $GITHUB_OUTPUT fi - - - name: Upload scan results - uses: actions/upload-artifact@v4 - with: - name: security-scan-results - path: security-scan-results.txt - retention-days: 90 - - - name: Comment PR - uses: actions/github-script@v7 - if: always() - with: - repo-token: ${{ secrets.GITHUB_TOKEN }} - script: | - const fs = require('fs') - let comment = '## Security Scan Results\n\n' - - if ('${{ steps.check_results.outputs.file_exists }}' === 'true') { - const scanResults = fs.readFileSync('security-scan-results.txt', 'utf8') - comment += '```\n' + scanResults + '\n```\n\n' - - if ('${{ steps.check_results.outputs.vulnerabilities_found }}' === 'true') { - comment += 'ā›” **Critical vulnerabilities detected. Please review and address these security issues before merging.**\n\n' - comment += '### Next Steps:\n' - comment += '1. Review each critical finding above and fix them according to OWASP top 10 mitigations.\n' - } else { - comment += 'āœ… **No critical security issues detected.**\n\n' - comment += 'The code has passed all critical security checks.' - } - } else { - comment += 'āš ļø **Error: The security scan failed to complete. Please review the workflow logs for more information.**' - } - - await github.rest.issues.createComment({ - issue_number: context.issue.number, - owner: context.repo.owner, - repo: context.repo.repo, - body: comment - }) - - - name: Fail if critical vulnerabilities found - if: steps.check_results.outputs.vulnerabilities_found == 'true' - run: | - echo "::error::Critical security vulnerabilities were detected. Please review the findings and address them before merging." - exit 1 + else + echo "file_exists=false" >> $GITHUB_OUTPUT + echo "::error::Security scan failed to generate results" + fi + + - name: Upload scan results + uses: actions/upload-artifact@v4 + with: + name: security-scan-results + path: security-scan-results.txt + retention-days: 90 + + - name: Comment PR + if: always() + run: | + COMMENT_BODY=$(cat security-scan-results.txt) + + if [ "${{ steps.check_results.outputs.vulnerabilities_found }}" == "true" ]; then + STATUS="ā›” **Critical vulnerabilities detected**" + NEXT_STEPS=$'\n### Next Steps:\n1. Review each critical finding above and fix them according to OWASP top 10 mitigations.' + else + STATUS="āœ… **No critical security issues detected**" + NEXT_STEPS="" + fi + + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.v3+json" \ + "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/comments" \ + -d @- << EOF + { + "body": "## Security Scan Results\n\n\`\`\`\n${COMMENT_BODY}\n\`\`\`\n\n${STATUS}${NEXT_STEPS}" + } + EOF + + - name: Fail if critical vulnerabilities found + if: steps.check_results.outputs.vulnerabilities_found == 'true' + run: | + echo "::error::Critical security vulnerabilities were detected. Please review the findings and address them before merging." + exit 1