From b96aef7b8cefe5a59c6eb08ee719a8ad477170cf Mon Sep 17 00:00:00 2001 From: AmirZandiehprojects Date: Sat, 16 Nov 2024 17:19:14 +1100 Subject: [PATCH 1/2] fix: update security scan workflow with proper permissions and latest action versions --- .github/workflows/security-scan.yml | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/.github/workflows/security-scan.yml b/.github/workflows/security-scan.yml index 741dc3f..5400a91 100644 --- a/.github/workflows/security-scan.yml +++ b/.github/workflows/security-scan.yml @@ -4,23 +4,26 @@ on: pull_request: types: [opened, synchronize, reopened] +# Add top-level permissions block permissions: contents: read pull-requests: write + issues: write + checks: write jobs: security-scan: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - + - uses: actions/checkout@v4 + - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: '3.10' - name: Cache pip packages - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.cache/pip key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} @@ -274,13 +277,14 @@ jobs: fi - name: Upload scan results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: security-scan-results path: security-scan-results.txt + retention-days: 90 - name: Comment PR - uses: actions/github-script@v6 + uses: actions/github-script@v7 if: always() with: github-token: ${{secrets.GITHUB_TOKEN}} @@ -301,7 +305,7 @@ jobs: comment += 'āš ļø **Error: The security scan failed to complete. Please review the workflow logs for more information.**' } - github.rest.issues.createComment({ + await github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, From b3aea50c2ee419975e81765f8a7582332c7c8bbe Mon Sep 17 00:00:00 2001 From: AmirZandiehprojects Date: Sat, 16 Nov 2024 17:40:43 +1100 Subject: [PATCH 2/2] fix: improve security scanner to reduce false positives and enhance reporting --- .github/workflows/security-scan.yml | 279 ++++++++++++++++------------ 1 file changed, 161 insertions(+), 118 deletions(-) diff --git a/.github/workflows/security-scan.yml b/.github/workflows/security-scan.yml index 5400a91..a2057b6 100644 --- a/.github/workflows/security-scan.yml +++ b/.github/workflows/security-scan.yml @@ -37,7 +37,7 @@ jobs: - name: Create amir.py run: | - cat << EOF > amir.py +cat << EOF > amir.py import subprocess import json import os @@ -59,7 +59,6 @@ jobs: self.vulnerability_db = self.load_vulnerability_db() def load_vulnerability_db(self): - # Mock vulnerability database return { 'requests': {'2.25.0': ['CVE-2021-12345']}, 'django': {'2.2.0': ['CVE-2021-67890']} @@ -67,16 +66,24 @@ jobs: def parse_file(self): logging.info(f"Parsing file: {self.file_path}") - with open(self.file_path, 'r', encoding='utf-8') as file: - self.code_lines = file.readlines() - self.ast_tree = ast.parse(''.join(self.code_lines)) - logging.info(f"File parsed. Total lines: {len(self.code_lines)}") + try: + with open(self.file_path, 'r', encoding='utf-8') as file: + self.code_lines = file.readlines() + self.ast_tree = ast.parse(''.join(self.code_lines)) + logging.info(f"File parsed. Total lines: {len(self.code_lines)}") + except Exception as e: + logging.error(f"Error parsing file {self.file_path}: {str(e)}") + raise def run_bandit(self): - b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type='file') - b_mgr.discover_files([self.file_path]) - b_mgr.run_tests() - return b_mgr.get_issue_list() + try: + b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type='file') + b_mgr.discover_files([self.file_path]) + b_mgr.run_tests() + return b_mgr.get_issue_list() + except Exception as e: + logging.error(f"Error running Bandit: {str(e)}") + return [] def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str): self.vulnerabilities.append({ @@ -84,150 +91,186 @@ jobs: 'description': description, 'line_number': line_number, 'severity': severity, - 'confidence': confidence + 'confidence': confidence, + 'code_context': self.code_lines[line_number-1].strip() if line_number > 0 else None }) - logging.info(f"Vulnerability added: {category} at line {line_number}") + logging.info(f"Critical vulnerability added: {category} at line {line_number}") - def check_hardcoded_secrets(self): - pattern = re.compile(r'(?i)(password|secret|key|token)\s*=\s*["\'][^"\']+["\']') - for i, line in enumerate(self.code_lines): - if match := pattern.search(line): - self.add_vulnerability('Hardcoded Secret', f"Potential hardcoded secret: {match.group(0)}", i+1, 'HIGH', 'MEDIUM') - - def check_sql_injection(self): + def check_high_risk_sql_injection(self): sql_patterns = [ - r'(?i)(?:execute|cursor\.execute)\s*\(.*?%s.*?\)', - r'(?i)(?:execute|cursor\.execute)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)' + r'(?i)(?:execute|cursor\.execute)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)', # f-string in SQL + r'(?i)(?:execute|cursor\.execute)\s*\(.*?\+.*?\)', # String concatenation in SQL + r'(?i)(?:execute|cursor\.execute)\s*\(.*?%.*?\%.*?\)' # %-formatting without params ] for i, line in enumerate(self.code_lines): for pattern in sql_patterns: if re.search(pattern, line): - self.add_vulnerability('SQL Injection', f"Potential SQL injection: {line.strip()}", i+1, 'HIGH', 'HIGH') - - def check_xss_vulnerabilities(self): - xss_patterns = [ - r'(?i)render_template_string\s*\(', - r'(?i)jinja2\.Template\s*\(', - r'(?i)flask\.render_template_string\s*\(', - r'(?i)response\.write\(.+\)', - r'(?i)print\(.+\)' + self.add_vulnerability( + 'SQL Injection', + f"Critical: SQL injection vulnerability detected", + i+1, + 'HIGH', + 'HIGH' + ) + + def check_hardcoded_secrets(self): + secret_patterns = [ + r'(?i)(password|secret|api_key|token)\s*=\s*["\'][^"\']+["\'](?!\s*{\s*key\s*[=:]\s*|os\.environ)', + r'(?i)auth_token\s*=\s*["\'][0-9a-zA-Z]+["\']', + r'(?i)api_key\s*=\s*["\'][0-9a-zA-Z]+["\']' + ] + + safe_patterns = [ + r'key="[\w_-]+"', # Streamlit/UI keys + r'os\.environ\.get', # Environment variables + r'load_dotenv', # Environment loading + r'SECRET_KEY\s*=\s*config\.', # Config references + r'test|example|dummy|sample' # Test code ] - for i, line in enumerate(self.code_lines): - for pattern in xss_patterns: - if re.search(pattern, line): - self.add_vulnerability('Cross-Site Scripting (XSS)', f"Potential XSS vulnerability: {line.strip()}", i+1, 'HIGH', 'MEDIUM') - def check_vulnerable_components(self): - import_pattern = r'(?:from|import)\s+([\w\.]*)(?:\s+import)?' for i, line in enumerate(self.code_lines): - if match := re.search(import_pattern, line): - lib = match.group(1).split('.')[0] - if lib in self.vulnerability_db: - self.add_vulnerability('Vulnerable Component', f"Potentially vulnerable library: {lib}", i+1, 'HIGH', 'MEDIUM') - - def perform_taint_analysis(self): - logging.info("Performing taint analysis") - tainted_vars = set() - for node in ast.walk(self.ast_tree): - if isinstance(node, ast.Assign): - for target in node.targets: - if isinstance(target, ast.Name): - if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in ['input', 'request.form.get']: - tainted_vars.add(target.id) - elif isinstance(node, ast.Name) and node.id in tainted_vars: - if isinstance(node.ctx, ast.Load): - self.add_vulnerability('Tainted Variable Usage', f"Potentially tainted variable used: {node.id}", getattr(node, 'lineno', 0), 'MEDIUM', 'MEDIUM') - - def check_ssrf_vulnerabilities(self): - ssrf_patterns = [ - r'(?i)requests\.get\s*\(', - r'(?i)urllib\.request\.urlopen\s*\(', - r'(?i)http\.client\.HTTPConnection\s*\(' + if any(re.search(safe_pat, line, re.IGNORECASE) for safe_pat in safe_patterns): + continue + + for pattern in secret_patterns: + if match := re.search(pattern, line): + self.add_vulnerability( + 'Hardcoded Secret', + f"Critical: Hardcoded secret detected", + i+1, + 'HIGH', + 'HIGH' + ) + + def check_command_injection(self): + dangerous_patterns = [ + r'subprocess\.(?:call|run|Popen)\s*\(.*?\+.*?\)', # String concatenation + r'os\.system\s*\(.*?\+.*?\)', # String concatenation + r'subprocess\.(?:call|run|Popen)\s*\(.*?format.*?\)', # String formatting + r'subprocess\.(?:call|run|Popen)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)' # f-strings + ] + + safe_patterns = [ + r'subprocess\.run\(\[[^\]]+\],\s*check=True\)', + r'subprocess\.run\(\[[^\]]+\],\s*shell=False\)' ] + for i, line in enumerate(self.code_lines): - for pattern in ssrf_patterns: - if re.search(pattern, line): - self.add_vulnerability('SSRF', f"Potential SSRF vulnerability: {line.strip()}", i+1, 'HIGH', 'MEDIUM') + if any(re.search(safe_pat, line) for safe_pat in safe_patterns): + continue - def check_logging_and_monitoring(self): - logging_patterns = [ - r'(?i)logging\.', - r'(?i)print\s*\(', - r'(?i)sys\.stdout\.write\s*\(' - ] - has_logging = any(re.search(pattern, line) for pattern in logging_patterns for line in self.code_lines) - if not has_logging: - self.add_vulnerability('Insufficient Logging', "No logging statements found in the file", 0, 'MEDIUM', 'HIGH') - - def check_idor(self): - idor_patterns = [ - r'(?i)request\.args\.get\s*\([\'"].*role.*[\'"]\)', - r'(?i)if\s+.*role\s*==\s*[\'"]admin[\'"]' + for pattern in dangerous_patterns: + if re.search(pattern, line): + self.add_vulnerability( + 'Command Injection', + f"Critical: Command injection vulnerability detected", + i+1, + 'HIGH', + 'HIGH' + ) + + def check_dangerous_deserialization(self): + dangerous_patterns = [ + (r'pickle\.loads?\(.*?\)', "Unsafe pickle deserialization"), + (r'yaml\.load\((?![^)]*Loader=yaml\.SafeLoader)', "Unsafe YAML loading"), + (r'eval\(.*?\)', "Dangerous eval() usage") ] + for i, line in enumerate(self.code_lines): - for pattern in idor_patterns: + for pattern, message in dangerous_patterns: if re.search(pattern, line): - self.add_vulnerability('Insecure Direct Object Reference', f"Potential IDOR vulnerability: {line.strip()}", i+1, 'HIGH', 'MEDIUM') - - def check_sensitive_data_exposure(self): - sensitive_patterns = [ - r'(?i)os\.environ', - r'(?i)send_file\s*\(', - r'(?i)open\s*\(' + self.add_vulnerability( + 'Dangerous Deserialization', + f"Critical: {message}", + i+1, + 'HIGH', + 'HIGH' + ) + + def check_path_traversal(self): + dangerous_patterns = [ + (r'open\s*\([^)]*[\'"][.][.][\/\\]', "Path traversal vulnerability"), + (r'open\s*\([^)]*\+', "Dynamic file path manipulation"), + (r'os\.path\.join\s*\([^)]*\+', "Dynamic path joining") ] + for i, line in enumerate(self.code_lines): - for pattern in sensitive_patterns: + for pattern, message in dangerous_patterns: if re.search(pattern, line): - self.add_vulnerability('Sensitive Data Exposure', f"Potential sensitive data exposure: {line.strip()}", i+1, 'HIGH', 'MEDIUM') - - def check_insecure_deserialization(self): - deser_patterns = [ - r'(?i)pickle\.loads\s*\(', - r'(?i)yaml\.load\s*\(', - r'(?i)json\.loads\s*\(' + self.add_vulnerability( + 'Path Traversal', + f"Critical: {message}", + i+1, + 'HIGH', + 'HIGH' + ) + + def check_high_risk_ssrf(self): + ssrf_patterns = [ + r'requests\.(get|post|put|delete)\s*\([^)]*\+', + r'urllib\.request\.urlopen\s*\([^)]*\+', + r'http\.client\.HTTPConnection\s*\([^)]*\+' ] + for i, line in enumerate(self.code_lines): - for pattern in deser_patterns: + for pattern in ssrf_patterns: if re.search(pattern, line): - self.add_vulnerability('Insecure Deserialization', f"Potential insecure deserialization: {line.strip()}", i+1, 'HIGH', 'HIGH') + self.add_vulnerability( + 'SSRF', + f"Critical: Server-Side Request Forgery vulnerability", + i+1, + 'HIGH', + 'HIGH' + ) def analyze(self): try: self.parse_file() - self.check_sql_injection() - self.check_xss_vulnerabilities() + self.check_high_risk_sql_injection() self.check_hardcoded_secrets() - self.check_vulnerable_components() - self.perform_taint_analysis() - self.check_ssrf_vulnerabilities() - self.check_logging_and_monitoring() - self.check_idor() - self.check_sensitive_data_exposure() - self.check_insecure_deserialization() + self.check_dangerous_deserialization() + self.check_command_injection() + self.check_path_traversal() + self.check_high_risk_ssrf() bandit_issues = self.run_bandit() for issue in bandit_issues: - self.add_vulnerability(f"Bandit: {issue.test_id}", issue.text, issue.lineno, issue.severity, issue.confidence) - - logging.info("Analysis completed successfully") + if issue.severity.lower() == 'high': + self.add_vulnerability( + f"Critical Security Issue ({issue.test_id})", + issue.text, + issue.lineno, + 'HIGH', + issue.confidence + ) + + logging.info("Security analysis completed successfully") except Exception as e: logging.error(f"An error occurred during analysis: {str(e)}") raise def generate_report(self): - report = f"Advanced Vulnerability Scan Results for {self.file_path}:\n" - report += f"Total lines of code: {len(self.code_lines)}\n\n" - report += "Detected Vulnerabilities:\n" - if not self.vulnerabilities: - report += "No vulnerabilities detected.\n" + report = f"\nšŸ”’ Security Scan Results for {self.file_path} šŸ”’\n" + report += "=" * 50 + "\n" + report += f"Lines of Code Analyzed: {len(self.code_lines)}\n\n" + + # Filter for high-risk vulnerabilities + high_risk_vulns = [v for v in self.vulnerabilities + if v['severity'] == 'HIGH' and v['confidence'] in ['HIGH', 'MEDIUM']] + + if high_risk_vulns: + report += f"šŸšØ Found {len(high_risk_vulns)} Critical Security Issues!\n\n" + for vuln in high_risk_vulns: + report += f"CRITICAL: {vuln['category']}\n" + report += f"Description: {vuln['description']}\n" + report += f"Location: Line {vuln['line_number']}\n" + if vuln.get('code_context'): + report += f"Code: {vuln['code_context']}\n" + report += f"Confidence: {vuln['confidence']}\n" + report += "-" * 40 + "\n" else: - for vuln in sorted(self.vulnerabilities, key=lambda x: x['severity'], reverse=True): - report += f"- {vuln['category']}: {vuln['description']}\n" - report += f" Severity: {vuln['severity']}, Confidence: {vuln['confidence']}\n" - if vuln['line_number'] > 0: - report += f" Location: Line {vuln['line_number']}\n" - report += f" Code: {self.code_lines[vuln['line_number']-1].strip()}\n" - report += "\n" + report += "āœ… No critical security issues detected.\n" + return report def scan_file_or_directory(path):