
"""
.. module:: s3analysis.slstr.mdb.slstrmdb

Specific functions for felyx S3 SLSTR match-up database handling

:copyright: Copyright 2016 Ifremer / Cersat.
:license: Released under GPL v3 license, see :ref:`license`.

.. sectionauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
import logging
import datetime
import os
from collections import OrderedDict

import numpy
import netCDF4 as netcdf

from pyfelyx.mdb import MDB
import s3analysis.slstr.cloud
import s3analysis.slstr.quality_level

# recommended cloud mask
DEFAULT_CLOUDMASK = [
    'visible',
    '1.37_threshold',
    '1.6_large_histogram',
    '2.25_large_histogram',
    'gross_cloud',
    'thin_cirrus',
    'medium_high',
    '11_12_view_difference',
    '3.7_11_view_difference',
    'fog_low_stratus',
    '11_spatial_coherence',
    ]
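
# Illustration of how these flag meanings are applied: the cloud flag
# variable carries matching 'flag_meanings' and 'flag_masks' attributes,
# and a subset of meanings is turned into a boolean mask by bit testing.
# A runnable sketch with toy values (the names and values below are
# hypothetical):
#
#     >>> import numpy
#     >>> meanings = ['visible', 'gross_cloud', 'thin_cirrus']
#     >>> masks = numpy.array([1, 2, 4])
#     >>> flags = numpy.array([0, 3, 4])  # per match-up bit field
#     >>> bits = ['gross_cloud']  # meanings to test
#     >>> test = sum(m for name, m in zip(meanings, masks) if name in bits)
#     >>> (flags & test) != 0  # -> [False, True, False]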

# labels for the different algorithm selection criteria
ALGO_LABEL = [
    'N 2-ch day', 'N 2-ch night', 'N 3-ch night',
    'D 2-ch day', 'D 2-ch night', 'D 3-ch night'
    ]
ALGO_CODE = ['N2d', 'N2n', 'N3n', 'D2d', 'D2n', 'D3n']
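
# For reference, the numeric WST 'sst_algorithm_type' codes tested in
# SLSTRMDB.get_algo_type_masks() below, keyed by the short codes above
# (a convenience mapping added here for illustration only):
ALGO_WST_TYPE = {'N2d': 1, 'N2n': 1, 'N3n': 3, 'D2d': 4, 'D2n': 4, 'D3n': 5}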


class SLSTRMDB(MDB):
    """An abstraction class to read the SLSTR MDB records.

    It can be initialized in three different, mutually exclusive ways.

    Args:
        identifier (str, optional): configuration identifier. If provided,
            a configuration file named <identifier>.cfg must exist in your
            $HOME/.s3analysis/mdb directory.

        cfgfile (str, optional): explicit path to a configuration file
            (exclusive with ``identifier``).

        mdbroot (str, optional): the path to the root directory where the
            match-up files are stored (with YYYY/DDD subfolders).
    """
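
    # A minimal usage sketch of the three initialization modes (the
    # identifier and paths below are hypothetical examples):
    #
    #     >>> mdb = SLSTRMDB(identifier='slstr')
    #     >>> mdb = SLSTRMDB(cfgfile='/path/to/slstr.cfg')
    #     >>> mdb = SLSTRMDB(mdbroot='/path/to/matchup/root')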

    def load_config(self, identifier):
        config = super(SLSTRMDB, self).load_config(identifier)

        # default cloud mask
        try:
            self.cloud_mask = config.options('cloud_mask')
        except Exception:
            self.cloud_mask = DEFAULT_CLOUDMASK

        # default prefixes for each dataset contained in the match-up files
        self.datasets = {k: v for k, v in config.items('full_identifier')}

        return config

    def __str__(self, *args, **kwargs):
        return "\n".join([
            super(SLSTRMDB, self).__str__(*args, **kwargs),
            "cloud mask : {}".format(self.cloud_mask)
        ])

    def quality_level(self, matchups):
        """Return an experimental quality level."""
        for field in ['sst_algorithm_type', 'l2p_flags',
                      'sst_theoretical_uncertainty', 'dt_analysis',
                      'sea_surface_temperature']:
            if field not in matchups['WST']:
                raise ValueError(
                    "missing field {} in matchups".format(field)
                )
        for field in ['cloud_in', 'cloud_io']:
            if field not in matchups['WCT']:
                raise ValueError(
                    "missing field {} in matchups".format(field)
                )

        # compute the WST cloud mask: nadir-only algorithms (type < 4)
        # use the nadir view cloud flag, dual-view algorithms (type >= 4)
        # the oblique view one
        algo = matchups['WST']['sst_algorithm_type']
        composite_mask = numpy.ma.masked_all(
            matchups['WCT']['cloud_in'].shape,
            matchups['WCT']['cloud_in'].dtype)
        composite_mask[algo < 4] = matchups['WCT']['cloud_in'][algo < 4]
        composite_mask[algo >= 4] = matchups['WCT']['cloud_io'][algo >= 4]
        new_cloud_mask = s3analysis.slstr.cloud.cloud_mask(
            composite_mask,
            bits=s3analysis.slstr.cloud.WST_MASK
        )

        new_quality_level = s3analysis.slstr.quality_level.quality_level(
            matchups['WST']['l2p_flags'],
            new_cloud_mask,
            algo,
            matchups['WST']['sst_theoretical_uncertainty'],
            matchups['WST']['dt_analysis'],
            matchups['WST']['sea_surface_temperature'],
        )
        return new_quality_level
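
    # The composite mask above merges two flag arrays by boolean
    # selection on the algorithm type; a sketch of the same pattern with
    # toy data (all names below are illustrative):
    #
    #     >>> import numpy
    #     >>> algo = numpy.ma.array([1, 3, 4, 5])
    #     >>> nadir = numpy.ma.array([10, 11, 12, 13])
    #     >>> dual = numpy.ma.array([20, 21, 22, 23])
    #     >>> out = numpy.ma.masked_all(algo.shape, nadir.dtype)
    #     >>> out[algo < 4] = nadir[algo < 4]
    #     >>> out[algo >= 4] = dual[algo >= 4]
    #     >>> out.data  # -> [10, 11, 22, 23]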

    def read_cloud_mask(self, source, day, matchup_root, end_day=None,
                        box=False, max_distance=None,
                        maskvar='S3A_SL_2_WCT__cloud_in', maskbits=None,
                        multiple_mask=False):
        """Return a cloud mask from the 'S3A_SL_2_WCT__cloud_in' flag.

        Args:
            maskvar (str): the bitmask field name
                (ex: 'S3A_SL_2_WCT__cloud_in')

            maskbits (list, optional): list of bits to test for cloud
                mask extraction, if you don't want to use the default
                values (preset for the 'S3A_SL_2_WCT__cloud_in'
                variable), as a list of flag meanings.

            multiple_mask (bool, optional): if False (default), return a
                single mask array. If True, return a mask array for each
                mask bit, within a dictionary.

        Returns:
            numpy.ndarray or dict of numpy.ndarray: an array of booleans
            (True where cloudy) if the ``multiple_mask`` keyword is not
            set, or a dictionary of boolean arrays (one for each mask
            bit tested).
        """
        cloud_flag = self.read_satellite_data(
            source, day, matchup_root, [maskvar],
            end_day=end_day, box=box, max_distance=max_distance)
        if len(cloud_flag[maskvar]) == 0:
            return numpy.ma.masked_all(())

        # get the cloud flag metadata
        mdbfiles = self.get_matchup_files([day], matchup_root, source)
        mdbf = netcdf.Dataset(mdbfiles[0])
        cloudvar = mdbf.variables[maskvar]
        allmeanings = cloudvar.getncattr('flag_meanings')
        allmasks = cloudvar.getncattr('flag_masks')
        mdbf.close()

        if not multiple_mask:
            return s3analysis.slstr.cloud.cloud_mask(
                cloud_flag[maskvar], allmeanings, allmasks, maskbits)

        bits = maskbits
        if bits is None:
            bits = DEFAULT_CLOUDMASK
        cloud_mask_dict = OrderedDict([])
        for bit in bits:
            cloud_mask_dict[bit] = s3analysis.slstr.cloud.cloud_mask(
                cloud_flag[maskvar], allmeanings, allmasks, [bit])
        return cloud_mask_dict

    def check_fields(self, matchups, fields):
        """Check that all the required fields are in the match-up set."""
        for dataset in fields:
            for fieldname in fields[dataset]:
                if fieldname not in matchups[dataset]:
                    raise ValueError(
                        "field {} is missing in the match-up set for {}"
                        .format(fieldname, dataset)
                    )

    def add_wind_speed(self, matchups):
        """Calculate and add the model wind speed."""
        self.check_fields(
            matchups, {"WCT": ["t_series", "u_wind_tx", "v_wind_tx"]})

        # select the wind at the closest forecast time
        i = numpy.ma.abs(matchups['WCT']['t_series']).argmin(axis=1)
        wind_speed = numpy.ma.sqrt(
            matchups['WCT']['u_wind_tx'][numpy.arange(len(i)), i] ** 2 +
            matchups['WCT']['v_wind_tx'][numpy.arange(len(i)), i] ** 2
        )
        matchups['WCT']['wind_tx'] = wind_speed

        return matchups
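
    # The index pair (numpy.arange(len(i)), i) selects, for each match-up
    # row, the column of the forecast time closest to the match-up time.
    # A runnable sketch with toy data:
    #
    #     >>> import numpy
    #     >>> t = numpy.ma.array([[-3., 1., 4.], [5., -2., 7.]])
    #     >>> i = numpy.ma.abs(t).argmin(axis=1)  # -> [1, 1]
    #     >>> t[numpy.arange(len(i)), i]  # -> [1.0, -2.0]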

    def get_wct_selection(self, matchups,
                          across_track_range=(0, 1500),
                          min_wind_speed=6.0,
                          max_sat_zen=55.):
        """Return a selection of the match-ups in the proper range of
        validity.

        Default settings as provided by G. Corlett.
        """
        self.check_fields(
            matchups,
            {"WCT": ["sat_zenith_tn", "dynamic_target_center_index"]})

        # add the wind speed at the closest forecast time if not yet done
        if 'wind_tx' not in matchups['WCT']:
            matchups = self.add_wind_speed(matchups)

        # mask NaN values to avoid warnings in the comparisons below
        matchups['WCT']["sat_zenith_tn"] = numpy.ma.fix_invalid(
            matchups['WCT']["sat_zenith_tn"])

        return (
            (matchups['WCT']["dynamic_target_center_index"][:, 1] >=
             across_track_range[0]) &
            (matchups['WCT']["dynamic_target_center_index"][:, 1] <=
             across_track_range[1]) &
            (matchups['WCT']["wind_tx"] > min_wind_speed) &
            (matchups['WCT']["sat_zenith_tn"] <= max_sat_zen)
        )

    def get_wst_selection(self, matchups, insitu, dt_thresh=5.,
                          min_quality_level=2):
        """Return a selection of the match-ups in the proper range of
        validity.

        Default settings as provided by G. Corlett.
        """
        self.check_fields(
            matchups,
            {'WST': [
                'quality_level',
                'sst_theoretical_uncertainty',
                'dt_analysis'
            ]})

        return (
            (~numpy.ma.getmaskarray(
                matchups['WST']["sst_theoretical_uncertainty"])) &
            (matchups['WST']["quality_level"] >= min_quality_level) &
            (numpy.ma.fabs(matchups['WST']["dt_analysis"]) <= dt_thresh) &
            (~numpy.ma.getmaskarray(insitu["water_temperature"]))
        )

    def get_algo_type_masks(self, matchups, use_wst_filter=True):
        """Return a selection mask for each algorithm type.

        Default settings as provided by G. Corlett.

        Args:
            use_wst_filter (bool, optional): if True (default), also
                require the WST ``sst_algorithm_type`` to match the
                tested algorithm.
        """
        self.check_fields(
            matchups,
            {"WCT": [
                "solar_zenith_tn",
                "N2_sea_surface_temperature",
                "N3_sea_surface_temperature",
                "D2_sea_surface_temperature",
                "D3_sea_surface_temperature"
            ],
             "WST": [
                "sst_algorithm_type",
                "sst_theoretical_uncertainty",
            ]})

        # mask NaN values to avoid warnings in the comparisons below
        matchups['WCT']["solar_zenith_tn"] = numpy.ma.fix_invalid(
            matchups['WCT']["solar_zenith_tn"])

        day = matchups['WCT']["solar_zenith_tn"] < 90
        night = matchups['WCT']["solar_zenith_tn"] > 90

        def _select(sst_field, daynight, wst_algo_type):
            # valid retrievals for this algorithm at the right time of day
            mask = daynight & ~numpy.ma.getmaskarray(
                matchups['WCT'][sst_field])
            if use_wst_filter:
                # keep only the match-ups where WST selected this algorithm
                mask = mask & (
                    matchups['WST']["sst_algorithm_type"] == wst_algo_type)
            return mask

        n2day = _select("N2_sea_surface_temperature", day, 1)
        n2night = _select("N2_sea_surface_temperature", night, 1)
        n3night = _select("N3_sea_surface_temperature", night, 3)
        d2day = _select("D2_sea_surface_temperature", day, 4)
        d2night = _select("D2_sea_surface_temperature", night, 4)
        d3night = _select("D3_sea_surface_temperature", night, 5)

        return n2day, n2night, n3night, d2day, d2night, d3night
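
    # The six returned masks line up with ALGO_LABEL and ALGO_CODE; a
    # usage sketch (assuming ``mdb`` is a configured SLSTRMDB instance
    # and ``matchups`` a loaded match-up set):
    #
    #     >>> masks = mdb.get_algo_type_masks(matchups)
    #     >>> for label, mask in zip(ALGO_LABEL, masks):
    #     ...     print(label, mask.sum())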

    def extract_value(self, fieldname, data, row, cell):
        """Extract the pixel pointed to by the row and cell indices.

        Overridden here for specific cases.
        """
        n = len(row)
        if fieldname in ['u_wind_tx', 'v_wind_tx']:
            # wind fields have a dummy extra dimension and a history
            # dimension
            return data[numpy.arange(n), :, 0, row, cell, ...]
        elif fieldname.endswith('_tx'):
            # meteo fields have a dummy extra dimension
            return data[numpy.arange(n), 0, row, cell, ...]
        else:
            return data[numpy.arange(n), row, cell, ...]

    def read_aux_data(self, source, day, fields, end_day=None, filters=None,
                      min_quality=0, closest_to_surface=True):
        """Read the auxiliary files generated by Gary Corlett.

        Args:
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)

            day (datetime): the date (or start date if ``end_day`` is
                also set) for which to retrieve the match-up in situ
                values

            fields (list): a list of fields to read (remove the level
                suffix: for instance, use cmems_water_temperature instead
                of cmems_water_temperature_0 and the function will detect
                and reconstruct the two-dimensional fields for you).

            end_day (datetime, optional): the end date of the period for
                which to retrieve the match-up in situ values

            filters (optional): the filters used to select the closest
                valid pixel (defaults to ``self.default_filters``)

            min_quality (int, optional): minimum in situ quality level,
                passed on to ``read_insitu_field`` (default assumed)

            closest_to_surface (bool, optional): passed on to
                ``read_insitu_field`` (default assumed)

        Returns:
            a dict where keys are field names, and values the retrieved
            data
        """
        if filters is None:
            filters = self.default_filters

        dates = [day]
        if end_day is not None:
            date = day + datetime.timedelta(days=1)
            while date <= end_day:
                dates.append(date)
                date += datetime.timedelta(days=1)

        result = OrderedDict([])

        # get the match-up files
        mdbfiles = self.get_matchup_files(dates, self.root, source)

        # get the auxiliary match-up files
        prefix = "%s_AUX" % source
        auxfiles = self.get_matchup_files(dates, self.root, prefix)

        # if no files were found, return empty arrays
        if len(mdbfiles) == 0:
            for field in fields:
                result[field] = numpy.ma.array(())
            return result

        # concatenate all the files for the same source and dataset
        for filenum, mdbfname in enumerate(mdbfiles):
            logging.info("File: {}".format(mdbfname))

            # check the mdb and aux files cover the same time period
            if (os.path.basename(mdbfname).split('_')[-1][0:16] !=
                    os.path.basename(auxfiles[filenum]).split('_')[-1][0:16]):
                raise IOError(
                    "mdb and aux file don't match: {} <> {}".format(
                        os.path.basename(mdbfname),
                        os.path.basename(auxfiles[filenum])))

            mdbf = netcdf.Dataset(mdbfname)

            # read the center pixel coordinates
            lat = mdbf.variables['lat'][:]
            lon = mdbf.variables['lon'][:]

            auxf = netcdf.Dataset(auxfiles[filenum])

            # get the indices of the pixel closest to the in situ
            # measurement
            indices, distances = self.get_closest_valid_pixel_indices(
                mdbf, self.reference, lat=lat, lon=lon, filters=filters
            )
            row, cell = indices

            # read the in situ data
            for var in fields:
                data = self.read_insitu_field(
                    mdbf, source, var,
                    min_quality=min_quality,
                    closest_to_surface=closest_to_surface
                )
                if var in result:
                    result[var] = numpy.ma.concatenate((result[var], data))
                else:
                    result[var] = numpy.ma.array(data)

            auxf.close()
            mdbf.close()

        return result
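

if __name__ == '__main__':
    # A minimal end-to-end sketch: the 'slstr' identifier, the
    # 'cmems_drifter' collection and the field name are hypothetical
    # examples to adapt to your own configuration.
    logging.basicConfig(level=logging.INFO)
    mdb = SLSTRMDB(identifier='slstr')
    aux = mdb.read_aux_data(
        'cmems_drifter',
        datetime.datetime(2016, 6, 1),
        ['cmems_water_temperature'],
        end_day=datetime.datetime(2016, 6, 7))
    for name, values in aux.items():
        print(name, values.shape)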