import numpy as np
[docs]def replace_nans(array, max_iter, tol, kernel_size=2, method="disk"):
"""Replace NaN elements in an array using an iterative image inpainting
algorithm.
The algorithm is the following:
1) For each element in the input array, replace it by a weighted average
of the neighbouring elements which are not NaN themselves. The weights
depend on the method type. See Methods below.
2) Several iterations are needed if there are adjacent NaN elements.
If this is the case, information is "spread" from the edges of the
missing regions iteratively, until the variation is below a certain
threshold.
Methods:
localmean - A square kernel where all elements have the same value,
weights are equal to n/( (2*kernel_size+1)**2 -1 ),
where n is the number of non-NaN elements.
disk - A circular kernel where all elements have the same value,
kernel is calculated by::
if ((S-i)**2 + (S-j)**2)**0.5 <= S:
kernel[i,j] = 1.0
else:
kernel[i,j] = 0.0
where S is the kernel radius.
distance - A circular inverse distance kernel where elements are
weighted proportional to their distance away from the
center of the kernel, elements farther away have less
weight. Elements outside the specified radius are set
to 0.0 as in 'disk', the remaining of the weights are
calculated as::
maxDist = ((S)**2 + (S)**2)**0.5
kernel[i,j] = -1*(((S-i)**2 + (S-j)**2)**0.5 - maxDist)
where S is the kernel radius.
Parameters
----------
array : 2d or 3d np.ndarray
an array containing NaN elements that have to be replaced
if array is a masked array (numpy.ma.MaskedArray), then
the mask is reapplied after the replacement
max_iter : int
the number of iterations
tol : float
On each iteration check if the mean square difference between
values of replaced elements is below a certain tolerance `tol`
kernel_size : int
the size of the kernel, default is 1
method : str
the method used to replace invalid values. Valid options are
`localmean`, `disk`, and `distance`.
Returns
-------
filled : 2d or 3d np.ndarray
a copy of the input array, where NaN elements have been replaced.
"""
kernel_size = int(kernel_size)
filled = array.copy()
n_dim = len(array.shape)
# generating the kernel
kernel = np.zeros([2 * kernel_size + 1] * len(array.shape), dtype=int)
if method == "localmean":
kernel += 1
elif method == "disk":
dist, dist_inv = get_dist(kernel, kernel_size)
kernel[dist <= kernel_size] = 1
elif method == "distance":
dist, dist_inv = get_dist(kernel, kernel_size)
kernel[dist <= kernel_size] = dist_inv[dist <= kernel_size]
else:
raise ValueError(
"Known methods are: `localmean`, `disk` or `distance`."
)
# list of kernel array indices
# kernel_indices = np.indices(kernel.shape)
# kernel_indices = np.reshape(kernel_indices,
# (n_dim, (2 * kernel_size + 1) ** n_dim),
# order="C").T
# indices where array is NaN
nan_indices = np.array(np.nonzero(np.isnan(array))).T.astype(int)
# number of NaN elements
n_nans = len(nan_indices)
# arrays which contain replaced values to check for convergence
replaced_new = np.zeros(n_nans)
replaced_old = np.zeros(n_nans)
# make several passes
# until we reach convergence
for _ in range(max_iter):
# note: identifying new nan indices and looping other the new indices
# would give slightly different result
# for each NaN element
for k in range(n_nans):
ind = nan_indices[
k
] # 2 or 3 indices indicating the position of a nan element
# init to 0.0
replaced_new[k] = 0.0
# generating a list of indices of the convolution window in the
# array
slice_indices = np.array(np.meshgrid(*[range(i - kernel_size,
i + kernel_size + 1) for i in ind]))
# identifying all indices strictly inside the image edges:
in_mask = np.array(
[
np.logical_and(
slice_indices[i] < array.shape[i],
slice_indices[i] >= 0
)
for i in range(n_dim)
]
)
# logical and over x,y (and z) indices
in_mask = np.prod(in_mask, axis=0).astype(bool)
# extract window from array
win = filled[tuple(slice_indices[:, in_mask])]
# selecting the same points from the kernel
kernel_in = kernel[in_mask]
# sum of elements of the kernel that are not nan in the window
non_nan = np.sum(kernel_in[~np.isnan(win)])
if non_nan > 0:
# convolution with the kernel
replaced_new[k] = np.nansum(win * kernel_in) / non_nan
else:
# don't do anything if there is only nans around
replaced_new[k] = np.nan
# bulk replace all new values in array
filled[tuple(nan_indices.T)] = replaced_new
# check if replaced elements are below a certain tolerance
if np.mean((replaced_new - replaced_old) ** 2) < tol:
break
else:
replaced_old = replaced_new
return filled
[docs]def get_dist(kernel, kernel_size):
# generates a map of distances to the center of the kernel. This is later
# used to generate disk-shaped kernels and
# to fill in distance based weights
if len(kernel.shape) == 2:
# x and y coordinates for each points
xs, ys = np.indices(kernel.shape)
# maximal distance form center - distance to center (of each point)
dist = np.sqrt((ys - kernel_size) ** 2 + (xs - kernel_size) ** 2)
dist_inv = np.sqrt(2) * kernel_size - dist
if len(kernel.shape) == 3:
xs, ys, zs = np.indices(kernel.shape)
dist = np.sqrt(
(ys - kernel_size) ** 2 +
(xs - kernel_size) ** 2 +
(zs - kernel_size) ** 2
)
dist_inv = np.sqrt(3) * kernel_size - dist
return dist, dist_inv