""" Configuration file and options
A number of globals are defined here to be available everywhere.
"""
import logging
import os
import shutil
import sys
import glob
import json
from collections import OrderedDict
from distutils.util import strtobool
import numpy as np
import pandas as pd
try:
    from scipy.signal.windows import gaussian
except ImportError:
    # older scipy versions exposed the window function at the top level
    from scipy.signal import gaussian
from configobj import ConfigObj, ConfigObjError
try:
import geopandas as gpd
except ImportError:
pass
try:
import salem
except ImportError:
pass
# Local logger
log = logging.getLogger(__name__)
# Path to the cache directory
CACHE_DIR = os.path.join(os.path.expanduser('~'), '.oggm')
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
# Path to the config file
CONFIG_FILE = os.path.join(os.path.expanduser('~'), '.oggm_config')
# config was changed, indicates that multiprocessing needs a reset
CONFIG_MODIFIED = False
class DocumentedDict(dict):
"""Quick "magic" to document the BASENAMES entries."""
def __init__(self):
self._doc = dict()
def _set_key(self, key, value, docstr=''):
if key in self:
raise ValueError('Cannot overwrite a key.')
dict.__setitem__(self, key, value)
self._doc[key] = docstr
def __setitem__(self, key, value):
        # Overrides the original dict to separate value and documentation
global CONFIG_MODIFIED
try:
self._set_key(key, value[0], docstr=value[1])
CONFIG_MODIFIED = True
        except (TypeError, IndexError):
            raise ValueError('DocumentedDict accepts only a tuple of len 2')
def info_str(self, key):
"""Info string for the documentation."""
        return '    {}'.format(self[key]) + '\n' + '        ' + self._doc[key]
def doc_str(self, key):
"""Info string for the documentation."""
        return '        {}'.format(self[key]) + '\n' + '            ' + \
               self._doc[key]
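# Illustrative sketch (not part of the original module) of the
# DocumentedDict contract: entries are set as (value, docstring) tuples,
# but read back as plain values:
#
#     d = DocumentedDict()
#     d['dem'] = ('dem.tif', 'Path to the DEM file.')
#     d['dem']            # -> 'dem.tif'
#     d.info_str('dem')   # -> the value plus its documentation string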
class ResettingOrderedDict(OrderedDict):
"""OrderedDict wrapper that resets our multiprocessing on set"""
def __setitem__(self, key, value):
global CONFIG_MODIFIED
OrderedDict.__setitem__(self, key, value)
CONFIG_MODIFIED = True
class PathOrderedDict(ResettingOrderedDict):
"""Quick "magic" to be sure that paths are expanded correctly."""
def __setitem__(self, key, value):
        # Overrides the original dict to expand the path
ResettingOrderedDict.__setitem__(self, key, os.path.expanduser(value))
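# Illustrative sketch of the PathOrderedDict behavior: '~' is expanded
# on assignment, e.g.
#
#     d = PathOrderedDict()
#     d['working_dir'] = '~/wdir'
#     d['working_dir']  # -> '/home/<user>/wdir'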
# Globals
IS_INITIALIZED = False
PARAMS = ResettingOrderedDict()
PATHS = PathOrderedDict()
BASENAMES = DocumentedDict()
LRUHANDLERS = ResettingOrderedDict()
DATA = ResettingOrderedDict()
# Constants
SEC_IN_YEAR = 365*24*3600
SEC_IN_DAY = 24*3600
SEC_IN_HOUR = 3600
SEC_IN_MONTH = 2628000
DAYS_IN_MONTH = np.array([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31])
G = 9.81 # gravity
GAUSSIAN_KERNEL = dict()
for ks in [5, 7, 9]:
kernel = gaussian(ks, 1)
GAUSSIAN_KERNEL[ks] = kernel / kernel.sum()
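# The kernels are normalized to sum to 1, so that e.g. (illustrative):
#     smoothed = np.convolve(values, GAUSSIAN_KERNEL[9], mode='same')
# smooths a 1D array without changing its mean level.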
_doc = ('A geotiff file containing the DEM (reprojected into the local '
        'grid). This DEM is not smoothed or gap filled, and is the closest '
        'to the original DEM source.')
BASENAMES['dem'] = ('dem.tif', _doc)
_doc = ('A glacier mask geotiff file with the same extent and projection as '
        'the `dem.tif`. This geotiff has value 1 at glaciated grid points '
        'and value 0 at unglaciated points.')
BASENAMES['glacier_mask'] = ('glacier_mask.tif', _doc)
_doc = ('The glacier outlines in the local map projection (Transverse '
'Mercator).')
BASENAMES['outlines'] = ('outlines.shp', _doc)
_doc = ('The glacier intersects in the local map projection (Transverse '
'Mercator).')
BASENAMES['intersects'] = ('intersects.shp', _doc)
_doc = ('Each flowline has a catchment area computed from flow routing '
        'algorithms: this shapefile stores the catchment outlines in the '
        'local map projection (Transverse Mercator).')
BASENAMES['flowline_catchments'] = ('flowline_catchments.shp', _doc)
_doc = ('The intersections between catchments (shapefile) in the local map '
'projection (Transverse Mercator).')
BASENAMES['catchments_intersects'] = ('catchments_intersects.shp', _doc)
_doc = 'A ``salem.Grid`` handling the georeferencing of the local grid.'
BASENAMES['glacier_grid'] = ('glacier_grid.json', _doc)
_doc = 'A dictionary containing runtime diagnostics useful for debugging.'
BASENAMES['diagnostics'] = ('diagnostics.json', _doc)
_doc = ('A netcdf file containing several gridded data variables such as '
'topography, the glacier masks, the interpolated 2D glacier bed, '
'and more.')
BASENAMES['gridded_data'] = ('gridded_data.nc', _doc)
_doc = ('A dictionary containing the shapely.Polygons of a glacier. The '
'"polygon_hr" entry contains the geometry transformed to the local '
'grid in (i, j) coordinates, while the "polygon_pix" entry contains '
'the geometries transformed into the coarse grid (the i, j elements '
'are integers). The "polygon_area" entry contains the area of the '
        'polygon as computed by Shapely. The "catchment_indices" entry '
        'contains a list of len `n_centerlines`, each element containing '
        'a numpy array of the indices in the glacier grid which represent '
        "the centerline's catchment area.")
BASENAMES['geometries'] = ('geometries.pkl', _doc)
_doc = ('A dictionary containing the downstream line geometry as well as the '
'bed shape computed from a parabolic fit.')
BASENAMES['downstream_line'] = ('downstream_line.pkl', _doc)
_doc = 'A text file with the source of the topo file (GIMP, SRTM, ...).'
BASENAMES['dem_source'] = ('dem_source.txt', _doc)
_doc = ('A hypsometry file computed by OGGM and provided in the same format '
'as the RGI (useful for diagnostics).')
BASENAMES['hypsometry'] = ('hypsometry.csv', _doc)
_doc = 'A list of :py:class:`oggm.Centerline` instances, sorted by flow order.'
BASENAMES['centerlines'] = ('centerlines.pkl', _doc)
_doc = ('A "better" version of the Centerlines, now on a regular spacing '
'i.e., not on the gridded (i, j) indices. The tails of the '
'tributaries are cut out to make more realistic junctions. '
'They are now "1.5D" i.e., with a width.')
BASENAMES['inversion_flowlines'] = ('inversion_flowlines.pkl', _doc)
_doc = 'The monthly climate timeseries stored in a netCDF file.'
BASENAMES['climate_monthly'] = ('climate_monthly.nc', _doc)
_doc = ('Some information (dictionary) about the climate data and the mass '
'balance parameters for this glacier.')
BASENAMES['climate_info'] = ('climate_info.json', _doc)
_doc = 'The monthly GCM climate timeseries stored in a netCDF file.'
BASENAMES['gcm_data'] = ('gcm_data.nc', _doc)
_doc = "A dict containing the glacier's t*, bias, and the flowlines' mu*"
BASENAMES['local_mustar'] = ('local_mustar.json', _doc)
_doc = 'List of dicts containing the data needed for the inversion.'
BASENAMES['inversion_input'] = ('inversion_input.pkl', _doc)
_doc = 'List of dicts containing the output data from the inversion.'
BASENAMES['inversion_output'] = ('inversion_output.pkl', _doc)
_doc = 'Dict of fs and fd as computed by the inversion optimisation.'
BASENAMES['inversion_params'] = ('inversion_params.pkl', _doc)
_doc = 'List of flowlines ready to be run by the model.'
BASENAMES['model_flowlines'] = ('model_flowlines.pkl', _doc)
_doc = ('When using a linear mass-balance for the inversion, this dict stores '
'the optimal ela_h and grad.')
BASENAMES['linear_mb_params'] = ('linear_mb_params.pkl', _doc)
_doc = ('A netcdf file containing enough information to reconstruct the '
'entire flowline glacier along the run (can be data expensive).')
BASENAMES['model_run'] = ('model_run.nc', _doc)
_doc = ('A netcdf file containing the model diagnostics (volume, '
'mass-balance, length...).')
BASENAMES['model_diagnostics'] = ('model_diagnostics.nc', _doc)
_doc = ('A csv file containing the output of each iteration of the '
'find_inversion_calving_loop task loop.')
BASENAMES['calving_loop'] = ('calving_loop.csv', _doc)
_doc = 'Calving output (deprecated).'
BASENAMES['calving_output'] = ('calving_output.pkl', _doc)
_doc = "A dict containing the glacier's t*, bias, mu*. Analogous " \
"to 'local_mustar.json', but for the volume/area scaling model."
BASENAMES['vascaling_mustar'] = ('vascaling_mustar.json', _doc)
def set_logging_config(logging_level='INFO'):
"""Set the global logger parameters.
Logging levels:
DEBUG
Print detailed information, typically of interest only when diagnosing
problems.
INFO
Print confirmation that things are working as expected, e.g. when
each task is run correctly (this is the default).
WARNING
Indication that something unexpected happened on a glacier,
but that OGGM is still working on this glacier.
WORKFLOW
Print only high level, workflow information (typically, one message
per task). Errors will still be printed, but warnings won't.
ERROR
Print errors only, e.g. when a glacier cannot run properly.
CRITICAL
Print nothing but fatal errors.
Parameters
----------
logging_level : str or None
the logging level. See description above for a list of options. Setting
to `None` is equivalent to `'CRITICAL'`, i.e. no log output will be
generated.
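
    Examples
    --------
    A minimal, illustrative setup::

        import oggm.cfg as cfg
        cfg.set_logging_config(logging_level='WORKFLOW')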
"""
# Add a custom level - just for us
logging.addLevelName(35, 'WORKFLOW')
def workflow(self, message, *args, **kws):
"""Standard log message with a custom level."""
if self.isEnabledFor(35):
# Yes, logger takes its '*args' as 'args'.
self._log(35, message, args, **kws)
logging.WORKFLOW = 35
logging.Logger.workflow = workflow
# Remove all handlers associated with the root logger object.
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# Spammers
logging.getLogger("Fiona").setLevel(logging.CRITICAL)
logging.getLogger("fiona").setLevel(logging.CRITICAL)
logging.getLogger("shapely").setLevel(logging.CRITICAL)
logging.getLogger("rasterio").setLevel(logging.CRITICAL)
logging.getLogger("matplotlib").setLevel(logging.CRITICAL)
logging.getLogger("numexpr").setLevel(logging.CRITICAL)
# Basic config
if logging_level is None:
logging_level = 'CRITICAL'
logging_level = logging_level.upper()
logging.basicConfig(format='%(asctime)s: %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=getattr(logging, logging_level))
def initialize_minimal(file=None, logging_level='INFO'):
    """Same as initialize() but without requiring any download of data.
    This is useful for "flowline only" OGGM applications.
Parameters
----------
file : str
path to the configuration file (default: OGGM params.cfg)
logging_level : str
set a logging level. See :func:`set_logging_config` for options.
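
    Examples
    --------
    A typical "flowline only" setup, as a sketch (the path is a
    placeholder)::

        import oggm.cfg as cfg
        cfg.initialize_minimal()
        cfg.PATHS['working_dir'] = '/path/to/wdir'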
"""
global IS_INITIALIZED
global PARAMS
global PATHS
set_logging_config(logging_level=logging_level)
if file is None:
file = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'params.cfg')
try:
cp = ConfigObj(file, file_error=True)
except (ConfigObjError, IOError) as e:
log.critical('Config file could not be parsed (%s): %s', file, e)
sys.exit()
log.workflow('Using configuration file: %s', file)
# Paths
oggm_static_paths()
PATHS['working_dir'] = cp['working_dir']
PATHS['dem_file'] = cp['dem_file']
PATHS['climate_file'] = cp['climate_file']
# Multiprocessing pool
PARAMS['use_multiprocessing'] = cp.as_bool('use_multiprocessing')
PARAMS['mp_processes'] = cp.as_int('mp_processes')
# Some non-trivial params
PARAMS['continue_on_error'] = cp.as_bool('continue_on_error')
PARAMS['grid_dx_method'] = cp['grid_dx_method']
PARAMS['topo_interp'] = cp['topo_interp']
PARAMS['use_intersects'] = cp.as_bool('use_intersects')
PARAMS['use_compression'] = cp.as_bool('use_compression')
PARAMS['mpi_recv_buf_size'] = cp.as_int('mpi_recv_buf_size')
PARAMS['use_multiple_flowlines'] = cp.as_bool('use_multiple_flowlines')
PARAMS['filter_min_slope'] = cp.as_bool('filter_min_slope')
PARAMS['auto_skip_task'] = cp.as_bool('auto_skip_task')
PARAMS['correct_for_neg_flux'] = cp.as_bool('correct_for_neg_flux')
PARAMS['filter_for_neg_flux'] = cp.as_bool('filter_for_neg_flux')
PARAMS['run_mb_calibration'] = cp.as_bool('run_mb_calibration')
PARAMS['rgi_version'] = cp['rgi_version']
PARAMS['use_rgi_area'] = cp.as_bool('use_rgi_area')
PARAMS['compress_climate_netcdf'] = cp.as_bool('compress_climate_netcdf')
PARAMS['use_tar_shapefiles'] = cp.as_bool('use_tar_shapefiles')
PARAMS['clip_mu_star'] = cp.as_bool('clip_mu_star')
PARAMS['clip_tidewater_border'] = cp.as_bool('clip_tidewater_border')
PARAMS['dl_verify'] = cp.as_bool('dl_verify')
# Climate
PARAMS['baseline_climate'] = cp['baseline_climate'].strip().upper()
PARAMS['baseline_y0'] = cp.as_int('baseline_y0')
PARAMS['baseline_y1'] = cp.as_int('baseline_y1')
PARAMS['hydro_month_nh'] = cp.as_int('hydro_month_nh')
PARAMS['hydro_month_sh'] = cp.as_int('hydro_month_sh')
PARAMS['temp_use_local_gradient'] = cp.as_bool('temp_use_local_gradient')
PARAMS['tstar_search_glacierwide'] = cp.as_bool('tstar_search_glacierwide')
k = 'temp_local_gradient_bounds'
PARAMS[k] = [float(vk) for vk in cp.as_list(k)]
k = 'tstar_search_window'
PARAMS[k] = [int(vk) for vk in cp.as_list(k)]
PARAMS['use_bias_for_run'] = cp.as_bool('use_bias_for_run')
# Inversion
k = 'use_shape_factor_for_inversion'
PARAMS[k] = cp[k]
# Flowline model
k = 'use_shape_factor_for_fluxbasedmodel'
PARAMS[k] = cp[k]
# Delete non-floats
ltr = ['working_dir', 'dem_file', 'climate_file', 'use_tar_shapefiles',
'grid_dx_method', 'run_mb_calibration', 'compress_climate_netcdf',
'mp_processes', 'use_multiprocessing', 'baseline_y0', 'baseline_y1',
'temp_use_local_gradient', 'temp_local_gradient_bounds',
'topo_interp', 'use_compression', 'bed_shape', 'continue_on_error',
'use_multiple_flowlines', 'tstar_search_glacierwide',
'mpi_recv_buf_size', 'hydro_month_nh', 'clip_mu_star',
'tstar_search_window', 'use_bias_for_run', 'hydro_month_sh',
'use_intersects', 'filter_min_slope', 'clip_tidewater_border',
'auto_skip_task', 'correct_for_neg_flux', 'filter_for_neg_flux',
'rgi_version', 'dl_verify',
'use_shape_factor_for_inversion', 'use_rgi_area',
'use_shape_factor_for_fluxbasedmodel', 'baseline_climate']
for k in ltr:
cp.pop(k, None)
# Other params are floats
for k in cp:
PARAMS[k] = cp.as_float(k)
# Empty defaults
set_intersects_db()
IS_INITIALIZED = True
def initialize(file=None, logging_level='INFO'):
"""Read the configuration file containing the run's parameters.
    In most (if not all) OGGM applications this should be the first call,
    before using any of the other OGGM modules.
Parameters
----------
file : str
path to the configuration file (default: OGGM params.cfg)
logging_level : str
set a logging level. See :func:`set_logging_config` for options.
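
    Examples
    --------
    Typical first calls of an OGGM run, as a sketch (the path is a
    placeholder)::

        import oggm.cfg as cfg
        cfg.initialize(logging_level='WORKFLOW')
        cfg.PATHS['working_dir'] = '/path/to/wdir'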
"""
global PARAMS
global DATA
initialize_minimal(file=file, logging_level=logging_level)
# Make sure we have a proper cache dir
from oggm.utils import download_oggm_files, get_demo_file
download_oggm_files()
    # Read in the reference t* data for all available model types (oggm, vas)
model_prefixes = ['oggm_', 'vas_']
for prefix in model_prefixes:
fns = ['ref_tstars_rgi5_cru4', 'ref_tstars_rgi6_cru4',
'ref_tstars_rgi5_histalp', 'ref_tstars_rgi6_histalp']
for fn in fns:
fpath = get_demo_file(prefix + fn + '.csv')
PARAMS[prefix + fn] = pd.read_csv(fpath)
fpath = get_demo_file(prefix + fn + '_calib_params.json')
with open(fpath, 'r') as fp:
mbpar = json.load(fp)
PARAMS[prefix + fn + '_calib_params'] = mbpar
    # Pre-extract the CRU CL file to avoid problems with multiprocessing
from oggm.utils import get_cru_cl_file
get_cru_cl_file()
# Read in the demo glaciers
file = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'data', 'demo_glaciers.csv')
DATA['demo_glaciers'] = pd.read_csv(file, index_col=0)
# Add other things
if 'dem_grids' not in DATA:
grids = {}
for grid_json in ['gimpdem_90m_v01.1.json',
'arcticdem_mosaic_100m_v3.0.json',
'AntarcticDEM_wgs84.json',
'REMA_100m_dem.json']:
if grid_json not in grids:
fp = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'data', grid_json)
try:
grids[grid_json] = salem.Grid.from_json(fp)
except NameError:
pass
DATA['dem_grids'] = grids
def oggm_static_paths():
"""Initialise the OGGM paths from the config file."""
global PATHS, PARAMS
# See if the file is there, if not create it
if not os.path.exists(CONFIG_FILE):
dldir = os.path.join(os.path.expanduser('~'), 'OGGM')
config = ConfigObj()
config['dl_cache_dir'] = os.path.join(dldir, 'download_cache')
config['dl_cache_readonly'] = False
config['tmp_dir'] = os.path.join(dldir, 'tmp')
config['cru_dir'] = os.path.join(dldir, 'cru')
config['rgi_dir'] = os.path.join(dldir, 'rgi')
config['test_dir'] = os.path.join(dldir, 'tests')
config['has_internet'] = True
config.filename = CONFIG_FILE
config.write()
# OK, read in the file
try:
config = ConfigObj(CONFIG_FILE, file_error=True)
except (ConfigObjError, IOError) as e:
log.critical('Config file could not be parsed (%s): %s',
CONFIG_FILE, e)
sys.exit()
# Check that all keys are here
for k in ['dl_cache_dir', 'dl_cache_readonly', 'tmp_dir',
'cru_dir', 'rgi_dir', 'test_dir', 'has_internet']:
if k not in config:
raise RuntimeError('The oggm config file ({}) should have an '
'entry for {}.'.format(CONFIG_FILE, k))
# Override defaults with env variables if available
if os.environ.get('OGGM_DOWNLOAD_CACHE_RO') is not None:
ro = bool(strtobool(os.environ.get('OGGM_DOWNLOAD_CACHE_RO')))
config['dl_cache_readonly'] = ro
if os.environ.get('OGGM_DOWNLOAD_CACHE') is not None:
config['dl_cache_dir'] = os.environ.get('OGGM_DOWNLOAD_CACHE')
if os.environ.get('OGGM_EXTRACT_DIR') is not None:
# This is for the directories where OGGM needs to extract things
# On the cluster it might be useful to do it on a fast disc
edir = os.path.abspath(os.environ.get('OGGM_EXTRACT_DIR'))
config['tmp_dir'] = os.path.join(edir, 'tmp')
config['cru_dir'] = os.path.join(edir, 'cru')
config['rgi_dir'] = os.path.join(edir, 'rgi')
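    # A shell setup using these overrides could look like (illustrative):
    #   export OGGM_DOWNLOAD_CACHE=/fast_disk/download_cache
    #   export OGGM_DOWNLOAD_CACHE_RO=1
    #   export OGGM_EXTRACT_DIR=/fast_disk/extract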
if not config['dl_cache_dir']:
raise RuntimeError('At the very least, the "dl_cache_dir" entry '
'should be provided in the oggm config file '
                           '({})'.format(CONFIG_FILE))
# Fill the PATH dict
for k, v in config.iteritems():
if not k.endswith('_dir'):
continue
PATHS[k] = os.path.abspath(os.path.expanduser(v))
# Other
PARAMS['has_internet'] = config.as_bool('has_internet')
PARAMS['dl_cache_readonly'] = config.as_bool('dl_cache_readonly')
# Create cache dir if possible
if not os.path.exists(PATHS['dl_cache_dir']):
if not PARAMS['dl_cache_readonly']:
os.makedirs(PATHS['dl_cache_dir'])
# Always call this one!
oggm_static_paths()
def get_lru_handler(tmpdir=None, maxsize=100, ending='.tif'):
"""LRU handler for a given temporary directory (singleton).
Parameters
----------
tmpdir : str
path to the temporary directory to handle. Default is
``cfg.PATHS['tmp_dir']``.
maxsize : int
the max number of files to keep in the directory
ending : str
consider only the files with a certain ending
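
    Examples
    --------
    Illustrative sketch (assuming ``oggm.utils.LRUFileCache`` exposes an
    ``append`` method for newly created files)::

        lru = get_lru_handler(maxsize=50, ending='.tif')
        lru.append('/path/to/new_file.tif')  # may purge the oldest files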
"""
global LRUHANDLERS
# see if we're set up
if tmpdir is None:
tmpdir = PATHS['tmp_dir']
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)
# one handler per directory and per size
# (in practice not very useful, but a dict is easier to handle)
k = (tmpdir, maxsize)
if k in LRUHANDLERS:
# was already there
return LRUHANDLERS[k]
else:
# we do a new one
from oggm.utils import LRUFileCache
# the files already present have to be counted, too
l0 = list(glob.glob(os.path.join(tmpdir, '*' + ending)))
l0.sort(key=os.path.getmtime)
lru = LRUFileCache(l0, maxsize=maxsize)
LRUHANDLERS[k] = lru
return lru
def set_intersects_db(path_or_gdf=None):
"""Set the glacier intersection database for OGGM to use.
It is now set automatically by the
:func:`oggm.workflow.init_glacier_regions` task, but setting it manually
can be useful for a slightly faster run initialization.
See :func:`oggm.utils.get_rgi_intersects_region_file` for how to obtain
such data.
Parameters
----------
    path_or_gdf : str or geopandas.GeoDataFrame
the intersects file to use
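
    Examples
    --------
    Illustrative only (the path is a placeholder)::

        import geopandas as gpd
        gdf = gpd.read_file('/path/to/intersects_file.shp')
        set_intersects_db(gdf)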
"""
if PARAMS['use_intersects'] and path_or_gdf is not None:
if isinstance(path_or_gdf, str):
PARAMS['intersects_gdf'] = gpd.read_file(path_or_gdf)
else:
PARAMS['intersects_gdf'] = path_or_gdf
else:
PARAMS['intersects_gdf'] = pd.DataFrame()
def reset_working_dir():
"""Deletes the content of the working directory. Careful: cannot be undone!
"""
if PATHS['working_dir']:
if os.path.exists(PATHS['working_dir']):
shutil.rmtree(PATHS['working_dir'])
os.makedirs(PATHS['working_dir'])
def pack_config():
"""Pack the entire configuration in one pickleable dict."""
return {
'IS_INITIALIZED': IS_INITIALIZED,
'PARAMS': PARAMS,
'PATHS': PATHS,
'LRUHANDLERS': LRUHANDLERS,
'DATA': DATA,
'BASENAMES': dict(BASENAMES)
}
def unpack_config(cfg_dict):
"""Unpack and apply the config packed via pack_config."""
global IS_INITIALIZED, PARAMS, PATHS, BASENAMES, LRUHANDLERS, DATA
IS_INITIALIZED = cfg_dict['IS_INITIALIZED']
PARAMS = cfg_dict['PARAMS']
PATHS = cfg_dict['PATHS']
LRUHANDLERS = cfg_dict['LRUHANDLERS']
DATA = cfg_dict['DATA']
    # BASENAMES is a DocumentedDict, which cannot be pickled as-is because
    # its setter ((value, doc) tuples) intentionally mismatches its getter
    # (plain values)
BASENAMES = DocumentedDict()
for k in cfg_dict['BASENAMES']:
BASENAMES[k] = (cfg_dict['BASENAMES'][k], 'Imported Pickle')
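# Illustrative round-trip sketch for pack_config/unpack_config, e.g. to
# ship the configuration to a freshly spawned worker process:
#
#     state = pack_config()    # in the parent process
#     unpack_config(state)     # in the child, after unpickling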
def add_to_basenames(basename, filename, docstr=''):
"""Add an entry to the list of BASENAMES.
BASENAMES are access keys to files available at the gdir level.
Parameters
----------
basename : str
the key (e.g. 'dem', 'model_flowlines')
filename : str
the associated filename (e.g. 'dem.tif', 'model_flowlines.pkl')
docstr : str
the associated docstring (for documentation)
"""
global BASENAMES
if '.' not in filename:
raise ValueError('The filename needs a proper file suffix!')
BASENAMES[basename] = (filename, docstr)
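# Illustrative use of add_to_basenames (the basename and file are
# hypothetical):
#
#     add_to_basenames('my_data', 'my_data.csv',
#                      docstr='A custom per-glacier data file.')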