Source code for satpy.readers.oli_tirs_l1_tif

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Landsat OLI/TIRS Level 1 reader.

Details of the data format can be found here:
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/atoms/files/LSDS-1822_Landsat8-9-OLI-TIRS-C2-L1-DFCB-v6.pdf
  https://www.usgs.gov/landsat-missions/using-usgs-landsat-level-1-data-product

NOTE: The scene geometry data (SZA, VZA, SAA, VAA) is retrieved from the L1 TIFF files, which are derived from Band 04.
The geometry differs between bands, so if you need precise geometry you should calculate this from the metadata instead.

"""

import logging
from datetime import datetime, timezone

import defusedxml.ElementTree as ET
import numpy as np
import xarray as xr

from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

PLATFORMS = {"08": "Landsat-8",
             "09": "Landsat-9"}

OLI_BANDLIST = ["B1", "B2", "B3", "B4", "B5", "B6", "B7", "B8", "B9"]
TIRS_BANDLIST = ["B10", "B11"]
PAN_BANDLIST = ["B8"]
ANGLIST = ["satellite_azimuth_angle",
           "satellite_zenith_angle",
           "solar_azimuth_angle",
           "solar_zenith_angle"]

ANGLIST_CHAN = ["sza", "saa", "vaa", "vza"]

BANDLIST = OLI_BANDLIST + TIRS_BANDLIST


[docs] class OLITIRSCHReader(BaseFileHandler): """File handler for Landsat L1 files (tif)."""
[docs] @staticmethod def get_btype(file_type): """Return the band type from the file type.""" pos = file_type.rfind("_") if pos == -1: raise ValueError(f"Invalid file type: {file_type}") else: return file_type[pos+1:]
@property def start_time(self): """Return start time.""" return self._mda.start_time @property def end_time(self): """Return end time.""" return self._mda.end_time def __init__(self, filename, filename_info, filetype_info, mda, **kwargs): """Initialize the reader.""" super().__init__(filename, filename_info, filetype_info) # Check we have landsat data if filename_info["platform_type"] != "L": raise ValueError("This reader only supports Landsat data") # Get the channel name self.channel = self.get_btype(filetype_info["file_type"]) # Data can be VIS, TIR or Combined. This flag denotes what the granule contains (O, T or C respectively). self.chan_selector = filename_info["data_type"] self._obs_date = filename_info["observation_date"] self._mda = mda # Retrieve some per-band useful metadata self.bsat = self._mda.band_saturation self.calinfo = self._mda.band_calibration self.platform_name = PLATFORMS[filename_info["spacecraft_id"]]
[docs] def get_dataset(self, key, info): """Load a dataset.""" if self.channel != key["name"] and self.channel not in ANGLIST_CHAN: raise ValueError(f"Requested channel {key['name']} does not match the reader channel {self.channel}") if key["name"] in OLI_BANDLIST and self.chan_selector not in ["O", "C"]: raise ValueError(f"Requested channel {key['name']} is not available in this granule") if key["name"] in TIRS_BANDLIST and self.chan_selector not in ["T", "C"]: raise ValueError(f"Requested channel {key['name']} is not available in this granule") logger.debug("Reading %s.", key["name"]) data = xr.open_dataarray(self.filename, engine="rasterio", chunks={"band": 1, "y": "auto", "x": "auto"}, mask_and_scale=False).squeeze() # The fill value for Landsat is '0', for calibration simplicity convert it to np.nan data.data = xr.where(data.data == 0, np.float32(np.nan), data.data) attrs = data.attrs.copy() # Add useful metadata to the attributes. attrs["perc_cloud_cover"] = self._mda.cloud_cover # Add platform / sensor attributes attrs["platform_name"] = self.platform_name attrs["sensor"] = "OLI_TIRS" # Apply attrs from YAML attrs["standard_name"] = info["standard_name"] attrs["units"] = info["units"] # Only OLI bands have a saturation flag if key["name"] in OLI_BANDLIST: attrs["saturated"] = self.bsat[key["name"]] # Rename to Satpy convention data = data.rename({"band": "bands"}) data.attrs.update(attrs) # Calibrate if we're using a band rather than a QA or geometry dataset if key["name"] in BANDLIST: data = self.calibrate(data, key["calibration"]) if key["name"] in ANGLIST: data.data = data.data * 0.01 return data
[docs] def calibrate(self, data, calibration): """Calibrate the data from counts into the desired units.""" if calibration == "counts": return data if calibration in ["radiance", "brightness_temperature"]: data.data = data.data * self.calinfo[self.channel][0] + self.calinfo[self.channel][1] if calibration == "radiance": data.data = data.data.astype(np.float32) return data if calibration == "reflectance": if int(self.channel[1:]) < 10: data.data = data.data * self.calinfo[self.channel][2] + self.calinfo[self.channel][3] data.data = data.data.astype(np.float32) * 100 return data if calibration == "brightness_temperature": if self.channel[1:] in ["10", "11"]: data.data = (self.calinfo[self.channel][3] / np.log((self.calinfo[self.channel][2] / data.data) + 1)) data.data = data.data.astype(np.float32) return data
[docs] def get_area_def(self, dsid): """Get area definition of the image from the metadata.""" return self._mda.build_area_def(dsid["name"])
[docs] class OLITIRSMDReader(BaseFileHandler): """File handler for Landsat L1 files (tif).""" def __init__(self, filename, filename_info, filetype_info): """Init the reader.""" super().__init__(filename, filename_info, filetype_info) # Check we have landsat data if filename_info["platform_type"] != "L": raise ValueError("This reader only supports Landsat data") self.platform_name = PLATFORMS[filename_info["spacecraft_id"]] self._obs_date = filename_info["observation_date"] self.root = ET.parse(self.filename) self.process_level = filename_info["process_level_correction"] import bottleneck # noqa import geotiepoints # noqa @property def center_time(self): """Return center time.""" return datetime.strptime(self.root.find(".//IMAGE_ATTRIBUTES/SCENE_CENTER_TIME").text[:-2], "%H:%M:%S.%f").replace(tzinfo=timezone.utc) @property def start_time(self): """Return start time. This is actually the scene center time, as we don't have the start time. It is constructed from the observation date (from the filename) and the center time (from the metadata). """ return datetime(self._obs_date.year, self._obs_date.month, self._obs_date.day, self.center_time.hour, self.center_time.minute, self.center_time.second, tzinfo=timezone.utc) @property def end_time(self): """Return end time. This is actually the scene center time, as we don't have the end time. It is constructed from the observation date (from the filename) and the center time (from the metadata). """ return datetime(self._obs_date.year, self._obs_date.month, self._obs_date.day, self.center_time.hour, self.center_time.minute, self.center_time.second, tzinfo=timezone.utc) @property def cloud_cover(self): """Return estimated granule cloud cover percentage.""" return float(self.root.find(".//IMAGE_ATTRIBUTES/CLOUD_COVER").text) def _get_satflag(self, band): """Return saturation flag for a band.""" flag = self.root.find(f".//IMAGE_ATTRIBUTES/SATURATION_BAND_{band}").text if flag == "Y": return True return False @property def band_saturation(self): """Return per-band saturation flag.""" bdict = {} for i in range(1, 10): bdict[f"B{i:01d}"] = self._get_satflag(i) return bdict def _get_band_radcal(self, band): """Get the radiance scale and offset values.""" rad_gain = float(self.root.find(f".//LEVEL1_RADIOMETRIC_RESCALING/RADIANCE_MULT_BAND_{band}").text) rad_add = float(self.root.find(f".//LEVEL1_RADIOMETRIC_RESCALING/RADIANCE_ADD_BAND_{band}").text) return rad_gain, rad_add def _get_band_viscal(self, band): """Return visible channel calibration info.""" rad_gain, rad_add = self._get_band_radcal(band) ref_gain = float(self.root.find(f".//LEVEL1_RADIOMETRIC_RESCALING/REFLECTANCE_MULT_BAND_{band}").text) ref_add = float(self.root.find(f".//LEVEL1_RADIOMETRIC_RESCALING/REFLECTANCE_ADD_BAND_{band}").text) return rad_gain, rad_add, ref_gain, ref_add def _get_band_tircal(self, band): """Return thermal channel calibration info.""" rad_gain, rad_add = self._get_band_radcal(band) bt_k1 = float(self.root.find(f".//LEVEL1_THERMAL_CONSTANTS/K1_CONSTANT_BAND_{band}").text) bt_k2 = float(self.root.find(f".//LEVEL1_THERMAL_CONSTANTS/K2_CONSTANT_BAND_{band}").text) return rad_gain, rad_add, bt_k1, bt_k2 @property def band_calibration(self): """Return per-band saturation flag.""" bdict = {} for i in range(1, 10): bdict[f"B{i:01d}"] = self._get_band_viscal(i) for i in range(10, 12): bdict[f"B{i:02d}"] = self._get_band_tircal(i) return bdict
[docs] def earth_sun_distance(self): """Return Earth-Sun distance.""" return float(self.root.find(".//IMAGE_ATTRIBUTES/EARTH_SUN_DISTANCE").text)
[docs] def build_area_def(self, bname): """Build area definition from metadata.""" from pyresample.geometry import AreaDefinition # Here we assume that the thermal bands have the same resolution as the reflective bands, # with only the panchromatic band (b08) having a different resolution. if bname in PAN_BANDLIST: pixoff = float(self.root.find(".//PROJECTION_ATTRIBUTES/GRID_CELL_SIZE_PANCHROMATIC").text) / 2. x_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/PANCHROMATIC_SAMPLES").text) y_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/PANCHROMATIC_LINES").text) else: pixoff = float(self.root.find(".//PROJECTION_ATTRIBUTES/GRID_CELL_SIZE_REFLECTIVE").text) / 2. x_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/REFLECTIVE_SAMPLES").text) y_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/REFLECTIVE_LINES").text) # Get remaining geoinfo from file datum = self.root.find(".//PROJECTION_ATTRIBUTES/DATUM").text utm_zone = int(self.root.find(".//PROJECTION_ATTRIBUTES/UTM_ZONE").text) utm_str = f"{utm_zone}N" # We need to subtract / add half a pixel from the corner to get the correct extent (pixel centers) ext_p1 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_UL_PROJECTION_X_PRODUCT").text) - pixoff ext_p2 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_LR_PROJECTION_Y_PRODUCT").text) - pixoff ext_p3 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_LR_PROJECTION_X_PRODUCT").text) + pixoff ext_p4 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_UL_PROJECTION_Y_PRODUCT").text) + pixoff # Create area definition pcs_id = f"{datum} / UTM zone {utm_str}" proj4_dict = {"proj": "utm", "zone": utm_zone, "datum": datum, "units": "m", "no_defs": None, "type": "crs"} area_extent = (ext_p1, ext_p2, ext_p3, ext_p4) # Return the area extent return AreaDefinition("geotiff_area", pcs_id, pcs_id, proj4_dict, x_size, y_size, area_extent)