#! /usr/bin/env python
"""
Module to estimate the spectral correlation between channels of an IFS datacube.
"""
__author__ = 'V. Christiaens'
__all__ = ['spectral_correlation',
'combine_spec_corrs']
from astropy.stats import gaussian_fwhm_to_sigma
import numpy as np
from scipy.optimize import curve_fit
[docs]def spectral_correlation(array, awidth=2, r_in=1, r_out=None, pl_xy=None,
mask_r=4, fwhm=4, sp_fwhm_guess=3, full_output=False):
""" Computes the spectral correlation between (post-processed) IFS frames,
as a function of radius, implemented as Eq. 7 of [GRE16]_. This is a crucial
step for an unbiased fit of a measured IFS spectrum to either synthetic or
template spectra.
Parameters
----------
array : numpy ndarray
Input cube or 3d array, of dimensions n_ch x n_y x n_x; where n_y and
n_x should be odd values (star should be centered on central pixel).
awidth : int, optional
Width in pixels of the concentric annuli used to compute the spectral
correlation as a function of radial separation. Greco & Brandt 2017
noted no significant differences for annuli between 1 and 3 pixels
width on GPI data.
r_in: int, optional
Innermost radius where the spectral correlation starts to be computed.
r_out: int, optional
Outermost radius where the spectral correlation is computed. If left as
None, it will automatically be computed up to the edge of the frame.
pl_xy: tuple of tuples of 2 floats, optional
x,y coordinates of all companions present in the images. If provided,
a circle centered on the location of each companion will be masked out
for the spectral correlation computation.
mask_r: float, optional
if pl_xy is provided, this should also be provided. Size of the
aperture around each companion (in terms of fwhm) that is discarded to
not bias the spectral correlation computation.
fwhm: float, optional
if pl_xy is provided, this should also be provided. By default we
consider a 2FWHM aperture mask around each companion to not bias the
spectral correlation computation.
sp_fwhm_guess: float, optional
Initial guess on the spectral FWHM of all channels.
full_output: bool, opt
Whether to also output the fitted spectral FWHM for each channel, and
the vector of radial separation at which each spectral correlation
matrix is calculated.
Returns
-------
sp_corr : numpy ndarray
3d array of spectral correlation, as a function of radius with
dimensions: n_rad x n_ch x n_ch, where n_rad = int((r_out-r_in)/2)
sp_fwhm: numpy ndarray
(if full_output is True) 2d array containing the spectral fwhm at each
radius, for each spectral channel. Dims: n_rad x n_ch
sp_rad: numpy ndarray
(if full_output is True) 1d array containing the radial separation of
each measured spectral correlation matrix. Dims: n_rad
Note
----
Radii that are skipped will be filled with zeros in the output cube.
"""
if not isinstance(awidth,int) or not isinstance(r_in,int):
raise TypeError("Inputs should be integers")
if array.ndim != 3:
raise TypeError("Input array should be 3D.")
n_ch, n_y, n_x = array.shape
n_r = min((n_y-1)/2.,(n_x-1)/2.)
if n_r%1:
raise TypeError("Input array y and x dimensions should be odd")
if r_out is None:
r_out = n_r
test_rads = np.arange(r_in-1,r_out-1)
n_rad = max(1,int(np.floor(test_rads.shape[0]/awidth)))
#n_rad = int(np.ceil(n_r/ann_width)) # effective number of annuli probed
sp_corr = np.zeros([n_rad,n_ch,n_ch])
sp_rad= np.zeros([n_rad])
if full_output:
sp_fwhm = np.zeros([n_rad,n_ch])
def gauss_1fp(x, *p):
sigma = p[0]*gaussian_fwhm_to_sigma
return np.exp(-x**2/(2.*sigma**2))
mask_f = np.zeros_like(array[0])
if pl_xy is not None:
mask = np.ones_like(array[0])
for i in range(len(pl_xy)):
if not isinstance(pl_xy[i], tuple):
raise TypeError("Format of companions coordinates incorrect")
mask_i = get_circle(mask, radius=mask_r*fwhm, cy=pl_xy[i][1],
cx=pl_xy[i][0], mode="mask")
mask_f[np.where(mask_i)] = 1
for ann in range(n_rad):
inner_radius = r_in+ (ann * awidth)
ind = get_annulus_segments(array[0], inner_radius, awidth)
yy = ind[0][0]
xx = ind[0][1]
yy_final = [yy[i] for i in range(len(ind[0][0])) if not mask_f[yy[i],
xx[i]]]
xx_final = [xx[i] for i in range(len(ind[0][0])) if not mask_f[yy[i],
xx[i]]]
matrix = array[:, yy_final, xx_final] # shape (z, npx_annsegm)
sp_rad[ann*awidth:(ann+1)*awidth] = r_in+(ann+0.5)*awidth
for zi in range(n_ch):
for zj in range(n_ch):
num = np.nanmean(matrix[zi]*matrix[zj])
denom = np.sqrt(np.nanmean(matrix[zi]*matrix[zi])* \
np.nanmean(matrix[zj]*matrix[zj]))
sp_corr[ann*awidth:(ann+1)*awidth,zi,zj] = num/denom
if full_output:
p0 = (sp_fwhm_guess,)
x = np.arange(n_ch)-zi
y = sp_corr[ann*awidth,zi]# norm y
y = y-np.amin(y)
y = y/np.amax(y)
coeff, var_matrix = curve_fit(gauss_1fp, x, y, p0=p0)
sp_fwhm[ann*awidth:(ann+1)*awidth,zi] = coeff[0]
# Zero is adopted for uncorrelated channels
sp_corr[np.where(sp_corr<0)] = 0
if full_output:
return sp_corr, sp_fwhm, sp_rad
else:
return sp_corr
[docs]def combine_spec_corrs(arr_list):
""" Combines the spectral correlation matrices of different instruments
into a single square matrix (required for input of spectral fits).
Parameters
----------
arr_list : list or tuple of numpy ndarrays
List/tuple containing the distinct square spectral correlation matrices
OR ones (for independent photometric measurements).
Returns
-------
combi_corr : numpy 2d ndarray
2d square ndarray representing the combined spectral correlation.
"""
n_arr = len(arr_list)
size = 0
for nn in range(n_arr):
if isinstance(arr_list[nn],np.ndarray):
if arr_list[nn].ndim != 2:
raise TypeError("Arrays of the tuple should be 2d")
elif arr_list[nn].shape[0] != arr_list[nn].shape[1]:
raise TypeError("Arrays of the tuple should be square")
size+=arr_list[nn].shape[0]
elif arr_list[nn] == 1:
size+=1
else:
raise TypeError("Tuple can only have square 2d arrays or ones")
combi_corr = np.zeros([size,size])
size_tmp = 0
for nn in range(n_arr):
if isinstance(arr_list[nn],np.ndarray):
mm = arr_list[nn].shape[0]
combi_corr[size_tmp:size_tmp+mm,size_tmp:size_tmp+mm]=arr_list[nn]
size_tmp+=mm
elif arr_list[nn] == 1:
combi_corr[size_tmp,size_tmp]=1
size_tmp+=1
return combi_corr
def get_annulus_segments(data, inner_radius, width, nsegm=1, theta_init=0,
optim_scale_fact=1, mode="ind"):
"""
Return indices or values in segments of a centerered annulus (as in ``VIP``).
The annulus is defined by ``inner_radius <= annulus < inner_radius+width``.
Parameters
----------
data : 2d numpy ndarray or tuple
Input 2d array (image) or tuple with its shape.
inner_radius : float
The inner radius of the donut region.
width : float
The size of the annulus.
nsegm : int
Number of segments of annulus to be extracted.
theta_init : int
Initial azimuth [degrees] of the first segment, counting from the
positive x-axis counterclockwise.
optim_scale_fact : float
To enlarge the width of the segments, which can then be used as
optimization segments (e.g. in LOCI).
mode : {'ind', 'val', 'mask'}, optional
Controls what is returned: indices of selected pixels, values of
selected pixels, or a boolean mask.
Returns
-------
indices : list of ndarrays
[mode='ind'] Coordinates of pixels for each annulus segment.
values : list of ndarrays
[mode='val'] Pixel values.
masked : list of ndarrays
[mode='mask'] Copy of ``data`` with masked out regions.
"""
array = frame_or_shape(data)
if not isinstance(nsegm, int):
raise TypeError('`nsegm` must be an integer')
cy, cx = frame_center(array)
azimuth_coverage = np.deg2rad(int(np.ceil(360 / nsegm)))
twopi = 2 * np.pi
yy, xx = np.mgrid[:array.shape[0], :array.shape[1]]
rad = np.sqrt((xx - cx) ** 2 + (yy - cy) ** 2)
phi = np.arctan2(yy - cy, xx - cx)
phirot = phi % twopi
outer_radius = inner_radius + (width*optim_scale_fact)
masks = []
for i in range(nsegm):
phi_start = np.deg2rad(theta_init) + (i * azimuth_coverage)
phi_end = phi_start + azimuth_coverage
if phi_start < twopi and phi_end > twopi:
masks.append((rad >= inner_radius) & (rad < outer_radius) &
(phirot >= phi_start) & (phirot <= twopi) |
(rad >= inner_radius) & (rad < outer_radius) &
(phirot >= 0) & (phirot < phi_end - twopi))
elif phi_start >= twopi and phi_end > twopi:
masks.append((rad >= inner_radius) & (rad < outer_radius) &
(phirot >= phi_start - twopi) &
(phirot < phi_end - twopi))
else:
masks.append((rad >= inner_radius) & (rad < outer_radius) &
(phirot >= phi_start) & (phirot < phi_end))
if mode == "ind":
return [np.where(mask) for mask in masks]
elif mode == "val":
return [array[mask] for mask in masks]
elif mode == "mask":
return [array*mask for mask in masks]
else:
raise ValueError("mode '{}' unknown!".format(mode))
def get_circle(array, radius, cy=None, cx=None, mode="mask"):
"""
Return a centered circular region from a 2d ndarray (as in ``VIP``).
Parameters
----------
array : numpy ndarray
Input 2d array or image.
radius : int
The radius of the circular region.
cy, cx : int, optional
Coordinates of the circle center. If one of them is ``None``, the
center of ``array`` is used.
mode : {'mask', 'val'}, optional
Controls what is returned: array with circular mask applied, or values
of the pixels in the circular region.
Returns
-------
masked : numpy ndarray
[mode="mask"] Input array with the circular mask applied.
values : numpy ndarray
[mode="val"] 1d array with the values of the pixels in the circular
region.
Notes
-----
An alternative implementation would use ``skimage.draw.disk``. ``disk``
performs better on large ``array``s (e.g. 1000px, 10.000px), while the
current implementation is faster for small ``array``s (e.g. 100px). See
`test_shapes.py` for benchmark details.
"""
if array.ndim != 2:
raise TypeError('Input array is not a frame or 2d array.')
sy, sx = array.shape
if cy is None or cx is None:
cy, cx = frame_center(array, verbose=False)
# ogrid is a multidim mesh creator (faster than mgrid):
yy, xx = np.ogrid[:sy, :sx]
circle = (yy - cy) ** 2 + (xx - cx) ** 2 # eq of circle. sq dist to center
circle_mask = circle < radius ** 2 # boolean mask
if mode == "mask":
return array * circle_mask
elif mode == "val":
return array[circle_mask]
else:
raise ValueError("mode '{}' unknown!".format(mode))
def frame_center(array, verbose=False):
"""
Return the coordinates y,x of the frame(s) center.
If odd: dim/2-0.5
If even: dim/2
Parameters
----------
array : 2d/3d/4d numpy ndarray
Frame or cube.
verbose : bool optional
If True the center coordinates are printed out.
Returns
-------
cy, cx : int
Coordinates of the center.
"""
if array.ndim == 2:
shape = array.shape
elif array.ndim == 3:
shape = array[0].shape
elif array.ndim == 4:
shape = array[0, 0].shape
else:
raise ValueError('`array` is not a 2d, 3d or 4d array')
cy = shape[0] / 2
cx = shape[1] / 2
if shape[0]%2:
cy-=0.5
if shape[1]%2:
cx-=0.5
if verbose:
print('Center px coordinates at x,y = ({}, {})'.format(cx, cy))
return int(cy), int(cx)
def frame_or_shape(data):
"""
Sanitize ``data``, always return a 2d frame.
If ``data`` is a 2d frame, it is returned unchanged. If it is a shaped,
return an empty array of that shape.
Parameters
----------
data : 2d ndarray or shape tuple
Returns
-------
array : 2d ndarray
"""
if isinstance(data, np.ndarray):
array = data
if array.ndim != 2:
raise TypeError('`data` is not a frame or 2d array')
elif isinstance(data, tuple):
array = np.zeros(data)
else:
raise TypeError('`data` must be a tuple (shape) or a 2d array')
return array