From 0fadbc5181de511e04023388154b90a1ab40386b Mon Sep 17 00:00:00 2001 From: Chih-Yu Chan Date: Tue, 26 Nov 2024 11:52:57 +0800 Subject: [PATCH] feat(satellite): add Sentinel-5P L2 products integration BREAKING CHANGE: add new data fetcher for Sentinel-5P products feat: - Add all level 2 products support (O3, CH4, CO, NO2, HCHO, SO2, Cloud, Aerosol) - Add processing mode options (NRTI, OFFL, RPRO) - Implement ESA data hub API connection improvements: - Add robust error handling - Add logging system - Add progress tracking for downloads --- app_run.py | 206 ++++++++++++++++++ main.py | 106 +++++++++ src/api/auth.py | 36 +-- src/api/downloader.py | 61 ++++-- src/api/sentinel_api.py | 123 ++++------- src/config/__init__.py | 100 +++++++++ src/config/catalog.py | 149 +++++++++++++ src/config/richer.py | 161 ++++++++++++++ src/config/settings.py | 45 +--- src/main.py | 105 --------- src/processing/data_processor.py | 327 +++++++++++++++------------- src/processing/interpolators.py | 130 +++++++---- src/processing/taiwan_frame.py | 2 +- src/utils/logger.py | 26 --- src/visualization/gif_nc.py | 61 ++++++ src/visualization/plot_nc.py | 312 +++++++++++++++++++------- src/visualization/read_shp.py | 23 -- src/visualization/sample_plot_nc.py | 122 ----------- 18 files changed, 1386 insertions(+), 709 deletions(-) create mode 100644 app_run.py create mode 100644 main.py create mode 100644 src/config/__init__.py create mode 100644 src/config/catalog.py create mode 100644 src/config/richer.py delete mode 100644 src/main.py delete mode 100644 src/utils/logger.py create mode 100644 src/visualization/gif_nc.py delete mode 100644 src/visualization/read_shp.py delete mode 100644 src/visualization/sample_plot_nc.py diff --git a/app_run.py b/app_run.py new file mode 100644 index 0000000..0cea8a1 --- /dev/null +++ b/app_run.py @@ -0,0 +1,206 @@ +import logging +import tkinter as tk +from tkinter import messagebox, ttk +from tkcalendar import DateEntry +import threading + +# 導入你的處理模組 +from src.api.sentinel_api import S5PFetcher +from src.config import setup_directory_structure + + +class SatelliteApp: + def __init__(self): + self.root = tk.Tk() + self.root.title("衛星數據處理器") + self.root.geometry("800x900") + + # 設置日誌 + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + + self.create_gui() + + def create_gui(self): + # 主框架 + main_frame = ttk.Frame(self.root, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # ===== 認證區域 ===== + auth_frame = ttk.LabelFrame(main_frame, text="認證", padding="10") + auth_frame.pack(fill=tk.X, pady=(0, 10)) + + ttk.Label(auth_frame, text="帳號:").grid(row=0, column=0, padx=5, pady=5) + self.username = ttk.Entry(auth_frame) + self.username.grid(row=0, column=1, padx=5, pady=5) + + ttk.Label(auth_frame, text="密碼:").grid(row=1, column=0, padx=5, pady=5) + self.password = ttk.Entry(auth_frame, show="*") + self.password.grid(row=1, column=1, padx=5, pady=5) + + # ===== 日期選擇區域 ===== + date_frame = ttk.LabelFrame(main_frame, text="日期範圍", padding="10") + date_frame.pack(fill=tk.X, pady=(0, 10)) + + ttk.Label(date_frame, text="開始日期:").grid(row=0, column=0, padx=5, pady=5) + self.start_date = DateEntry(date_frame, width=12) + self.start_date.grid(row=0, column=1, padx=5, pady=5) + + ttk.Label(date_frame, text="結束日期:").grid(row=1, column=0, padx=5, pady=5) + self.end_date = DateEntry(date_frame, width=12) + self.end_date.grid(row=1, column=1, padx=5, pady=5) + + # ===== 數據模式選擇 ===== + mode_frame = ttk.LabelFrame(main_frame, text="數據模式", padding="10") + 
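+        # The radio values below ('realtime', 'offline', 'all') travel through
+        # data_mode into the fetcher. Per the commit message they are assumed
+        # to select the NRTI (near-real-time) and OFFL (offline) processing
+        # streams; the actual mapping is expected to happen downstream, not in
+        # this file.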
mode_frame.pack(fill=tk.X, pady=(0, 10)) + + self.data_mode = tk.StringVar(value="all") + ttk.Radiobutton(mode_frame, text="即時數據", value="realtime", variable=self.data_mode).pack(anchor=tk.W) + ttk.Radiobutton(mode_frame, text="離線數據", value="offline", variable=self.data_mode).pack(anchor=tk.W) + ttk.Radiobutton(mode_frame, text="全部數據", value="all", variable=self.data_mode).pack(anchor=tk.W) + + # ===== 數據類型選擇 ===== + type_frame = ttk.LabelFrame(main_frame, text="數據類型", padding="10") + type_frame.pack(fill=tk.X, pady=(0, 10)) + + self.data_types = { + 'aerosol': 'Aerosol Index', + 'co': 'Carbon Monoxide (CO)', + 'cloud': 'Cloud', + 'hcho': 'Formaldehyde (HCHO)', + 'ch4': 'Methane (CH4)', + 'no2': 'Nitrogen Dioxide (NO2)', + 'o3': 'Ozone (O3)', + 'so2': 'Sulfur Dioxide (SO2)' + } + + self.selected_types = {} + for key, value in self.data_types.items(): + var = tk.BooleanVar() + self.selected_types[key] = var + ttk.Checkbutton(type_frame, text=value, variable=var).pack(anchor=tk.W) + + # ===== 執行按鈕 ===== + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + self.status_label = ttk.Label(btn_frame, text="就緒") + self.status_label.pack(side=tk.LEFT) + + self.run_button = ttk.Button(btn_frame, text="開始處理", command=self.start_processing) + self.run_button.pack(side=tk.RIGHT) + + # ===== 日誌區域 ===== + log_frame = ttk.LabelFrame(main_frame, text="處理日誌", padding="10") + log_frame.pack(fill=tk.BOTH, expand=True) + + self.log_text = tk.Text(log_frame, height=15, width=70) + self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + + scrollbar = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + self.log_text.configure(yscrollcommand=scrollbar.set) + + def log_message(self, message): + """添加日誌消息""" + self.log_text.insert(tk.END, f"{message}\n") + self.log_text.see(tk.END) + self.root.update() + + def start_processing(self): + """開始處理數據""" + # 驗證輸入 + if not self.username.get() or not self.password.get(): + messagebox.showerror("錯誤", "請輸入帳號和密碼") + return + + selected_data = [key for key, var in self.selected_types.items() if var.get()] + if not selected_data: + messagebox.showerror("錯誤", "請至少選擇一種數據類型") + return + + # 禁用按鈕 + self.run_button.configure(state='disabled') + self.status_label.configure(text="處理中...") + + # 在新線程中執行處理 + thread = threading.Thread(target=self.process_data, args=(selected_data,)) + thread.daemon = True + thread.start() + + def process_data(self, selected_data): + """處理數據的主要邏輯""" + try: + start_str = self.start_date.get_date().strftime('%Y-%m-%d') + end_str = self.end_date.get_date().strftime('%Y-%m-%d') + data_mode = self.data_mode.get() + + self.log_message(f"開始處理數據:{start_str} 到 {end_str}") + setup_directory_structure(start_str, end_str) + + fetcher = S5PFetcher(max_workers=3) + + for data_type in selected_data: + self.log_message(f"處理 {self.data_types[data_type]}...") + + try: + fetch_method = getattr(fetcher, f'fetch_{data_type}_data') + products = fetch_method( + start_date=start_str, + end_date=end_str, + boundary=(118, 124, 20, 27), + limit=None, + data_mode=data_mode + ) + + if products: + self.log_message(f"找到 {len(products)} 個數據文件") + self.log_message("開始下載數據...") + fetcher.parallel_download(products) + self.log_message("數據下載完成") + + self.log_message("開始處理數據...") + processor_class = globals()[f"{data_type.upper()}Processor"] + processor = processor_class( + interpolation_method='kdtree', + resolution=0.02, + mask_value=0.5 + ) + processor.process_each_data( + start_str, + end_str, 
+                            use_taiwan_mask=False,
+                            file_class=data_mode
+                        )
+                        self.log_message("數據處理完成")
+                    else:
+                        self.log_message(f"找不到符合條件的 {self.data_types[data_type]} 數據")
+
+                except Exception as e:
+                    self.log_message(f"處理 {data_type} 時發生錯誤: {str(e)}")
+                    continue
+
+            self.root.after(0, lambda: messagebox.showinfo("完成", "所有數據處理完成!"))
+
+        except Exception as e:
+            self.log_message(f"錯誤: {str(e)}")
+            # Bind the message now: "e" is cleared once the except block exits,
+            # so a late-bound lambda would raise NameError inside after().
+            self.root.after(0, lambda msg=str(e): messagebox.showerror("錯誤", msg))
+
+        finally:
+            self.root.after(0, lambda: self.status_label.configure(text="就緒"))
+            self.root.after(0, lambda: self.run_button.configure(state='normal'))
+
+    def run(self):
+        """Run the application."""
+        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
+        self.root.mainloop()
+
+    def on_closing(self):
+        """Ask for confirmation, then close the application."""
+        if messagebox.askokcancel("確認", "確定要關閉程式嗎?"):
+            self.root.destroy()
+
+
+if __name__ == "__main__":
+    app = SatelliteApp()
+    app.run()
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..47d6e1d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,106 @@
+"""Main program."""
+import logging
+from datetime import datetime
+
+from src.api.sentinel_api import S5PFetcher
+from src.processing.data_processor import S5Processor
+
+from src.config.richer import rich_print
+from src.config.catalog import ClassInput, TypeInput, PRODUCT_CONFIGS
+from src.config import setup_directory_structure, FILTER_BOUNDARY
+
+
+logger = logging.getLogger(__name__)
+
+
+def fetch_data(file_class: ClassInput,
+               file_type: TypeInput,
+               start_date: str | datetime,
+               end_date: str | datetime):
+    """Workflow for querying and downloading data."""
+    try:
+        rich_print(
+            f"正在獲取 sentinel-5p 衛星數據 ({PRODUCT_CONFIGS[file_type].display_name}) from {start_date} to {end_date} ...")
+
+        fetcher = S5PFetcher(max_workers=3)
+
+        products = fetcher.fetch_data(
+            file_class=file_class,
+            file_type=file_type,
+            start_date=start_date,
+            end_date=end_date,
+            boundary=FILTER_BOUNDARY,
+            limit=None
+        )
+
+        if products:
+            if rich_print("是否要下載數據?", confirm=True):
+                rich_print(
+                    f"開始下載 sentinel-5p 衛星數據 ({PRODUCT_CONFIGS[file_type].display_name}) from {start_date} to {end_date} ...")
+                fetcher.parallel_download(products)
+                rich_print("數據下載完成!")
+            else:
+                rich_print("已取消下載操作")
+        else:
+            rich_print("找不到符合條件的數據")
+
+    except Exception as e:
+        error_message = f"下載數據失敗: {str(e)}"
+        rich_print(error_message)
+        logger.error(error_message)
+
+
+def process_data(file_class: ClassInput,
+                 file_type: TypeInput,
+                 start_date: str | datetime,
+                 end_date: str | datetime):
+    """Workflow for processing downloaded data."""
+    try:
+        if rich_print("是否要處理數據?", confirm=True):
+            rich_print(
+                f"正在處理 sentinel-5p 衛星數據 ({PRODUCT_CONFIGS[file_type].display_name}) from {start_date} to {end_date} ...")
+
+            processor = S5Processor(
+                interpolation_method='kdtree',
+                resolution=0.02,
+                mask_qc_value=0.5
+            )
+
+            processor.process_each_data(
+                file_class=file_class,
+                file_type=file_type,
+                start_date=start_date,
+                end_date=end_date,
+            )
+
+            rich_print("數據完成處理")
+        else:
+            rich_print("已取消處理操作")
+
+    except Exception as e:
+        error_message = f"處理數據失敗: {str(e)}"
+        rich_print(error_message)
+        logger.error(error_message)
+
+
+def main():
+    # Parameters
+    start, end = '2024-03-01', '2024-03-02'
+    file_class: ClassInput = 'OFFL'
+    file_type: TypeInput = 'NO2___'
+
+    # Set up the input/output directory layout
+    setup_directory_structure(file_type=file_type, start_date=start, end_date=end)
+
+    # Download data
+    # fetch_data(file_class=file_class, file_type=file_type, start_date=start, end_date=end)
+
+    # Process and plot data
+    process_data(file_class=file_class, file_type=file_type, start_date=start, end_date=end)
+
+    # Animation
+    #
animate_data(file_type=file_type, start_date=start, end_date=end) + + +if __name__ == "__main__": + main() diff --git a/src/api/auth.py b/src/api/auth.py index bbc63bc..ee10aad 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -23,24 +23,28 @@ def __init__(self): def get_token(self): """獲取新的 access token""" - data = { - 'grant_type': 'password', - 'username': self.username, - 'password': self.password, - 'client_id': 'cdse-public' - } - - response = requests.post(COPERNICUS_TOKEN_URL, data=data, timeout=30) - response.raise_for_status() - - token_data = response.json() - self.token = token_data['access_token'] - self.token_expiry = datetime.now() + timedelta(seconds=token_data['expires_in'] - 60) - # logger.info("Access token updated") - return self.token + try: + data = { + 'grant_type': 'password', + 'username': self.username, + 'password': self.password, + 'client_id': 'cdse-public' + } + + response = requests.post(COPERNICUS_TOKEN_URL, data=data, timeout=30) + response.raise_for_status() + + token_data = response.json() + self.token = token_data['access_token'] + self.token_expiry = datetime.now() + timedelta(seconds=token_data['expires_in'] - 60) + # logger.info("Access token updated") + return self.token + except requests.exceptions.RequestException as e: + logger.error(f"Error getting token: {e}") + raise def ensure_valid_token(self): """確保 token 有效""" if not self.token or not self.token_expiry or datetime.now() >= self.token_expiry: return self.get_token() - return self.token \ No newline at end of file + return self.token diff --git a/src/api/downloader.py b/src/api/downloader.py index 6cb827f..ce95409 100644 --- a/src/api/downloader.py +++ b/src/api/downloader.py @@ -2,6 +2,8 @@ import os import requests import logging +import zipfile + from pathlib import Path from tqdm import tqdm @@ -19,8 +21,8 @@ def __init__(self): self.session.trust_env = False self._retries = requests.adapters.Retry( total=5, - backoff_factor=0.1, - status_forcelist=[500, 502, 503, 504], + backoff_factor=10, + status_forcelist=[429, 500, 502, 503, 504], ) self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=self._retries)) @@ -45,31 +47,50 @@ def download_file(self, url, headers, output_path, progress_callback=None): # 建立臨時檔案 temp_path = output_path.with_suffix('.tmp') + zip_path = output_path.with_suffix('.zip') try: - with open(temp_path, 'wb') as file: - for data in response.iter_content(block_size): - file.write(data) - downloaded += len(data) - if progress_callback: - progress_callback(min(downloaded, total_size)) - - # 下載完成後,將臨時檔案重命名為正式檔案 - if temp_path.exists(): - if output_path.exists(): - output_path.unlink() - temp_path.rename(output_path) + if output_path.exists() and zipfile.is_zipfile(output_path): + output_path.rename(zip_path) + + elif zip_path.exists() and not zipfile.is_zipfile(zip_path): + zip_path.rename(output_path) + return True + + elif zip_path.exists() and zipfile.is_zipfile(zip_path): + pass + + else: + # 下載到臨時檔案 + with open(temp_path, 'wb') as file: + for data in response.iter_content(block_size): + file.write(data) + downloaded += len(data) + if progress_callback: + progress_callback(min(downloaded, total_size)) + + # 移動臨時檔案到 zip + temp_path.rename(zip_path) + + # 解壓縮處理 + if zipfile.is_zipfile(zip_path): + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + nc_files = [f for f in zip_ref.namelist() if f.endswith('.nc')] + if nc_files: + with zip_ref.open(nc_files[0]) as source, open(output_path, 'wb') as target: + target.write(source.read()) + + elif not 
zipfile.is_zipfile(zip_path): + zip_path.rename(output_path) + return True finally: - # 清理臨時檔案 if temp_path.exists(): temp_path.unlink() - - except requests.exceptions.RequestException as e: - logger.error(f"Download error: {str(e)}") - return False + if zip_path.exists(): + zip_path.unlink() except Exception as e: - logger.error(f"Unexpected error during download: {str(e)}") + logger.error(f"Download error: {str(e)}") return False diff --git a/src/api/sentinel_api.py b/src/api/sentinel_api.py index 2920d07..bb22e23 100644 --- a/src/api/sentinel_api.py +++ b/src/api/sentinel_api.py @@ -2,11 +2,11 @@ import logging import time import requests +import zipfile import threading import multiprocessing from datetime import datetime from pathlib import Path -from typing import List, Dict, Optional, Tuple from rich.progress import ( Progress, @@ -16,10 +16,6 @@ BarColumn, TimeRemainingColumn, ) -from rich.console import Console -from rich.panel import Panel -from rich.table import Table -from rich.align import Align from src.api.auth import CopernicusAuth from src.api.downloader import Downloader @@ -29,8 +25,10 @@ RAW_DATA_DIR, DEFAULT_TIMEOUT ) +from src.config.richer import console, rich_print, DisplayManager +from src.config.catalog import ClassInput, TypeInput, PRODUCT_CONFIGS + -console = Console(force_terminal=True, color_system="auto", width=130) # 使用您想要的寬度 logger = logging.getLogger(__name__) @@ -50,7 +48,7 @@ def render(self, task): return f"{completed:5.1f} / {total:5.1f} MB" -class Sentinel5PDataFetcher: +class S5PFetcher: def __init__(self, max_workers: int = 5): self.auth = CopernicusAuth() self.downloader = Downloader() @@ -62,24 +60,31 @@ def __init__(self, max_workers: int = 5): 'failed': 0, 'skipped': 0, 'total_size': 0, + 'actual_download_size': 0, } - def fetch_no2_data(self, start_date: str, end_date: str, - bbox: Optional[Tuple[float, float, float, float]] = None, - limit: Optional[int] = None) -> List[Dict]: + def fetch_data(self, + file_class: ClassInput, + file_type: TypeInput, + start_date: str, + end_date: str, + boundary: tuple[float, float, float, float] | None = None, + limit: int | None = None, + ) -> list[dict]: """ - 擷取 NO2 數據 + 擷取數據 Args: + file_class (ProductClassInput): 資料類型 + file_type (ProductTypeInput): 資料種類 start_date: 開始日期 (YYYY-MM-DD) end_date: 結束日期 (YYYY-MM-DD) - bbox: 邊界框座標 (min_lon, min_lat, max_lon, max_lat) + boundary: 邊界框座標 (min_lon, min_lat, max_lon, max_lat) limit: 最大結果數量 Returns: - List[Dict]: 產品資訊列表 + list[dict]: 產品資訊列表 """ - # logger.info(f"Fetching NO2 data from {start_date} to {end_date}") try: # 取得認證 token @@ -91,17 +96,22 @@ def fetch_no2_data(self, start_date: str, end_date: str, 'Content-Type': 'application/json' } + file_class = '' if file_class == '*' else file_class + file_type = '' if file_type == '*' else file_type + self.file_type = file_type + # 構建基本篩選條件 base_filter = ( f"Collection/Name eq 'SENTINEL-5P' " - f"and contains(Name,'NO2') " - f"and ContentDate/Start gt {start_date}T00:00:00.000Z " - f"and ContentDate/Start lt {end_date}T23:59:59.999Z" + f"and contains(Name,'{file_class}') " + f"and contains(Name,'{file_type}') " + f"and ContentDate/Start gt '{start_date}T00:00:00.000Z' " + f"and ContentDate/Start lt '{end_date}T23:59:59.999Z' " ) # 如果提供了邊界框,加入空間篩選 - if bbox: - min_lon, min_lat, max_lon, max_lat = bbox + if boundary: + min_lon, max_lon, min_lat, max_lat = boundary spatial_filter = ( f" and OData.CSC.Intersects(area=geography'SRID=4326;POLYGON((" f"{min_lon} {min_lat}, " @@ -141,13 +151,12 @@ def fetch_no2_data(self, 
start_date: str, end_date: str, while True: try: response = requests.get( - f"{self.base_url}/Products", + url=f"{self.base_url}/Products", headers=headers, params=query_params, timeout=DEFAULT_TIMEOUT ) response.raise_for_status() - products = response.json().get('value', []) if not products: break @@ -173,20 +182,7 @@ def fetch_no2_data(self, start_date: str, end_date: str, # 顯示產品詳細資訊 if all_products: - table = Table(title=f"Found {len(all_products)} Products") - table.add_column("No.", justify="right", style="cyan") - table.add_column("Time", style="magenta") - table.add_column("Name", style="blue") - table.add_column("Size", justify="right", style="green") - - for i, product in enumerate(all_products, 1): - time_str = product.get('ContentDate', {}).get('Start', 'N/A')[:19] - name = product.get('Name', 'N/A') - size = product.get('ContentLength', 0) - size_str = f"{size / 1024 / 1024:.2f} MB" - table.add_row(str(i), time_str, name, size_str) - - console.print(table) + DisplayManager().display_products(all_products) return all_products @@ -194,7 +190,6 @@ def fetch_no2_data(self, start_date: str, end_date: str, logger.error(f"Error in fetch_no2_data: {str(e)}") raise - # TODO: main_task count wrong def parallel_download(self, products: list): """並行下載多個產品""" if not products: @@ -208,7 +203,6 @@ def parallel_download(self, products: list): task_queue.put(product) # 創建進度追蹤器 - import multiprocessing completed_files = multiprocessing.Value('i', 0) active_threads = multiprocessing.Value('i', 0) @@ -218,6 +212,7 @@ def parallel_download(self, products: list): 'failed': 0, 'skipped': 0, 'total_size': sum(p.get('ContentLength', 0) for p in products), + 'actual_download_size': 0, 'start_time': time.time() }) @@ -244,7 +239,7 @@ def download_files(progress, task_id, thread_index, completed_files, task_queue) with progress_lock: progress.update( task_id, - description=f"[cyan]Thread {thread_index + 1}: {file_name}", + description=f"[cyan]Thread {thread_index + 1}: {file_name[:28]}...{file_name[-9:]}", total=file_size, completed=0, visible=True, @@ -258,13 +253,15 @@ def download_files(progress, task_id, thread_index, completed_files, task_queue) token = self.auth.ensure_valid_token() headers = {'Authorization': f'Bearer {token}'} + product_type = self.file_type start_time = product.get('ContentDate', {}).get('Start') date_obj = datetime.strptime(start_time, '%Y-%m-%dT%H:%M:%S.%fZ') - output_dir = Path(RAW_DATA_DIR) / date_obj.strftime('%Y') / date_obj.strftime('%m') + output_dir = Path(RAW_DATA_DIR) / product_type / date_obj.strftime('%Y') / date_obj.strftime('%m') + output_path = output_dir / file_name # 檢查檔案是否已存在 - if output_path.exists() and output_path.stat().st_size == file_size: + if output_path.exists() and not zipfile.is_zipfile(output_path): with progress_lock: progress.update(task_id, completed=file_size) with stats_lock: @@ -313,12 +310,15 @@ def update_progress(downloaded_bytes): with stats_lock: if download_success: self.download_stats['success'] += 1 + else: self.download_stats['failed'] += 1 if output_path.exists(): output_path.unlink() success = True + with stats_lock: + self.download_stats['actual_download_size'] += file_size with completed_files.get_lock(): completed_files.value += 1 @@ -376,6 +376,7 @@ def update_progress(downloaded_bytes): thread.daemon = True threads.append(thread) thread.start() + time.sleep(1) # 監控進度 while True: @@ -397,44 +398,4 @@ def update_progress(downloaded_bytes): # progress.update(main_task, completed=len(products), refresh=True) # 顯示下載統計 - 
self._display_download_summary() - - def _display_download_summary(self): - """顯示下載統計摘要""" - elapsed_time = time.time() - self.download_stats['start_time'] - total_files = ( - self.download_stats['success'] + - self.download_stats['failed'] + - self.download_stats['skipped'] - ) - - table = Table(title="Download Summary", width=60, padding=(0, 2), expand=False) - table.add_column("Metric", style="cyan") - table.add_column("Value", justify="right", style="green") - - table.add_row("Total Files", str(total_files)) - table.add_row("Successfully Downloaded", str(self.download_stats['success'])) - table.add_row("Failed Downloads", str(self.download_stats['failed'])) - table.add_row("Skipped Files", str(self.download_stats['skipped'])) - - total_size = self.download_stats['total_size'] - size_str = f"{total_size / 1024 / 1024:.2f} MB" - table.add_row("Total Size", size_str) - table.add_row("Total Time", f"{elapsed_time:.2f}s") - - if elapsed_time > 0: - avg_speed = total_size / elapsed_time - speed_str = f"{avg_speed / 1024 / 1024:.2f} MB/s" - table.add_row("Average Speed", speed_str) - - # 使用 Align 將 table 置中 - centered_table = Align.center(table) - - console.print("\n", Panel( - centered_table, - title="Download Results", - width=130, - expand=True, - border_style="bright_blue", - padding=(1, 0) - )) + DisplayManager().display_download_summary(self.download_stats) diff --git a/src/config/__init__.py b/src/config/__init__.py new file mode 100644 index 0000000..a4b0241 --- /dev/null +++ b/src/config/__init__.py @@ -0,0 +1,100 @@ +import logging +from pathlib import Path +from datetime import datetime + +from src.config.settings import RAW_DATA_DIR, PROCESSED_DATA_DIR, FIGURE_DIR, LOGS_DIR, FILTER_BOUNDARY +from src.config.catalog import TypeInput + + +__all__ = ['setup_directory_structure', 'FILTER_BOUNDARY'] + + +def setup_logging(): + """設置日誌配置""" + # 確保日誌目錄存在 + log_dir = Path(LOGS_DIR) + log_dir.mkdir(parents=True, exist_ok=True) + + # 創建日誌檔案路徑 + log_file = log_dir / f"Satellite_S5P_{datetime.now().strftime('%Y%m')}.log" + + # 配置基本設定 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name).10s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[ + logging.FileHandler(log_file, encoding='utf-8'), + logging.StreamHandler() # 同時輸出到控制台 + ] + ) + + +def setup_directory_structure(file_type: TypeInput, + start_date: str | datetime, + end_date: str | datetime): + """確保所有必要的目錄存在""" + directories = [RAW_DATA_DIR, PROCESSED_DATA_DIR, FIGURE_DIR, LOGS_DIR] + for directory in directories: + try: + directory.mkdir(parents=True, exist_ok=True) + except Exception as e: + raise + + setup_logging() + + """依照開始和結束時間設定資料夾結構""" + start = datetime.strptime(start_date, '%Y-%m-%d') + end = datetime.strptime(end_date, '%Y-%m-%d') + + # 遍歷範圍內的所有月份 + current_date = start + while current_date <= end: + product_type = file_type + year = current_date.strftime('%Y') + month = current_date.strftime('%m') + + # 構建每個月份的 figure、processed 和 raw 路徑 + for base_dir in [FIGURE_DIR, PROCESSED_DATA_DIR, RAW_DATA_DIR]: + month_dir = base_dir / product_type / year / month + month_dir.mkdir(parents=True, exist_ok=True) + + # 移動到下個月 + if month == "12": + current_date = current_date.replace(year=current_date.year + 1, month=1, day=1) + else: + current_date = current_date.replace(month=current_date.month + 1, day=1) + + +""" I/O structure +Main Folder (Sentinel_data) +├── figure +│ ├── NO2 +│ │ ├── 2023 +│ │ │ ├── 01 +│ │ │ └── ... +│ │ └── 2024 +│ │ ├── 01 +│ │ └── ... +│ └── ... 
+├── logs +│ └── Satellite_S5P_202411.log +├── processed +│ ├── NO2 +│ │ ├── 2023 +│ │ │ ├── 01 +│ │ │ └── ... +│ │ └── 2024 +│ │ ├── 01 +│ │ └── ... +│ └── ... +└── raw + ├── NO2 + │ ├── 2023 + │ │ ├── 01 + │ │ └── ... + │ └── 2024 + │ ├── 01 + │ └── ... + └── ... +""" \ No newline at end of file diff --git a/src/config/catalog.py b/src/config/catalog.py new file mode 100644 index 0000000..d54ca1d --- /dev/null +++ b/src/config/catalog.py @@ -0,0 +1,149 @@ +from dataclasses import dataclass +from typing import Literal, TypeAlias +from enum import Enum + + +ClassInput = Literal['NRTI', 'OFFL', 'RPRO'] +TypeInput = Literal['O3____', 'O3_TCL', 'O3__PR', 'CH4___', 'CO____', 'NO2___', 'HCHO__', 'SO2___', 'CLOUD_', 'FRESCO', 'AER_LH', 'AER_AI'] + + +@dataclass +class ProductConfig: + """產品配置""" + display_name: str # 顯示名稱(帶下標) + dataset_name: str # 數據集名稱 + vmin: float | None # 最小值 + vmax: float | None # 最大值 + units: str # 單位 + title: str # 標題 + cmap: str = 'viridis' # 預設色階 + + +class ProductType(str, Enum): + """Available product types""" + # Ozone products + O3____ = 'O3____' + O3_TCL = 'O3_TCL' # No data ?? + O3__PR = 'O3__PR' # start from 2022 have total_vertical_columns and tropospheric_column and ozone_profile + + # Other gas products + CH4___ = 'CH4___' + CO____ = 'CO____' + NO2___ = 'NO2___' + HCHO__ = 'HCHO__' + SO2___ = 'SO2___' + + # Cloud and aerosol products + CLOUD_ = 'CLOUD_' + FRESCO = 'FRESCO' + AER_LH = 'AER_LH' + AER_AI = 'AER_AI' + + +class ProductLevel(str, Enum): + Level0 = 'L0__' + Level1B = 'L1B_' + Level2 = 'L2__' + + +class ProductClass(str, Enum): + """可用的處理類型 + + Available classes: + Main processing types: + - NRTI : Near-real time processing (最快但精確度較低) + - OFFL : Offline processing (標準處理) + - RPRO : Reprocessing (重新處理的歷史數據) + + Testing types: + - TEST : Internal testing + - OGCA : On-ground calibration + - GSOV : Ground segment validation + - OPER : Operational processing + """ + # Main processing types + NRTI = 'NRTI' + OFFL = 'OFFL' + RPRO = 'RPRO' + + # Testing types + TEST = 'TEST' + OGCA = 'OGCA' + GSOV = 'GSOV' + OPER = 'OPER' + + +PRODUCT_CONFIGS: dict[str, ProductConfig] = { + 'O3____': ProductConfig( + display_name='O\u2083', + dataset_name='ozone_total_vertical_column', + vmin=0.1, vmax=0.2, + units=f'O$_3$ Total Vertical Column (mol/m$^2$)', + title=f'O$_3$ Total Vertical Column', + cmap='RdBu_r' + ), + 'O3_TCL': ProductConfig( + display_name='O\u2083_TCL', + dataset_name='ozone_tropospheric_column', + vmin=0.1, vmax=0.2, + units=f'O$_3$ Tropospheric Column (mol/m$^2$)', + title=f'O$_3$ Tropospheric Column', + cmap='RdBu_r' + ), + 'O3__PR': ProductConfig( + display_name='O\u2083_PR', + dataset_name='ozone_tropospheric_column', + vmin=0.01, vmax=0.04, + units=f'O$_3$ Tropospheric Column (mol/m$^2$)', + title=f'O$_3$ Tropospheric Column', + cmap='jet' + ), + 'CH4___': ProductConfig( + display_name='CH\u2084', + dataset_name='methane_mixing_ratio', + vmin=None, vmax=None, + units=f'CH$_4$ Methane Mixing Ratio', + title=f'CH$_4$ Methane Mixing Ratio', + cmap='RdBu_r' + ), + 'CO____': ProductConfig( + display_name='CO', + dataset_name='carbonmonoxide_total_column', + vmin=None, vmax=None, + units=f'CO Total Column (mol/m$^2$)', + title=f'CO Total Column', + cmap='RdBu_r' + ), + 'NO2___': ProductConfig( + display_name='NO\u2082', + dataset_name='nitrogendioxide_tropospheric_column', + vmin=-2e-4, vmax=2e-4, + units=f'NO$_2$ Tropospheric Column (mol/m$^2$)', + title=f'NO$_2$ Tropospheric Column', + cmap='RdBu_r' + ), + 'HCHO__': ProductConfig( + 
display_name='HCHO', + dataset_name='formaldehyde_tropospheric_vertical_column', + vmin=-4e-4, vmax=4e-4, + units=f'HCHO Tropospheric Vertical Column (mol/m$^2$)', + title=f'HCHO Tropospheric Vertical Column', + cmap='RdBu_r' + ), + 'SO2___': ProductConfig( + display_name='SO\u2082', + dataset_name='sulfurdioxide_total_vertical_column', + vmin=-4e-3, vmax=4e-3, + units=f'SO$_2$ Total Vertical Column (mol/m$^2$)', + title=f'SO$_2$ Total Vertical Column', + cmap='RdBu_r' + ), + 'AER_AI': ProductConfig( + display_name='Aerosol Index', + dataset_name='aerosol_index_340_380', + vmin=None, vmax=None, + units=f'Aerosol Index', + title=f'Aerosol Index', + cmap='viridis' + ) +} diff --git a/src/config/richer.py b/src/config/richer.py new file mode 100644 index 0000000..c73258e --- /dev/null +++ b/src/config/richer.py @@ -0,0 +1,161 @@ +import time +from rich.console import Console +from rich.prompt import Confirm +from rich.panel import Panel +from rich.table import Table +from rich.align import Align + + +# 定義常數 +PANEL_WIDTH = 100 + +console = Console(force_terminal=True, color_system="auto", width=PANEL_WIDTH) # 使用您想要的寬度 + + +def rich_print(message: str | Table, + width: int = PANEL_WIDTH, + confirm: bool = False, + title: str = None) -> bool | None: + """統一的訊息顯示函數 + + Parameters + ---------- + message : str | Table + 要顯示的訊息或表格 + width : int + 面板寬度 + confirm : bool + 是否需要確認 + title : str + 面板標題,可選 + """ + if confirm: + return Confirm.ask( + f"[bold cyan]{message}[/bold cyan]", + default=True, + show_default=True + ) + + # 如果輸入是表格,則使用不同的格式化方式 + if isinstance(message, Table): + content = Align.center(message) + else: + content = Align.center(f"[bold cyan]{message}[/bold cyan]") + + console.print(Panel( + content, + title=title, + width=width, + expand=True, + border_style="bright_blue", + padding=(0, 0) + )) + + +class DisplayManager: + def __init__(self): + self.console = console + self.panel_width = PANEL_WIDTH + self.panel_style = "bright_blue" + self.panel_padding = (1, 0) + + def create_centered_panel(self, content, title, subtitle=None): + """創建置中的面板""" + centered_content = Align.center(content) + return Panel( + centered_content, + title=title, + width=self.panel_width, + expand=True, + border_style=self.panel_style, + padding=self.panel_padding, + subtitle=subtitle + ) + + def display_products(self, products): + """顯示產品資訊表格""" + table = Table(title="Product Information") + + # 設定欄位 + columns = [ + ("No.", "right", "cyan"), + ("Time", "left", "magenta"), + ("Name", "left", "blue"), + ("Size", "right", "green") + ] + + for file_name, justify, style in columns: + table.add_column(file_name, justify=justify, style=style) + + # 添加資料行 + for i, product in enumerate(products, 1): + time_str = product.get('ContentDate', {}).get('Start', 'N/A')[:19] + file_name = product.get('Name', 'N/A') + size = product.get('ContentLength', 0) + size_str = f"{size / 1024 / 1024:.2f} MB" + + # 處理過長的名稱 + name_short_cut = f"{file_name[:35]}...{file_name[-15:]}" if len(file_name) > 53 else file_name + + table.add_row(str(i), time_str, name_short_cut, size_str) + + # 顯示面板 + panel = self.create_centered_panel(table, f"Found {len(products)} Products") + self.console.print(panel) + + def display_download_summary(self, stats): + """顯示下載統計摘要""" + table = Table(title="Download Summary", width=60, padding=(0, 1), expand=False) + table.add_column("Metric", style="cyan") + table.add_column("Value", justify="right", style="green") + + # 計算基本統計 + total_files = sum(stats[key] for key in ['success', 'failed', 'skipped']) + 
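+        # 'success' + 'failed' + 'skipped' covers every product handled this
+        # run; 'start_time' is stamped when parallel_download() resets the
+        # stats dict, so the elapsed time below spans the whole session.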
elapsed_time = time.time() - stats['start_time'] + + # 準備顯示資料 + metrics = [ + ("Total Files", str(total_files)), + ("Successfully Downloaded", str(stats['success'])), + ("Failed Downloads", str(stats['failed'])), + ("Skipped Files", str(stats['skipped'])), + ("Total Size", f"{stats['total_size'] / 1024 / 1024:.2f} MB"), + ("Actual Download Size", f"{stats['actual_download_size'] / 1024 / 1024:.2f} MB"), + ("Spend Time", f"{elapsed_time:.2f}s") + ] + + # 如果有經過時間,添加速度資訊 + if elapsed_time > 0: + avg_speed = stats['actual_download_size'] / elapsed_time + metrics.append(("Average Speed", f"{avg_speed / 1024 / 1024:.2f} MB/s")) + + # 添加所有指標到表格 + for metric, value in metrics: + table.add_row(metric, value) + + # 顯示面板 + panel = self.create_centered_panel(table, "Download Results") + self.console.print(panel) + + def display_product_info(self, nc_info): + """顯示下載統計摘要""" + table = Table(title="Information", width=40, padding=(0, 1), expand=False) + table.add_column("Metric", style="cyan") + table.add_column("Value", justify="right", style="green") + + # 準備顯示資料 + metrics = [ + ("Data Time", str(nc_info['time'])), + ("Data Shape", str(nc_info['shape'])), + ("Latitude", str(nc_info['latitude'])), + ("Longitude", str(nc_info['longitude'])), + ] + + # 添加所有指標到表格 + for metric, value in metrics: + table.add_row(metric, value) + + # 顯示面板 + file_name_short_cut = f"{nc_info['file_name'][:35]}...{nc_info['file_name'][-15:]}" + panel = self.create_centered_panel(table, f"Processing: {file_name_short_cut}", "繪製插值後的數據圖...") + self.console.print("\n", panel) diff --git a/src/config/settings.py b/src/config/settings.py index 0fb2ba1..076b8b7 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,9 +1,5 @@ """API 設定和常數""" from pathlib import Path -from datetime import datetime -import logging - -logger = logging.getLogger(__name__) # API URLs COPERNICUS_TOKEN_URL = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token" @@ -22,46 +18,11 @@ DOWNLOAD_TIMEOUT = 180 # 存儲路徑 -BASE_DIR = Path("/Users/chanchihyu/Sentinel_data") +BASE_DIR = Path("/Users/chanchihyu/Sentinel-5P") RAW_DATA_DIR = BASE_DIR / "raw" PROCESSED_DATA_DIR = BASE_DIR / "processed" FIGURE_DIR = BASE_DIR / "figure" LOGS_DIR = BASE_DIR / "logs" - -def setup_directory_structure(start_date: str, end_date: str): - """依照開始和結束時間設定資料夾結構""" - start = datetime.strptime(start_date, '%Y-%m-%d') - end = datetime.strptime(end_date, '%Y-%m-%d') - - # 遍歷範圍內的所有月份 - current_date = start - while current_date <= end: - year = current_date.strftime('%Y') - month = current_date.strftime('%m') - - # 構建每個月份的 figure、processed 和 raw 路徑 - for base_dir in [FIGURE_DIR, PROCESSED_DATA_DIR, RAW_DATA_DIR]: - month_dir = base_dir / year / month - month_dir.mkdir(parents=True, exist_ok=True) - - # 移動到下個月 - if month == "12": - current_date = current_date.replace(year=current_date.year + 1, month=1, day=1) - else: - current_date = current_date.replace(month=current_date.month + 1, day=1) - - -def ensure_directories(): - """確保所有必要的目錄存在""" - directories = [RAW_DATA_DIR, PROCESSED_DATA_DIR, FIGURE_DIR, LOGS_DIR] - for directory in directories: - try: - directory.mkdir(parents=True, exist_ok=True) - logger.info(f"確保目錄存在: {directory}") - except Exception as e: - logger.error(f"創建目錄失敗 {directory}: {str(e)}") - raise - - -ensure_directories() +FILTER_BOUNDARY = (120, 122, 22, 25.5) # (118, 124, 20, 27) +FIGURE_BOUNDARY = (119, 123, 21, 26) # (100, 145, 0, 45) diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 7ae0c06..0000000 --- 
a/src/main.py +++ /dev/null @@ -1,105 +0,0 @@ -"""主程式""" -import logging -from pathlib import Path -from datetime import datetime - -from api.sentinel_api import Sentinel5PDataFetcher -from processing.data_processor import NO2Processor -from utils.logger import setup_logging -from config.settings import setup_directory_structure -from rich.console import Console -from rich.panel import Panel -from rich.prompt import Confirm -from rich.align import Align - -# 定義常數 -PANEL_WIDTH = 130 - -console = Console(force_terminal=True, color_system="auto", width=130) # 使用您想要的寬度 -logger = logging.getLogger(__name__) - - -def rich_print(message: str, width: int = PANEL_WIDTH, confirm: bool = False) -> bool | None: - """統一的訊息顯示函數""" - if confirm: - return Confirm.ask( - f"[bold cyan]{message}[/bold cyan]", # 在訊息中加入樣式 - default=True, - show_default=True - ) - - console.print(Panel( - Align.center(f"[bold cyan]{message}[/bold cyan]"), - width=width, - expand=True, - border_style="bright_blue", - padding=(0, 1) - )) - - -def fetch_data(start_date: str, end_date: str): - """下載數據的工作流程""" - setup_logging() - - try: - fetcher = Sentinel5PDataFetcher(max_workers=3) - - rich_print(f"正在獲取 sentinel-5p 衛星數據 (NO\u2082) from {start_date} to {end_date} ...") - products = fetcher.fetch_no2_data( - start_date=start_date, - end_date=end_date, - bbox=(118, 20, 124, 27), - limit=None - ) - - if products: - if rich_print("是否要下載數據?", confirm=True): - rich_print(f"開始下載 sentinel-5p 衛星數據 (NO\u2082) from {start_date} to {end_date} ...") - fetcher.parallel_download(products) - rich_print("數據下載完成!") - else: - rich_print("已取消下載操作") - else: - rich_print("找不到符合條件的數據") - - except Exception as e: - error_message = f"下載數據失敗: {str(e)}" - rich_print(error_message) - logger.error(error_message) - - -def process_data(start_date: str, end_date: str): - """處理數據的工作流程""" - setup_logging() - - try: - processor = NO2Processor( - interpolation_method='griddata', - resolution=0.02, - mask_value=0.5 - ) - - # 改用 rich style 的輸入提示 - if rich_print("是否要處理數據?", confirm=True): - logger.info(f"Start processing data from {start_date} to {end_date}") - processor.process_each_data(start_date, end_date, use_taiwan_mask=False) - rich_print("數據完成處理") - - except Exception as e: - error_message = f"處理數據失敗: {str(e)}" - rich_print(error_message) - logger.error(error_message) - - -if __name__ == "__main__": - # 設定參數 - start, end = '2024-11-01', '2024-11-10' - - # 設定輸入輸出配置 - setup_directory_structure(start, end) - - # 下載數據 - fetch_data(start, end) - - # 處理數據 - # process_data(start, end) diff --git a/src/processing/data_processor.py b/src/processing/data_processor.py index 4645289..a797298 100644 --- a/src/processing/data_processor.py +++ b/src/processing/data_processor.py @@ -1,23 +1,24 @@ """src/processing/no2_processor.py""" +import logging import numpy as np import xarray as xr -import logging -import zipfile -import tempfile -from pathlib import Path from datetime import datetime +from dateutil.relativedelta import relativedelta from src.processing.interpolators import DataInterpolator from src.processing.taiwan_frame import TaiwanFrame -from src.config.settings import RAW_DATA_DIR, PROCESSED_DATA_DIR, FIGURE_DIR -from src.visualization.sample_plot_nc import plot_global_no2 +from src.config.settings import RAW_DATA_DIR, PROCESSED_DATA_DIR, FIGURE_DIR, FIGURE_BOUNDARY +from src.visualization.plot_nc import plot_global_var +from src.config.catalog import ClassInput, TypeInput, PRODUCT_CONFIGS +from src.config.richer import DisplayManager + logger = 
logging.getLogger(__name__) -class NO2Processor: - def __init__(self, interpolation_method='kdtree', resolution=0.02, mask_value=0.50): - """初始化 NO2 處理器 +class S5Processor: + def __init__(self, interpolation_method='kdtree', resolution=0.01, mask_qc_value=0.75): + """初始化處理器 Parameters: ----------- @@ -25,28 +26,14 @@ def __init__(self, interpolation_method='kdtree', resolution=0.02, mask_value=0. 插值方法,可選 'griddata' 或 'kdtree' resolution : float 網格解析度(度) - mask_value : float + mask_qc_value : float QA 值的閾值 """ self.interpolation_method = interpolation_method self.resolution = resolution - self.mask_value = mask_value + self.mask_qc_value = mask_qc_value self.taiwan_frame = TaiwanFrame() - @staticmethod - def process_zipped_nc(zip_path: Path): - """處理壓縮的 NC 檔案""" - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir_path = Path(temp_dir) - - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - zip_ref.extractall(temp_dir) - - nc_files = list(temp_dir_path.rglob("*.nc")) - if nc_files: - return xr.open_dataset(nc_files[0], group='PRODUCT') - return None - def create_grid(self, lon: np.ndarray, lat: np.ndarray): """根據數據的經緯度範圍創建網格""" # 取得經緯度的範圍 @@ -60,147 +47,181 @@ def create_grid(self, lon: np.ndarray, lat: np.ndarray): # 創建網格矩陣 return np.meshgrid(grid_lon, grid_lat) - def extract_data(self, dataset: xr.Dataset, use_taiwan_mask: bool = False): - """從數據集中提取數據 + def extract_data(self, dataset: xr.Dataset, extract_range: tuple[float, float, float, float] = None): + """提取數據,可選擇是否限定範圍 - Parameters: - ----------- - dataset : xr.Dataset - 輸入的數據集 - use_taiwan_mask : bool - 是否只提取台灣區域的數據 + Args: + dataset: xarray Dataset + extract_range: 可選的tuple (min_lon, max_lon, min_lat, max_lat),如果提供則提取指定範圍 """ - if use_taiwan_mask: - return self._extract_data_taiwan(dataset) - return self._extract_data_global(dataset) - - def _extract_data_global(self, dataset: xr.Dataset): - """提取全球範圍的數據""" - time = dataset.time.values[0] - lat = dataset.latitude.values[0] - lon = dataset.longitude.values[0] - no2 = dataset.nitrogendioxide_tropospheric_column.values[0] - qa = dataset.qa_value.values[0] - - # 根據 QA 值過濾數據 - mask = qa < self.mask_value - no2[mask] = np.nan - - logger.info(f"\t{'data time':15}: {np.datetime64(time, 'D').astype(str)}") - logger.info(f"\t{'lon range':15}: {lon.min():.2f} to {lon.max():.2f}") - logger.info(f"\t{'lat range':15}: {lat.min():.2f} to {lat.max():.2f}") - logger.info(f"\t{'data shape':15}: {no2.shape}") - - return lon, lat, no2 - - def _extract_data_taiwan(self, dataset: xr.Dataset): - """提取台灣區域的數據""" - # 設定條件 - mask_lon = ((dataset.longitude >= 118) & (dataset.longitude <= 124)) - mask_lat = ((dataset.latitude >= 20) & (dataset.latitude <= 27)) - masked_lon_lat_ds = dataset.where((mask_lon & mask_lat), drop=True) - - if masked_lon_lat_ds.sizes['scanline'] == 0 or masked_lon_lat_ds.sizes['ground_pixel'] == 0: - raise ValueError("No data points within Taiwan region") - - mask_qa = (masked_lon_lat_ds.qa_value >= self.mask_value) - masked_ds = masked_lon_lat_ds.where(mask_qa) - - if np.all(np.isnan(masked_ds.nitrogendioxide_tropospheric_column)): + # 初始處理 + time = np.datetime64(dataset.time.values[0], 'D') + attributes = PRODUCT_CONFIGS[self.product_type].dataset_name + + # 如果提供了範圍,進行過濾 + if extract_range is not None: + min_lon, max_lon, min_lat, max_lat = extract_range + mask_lon = (dataset.longitude >= min_lon) & (dataset.longitude <= max_lon) + mask_lat = (dataset.latitude >= min_lat) & (dataset.latitude <= max_lat) + dataset = dataset.where((mask_lon & mask_lat), drop=True) + + # 檢查是否有數據 
+ if dataset.sizes['scanline'] == 0 or dataset.sizes['ground_pixel'] == 0: + raise ValueError(f"No data points within region: {extract_range}") + + # QA 過濾 + mask_qa = (dataset.qa_value >= self.mask_qc_value) + dataset = dataset.where(mask_qa) + + # 檢查數據有效性 + if np.all(np.isnan(dataset[attributes])): raise ValueError("No valid data points after QA filtering") - return ( - masked_ds.longitude[0].data, - masked_ds.latitude[0].data, - masked_ds.nitrogendioxide_tropospheric_column[0].data - ) + lon = dataset.longitude[0].values + lat = dataset.latitude[0].values + shape = dataset.latitude[0].values.shape + var = dataset[attributes][0].values + + info_dict = { + 'time': f"{time}", + 'shape': f"{shape}", + 'latitude': f'{np.nanmin(lat):.2f} to {np.nanmax(lat):.2f}', + 'longitude': f'{np.nanmin(lon):.2f} to {np.nanmax(lon):.2f}', + } + + if hasattr(self, 'nc_info'): + self.nc_info.update(info_dict) + + return lon, lat, var + + def process_each_data(self, + file_class: ClassInput, + file_type: TypeInput, + start_date: str, + end_date: str, + ): + """ + 處理指定日期範圍內的衛星數據 - def process_each_data(self, start_date: str, end_date: str, use_taiwan_mask: bool = False): - """處理單一數據""" - # 將字串日期轉換為 datetime 物件 + Args: + file_class : ProductClassInput + file_type: ProductTypeInput + start_date (str): 開始日期 (YYYY-MM-DD) + end_date (str): 結束日期 (YYYY-MM-DD) + """ + def process_single_file(file_path, output_dir): + """處理單一數據檔案""" + ds = xr.open_dataset(file_path, engine='netcdf4', group='PRODUCT') + + # 確保輸出目錄存在 + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / file_path.name + + if not output_path.exists(): + ds.to_netcdf(output_path) + + if ds is None: + return + + try: + # 1. 紀錄 nc 檔訊息 + self.nc_info = {'file_name': file_path.name} + + # # 2. 提取數據和信息 + # lon, lat, var = self.extract_data(ds, extract_range=FIGURE_BOUNDARY) + # + # # 3. 顯示檔案信息 + # DisplayManager().display_product_info(self.nc_info) + # + # # 4. 創建網格並進行插值 + # lon_grid, lat_grid = self.create_grid(lon, lat) + # var_grid = DataInterpolator.interpolate( + # lon, lat, var, + # lon_grid, lat_grid, + # method=self.interpolation_method + # ) + # + # # 5. 創建插值後的數據集 + # interpolated_ds = xr.Dataset( + # { + # self.product_type.dataset_name: ( + # ['time', 'latitude', 'longitude'], + # var_grid[np.newaxis, :, :] + # ) + # }, + # coords={ + # 'time': ds.time.values[0:1], + # 'latitude': np.squeeze(lat_grid[:, 0]), + # 'longitude': np.squeeze(lon_grid[0, :]) + # } + # ) + + finally: + ds.close() + + # 主處理流程 start = datetime.strptime(start_date, '%Y-%m-%d') end = datetime.strptime(end_date, '%Y-%m-%d') - - # 逐月處理每個月份的路徑 current_date = start + + # 按月份逐月處理 while current_date <= end: + # 1. 
Prepare this month's directory paths
+            product_type = file_type
             year = current_date.strftime('%Y')
             month = current_date.strftime('%m')
 
-            # 構建每個月份的 input 和 output 路徑
-            input_dir = RAW_DATA_DIR / year / month
-            output_dir = PROCESSED_DATA_DIR / year / month
-            figure_output_file = FIGURE_DIR / year / month
-
-            # 創建路徑
-            input_dir.mkdir(parents=True, exist_ok=True)
-            output_dir.mkdir(parents=True, exist_ok=True)
-            figure_output_file.mkdir(parents=True, exist_ok=True)
-
-            # for monthly data
-            # output_file = output_dir / f"NO2_{year}{month}.nc"
-            #
-            # if output_file.exists():
-            #     logger.info(f"File {output_file} exists, skipping")
-
-            # container = []
-
-            # 目前只拿"NTRI"畫圖
-            for file_path in input_dir.glob("*NRTI_L2__NO2*.nc"):
-                logger.info(f'Processing: {file_path.name}')
-
+            input_dir = RAW_DATA_DIR / product_type / year / month
+            output_dir = PROCESSED_DATA_DIR / product_type / year / month
+            figure_dir = FIGURE_DIR / product_type / year / month
+
+            # 2. Create the directories we will write into
+            for directory in [output_dir, figure_dir]:
+                directory.mkdir(parents=True, exist_ok=True)
+
+            file_pattern = f"*{file_class}_L2__{file_type}*.nc"
+
+            # 3. Process the raw data (if this month has any)
+            if input_dir.exists():
+                for file_path in input_dir.glob(file_pattern):
+                    # Skip files dated outside the requested range
+                    date_to_check = datetime.strptime(file_path.name[20:28], '%Y%m%d')
+                    if not (start <= date_to_check <= end):
+                        continue
+                    try:
+                        process_single_file(file_path, output_dir)
+                    except Exception as e:
+                        logger.error(f"處理檔案 {file_path.name} 時發生錯誤: {e}")
+                        continue
+
+            # 4. Plot figures from the processed data
+            processed_files = list(output_dir.glob(file_pattern))
+            if not processed_files:
+                logger.warning(f"在 {output_dir} 中找不到符合條件的處理後檔案")
+                # Advance before continuing; a bare `continue` here would
+                # retry the same month forever.
+                current_date = (current_date + relativedelta(months=1)).replace(day=1)
+                continue
+
+            for file_path in processed_files:
+                date_to_check = datetime.strptime(file_path.name[20:28], '%Y%m%d')
+                if not (start <= date_to_check <= end):
+                    continue
+                figure_path = figure_dir / f"{file_path.stem}.png"
                 try:
-                    dataset = self.process_zipped_nc(file_path)
-
-                    if dataset is not None:
-                        try:
-                            # 提取數據
-                            lon, lat, no2 = self.extract_data(dataset, use_taiwan_mask)
-
-                            # 為每個檔案創建新的網格
-                            lon_grid, lat_grid = self.create_grid(lon, lat)
-
-                            # 插值
-                            no2_grid = DataInterpolator.interpolate(
-                                lon, lat, no2,
-                                lon_grid, lat_grid,
-                                method=self.interpolation_method
-                            )
-
-                            # 創建臨時的 Dataset 來顯示插值後的結果
-                            interpolated_ds = xr.Dataset(
-                                {
-                                    'nitrogendioxide_tropospheric_column': (
-                                        ['time', 'latitude', 'longitude'],
-                                        no2_grid[np.newaxis, :, :]
-                                    )
-                                },
-                                coords={
-                                    'time': dataset.time.values[0:1],  # 使用原始時間
-                                    'latitude': np.squeeze(lat_grid[:, 0]),
-                                    'longitude': np.squeeze(lon_grid[0, :])
-                                }
-                            )
-
-                            # 繪製插值後的數據圖
-                            logger.info("繪製插值後的數據圖...")
-                            plot_global_no2(interpolated_ds, figure_output_file / file_path.stem, close_after=True, map_scale='Taiwan')
-
-                            # 移動到下個月
-                            if month == "12":
-                                current_date = current_date.replace(year=current_date.year + 1, month=1, day=1)
-                            else:
-                                current_date = current_date.replace(month=current_date.month + 1, day=1)
-
-                            # container.append(no2_grid)
-                        finally:
-                            dataset.close()
-
+                    plot_global_var(
+                        dataset=file_path,
+                        product_params=PRODUCT_CONFIGS[file_type],
+                        savefig_path=figure_path,
+                        map_scale='Taiwan',
+                        show_stations=True
+                    )
                 except Exception as e:
-                    logger.error(f"Error processing {file_path.name}: {e}")
+                    logger.error(f"繪製檔案 {file_path.name} 時發生錯誤: {e}")
                     continue
 
-    def _save_monthly_average(self, container, grid, year, month, output_file):
+            # 5.
移至下個月 + current_date = (current_date + relativedelta(months=1)).replace(day=1) + + @staticmethod + def _save_monthly_average(container, grid, year, month, output_file): """保存月平均數據""" # 計算平均值 no2_stack = np.stack(container) @@ -235,9 +256,3 @@ def _save_monthly_average(self, container, grid, year, month, output_file): # 確保輸出目錄存在 output_file.parent.mkdir(parents=True, exist_ok=True) ds_result.to_netcdf(output_file) - - logger.info(f"Saved monthly average to {output_file}") - logger.info(f"Final grid shape: {no2_average.shape}") - logger.info(f"Time: {ds_result.time.values}") - logger.info(f"Longitude range: {grid[0][0].min():.2f} to {grid[0][0].max():.2f}") - logger.info(f"Latitude range: {grid[1][:, 0].min():.2f} to {grid[1][:, 0].max():.2f}") \ No newline at end of file diff --git a/src/processing/interpolators.py b/src/processing/interpolators.py index d9b4406..515b6ff 100644 --- a/src/processing/interpolators.py +++ b/src/processing/interpolators.py @@ -9,63 +9,115 @@ class DataInterpolator: """數據插值器,支援多種插值方法""" @staticmethod - def griddata_interpolation(lon, lat, data, lon_grid, lat_grid): - """使用 griddata 進行插值""" - points = np.column_stack((lon.flatten(), lat.flatten())) - values = data.flatten() + def griddata_interpolation(lon, lat, data, lon_grid, lat_grid, max_distance=0.1): + """使用 griddata 進行插值,只填充距離較近的網格點 - # 移除無效值 - valid = ~np.isnan(values) - points = points[valid] - values = values[valid] + Parameters: + ----------- + lon, lat : ndarray + 原始經緯度數據 + data : ndarray + 要插值的數據 + lon_grid, lat_grid : ndarray + 目標網格的經緯度 + max_distance : float + 最大插值距離(單位:度),超過此距離的網格點不進行插值 + """ + # 移除無效值(NaN) + valid_mask = ~np.isnan(lon) & ~np.isnan(lat) & ~np.isnan(data) + valid_lon = lon[valid_mask] + valid_lat = lat[valid_mask] + valid_data = data[valid_mask] + + if len(valid_data) == 0: + return np.full_like(lon_grid, np.nan) + + points = np.column_stack((valid_lon.flatten(), valid_lat.flatten())) + values = valid_data.flatten() + + # 建立 KDTree 用於距離檢查 + tree = cKDTree(points) # 將網格點轉換為適合 griddata 的格式 grid_points = np.column_stack((lon_grid.flatten(), lat_grid.flatten())) - # 進行插值 - grid_values = griddata(points, values, grid_points, method='linear') + # 查找每個網格點最近的原始數據點的距離 + distances, _ = tree.query(grid_points, k=1) + + # 創建遮罩,只對距離在閾值內的點進行插值 + mask = distances <= max_distance + + # 初始化結果數組為 NaN + grid_values = np.full(grid_points.shape[0], np.nan) + + # 只對符合距離條件的點進行插值 + if np.any(mask): + grid_values[mask] = griddata(points, + values, + grid_points[mask], + method='linear') + return grid_values.reshape(lon_grid.shape) @staticmethod - def kdtree_interpolation(lon, lat, data, lon_grid, lat_grid): - """使用 KDTree 進行插值""" - lon_flat = lon_grid.flatten() - lat_flat = lat_grid.flatten() + def kdtree_interpolation(lon, lat, data, lon_grid, lat_grid, max_distance=0.1): + """使用 KDTree 進行插值,只填充距離較近的網格點""" + # 移除無效值(NaN) + valid_mask = ~np.isnan(lon) & ~np.isnan(lat) & ~np.isnan(data) + valid_lon = lon[valid_mask] + valid_lat = lat[valid_mask] + valid_data = data[valid_mask] - # 构建 KD 树 - tree = cKDTree(np.column_stack((lon.flatten(), lat.flatten()))) + if len(valid_data) == 0: + return np.full_like(lon_grid, np.nan) - # 使用 query 方法查找最近的点 - distances, indices = tree.query(np.column_stack((lon_flat, lat_flat)), k=1) + # 建立 KD 樹 + tree = cKDTree(np.column_stack((valid_lon.flatten(), valid_lat.flatten()))) - x_index, y_index = np.unravel_index(indices, lon.shape) - interpolated_values = map_coordinates(data, [x_index, y_index], order=1, mode='nearest') + # 將網格點轉換為適合查詢的格式 + grid_points = 
np.column_stack((lon_grid.flatten(), lat_grid.flatten())) + + # 使用 query 方法查找最近的點和距離 + distances, indices = tree.query(grid_points, k=1) + + # 創建遮罩,只對距離在閾值內的點進行插值 + mask = distances <= max_distance + + # 初始化結果數組為 NaN + interpolated_values = np.full_like(lon_grid.flatten(), np.nan) + + # 只對符合距離條件的點進行插值 + if np.any(mask): + valid_data_flat = valid_data.flatten() + interpolated_values[mask] = valid_data_flat[indices[mask]] return interpolated_values.reshape(lon_grid.shape) @classmethod - def interpolate(cls, lon, lat, data, lon_grid, lat_grid, method='griddata'): + def interpolate(cls, lon, lat, data, lon_grid, lat_grid, method='griddata', max_distance=0.1): """統一的插值介面 - Parameters: - ----------- - lon, lat : ndarray - 原始經緯度數據 - data : ndarray - 要插值的數據 - lon_grid, lat_grid : ndarray - 目標網格的經緯度 - method : str - 插值方法,可選 'griddata' 或 'kdtree' - - Returns: - -------- - ndarray - 插值後的數據 - """ + Parameters: + ----------- + lon, lat : ndarray + 原始經緯度數據 + data : ndarray + 要插值的數據 + lon_grid, lat_grid : ndarray + 目標網格的經緯度 + method : str + 插值方法,可選 'griddata' 或 'kdtree' + max_distance : float + 最大插值距離(單位:度),超過此距離的網格點不進行插值 + + Returns: + -------- + ndarray + 插值後的數據,距離過遠的點將為 NaN + """ if method == 'griddata': - return cls.griddata_interpolation(lon, lat, data, lon_grid, lat_grid) + return cls.griddata_interpolation(lon, lat, data, lon_grid, lat_grid, max_distance) elif method == 'kdtree': - return cls.kdtree_interpolation(lon, lat, data, lon_grid, lat_grid) + return cls.kdtree_interpolation(lon, lat, data, lon_grid, lat_grid, max_distance) else: raise ValueError(f"Unsupported interpolation method: {method}") diff --git a/src/processing/taiwan_frame.py b/src/processing/taiwan_frame.py index 0f782f1..1450096 100644 --- a/src/processing/taiwan_frame.py +++ b/src/processing/taiwan_frame.py @@ -2,7 +2,7 @@ class TaiwanFrame: - def __init__(self, resolution=0.01, lat_Taiwan=(21, 26), lon_Taiwan=(119, 123)): + def __init__(self, resolution=0.01, lat_Taiwan=(20, 27), lon_Taiwan=(118, 124)): self.lat = np.arange(lat_Taiwan[0], lat_Taiwan[1] + resolution, resolution) self.lon = np.arange(lon_Taiwan[0], lon_Taiwan[1] + resolution, resolution) diff --git a/src/utils/logger.py b/src/utils/logger.py deleted file mode 100644 index 4909918..0000000 --- a/src/utils/logger.py +++ /dev/null @@ -1,26 +0,0 @@ -"""日誌配置""" -import logging -from datetime import datetime -from pathlib import Path -from src.config.settings import LOGS_DIR - - -def setup_logging(): - """設置日誌配置""" - # 確保日誌目錄存在 - log_dir = Path(LOGS_DIR) - log_dir.mkdir(parents=True, exist_ok=True) - - # 創建日誌檔案路徑 - log_file = log_dir / f"Satellite_S5p_{datetime.now().strftime('%Y%m')}.log" - - # 配置基本設定 - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name).10s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - handlers=[ - logging.FileHandler(log_file, encoding='utf-8'), - logging.StreamHandler() # 同時輸出到控制台 - ] - ) diff --git a/src/visualization/gif_nc.py b/src/visualization/gif_nc.py new file mode 100644 index 0000000..844a199 --- /dev/null +++ b/src/visualization/gif_nc.py @@ -0,0 +1,61 @@ +import imageio +from datetime import datetime +import re +from PIL import Image +import numpy as np +from src.config.settings import FIGURE_DIR +from src.config.catalog import TypeInput + + +def animate_data(file_type: TypeInput, start_date, end_date, fps=1, resize=None, **kwargs): + """ + 將 Sentinel 衛星圖片製作成 GIF 動畫 + + Parameters: + ----------- + image_dir : str or Path + 圖片資料夾路徑 + output_path : str or Path + 輸出檔案路徑 + fps : int + 每秒顯示幾張圖片 + resize : tuple + 
調整圖片大小,例如 (800, 600) + """ + image_dir = FIGURE_DIR / file_type + output_path = FIGURE_DIR / file_type / 'sentinel_animation.gif' + + # 取得所有圖片檔案 + image_files = list(image_dir.glob('**/*OFFL*.png')) + + # 從檔名提取日期時間 + def get_datetime(filepath): + match = re.search(r'(\d{8}T\d{6})', filepath.name) + return datetime.strptime(match.group(1), '%Y%m%dT%H%M%S') if match else datetime.min + + # 依照日期時間排序 + image_files.sort(key=get_datetime) + + # 準備圖片 + images = [] + for filepath in image_files: + img = Image.open(filepath) + if resize: + img = img.resize(resize, Image.Resampling.LANCZOS) + images.append(np.array(img)) + + print(f"找到 {len(images)} 張圖片") + + if not images: + print("沒有找到符合條件的圖片!") + return + + # 製作 GIF 動畫 + print(f"正在製作 GIF 動畫... fps={fps}") + imageio.mimsave( + output_path, + images, + fps=fps, + loop=0 # 0 表示無限循環 + ) + print(f"動畫製作完成:{output_path}") diff --git a/src/visualization/plot_nc.py b/src/visualization/plot_nc.py index add2996..33fe3e2 100644 --- a/src/visualization/plot_nc.py +++ b/src/visualization/plot_nc.py @@ -1,24 +1,230 @@ -import cartopy.crs as ccrs -import numpy as np +import logging +import geopandas as gpd import xarray as xr import matplotlib.pyplot as plt -import cartopy.mpl.gridliner as gridliner -import time -import geopandas as gpd - -from scipy.signal import convolve2d -from netCDF4 import Dataset -from matplotlib.ticker import ScalarFormatter +import cartopy.crs as ccrs +import cartopy.feature as cfeature +import numpy as np +from typing import Literal from pathlib import Path -from src.processing.taiwan_frame import TaiwanFrame +from shapely.geometry import Point +from scipy.signal import convolve2d +from matplotlib.ticker import ScalarFormatter, FixedLocator + +from src.config.settings import FIGURE_BOUNDARY +from src.config.richer import DisplayManager + +plt.rcParams['mathtext.fontset'] = 'custom' +plt.rcParams['mathtext.rm'] = 'Times New Roman' +plt.rcParams['mathtext.it'] = 'Times New Roman: italic' +plt.rcParams['mathtext.bf'] = 'Times New Roman: bold' +plt.rcParams['mathtext.default'] = 'regular' +plt.rcParams['font.family'] = 'Times New Roman' +plt.rcParams['font.weight'] = 'normal' +plt.rcParams['font.size'] = 16 -taiwan_counties = gpd.read_file(Path(__file__).parent / "taiwan/COUNTY_MOI_1090820.shp") -station = gpd.read_file(Path(__file__).parent / "stations/空氣品質監測站位置圖_121_10704.shp") +plt.rcParams['axes.titlesize'] = 'large' +plt.rcParams['axes.titleweight'] = 'bold' +plt.rcParams['axes.labelweight'] = 'bold' -# @setFigure -def platecarree_plot(dataset, zoom=True, path=None, **kwargs): +logger = logging.getLogger(__name__) + + +def smooth_kernel(data, kernel_size=5): + kernel = np.ones((kernel_size, kernel_size)) + return convolve2d(data, kernel, mode='same', boundary='wrap') / np.sum(kernel) + + +def plot_stations(ax, stations: list[str], label_offset: tuple[float, float] = (-0.2, 0)): + """繪製測站標記和標籤 + + Args: + ax: matplotlib axes + stations: 要標記的測站名稱列表 + label_offset: (x偏移, y偏移),用於調整標籤位置 + """ + # 讀取測站資料 + station_data = gpd.read_file( + Path(__file__).parents[2] / "data/shapefiles/stations/空氣品質監測站位置圖_121_10704.shp" + ) + # 創建測站的 GeoDataFrame + station_geometry = [Point(xy) for xy in zip(station_data['TWD97Lon'], station_data['TWD97Lat'])] + + # all station + # geodata = gpd.GeoDataFrame(station_data, crs=ccrs.PlateCarree(), geometry=station_geometry) + # geodata.plot(ax=ax, color='gray', markersize=10) + + # matplotlib 預設顏色和標記循環 + colors = plt.rcParams['axes.prop_cycle'].by_key()['color'] # 預設顏色循環 + markers = ['o', 's', '^', 'v', 'D', '<', 
diff --git a/src/visualization/plot_nc.py b/src/visualization/plot_nc.py
index add2996..33fe3e2 100644
--- a/src/visualization/plot_nc.py
+++ b/src/visualization/plot_nc.py
@@ -1,24 +1,230 @@
-import cartopy.crs as ccrs
-import numpy as np
+import logging
+import geopandas as gpd
 import xarray as xr
 import matplotlib.pyplot as plt
-import cartopy.mpl.gridliner as gridliner
-import time
-import geopandas as gpd
-
-from scipy.signal import convolve2d
-from netCDF4 import Dataset
-from matplotlib.ticker import ScalarFormatter
+import cartopy.crs as ccrs
+import cartopy.feature as cfeature
+import numpy as np
+from typing import Literal
 from pathlib import Path
-from src.processing.taiwan_frame import TaiwanFrame
+from shapely.geometry import Point
+from scipy.signal import convolve2d
+from matplotlib.ticker import ScalarFormatter, FixedLocator
+
+from src.config.settings import FIGURE_BOUNDARY
+from src.config.richer import DisplayManager
+
+plt.rcParams['mathtext.fontset'] = 'custom'
+plt.rcParams['mathtext.rm'] = 'Times New Roman'
+plt.rcParams['mathtext.it'] = 'Times New Roman: italic'
+plt.rcParams['mathtext.bf'] = 'Times New Roman: bold'
+plt.rcParams['mathtext.default'] = 'regular'
+plt.rcParams['font.family'] = 'Times New Roman'
+plt.rcParams['font.weight'] = 'normal'
+plt.rcParams['font.size'] = 16
 
-taiwan_counties = gpd.read_file(Path(__file__).parent / "taiwan/COUNTY_MOI_1090820.shp")
-station = gpd.read_file(Path(__file__).parent / "stations/空氣品質監測站位置圖_121_10704.shp")
+plt.rcParams['axes.titlesize'] = 'large'
+plt.rcParams['axes.titleweight'] = 'bold'
+plt.rcParams['axes.labelweight'] = 'bold'
 
-# @setFigure
-def platecarree_plot(dataset, zoom=True, path=None, **kwargs):
+logger = logging.getLogger(__name__)
+
+
+def smooth_kernel(data, kernel_size=5):
+    """Smooth a 2-D field with a normalised box (mean) filter."""
+    kernel = np.ones((kernel_size, kernel_size))
+    return convolve2d(data, kernel, mode='same', boundary='wrap') / np.sum(kernel)
+
+
+def plot_stations(ax, stations: list[str]):
+    """Mark air-quality monitoring stations on the map and add a legend.
+
+    Args:
+        ax: target matplotlib axes
+        stations: station names (SiteName) to mark
+    """
+    # Load the station shapefile
+    station_data = gpd.read_file(
+        Path(__file__).parents[2] / "data/shapefiles/stations/空氣品質監測站位置圖_121_10704.shp"
+    )
+
+    # To plot every station instead of a subset:
+    # station_geometry = [Point(xy) for xy in zip(station_data['TWD97Lon'], station_data['TWD97Lat'])]
+    # geodata = gpd.GeoDataFrame(station_data, crs=ccrs.PlateCarree(), geometry=station_geometry)
+    # geodata.plot(ax=ax, color='gray', markersize=10)
+
+    # matplotlib's default color cycle, plus a marker cycle
+    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
+    markers = ['o', 's', '^', 'v', 'D', '<', '>', 'p', '*']
+
+    # Keep only the requested stations
+    target_stations = station_data[station_data['SiteName'].isin(stations)]
+
+    legend_labels = []
+    legend_handles = []
+
+    for i, (_, row) in enumerate(target_stations.iterrows()):
+        # Cycle through colors and markers
+        color = colors[i % len(colors)]
+        marker = markers[i % len(markers)]
+
+        # Draw the point and keep its handle for the legend
+        marker_obj = ax.scatter(row['TWD97Lon'], row['TWD97Lat'],
+                                marker=marker,
+                                color=color,
+                                s=15,
+                                transform=ccrs.PlateCarree(),
+                                label=row['SiteEngNam'])
+
+        legend_labels.append(row['SiteEngNam'])
+        legend_handles.append(marker_obj)
+
+    ax.legend(handles=legend_handles,
+              labels=legend_labels,
+              loc='upper right',
+              bbox_to_anchor=(1, 1),
+              borderaxespad=0.)
+
+
+def plot_global_var(dataset: Path | str,
+                    product_params,
+                    show_info: bool = True,
+                    savefig_path=None,
+                    map_scale: Literal['global', 'Taiwan'] = 'global',
+                    show_stations: bool = False,
+                    mark_stations: tuple[str, ...] = ('古亭', '楠梓', '鳳山'),
+                    ):
+    """Plot the distribution of a product variable on a global or Taiwan map."""
+    try:
+        # Accept either a file path or an already-opened xarray Dataset
+        from netCDF4 import Dataset
+        if isinstance(dataset, (str, Path)):
+            with Dataset(dataset, 'r') as nc:
+                has_product_group = 'PRODUCT' in nc.groups
+            if has_product_group:
+                ds = xr.open_dataset(dataset, engine='netcdf4', group='PRODUCT')
+            else:
+                ds = xr.open_dataset(dataset, engine='netcdf4')
+        elif isinstance(dataset, xr.Dataset):
+            ds = dataset
+        else:
+            raise NotImplementedError
+
+        if show_info:
+            lon = ds.longitude[0].values
+            lat = ds.latitude[0].values
+
+            nc_info = {'file_name': Path(dataset).name if isinstance(dataset, (str, Path)) else '',
+                       'time': np.datetime64(ds.time.values[0], 'D'),
+                       'shape': lat.shape,
+                       'latitude': f'{np.nanmin(lat):.2f} to {np.nanmax(lat):.2f}',
+                       'longitude': f'{np.nanmin(lon):.2f} to {np.nanmax(lon):.2f}',
+                       }
+
+            DisplayManager().display_product_info(nc_info)
+
+        # Create the figure and projection
+        fig = plt.figure(figsize=(12, 8) if map_scale == 'global' else (8, 8))
+        ax = plt.axes(projection=ccrs.PlateCarree())
+
+        # Set the map extent
+        if map_scale == 'global':
+            ax.set_global()
+        else:
+            ax.set_extent(FIGURE_BOUNDARY, crs=ccrs.PlateCarree())
+
+        # Select and smooth the field (renamed so the `dataset` argument is not shadowed)
+        field = ds[product_params.dataset_name][0]
+        field.data = smooth_kernel(field.data, kernel_size=3)
+
+        # Scientific notation for the colorbar tick labels
+        formatter = ScalarFormatter(useMathText=True)
+        formatter.set_scientific(True)
+        formatter.set_powerlimits((-2, 2))  # range outside which scientific notation is used
+
+        im = field.plot(
+            ax=ax,
+            x='longitude', y='latitude',
+            cmap='RdBu_r',
+            transform=ccrs.PlateCarree(),
+            robust=True,  # ignored when vmin/vmax are given explicitly
+            vmin=product_params.vmin,
+            vmax=product_params.vmax,
+            cbar_kwargs={
+                'label': product_params.units,
+                'fraction': 0.04,  # colorbar width (default is 0.15)
+                'pad': 0.04,       # gap between colorbar and axes
+                'aspect': 30,      # aspect ratio; larger values lengthen the colorbar
+                'shrink': 1,
+                'format': formatter,
+                'extend': 'neither',
+            }
+        )
+
+        if map_scale == 'Taiwan':
+            # Add county borders and, optionally, the monitoring stations
+            taiwan_counties = gpd.read_file(Path(__file__).parents[2] / "data/shapefiles/taiwan/COUNTY_MOI_1090820.shp")
+            ax.add_geometries(taiwan_counties['geometry'], crs=ccrs.PlateCarree(), edgecolor='black', facecolor='none')
+
+            if show_stations and mark_stations:
+                plot_stations(ax, list(mark_stations))
+        else:
+            # Add coarse map features for the global view
+            ax.add_feature(cfeature.BORDERS.with_scale('10m'), linestyle=':')
+            ax.add_feature(cfeature.COASTLINE.with_scale('10m'))
+            ax.add_feature(cfeature.LAND.with_scale('10m'), alpha=0.1)
+            ax.add_feature(cfeature.OCEAN.with_scale('10m'), alpha=0.1)
+
+        # Gridlines
+        gl = ax.gridlines(draw_labels=True, linestyle='--', alpha=0.7)
+        gl.top_labels = False
+        gl.right_labels = False
+        if map_scale == 'Taiwan':
+            gl.xlocator = FixedLocator([119, 120, 121, 122, 123])  # longitude ticks over Taiwan
+
+        # Outline the data footprint with a rectangle
+        lon_min, lon_max = float(np.nanmin(ds.longitude)), float(np.nanmax(ds.longitude))
+        lat_min, lat_max = float(np.nanmin(ds.latitude)), float(np.nanmax(ds.latitude))
+        rect = plt.Rectangle(
+            (lon_min, lat_min),
+            lon_max - lon_min,
+            lat_max - lat_min,
+            fill=False,
+            color='red',
+            transform=ccrs.PlateCarree(),
+            linewidth=2
+        )
+        ax.add_patch(rect)
+
+        # Title
+        time_str = np.datetime64(field.time.values, 'D').astype(str)
+        plt.title(f'{product_params.title} {time_str}', pad=20, fontdict={'weight': 'bold', 'fontsize': 24})
+
+        plt.tight_layout()
+
+        # Save before plt.show(), which can leave the figure blank on some backends
+        if savefig_path is not None:
+            fig.savefig(savefig_path, dpi=600)
+
+        plt.show()
+
+        # Only close datasets this function opened itself
+        if isinstance(dataset, (str, Path)):
+            ds.close()
+
+    except Exception as e:
+        logger.error(f"Plotting failed: {str(e)}")
+        raise
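+
+
+# --- Editor's note (sketch, not part of the project API) --------------------
+# plot_global_var(), platecarree_plot() and orthographic_plot() all expect a
+# `product_params` object exposing .dataset_name, .title, .units, .vmin and
+# .vmax. The real definitions live in the project's product catalog (not shown
+# in this hunk); a minimal stand-in for ad-hoc use could look like:
+#
+#     from dataclasses import dataclass
+#
+#     @dataclass
+#     class ProductParams:
+#         dataset_name: str = 'nitrogendioxide_tropospheric_column'
+#         title: str = 'NO$_2$ Tropospheric Column'
+#         units: str = 'mol/m$^2$'
+#         vmin: float | None = None
+#         vmax: float | None = None
+# -----------------------------------------------------------------------------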
+
+
+def platecarree_plot(dataset, product_params, zoom=True, path=None, **kwargs):
     fig, ax = plt.subplots(figsize=(7, 6), subplot_kw={'projection': ccrs.PlateCarree()})
 
     if zoom:
@@ -27,16 +233,12 @@
         gl = ax.gridlines(crs=ccrs.PlateCarree(), draw_labels=True, linewidth=1, color='gray', alpha=0.5)
         gl.xlocator = plt.FixedLocator([119, 120, 121, 122, 123])
         gl.top_labels = gl.right_labels = False
-        # ax.coastlines(resolution='10m', color='gray', linewidth=1)
-
-        ax.add_geometries(taiwan_counties['geometry'], crs=ccrs.PlateCarree(), edgecolor='black', facecolor='none', linewidth=1)
+        # The module-level taiwan_counties variable was removed, so load the borders locally
+        taiwan_counties = gpd.read_file(Path(__file__).parents[2] / "data/shapefiles/taiwan/COUNTY_MOI_1090820.shp")
+        ax.add_geometries(taiwan_counties['geometry'], crs=ccrs.PlateCarree(), edgecolor='black', facecolor='none', linewidth=1)
     else:
         ax.set_global()
         gl = ax.gridlines(crs=ccrs.PlateCarree(), draw_labels=True, linewidth=1, color='gray', alpha=0.5)
         gl.top_labels = gl.right_labels = False
-        # ax.coastlines(resolution='10m', color='black', linewidth=0.5)
-        ax.add_geometries(taiwan_counties['geometry'], resolution='10m', crs=ccrs.PlateCarree(), edgecolor='black', facecolor='none')
 
-    var = 'nitrogendioxide_tropospheric_column'
+    var = product_params.dataset_name
 
     plot = dataset[var][0].plot.pcolormesh(ax=ax, x='longitude', y='latitude', add_colorbar=False,
                                            cmap='jet', vmin=0, vmax=1.4e-4)
@@ -44,9 +246,9 @@
-    cbar.set_label(r'$\bf NO_{2}\ mole/m^2$')
+    cbar.set_label(product_params.units)
 
     # 设置colorbar刻度标签格式为科学记数法
-    cbar.formatter = ScalarFormatter(useMathText=True, useOffset=True)
-    cbar.formatter.set_powerlimits((-2, 2))  # 设置指数的显示范围
-    cbar.update_ticks()
+    formatter = ScalarFormatter(useMathText=True)
+    formatter.set_scientific(True)
+    formatter.set_powerlimits((-2, 2))  # adjust where scientific notation kicks in
+    cbar.formatter = formatter  # the formatter must actually be attached to the colorbar
+    cbar.update_ticks()
 
     plt.title(kwargs.get('title'))
     if path is not None:
@@ -54,11 +256,11 @@
     plt.show()
 
 
-def orthographic_plot(dataset):
+def orthographic_plot(dataset, product_params):
     projection = ccrs.Orthographic(120, 25)
     fig, ax = plt.subplots(figsize=(8, 6), subplot_kw={'projection': projection})
 
-    var = 'nitrogendioxide_tropospheric_column'
+    var = product_params.dataset_name
     vmin = dataset[var][0].min()
 
     dataset[var][0].plot.pcolormesh(ax=ax, x='longitude', y='latitude',
@@ -68,61 +270,15 @@
     ax.set_global()
     ax.coastlines(resolution='10m')
 
+    plt.show()
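+
+
+# Editor's usage sketch (hedged): with a ProductParams-like object (see the
+# hypothetical stand-in sketched above), both map styles are driven the same way:
+#
+#     params = ProductParams()  # hypothetical stand-in, not the project's real catalog
+#     ds = xr.open_dataset('path/to/S5P_L2_product.nc', group='PRODUCT')
+#     platecarree_plot(ds, params, zoom=True, title=r'$\bf NO_{2}$')
+#     orthographic_plot(ds, params)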
 
-def smooth_kernel(data, kernel_size=5):
-    """
-
-    :param data:
-    :param kernel_size:
-    :return:
-    """
-    kernel = np.ones((kernel_size, kernel_size))
-
-    return convolve2d(data, kernel, mode='same', boundary='wrap') / np.sum(kernel)
-
-
-if __name__ == '__main__':
-    year = '2022'  # 改年度即可使用
-    folder_path = Path(__file__).parent / "ncData"
-    output_folder = Path(__file__).parent / "Figure"
-
-    lon_coordinate, lat_coordinate = TaiwanFrame().frame()
-    container = []
-
-    # 遍历文件夹中符合条件的文件
-    for file_path in folder_path.glob(f"NO2_{year}*.nc"):
-        dataset = xr.open_dataset(file_path)
-
-        # 提取数据并进行平滑处理
-        traget_data = dataset.nitrogendioxide_tropospheric_column[0].data
-        container.append(traget_data)
-
-        dataset.nitrogendioxide_tropospheric_column[0] = smooth_kernel(traget_data)
-
-        # 生成图像文件并保存到输出文件夹
-        # print('plot: ' + file_path.name)
-        # output_path = output_folder / f"{file_path.stem}.png"
-        # platecarree_plot(dataset, path=output_path,
-        #                  title=rf'$\bf {file_path.stem[4:8]}-{file_path.stem[8:10]}\ \ NO_{2}$')
-
-
-    # 计算年平均值
-    no2_year_average = np.nanmean(container, axis=0)
-    original_shape = no2_year_average.shape
-
-    ds_time = np.array(np.datetime64(year, 'ns'))
-    ds_result = xr.Dataset(
-        {'nitrogendioxide_tropospheric_column': (['latitude', 'longitude'], no2_year_average.reshape(*original_shape))},
-        coords={'latitude': lat_coordinate[:, 0],
-                'longitude': lon_coordinate[0]})
-
-    ds_result = ds_result.expand_dims(time=[ds_time])
+if __name__ == "__main__":
+    # Smoke test: compare a raw (grouped) and a processed (ungrouped) product file
+    file_group = '/Users/chanchihyu/Sentinel-5P/raw/NO2___/2024/03/S5P_OFFL_L2__NO2____20240314T031823_20240314T045953_33252_03_020600_20240315T192700.nc'
+    file_ungroup = '/Users/chanchihyu/Sentinel-5P/processed/NO2___/2024/01/S5P_OFFL_L2__NO2____20240110T045402_20240110T063532_32345_03_020600_20240111T211523.nc'
+    ds = xr.open_dataset(file_ungroup)
 
-    # 畫年平均
-    year_ds = ds_result.copy(deep=True)
-    traget_data = year_ds.nitrogendioxide_tropospheric_column[0].data
-    year_ds['nitrogendioxide_tropospheric_column'][0] = smooth_kernel(traget_data)
+    # Processed files are flattened, so no PRODUCT group should remain
+    from netCDF4 import Dataset
+    with Dataset(file_ungroup, 'r') as nc:
+        print(nc.groups)
 
-    output_path = output_folder / f"NO2_{year}.png"
-    platecarree_plot(year_ds, path=output_path, title=rf'$\bf {year}\ \ NO_{2}$')
+    # plot_global_var(file_ungroup, product_params=...)
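+
+    # Editor's sketch (hedged): a complete call needs a product-parameter object;
+    # with the hypothetical ProductParams stand-in sketched earlier this would read:
+    #
+    #     plot_global_var(file_ungroup,
+    #                     product_params=ProductParams(vmin=0, vmax=1.4e-4),
+    #                     map_scale='Taiwan',
+    #                     show_stations=True)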
diff --git a/src/visualization/read_shp.py b/src/visualization/read_shp.py
deleted file mode 100644
index 3b5446d..0000000
--- a/src/visualization/read_shp.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import matplotlib.pyplot as plt
-import cartopy.crs as ccrs
-import pandas as pd
-import geopandas as gpd
-from pathlib import Path
-from shapely.geometry import Point, Polygon
-
-taiwan_counties = gpd.read_file(Path(__file__).parents[2] / "data/shapefiles/taiwan/COUNTY_MOI_1090820.shp")
-station = gpd.read_file(Path(__file__).parents[2] / "data/shapefiles/stations/空氣品質監測站位置圖_121_10704.shp")
-
-geometry = [Point(xy) for xy in zip(station['TWD97Lon'], station['TWD97Lat'])]
-geodata = gpd.GeoDataFrame(station, crs=ccrs.PlateCarree(), geometry=geometry)
-
-# 创建地图
-fig, ax = plt.subplots(subplot_kw={'projection': ccrs.PlateCarree()})
-ax.set_extent([119, 123, 21, 26], crs=ccrs.PlateCarree())
-
-# 添加县市边界
-ax.add_geometries(taiwan_counties['geometry'], crs=ccrs.PlateCarree(), edgecolor='black', facecolor='none')
-ax.add_geometries(station['geometry'], crs=ccrs.PlateCarree(), edgecolor='Red', facecolor='none')
-geodata.plot(ax=ax, color='red', markersize=5)
-
-plt.show()
diff --git a/src/visualization/sample_plot_nc.py b/src/visualization/sample_plot_nc.py
deleted file mode 100644
index 342294e..0000000
--- a/src/visualization/sample_plot_nc.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import xarray as xr
-import matplotlib.pyplot as plt
-import cartopy.crs as ccrs
-import cartopy.feature as cfeature
-import numpy as np
-import warnings
-from typing import Literal
-from cartopy.io import DownloadWarning
-from pathlib import Path
-import logging
-
-plt.rcParams['mathtext.fontset'] = 'custom'
-plt.rcParams['mathtext.rm'] = 'Times New Roman'
-plt.rcParams['mathtext.it'] = 'Times New Roman: italic'
-plt.rcParams['mathtext.bf'] = 'Times New Roman: bold'
-plt.rcParams['mathtext.default'] = 'regular'
-plt.rcParams['font.family'] = 'Times New Roman'
-plt.rcParams['font.weight'] = 'normal'
-plt.rcParams['font.size'] = 16
-
-plt.rcParams['axes.titlesize'] = 'large'
-plt.rcParams['axes.titleweight'] = 'bold'
-plt.rcParams['axes.labelweight'] = 'bold'
-
-
-# warnings.filterwarnings('ignore', category=DownloadWarning)
-logger = logging.getLogger(__name__)
-
-
-def plot_global_no2(data,
-                    savefig_path=None,
-                    close_after=True,
-                    map_scale: Literal['global', 'Taiwan'] = 'global'
-                    ):
-    """
-    在全球地圖上繪製 NO2 分布圖
-    """
-    try:
-        # 判斷輸入類型並適當處理
-        if isinstance(data, (str, Path)):
-            ds = xr.open_dataset(data)
-            should_close = True
-        else:
-            ds = data
-            should_close = False and close_after
-
-        # 創建圖形和投影
-        fig = plt.figure(figsize=(15, 8) if map_scale == 'global' else (10, 8))
-        ax = plt.axes(projection=ccrs.PlateCarree())
-
-        # 設定全球範圍
-        if map_scale == 'global':
-            ax.set_global()
-        else:
-            ax.set_extent([100, 145, 0, 45], crs=ccrs.PlateCarree())
-
-        # 繪製 NO2 數據
-        data = ds.nitrogendioxide_tropospheric_column[0]
-
-        im = data.plot(
-            ax=ax,
-            cmap='RdBu_r',
-            transform=ccrs.PlateCarree(),
-            robust=True,  # 自動處理極端值
-            cbar_kwargs={
-                'label': f'NO$_2$ Tropospheric Column (mol/m$^2$)',
-                'fraction': 0.046,  # colorbar 的寬度 (預設是 0.15)
-                'pad': 0.04,  # colorbar 和圖之間的間距
-                'aspect': 20,  # colorbar 的長寬比,增加這個值會讓 colorbar 變長
-                'shrink': 0.8 if map_scale == 'global' else 0.9
-            }
-        )
-
-        # 添加地圖特徵
-        ax.add_feature(cfeature.BORDERS.with_scale('50m'), linestyle=':')
-        ax.add_feature(cfeature.COASTLINE.with_scale('50m'))
-        ax.add_feature(cfeature.LAND.with_scale('50m'), alpha=0.1)
-        ax.add_feature(cfeature.OCEAN.with_scale('50m'), alpha=0.1)
-
-        # 設定網格線
-        gl = ax.gridlines(draw_labels=True, linestyle='--', alpha=0.5)
-        gl.top_labels = False
-        gl.right_labels = False
-
-        # 用矩形標記數據範圍
-        lon_min, lon_max = float(ds.longitude.min()), float(ds.longitude.max())
-        lat_min, lat_max = float(ds.latitude.min()), float(ds.latitude.max())
-        rect = plt.Rectangle(
-            (lon_min, lat_min),
-            lon_max - lon_min,
-            lat_max - lat_min,
-            fill=False,
-            color='red',
-            transform=ccrs.PlateCarree(),
-            linewidth=2
-        )
-        ax.add_patch(rect)
-
-        # 設定標題
-        time_str = np.datetime64(data.time.values).astype(str)
-        plt.title(f'NO$_2$ Tropospheric Column {time_str}', pad=20, fontdict={'weight': 'bold', 'fontsize': 24})
-
-        plt.tight_layout()
-        plt.show()
-
-        if savefig_path is not None:
-            fig.savefig(savefig_path, dpi=600)
-
-        if should_close:
-            ds.close()
-
-    except Exception as e:
-        logger.error(f"繪圖時發生錯誤: {str(e)}")
-        raise
-
-
-# 主程式
-if __name__ == "__main__":
-    file_list = ["/Users/chanchihyu/Sentinel_data/raw/2024/04/S5P_OFFL_L2__NO2____20240409T051555_20240409T065725_33622_03_020600_20240410T213619.nc"]
-
-    for file in file_list:
-        plot_global_no2(file)