Skip to content

feat: enhance security scanner with reduced false positives and bette… #49

feat: enhance security scanner with reduced false positives and bette…

feat: enhance security scanner with reduced false positives and bette… #49

Workflow file for this run

# Workflow: runs a Python security scan on pull requests and reports the result.
name: Security Scan
on:
pull_request:
# Re-run when a PR is opened, receives new commits, or is reopened.
types: [opened, synchronize, reopened]
# Top-level permissions granted to the workflow's GITHUB_TOKEN.
permissions:
contents: read
# Write scopes below are presumably what the later "Comment PR" step relies on
# to post its comment — confirm before narrowing them.
pull-requests: write
issues: write
checks: write
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
# The cache key tracks the hash of every requirements.txt in the repository.
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Bandit is the only package installed here; the generated scanner imports it.
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install bandit
- name: Create amir.py
# Writes the scanner script to amir.py through a shell heredoc.
# The delimiter is quoted ('EOF') so the shell copies the Python source
# verbatim. With an unquoted delimiter the shell expands "$" and backticks and
# collapses "\\" to "\", which corrupts the regular expressions in the script.
run: |
cat << 'EOF' > amir.py
import subprocess
import json
import os
import re
import ast
import logging
from typing import List, Dict, Any
import bandit
from bandit.core import manager as bandit_manager
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class AdvancedVulnerabilityScanner:
    """Regex- and Bandit-based security scanner for one Python source file.

    Call ``analyze()`` and then ``generate_report()``. Findings accumulate in
    ``self.vulnerabilities`` as dictionaries with the keys ``category``,
    ``description``, ``line_number``, ``severity``, ``confidence`` and
    ``code_context``.
    """

    # All patterns are compiled once at class creation instead of once per
    # scanned line.
    _SQL_INJECTION_PATTERNS = (
        # f-string in SQL
        re.compile(r'(?i)(?:execute|cursor\.execute)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)'),
        # String concatenation in SQL
        re.compile(r'(?i)(?:execute|cursor\.execute)\s*\(.*?\+.*?\)'),
        # %-formatting without params
        re.compile(r'(?i)(?:execute|cursor\.execute)\s*\(.*?%.*?\%.*?\)'),
    )

    _SECRET_PATTERNS = (
        re.compile(r'(?i)(password|secret|api_key|token)\s*=\s*["\'][^"\']+["\'](?!\s*{\s*key\s*[=:]\s*|os\.environ)'),
        re.compile(r'(?i)auth_token\s*=\s*["\'][0-9a-zA-Z]+["\']'),
        re.compile(r'(?i)api_key\s*=\s*["\'][0-9a-zA-Z]+["\']'),
    )
    # A line matching any of these is never reported as a hardcoded secret.
    _SECRET_SAFE_PATTERNS = (
        re.compile(r'key="[\w_-]+"', re.IGNORECASE),            # Streamlit/UI keys
        re.compile(r'os\.environ\.get', re.IGNORECASE),         # Environment variables
        re.compile(r'load_dotenv', re.IGNORECASE),              # Environment loading
        re.compile(r'SECRET_KEY\s*=\s*config\.', re.IGNORECASE),  # Config references
        re.compile(r'test|example|dummy|sample', re.IGNORECASE),  # Test code
    )

    _COMMAND_INJECTION_PATTERNS = (
        # String concatenation
        re.compile(r'subprocess\.(?:call|run|Popen)\s*\(.*?\+.*?\)'),
        re.compile(r'os\.system\s*\(.*?\+.*?\)'),
        # String formatting
        re.compile(r'subprocess\.(?:call|run|Popen)\s*\(.*?format.*?\)'),
        # f-strings
        re.compile(r'subprocess\.(?:call|run|Popen)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)'),
    )
    _COMMAND_SAFE_PATTERNS = (
        re.compile(r'subprocess\.run\(\[[^\]]+\],\s*check=True\)'),
        re.compile(r'subprocess\.run\(\[[^\]]+\],\s*shell=False\)'),
    )

    _DESERIALIZATION_RULES = (
        (re.compile(r'pickle\.loads?\(.*?\)'), "Unsafe pickle deserialization"),
        # Accept SafeLoader/CSafeLoader whether or not it is qualified with "yaml.".
        (re.compile(r'yaml\.load\((?![^)]*Loader=(?:yaml\.)?C?SafeLoader)'), "Unsafe YAML loading"),
        # The lookbehind keeps ast.literal_eval(...) and method calls such as
        # model.eval() from being reported as the eval() builtin.
        (re.compile(r'(?<![\w.])eval\(.*?\)'), "Dangerous eval() usage"),
    )

    _PATH_TRAVERSAL_RULES = (
        # "\x5c" is a backslash. It is spelled this way because this script is
        # emitted through a shell heredoc, where a doubled backslash can be
        # collapsed and leave the character class unterminated.
        (re.compile(r'open\s*\([^)]*[\'"][.][.][/\x5c]'), "Path traversal vulnerability"),
        (re.compile(r'open\s*\([^)]*\+'), "Dynamic file path manipulation"),
        (re.compile(r'os\.path\.join\s*\([^)]*\+'), "Dynamic path joining"),
    )

    _SSRF_PATTERNS = (
        re.compile(r'requests\.(get|post|put|delete)\s*\([^)]*\+'),
        re.compile(r'urllib\.request\.urlopen\s*\([^)]*\+'),
        re.compile(r'http\.client\.HTTPConnection\s*\([^)]*\+'),
    )

    def __init__(self, file_path: str):
        """Remember *file_path*; nothing is read until ``parse_file()`` runs."""
        self.file_path = file_path
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines: List[str] = []
        self.ast_tree: "ast.AST | None" = None
        self.vulnerability_db = self.load_vulnerability_db()

    def load_vulnerability_db(self):
        """Return the built-in package -> version -> CVE-id mapping."""
        return {
            'requests': {'2.25.0': ['CVE-2021-12345']},
            'django': {'2.2.0': ['CVE-2021-67890']}
        }

    def parse_file(self):
        """Read the file into ``code_lines`` and parse it into ``ast_tree``.

        Raises:
            Exception: any error from opening, decoding or parsing the file is
                logged and re-raised unchanged.
        """
        logging.info("Parsing file: %s", self.file_path)
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                self.code_lines = file.readlines()
            self.ast_tree = ast.parse(''.join(self.code_lines))
            logging.info("File parsed. Total lines: %s", len(self.code_lines))
        except Exception as e:
            logging.error("Error parsing file %s: %s", self.file_path, e)
            raise

    def run_bandit(self):
        """Run Bandit on the file and return its issue list.

        Best effort: any Bandit failure is logged and an empty list is returned.
        """
        try:
            b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type='file')
            b_mgr.discover_files([self.file_path])
            b_mgr.run_tests()
            return b_mgr.get_issue_list()
        except Exception as e:
            logging.error("Error running Bandit: %s", e)
            return []

    def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str):
        """Record one finding, ignoring exact repeats.

        A finding with the same category, description and line number as an
        existing one is dropped, so a line that matches several patterns of
        the same check is reported once. ``code_context`` is the stripped
        source line, or ``None`` when *line_number* is outside the file.
        """
        already_recorded = any(
            found['category'] == category
            and found['description'] == description
            and found['line_number'] == line_number
            for found in self.vulnerabilities
        )
        if already_recorded:
            return

        # External tools may report a line the loaded source does not have;
        # treat that as "no context" rather than raising IndexError.
        in_range = 0 < line_number <= len(self.code_lines)
        self.vulnerabilities.append({
            'category': category,
            'description': description,
            'line_number': line_number,
            'severity': severity,
            'confidence': confidence,
            'code_context': self.code_lines[line_number - 1].strip() if in_range else None
        })
        logging.info("Critical vulnerability added: %s at line %s", category, line_number)

    def _scan_lines(self, category, rules, safe_patterns=()):
        """Report each line matching a (pattern, description) rule as HIGH/HIGH.

        Lines matching any entry of *safe_patterns* are skipped entirely.
        """
        for line_number, line in enumerate(self.code_lines, start=1):
            if any(safe.search(line) for safe in safe_patterns):
                continue
            for pattern, description in rules:
                if pattern.search(line):
                    self.add_vulnerability(category, description, line_number, 'HIGH', 'HIGH')

    def check_high_risk_sql_injection(self):
        """Flag execute() calls built from f-strings, concatenation or %-formatting."""
        description = "Critical: SQL injection vulnerability detected"
        self._scan_lines(
            'SQL Injection',
            [(pattern, description) for pattern in self._SQL_INJECTION_PATTERNS],
        )

    def check_hardcoded_secrets(self):
        """Flag string literals assigned to secret-looking names."""
        description = "Critical: Hardcoded secret detected"
        self._scan_lines(
            'Hardcoded Secret',
            [(pattern, description) for pattern in self._SECRET_PATTERNS],
            self._SECRET_SAFE_PATTERNS,
        )

    def check_command_injection(self):
        """Flag subprocess/os.system calls whose command is built dynamically."""
        description = "Critical: Command injection vulnerability detected"
        self._scan_lines(
            'Command Injection',
            [(pattern, description) for pattern in self._COMMAND_INJECTION_PATTERNS],
            self._COMMAND_SAFE_PATTERNS,
        )

    def check_dangerous_deserialization(self):
        """Flag pickle loading, yaml.load without a safe loader, and eval()."""
        self._scan_lines(
            'Dangerous Deserialization',
            [(pattern, f"Critical: {message}") for pattern, message in self._DESERIALIZATION_RULES],
        )

    def check_path_traversal(self):
        """Flag open()/os.path.join() calls with '..' literals or concatenated paths."""
        self._scan_lines(
            'Path Traversal',
            [(pattern, f"Critical: {message}") for pattern, message in self._PATH_TRAVERSAL_RULES],
        )

    def check_high_risk_ssrf(self):
        """Flag HTTP client calls whose argument list contains a concatenation."""
        description = "Critical: Server-Side Request Forgery vulnerability"
        self._scan_lines(
            'SSRF',
            [(pattern, description) for pattern in self._SSRF_PATTERNS],
        )

    def analyze(self):
        """Parse the file, run every check, then merge Bandit's high-severity issues.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            self.parse_file()
            self.check_high_risk_sql_injection()
            self.check_hardcoded_secrets()
            self.check_dangerous_deserialization()
            self.check_command_injection()
            self.check_path_traversal()
            self.check_high_risk_ssrf()
            for issue in self.run_bandit():
                # Only Bandit's high-severity issues are promoted to findings.
                if issue.severity.lower() == 'high':
                    self.add_vulnerability(
                        f"Critical Security Issue ({issue.test_id})",
                        issue.text,
                        issue.lineno,
                        'HIGH',
                        issue.confidence
                    )
            logging.info("Security analysis completed successfully")
        except Exception as e:
            logging.error("An error occurred during analysis: %s", e)
            raise

    def generate_report(self):
        """Return the text report of HIGH-severity, HIGH/MEDIUM-confidence findings."""
        parts = [
            f"\n🔒 Security Scan Results for {self.file_path} 🔒\n",
            "=" * 50 + "\n",
            f"Lines of Code Analyzed: {len(self.code_lines)}\n\n",
        ]
        # Filter for high-risk vulnerabilities
        high_risk_vulns = [
            vuln for vuln in self.vulnerabilities
            if vuln['severity'] == 'HIGH' and vuln['confidence'] in ('HIGH', 'MEDIUM')
        ]
        if not high_risk_vulns:
            parts.append("✅ No critical security issues detected.\n")
            return "".join(parts)

        parts.append(f"🚨 Found {len(high_risk_vulns)} Critical Security Issues!\n\n")
        for vuln in high_risk_vulns:
            parts.append(f"CRITICAL: {vuln['category']}\n")
            parts.append(f"Description: {vuln['description']}\n")
            parts.append(f"Location: Line {vuln['line_number']}\n")
            if vuln.get('code_context'):
                parts.append(f"Code: {vuln['code_context']}\n")
            parts.append(f"Confidence: {vuln['confidence']}\n")
            parts.append("-" * 40 + "\n")
        return "".join(parts)
def scan_file_or_directory(path):
    """Scan one Python file, or every ``.py`` file under a directory.

    Args:
        path: A file or directory path.

    Returns:
        The concatenated text report(s). For a directory, each report is
        followed by two newlines; an empty string means no ``.py`` files were
        found. If *path* is neither a file nor a directory, an
        ``"Error: ..."`` message is returned instead.

    Raises:
        Exception: when *path* is a single file and analysing it fails. In
            directory mode a failing file is noted in the report and the walk
            continues, so one unparsable file cannot hide every other result.
    """
    if os.path.isfile(path):
        scanner = AdvancedVulnerabilityScanner(path)
        scanner.analyze()
        return scanner.generate_report()
    if not os.path.isdir(path):
        return f"Error: {path} is not a valid file or directory."

    # VCS metadata, virtual environments and caches are not project source.
    skipped_dirs = {
        '.git', '.hg', '.svn', '.tox', '.venv', 'venv', '__pycache__', 'node_modules',
    }
    # The scanner is itself dropped into the tree it scans; its own pattern
    # tables and messages would otherwise be reported as findings.
    own_file = globals().get('__file__')
    own_path = os.path.abspath(own_file) if own_file else None

    sections = []
    for root, dirs, files in os.walk(path):
        # Prune in place so os.walk does not descend; sort for a stable report.
        dirs[:] = sorted(d for d in dirs if d not in skipped_dirs)
        for file_name in sorted(files):
            if not file_name.endswith('.py'):
                continue
            file_path = os.path.join(root, file_name)
            if own_path is not None and os.path.abspath(file_path) == own_path:
                continue
            scanner = AdvancedVulnerabilityScanner(file_path)
            try:
                scanner.analyze()
            except Exception as exc:
                logging.warning("Skipping %s: %s", file_path, exc)
                sections.append(f"\n⚠️ Could not scan {file_path}: {exc}\n")
                continue
            sections.append(scanner.generate_report())
    return "".join(section + "\n\n" for section in sections)
def main():
    """Scan the current directory and write the report to security-scan-results.txt."""
    path = "."  # Scan the entire repository
    report = scan_file_or_directory(path)
    # The report contains emoji, so pin UTF-8 instead of relying on the
    # locale's default encoding (which can be ASCII and fail on write).
    with open('security-scan-results.txt', 'w', encoding='utf-8') as f:
        f.write(report)


if __name__ == "__main__":
    main()
EOF
# amir.py ends at the terminator above. Anything placed after it in this step
# is executed by the shell, not written to the script.
# Executes the generated scanner, which writes security-scan-results.txt.
- name: Run security scan
run: python amir.py
# A scanner crash does not stop the job at this step; the following step
# detects the failure through the missing results file.
continue-on-error: true
# Publishes two step outputs for later steps:
#   file_exists           - "true" when the scanner produced a results file
#   vulnerabilities_found - "true" when that file lists at least one finding
- name: Check for scan results
id: check_results
run: |
if [ -f security-scan-results.txt ]; then
echo "file_exists=true" >> "$GITHUB_OUTPUT"
# Each finding in the report starts with a "CRITICAL: <category>" line.
# Anchoring at line start avoids matching quoted source under "Code: ".
if grep -q '^CRITICAL: ' security-scan-results.txt; then
echo "vulnerabilities_found=true" >> "$GITHUB_OUTPUT"
else
echo "vulnerabilities_found=false" >> "$GITHUB_OUTPUT"
fi
else
echo "file_exists=false" >> "$GITHUB_OUTPUT"
fi
# Stores the raw report as a build artifact named security-scan-results.
- name: Upload scan results
uses: actions/upload-artifact@v4
with:
name: security-scan-results
path: security-scan-results.txt
# Keep the artifact for 90 days.
retention-days: 90
# Posts the scan outcome as a pull-request comment. Runs even when an earlier
# step failed so that a broken scan is still reported on the PR.
- name: Comment PR
uses: actions/github-script@v7
if: always()
with:
github-token: ${{secrets.GITHUB_TOKEN}}
script: |
const fs = require('fs')
// GitHub rejects comment bodies longer than 65536 characters; cap the
// embedded report well below that so the heading and verdict still fit.
const MAX_REPORT_CHARS = 60000
let comment = '## Security Scan Results\n\n'
if ('${{ steps.check_results.outputs.file_exists }}' === 'true') {
let scanResults = fs.readFileSync('security-scan-results.txt', 'utf8')
if (scanResults.length > MAX_REPORT_CHARS) {
scanResults = scanResults.slice(0, MAX_REPORT_CHARS) +
'\n… report truncated; download the security-scan-results artifact for the full output'
}
comment += '```\n' + scanResults + '\n```\n\n'
if ('${{ steps.check_results.outputs.vulnerabilities_found }}' === 'true') {
comment += '⛔ **Vulnerabilities detected. Please address these issues before merging.**'
} else {
comment += '✅ **No vulnerabilities detected.**'
}
} else {
// Also reached when the check step never ran and its outputs are empty.
comment += '⚠️ **Error: The security scan failed to complete. Please review the workflow logs for more information.**'
}
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: comment
})
# Final gate: fails the job when findings were reported or when the scanner
# produced no results file at all.
- name: Fail if vulnerabilities found or scan failed
if: steps.check_results.outputs.vulnerabilities_found == 'true' || steps.check_results.outputs.file_exists == 'false'
run: exit 1