diff --git a/.gitignore b/.gitignore index 7cd094d..b7dc81d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ topic.txt fetchtastic.log .aider* app/__pycache__/ -fetchtastic.egg-info/ \ No newline at end of file +fetchtastic.egg-info/ diff --git a/.trunk/.gitignore b/.trunk/.gitignore index 072b680..15966d0 100644 --- a/.trunk/.gitignore +++ b/.trunk/.gitignore @@ -6,4 +6,4 @@ plugins user_trunk.yaml user.yaml -tmp \ No newline at end of file +tmp diff --git a/.trunk/configs/.bandit b/.trunk/configs/.bandit new file mode 100644 index 0000000..8aba310 --- /dev/null +++ b/.trunk/configs/.bandit @@ -0,0 +1,2 @@ +[bandit] +skips: B404,B603,B607,B605,B311,B103 diff --git a/.trunk/trunk.yaml b/.trunk/trunk.yaml index 855fca0..ce99594 100644 --- a/.trunk/trunk.yaml +++ b/.trunk/trunk.yaml @@ -7,7 +7,7 @@ cli: plugins: sources: - id: trunk - ref: v1.6.4 + ref: v1.6.5 uri: https://github.com/trunk-io/plugins # Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes) runtimes: @@ -18,15 +18,21 @@ runtimes: lint: enabled: - actionlint@1.7.4 - - bandit@1.7.10 + - bandit@1.8.0 - black@24.10.0 - - checkov@3.2.291 + - checkov@3.2.327 - git-diff-check - isort@5.13.2 - - markdownlint@0.42.0 + - markdownlint@0.43.0 - osv-scanner@1.9.1 - - prettier@3.3.3 - - ruff@0.7.3 + - prettier@3.4.1 + - ruff@0.8.1 - taplo@0.9.3 - - trufflehog@3.83.6 + - trufflehog@3.84.2 - yamllint@1.35.1 +actions: + enabled: + - trunk-announce + - trunk-check-pre-push + - trunk-fmt-pre-commit + - trunk-upgrade-available diff --git a/app/downloader.py b/app/downloader.py index efb281b..6b6ab6f 100644 --- a/app/downloader.py +++ b/app/downloader.py @@ -2,10 +2,10 @@ import json import os +import re import time import zipfile from datetime import datetime -import re import requests from requests.adapters import HTTPAdapter @@ -32,7 +32,7 @@ def main(): extract_patterns = config.get("EXTRACT_PATTERNS", []) exclude_patterns = config.get("EXCLUDE_PATTERNS", []) wifi_only = config.get("WIFI_ONLY", False) if setup_config.is_termux() else False - notify_on_download_only = config.get("NOTIFY_ON_DOWNLOAD_ONLY", False) # Added this line + notify_on_download_only = config.get("NOTIFY_ON_DOWNLOAD_ONLY", False) selected_apk_patterns = config.get("SELECTED_APK_ASSETS", []) selected_firmware_patterns = config.get("SELECTED_FIRMWARE_ASSETS", []) @@ -78,7 +78,8 @@ def send_ntfy_notification(message, title=None): except requests.exceptions.RequestException as e: log_message(f"Error sending notification to {ntfy_url}: {e}") else: - log_message("Notifications are not configured.") + # Don't log when notifications are not configured + pass # Function to get the latest releases and sort by date def get_latest_releases(url, scan_count=10): @@ -110,7 +111,8 @@ def download_file(url, download_path): file.write(chunk) log_message(f"Downloaded {download_path}") else: - log_message(f"{download_path} already exists, skipping download.") + # Don't log when the file already exists + pass except requests.exceptions.RequestException as e: log_message(f"Error downloading {url}: {e}") @@ -154,18 +156,24 @@ def extract_files(zip_path, extract_dir, patterns, exclude_patterns): stripped_base_name = strip_version_numbers(base_name) for pattern in patterns: if pattern in stripped_base_name: - # Extract and flatten directory structure - source = zip_ref.open(file_info) + # Check if the file already exists target_path = os.path.join(extract_dir, base_name) - with open(target_path, "wb") as target_file: - target_file.write(source.read()) - log_message(f"Extracted {base_name} to {extract_dir}") + if not os.path.exists(target_path): + # Extract and flatten directory structure + source = zip_ref.open(file_info) + with open(target_path, "wb") as target_file: + target_file.write(source.read()) + log_message(f"Extracted {base_name} to {extract_dir}") + # If the file is a .sh script, check permissions + if base_name.endswith(".sh"): + # Check if the file has executable permissions + if not os.access(target_path, os.X_OK): + os.chmod(target_path, 0o755) + log_message( + f"Set executable permissions for {base_name}" + ) matched_files.append(base_name) break # Stop checking patterns for this file - if not matched_files: - log_message( - f"No files matched the extraction patterns in {zip_path}." - ) except zipfile.BadZipFile: log_message(f"Error: {zip_path} is a bad zip file and cannot be opened.") except Exception as e: @@ -180,7 +188,7 @@ def strip_version_numbers(filename): Uses the same regex as in menu_firmware.py to ensure consistency. """ # Regular expression matching version numbers and commit hashes - base_name = re.sub(r'([_-])\d+\.\d+\.\d+(?:\.[\da-f]+)?', r'\1', filename) + base_name = re.sub(r"([_-])\d+\.\d+\.\d+(?:\.[\da-f]+)?", r"\1", filename) return base_name # Cleanup function to keep only specific versions based on release tags @@ -214,6 +222,7 @@ def check_and_download( ): downloaded_versions = [] new_versions_available = [] + actions_taken = False # Track if any actions were taken if not os.path.exists(download_dir): os.makedirs(download_dir) @@ -239,10 +248,35 @@ def check_and_download( release_tag = release["tag_name"] release_dir = os.path.join(download_dir, release_tag) - if os.path.exists(release_dir) or release_tag == saved_release_tag: - log_message(f"Processing existing version {release_tag}.") + # Create release directory if it doesn't exist + if not os.path.exists(release_dir): + os.makedirs(release_dir, exist_ok=True) + + assets_to_download = [] + for asset in release["assets"]: + file_name = asset["name"] + # Strip version numbers from the file name + stripped_file_name = strip_version_numbers(file_name) + # Matching logic + if selected_patterns: + if not any( + pattern in stripped_file_name for pattern in selected_patterns + ): + continue # Skip this asset + download_path = os.path.join(release_dir, file_name) + if not os.path.exists(download_path): + assets_to_download.append( + (asset["browser_download_url"], download_path) + ) - # Check if extraction is needed + if assets_to_download: + actions_taken = True + log_message(f"Downloading missing assets for version {release_tag}.") + for url, path in assets_to_download: + download_file(url, path) + downloaded_versions.append(release_tag) + + # Extraction logic if auto_extract and release_type == "Firmware": for asset in release["assets"]: file_name = asset["name"] @@ -250,7 +284,10 @@ def check_and_download( zip_path = os.path.join(release_dir, file_name) if os.path.exists(zip_path): extraction_needed = check_extraction_needed( - zip_path, release_dir, extract_patterns + zip_path, + release_dir, + extract_patterns, + exclude_patterns, ) if extraction_needed: log_message(f"Extracting files from {zip_path}...") @@ -260,39 +297,12 @@ def check_and_download( extract_patterns, exclude_patterns, ) - else: - # Files are already extracted - pass - continue # Skip to the next release else: - # Proceed to download this version - os.makedirs(release_dir, exist_ok=True) - log_message(f"Downloading new {release_type} version: {release_tag}") - for asset in release["assets"]: - file_name = asset["name"] - # Strip version numbers from the file name - stripped_file_name = strip_version_numbers(file_name) - # Matching logic - if selected_patterns: - if not any( - pattern in stripped_file_name - for pattern in selected_patterns - ): - continue # Skip this asset - download_path = os.path.join(release_dir, file_name) - download_file(asset["browser_download_url"], download_path) - if ( - auto_extract - and file_name.endswith(".zip") - and release_type == "Firmware" - ): - extract_files( - download_path, - release_dir, - extract_patterns, - exclude_patterns, - ) - downloaded_versions.append(release_tag) + # No action needed for this release + pass + + # Set permissions on .sh files if needed + set_permissions_on_sh_files(release_dir) # Only update latest_release_file if downloads occurred if downloaded_versions: @@ -305,6 +315,9 @@ def check_and_download( # Clean up old versions cleanup_old_versions(download_dir, release_tags_to_keep) + if not actions_taken: + log_message(f"All {release_type} assets are up to date.") + # Collect new versions available for release in releases_to_download: release_tag = release["tag_name"] @@ -316,11 +329,24 @@ def check_and_download( return downloaded_versions, new_versions_available - def check_extraction_needed(zip_path, extract_dir, patterns): + def set_permissions_on_sh_files(directory): + """ + Sets executable permissions on .sh files if they do not already have them. + """ + for root, _dirs, files in os.walk(directory): + for file in files: + if file.endswith(".sh"): + file_path = os.path.join(root, file) + if not os.access(file_path, os.X_OK): + os.chmod(file_path, 0o755) + log_message(f"Set executable permissions for {file}") + + def check_extraction_needed(zip_path, extract_dir, patterns, exclude_patterns): """ Checks if extraction is needed based on the current extraction patterns. Returns True if any files matching the patterns are not already extracted. """ + files_to_extract = [] with zipfile.ZipFile(zip_path, "r") as zip_ref: for file_info in zip_ref.infolist(): file_name = file_info.filename @@ -328,13 +354,20 @@ def check_extraction_needed(zip_path, extract_dir, patterns): # Skip directories if not base_name: continue + # Check if file matches exclude patterns + if any(exclude in base_name for exclude in exclude_patterns): + continue # Strip version numbers from the file name stripped_base_name = strip_version_numbers(base_name) for pattern in patterns: if pattern in stripped_base_name: - extracted_file_path = os.path.join(extract_dir, base_name) - if not os.path.exists(extracted_file_path): - return True # Extraction needed + files_to_extract.append(base_name) + break # Stop checking patterns for this file + # Now check if any of the files to extract are missing + for base_name in files_to_extract: + extracted_file_path = os.path.join(extract_dir, base_name) + if not os.path.exists(extracted_file_path): + return True # Extraction needed return False # All files already extracted start_time = time.time() @@ -365,7 +398,6 @@ def check_extraction_needed(zip_path, extract_dir, patterns): latest_android_releases = [] if save_firmware and selected_firmware_patterns: - versions_to_download = firmware_versions_to_keep latest_firmware_releases = get_latest_releases( firmware_releases_url, releases_to_scan ) @@ -380,14 +412,12 @@ def check_extraction_needed(zip_path, extract_dir, patterns): ) downloaded_firmwares.extend(fw_downloaded) new_firmware_versions.extend(fw_new_versions) - log_message( - f"Latest Firmware releases: {', '.join(release['tag_name'] for release in latest_firmware_releases[:versions_to_download])}" - ) + if fw_downloaded: + log_message(f"Downloaded Firmware versions: {', '.join(fw_downloaded)}") elif not selected_firmware_patterns: log_message("No firmware assets selected. Skipping firmware download.") if save_apks and selected_apk_patterns: - versions_to_download = android_versions_to_keep latest_android_releases = get_latest_releases( android_releases_url, releases_to_scan ) @@ -402,9 +432,8 @@ def check_extraction_needed(zip_path, extract_dir, patterns): ) downloaded_apks.extend(apk_downloaded) new_apk_versions.extend(apk_new_versions) - log_message( - f"Latest Android APK releases: {', '.join(release['tag_name'] for release in latest_android_releases[:versions_to_download])}" - ) + if apk_downloaded: + log_message(f"Downloaded Android APK versions: {', '.join(apk_downloaded)}") elif not selected_apk_patterns: log_message("No APK assets selected. Skipping APK download.") @@ -448,12 +477,9 @@ def check_extraction_needed(zip_path, extract_dir, patterns): ) else: # No new downloads; everything is up to date - message = ( - f"No new downloads. All Firmware and Android APK versions are up to date.\n" - f"{datetime.now()}" - ) + message = f"All assets are up to date.\n" f"{datetime.now()}" log_message(message) - if not notify_on_download_only: # Added this condition + if not notify_on_download_only: send_ntfy_notification(message, title="Fetchtastic Up to Date") diff --git a/app/menu_firmware.py b/app/menu_firmware.py index 6a72721..e6029bd 100644 --- a/app/menu_firmware.py +++ b/app/menu_firmware.py @@ -1,6 +1,7 @@ # app/menu_firmware.py import re + import requests from pick import pick @@ -32,7 +33,7 @@ def extract_base_name(filename): """ # Regular expression to match version numbers and commit hashes # Matches patterns like '-2.5.13.1a06f88' or '_2.5.13.1a06f88' - base_name = re.sub(r'([_-])\d+\.\d+\.\d+(?:\.[\da-f]+)?', r'\1', filename) + base_name = re.sub(r"([_-])\d+\.\d+\.\d+(?:\.[\da-f]+)?", r"\1", filename) return base_name diff --git a/app/setup_config.py b/app/setup_config.py index 7519955..fab2fc6 100644 --- a/app/setup_config.py +++ b/app/setup_config.py @@ -205,7 +205,7 @@ def run_setup(): print( "Enter the keywords to match for extraction from the firmware zip files, separated by spaces." ) - print("Example: rak4631- tbeam-2 t1000-e- tlora-v2-1-1_6- device-") + print("Example: rak4631- tbeam t1000-e- tlora-v2-1-1_6- device-") if config.get("EXTRACT_PATTERNS"): current_patterns = " ".join(config.get("EXTRACT_PATTERNS", [])) print(f"Current patterns: {current_patterns}") diff --git a/setup.cfg b/setup.cfg index 4aef044..3aae056 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = fetchtastic -version = 0.2.3 +version = 0.2.4 author = Jeremiah K author_email = jeremiahk@gmx.com description = Meshtastic Firmware and APK Downloader