Source code for oggm.shop.hugonnet_maps

import logging
import warnings
from packaging.version import Version

import numpy as np
import pandas as pd
import xarray as xr
import os

try:
    import rasterio
    from rasterio.warp import reproject, Resampling, calculate_default_transform
    from rasterio import MemoryFile
    try:
        # rasterio V > 1.0
        from rasterio.merge import merge as merge_tool
    except ImportError:
        from rasterio.tools.merge import merge as merge_tool
except ImportError:
    pass


from oggm import utils, cfg

# Module logger
log = logging.getLogger(__name__)

default_base_url = 'https://cluster.klima.uni-bremen.de/~oggm/geodetic_ref_mb_maps/'

_lookup_csv = None


def _get_lookup_csv():
    global _lookup_csv
    if _lookup_csv is None:
        fname = default_base_url + 'hugonnet_dhdt_lookup_csv_20230129.csv'
        _lookup_csv = pd.read_csv(utils.file_downloader(fname), index_col=0)
    return _lookup_csv


[docs] @utils.entity_task(log, writes=['gridded_data']) def hugonnet_to_gdir(gdir, add_error=False): """Add the Hugonnet 21 dhdt maps to this glacier directory. Parameters ---------- gdir : :py:class:`oggm.GlacierDirectory` the glacier directory to process add_error : bool add the error data or not """ if add_error: raise NotImplementedError('Not yet') # Find out which file(s) we need df = _get_lookup_csv() lon_ex, lat_ex = gdir.extent_ll # adding small buffer for unlikely case where one lon/lat_ex == xx.0 lons = np.arange(np.floor(lon_ex[0] - 1e-9), np.ceil(lon_ex[1] + 1e-9)) lats = np.arange(np.floor(lat_ex[0] - 1e-9), np.ceil(lat_ex[1] + 1e-9)) flist = [] for lat in lats: # north or south? ns = 'S' if lat < 0 else 'N' for lon in lons: # east or west? ew = 'W' if lon < 0 else 'E' ll_str = f'{ns}{abs(lat):02.0f}{ew}{abs(lon):03.0f}' try: filename = df.loc[(df['file_id'] == ll_str)]['dhdt'].iloc[0] except IndexError: # We can maybe be on the edge (unlikely but hey pass file_local = utils.file_downloader(default_base_url + filename) if file_local is not None: flist.append(file_local) # A glacier area can cover more than one tile: if len(flist) == 1: dem_dss = [rasterio.open(flist[0])] # if one tile, just open it file_crs = dem_dss[0].crs dem_data = rasterio.band(dem_dss[0], 1) if Version(rasterio.__version__) >= Version('1.0'): src_transform = dem_dss[0].transform else: src_transform = dem_dss[0].affine nodata = dem_dss[0].meta.get('nodata', None) else: dem_dss = [rasterio.open(s) for s in flist] # list of rasters # make sure all files have the same crs and reproject if needed; # defining the target crs to the one most commonly used, minimizing # the number of files for reprojection crs_list = np.array([dem_ds.crs.to_string() for dem_ds in dem_dss]) unique_crs, crs_counts = np.unique(crs_list, return_counts=True) file_crs = rasterio.crs.CRS.from_string( unique_crs[np.argmax(crs_counts)]) if len(unique_crs) != 1: # more than one crs, we need to do reprojection memory_files = [] for i, src in enumerate(dem_dss): if file_crs != src.crs: transform, width, height = calculate_default_transform( src.crs, file_crs, src.width, src.height, *src.bounds) kwargs = src.meta.copy() kwargs.update({ 'crs': file_crs, 'transform': transform, 'width': width, 'height': height }) reprojected_array = np.empty(shape=(src.count, height, width), dtype=src.dtypes[0]) # just for completeness; even the data only has one band for band in range(1, src.count + 1): reproject(source=rasterio.band(src, band), destination=reprojected_array[band - 1], src_transform=src.transform, src_crs=src.crs, dst_transform=transform, dst_crs=file_crs, resampling=Resampling.nearest) memfile = MemoryFile() with memfile.open(**kwargs) as mem_dst: mem_dst.write(reprojected_array) memory_files.append(memfile) else: memfile = MemoryFile() with memfile.open(**src.meta) as mem_src: mem_src.write(src.read()) memory_files.append(memfile) with rasterio.Env(): datasets_to_merge = [memfile.open() for memfile in memory_files] nodata = datasets_to_merge[0].meta.get('nodata', None) dem_data, src_transform = merge_tool(datasets_to_merge, nodata=nodata) else: # only one single crs occurring, no reprojection needed nodata = dem_dss[0].meta.get('nodata', None) dem_data, src_transform = merge_tool(dem_dss, nodata=nodata) # Set up profile for writing output with rasterio.open(gdir.get_filepath('dem')) as dem_ds: dst_array = dem_ds.read().astype(np.float32) dst_array[:] = np.nan profile = dem_ds.profile transform = dem_ds.transform dst_crs = dem_ds.crs # Set up profile for writing output profile.update({ 'nodata': np.nan, }) resampling = Resampling.bilinear with MemoryFile() as dest: reproject( # Source parameters source=dem_data, src_crs=file_crs, src_transform=src_transform, src_nodata=nodata, # Destination parameters destination=dst_array, dst_transform=transform, dst_crs=dst_crs, dst_nodata=np.nan, # Configuration resampling=resampling) dest.write(dst_array) for dem_ds in dem_dss: dem_ds.close() # Write with utils.ncDataset(gdir.get_filepath('gridded_data'), 'a') as nc: vn = 'hugonnet_dhdt' if vn in nc.variables: v = nc.variables[vn] else: v = nc.createVariable(vn, 'f4', ('y', 'x', ), zlib=True, fill_value=np.nan) v.units = 'm' ln = 'dhdt (2000-2020) from Hugonnet et al. 2021' v.long_name = ln data_str = ' '.join(flist) if len(flist) > 1 else flist[0] v.data_source = data_str v[:] = np.squeeze(dst_array).astype(np.float32)
@utils.entity_task(log) def hugonnet_statistics(gdir): """Gather statistics about the Hugonnet data interpolated to this glacier. """ d = dict() # Easy stats - this should always be possible d['rgi_id'] = gdir.rgi_id d['rgi_region'] = gdir.rgi_region d['rgi_subregion'] = gdir.rgi_subregion d['rgi_area_km2'] = gdir.rgi_area_km2 d['hugonnet_area_km2'] = 0 d['hugonnet_perc_cov'] = 0 d['hugonnet_avg_dhdt'] = np.nan try: with xr.open_dataset(gdir.get_filepath('gridded_data')) as ds: dhdt = ds['hugonnet_dhdt'].where(ds['glacier_mask'], np.nan).load() gridded_area = ds['glacier_mask'].sum() * gdir.grid.dx ** 2 * 1e-6 d['hugonnet_area_km2'] = float((~dhdt.isnull()).sum() * gdir.grid.dx ** 2 * 1e-6) d['hugonnet_perc_cov'] = float(d['hugonnet_area_km2'] / gridded_area) with warnings.catch_warnings(): # This can trigger an empty mean warning warnings.filterwarnings("ignore", category=RuntimeWarning) d['hugonnet_avg_dhdt'] = np.nanmean(dhdt.data) except (FileNotFoundError, AttributeError, KeyError): pass return d
[docs] @utils.global_task(log) def compile_hugonnet_statistics(gdirs, filesuffix='', path=True): """Gather as much statistics as possible about a list of glaciers. It can be used to do result diagnostics and other stuffs. Parameters ---------- gdirs : list of :py:class:`oggm.GlacierDirectory` objects the glacier directories to process filesuffix : str add suffix to output file path : str, bool Set to "True" in order to store the info in the working directory Set to a path to store the file to your chosen location """ from oggm.workflow import execute_entity_task out_df = execute_entity_task(hugonnet_statistics, gdirs) out = pd.DataFrame(out_df).set_index('rgi_id') if path: if path is True: out.to_csv(os.path.join(cfg.PATHS['working_dir'], ('hugonnet_statistics' + filesuffix + '.csv'))) else: out.to_csv(path) return out