# Copyright (c) 2018 The Harmonica Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
# This code is part of the Fatiando a Terra project (
Functions for interacting with standarized data files that contain gravity or
magnetic geophysical data.
import contextlib

import numpy as np
import xarray as xr

[docs]def load_icgem_gdf(fname, **kwargs): """ Reads data from an ICGEM .gdf file. The `ICGEM Calculation Service <>`__ [BarthelmesKohler2016]_ generates gravity field grids from spherical harmonic models. They use a custom ASCII grid format with information in the header. This function can read the format and parse information from the header. It returns the data in a :class:`xarray.Dataset` for convenience and reduced storage requirements. Parameters ---------- fname : string or file-like object Name of the ICGEM ``.gdf`` file or an open file object to read from. kwargs Extra keyword arguments to this function will be passed to :func:`numpy.loadtxt`. Returns ------- grid : :class:`xarray.Dataset` An :class:`xarray.Dataset` with the data from the file. The header of the gdf file is available through the ``attr`` attribute of the :class:`xarray.Dataset`. """ rawdata, metadata = _read_gdf_file(fname, **kwargs) shape = (int(metadata["latitude_parallels"]), int(metadata["longitude_parallels"])) data = dict(zip(metadata["attributes"], rawdata)) coords = { "longitude": data["longitude"].reshape(shape)[0, :], "latitude": data["latitude"].reshape(shape)[:, 0][::-1], } dims = ["latitude", "longitude"] data_vars = { name: (dims, value.reshape(shape)[::-1]) for name, value in data.items() if name not in dims } # If the grid is at constant height, add the height as a matrix for # convenience (otherwise, would have to parse from the attrs) if "height_over_ell" in metadata: height = float(metadata["height_over_ell"].split()[0]) data_vars["height_over_ell"] = (dims, np.full(shape, height)) # Can't have lists in the Dataset metadata to make it compatible with # netCDF3. This way the data can be saved using only scipy as a dependency # instead of netcdf4. metadata["attributes"] = " ".join(metadata["attributes"]) metadata["attributes_units"] = " ".join(metadata["attributes_units"]) grid = xr.Dataset(data_vars, coords=coords, attrs=metadata) # Check area from header equals to area from data in cols area = [ float(metadata["latlimit_south"]), float(metadata["latlimit_north"]), float(metadata["longlimit_west"]), float(metadata["longlimit_east"]), ] area_from_cols = ( grid.latitude.values.min(), grid.latitude.values.max(), grid.longitude.values.min(), grid.longitude.values.max(), ) if not np.allclose(area, area_from_cols): raise IOError( "Grid area read ({}) and calculated from attributes " "({}) mismatch.".format(area, area_from_cols) ) return grid
def _read_gdf_file(fname, **kwargs): """ Read ICGEM gdf file and returns metadata dict and data in cols as np.array """ # If it's an open file, does nothing. Otherwise, open and add to the # context lib stack to make it close when exiting the with block. with contextlib.ExitStack() as stack: if hasattr(fname, "read"): # It's a file object gdf_file = fname else: # It's a file path gdf_file = stack.enter_context(open(fname)) # Read the header and extract metadata metadata = {} metadata_line = True attributes_units_line = False for line in gdf_file: if line.strip()[:11] == "end_of_head": break if not line.strip(): metadata_line = False continue if metadata_line: parts = line.strip().split() metadata[parts[0]] = " ".join(parts[1:]) else: if not attributes_units_line: metadata["attributes"] = line.strip().split() attributes_units_line = True else: metadata["attributes_units"] = line.strip().split() # Read the numerical values rawdata = np.loadtxt(gdf_file, ndmin=2, unpack=True, **kwargs) _check_gdf_integrity(metadata) # Remove column names from the metadata if they weren't read if kwargs.get("usecols", None) is not None: metadata["attributes"] = [metadata["attributes"][i] for i in kwargs["usecols"]] if len(metadata["attributes"]) != rawdata.shape[0]: raise IOError( "Number of attributes ({}) and data columns ({}) mismatch".format( len(metadata["attributes"]), rawdata.shape[0] ) ) return rawdata, metadata def _check_gdf_integrity(metadata): "Check the integrity of ICGEM gdf file metadata." needed_args = [ "latitude_parallels", "longitude_parallels", "number_of_gridpoints", "latlimit_south", "latlimit_north", "longlimit_west", "longlimit_east", ] # Check for needed arguments in metadata dictionary for arg in needed_args: if arg in metadata: metadata[arg] = metadata[arg].split()[0] else: raise IOError("Couldn't read {} field from gdf file header".format(arg)) if "attributes" not in metadata: raise IOError("Couldn't read column names.") if "attributes_units" not in metadata: raise IOError("Couldn't read column units.") # Check cols names and units integrity if len(metadata["attributes"]) != len(metadata["attributes_units"]): raise IOError( "Number of attributes ({}) and units ({}) mismatch".format( len(metadata["attributes"]), len(metadata["attributes_units"]) ) ) metadata["attributes_units"] = [ attr.replace("[", "").replace("]", "").strip() for attr in metadata["attributes_units"] ] for arg in ["latitude", "longitude"]: if arg not in metadata["attributes"]: raise IOError("Couldn't find {} column.".format(arg)) # Check proper values for shape and size shape = (int(metadata["latitude_parallels"]), int(metadata["longitude_parallels"])) size = int(metadata["number_of_gridpoints"]) if shape[0] * shape[1] != size: raise IOError("Grid shape '{}' and size '{}' mismatch.".format(shape, size))