Source code for vcstools.analyse_psf

"""
Scripts to analyise the output PSFs
"""
import numpy as np
from itertools import chain
from scipy.interpolate import UnivariateSpline
import matplotlib.pyplot as plt
import matplotlib.colors as colors

from astropy.time import Time
from astropy.constants import c
from astropy.io import fits

from vcstools.pointing_utils import getTargetRADec, deg2sex

import logging
logger = logging.getLogger(__name__)


[docs]def read_vcsbeam_psf(psf_file): """Read in the PSF data file output from vcsbeam's mwa_tied_array_beam_psf Parameters ---------- psf_file : `str` The PSF data file output from vcsbeam's mwa_tied_array_beam_psf. Returns ------- ra : `numpy.array`, (Ny, Nx) The RA (degrees) in the meshgrid format. dec : `numpy.array`, (Ny, Nx) The declination (degrees) in the meshgrid format. power : `numpy.array`, (Ny, Nx) The the data values of the fits file in the meshgrid format. """ input_array = np.loadtxt(psf_file, dtype=float) ra = input_array[:,0] dec = input_array[:,1] power = input_array[:,2] ra.shape = dec.shape = power.shape = (int(np.sqrt(input_array.shape[0])), int(np.sqrt(input_array.shape[0]))) return ra, dec, power
[docs]def plot_vcsbeam_psf(psf_file, output_name="vcsbeam_psf.png", normalise=False, vmin=None): """Plot the PSF data file output from vcsbeam's mwa_tied_array_beam_psf Parameters ---------- psf_file : `str` The PSF data file output from vcsbeam's mwa_tied_array_beam_psf. output_name : `str`, optional Output plot name. |br| Default: vcsbeam_psf.png. normalise : `boolean`, optional Normalise the PSF. |br| Default: `False`. vmin : `float`, optional Minimum value of plot. |br| Default: `None`. """ ra, dec, power = read_vcsbeam_psf(psf_file) if normalise: power = power / np.amax(power) ax = plt.subplot(1, 1, 1,) im = ax.pcolormesh(ra, dec, power, norm=colors.LogNorm(), cmap='plasma', vmin=vmin) plt.xlabel(r"Right Ascension (hours)") plt.ylabel(r"Declination ($^{\circ}$)") plt.colorbar(im,#spacing='uniform', shrink = 0.65, #ticks=[2., 10., 20., 30., 40., 50.], label="Normalised array factor power") plt.savefig(output_name)
[docs]def plot_track_beam_response(response_file, output_name="vcsbeam_response.png", time_max=-1): """Plot the data file output from vcsbeam's mwa_track_primary_beam_response Parameters ---------- response_file : `str` The PSF data file output from vcsbeam's mwa_track_primary_beam_response. output_name : `str`, optional Output plot name. |br| Default: vcsbeam_psf.png. time_max : `int`, optional Maximum number of seconds to process. |br| Default: -1. """ input_array = np.loadtxt(response_file, dtype=float) sec = input_array[:,0] freq = input_array[:,1] stokes_I = input_array[:,5] array_factor = input_array[:,9] response = stokes_I * array_factor**2 # Split into freq chunks nsec = max(sec) nfreq = input_array.shape[0] // nsec sec_map = np.array(np.array_split(sec, nfreq))[:,:time_max] freq_map = np.array(np.array_split(freq, nfreq))[:,:time_max] power_map = np.array(np.array_split(response, nfreq))[:,:time_max] # Sum over frequency power_time = np.average(power_map, axis=0) sec_time = sec_map[0] # Sum over time power_freq = np.average(power_map, axis=1) freq_freq = freq_map[:,0] size = 3 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(2*size,size)) # Plot rack over time ax1.plot(sec_time, power_time) ax1.set_xlabel("Time (s)") ax1.set_ylabel("Array Factor") # Plot rack over time ax2.plot(freq_freq, power_freq) ax2.set_xlabel("Frequency (MHz)") ax2.set_ylabel("Array Factor") plt.tight_layout() plt.savefig(output_name)
[docs]def plot_pabeam(dat_file, output_name="pabeam_psf.png"): """Makes a polar plot over the sky response with the data file output from pabeam.py Parameters ---------- dat_file : `str` The data file output from pabeam.py. output_name : `str`, optional Output plot name. |br| Default: pabeam_psf.png. """ with open(dat_file) as file: lines = file.readlines() lines = [line.rstrip() for line in lines] nza = int(lines[7].split(" ")[-1]) naz = int(lines[8].split(" ")[-1]) input_array = np.loadtxt(dat_file, dtype=float) za = input_array[:,0] az = input_array[:,1] power = input_array[:,2] za.shape = az.shape = power.shape = (naz, nza) ax = plt.subplot(1, 1, 1, projection='polar') im = plt.pcolormesh(np.radians(az), za, power) plt.xlabel(r"Azimuth ($^{\circ}$)") plt.ylabel(r"Zenith ($^{\circ}$)") plt.colorbar(im, label="Normalised array factor power") plt.savefig(output_name, dpi=500)
[docs]def read_pabeam_ra_dec(dat_file): """Read in the PSF data file output from pabeam.py when using the --ra_dec_projection option. Parameters ---------- dat_file : `str` The PSF data file output from pabeam.py Returns ------- ra : `numpy.array`, (Nx, Ny) The RA (degrees) in the meshgrid format dec : `numpy.array`, (Nx, Ny) The declination (degrees) in the meshgrid format power : `numpy.array`, (Nx, Ny) The the data values of the fits file in the meshgrid format """ with open(dat_file) as file: lines = file.readlines() lines = [line.rstrip() for line in lines] date = lines[3].split("'")[-2] time = Time(date, format='iso', scale='utc') nra = int(lines[7].split(" ")[-1]) ndec = int(lines[8].split(" ")[-1]) input_array = np.loadtxt(dat_file, dtype=float) za = input_array[:,0] az = input_array[:,1] power = input_array[:,2] # Convert to RA and Dec _, _, ra, dec = getTargetRADec(az, za, time) ra.shape = dec.shape = power.shape = (nra, ndec) return ra, dec, power
[docs]def plot_pabeam_ra_dec(dat_file, output_name="pabeam_psf.png", normalise=False, vmin=None): """Plots the PSF data file output pabeam.py when using the --ra_dec_projection option. Parameters ---------- dat_file : `str` The PSF data file output pabeam.py when using the --ra_dec_projection option output_name : `str`, optional Output plot name. |br| Default: pabeam_psf.png. normalise : `boolean`, optional Normalise the PSF. |br| Default: `False`. vmin : `float`, optional Minimum value of plot. |br| Default: `None`. """ ra, dec, power = read_pabeam_ra_dec(dat_file) ax = plt.subplot(1, 1, 1,) if normalise: power = power / np.amax(power) if fft_abs: # Take the fft of the PSF and plot the amplitude fits_fft = np.fft.fftshift(np.fft.fft2(power)) im = plt.imshow(np.abs(fits_fft), cmap='plasma', norm=colors.LogNorm()) elif fft_angle: # Take the fft of the PSF and plot the phase fits_fft = np.fft.fftshift(np.fft.fft2(power)) im = plt.imshow(np.angle(fits_fft), cmap='htp', vmin=-np.pi, vmax=np.pi) else: # Create plot im = plt.pcolormesh(ra, dec, power, cmap='plasma', vmin=vmin, norm=colors.LogNorm(), vmax=0.3) plt.xlabel(r"Right Acension ($^{\circ}$)") plt.ylabel(r"Declination ($^{\circ}$)") plt.colorbar(im) plt.savefig(output_name, dpi=500)
[docs]def read_psf_fits(fits_file, centre_size=None): """Read in an imaging PSF fits file (from WSCLEAN for example). Parameters ---------- fits_file : `str` The PSF fits file gained from imaging (WSCLEAN). centre_size : `int`, optional The radius in pixels to plot from the centre. |br| Default: `None`. Returns ------- rav : `numpy.array`, (Nx, Ny) The RA (degrees) in the meshgrid format. decv : `numpy.array`, (Nx, Ny) The declination (degrees) in the meshgrid format. fits_data : `numpy.array`, (Nx, Ny) The the data values of the fits file in the meshgrid format. """ hdul = fits.open(fits_file) # RA header info ra_pix_res = hdul[0].header['CDELT1'] ra_ref_pixel = int(hdul[0].header['CRPIX1']) ra_ref_pos = hdul[0].header['CRVAL1'] if ra_ref_pos < 0.: ra_ref_pos = 360. + ra_ref_pos # Dec header info dec_pix_res = hdul[0].header['CDELT2'] dec_ref_pixel = int(hdul[0].header['CRPIX2']) dec_ref_pos = hdul[0].header['CRVAL2'] # Other header info centre_freq = hdul[0].header['CRVAL3'] #Hz if len(hdul[0].data.shape) == 4: _, _, ra_npixels, dec_npixels = hdul[0].data.shape fits_data = hdul[0].data[0][0] else: ra_npixels, dec_npixels = hdul[0].data.shape fits_data = hdul[0].data raj, decj = deg2sex(float(ra_ref_pos), float(dec_ref_pos)) pointing = "{}_{}".format(raj,decj) # Calculate RA Dec range ra_start = ra_ref_pos + ra_ref_pixel * ra_pix_res ra_stop = ra_start - ra_npixels * ra_pix_res ra_range = np.linspace(ra_stop, ra_start, ra_npixels) dec_start = dec_ref_pos - dec_ref_pixel * dec_pix_res dec_stop = dec_start + dec_npixels * dec_pix_res dec_range = np.linspace(dec_start, dec_stop, dec_npixels) # Plot data rav, decv = np.meshgrid(ra_range, dec_range) if centre_size: # Grab the centre pixels rav = rav[ra_ref_pixel-centre_size:ra_ref_pixel+centre_size, dec_ref_pixel-centre_size:dec_ref_pixel+centre_size] decv = decv[ra_ref_pixel-centre_size:ra_ref_pixel+centre_size, dec_ref_pixel-centre_size:dec_ref_pixel+centre_size] fits_data = fits_data[ra_ref_pixel-centre_size:ra_ref_pixel+centre_size, dec_ref_pixel-centre_size:dec_ref_pixel+centre_size] return rav, decv, fits_data
[docs]def plot_imaging_psf(fits_file, output_name="imaging_psf.png", normalise=False, vmin=None, centre_size=None, fft_abs=False, fft_angle=False, centre=False): """Plots the imaging PSF (from WSCLEAN for example). Parameters ---------- fits_file : `str` The PSF fits file gained from imaging (WSCLEAN). output_name : `str`, optional Output plot name. |br| Default: imaging_psf.png. normalise : `boolean`, optional Normalise the PSF. |br| Default: `False`. vmin : `float`, optional Minimum value of plot. |br| Default: `None`. centre_size : `int`, optional The radius in pixels to plot from the centre. |br| Default: `None`. fft_abs : `boolean`, optional Take the fft of the PSF and plot the amplitude. |br| Default: `False`. fft_angle : `boolean`, optional Take the fft of the PSF and plot the phase. |br| Default: `False`. """ rav, decv, fits_data = read_psf_fits(fits_file, centre_size=centre_size) fig, ax = plt.subplots() if normalise: fits_data = fits_data / np.amax(fits_data) if centre: xcentre = rav.shape[0] // 2 ycentre = rav.shape[1] // 2 rav = (rav - rav[xcentre][ycentre]) * 3600 decv = (decv - decv[xcentre][ycentre]) * 3600 circle1 = plt.Circle((0, 0), 30, color='r', fill=False) ax.add_patch(circle1) if fft_abs: # Take the fft of the PSF and plot the amplitude fits_fft = np.fft.fftshift(np.fft.fft2(fits_data)) im = plt.imshow(np.abs(fits_fft), cmap='plasma', norm=colors.LogNorm()) elif fft_angle: # Take the fft of the PSF and plot the phase fits_fft = np.fft.fftshift(np.fft.fft2(fits_data)) im = plt.imshow(np.angle(fits_fft), cmap='htp', vmin=-np.pi, vmax=np.pi) else: # Create plot im = plt.pcolormesh(rav, decv, fits_data, cmap='plasma', vmin=vmin, norm=colors.LogNorm()) #image_data = fits.getdata(fits_file, ext=0) #plt.imshow(image_data, cmap='plasma', extent=[np.amin(rav), np.amax(rav), # np.amin(decv), np.amax(decv)]) #plt.colorbar()#, label="Normalised array factor power") plt.xlabel(r"Right Acension ($^{\circ}$)") plt.ylabel(r"Declination ($^{\circ}$)") plt.colorbar(im) plt.savefig(output_name, dpi=500)
#plt.clf() # Replot with squared power #im = plt.pcolormesh(rav, decv, fits_data**2, cmap='plasma', # vmin=vmin, norm=colors.LogNorm()) #plt.colorbar(im) #plt.savefig("squared_{}".format(output_name), dpi=500)
[docs]def plot_psf_comparison(imaging_psf, pabeam_psf, c_pabeam_psf): """Compare the PSFs FWHM along RA and declination from imaging, pabeam.py and vcsbeam's mwa_tied_array_beam_psf. Outputs a plot called comparing_psf_cuts.png. Parameters ---------- imaging_psf : `str` The PSF fits file gained from imaging (WSCLEAN). pabeam_psf : `str` The PSF data file output from pabeam.py. c_pabeam_psf : `str` The PSF data file output from vcsbeam's mwa_tied_array_beam_psf. """ # read image psf rai, deci, ipower = read_psf_fits(imaging_psf) rai_middle, deci_middle = rai.shape rai_middle = rai_middle // 2 deci_middle = deci_middle // 2 # read pabeam psf rap, decp, ppower = read_pabeam_ra_dec(pabeam_psf) rap_middle, decp_middle = rap.shape rap_middle = rap_middle // 2 decp_middle = decp_middle // 2 # read Sammy's C pabeam psf rac, decc, cpower = read_vcsbeam_psf(c_pabeam_psf) rac = rac * 15 rac_middle, decc_middle = rac.shape rac_middle = rac_middle // 2 decc_middle = decc_middle // 2 # normalise ppower = ppower / np.amax(ppower) cpower = cpower / np.amax(cpower) ipower = ipower / np.amax(ipower) size = 3 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(2*size,size)) pixel_radius = 30 # remove offsets rap_diff = rap[rap_middle][rap_middle] - rai[rai_middle][rai_middle] rac_diff = rac[rac_middle][rac_middle] - rai[rai_middle][rai_middle] # plot RA cuts ----------------------------------------------------------------- image_ra = list(rai[rai_middle][rai_middle-pixel_radius:rai_middle+pixel_radius]) image_ra.reverse() spline = UnivariateSpline(image_ra, ipower[rai_middle][rai_middle-pixel_radius:rai_middle+pixel_radius] - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam ax1.plot(rai[rai_middle][rai_middle-pixel_radius:rai_middle+pixel_radius], ipower[rai_middle][rai_middle-pixel_radius:rai_middle+pixel_radius], label=r"Image FWHM: {:.2f}$^\prime$".format(fwhm*60)) spline = UnivariateSpline(rap[rap_middle][rap_middle-pixel_radius:rap_middle+pixel_radius], ppower[rap_middle][rap_middle-pixel_radius:rap_middle+pixel_radius] - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam ax1.plot(rap[rap_middle][rap_middle-pixel_radius:rap_middle+pixel_radius] - rap_diff, ppower[rap_middle][rap_middle-pixel_radius:rap_middle+pixel_radius], label=r"Phased Array FWHM: {:.2f}$^\prime$".format(fwhm*60)) psfc_slice = [] rac_slice = [] for i in range(rac_middle-pixel_radius, rac_middle+pixel_radius): psfc_slice.append(cpower[i][decc_middle]) rac_slice.append(rac[i][rac_middle]) psfc_slice = np.array(psfc_slice) rac_slice = np.array(rac_slice) #print(rac_slice) spline = UnivariateSpline(rac_slice, psfc_slice - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam ax1.plot(rac_slice - rac_diff, psfc_slice, label=r"C Phased Array FWHM: {:.2f}$^\prime$".format(fwhm*60)) ax1.set_xlabel(r"Right Acension ($^{\circ}$)") ax1.legend(loc="upper right", fontsize=6) # plot dec cuts ----------------------------------------------------------------- psfi_slice = [] deci_slice = [] for i in range(deci_middle-pixel_radius, deci_middle+pixel_radius): psfi_slice.append(ipower[i][deci_middle]) deci_slice.append(deci[i][deci_middle]) psfi_slice = np.array(psfi_slice) deci_slice = np.array(deci_slice) spline = UnivariateSpline(deci_slice, psfi_slice - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam deci_diff = deci_slice[pixel_radius] - decc[decc_middle][decc_middle] ax2.plot(deci_slice - deci_diff, psfi_slice, label=r"Image FWHM: {:.2f}$^\prime$".format(fwhm*60)) psfp_slice = [] decp_slice = [] for i in range(decp_middle-pixel_radius, decp_middle+pixel_radius): psfp_slice.append(ppower[i][decp_middle]) decp_slice.append(decp[i][decp_middle]) psfp_slice = np.array(psfp_slice) decp_slice = np.array(decp_slice) spline = UnivariateSpline(decp_slice, psfp_slice - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam ax2.plot(decp_slice, psfp_slice, label=r"Phased Array FWHM: {:.2f}$^\prime$".format(fwhm*60)) spline = UnivariateSpline(decc[decc_middle][decc_middle-pixel_radius:decc_middle+pixel_radius], cpower[decc_middle][decc_middle-pixel_radius:decc_middle+pixel_radius] - 0.5, s=0) enter_beam, exit_beam = spline.roots() fwhm = exit_beam - enter_beam ax2.plot(decc[decc_middle][decc_middle-pixel_radius:decc_middle+pixel_radius], cpower[decc_middle][decc_middle-pixel_radius:decc_middle+pixel_radius], label=r"C Phased Array FWHM: {:.2f}$^\prime$".format(fwhm*60)) ax2.legend(loc="upper right", fontsize=6) ax2.set_xlabel(r"Declination ($^{\circ}$)") plt.savefig("comparing_psf_cuts.png", dpi=500, bbox_inches="tight")