Source code for tomography_tutorial.ancillary

import os
import re
import numpy as np

from osgeo import gdal, gdal_array

from scipy.ndimage import label, generate_binary_structure, find_objects


def listfiles(path, pattern):
    """
    list files in a directory whose names match a regular expression

    Parameters
    ----------
    path: str
        the directory to be searched
    pattern: str
        the regular expression search pattern

    Returns
    -------
    list
        a list of absolute file names

    Example
    -------
    >>> listfiles('/path/to/somedata', 'file[0-9].tif')
    ['/path/to/somedata/file1.tif', '/path/to/somedata/file2.tif', '/path/to/somedata/file3.tif']
    """
    return sorted([os.path.join(path, x) for x in os.listdir(path) if re.search(pattern, x)])


def normalize(slice):
    """
    normalize a 1D array by its minimum and maximum values:

    .. math::
        y = \\frac{x-min(x)}{max(x)-min(x)}

    Parameters
    ----------
    slice: numpy.ndarray
        the 1D input array to be normalized

    Returns
    -------
    numpy.ndarray
        the normalized array
    """
    max = np.amax(slice)
    min = np.amin(slice)
    return np.divide((slice - min), (max - min))
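
# A minimal usage sketch for normalize (not part of the original module; the input values
# are made up for illustration):
#
# >>> import numpy as np
# >>> profile = np.array([2.0, 5.0, 8.0])
# >>> normalize(profile)
# array([0. , 0.5, 1. ])

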
def cbfi(slice, nTrack, height):
    """
    computation of capon beam forming inversion for a single pixel.
    This function is used internally by the core function
    :func:`~tomography_tutorial.functions.capon_beam_forming_inversion`.

    Parameters
    ----------
    slice: numpy.ndarray
        an array containing the covariance matrix and wave number for a single pixel
    nTrack: int
        the number of original SLC files
    height: int
        the maximum inversion height

    Returns
    -------
    numpy.ndarray
        the tomographic result for one pixel
    """
    kz = np.transpose([slice[(nTrack ** 2):]])
    r0 = slice[0:(nTrack ** 2)].reshape((nTrack, nTrack))

    z_vector = np.matrix(np.arange(-height, height + 1, 1))

    # define the loading factor
    load_fac = (1 / 25.) * np.identity(nTrack)

    # define the steering matrix
    a_steer = np.exp(np.dot(complex(0, 1) * kz, z_vector))

    # define the numerator of the filter habf
    hnum = np.dot(np.linalg.inv(r0 + load_fac), a_steer)

    # define the denominator of the filter habf
    hden0 = np.diag(np.dot(np.conjugate(np.transpose(a_steer)), hnum))

    # replicate the diagonal by number of tracks
    hden = np.array([hden0] * nTrack, dtype=np.complex64)

    h_abf = np.divide(hnum, hden)
    return np.diag(np.dot(np.dot(np.conjugate(np.transpose(h_abf)), r0), h_abf))
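
# An illustrative sketch of the input layout expected by cbfi (hypothetical values; in the
# tutorial this pixel slice is taken from the stacked covariance matrix and kz arrays):
#
# >>> import numpy as np
# >>> nTrack, height = 2, 3
# >>> r0 = np.array([[2.0, 0.5], [0.5, 2.0]], dtype=np.complex64)  # covariance matrix (nTrack x nTrack)
# >>> kz = np.array([0.0, 0.1])                                    # one vertical wave number per track
# >>> pixel = np.concatenate((r0.flatten(), kz))                   # nTrack**2 covariance values, then kz
# >>> cbfi(pixel, nTrack, height).shape                            # one value per height step in [-height, height]
# (7,)

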
def lut_crop(lut_rg, lut_az, range_min=0, range_max=None, azimuth_min=0, azimuth_max=None):
    """
    compute indices for subsetting the range and azimuth lookup tables (LUTs).
    The returned slices describe the minimum LUT subset, which contains all radar coordinates
    within the range-azimuth subset.

    Parameters
    ----------
    lut_rg: numpy.ndarray
        the lookup table for range direction
    lut_az: numpy.ndarray
        the lookup table for azimuth direction
    range_min: int
        first range pixel
    range_max: int
        last range pixel
    azimuth_min: int
        first azimuth pixel
    azimuth_max: int
        last azimuth pixel

    Returns
    -------
    tuple of slices
        the pixel indices for subsetting: (ymin:ymax, xmin:xmax)
    """

    def get_indices(mask):
        """
        helper function to get the array slices containing all rows and columns
        where a binary mask is one/True

        Parameters
        ----------
        mask: numpy.ndarray
            the mask to be subsetted

        Returns
        -------
        tuple of slices
            two slices for subsetting numpy arrays in the row (azimuth) and column (range) dimension
        """
        # get the azimuth array indices for subsetting the LUT files
        az_index = np.where(np.any(mask, axis=1))
        azmin = np.min(az_index)
        azmax = np.max(az_index) + 1

        # get the range array indices for subsetting the LUT files
        rg_index = np.where(np.any(mask, axis=0))
        rgmin = np.min(rg_index)
        rgmax = np.max(rg_index) + 1

        return slice(azmin, azmax), slice(rgmin, rgmax)

    # create a LUT mask containing all radar coordinates of the defined subset
    lut_mask = (float(range_min) <= lut_rg) & \
               (lut_rg <= float(range_max)) & \
               (float(azimuth_min) <= lut_az) & \
               (lut_az <= float(azimuth_max))

    # subset the original mask to the computed subset coordinates
    indices = get_indices(lut_mask)
    lut_mask_sub = lut_mask[indices]

    # refine the mask using an image segmentation approach:
    # create a new mask by selecting only the largest segment of the original mask
    s = generate_binary_structure(2, 2)
    labeled_array, num_features = label(lut_mask_sub, structure=s)
    obj_slices = find_objects(labeled_array)

    # identify the largest object
    obj_sizes = [(x.stop - x.start) * (y.stop - y.start) for x, y in obj_slices]
    obj_max = obj_sizes.index(max(obj_sizes))

    # create a new mask subset containing only the largest object
    lut_mask_sub = labeled_array == (obj_max + 1)

    # create a new mask with original extent and place the values of the mask subset
    lut_mask = np.zeros(lut_mask.shape)
    lut_mask[indices] = lut_mask_sub

    # return the position indices of the mask object on the main mask
    return get_indices(lut_mask)
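
# An illustrative sketch of lut_crop using small synthetic lookup tables (hypothetical values;
# real LUTs are read from the geocoded LUT files):
#
# >>> import numpy as np
# >>> lut_rg, lut_az = np.meshgrid(np.arange(6), np.arange(5))  # 5 x 6 LUTs of range/azimuth coordinates
# >>> indices = lut_crop(lut_rg, lut_az, range_min=2, range_max=4, azimuth_min=1, azimuth_max=3)
# >>> lut_rg[indices].shape  # azimuth rows 1..3 and range columns 2..4 are retained
# (3, 3)

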
def geowrite(data, outname, reference, indices, nodata=-99):
    """
    write an array to a file using an already geocoded file as reference.
    The output format is either GeoTiff (for 2D arrays) or ENVI (for 3D arrays).

    Parameters
    ----------
    data: numpy.ndarray
        the array to write to the file; must be either 2D or 3D
    outname: str
        the file name
    reference: `gdal.Dataset <https://gdal.org/python/osgeo.gdal.Dataset-class.html>`_
        the geocoded reference dataset
    indices: tuple of slices
        the slices which define the subset of data in reference, i.e. where the data is located
        within the reference pixel dimensions; see :func:`lut_crop`
    nodata: int
        the nodata value to write to the file

    Returns
    -------

    """
    geo_keys = ['xmin', 'xres', 'rotation_x', 'ymax', 'rotation_y', 'yres']
    geo = dict(zip(geo_keys, reference.GetGeoTransform()))

    row_f, row_l = indices[0].start, indices[0].stop
    col_f, col_l = indices[1].start, indices[1].stop

    if data.ndim == 2:
        nrow, ncol = data.shape
        nbands = 1
    elif data.ndim == 3:
        nrow, ncol, nbands = data.shape
    else:
        raise RuntimeError("parameter 'data' must be an array with either two or three dimensions")

    if (row_l - row_f, col_l - col_f) != (nrow, ncol):
        raise IndexError('mismatch of data dimensions and subset indices')

    geo['xmin'] += col_f * geo['xres']
    geo['ymax'] -= row_f * abs(geo['yres'])
    geotransform = [geo[x] for x in geo_keys]

    driver = gdal.GetDriverByName('GTiff' if nbands == 1 else 'ENVI')
    dtype = gdal_array.NumericTypeCodeToGDALTypeCode(data.dtype)
    outDataset = driver.Create(outname, ncol, nrow, nbands, dtype)
    driver = None

    outDataset.SetMetadata(reference.GetMetadata())
    outDataset.SetGeoTransform(geotransform)
    outDataset.SetProjection(reference.GetProjection())

    if nbands == 1:
        outband = outDataset.GetRasterBand(1)
        outband.SetNoDataValue(nodata)
        outband.WriteArray(data)
        outband.FlushCache()
    else:
        for i in range(nbands):
            outband = outDataset.GetRasterBand(i + 1)
            outband.SetNoDataValue(nodata)
            outband.WriteArray(data[:, :, i])
            outband.FlushCache()

    outband = None
    outDataset = None
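
# A usage sketch for geowrite (hypothetical file names; 'reference.tif' stands for any existing
# geocoded raster, e.g. one of the lookup table files):
#
# >>> import numpy as np
# >>> from osgeo import gdal
# >>> ref = gdal.Open('reference.tif')
# >>> indices = (slice(100, 200), slice(50, 150))          # location of the subset within the reference
# >>> data = np.random.rand(100, 100).astype(np.float32)   # 2D array -> single-band GeoTiff
# >>> geowrite(data, 'subset_geo.tif', ref, indices)
# >>> ref = None

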
def geocode(data, lut_rg_name, lut_az_name, outname=None,
            range_min=0, range_max=None, azimuth_min=0, azimuth_max=None):
    """
    Geocode a radar image using lookup tables.
    The LUTs are expected to be georeferenced and contain range and azimuth radar coordinates
    for a specific image data set which is linked to these LUTs.
    If parameter `data` is a subset of this data set, the pixel coordinates of this subset
    need to be defined.

    Parameters
    ----------
    data: numpy.ndarray
        the image data in radar coordinates
    lut_rg_name: str
        the name of the range coordinates lookup table file
    lut_az_name: str
        the name of the azimuth coordinates lookup table file
    outname: str or None
        the name of the file to write; if None, the geocoded array is returned and no file is written.
        See function :func:`geowrite` for details on how the file is written.
    range_min: int
        the minimum range coordinate
    range_max: int
        the maximum range coordinate
    azimuth_min: int
        the minimum azimuth coordinate
    azimuth_max: int
        the maximum azimuth coordinate

    Returns
    -------
    numpy.ndarray or None
        the geocoded array if `outname` is None, otherwise None

    Example
    -------
    >>> from osgeo import gdal
    >>> from tomography_tutorial.ancillary import geocode
    >>> image_name = 'path/to/somedata/image.tif'
    >>> lut_rg_name = 'path/to/somedata/lut_rg.tif'
    >>> lut_az_name = 'path/to/somedata/lut_az.tif'
    >>> outname = 'path/to/somedata/image_sub_geo.tif'
    >>> image_ras = gdal.Open(image_name)
    >>> image_mat = image_ras.ReadAsArray()
    >>> image_ras = None
    >>> image_mat_sub = image_mat[0:100, 200:400]
    >>> geocode(image_mat_sub, lut_rg_name, lut_az_name, outname,
    ...         range_min=200, range_max=400, azimuth_min=0, azimuth_max=100)
    """
    if data.ndim == 2:
        nazimuth, nrange = data.shape
        nbands = 1
    elif data.ndim == 3:
        nazimuth, nrange, nbands = data.shape
    else:
        raise RuntimeError("parameter 'data' must be an array with either two or three dimensions")

    if not range_max:
        range_max = nrange
    if not azimuth_max:
        azimuth_max = nazimuth

    imgfile = gdal.Open(lut_rg_name)
    lut_rg = imgfile.ReadAsArray()

    imgfile = gdal.Open(lut_az_name)
    lut_az = imgfile.ReadAsArray()

    # compute indices for subsetting the LUTs to only contain the subset of selected radar coordinates
    indices = lut_crop(lut_rg, lut_az, range_min, range_max, azimuth_min, azimuth_max)

    # crop the LUTs to the selected subset
    lut_rg_sub = lut_rg[indices]
    lut_az_sub = lut_az[indices]

    # recompute the LUT radar coordinates
    lut_rg_sub = lut_rg_sub - range_min
    lut_az_sub = lut_az_sub - azimuth_min

    ####################################################################################################
    # actual geocoding

    # create a mask containing only radar coordinates of the selected subset
    mask = ((lut_rg_sub < 0) | (lut_rg_sub > nrange)) | ((lut_az_sub < 0) | (lut_az_sub > nazimuth))

    # combine the two LUTs by computing indices for a 1D row-major flattened array
    lut_combi = lut_az_sub.astype(int) * nrange + lut_rg_sub.astype(int)

    # set all indices out of bounds to zero
    lut_combi[(lut_combi < 0) | ((nrange * nazimuth) < lut_combi)] = 0

    if nbands == 1:
        # create the geocoded array by flattening the radar image to 1D and indexing it by the
        # combined LUT; the resulting array has the same dimensions as the LUT, which is still 2D
        mat_geo = data.flatten()[lut_combi]

        # mask out all areas outside the selected subset
        mat_geo[mask] = np.nan
    else:
        mat_geo = np.empty(lut_combi.shape + (nbands,), data.dtype)
        for i in range(nbands):
            sub = data[:, :, i].flatten()[lut_combi]
            sub[mask] = np.nan
            mat_geo[:, :, i] = sub

    ####################################################################################################
    # write the result to disk
    if outname:
        geowrite(mat_geo, outname, imgfile, indices)
    imgfile = None

    if not outname:
        return mat_geo