Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading EO-SIP LAC data #125

Merged
merged 14 commits into from
Mar 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
fail-fast: true
matrix:
os: ["ubuntu-latest"]
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.10", "3.11", "3.12"]
experimental: [false]

env:
Expand Down
35 changes: 19 additions & 16 deletions pygac/pod_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@
# choose the right header depending on the date
with file_opener(fileobj or filename) as fd_:
self.tbm_head, self.head = self.read_header(
filename, fileobj=fd_)
filename, fileobj=fd_, eosip_header=self.eosip_header)
if self.tbm_head:
tbm_offset = tbm_header.itemsize
else:
Expand All @@ -302,12 +302,13 @@
return self.head, self.scans

@classmethod
def read_header(cls, filename, fileobj=None):
def read_header(cls, filename, fileobj=None, eosip_header=False):
"""Read the file header.

Args:
filename (str): Path to GAC/LAC file
fileobj: An open file object to read from. (optional)
eosip_header (bool): If True, the file is read as EO-SIP and the latest header format is always used, regardless of the date. (optional)

Returns:
archive_header (struct): archive header
Expand All @@ -332,20 +333,22 @@
fd_.seek(0)
tbm_head = None
tbm_offset = 0
# read header
head0, = np.frombuffer(
fd_.read(header0.itemsize),
dtype=header0, count=1)
year, jday, _ = cls.decode_timestamps(head0["start_time"])
start_date = (datetime.date(year, 1, 1) +
datetime.timedelta(days=int(jday) - 1))
if start_date < datetime.date(1992, 9, 8):
header = header1
elif start_date <= datetime.date(1994, 11, 15):
header = header2
else:
header = header3
fd_.seek(tbm_offset, 0)
header = header3
if not eosip_header:

Check warning on line 337 in pygac/pod_reader.py

View check run for this annotation

Codecov / codecov/patch

pygac/pod_reader.py#L336-L337

Added lines #L336 - L337 were not covered by tests
# choose the appropriate header
head0, = np.frombuffer(

Check warning on line 339 in pygac/pod_reader.py

View check run for this annotation

Codecov / codecov/patch

pygac/pod_reader.py#L339

Added line #L339 was not covered by tests
fd_.read(header0.itemsize),
dtype=header0, count=1)
year, jday, _ = cls.decode_timestamps(head0["start_time"])
start_date = (datetime.date(year, 1, 1) +

Check warning on line 343 in pygac/pod_reader.py

View check run for this annotation

Codecov / codecov/patch

pygac/pod_reader.py#L342-L343

Added lines #L342 - L343 were not covered by tests
datetime.timedelta(days=int(jday) - 1))
if start_date < datetime.date(1992, 9, 8):
header = header1
elif start_date <= datetime.date(1994, 11, 15):
header = header2

Check warning on line 348 in pygac/pod_reader.py

View check run for this annotation

Codecov / codecov/patch

pygac/pod_reader.py#L345-L348

Added lines #L345 - L348 were not covered by tests
else:
header = header3
fd_.seek(tbm_offset, 0)

Check warning on line 351 in pygac/pod_reader.py

View check run for this annotation

Codecov / codecov/patch

pygac/pod_reader.py#L350-L351

Added lines #L350 - L351 were not covered by tests
mraspaud marked this conversation as resolved.
Show resolved Hide resolved
# need to copy frombuffer to have write access on head
head, = np.frombuffer(
fd_.read(header.itemsize),
Expand Down
26 changes: 16 additions & 10 deletions pygac/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class Reader(six.with_metaclass(ABCMeta)):

def __init__(self, interpolate_coords=True, adjust_clock_drift=True,
tle_dir=None, tle_name=None, tle_thresh=7, creation_site=None,
custom_calibration=None, calibration_file=None):
custom_calibration=None, calibration_file=None, eosip_header=False):
"""Init the reader.

Args:
Expand All @@ -122,6 +122,7 @@ def __init__(self, interpolate_coords=True, adjust_clock_drift=True,
self.creation_site = (creation_site or 'NSS').encode('utf-8')
self.custom_calibration = custom_calibration
self.calibration_file = calibration_file
self.eosip_header = eosip_header
self.head = None
self.scans = None
self.spacecraft_name = None
Expand Down Expand Up @@ -205,19 +206,24 @@ def _correct_data_set_name(cls, header, filename):
filename (str): path to file
"""
filename = str(filename)
data_set_name = header['data_set_name'].decode(errors='ignore')
if not cls.data_set_pattern.match(data_set_name):
LOG.debug('The data_set_name in header %s does not match.'
' Use filename instead.' % header['data_set_name'])
for encoding in "utf-8", "cp500":
sfinkens marked this conversation as resolved.
Show resolved Hide resolved
data_set_name = header['data_set_name'].decode(encoding, errors='ignore')
if not cls.data_set_pattern.match(data_set_name):
LOG.debug(f'The data_set_name in header {header["data_set_name"]} '
f'does not seem correct using encoding {encoding}.')
else:
header["data_set_name"] = data_set_name.encode()
break
else:
LOG.debug(f'The data_set_name in header {header["data_set_name"]} does not match.'
' Use filename instead.')
match = cls.data_set_pattern.search(filename)
if match:
data_set_name = match.group()
LOG.debug("Set data_set_name, to filename %s"
% data_set_name)
LOG.debug(f"Set data_set_name, to filename {data_set_name}")
header['data_set_name'] = data_set_name.encode()
else:
LOG.debug("header['data_set_name']=%s; filename='%s'"
% (header['data_set_name'], filename))
LOG.debug(f"header['data_set_name']={header['data_set_name']}; filename='{filename}'")
raise ReaderError('Cannot determine data_set_name!')
return header

Expand Down Expand Up @@ -274,7 +280,7 @@ def _read_scanlines(self, buffer, count):
"Expected %d scan lines, but found %d!"
% (count, line_count))
warnings.warn("Unexpected number of scanlines!",
category=RuntimeWarning)
category=RuntimeWarning, stacklevel=2)
self.scans = np.frombuffer(
buffer, dtype=self.scanline_type, count=line_count)

Expand Down
6 changes: 3 additions & 3 deletions pygac/tests/test_klm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
class TestKLM:
"""Test the klm reader."""

def setup(self):
def setup_method(self):
"""Set up the tests."""
self.reader = GACKLMReader()

Expand Down Expand Up @@ -126,7 +126,7 @@ def test_quality_indicators(self):
class TestGACKLM:
"""Tests for gac klm."""

def setup(self):
def setup_method(self):
"""Set up the tests."""
self.reader = GACKLMReader()

Expand All @@ -150,7 +150,7 @@ def test_get_tsm_pixels(self, get_tsm_idx):
class TestLACKLM:
"""Tests for lac klm."""

def setup(self):
def setup_method(self):
"""Set up the tests."""
self.reader = LACKLMReader()
self.reader.scans = np.ones(100, dtype=scanline)
Expand Down
51 changes: 42 additions & 9 deletions pygac/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,29 +174,62 @@ def test__validate_header(self):
head = {'data_set_name': b'\xea\xf8'}
self.reader._validate_header(head)

def test__correct_data_set_name(self):
def test__correct_data_set_name_ebcdic_encoded_header_invalid_path(self):
sfinkens marked this conversation as resolved.
Show resolved Hide resolved
"""Test the data_set_name correction in file header."""
inv_filename = 'InvalidFileName'
inv_filepath = 'path/to/' + inv_filename

expected_data_set_name = 'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'
val_head = {'data_set_name': 'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'.encode("cp500")}
head = self.reader._correct_data_set_name(val_head.copy(), inv_filepath)
assert head['data_set_name'] == expected_data_set_name.encode()

def test__correct_data_set_name_valid_header_and_file(self):
"""Test the data_set_name correction in file header."""
val_filename = 'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'
val_filepath = 'path/to/' + val_filename
val_head = {'data_set_name': b'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'}
inv_filename = 'InvalidFileName'
inv_filepath = 'path/to/' + inv_filename
inv_head = {'data_set_name': b'InvalidDataSetName'}
# Note: always pass a copy to _correct_data_set_name, because
# the input header is modified in place.
# enter a valid data_set_name and filepath
head = self.reader._correct_data_set_name(val_head.copy(), val_filepath)
assert head['data_set_name'] == val_filename.encode()

def test__correct_data_set_name_invalid_header_and_valid_file(self):
"""Test the data_set_name correction in file header."""
val_filename = 'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'
val_filepath = 'path/to/' + val_filename
inv_head = {'data_set_name': b'InvalidDataSetName'}

# enter an invalid data_set_name, but valid filepath
head = self.reader._correct_data_set_name(inv_head.copy(), val_filepath)
self.assertEqual(head['data_set_name'], val_filename.encode())
# enter an invalid data_set_name, and invalid filepath
assert head['data_set_name'] == val_filename.encode()

def test__correct_data_set_name_invalid_header_and_file(self):
"""Test the data_set_name correction in file header."""
inv_filename = 'InvalidFileName'
inv_filepath = 'path/to/' + inv_filename
inv_head = {'data_set_name': b'InvalidDataSetName'}
with self.assertRaisesRegex(ReaderError, 'Cannot determine data_set_name!'):
head = self.reader._correct_data_set_name(inv_head.copy(), inv_filepath)
_ = self.reader._correct_data_set_name(inv_head.copy(), inv_filepath)

def test__correct_data_set_name_valid_header_invalid_file(self):
"""Test the data_set_name correction in file header."""
val_head = {'data_set_name': b'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'}
inv_filename = 'InvalidFileName'
inv_filepath = 'path/to/' + inv_filename

# enter a valid data_set_name, and an invalid filepath
# should be fine, because the data_set_name is the preferred source
head = self.reader._correct_data_set_name(val_head.copy(), inv_filepath)
self.assertEqual(head['data_set_name'], val_head['data_set_name'])
# enter a valid data_set_name, and an FSFile/pathlib object as filepath
assert head['data_set_name'] == val_head['data_set_name']

def test__correct_data_set_name_valid_header_pathlib_file(self):
"""Test the data_set_name correction in file header."""
val_filename = 'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'
val_filepath = 'path/to/' + val_filename
val_head = {'data_set_name': b'NSS.GHRR.TN.D80001.S0332.E0526.B0627173.WI'}

fs_filepath = TestPath(val_filepath)
head = self.reader._correct_data_set_name(val_head.copy(), fs_filepath)
self.assertEqual(head['data_set_name'], val_filename.encode())
Expand Down
Loading