Source code for oggm.core.massbalance

"""Mass balance models"""
# Built ins
import logging
# External libs
import cftime
import numpy as np
import pandas as pd
import netCDF4
from scipy.interpolate import interp1d
from scipy import optimize as optimization
# Locals
import oggm.cfg as cfg
from oggm.cfg import SEC_IN_YEAR, SEC_IN_MONTH
from oggm.utils import (SuperclassMeta, lazy_property, floatyear_to_date,
                        date_to_floatyear, monthly_timeseries, ncDataset,
                        tolist, clip_min, clip_max, clip_array,
from oggm.exceptions import InvalidWorkflowError, InvalidParamsError
from oggm import entity_task

# Module logger
log = logging.getLogger(__name__)

[docs]class MassBalanceModel(object, metaclass=SuperclassMeta): """Interface and common logic for all mass balance models used in OGGM. All mass balance models should implement this interface. Attributes ---------- valid_bounds : [float, float] The altitudinal bounds where the MassBalanceModel is valid. This is necessary for automated ELA search. hemisphere : str, {'nh', 'sh'} Used for time handling when the hydrological year convention is used (``cfg.PARAMS['hydro_month_nh'] != 1``) rho : float, default: ``cfg.PARAMS['ice_density']`` Density of ice """
[docs] def __init__(self): """ Initialize.""" self.valid_bounds = None self.hemisphere = None self.rho = cfg.PARAMS['ice_density']
def __repr__(self): """String Representation of the mass balance model""" summary = ['<oggm.MassBalanceModel>'] summary += [' Class: ' + self.__class__.__name__] summary += [' Attributes:'] # Add all scalar attributes for k, v in self.__dict__.items(): if np.isscalar(v) and not k.startswith('_'): nbform = ' - {}: {}' if k == 'mu_star': nbform = ' - {}: {:.2f}' summary += [nbform.format(k, v)] return '\n'.join(summary) + '\n'
[docs] def get_monthly_mb(self, heights, year=None, fl_id=None, fls=None): """Monthly mass balance at given altitude(s) for a moment in time. Units: [m s-1], or meters of ice per second Note: `year` is optional because some simpler models have no time component. Parameters ---------- heights: ndarray the atitudes at which the mass balance will be computed year: float, optional the time (in the "hydrological floating year" convention) fl_id: float, optional the index of the flowline in the fls array (might be ignored by some MB models) fls: list of flowline instances, optional the flowlines array, in case the MB model implementation needs to know details about the glacier geometry at the moment the MB model is called Returns ------- the mass balance (same dim as `heights`) (units: [m s-1]) """ raise NotImplementedError()
[docs] def get_annual_mb(self, heights, year=None, fl_id=None, fls=None): """Like `self.get_monthly_mb()`, but for annual MB. For some simpler mass balance models ``get_monthly_mb()` and `get_annual_mb()`` can be equivalent. Units: [m s-1], or meters of ice per second Note: `year` is optional because some simpler models have no time component. Parameters ---------- heights: ndarray the altitudes at which the mass balance will be computed year: float, optional the time (in the "floating year" convention) fl_id: float, optional the index of the flowline in the fls array (might be ignored by some MB models) fls: list of flowline instances, optional the flowlines array, in case the MB model implementation needs to know details about the glacier geometry at the moment the MB model is called Returns ------- the mass balance (same dim as `heights`) (units: [m s-1]) """ raise NotImplementedError()
[docs] def get_specific_mb(self, heights=None, widths=None, fls=None, year=None): """Specific mb for this year and a specific glacier geometry. Units: [mm w.e. yr-1], or millimeter water equivalent per year Parameters ---------- heights: ndarray the altitudes at which the mass balance will be computed. Overridden by ``fls`` if provided widths: ndarray the widths of the flowline (necessary for the weighted average). Overridden by ``fls`` if provided fls: list of flowline instances, optional Another way to get heights and widths - overrides them if provided. year: float, optional the time (in the "hydrological floating year" convention) Returns ------- the specific mass balance (units: mm w.e. yr-1) """ if len(np.atleast_1d(year)) > 1: out = [self.get_specific_mb(heights=heights, widths=widths, fls=fls, year=yr) for yr in year] return np.asarray(out) if fls is not None: mbs = [] widths = [] for i, fl in enumerate(fls): _widths = fl.widths try: # For rect and parabola don't compute spec mb _widths = np.where(fl.thick > 0, _widths, 0) except AttributeError: pass widths = np.append(widths, _widths) mbs = np.append(mbs, self.get_annual_mb(fl.surface_h, fls=fls, fl_id=i, year=year)) else: mbs = self.get_annual_mb(heights, year=year) return weighted_average_1d(mbs, widths) * SEC_IN_YEAR * self.rho
[docs] def get_ela(self, year=None, **kwargs): """Compute the equilibrium line altitude for a given year. Parameters ---------- year: float, optional the time (in the "hydrological floating year" convention) **kwargs: any other keyword argument accepted by self.get_annual_mb Returns ------- the equilibrium line altitude (ELA, units: m) """ if len(np.atleast_1d(year)) > 1: return np.asarray([self.get_ela(year=yr, **kwargs) for yr in year]) if self.valid_bounds is None: raise ValueError('attribute `valid_bounds` needs to be ' 'set for the ELA computation.') # Check for invalid ELAs b0, b1 = self.valid_bounds if (np.any(~np.isfinite( self.get_annual_mb([b0, b1], year=year, **kwargs))) or (self.get_annual_mb([b0], year=year, **kwargs)[0] > 0) or (self.get_annual_mb([b1], year=year, **kwargs)[0] < 0)): return np.NaN def to_minimize(x): return (self.get_annual_mb([x], year=year, **kwargs)[0] * SEC_IN_YEAR * self.rho) return optimization.brentq(to_minimize, *self.valid_bounds, xtol=0.1)
[docs]class ScalarMassBalance(MassBalanceModel): """Constant mass balance, everywhere."""
[docs] def __init__(self, mb=0.): """ Initialize. Parameters ---------- mb : float Fix the mass balance to a certain value (unit: [mm w.e. yr-1]) """ super(ScalarMassBalance, self).__init__() self.hemisphere = 'nh' self.valid_bounds = [-2e4, 2e4] # in m self._mb = mb
def get_monthly_mb(self, heights, **kwargs): mb = np.asarray(heights) * 0 + self._mb return mb / SEC_IN_YEAR / self.rho def get_annual_mb(self, heights, **kwargs): mb = np.asarray(heights) * 0 + self._mb return mb / SEC_IN_YEAR / self.rho
[docs]class LinearMassBalance(MassBalanceModel): """Constant mass balance as a linear function of altitude. Attributes ---------- ela_h: float the equilibrium line altitude (units: [m]) grad: float the mass balance gradient (unit: [mm w.e. yr-1 m-1]) max_mb: float Cap the mass balance to a certain value (unit: [mm w.e. yr-1]) temp_bias """
[docs] def __init__(self, ela_h, grad=3., max_mb=None): """ Initialize. Parameters ---------- ela_h: float Equilibrium line altitude (units: [m]) grad: float Mass balance gradient (unit: [mm w.e. yr-1 m-1]) max_mb: float Cap the mass balance to a certain value (unit: [mm w.e. yr-1]) """ super(LinearMassBalance, self).__init__() self.hemisphere = 'nh' self.valid_bounds = [-1e4, 2e4] # in m self.orig_ela_h = ela_h self.ela_h = ela_h self.grad = grad self.max_mb = max_mb self._temp_bias = 0
@property def temp_bias(self): """Change the ELA following a simple rule: + 1K -> ELA + 150 m A "temperature bias" doesn't makes much sense in the linear MB context, but we implemented a simple empirical rule: + 1K -> ELA + 150 m """ return self._temp_bias @temp_bias.setter def temp_bias(self, value): self.ela_h = self.orig_ela_h + value * 150 self._temp_bias = value def get_monthly_mb(self, heights, **kwargs): mb = (np.asarray(heights) - self.ela_h) * self.grad if self.max_mb is not None: clip_max(mb, self.max_mb, out=mb) return mb / SEC_IN_YEAR / self.rho def get_annual_mb(self, heights, **kwargs): return self.get_monthly_mb(heights, **kwargs)
[docs]class PastMassBalance(MassBalanceModel): """Mass balance during the climate data period. Attributes ---------- temp_bias prcp_fac """
[docs] def __init__(self, gdir, mu_star=None, bias=None, filename='climate_historical', input_filesuffix='', repeat=False, ys=None, ye=None, check_calib_params=True): """Initialize. Parameters ---------- gdir : GlacierDirectory the glacier directory mu_star : float, optional set to the alternative value of mu* you want to use (the default is to use the calibrated value). bias : float, optional set to the alternative value of the calibration bias [mm we yr-1] you want to use (the default is to use the calibrated value) Note that this bias is *substracted* from the computed MB. Indeed: BIAS = MODEL_MB - REFERENCE_MB. filename : str, optional set to a different BASENAME if you want to use alternative climate data. input_filesuffix : str the file suffix of the input climate file repeat : bool Whether the climate period given by [ys, ye] should be repeated indefinitely in a circular way ys : int The start of the climate period where the MB model is valid (default: the period with available data) ye : int The end of the climate period where the MB model is valid (default: the period with available data) check_calib_params : bool OGGM will try hard not to use wrongly calibrated mu* by checking the parameters used during calibration and the ones you are using at run time. If they don't match, it will raise an error. Set to False to suppress this check. """ super(PastMassBalance, self).__init__() self.valid_bounds = [-1e4, 2e4] # in m if mu_star is None: df = gdir.read_json('local_mustar') mu_star = df['mu_star_glacierwide'] if check_calib_params: if not df['mu_star_allsame']: msg = ('You seem to use the glacier-wide mu* to compute ' 'the mass balance although this glacier has ' 'different mu* for its flowlines. Set ' '`check_calib_params=False` to prevent this ' 'error.') raise InvalidWorkflowError(msg) if bias is None: if cfg.PARAMS['use_bias_for_run']: df = gdir.read_json('local_mustar') bias = df['bias'] else: bias = 0. self.mu_star = mu_star self.bias = bias # Parameters self.t_solid = cfg.PARAMS['temp_all_solid'] self.t_liq = cfg.PARAMS['temp_all_liq'] self.t_melt = cfg.PARAMS['temp_melt'] prcp_fac = cfg.PARAMS['prcp_scaling_factor'] # check if valid prcp_fac is used if prcp_fac <= 0: raise InvalidParamsError('prcp_fac has to be above zero!') default_grad = cfg.PARAMS['temp_default_gradient'] # Check the climate related params to the GlacierDir to make sure if check_calib_params: mb_calib = gdir.get_climate_info()['mb_calib_params'] for k, v in mb_calib.items(): if v != cfg.PARAMS[k]: msg = ('You seem to use different mass balance parameters ' 'than used for the calibration: ' f"you use cfg.PARAMS['{k}']={cfg.PARAMS[k]} while " f"it was calibrated with cfg.PARAMS['{k}']={v}. " 'Set `check_calib_params=False` to ignore this ' 'warning.') raise InvalidWorkflowError(msg) # Public attrs self.hemisphere = gdir.hemisphere self.repeat = repeat # Private attrs # to allow prcp_fac to be changed after instantiation # prescribe the prcp_fac as it is instantiated self._prcp_fac = prcp_fac # same for temp bias self._temp_bias = 0. # Read file fpath = gdir.get_filepath(filename, filesuffix=input_filesuffix) with ncDataset(fpath, mode='r') as nc: # time time = nc.variables['time'] try: time = netCDF4.num2date(time[:], time.units) except ValueError: # This is for longer time series time = cftime.num2date(time[:], time.units, calendar='noleap') ny, r = divmod(len(time), 12) if r != 0: raise ValueError('Climate data should be N full years') # This is where we switch to hydro float year format # Last year gives the tone of the hydro year self.years = np.repeat(np.arange(time[-1].year - ny + 1, time[-1].year + 1), 12) pok = slice(None) # take all is default (optim) if ys is not None: pok = self.years >= ys if ye is not None: try: pok = pok & (self.years <= ye) except TypeError: pok = self.years <= ye self.years = self.years[pok] self.months = np.tile(np.arange(1, 13), ny)[pok] # Read timeseries and correct it self.temp = nc.variables['temp'][pok].astype(np.float64) + self._temp_bias self.prcp = nc.variables['prcp'][pok].astype(np.float64) * self._prcp_fac if 'gradient' in nc.variables: grad = nc.variables['gradient'][pok].astype(np.float64) # Security for stuff that can happen with local gradients g_minmax = cfg.PARAMS['temp_local_gradient_bounds'] grad = np.where(~np.isfinite(grad), default_grad, grad) grad = clip_array(grad, g_minmax[0], g_minmax[1]) else: grad = self.prcp * 0 + default_grad self.grad = grad self.ref_hgt = nc.ref_hgt self.ys = self.years[0] = self.years[-1]
# adds the possibility of changing prcp_fac # after instantiation with properly changing the prcp time series @property def prcp_fac(self): """Precipitation factor (default: cfg.PARAMS['prcp_scaling_factor']) Called factor to make clear that it is a multiplicative factor in contrast to the additive temperature bias """ return self._prcp_fac @prcp_fac.setter def prcp_fac(self, new_prcp_fac): # just to check that no invalid prcp_factors are used if np.any(np.asarray(new_prcp_fac) <= 0): raise InvalidParamsError('prcp_fac has to be above zero!') if len(np.atleast_1d(new_prcp_fac)) == 12: # OK so that's monthly stuff # We dirtily assume that user just used calendar month sm = cfg.PARAMS['hydro_month_' + self.hemisphere] new_prcp_fac = np.roll(new_prcp_fac, 13 - sm) new_prcp_fac = np.tile(new_prcp_fac, len(self.prcp) // 12) self.prcp *= new_prcp_fac / self._prcp_fac # update old prcp_fac in order that it can be updated again ... self._prcp_fac = new_prcp_fac @property def temp_bias(self): """Add a temperature bias to the time series""" return self._temp_bias @temp_bias.setter def temp_bias(self, new_temp_bias): if len(np.atleast_1d(new_temp_bias)) == 12: # OK so that's monthly stuff # We dirtily assume that user just used calendar month sm = cfg.PARAMS['hydro_month_' + self.hemisphere] new_temp_bias = np.roll(new_temp_bias, 13 - sm) new_temp_bias = np.tile(new_temp_bias, len(self.temp) // 12) self.temp += new_temp_bias - self._temp_bias # update old temp_bias in order that it can be updated again ... self._temp_bias = new_temp_bias def get_monthly_climate(self, heights, year=None): """Monthly climate information at given heights. Note that prcp is corrected with the precipitation factor and that all other model biases (temp and prcp) are applied. Returns ------- (temp, tempformelt, prcp, prcpsol) """ y, m = floatyear_to_date(year) if self.repeat: y = self.ys + (y - self.ys) % ( - self.ys + 1) if y < self.ys or y > raise ValueError('year {} out of the valid time bounds: ' '[{}, {}]'.format(y, self.ys, pok = np.where((self.years == y) & (self.months == m))[0][0] # Read already (temperature bias and precipitation factor corrected!) itemp = self.temp[pok] iprcp = self.prcp[pok] igrad = self.grad[pok] # For each height pixel: # Compute temp and tempformelt (temperature above melting threshold) npix = len(heights) temp = np.ones(npix) * itemp + igrad * (heights - self.ref_hgt) tempformelt = temp - self.t_melt clip_min(tempformelt, 0, out=tempformelt) # Compute solid precipitation from total precipitation prcp = np.ones(npix) * iprcp fac = 1 - (temp - self.t_solid) / (self.t_liq - self.t_solid) prcpsol = prcp * clip_array(fac, 0, 1) return temp, tempformelt, prcp, prcpsol def _get_2d_annual_climate(self, heights, year): # Avoid code duplication with a getter routine year = np.floor(year) if self.repeat: year = self.ys + (year - self.ys) % ( - self.ys + 1) if year < self.ys or year > raise ValueError('year {} out of the valid time bounds: ' '[{}, {}]'.format(year, self.ys, pok = np.where(self.years == year)[0] if len(pok) < 1: raise ValueError('Year {} not in record'.format(int(year))) # Read already (temperature bias and precipitation factor corrected!) itemp = self.temp[pok] iprcp = self.prcp[pok] igrad = self.grad[pok] # For each height pixel: # Compute temp and tempformelt (temperature above melting threshold) heights = np.asarray(heights) npix = len(heights) grad_temp = np.atleast_2d(igrad).repeat(npix, 0) grad_temp *= (heights.repeat(12).reshape(grad_temp.shape) - self.ref_hgt) temp2d = np.atleast_2d(itemp).repeat(npix, 0) + grad_temp temp2dformelt = temp2d - self.t_melt clip_min(temp2dformelt, 0, out=temp2dformelt) # Compute solid precipitation from total precipitation prcp = np.atleast_2d(iprcp).repeat(npix, 0) fac = 1 - (temp2d - self.t_solid) / (self.t_liq - self.t_solid) prcpsol = prcp * clip_array(fac, 0, 1) return temp2d, temp2dformelt, prcp, prcpsol def get_annual_climate(self, heights, year=None): """Annual climate information at given heights. Note that prcp is corrected with the precipitation factor and that all other model biases (temp and prcp) are applied. Returns ------- (temp, tempformelt, prcp, prcpsol) """ t, tmelt, prcp, prcpsol = self._get_2d_annual_climate(heights, year) return (t.mean(axis=1), tmelt.sum(axis=1), prcp.sum(axis=1), prcpsol.sum(axis=1)) def get_monthly_mb(self, heights, year=None, add_climate=False, **kwargs): t, tmelt, prcp, prcpsol = self.get_monthly_climate(heights, year=year) mb_month = prcpsol - self.mu_star * tmelt mb_month -= self.bias * SEC_IN_MONTH / SEC_IN_YEAR if add_climate: return (mb_month / SEC_IN_MONTH / self.rho, t, tmelt, prcp, prcpsol) return mb_month / SEC_IN_MONTH / self.rho def get_annual_mb(self, heights, year=None, add_climate=False, **kwargs): t, tmelt, prcp, prcpsol = self._get_2d_annual_climate(heights, year) mb_annual = np.sum(prcpsol - self.mu_star * tmelt, axis=1) mb_annual = (mb_annual - self.bias) / SEC_IN_YEAR / self.rho if add_climate: return (mb_annual, t.mean(axis=1), tmelt.sum(axis=1), prcp.sum(axis=1), prcpsol.sum(axis=1)) return mb_annual
[docs]class ConstantMassBalance(MassBalanceModel): """Constant mass balance during a chosen period. This is useful for equilibrium experiments. Note that is is the "correct" way to represent the average mass balance over a given period. See: Attributes ---------- y0 : int the center year of the period halfsize : int the halfsize of the period years : ndarray the years of the period """
[docs] def __init__(self, gdir, mu_star=None, bias=None, y0=None, halfsize=15, filename='climate_historical', input_filesuffix='', **kwargs): """Initialize Parameters ---------- gdir : GlacierDirectory the glacier directory mu_star : float, optional set to the alternative value of mu* you want to use (the default is to use the calibrated value) bias : float, optional set to the alternative value of the annual bias [mm we yr-1] you want to use (the default is to use the calibrated value) y0 : int, optional, default: tstar the year at the center of the period of interest. The default is to use tstar as center. halfsize : int, optional the half-size of the time window (window size = 2 * halfsize + 1) filename : str, optional set to a different BASENAME if you want to use alternative climate data. input_filesuffix : str the file suffix of the input climate file """ super(ConstantMassBalance, self).__init__() self.mbmod = PastMassBalance(gdir, mu_star=mu_star, bias=bias, filename=filename, input_filesuffix=input_filesuffix, **kwargs) if y0 is None: df = gdir.read_json('local_mustar') y0 = df['t_star'] # This is a quick'n dirty optimisation try: fls = gdir.read_pickle('model_flowlines') h = [] for fl in fls: # We use bed because of overdeepenings h = np.append(h, fl.bed_h) h = np.append(h, fl.surface_h) zminmax = np.round([np.min(h)-50, np.max(h)+2000]) except FileNotFoundError: # in case we don't have them with ncDataset(gdir.get_filepath('gridded_data')) as nc: if np.isfinite(nc.min_h_dem): # a bug sometimes led to non-finite zminmax = [nc.min_h_dem-250, nc.max_h_dem+1500] else: zminmax = [nc.min_h_glacier-1250, nc.max_h_glacier+1500] self.hbins = np.arange(*zminmax, step=10) self.valid_bounds = self.hbins[[0, -1]] self.y0 = y0 self.halfsize = halfsize self.years = np.arange(y0-halfsize, y0+halfsize+1) self.hemisphere = gdir.hemisphere
@property def temp_bias(self): """Temperature bias to add to the original series.""" return self.mbmod.temp_bias @temp_bias.setter def temp_bias(self, value): for attr_name in ['_lazy_interp_yr', '_lazy_interp_m']: if hasattr(self, attr_name): delattr(self, attr_name) self.mbmod.temp_bias = value @property def prcp_fac(self): """Precipitation factor to apply to the original series.""" return self.mbmod.prcp_fac @prcp_fac.setter def prcp_fac(self, value): for attr_name in ['_lazy_interp_yr', '_lazy_interp_m']: if hasattr(self, attr_name): delattr(self, attr_name) self.mbmod.prcp_fac = value @property def bias(self): """Residual bias to apply to the original series.""" return self.mbmod.bias @bias.setter def bias(self, value): self.mbmod.bias = value @lazy_property def interp_yr(self): # annual MB mb_on_h = self.hbins*0. for yr in self.years: mb_on_h += self.mbmod.get_annual_mb(self.hbins, year=yr) return interp1d(self.hbins, mb_on_h / len(self.years)) @lazy_property def interp_m(self): # monthly MB months = np.arange(12)+1 interp_m = [] for m in months: mb_on_h = self.hbins*0. for yr in self.years: yr = date_to_floatyear(yr, m) mb_on_h += self.mbmod.get_monthly_mb(self.hbins, year=yr) interp_m.append(interp1d(self.hbins, mb_on_h / len(self.years))) return interp_m def get_monthly_climate(self, heights, year=None): """Average climate information at given heights. Note that prcp is corrected with the precipitation factor and that all other biases (precipitation, temp) are applied Returns ------- (temp, tempformelt, prcp, prcpsol) """ _, m = floatyear_to_date(year) yrs = [date_to_floatyear(y, m) for y in self.years] heights = np.atleast_1d(heights) nh = len(heights) shape = (len(yrs), nh) temp = np.zeros(shape) tempformelt = np.zeros(shape) prcp = np.zeros(shape) prcpsol = np.zeros(shape) for i, yr in enumerate(yrs): t, tm, p, ps = self.mbmod.get_monthly_climate(heights, year=yr) temp[i, :] = t tempformelt[i, :] = tm prcp[i, :] = p prcpsol[i, :] = ps return (np.mean(temp, axis=0), np.mean(tempformelt, axis=0), np.mean(prcp, axis=0), np.mean(prcpsol, axis=0)) def get_annual_climate(self, heights, year=None): """Average climate information at given heights. Note that prcp is corrected with the precipitation factor and that all other biases (precipitation, temp) are applied Returns ------- (temp, tempformelt, prcp, prcpsol) """ yrs = monthly_timeseries(self.years[0], self.years[-1], include_last_year=True) heights = np.atleast_1d(heights) nh = len(heights) shape = (len(yrs), nh) temp = np.zeros(shape) tempformelt = np.zeros(shape) prcp = np.zeros(shape) prcpsol = np.zeros(shape) for i, yr in enumerate(yrs): t, tm, p, ps = self.mbmod.get_monthly_climate(heights, year=yr) temp[i, :] = t tempformelt[i, :] = tm prcp[i, :] = p prcpsol[i, :] = ps # Note that we do not weight for number of days per month: # this is consistent with OGGM's calendar return (np.mean(temp, axis=0), np.mean(tempformelt, axis=0) * 12, np.mean(prcp, axis=0) * 12, np.mean(prcpsol, axis=0) * 12) def get_monthly_mb(self, heights, year=None, add_climate=False, **kwargs): yr, m = floatyear_to_date(year) if add_climate: t, tmelt, prcp, prcpsol = self.get_monthly_climate(heights, year=year) return self.interp_m[m-1](heights), t, tmelt, prcp, prcpsol return self.interp_m[m-1](heights) def get_annual_mb(self, heights, year=None, add_climate=False, **kwargs): mb = self.interp_yr(heights) if add_climate: t, tmelt, prcp, prcpsol = self.get_annual_climate(heights) return mb, t, tmelt, prcp, prcpsol return mb
[docs]class AvgClimateMassBalance(ConstantMassBalance): """Mass balance with the average climate of a selected period (wrong!). !!!Careful! This is conceptually wrong!!! This is here only to make a point. See: """
[docs] def __init__(self, gdir, mu_star=None, bias=None, filename='climate_historical', input_filesuffix='', y0=None, halfsize=15, **kwargs): """Initialize. Parameters ---------- gdir : GlacierDirectory the glacier directory mu_star : float, optional set to the alternative value of mu* you want to use (the default is to use the calibrated value). bias : float, optional set to the alternative value of the calibration bias [mm we yr-1] you want to use (the default is to use the calibrated value) Note that this bias is *substracted* from the computed MB. Indeed: BIAS = MODEL_MB - REFERENCE_MB. filename : str, optional set to a different BASENAME if you want to use alternative climate data. input_filesuffix : str the file suffix of the input climate file y0 : int, optional, default: tstar the year at the center of the period of interest. The default is to use tstar as center. halfsize : int, optional the half-size of the time window (window size = 2 * halfsize + 1) """ super(AvgClimateMassBalance, self).__init__(gdir, mu_star=mu_star, bias=bias, filename=filename, input_filesuffix=input_filesuffix, y0=y0, halfsize=halfsize) if y0 is None: df = gdir.read_json('local_mustar') y0 = df['t_star'] self.mbmod = PastMassBalance(gdir, mu_star=mu_star, bias=bias, filename=filename, input_filesuffix=input_filesuffix, ys=y0-halfsize, ye=y0+halfsize, **kwargs) tmp = self.mbmod.temp assert (len(tmp) // 12) == (halfsize * 2 + 1) self.mbmod.temp = tmp.reshape((len(tmp) // 12, 12)).mean(axis=0) tmp = self.mbmod.prcp self.mbmod.prcp = tmp.reshape((len(tmp) // 12, 12)).mean(axis=0) tmp = self.mbmod.grad self.mbmod.grad = tmp.reshape((len(tmp) // 12, 12)).mean(axis=0) self.mbmod.ys = y0 = y0 self.mbmod.months = np.arange(1, 13, dtype=int) self.mbmod.years = np.asarray([y0]*12) self.years = np.asarray([y0]*12)
[docs]class RandomMassBalance(MassBalanceModel): """Random shuffle of all MB years within a given time period. This is useful for finding a possible past glacier state or for sensitivity experiments. Note that this is going to be sensitive to extreme years in certain periods, but it is by far more physically reasonable than other approaches based on gaussian assumptions. """
[docs] def __init__(self, gdir, mu_star=None, bias=None, y0=None, halfsize=15, seed=None, filename='climate_historical', input_filesuffix='', all_years=False, unique_samples=False, prescribe_years=None, **kwargs): """Initialize. Parameters ---------- gdir : GlacierDirectory the glacier directory mu_star : float, optional set to the alternative value of mu* you want to use (the default is to use the calibrated value) bias : float, optional set to the alternative value of the calibration bias [mm we yr-1] you want to use (the default is to use the calibrated value) Note that this bias is *substracted* from the computed MB. Indeed: BIAS = MODEL_MB - REFERENCE_MB. y0 : int, optional, default: tstar the year at the center of the period of interest. The default is to use tstar as center. halfsize : int, optional the half-size of the time window (window size = 2 * halfsize + 1) seed : int, optional Random seed used to initialize the pseudo-random number generator. filename : str, optional set to a different BASENAME if you want to use alternative climate data. input_filesuffix : str the file suffix of the input climate file all_years : bool if True, overwrites ``y0`` and ``halfsize`` to use all available years. unique_samples: bool if true, chosen random mass balance years will only be available once per random climate period-length if false, every model year will be chosen from the random climate period with the same probability prescribe_years : pandas Series instead of random samples, take a series of (i, y) pairs where (i) is the simulation year index and (y) is the year to pick in the original timeseries. Overrides `y0`, `halfsize`, `all_years`, `unique_samples` and `seed`. **kwargs: kyeword arguments to pass to the PastMassBalance model """ super(RandomMassBalance, self).__init__() self.valid_bounds = [-1e4, 2e4] # in m self.mbmod = PastMassBalance(gdir, mu_star=mu_star, bias=bias, filename=filename, input_filesuffix=input_filesuffix, **kwargs) # Climate period self.prescribe_years = prescribe_years if self.prescribe_years is None: # Normal stuff self.rng = np.random.RandomState(seed) if all_years: self.years = self.mbmod.years else: if y0 is None: df = gdir.read_json('local_mustar') y0 = df['t_star'] self.years = np.arange(y0 - halfsize, y0 + halfsize + 1) else: self.rng = None self.years = self.prescribe_years.index self.yr_range = (self.years[0], self.years[-1] + 1) self.ny = len(self.years) self.hemisphere = gdir.hemisphere self._state_yr = dict() # Sampling without replacement self.unique_samples = unique_samples if self.unique_samples: self.sampling_years = self.years
@property def temp_bias(self): """Temperature bias to add to the original series.""" return self.mbmod.temp_bias @temp_bias.setter def temp_bias(self, value): """Temperature bias to add to the original series.""" for attr_name in ['_lazy_interp_yr', '_lazy_interp_m']: if hasattr(self, attr_name): delattr(self, attr_name) self.mbmod.temp_bias = value @property def prcp_fac(self): """Precipitation factor to apply to the original series.""" return self.mbmod.prcp_fac @prcp_fac.setter def prcp_fac(self, value): """Precipitation factor to apply to the original series.""" for attr_name in ['_lazy_interp_yr', '_lazy_interp_m']: if hasattr(self, attr_name): delattr(self, attr_name) self.mbmod.prcp_fac = value @property def bias(self): """Residual bias to apply to the original series.""" return self.mbmod.bias @bias.setter def bias(self, value): """Residual bias to apply to the original series.""" self.mbmod.bias = value def get_state_yr(self, year=None): """For a given year, get the random year associated to it.""" year = int(year) if year not in self._state_yr: if self.prescribe_years is not None: self._state_yr[year] = self.prescribe_years.loc[year] else: if self.unique_samples: # --- Sampling without replacement --- if self.sampling_years.size == 0: # refill sample pool when all years were picked once self.sampling_years = self.years # choose one year which was not used in the current period _sample = self.rng.choice(self.sampling_years) # write chosen year to dictionary self._state_yr[year] = _sample # update sample pool: remove the chosen year from it self.sampling_years = np.delete( self.sampling_years, np.where(self.sampling_years == _sample)) else: # --- Sampling with replacement --- self._state_yr[year] = self.rng.randint(*self.yr_range) return self._state_yr[year] def get_monthly_mb(self, heights, year=None, **kwargs): ryr, m = floatyear_to_date(year) ryr = date_to_floatyear(self.get_state_yr(ryr), m) return self.mbmod.get_monthly_mb(heights, year=ryr, **kwargs) def get_annual_mb(self, heights, year=None, **kwargs): ryr = self.get_state_yr(int(year)) return self.mbmod.get_annual_mb(heights, year=ryr, **kwargs)
[docs]class UncertainMassBalance(MassBalanceModel): """Adding uncertainty to a mass balance model. There are three variables for which you can add uncertainty: - temperature (additive bias) - precipitation (multiplicative factor) - residual (a bias in units of MB) """
[docs] def __init__(self, basis_model, rdn_temp_bias_seed=None, rdn_temp_bias_sigma=0.1, rdn_prcp_fac_seed=None, rdn_prcp_fac_sigma=0.1, rdn_bias_seed=None, rdn_bias_sigma=100): """Initialize. Parameters ---------- basis_model : MassBalanceModel the model to which you want to add the uncertainty to rdn_temp_bias_seed : int the seed of the random number generator rdn_temp_bias_sigma : float the standard deviation of the random temperature error rdn_prcp_fac_seed : int the seed of the random number generator rdn_prcp_fac_sigma : float the standard deviation of the random precipitation error (to be consistent this should be renamed prcp_fac as well) rdn_bias_seed : int the seed of the random number generator rdn_bias_sigma : float the standard deviation of the random MB error """ super(UncertainMassBalance, self).__init__() # the aim here is to change temp_bias and prcp_fac so self.mbmod = basis_model self.hemisphere = basis_model.hemisphere self.valid_bounds = self.mbmod.valid_bounds self.rng_temp = np.random.RandomState(rdn_temp_bias_seed) self.rng_prcp = np.random.RandomState(rdn_prcp_fac_seed) self.rng_bias = np.random.RandomState(rdn_bias_seed) self._temp_sigma = rdn_temp_bias_sigma self._prcp_sigma = rdn_prcp_fac_sigma self._bias_sigma = rdn_bias_sigma self._state_temp = dict() self._state_prcp = dict() self._state_bias = dict()
@property def temp_bias(self): """Temperature bias to add to the original series.""" return self.mbmod.temp_bias @temp_bias.setter def temp_bias(self, value): """Temperature bias to add to the original series.""" for attr_name in ['_lazy_interp_yr', '_lazy_interp_m']: if hasattr(self, attr_name): delattr(self, attr_name) self.mbmod.temp_bias = value @property def prcp_fac(self): """Precipitation factor to apply to the original series.""" return self.mbmod.prcp_fac @prcp_fac.setter def prcp_fac(self, value): """Precipitation factor to apply to the original series.""" self.mbmod.prcp_fac = value def _get_state_temp(self, year): year = int(year) if year not in self._state_temp: self._state_temp[year] = self.rng_temp.randn() * self._temp_sigma return self._state_temp[year] def _get_state_prcp(self, year): year = int(year) if year not in self._state_prcp: self._state_prcp[year] = self.rng_prcp.randn() * self._prcp_sigma return self._state_prcp[year] def _get_state_bias(self, year): year = int(year) if year not in self._state_bias: self._state_bias[year] = self.rng_bias.randn() * self._bias_sigma return self._state_bias[year] def get_monthly_mb(self, heights, year=None, **kwargs): raise NotImplementedError() def get_annual_mb(self, heights, year=None, fl_id=None, **kwargs): # Keep the original biases and add a random error _t = self.mbmod.temp_bias _p = self.mbmod.prcp_fac _b = self.mbmod.bias self.mbmod.temp_bias = self._get_state_temp(year) + _t self.mbmod.prcp_fac = self._get_state_prcp(year) + _p self.mbmod.bias = self._get_state_bias(year) + _b try: out = self.mbmod.get_annual_mb(heights, year=year, fl_id=fl_id) except BaseException: self.mbmod.temp_bias = _t self.mbmod.prcp_fac = _p self.mbmod.bias = _b raise # Back to normal self.mbmod.temp_bias = _t self.mbmod.prcp_fac = _p self.mbmod.bias = _b return out
[docs]class MultipleFlowlineMassBalance(MassBalanceModel): """Handle mass balance at the glacier level instead of flowline level. Convenience class doing not much more than wrapping a list of mass balance models, one for each flowline. This is useful for real-case studies, where each flowline might have a different mu*. Attributes ---------- fls : list list of flowline objects mb_models : list list of mass balance objects """
[docs] def __init__(self, gdir, fls=None, mu_star=None, mb_model_class=PastMassBalance, use_inversion_flowlines=False, input_filesuffix='', bias=None, **kwargs): """Initialize. Parameters ---------- gdir : GlacierDirectory the glacier directory mu_star : float or list of floats, optional set to the alternative value of mu* you want to use (the default is to use the calibrated value). Give a list of values for flowline-specific mu* fls : list, optional list of flowline objects to use (defaults to 'model_flowlines', and if not available, to 'inversion_flowlines') mb_model_class : class, optional the mass balance model to use (e.g. PastMassBalance, ConstantMassBalance...) use_inversion_flowlines: bool, optional if True 'inversion_flowlines' instead of 'model_flowlines' will be used. input_filesuffix : str the file suffix of the input climate file bias : float, optional set to the alternative value of the calibration bias [mm we yr-1] you want to use (the default is to use the calibrated value) Note that this bias is *substracted* from the computed MB. Indeed: BIAS = MODEL_MB - REFERENCE_MB. kwargs : kwargs to pass to mb_model_class """ # Read in the flowlines if use_inversion_flowlines: fls = gdir.read_pickle('inversion_flowlines') if fls is None: try: fls = gdir.read_pickle('model_flowlines') except FileNotFoundError: raise InvalidWorkflowError('Need a valid `model_flowlines` ' 'file. If you explicitly want to ' 'use `inversion_flowlines`, set ' 'use_inversion_flowlines=True.') self.fls = fls _y0 = kwargs.get('y0', None) # User mu*? if mu_star is not None: mu_star = tolist(mu_star, length=len(fls)) for fl, mu in zip(self.fls, mu_star): fl.mu_star = mu # Initialise the mb models self.flowline_mb_models = [] for fl in self.fls: # Merged glaciers will need different climate files, use filesuffix if (fl.rgi_id is not None) and (fl.rgi_id != gdir.rgi_id): rgi_filesuffix = '_' + fl.rgi_id + input_filesuffix else: rgi_filesuffix = input_filesuffix # merged glaciers also have a different MB bias from calibration if ((bias is None) and cfg.PARAMS['use_bias_for_run'] and (fl.rgi_id != gdir.rgi_id)): df = gdir.read_json('local_mustar', filesuffix='_' + fl.rgi_id) fl_bias = df['bias'] else: fl_bias = bias # Constant and RandomMassBalance need y0 if not provided if (issubclass(mb_model_class, RandomMassBalance) or issubclass(mb_model_class, ConstantMassBalance)) and ( fl.rgi_id != gdir.rgi_id) and (_y0 is None): df = gdir.read_json('local_mustar', filesuffix='_' + fl.rgi_id) kwargs['y0'] = df['t_star'] self.flowline_mb_models.append( mb_model_class(gdir, mu_star=fl.mu_star, bias=fl_bias, input_filesuffix=rgi_filesuffix, **kwargs)) self.valid_bounds = self.flowline_mb_models[-1].valid_bounds self.hemisphere = gdir.hemisphere
@property def temp_bias(self): """Temperature bias to add to the original series.""" return self.flowline_mb_models[0].temp_bias @temp_bias.setter def temp_bias(self, value): """Temperature bias to add to the original series.""" for mbmod in self.flowline_mb_models: mbmod.temp_bias = value @property def prcp_fac(self): """Precipitation factor to apply to the original series.""" return self.flowline_mb_models[0].prcp_fac @prcp_fac.setter def prcp_fac(self, value): """Precipitation factor to apply to the original series.""" for mbmod in self.flowline_mb_models: mbmod.prcp_fac = value @property def bias(self): """Residual bias to apply to the original series.""" return self.flowline_mb_models[0].bias @bias.setter def bias(self, value): """Residual bias to apply to the original series.""" for mbmod in self.flowline_mb_models: mbmod.bias = value def get_monthly_mb(self, heights, year=None, fl_id=None, **kwargs): if fl_id is None: raise ValueError('`fl_id` is required for ' 'MultipleFlowlineMassBalance!') return self.flowline_mb_models[fl_id].get_monthly_mb(heights, year=year, **kwargs) def get_annual_mb(self, heights, year=None, fl_id=None, **kwargs): if fl_id is None: raise ValueError('`fl_id` is required for ' 'MultipleFlowlineMassBalance!') return self.flowline_mb_models[fl_id].get_annual_mb(heights, year=year, **kwargs) def get_annual_mb_on_flowlines(self, fls=None, year=None): """Get the MB on all points of the glacier at once. Parameters ---------- fls: list, optional the list of flowlines to get the mass balance from. Defaults to self.fls year: float, optional the time (in the "floating year" convention) Returns ------- Tuple of (heights, widths, mass_balance) 1D arrays """ if fls is None: fls = self.fls heights = [] widths = [] mbs = [] for i, fl in enumerate(fls): h = fl.surface_h heights = np.append(heights, h) widths = np.append(widths, fl.widths) mbs = np.append(mbs, self.get_annual_mb(h, year=year, fl_id=i)) return heights, widths, mbs def get_specific_mb(self, heights=None, widths=None, fls=None, year=None): if heights is not None or widths is not None: raise ValueError('`heights` and `widths` kwargs do not work with ' 'MultipleFlowlineMassBalance!') if fls is None: fls = self.fls if len(np.atleast_1d(year)) > 1: out = [self.get_specific_mb(fls=fls, year=yr) for yr in year] return np.asarray(out) mbs = [] widths = [] for i, (fl, mb_mod) in enumerate(zip(fls, self.flowline_mb_models)): _widths = fl.widths try: # For rect and parabola don't compute spec mb _widths = np.where(fl.thick > 0, _widths, 0) except AttributeError: pass widths = np.append(widths, _widths) mb = mb_mod.get_annual_mb(fl.surface_h, year=year, fls=fls, fl_id=i) mbs = np.append(mbs, mb * SEC_IN_YEAR * mb_mod.rho) return weighted_average_1d(mbs, widths) def get_ela(self, year=None, **kwargs): # ELA here is not without ambiguity. # We compute a mean weighted by area. if len(np.atleast_1d(year)) > 1: return np.asarray([self.get_ela(year=yr) for yr in year]) elas = [] areas = [] for fl_id, (fl, mb_mod) in enumerate(zip(self.fls, self.flowline_mb_models)): elas = np.append(elas, mb_mod.get_ela(year=year, fl_id=fl_id, fls=self.fls)) areas = np.append(areas, np.sum(fl.widths)) return weighted_average_1d(elas, areas)
[docs]@entity_task(log) def fixed_geometry_mass_balance(gdir, ys=None, ye=None, years=None, monthly_step=False, use_inversion_flowlines=True, climate_filename='climate_historical', climate_input_filesuffix='', temperature_bias=None, precipitation_factor=None): """Computes the mass balance with climate input from e.g. CRU or a GCM. Parameters ---------- gdir : :py:class:`oggm.GlacierDirectory` the glacier directory to process ys : int start year of the model run (default: from the climate file) date) ye : int end year of the model run (default: from the climate file) years : array of ints override ys and ye with the years of your choice monthly_step : bool whether to store the diagnostic data at a monthly time step or not (default is yearly) use_inversion_flowlines : bool whether to use the inversion flowlines or the model flowlines climate_filename : str name of the climate file, e.g. 'climate_historical' (default) or 'gcm_data' climate_input_filesuffix: str filesuffix for the input climate file temperature_bias : float add a bias to the temperature timeseries precipitation_factor: float multiply a factor to the precipitation time series default is None and means that the precipitation factor from the calibration is applied which is cfg.PARAMS['prcp_scaling_factor'] """ if monthly_step: raise NotImplementedError('monthly_step not implemented yet') mbmod = MultipleFlowlineMassBalance(gdir, mb_model_class=PastMassBalance, filename=climate_filename, use_inversion_flowlines=use_inversion_flowlines, input_filesuffix=climate_input_filesuffix) if temperature_bias is not None: mbmod.temp_bias = temperature_bias if precipitation_factor is not None: mbmod.prcp_fac = precipitation_factor if years is None: if ys is None: ys = mbmod.flowline_mb_models[0].ys if ye is None: ye = mbmod.flowline_mb_models[0].ye years = np.arange(ys, ye + 1) odf = pd.Series(data=mbmod.get_specific_mb(year=years), index=years) return odf
[docs]@entity_task(log) def compute_ela(gdir, ys=None, ye=None, years=None, climate_filename='climate_historical', temperature_bias=None, precipitation_factor=None, climate_input_filesuffix=''): """Computes the ELA of a glacier for a for given years and climate. Parameters ---------- gdir : :py:class:`oggm.GlacierDirectory` the glacier directory to process ys : int start year ye : int end year years : array of ints override ys and ye with the years of your choice climate_filename : str name of the climate file, e.g. 'climate_historical' (default) or 'gcm_data' climate_input_filesuffix : str filesuffix for the input climate file temperature_bias : float add a bias to the temperature timeseries precipitation_factor: float multiply a factor to the precipitation time series default is None and means that the precipitation factor from the calibration is applied which is cfg.PARAMS['prcp_scaling_factor'] """ mbmod = PastMassBalance(gdir, filename=climate_filename, input_filesuffix=climate_input_filesuffix) if temperature_bias is not None: mbmod.temp_bias = temperature_bias if precipitation_factor is not None: mbmod.prcp_fac = precipitation_factor mbmod.valid_bounds = [-10000, 20000] if years is None: years = np.arange(ys, ye+1) ela = [] for yr in years: ela = np.append(ela, mbmod.get_ela(year=yr)) odf = pd.Series(data=ela, index=years) return odf