Source code for openpiv.pyprocess

"""This module contains a pure python implementation of the basic
cross-correlation algorithm for PIV image processing."""

from typing import Optional, Tuple, Callable, Union
import numpy.lib.stride_tricks
import numpy as np
from numpy import log
from numpy import ma
from numpy.fft import rfft2 as rfft2_, irfft2 as irfft2_, fftshift as fftshift_
from scipy.signal import convolve2d as conv_


__licence_ = """
Copyright (C) 2011  www.openpiv.net

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""


[docs]def get_field_shape( image_size: Tuple[int,int], search_area_size: Tuple[int,int], overlap: Tuple[int,int], )->Tuple[int,int]: """Compute the shape of the resulting flow field. Given the image size, the interrogation window size and the overlap size, it is possible to calculate the number of rows and columns of the resulting flow field. Parameters ---------- image_size: two elements tuple a two dimensional tuple for the pixel size of the image first element is number of rows, second element is the number of columns, easy to obtain using .shape search_area_size: tuple the size of the interrogation windows (if equal in frames A,B) or the search area (in frame B), the largest of the two overlap: tuple the number of pixel by which two adjacent interrogation windows overlap. Returns ------- field_shape : 2-element tuple the shape of the resulting flow field """ field_shape = (np.array(image_size) - np.array(search_area_size)) // ( # type: ignore np.array(search_area_size) - np.array(overlap) ) + 1 return field_shape
[docs]def get_coordinates( image_size: Tuple[int,int], search_area_size: int, overlap: int, center_on_field: bool=True )->Tuple[np.ndarray, np.ndarray]: """Compute the x, y coordinates of the centers of the interrogation windows. for the SQUARE windows only, see also get_rect_coordinates the origin (0,0) is like in the image, top left corner positive x is an increasing column index from left to right positive y is increasing row index, from top to bottom Parameters ---------- image_size: two elements tuple a two dimensional tuple for the pixel size of the image first element is number of rows, second element is the number of columns. search_area_size: int the size of the search area windows, sometimes it's equal to the interrogation window size in both frames A and B overlap: int = 0 (default is no overlap) the number of pixel by which two adjacent interrogation windows overlap. Returns ------- x : 2d np.ndarray a two dimensional array containing the x coordinates of the interrogation window centers, in pixels. y : 2d np.ndarray a two dimensional array containing the y coordinates of the interrogation window centers, in pixels. Coordinate system 0,0 is at the top left corner, positive x to the right, positive y from top downwards, i.e. image coordinate system """ # get shape of the resulting flow field as a 2 component array field_shape = get_field_shape(image_size, (search_area_size, search_area_size), (overlap, overlap) ) # compute grid coordinates of the search area window centers # note the field_shape[1] (columns) for x x = ( np.arange(field_shape[1]) * (search_area_size - overlap) + (search_area_size) / 2.0 ) # note the rows in field_shape[0] y = ( np.arange(field_shape[0]) * (search_area_size - overlap) + (search_area_size) / 2.0 ) # moving coordinates further to the center, so that the points at the # extreme left/right or top/bottom # have the same distance to the window edges. For simplicity only integer # movements are allowed. if center_on_field is True: x += ( image_size[1] - 1 - ((field_shape[1] - 1) * (search_area_size - overlap) + (search_area_size - 1)) ) // 2 y += ( image_size[0] - 1 - ((field_shape[0] - 1) * (search_area_size - overlap) + (search_area_size - 1)) ) // 2 # the origin 0,0 is at top left # the units are pixels X, Y = np.meshgrid(x,y) return (X, Y)
[docs]def get_rect_coordinates( image_size: Tuple[int,int], window_size: Union[int, Tuple[int,int]], overlap: Union[int, Tuple[int,int]], center_on_field: bool=False, ): ''' Rectangular grid version of get_coordinates. ''' if isinstance(window_size, int): window_size = (window_size, window_size) if isinstance(overlap, int): overlap = (overlap, overlap) # @alexlib why the center_on_field is False? # todo: test True as well _, y = get_coordinates(image_size, window_size[0], overlap[0], center_on_field=center_on_field) x, _ = get_coordinates(image_size, window_size[1], overlap[1], center_on_field=center_on_field) X,Y = np.meshgrid(x[0,:], y[:,0]) return (X, Y)
[docs]def sliding_window_array( image: np.ndarray, window_size: Tuple[int,int]=(64,64), overlap: Tuple[int,int]=(32,32), )-> np.ndarray: ''' This version does not use numpy as_strided and is much more memory efficient. Basically, we have a 2d array and we want to perform cross-correlation over the interrogation windows. An approach could be to loop over the array but loops are expensive in python. So we create from the array a new array with three dimension, of size (n_windows, window_size, window_size), in which each slice, (along the first axis) is an interrogation window. ''' # if isinstance(window_size, int): # window_size = (window_size, window_size) # if isinstance(overlap, int): # overlap = (overlap, overlap) x, y = get_rect_coordinates(image.shape, window_size, overlap, center_on_field = False) x = (x - window_size[1]//2).astype(int) y = (y - window_size[0]//2).astype(int) x, y = np.reshape(x, (-1,1,1)), np.reshape(y, (-1,1,1)) win_x, win_y = np.meshgrid(np.arange(0, window_size[1]), np.arange(0, window_size[0])) win_x = win_x[np.newaxis,:,:] + x win_y = win_y[np.newaxis,:,:] + y windows = image[win_y, win_x] return windows
[docs]def moving_window_array(array, window_size, overlap): """ This is a nice numpy trick. The concept of numpy strides should be clear to understand this code. Basically, we have a 2d array and we want to perform cross-correlation over the interrogation windows. An approach could be to loop over the array but loops are expensive in python. So we create from the array a new array with three dimension, of size (n_windows, window_size, window_size), in which each slice, (along the first axis) is an interrogation window. """ sz = array.itemsize shape = array.shape array = np.ascontiguousarray(array) strides = ( sz * shape[1] * (window_size - overlap), sz * (window_size - overlap), sz * shape[1], sz, ) shape = ( int((shape[0] - window_size) / (window_size - overlap)) + 1, int((shape[1] - window_size) / (window_size - overlap)) + 1, window_size, window_size, ) return numpy.lib.stride_tricks.as_strided( array, strides=strides, shape=shape ).reshape(-1, window_size, window_size)
[docs]def find_first_peak(corr): """ Find row and column indices of the first correlation peak. Parameters ---------- corr : np.ndarray the correlation map fof the strided images (N,K,M) where N is the number of windows, KxM is the interrogation window size Returns ------- (i,j) : integers, index of the peak position peak : amplitude of the peak """ return np.unravel_index(np.argmax(corr), corr.shape), corr.max()
[docs]def find_second_peak(corr, i=None, j=None, width=2): """ Find the value of the second largest peak. The second largest peak is the height of the peak in the region outside a 3x3 submatrxi around the first correlation peak. Parameters ---------- corr: np.ndarray the correlation map. i,j : ints row and column location of the first peak. width : int the half size of the region around the first correlation peak to ignore for finding the second peak. Returns ------- i : int the row index of the second correlation peak. j : int the column index of the second correlation peak. corr_max2 : int the value of the second correlation peak. """ if i is None or j is None: (i, j), tmp = find_first_peak(corr) # create a masked view of the corr tmp = corr.view(ma.MaskedArray) # set width x width square submatrix around the first correlation peak as # masked. # Before check if we are not too close to the boundaries, otherwise we # have negative indices iini = max(0, i - width) ifin = min(i + width + 1, corr.shape[0]) jini = max(0, j - width) jfin = min(j + width + 1, corr.shape[1]) tmp[iini:ifin, jini:jfin] = ma.masked (i, j), corr_max2 = find_first_peak(tmp) return (i, j), corr_max2
[docs]def find_all_first_peaks(corr): ''' Find row and column indices of the first correlation peak. Parameters ---------- corr : np.ndarray the correlation map fof the strided images (N,K,M) where N is the number of windows, KxM is the interrogation window size Returns ------- index_list : integers, index of the peak position in (N,i,j) peaks_max : amplitude of the peak ''' ind = corr.reshape(corr.shape[0], -1).argmax(-1) peaks = np.array(np.unravel_index(ind, corr.shape[-2:])) peaks = np.vstack((peaks[0], peaks[1])).T index_list = [(i, v[0], v[1]) for i, v in enumerate(peaks)] peaks_max = np.nanmax(corr, axis = (-2, -1)) return np.array(index_list), np.array(peaks_max)
[docs]def find_all_second_peaks(corr, width = 2): ''' Find row and column indices of the first correlation peak. Parameters ---------- corr : np.ndarray the correlation map fof the strided images (N,K,M) where N is the number of windows, KxM is the interrogation window size width : int the half size of the region around the first correlation peak to ignore for finding the second peak Returns ------- index_list : integers, index of the peak position in (N,i,j) peaks_max : amplitude of the peak ''' indexes = find_all_first_peaks(corr)[0].astype(int) ind = indexes[:, 0] x = indexes[:, 1] y = indexes[:, 2] iini = x - width ifin = x + width + 1 jini = y - width jfin = y + width + 1 iini[iini < 0] = 0 # border checking ifin[ifin > corr.shape[1]] = corr.shape[1] jini[jini < 0] = 0 jfin[jfin > corr.shape[2]] = corr.shape[2] # create a masked view of the corr tmp = corr.view(np.ma.MaskedArray) for i in ind: tmp[i, iini[i]:ifin[i], jini[i]:jfin[i]] = np.ma.masked indexes, peaks = find_all_first_peaks(tmp) return indexes, peaks
[docs]def find_subpixel_peak_position(corr, subpixel_method="gaussian"): """ Find subpixel approximation of the correlation peak. This function returns a subpixels approximation of the correlation peak by using one of the several methods available. If requested, the function also returns the signal to noise ratio level evaluated from the correlation map. Parameters ---------- corr : np.ndarray the correlation map. subpixel_method : string one of the following methods to estimate subpixel location of the peak: 'centroid' [replaces default if correlation map is negative], 'gaussian' [default if correlation map is positive], 'parabolic'. Returns ------- subp_peak_position : two elements tuple the fractional row and column indices for the sub-pixel approximation of the correlation peak. If the first peak is on the border of the correlation map or any other problem, the returned result is a tuple of NaNs. """ # initialization # default_peak_position = (np.floor(corr.shape[0] / 2.), # np.floor(corr.shape[1] / 2.)) # default_peak_position = np.array([0,0]) eps = 1e-7 # subp_peak_position = tuple(np.floor(np.array(corr.shape)/2)) subp_peak_position = (np.nan, np.nan) # any wrong position will mark nan # check inputs if subpixel_method not in ("gaussian", "centroid", "parabolic"): raise ValueError(f"Method not implemented {subpixel_method}") # the peak locations (peak1_i, peak1_j), _ = find_first_peak(corr) # import pdb; pdb.set_trace() # the peak and its neighbours: left, right, down, up # but we have to make sure that peak is not at the border # @ErichZimmer noticed this bug for the small windows if ((peak1_i == 0) | (peak1_i == corr.shape[0]-1) | (peak1_j == 0) | (peak1_j == corr.shape[1]-1)): return subp_peak_position else: corr += eps # prevents log(0) = nan if "gaussian" is used (notebook) c = corr[peak1_i, peak1_j] cl = corr[peak1_i - 1, peak1_j] cr = corr[peak1_i + 1, peak1_j] cd = corr[peak1_i, peak1_j - 1] cu = corr[peak1_i, peak1_j + 1] # gaussian fit if np.logical_and(np.any(np.array([c, cl, cr, cd, cu]) < 0), subpixel_method == "gaussian"): subpixel_method = "parabolic" # try: if subpixel_method == "centroid": subp_peak_position = ( ((peak1_i - 1) * cl + peak1_i * c + (peak1_i + 1) * cr) / (cl + c + cr), ((peak1_j - 1) * cd + peak1_j * c + (peak1_j + 1) * cu) / (cd + c + cu), ) elif subpixel_method == "gaussian": nom1 = log(cl) - log(cr) den1 = 2 * log(cl) - 4 * log(c) + 2 * log(cr) nom2 = log(cd) - log(cu) den2 = 2 * log(cd) - 4 * log(c) + 2 * log(cu) subp_peak_position = ( peak1_i + np.divide(nom1, den1, out=np.zeros(1), where=(den1 != 0.0))[0], peak1_j + np.divide(nom2, den2, out=np.zeros(1), where=(den2 != 0.0))[0], ) elif subpixel_method == "parabolic": subp_peak_position = ( peak1_i + (cl - cr) / (2 * cl - 4 * c + 2 * cr), peak1_j + (cd - cu) / (2 * cd - 4 * c + 2 * cu), ) return subp_peak_position
[docs]def sig2noise_ratio( correlation: np.ndarray, sig2noise_method: str="peak2peak", width: int=2 )-> np.ndarray: """ Computes the signal to noise ratio from the correlation map. The signal to noise ratio is computed from the correlation map with one of two available method. It is a measure of the quality of the matching between to interrogation windows. Parameters ---------- corr : 3d np.ndarray the correlation maps of the image pair, concatenated along 0th axis sig2noise_method: string the method for evaluating the signal to noise ratio value from the correlation map. Can be `peak2peak`, `peak2mean` or None if no evaluation should be made. width : int, optional the half size of the region around the first correlation peak to ignore for finding the second peak. [default: 2]. Only used if ``sig2noise_method==peak2peak``. Returns ------- sig2noise : np.array the signal to noise ratios from the correlation maps. """ sig2noise = np.zeros(correlation.shape[0]) corr_max1 = np.zeros(correlation.shape[0]) corr_max2 = np.zeros(correlation.shape[0]) if sig2noise_method == "peak2peak": for i, corr in enumerate(correlation): # compute first peak position (peak1_i, peak1_j), corr_max1[i] = find_first_peak(corr) condition = ( corr_max1[i] < 1e-3 or peak1_i == 0 or peak1_i == corr.shape[0] - 1 or peak1_j == 0 or peak1_j == corr.shape[1] - 1 ) if condition: # return zero, since we have no signal. # no point to get the second peak, save time sig2noise[i] = 0.0 else: # find second peak height (peak2_i, peak2_j), corr_max2 = find_second_peak( corr, peak1_i, peak1_j, width=width ) border_condition = ( peak2_i == 0 or peak2_i == corr.shape[0] - 1 or peak2_j == 0 or peak2_j == corr.shape[1] - 1 ) condition = ( # non-valid second peak corr_max2 == 0 or ( # maybe a valid peak at the border, bad sign border_condition and corr_max2 > 0.5*corr_max1[i] ) ) if condition: # mark failed peak2 corr_max2 = np.nan sig2noise[i] = corr_max1[i] / corr_max2 elif sig2noise_method == "peak2mean": # only one loop for i, corr in enumerate(correlation): # compute first peak position (peak1_i, peak1_j), corr_max1[i] = find_first_peak(corr) condition = ( corr_max1[i] < 1e-3 or peak1_i == 0 or peak1_i == corr.shape[0] - 1 or peak1_j == 0 or peak1_j == corr.shape[1] - 1 ) if condition: # return zero, since we have no signal. # no point to get the second peak, save time corr_max1[i] = 0.0 # find means of all the correlation maps corr_max2 = np.abs(correlation.mean(axis=(-2, -1))) corr_max2[corr_max2 == 0] = np.nan # mark failed ones sig2noise = corr_max1 / corr_max2 else: raise ValueError("wrong sig2noise_method") # sig2noise is zero for all failed ones sig2noise[np.isnan(sig2noise)] = 0.0 return sig2noise
[docs]def vectorized_sig2noise_ratio(correlation, sig2noise_method = 'peak2peak', width = 2): ''' Computes the signal to noise ratio from the correlation map in a mostly vectorized approach, thus much faster. The signal to noise ratio is computed from the correlation map with one of two available method. It is a measure of the quality of the matching between to interrogation windows. Parameters ---------- corr : 3d np.ndarray the correlation maps of the image pair, concatenated along 0th axis sig2noise_method: string the method for evaluating the signal to noise ratio value from the correlation map. Can be `peak2peak`, `peak2mean` or None if no evaluation should be made. width : int, optional the half size of the region around the first correlation peak to ignore for finding the second peak. [default: 2]. Only used if sig2noise_method==peak2peak. Returns ------- sig2noise : np.array the signal to noise ratios from the correlation maps. ''' if sig2noise_method == "peak2peak": ind1, peaks1 = find_all_first_peaks(correlation) ind2, peaks2 = find_all_second_peaks(correlation, width = width) peaks1_i, peaks1_j = ind1[:, 1], ind1[:, 2] peaks2_i, peaks2_j = ind2[:, 1], ind2[:, 2] # peak checking flag = np.zeros(peaks1.shape).astype(bool) flag[peaks1 < 1e-3] = True flag[peaks1_i == 0] = True flag[peaks1_i == correlation.shape[1]-1] = True flag[peaks1_j == 0] = True flag[peaks1_j == correlation.shape[2]-1] = True flag[peaks2 < 1e-3] = True flag[peaks2_i == 0] = True flag[peaks2_i == correlation.shape[1]-1] = True flag[peaks2_j == 0] = True flag[peaks2_j == correlation.shape[2]-1] = True # peak-to-peak calculation peak2peak = np.divide( peaks1, peaks2, out=np.zeros_like(peaks1), where=(peaks2 > 0.0) ) peak2peak[flag is True] = 0 # replace invalid values return peak2peak elif sig2noise_method == "peak2mean": peaks, peaks1max = find_all_first_peaks(correlation) peaks = np.array(peaks) peaks1_i, peaks1_j = peaks[:,1], peaks[:, 2] peaks2mean = np.abs(np.nanmean(correlation, axis = (-2, -1))) # peak checking flag = np.zeros(peaks1max.shape).astype(bool) flag[peaks1max < 1e-3] = True flag[peaks1_i == 0] = True flag[peaks1_i == correlation.shape[1]-1] = True flag[peaks1_j == 0] = True flag[peaks1_j == correlation.shape[2]-1] = True # peak-to-mean calculation peak2mean = np.divide( peaks1max, peaks2mean, out=np.zeros_like(peaks1max), where=(peaks2mean > 0.0) ) peak2mean[flag is True] = 0 # replace invalid values return peak2mean else: raise ValueError(f"sig2noise_method not supported: {sig2noise_method}")
[docs]def fft_correlate_images( image_a: np.ndarray, image_b: np.ndarray, correlation_method: str="circular", normalized_correlation: bool=True, conj: Callable=np.conj, rfft2 = rfft2_, irfft2 = irfft2_, fftshift = fftshift_, )->np.ndarray: """ FFT based cross correlation of two images with multiple views of np.stride_tricks() The 2D FFT should be applied to the last two axes (-2,-1) and the zero axis is the number of the interrogation window This should also work out of the box for rectangular windows. Parameters ---------- image_a : 3d np.ndarray, first dimension is the number of windows, and two last dimensions are interrogation windows of the first image image_b : similar correlation_method : string one of the three methods implemented: 'circular' or 'linear' [default: 'circular]. normalized_correlation : string decides wetehr normalized correlation is done or not: True or False [default: True]. conj : function function used for complex conjugate rfft2 : function function used for rfft2 irfft2 : function function used for irfft2 fftshift : function function used for fftshift """ if normalized_correlation: # remove the effect of stronger laser or # longer exposure for frame B # image_a = match_histograms(image_a, image_b) # remove mean background, normalize to 0..1 range image_a = normalize_intensity(image_a) image_b = normalize_intensity(image_b) s1 = np.array(image_a.shape[-2:]) s2 = np.array(image_b.shape[-2:]) if correlation_method == "linear": # have to be normalized, mainly because of zero padding size = s1 + s2 - 1 fsize = 2 ** np.ceil(np.log2(size)).astype(int) - 1 fslice = (slice(0, image_a.shape[0]), slice((fsize[0]-s1[0])//2, (fsize[0]+s1[0])//2), slice((fsize[1]-s1[1])//2, (fsize[1]+s1[1])//2)) f2a = conj(rfft2(image_a, fsize, axes=(-2, -1))) # type: ignore f2b = rfft2(image_b, fsize, axes=(-2, -1)) # type: ignore corr = fftshift(irfft2(f2a * f2b).real, axes=(-2, -1))[fslice] elif correlation_method == "circular": f2a = conj(rfft2(image_a)) f2b = rfft2(image_b) corr = fftshift(irfft2(f2a * f2b).real, axes=(-2, -1)) else: print(f"correlation method {correlation_method } is not implemented") if normalized_correlation: corr = corr/(s2[0]*s2[1]) # for extended search area corr = np.clip(corr, 0, 1) return corr
[docs]def normalize_intensity(window): """Normalize interrogation window or strided image of many windows, by removing the mean intensity value per window and clipping the negative values to zero Parameters ---------- window : 2d np.ndarray the interrogation window array Returns ------- window : 2d np.ndarray the interrogation window array, with mean value equal to zero and intensity normalized to -1 +1 and clipped if some pixels are extra low/high """ window = window.astype(np.float32) window -= window.mean(axis=(-2, -1), keepdims=True, dtype=np.float32) tmp = window.std(axis=(-2, -1), keepdims=True) window = np.divide(window, tmp, out=np.zeros_like(window), where=(tmp != 0)) return np.clip(window, 0, window.max())
[docs]def correlate_windows(window_a, window_b, correlation_method="fft", convolve2d = conv_, rfft2 = rfft2_, irfft2 = irfft2_): """Compute correlation function between two interrogation windows. The correlation function can be computed by using the correlation theorem to speed up the computation. Parameters ---------- window_a : 2d np.ndarray a two dimensions array for the first interrogation window window_b : 2d np.ndarray a two dimensions array for the second interrogation window correlation_method : string, methods currently implemented: 'circular' - FFT based without zero-padding 'linear' - FFT based with zero-padding 'direct' - linear convolution based Default is 'fft', which is much faster. convolve2d : function function used for 2d convolutions rfft2 : function function used for rfft2 irfft2 : function function used for irfft2 Returns ------- corr : 2d np.ndarray a two dimensions array for the correlation function. Note that due to the wish to use 2^N windows for faster FFT we use a slightly different convention for the size of the correlation map. The theory says it is M+N-1, and the 'direct' method gets this size out the FFT-based method returns M+N size out, where M is the window_size and N is the search_area_size It leads to inconsistency of the output """ # first we remove the mean to normalize contrast and intensity # the background level which is take as a mean of the image # is subtracted # import pdb; pdb.set_trace() window_a = normalize_intensity(window_a) window_b = normalize_intensity(window_b) # this is not really circular one, as we pad a bit to get fast 2D FFT, # see fft_correlate for implementation if correlation_method in ("circular", "fft"): corr = fft_correlate_windows(window_a, window_b, rfft2 = rfft2, irfft2 = irfft2) elif correlation_method == "linear": # save the original size: s1 = np.array(window_a.shape) s2 = np.array(window_b.shape) size = s1 + s2 - 1 fslice = tuple([slice(0, int(sz)) for sz in size]) # and slice only the relevant part corr = fft_correlate_windows(window_a, window_b, rfft2 = rfft2, irfft2 = irfft2)[fslice] elif correlation_method == "direct": corr = convolve2d(window_a, window_b[::-1, ::-1], "full") else: print(f"correlation method {correlation_method } is not implemented") return corr
[docs]def fft_correlate_windows(window_a, window_b, rfft2 = rfft2_, irfft2 = irfft2_): """ FFT based cross correlation it is a so-called linear convolution based, since we increase the size of the FFT to reduce the edge effects. This should also work out of the box for rectangular windows. Parameters ---------- window_a : 2d np.ndarray a two dimensions array for the first interrogation window window_b : 2d np.ndarray a two dimensions array for the second interrogation window rfft2 : function function used for rfft2 irfft2 : function function used for irfft2 # from Stackoverflow: from scipy import linalg import numpy as np # works for rectangular windows as well x = [[1 , 0 , 0 , 0] , [0 , -1 , 0 , 0] , [0 , 0 , 3 , 0] , [0 , 0 , 0 , 1], [0 , 0 , 0 , 1]] x = np.array(x,dtype=np.float) y = [[4 , 5] , [3 , 4]] y = np.array(y) print ("conv:" , signal.convolve2d(x , y , 'full')) s1 = np.array(x.shape) s2 = np.array(y.shape) size = s1 + s2 - 1 fsize = 2 ** np.ceil(np.log2(size)).astype(int) fslice = tuple([slice(0, int(sz)) for sz in size]) new_x = np.fft.fft2(x , fsize) new_y = np.fft.fft2(y , fsize) result = np.fft.ifft2(new_x*new_y)[fslice].copy() print("fft for my method:" , np.array(result.real, np.int32)) """ s1 = np.array(window_a.shape) s2 = np.array(window_b.shape) size = s1 + s2 - 1 fsize = 2 ** np.ceil(np.log2(size)).astype(int) fslice = tuple([slice(0, int(sz)) for sz in size]) f2a = rfft2(window_a, fsize) f2b = rfft2(window_b[::-1, ::-1], fsize) corr = irfft2(f2a * f2b).real[fslice] return corr
[docs]def extended_search_area_piv( frame_a: np.ndarray, frame_b: np.ndarray, window_size: Union[int, Tuple[int,int]], overlap: Union[int, Tuple[int,int]]=(0,0), dt: float=1.0, search_area_size: Optional[Union[int, Tuple[int,int]]]=None, correlation_method: str="circular", subpixel_method: str="gaussian", sig2noise_method: Union[str, None]='peak2mean', width: int=2, normalized_correlation: bool=False, use_vectorized: bool=False, ): """Standard PIV cross-correlation algorithm, with an option for extended area search that increased dynamic range. The search region in the second frame is larger than the interrogation window size in the first frame. For Cython implementation see openpiv.process.extended_search_area_piv This is a pure python implementation of the standard PIV cross-correlation algorithm. It is a zero order displacement predictor, and no iterative process is performed. Parameters ---------- frame_a : 2d np.ndarray an two dimensions array of integers containing grey levels of the first frame. frame_b : 2d np.ndarray an two dimensions array of integers containing grey levels of the second frame. window_size : int the size of the (square) interrogation window, [default: 32 pix]. overlap : int the number of pixels by which two adjacent windows overlap [default: 16 pix]. dt : float the time delay separating the two frames [default: 1.0]. correlation_method : string one of the two methods implemented: 'circular' or 'linear', default: 'circular', it's faster, without zero-padding 'linear' requires also normalized_correlation = True (see below) subpixel_method : string one of the following methods to estimate subpixel location of the peak: 'centroid' [replaces default if correlation map is negative], 'gaussian' [default if correlation map is positive], 'parabolic'. sig2noise_method : string defines the method of signal-to-noise-ratio measure, ('peak2peak' or 'peak2mean'. If None, no measure is performed.) width : int the half size of the region around the first correlation peak to ignore for finding the second peak. [default: 2]. Only used if ``sig2noise_method==peak2peak``. search_area_size : int the size of the interrogation window in the second frame, default is the same interrogation window size and it is a fallback to the simplest FFT based PIV normalized_correlation: bool if True, then the image intensity will be modified by removing the mean, dividing by the standard deviation and the correlation map will be normalized. It's slower but could be more robust Returns ------- u : 2d np.ndarray a two dimensional array containing the u velocity component, in pixels/seconds. v : 2d np.ndarray a two dimensional array containing the v velocity component, in pixels/seconds. sig2noise : 2d np.ndarray, ( optional: only if sig2noise_method != None ) a two dimensional array the signal to noise ratio for each window pair. The implementation of the one-step direct correlation with different size of the interrogation window and the search area. The increased size of the search areas cope with the problem of loss of pairs due to in-plane motion, allowing for a smaller interrogation window size, without increasing the number of outlier vectors. See: Particle-Imaging Techniques for Experimental Fluid Mechanics Annual Review of Fluid Mechanics Vol. 23: 261-304 (Volume publication date January 1991) DOI: 10.1146/annurev.fl.23.010191.001401 originally implemented in process.pyx in Cython and converted to a NumPy vectorized solution in pyprocess.py """ # Reformat inputs so it works for both square and rectangular windows # first if we get integer window size -> make it tuple if isinstance(window_size, int): window_size = (window_size, window_size) # same for overlap if isinstance(overlap, int): overlap = (overlap, overlap) # if no search_size, copy window_size if search_area_size is None: search_area_size = window_size elif isinstance(search_area_size, int): search_area_size = (search_area_size, search_area_size) # verify that things are logically possible: if overlap[0] >= window_size[0] or overlap[1] >= window_size[1]: raise ValueError("Overlap has to be smaller than the window_size") if search_area_size[0] < window_size[0] or search_area_size[1] < window_size[1]: raise ValueError("Search size cannot be smaller than the window_size") if (window_size[1] > frame_a.shape[0]) or (window_size[0] > frame_a.shape[1]): raise ValueError("window size cannot be larger than the image") # get field shape n_rows, n_cols = get_field_shape(frame_a.shape, search_area_size, overlap) # We implement the new vectorized code aa = sliding_window_array(frame_a, search_area_size, overlap) bb = sliding_window_array(frame_b, search_area_size, overlap) # for the case of extended seearch, the window size is smaller than # the search_area_size. In order to keep it all vectorized the # approach is to use the interrogation window in both # frames of the same size of search_area_asize, # but mask out the region around # the interrogation window in the frame A if search_area_size > window_size: # before masking with zeros we need to remove # edges aa = normalize_intensity(aa) bb = normalize_intensity(bb) mask = np.zeros((search_area_size[0], search_area_size[1])).astype(aa.dtype) pady = int((search_area_size[0] - window_size[0]) / 2) padx = int((search_area_size[1] - window_size[1]) / 2) mask[slice(pady, search_area_size[0] - pady), slice(padx, search_area_size[1] - padx)] = 1 mask = np.broadcast_to(mask, aa.shape) aa *= mask corr = fft_correlate_images(aa, bb, correlation_method=correlation_method, normalized_correlation=normalized_correlation) if use_vectorized is True: u, v = vectorized_correlation_to_displacements(corr, n_rows, n_cols, subpixel_method=subpixel_method) else: u, v = correlation_to_displacement(corr, n_rows, n_cols, subpixel_method=subpixel_method) # return output depending if user wanted sig2noise information if sig2noise_method is not None: if use_vectorized is True: sig2noise = vectorized_sig2noise_ratio( corr, sig2noise_method=sig2noise_method, width=width ) else: sig2noise = sig2noise_ratio( corr, sig2noise_method=sig2noise_method, width=width ) else: sig2noise = np.zeros_like(u)*np.nan sig2noise = sig2noise.reshape(n_rows, n_cols) return u/dt, v/dt, sig2noise
[docs]def correlation_to_displacement(corr, n_rows, n_cols, subpixel_method="gaussian"): """ Correlation maps are converted to displacement for each interrogation window using the convention that the size of the correlation map is 2N -1 where N is the size of the largest interrogation window (in frame B) that is called search_area_size Inputs: corr : 3D nd.array contains output of the fft_correlate_images n_rows, n_cols : number of interrogation windows, output of the get_field_shape """ # iterate through interrogation widows and search areas u = np.zeros((n_rows, n_cols)) v = np.zeros((n_rows, n_cols)) # center point of the correlation map default_peak_position = np.floor(np.array(corr[0, :, :].shape)/2) for k in range(n_rows): for m in range(n_cols): # look at studying_correlations.ipynb # the find_subpixel_peak_position returns peak = np.array(find_subpixel_peak_position(corr[k*n_cols+m, :, :], subpixel_method=subpixel_method)) -\ default_peak_position # type: ignore # the horizontal shift from left to right is the u # the vertical displacement from top to bottom (increasing row) is v # x the vertical shift from top to bottom is row-wise shift is now # a negative vertical u[k, m], v[k, m] = peak[1], peak[0] return (u, v)
[docs]def vectorized_correlation_to_displacements(corr: np.ndarray, n_rows: Optional[int]=None, n_cols: Optional[int]=None, subpixel_method: str='gaussian', eps: float=1e-7 ): """ Correlation maps are converted to displacement for each interrogation window using the convention that the size of the correlation map is 2N -1 where N is the size of the largest interrogation window (in frame B) that is called search_area_size Parameters ---------- corr : 3D nd.array contains output of the fft_correlate_images n_rows, n_cols : number of interrogation windows, output of the get_field_shape mask_width: int distance, in pixels, from the interrogation window in which correlation peaks would be flagged as invalid Returns ------- u, v: 2D nd.array 2d array of displacements in pixels/dt """ if subpixel_method not in ("gaussian", "centroid", "parabolic"): raise ValueError(f"Method not implemented {subpixel_method}") corr = corr.astype(np.float32) + eps # avoids division by zero peaks = find_all_first_peaks(corr)[0] ind, peaks_x, peaks_y = peaks[:,0], peaks[:,1], peaks[:,2] peaks1_i, peaks1_j = peaks_x, peaks_y # peak checking if subpixel_method in ("gaussian", "centroid", "parabolic"): mask_width = 1 invalid = list(np.where(peaks1_i < mask_width)[0]) invalid += list(np.where(peaks1_i > corr.shape[1] - mask_width - 1)[0]) invalid += list(np.where(peaks1_j < mask_width - 0)[0]) invalid += list(np.where(peaks1_j > corr.shape[2] - mask_width - 1)[0]) peaks1_i[invalid] = corr.shape[1] // 2 # temp. so no errors would be produced peaks1_j[invalid] = corr.shape[2] // 2 print(f"Found {len(invalid)} bad peak(s)") if len(invalid) == corr.shape[0]: # in case something goes horribly wrong return np.zeros((np.size(corr, 0), 2))*np.nan #points c = corr[ind, peaks1_i, peaks1_j] cl = corr[ind, peaks1_i - 1, peaks1_j] cr = corr[ind, peaks1_i + 1, peaks1_j] cd = corr[ind, peaks1_i, peaks1_j - 1] cu = corr[ind, peaks1_i, peaks1_j + 1] if subpixel_method == "centroid": shift_i = ((peaks1_i - 1) * cl + peaks1_i * c + (peaks1_i + 1) * cr) / (cl + c + cr) shift_j = ((peaks1_j - 1) * cd + peaks1_j * c + (peaks1_j + 1) * cu) / (cd + c + cu) elif subpixel_method == "gaussian": inv = list(np.where(c <= 0)[0]) # get rid of any pesky NaNs inv += list(np.where(cl <= 0)[0]) inv += list(np.where(cr <= 0)[0]) inv += list(np.where(cu <= 0)[0]) inv += list(np.where(cd <= 0)[0]) #cl_, cr_ = np.delete(cl, inv), np.delete(cr, inv) #c_ = np.delete(c, inv) #cu_, cd_ = np.delete(cu, inv), np.delete(cd, inv) nom1 = log(cl) - log(cr) den1 = 2 * log(cl) - 4 * log(c) + 2 * log(cr) nom2 = log(cd) - log(cu) den2 = 2 * log(cd) - 4 * log(c) + 2 * log(cu) shift_i = np.divide( nom1, den1, out=np.zeros_like(nom1), where=(den1 != 0.0) ) shift_j = np.divide( nom2, den2, out=np.zeros_like(nom2), where=(den2 != 0.0) ) if len(inv) >= 1: print(f'Found {len(inv)} negative correlation indices resulting in NaNs\n'+ 'Fallback for negative indices is a 3 point parabolic curve method') shift_i[inv] = (cl[inv] - cr[inv]) / (2 * cl[inv] - 4 * c[inv] + 2 * cr[inv]) shift_j[inv] = (cd[inv] - cu[inv]) / (2 * cd[inv] - 4 * c[inv] + 2 * cu[inv]) elif subpixel_method == "parabolic": shift_i = (cl - cr) / (2 * cl - 4 * c + 2 * cr) shift_j = (cd - cu) / (2 * cd - 4 * c + 2 * cu) if subpixel_method != "centroid": disp_vy = (peaks1_i.astype(np.float64) + shift_i) - np.floor(np.array(corr.shape[1])/2) disp_vx = (peaks1_j.astype(np.float64) + shift_j) - np.floor(np.array(corr.shape[2])/2) else: disp_vy = shift_i - np.floor(np.array(corr.shape[1])/2) disp_vx = shift_j - np.floor(np.array(corr.shape[2])/2) disp_vx[invalid] = peaks_x[invalid]*np.nan disp_vy[invalid] = peaks_y[invalid]*np.nan #disp[ind, :] = np.vstack((disp_vx, disp_vy)).T #return disp[:,0].reshape((n_rows, n_cols)), disp[:,1].reshape((n_rows, n_cols)) if n_rows == None or n_cols == None: return disp_vx, disp_vy else: return disp_vx.reshape((n_rows, n_cols)), disp_vy.reshape((n_rows, n_cols))
[docs]def nextpower2(i): """ Find 2^n that is equal to or greater than. """ n = 1 while n < i: n *= 2 return n