diff --git a/pygac_fdr/metadata.py b/pygac_fdr/metadata.py index 0cf5663..1da5f20 100644 --- a/pygac_fdr/metadata.py +++ b/pygac_fdr/metadata.py @@ -20,13 +20,17 @@ from datetime import datetime from enum import IntEnum +import os import logging import netCDF4 +import sqlalchemy import numpy as np import pandas as pd -import sqlite3 import xarray as xr from xarray.coding.times import encode_cf_datetime +from sqlalchemy import Column, Integer, Float, String, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker LOG = logging.getLogger(__package__) @@ -101,6 +105,31 @@ class QualityFlags(IntEnum): 'fill_value': None} ] +CHECKPOINT_STEP = 100 +CHECKPOINT_DB = "_checkpoint.db" +Base = declarative_base() + + +class FileMetadata(Base): + """Object relational model for SQL database.""" + __tablename__ = 'metadata' + + platform = Column(String) + start_time = Column(DateTime) + end_time = Column(DateTime) + along_track = Column(Integer) + filename = Column(String, primary_key=True) + orbit_number_start = Column(Integer) + orbit_number_end = Column(Integer) + equator_crossing_longitude_1 = Column(Float) + equator_crossing_time_1 = Column(DateTime) + equator_crossing_longitude_2 = Column(Float) + equator_crossing_time_2 = Column(DateTime) + midnight_line = Column(Float) + overlap_free_start = Column(Float) + overlap_free_end = Column(Float) + global_quality_flag = Column(Integer) + class MetadataCollector: """Collect and complement metadata from level 1c files. @@ -138,47 +167,69 @@ def get_metadata(self, filenames): def save_sql(self, mda, dbfile, if_exists): """Save metadata to sqlite database.""" - con = sqlite3.connect(dbfile) - mda.to_sql(name='metadata', con=con, if_exists=if_exists) - con.commit() - con.close() + engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) + mda.to_sql(name="metadata", con=engine, if_exists=if_exists) def read_sql(self, dbfile): """Read metadata from sqlite database.""" - with sqlite3.connect(dbfile) as con: - mda = pd.read_sql('select * from metadata', con) - mda = mda.set_index(['platform', 'level_1']) - mda.fillna(value=np.nan, inplace=True) - for col in mda.columns: - if 'time' in col: - mda[col] = mda[col].astype('datetime64[ns]') + engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) + mda = pd.read_sql_table("metadata", engine, index_col=['platform', 'level_1']) return mda + def _extract_fromfile(self, filename): + """Extract metadata from a single file.""" + LOG.debug('Collecting metadata from {}'.format(filename)) + with xr.open_dataset(filename) as ds: + midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) + eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) + metadata = FileMetadata( + platform=ds.attrs['platform'].split('>')[-1].strip(), + start_time=ds['acq_time'].values[0], + end_time=ds['acq_time'].values[-1], + along_track=ds.dims['y'], + filename=filename, + orbit_number_start=ds.attrs['orbit_number_start'], + orbit_number_end=ds.attrs['orbit_number_end'], + equator_crossing_longitude_1=eq_cross_lons[0], + equator_crossing_time_1=eq_cross_times[0], + equator_crossing_longitude_2=eq_cross_lons[0], + equator_crossing_time_2=eq_cross_times[1], + midnight_line=midnight_line, + overlap_free_start=np.nan, + overlap_free_end=np.nan, + global_quality_flag=QualityFlags.OK + ) + return metadata + def _collect_metadata(self, filenames): """Collect metadata from the given level 1c files.""" - records = [] - for filename in filenames: - LOG.debug('Collecting metadata from {}'.format(filename)) - with xr.open_dataset(filename) as ds: - midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) - eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) - rec = {'platform': ds.attrs['platform'].split('>')[-1].strip(), - 'start_time': ds['acq_time'].values[0], - 'end_time': ds['acq_time'].values[-1], - 'along_track': ds.dims['y'], - 'filename': filename, - 'orbit_number_start': ds.attrs['orbit_number_start'], - 'orbit_number_end': ds.attrs['orbit_number_end'], - 'equator_crossing_longitude_1': eq_cross_lons[0], - 'equator_crossing_time_1': eq_cross_times[0], - 'equator_crossing_longitude_2': eq_cross_lons[1], - 'equator_crossing_time_2': eq_cross_times[1], - 'midnight_line': midnight_line, - 'overlap_free_start': np.nan, - 'overlap_free_end': np.nan, - 'global_quality_flag': QualityFlags.OK} - records.append(rec) - return records + if os.path.isfile(CHECKPOINT_DB): + LOG.info("Restarting from checkpoint.") + db_url = 'sqlite:///{0}'.format(CHECKPOINT_DB) + engine = sqlalchemy.create_engine(db_url) + Base.metadata.create_all(engine) + # open session + Session = sessionmaker(bind=engine) + session = Session() + # get set of processed files in case of a restart + wanted = set(filenames) + done = set( + filename for filename, in session.query(FileMetadata.filename) + ) + todo = wanted.difference(done) + for i, filename in enumerate(todo): + metadata = self._extract_fromfile(filename) + session.add(metadata) + if i % CHECKPOINT_STEP == 0: + session.commit() + # clear session cache + session.expire_all() + session.commit() + session.close() + # load data into memory and remove the checkpoint database + metadata = pd.read_sql_table(FileMetadata.__tablename__, engine) + os.remove(CHECKPOINT_DB) + return metadata def _get_midnight_line(self, acq_time): """Find scanline where the UTC date increases by one day. diff --git a/setup.py b/setup.py index 202901f..015aa1f 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,8 @@ if __name__ == '__main__': requires = ['setuptools_scm', 'numpy', 'xarray >=0.15.1', 'pandas >=1.0.3', 'netCDF4', - 'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift'] + 'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift', + 'sqlalchemy'] README = open('README.md', 'r').read() setup(name='pygac-fdr', description='Python package for creating a Fundamental Data Record (FDR) of AVHRR GAC '