data anonymization #37
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Runs a custom pattern-based scanner plus Bandit on every pull request and
# reports the findings back on the PR.
name: Security Scan
on:
  pull_request:
    types: [opened, synchronize, reopened]
permissions:
  contents: read
  pull-requests: write  # required by the "Comment PR" step
jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
# Check out the PR code and prepare a Python 3.10 environment with Bandit.
- uses: actions/checkout@v4
- name: Set up Python
  uses: actions/setup-python@v5
  with:
    python-version: '3.10'
- name: Cache pip packages
  uses: actions/cache@v4
  with:
    path: ~/.cache/pip
    key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
    restore-keys: |
      ${{ runner.os }}-pip-
- name: Install dependencies
  run: |
    python -m pip install --upgrade pip
    pip install bandit
- name: Create amir.py
  run: |
    # The delimiter is quoted so the shell copies the Python source verbatim
    # instead of expanding $, backticks or backslashes inside it.
    cat << 'EOF' > amir.py
import subprocess | |
import json | |
import os | |
import re | |
import ast | |
import logging | |
from typing import List, Dict, Any | |
import bandit | |
from bandit.core import manager as bandit_manager | |
# Configure the root logger once for the whole script: INFO and above,
# prefixed with a timestamp and the level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class AdvancedVulnerabilityScanner:
    """Heuristic vulnerability scanner for a single Python source file.

    Combines regular-expression checks over the raw source lines, a small
    AST-based taint check and the issues reported by Bandit. Findings are
    collected in ``self.vulnerabilities`` as dictionaries with the keys
    ``category``, ``description``, ``line_number``, ``severity`` and
    ``confidence``. The regex checks are plain pattern matches, so false
    positives are to be expected.
    """

    # Rank used to order findings in the report; unknown labels sort last.
    _SEVERITY_RANK = {'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}

    # Dotted call names whose return value is treated as tainted.
    _TAINT_SOURCES = frozenset({'input', 'request.form.get'})

    def __init__(self, file_path: str):
        """Create a scanner for ``file_path``; nothing is read until ``parse_file``."""
        self.file_path = file_path
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines: List[str] = []
        self.ast_tree: ast.AST = None
        self.vulnerability_db = self.load_vulnerability_db()

    def load_vulnerability_db(self):
        """Return the library -> {version: [CVE ids]} lookup table (mock data)."""
        # Mock vulnerability database
        return {
            'requests': {'2.25.0': ['CVE-2021-12345']},
            'django': {'2.2.0': ['CVE-2021-67890']}
        }

    def parse_file(self):
        """Read the file into ``code_lines`` and parse it into ``ast_tree``.

        Raises whatever ``open`` or ``ast.parse`` raise (for example
        ``OSError`` or ``SyntaxError``).
        """
        logging.info(f"Parsing file: {self.file_path}")
        with open(self.file_path, 'r', encoding='utf-8') as file:
            self.code_lines = file.readlines()
        self.ast_tree = ast.parse(''.join(self.code_lines))
        logging.info(f"File parsed. Total lines: {len(self.code_lines)}")

    def run_bandit(self):
        """Run Bandit on the file and return its issue list."""
        b_mgr = bandit_manager.BanditManager(bandit.config.BanditConfig(), agg_type='file')
        b_mgr.discover_files([self.file_path])
        b_mgr.run_tests()
        return b_mgr.get_issue_list()

    def add_vulnerability(self, category: str, description: str, line_number: int, severity: str, confidence: str):
        """Record one finding; ``line_number`` 0 means "no specific line"."""
        self.vulnerabilities.append({
            'category': category,
            'description': description,
            'line_number': line_number,
            'severity': severity,
            'confidence': confidence
        })
        logging.info(f"Vulnerability added: {category} at line {line_number}")

    def _flag_matching_lines(self, patterns, category, label, severity, confidence):
        """Report each source line that matches at least one of ``patterns``.

        A line is reported once even when several patterns match it, so
        overlapping patterns do not produce duplicate findings.
        """
        compiled = [re.compile(pattern) for pattern in patterns]
        for index, line in enumerate(self.code_lines):
            if any(regex.search(line) for regex in compiled):
                self.add_vulnerability(category, f"{label}: {line.strip()}", index + 1, severity, confidence)

    @staticmethod
    def _dotted_name(node):
        """Return ``a.b.c`` for a Name/Attribute chain, or None for anything else."""
        parts = []
        while isinstance(node, ast.Attribute):
            parts.append(node.attr)
            node = node.value
        if isinstance(node, ast.Name):
            parts.append(node.id)
            return '.'.join(reversed(parts))
        return None

    def check_hardcoded_secrets(self):
        """Flag assignments of string literals to password/secret/key/token names."""
        # NOTE(review): the description embeds the matched text, i.e. the
        # secret itself ends up in the report - consider redacting it.
        pattern = re.compile(r'(?i)(password|secret|key|token)\s*=\s*["\'][^"\']+["\']')
        for i, line in enumerate(self.code_lines):
            if match := pattern.search(line):
                self.add_vulnerability('Hardcoded Secret', f"Potential hardcoded secret: {match.group(0)}", i+1, 'HIGH', 'MEDIUM')

    def check_sql_injection(self):
        """Flag ``execute(...)`` calls that contain ``%s`` or an f-string placeholder."""
        sql_patterns = [
            r'(?i)(?:execute|cursor\.execute)\s*\(.*?%s.*?\)',
            r'(?i)(?:execute|cursor\.execute)\s*\(.*?f["\'].*?\{.*?\}.*?["\'].*?\)'
        ]
        self._flag_matching_lines(sql_patterns, 'SQL Injection', "Potential SQL injection", 'HIGH', 'HIGH')

    def check_xss_vulnerabilities(self):
        """Flag template-string rendering, ``response.write`` and ``print`` calls."""
        xss_patterns = [
            r'(?i)render_template_string\s*\(',
            r'(?i)jinja2\.Template\s*\(',
            r'(?i)flask\.render_template_string\s*\(',
            r'(?i)response\.write\(.+\)',
            r'(?i)print\(.+\)'
        ]
        self._flag_matching_lines(xss_patterns, 'Cross-Site Scripting (XSS)', "Potential XSS vulnerability", 'HIGH', 'MEDIUM')

    def check_vulnerable_components(self):
        """Flag import statements whose top-level package is in ``vulnerability_db``.

        Only lines that start with ``import``/``from`` are considered, so
        the words appearing in comments or strings are ignored, and every
        module of ``import a, b`` is checked. Versions are not compared.
        """
        import_re = re.compile(r'^\s*(?:from\s+([\w.]+)\s+import\b|import\s+(.+))')
        for i, line in enumerate(self.code_lines):
            match = import_re.match(line)
            if match is None:
                continue
            if match.group(1) is not None:
                modules = [match.group(1)]
            else:
                # "import a.b as c, d  # note" -> ["a.b", "d"]
                clause = match.group(2).split('#')[0]
                modules = [part.split()[0] for part in clause.split(',') if part.strip()]
            packages = {module.split('.')[0] for module in modules}
            for lib in sorted(packages):
                if lib in self.vulnerability_db:
                    self.add_vulnerability('Vulnerable Component', f"Potentially vulnerable library: {lib}", i+1, 'HIGH', 'MEDIUM')

    def perform_taint_analysis(self):
        """Flag every read of a variable assigned from a taint source call.

        The check is flow-insensitive: names are collected first, then all
        loads of those names are reported in source order.
        """
        logging.info("Performing taint analysis")
        nodes = list(ast.walk(self.ast_tree))

        tainted_vars = set()
        for node in nodes:
            if not (isinstance(node, ast.Assign) and isinstance(node.value, ast.Call)):
                continue
            # Resolve dotted callees so "request.form.get" can actually match.
            if self._dotted_name(node.value.func) not in self._TAINT_SOURCES:
                continue
            tainted_vars.update(
                target.id for target in node.targets if isinstance(target, ast.Name)
            )

        loads = [
            node for node in nodes
            if isinstance(node, ast.Name)
            and isinstance(node.ctx, ast.Load)
            and node.id in tainted_vars
        ]
        loads.sort(key=lambda n: (getattr(n, 'lineno', 0), getattr(n, 'col_offset', 0)))
        for node in loads:
            self.add_vulnerability('Tainted Variable Usage', f"Potentially tainted variable used: {node.id}", getattr(node, 'lineno', 0), 'MEDIUM', 'MEDIUM')

    def check_ssrf_vulnerabilities(self):
        """Flag outbound HTTP calls made via requests, urllib or http.client."""
        ssrf_patterns = [
            r'(?i)requests\.get\s*\(',
            r'(?i)urllib\.request\.urlopen\s*\(',
            r'(?i)http\.client\.HTTPConnection\s*\('
        ]
        self._flag_matching_lines(ssrf_patterns, 'SSRF', "Potential SSRF vulnerability", 'HIGH', 'MEDIUM')

    def check_logging_and_monitoring(self):
        """Add a file-level finding when no logging/print/stdout call is present."""
        logging_patterns = [
            r'(?i)logging\.',
            r'(?i)print\s*\(',
            r'(?i)sys\.stdout\.write\s*\('
        ]
        has_logging = any(re.search(pattern, line) for pattern in logging_patterns for line in self.code_lines)
        if not has_logging:
            self.add_vulnerability('Insufficient Logging', "No logging statements found in the file", 0, 'MEDIUM', 'HIGH')

    def check_idor(self):
        """Flag role lookups from request args and comparisons against 'admin'."""
        idor_patterns = [
            r'(?i)request\.args\.get\s*\([\'"].*role.*[\'"]\)',
            r'(?i)if\s+.*role\s*==\s*[\'"]admin[\'"]'
        ]
        self._flag_matching_lines(idor_patterns, 'Insecure Direct Object Reference', "Potential IDOR vulnerability", 'HIGH', 'MEDIUM')

    def check_sensitive_data_exposure(self):
        """Flag uses of ``os.environ``, ``send_file`` and ``open``."""
        sensitive_patterns = [
            r'(?i)os\.environ',
            r'(?i)send_file\s*\(',
            r'(?i)open\s*\('
        ]
        self._flag_matching_lines(sensitive_patterns, 'Sensitive Data Exposure', "Potential sensitive data exposure", 'HIGH', 'MEDIUM')

    def check_insecure_deserialization(self):
        """Flag ``pickle.loads``, ``yaml.load`` and ``json.loads`` calls."""
        deser_patterns = [
            r'(?i)pickle\.loads\s*\(',
            r'(?i)yaml\.load\s*\(',
            r'(?i)json\.loads\s*\('
        ]
        self._flag_matching_lines(deser_patterns, 'Insecure Deserialization', "Potential insecure deserialization", 'HIGH', 'HIGH')

    def analyze(self):
        """Parse the file, run every check and merge in Bandit's issues.

        Any exception is logged and re-raised.
        """
        try:
            self.parse_file()
            self.check_sql_injection()
            self.check_xss_vulnerabilities()
            self.check_hardcoded_secrets()
            self.check_vulnerable_components()
            self.perform_taint_analysis()
            self.check_ssrf_vulnerabilities()
            self.check_logging_and_monitoring()
            self.check_idor()
            self.check_sensitive_data_exposure()
            self.check_insecure_deserialization()
            bandit_issues = self.run_bandit()
            for issue in bandit_issues:
                self.add_vulnerability(f"Bandit: {issue.test_id}", issue.text, issue.lineno, issue.severity, issue.confidence)
            logging.info("Analysis completed successfully")
        except Exception as e:
            logging.error(f"An error occurred during analysis: {str(e)}")
            raise

    def generate_report(self):
        """Return the findings as plain text, most severe first.

        Findings with the same severity keep the order in which they were
        recorded. The source line is quoted only when the recorded line
        number actually exists in ``code_lines``.
        """
        parts = [
            f"Advanced Vulnerability Scan Results for {self.file_path}:\n",
            f"Total lines of code: {len(self.code_lines)}\n\n",
            "Detected Vulnerabilities:\n",
        ]
        if not self.vulnerabilities:
            parts.append("No vulnerabilities detected.\n")
            return ''.join(parts)

        # Sort by rank, not by the label text: alphabetically 'MEDIUM' would
        # come before 'HIGH'. sorted() is stable, so ties keep their order.
        ordered = sorted(
            self.vulnerabilities,
            key=lambda v: -self._SEVERITY_RANK.get(str(v['severity']).upper(), 0),
        )
        for vuln in ordered:
            parts.append(f"- {vuln['category']}: {vuln['description']}\n")
            parts.append(f" Severity: {vuln['severity']}, Confidence: {vuln['confidence']}\n")
            line_number = vuln['line_number']
            if line_number > 0:
                parts.append(f" Location: Line {line_number}\n")
                if line_number <= len(self.code_lines):
                    parts.append(f" Code: {self.code_lines[line_number-1].strip()}\n")
            parts.append("\n")
        return ''.join(parts)
def scan_file_or_directory(path):
    """Scan one Python file, or every ``.py`` file below a directory.

    Args:
        path: File or directory to scan.

    Returns:
        The report text. For a directory the per-file reports are
        concatenated in sorted traversal order; an error message is returned
        when ``path`` is neither a file nor a directory.

    Raises:
        Whatever ``AdvancedVulnerabilityScanner.analyze`` raises when a
        single file is scanned. In directory mode, files that cannot be
        read or parsed are listed in the report as skipped instead of
        aborting the whole scan.
    """
    if os.path.isfile(path):
        scanner = AdvancedVulnerabilityScanner(path)
        scanner.analyze()
        return scanner.generate_report()
    if not os.path.isdir(path):
        return f"Error: {path} is not a valid file or directory."

    # The scanner script itself may live inside the scanned tree; its own
    # regex patterns would otherwise be reported as findings.
    own_file = globals().get('__file__')
    own_path = os.path.realpath(own_file) if own_file else None

    sections = []
    for root, dirs, files in os.walk(path):
        dirs.sort()  # deterministic traversal order
        for file_name in sorted(files):
            if not file_name.endswith('.py'):
                continue
            file_path = os.path.join(root, file_name)
            if own_path is not None and os.path.realpath(file_path) == own_path:
                continue
            scanner = AdvancedVulnerabilityScanner(file_path)
            try:
                scanner.analyze()
            except (SyntaxError, ValueError, OSError) as exc:
                # One unreadable/unparsable file must not discard the
                # results for the rest of the tree; keep it visible instead.
                logging.warning(f"Skipping {file_path}: {exc}")
                sections.append(f"Skipped {file_path}: could not be analyzed ({exc})\n\n\n")
                continue
            sections.append(scanner.generate_report() + "\n\n")
    return "".join(sections)
def main():
    """Scan the current directory and write the report to security-scan-results.txt."""
    path = "."  # Scan the entire repository
    report = scan_file_or_directory(path)
    # Explicit encoding: the report quotes source lines, which may contain
    # non-ASCII characters regardless of the runner's locale.
    with open('security-scan-results.txt', 'w', encoding='utf-8') as f:
        f.write(report)


if __name__ == "__main__":
    main()
EOF | |
- name: Run security scan
  run: python amir.py
  # A scanner crash must not stop the job here: the following steps detect
  # the missing results file and report the failure on the PR.
  continue-on-error: true
- name: Check for scan results
  id: check_results
  run: |
    if [ -f security-scan-results.txt ]; then
      echo "file_exists=true" >> "$GITHUB_OUTPUT"
      # The "Detected Vulnerabilities:" heading is written into every report,
      # even a clean one, so it cannot signal a finding. Each actual finding
      # carries a "Severity:" line, so test for that instead.
      if grep -q "Severity: " security-scan-results.txt; then
        echo "vulnerabilities_found=true" >> "$GITHUB_OUTPUT"
      else
        echo "vulnerabilities_found=false" >> "$GITHUB_OUTPUT"
      fi
    else
      echo "file_exists=false" >> "$GITHUB_OUTPUT"
    fi
# upload-artifact v3 has been retired on github.com; v4 accepts the same
# name/path inputs used here.
- name: Upload scan results
  uses: actions/upload-artifact@v4
  with:
    name: security-scan-results
    path: security-scan-results.txt
- name: Comment PR
  uses: actions/github-script@v7
  if: always()
  with:
    github-token: ${{secrets.GITHUB_TOKEN}}
    script: |
      const fs = require('fs')
      // Keep the comment below GitHub's body size limit (presumably 65536
      // characters - confirm); the complete report is in the uploaded artifact.
      const MAX_REPORT_CHARS = 60000
      let comment = '## Security Scan Results\n\n'
      if ('${{ steps.check_results.outputs.file_exists }}' === 'true') {
        let scanResults = fs.readFileSync('security-scan-results.txt', 'utf8')
        if (scanResults.length > MAX_REPORT_CHARS) {
          scanResults = scanResults.slice(0, MAX_REPORT_CHARS) +
            '\n... (truncated; see the security-scan-results artifact for the full report)'
        }
        comment += '```\n' + scanResults + '\n```\n\n'
        if ('${{ steps.check_results.outputs.vulnerabilities_found }}' === 'true') {
          comment += '⛔ **Vulnerabilities detected. Please address these issues before merging.**'
        } else {
          comment += '✅ **No vulnerabilities detected.**'
        }
      } else {
        comment += '⚠️ **Error: The security scan failed to complete. Please review the workflow logs for more information.**'
      }
      // Await the request so an API error fails this step instead of being
      // lost as an unhandled promise rejection.
      await github.rest.issues.createComment({
        issue_number: context.issue.number,
        owner: context.repo.owner,
        repo: context.repo.repo,
        body: comment
      })
# Turn the outcome into the job status: fail when findings were reported or
# when the scanner produced no results file at all.
- name: Fail if vulnerabilities found or scan failed
  if: steps.check_results.outputs.vulnerabilities_found == 'true' || steps.check_results.outputs.file_exists == 'false'
  run: exit 1