Source code for eodal.utils.arrays

"""
Utilities to interact with 2d arrays

Copyright (C) 2022 Lukas Valentin Graf

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import itertools
import geopandas as gpd
import numpy as np

from numba import njit, prange
from numpy.ma.core import MaskedArray
from typing import Optional, Union

from eodal.utils.exceptions import InputError
from eodal.core.utils.geometry import check_geometry_types


[docs] def count_valid( in_array: Union[np.array, MaskedArray], no_data_value: Optional[Union[int, float]] = 0.0, ) -> int: """ Counts the number of valid (i.e., non no-data) elements in a 2-d array. If a masked array is provided, the number of valid elements is the number of not-masked array elements. :param in_array: two-dimensional array to analyze. Can be an ordinary ``numpy.ndarray`` or a masked array. :param no_data_value: no data value indicating invalid array elements. Default set to zero. Ignored if ``in_array`` is a ``MaskedArray`` :returns: number of invalid array elements. """ if len(in_array.shape) > 2: raise InputError( f"Expected a two-dimensional array, got {len(in_array.shape)} instead." ) # masked array already has a count method if isinstance(in_array, MaskedArray): return in_array.count() # check if array is np.array or np.ndarray return (in_array != no_data_value).sum()
[docs] def upsample_array( in_array: np.array, scaling_factor: int, ) -> np.array: """ takes a 2-dimensional input array (i.e., image matrix) and splits every array cell (i.e., pixel) into X smaller ones having all the same value as the "super" cell they belong to, where X is the scaling factor (X>=1). This way the input image matrix gets a higher spatial resolution without changing any of the original pixel values. The value of the scaling_factor determines the spatial resolution of the output. If scaling_factor = 1 then the input and the output array are the same. If scaling_factor = 2 then the output array has a spatial resolution two times higher then the input (e.g. from 20 to 10 m), and so on. :param array_in: 2-d array (image matrix) :param scaling_factor: factor for increasing spatial resolution. Must be greater than/ equal to 1 :returns: upsampled array with pixel values in target spatial resolution """ # check inputs if scaling_factor < 1: raise ValueError("scaling_factor must be greater/equal 1") # define output image matrix array bounds shape_out = (in_array.shape[0] * scaling_factor, in_array.shape[1] * scaling_factor) out_array = np.zeros(shape_out, dtype=in_array.dtype) # increase resolution using itertools by repeating pixel values # scaling_factor times counter = 0 for row in range(in_array.shape[0]): column = in_array[row, :] out_array[counter : counter + scaling_factor, :] = list( itertools.chain.from_iterable( itertools.repeat(x, scaling_factor) for x in column ) ) counter += scaling_factor return out_array
# @njit(cache=True, parallel=True) def _fill_array( img_arr: np.ndarray, vals: np.ndarray, x_indices: np.ndarray, x_coords: np.ndarray, y_indices: np.ndarray, y_coords: np.ndarray, ) -> np.ndarray: """ `numba` accelerated back-end for fast rasterization of POINT vector features (`GeoDataFrame` attribute column to 2d-array). This method used `NEAREST_NEIGHBOR` to fill in pixel values! :param img_arr: target 2d array to populate with values from `vals` :param vals: `POINT` like feature values to rasterize :param x_indices: `POINT` x coordinates calculated from vector features :param x_coords: output raster x coordinates (from vector features' spatial extent) :param y_indices: `POINT` y coordinates calculated from vector features :param y_coords: output raster y coordinates (from vector features' spatial extent) :returns: 2d-array with rasterized `POINT` features """ _img_arr = img_arr.copy() nvals = vals.shape[0] for idx in prange(nvals): # search for closest array index corresponding to the pixel # and assign the value to the raster cell try: x_index = np.argmin(abs(x_indices - x_coords[idx])) y_index = np.argmin(abs(y_indices - y_coords[idx])) _img_arr[y_index, x_index] = vals[idx] except IndexError: continue return _img_arr
[docs] def array_from_points( gdf: gpd.GeoDataFrame, band_name_src: str, pixres_x: Union[int, float], pixres_y: Union[int, float], nodata_dst: Optional[Union[int, float]] = 0, dtype_src: Optional[str] = "float32", ) -> np.array: """ Converts a `GeoDataFrame` with POINT features into a 2-d `np.ndarray` using the full spatial extent of the input features NOTE: Currently, only nearest neighbor interpolation is supported :param gdf: `GeoDataFrame` with POINT features to convert to raster :param band_name_src: name of `GeoDataFrame` column to rasterize :param pixres_x: spatial resolution of the output raster (in units of the CRS of the `gdf` input) in x direction :param pixres_y: spatial resolution of the output raster (in units of the CRS of the `gdf` input) in y direction :param nodata_dst: no data values to assign to empty cells. Zero by default. :param dtype_src: data type of the resulting raster array. Per default "float32" is used. :returns: 2-d `numpy.ndarray` with rasterized POINT features """ # check input geometries, must be Point gdf = check_geometry_types( in_dataset=gdf, allowed_geometry_types=["Point"], remove_empty_geoms=True ) bounds = gdf.total_bounds # get upper left X/Y coordinates ulx = bounds[0] uly = bounds[-1] # get lower right X/Y coordinates to span the img matrix lrx = bounds[2] lry = bounds[1] # calculate max rows along x and y axis max_x_coord = int(np.ceil(abs((lrx - ulx) / pixres_x))) + 1 max_y_coord = int(np.ceil(abs((uly - lry) / pixres_y))) + 1 # create index lists for coordinates x_indices = np.arange(ulx, lrx + pixres_x, step=pixres_x) y_indices = np.arange(uly, lry + pixres_y, step=pixres_y) # un-flatten the DataFrame along the selected columns (e.g. loop over columns) img_arr = np.ones(shape=(max_y_coord, max_x_coord), dtype=dtype_src) * nodata_dst x_coords = gdf.geometry.x.values y_coords = gdf.geometry.y.values vals = gdf[band_name_src].values rasterized = _fill_array(img_arr, vals, x_indices, x_coords, y_indices, y_coords) return rasterized