Source code for vcstools.beam_sim

"""
The functions required to simulate the tied-array beam response of the MWA. 
All equations can be found in https://ui.adsabs.harvard.edu/abs/2018IAUS..337..378M/abstract
"""
import numpy as np
import sys
import os
from itertools import chain
import datetime

from astropy.constants import c
from astropy.io import fits

#from mwapy import ephem_utils,metadata
from mwa_pb.primarybeammap_tant import get_Haslam, map_sky
from vcstools.config import load_config_file
from vcstools.metadb_utils import get_common_obs_metadata, get_obs_array_phase, calc_ta_fwhm, ensure_metafits
from vcstools.job_submit import submit_slurm
from vcstools.general_utils import mdir

import logging
logger = logging.getLogger(__name__)

[docs]def getTileLocations(metafits, flags=[]): """Function grab the MWA tile locations for a given observation from the metafits file. Parameters ---------- metafits : `str` The metafits file location. flags : `list`, optional RTS tile flags (i.e. the first entry in the metafits correspond to "tile 0", irrespective of what the antenna name is). |br| Default: []. Returns ------- list : `list` A list of lists containing the following: list[0] = a list of tile positions East of the array centre list[1] = a list of tile positions North of the array centre list[2] = a list of tile heights about sea-level list[3] = a list of tile cable delays """ f = fits.open(metafits) east = f[1].data['East'][::2] north = f[1].data['North'][::2] height = f[1].data['Height'][::2] # height above sea-level cable_delays_raw = f[1].data['Length'] # Format Cable delays to floats cable_delays = [] for cab in cable_delays_raw: cable_delays.append(float(cab[3:])) cable_delays = np.array(cable_delays) # MWA array centre height above sea-level mwacentre_h = 377.827 height = height - mwacentre_h # flag the tiles from the x,y,z positions east = np.delete(east,flags) north = np.delete(north,flags) height = np.delete(height,flags) cable_delays = np.delete(cable_delays,flags) return east, north, height, cable_delays
[docs]def get_obstime_duration(metafits): """Funciton to grab the recorded start-time and duration of the observation Parameters ---------- metafits : `str` The metafits file location. Returns ------- obs_begin : `str` Observation starting time in UTC. obs_duration : `int` Observation duration in seconds. """ # metafits file will already have been downloaded f = fits.open(metafits) return [f[0].header['DATE-OBS'], f[0].header['EXPOSURE']]
[docs]def calc_pixel_area(za, az_res, za_res): """Calculate the area of a pixel on the sky from their height, width and zenith angle Parameters ---------- za : `float` The zenith angle of the pixel in radians. az_res : `float` The azuimuth resolution of the pixel in degrees. za_res : `float` The zenith resolution of the pixel in degrees. Returns ------- area : `float` Area of the pixel in square radians. """ return np.radians(az_res) * np.radians(za_res) * np.sin(za)
[docs]def calcWaveNumbers(freq, p, t): """Function to calculate the 3D wavenumbers for a given wavelength and az/za grid. This is the part of equation 7 within the square brackets not includeing x_n, y_n and z_n Parameters ---------- freq : `float` Central frequency for the observation in Hz. p : `float` azimuth/phi (either a scalar or an array) this is assuming that theta,phi are in the convention from Sutinjo et al. 2015. t : `float` zenith angle/theta (either a scalar or an array). Returns ------- kx, ky, kz : `list` The 3D wavenumbers. """ C = 2 * np.pi * freq / c.value kx = C * np.multiply(np.sin(t), np.cos(p)) ky = C * np.multiply(np.sin(t), np.sin(p)) kz = C * np.cos(t) return [kx,ky,kz]
[docs]def calcSkyPhase(xpos, ypos, zpos, kx, ky, kz, coplanar=False): """Completes the calculation of equation 7 to get the phase of the tiles for each position on the sky Parameters ---------- xpos : `list` (N) A list of tile positions East of the array centre ypos : `list` (N) A list of tile positions North of the array centre zpos : `list` (N) A list of tile heights about sea-level kx : `list` (az/za) The x 3D wavenumbers for a given wavelength and az/za grid ky : `list` (az/za) The y 3D wavenumbers for a given wavelength and az/za grid kz : `list` (az/za) The z 3D wavenumbers for a given wavelength and az/za grid Returns ------- ph_tile : `list` (N, az/za) A list of the phases for each tile """ #ph_tile = [] #for x, y, z in zip(xpos, ypos, zpos): #ph = kx * x + ky * y + kz * z logger.debug("np.multiply(np.tile(kx, (len(xpos), 1))).shape : {}".format(np.tile(kx, (len(xpos), 1)).shape)) if coplanar: ph_tile = list(chain(np.add(np.multiply(kx, x), np.multiply(ky, y)) for x, y in zip(xpos, ypos))) else: ph_tile = list(chain(np.add(np.add(np.multiply(kx, x), np.multiply(ky, y)), np.multiply(kz, z)) for x, y, z in zip(xpos, ypos, zpos))) logger.debug("ph_tile.shape[0] : {}".format(ph_tile[0].shape)) return ph_tile
[docs]def calc_geometric_delay_distance(p, t): """ Equation 1 of Ord 2019. Changed cos(el) to sin(za) """ gx = np.multiply(np.sin(t), np.sin(p)) gy = np.multiply(np.sin(t), np.cos(p)) #gx = np.multiply(np.sin(t), np.cos(p)) #gy = np.multiply(np.sin(t), np.sin(p)) gz = np.cos(t) return gx, gy, gz
[docs]def cal_phase_ord(xpos, ypos, zpos, delays, gx, gy, gz, freq, coplanar=False, no_delays=False): """ Equation 2 and 3 of Ord 2019. Parameters ---------- xpos[N]: A list of tile positions East of the array centre ypos[N]: A list of tile positions North of the array centre zpos[N]: A list of tile heights about sea-level gx[az/za]: The x 3D wavenumbers for a given wavelength and az/za grid gy[az/za]: The y 3D wavenumbers for a given wavelength and az/za grid gz[az/za]: The z 3D wavenumbers for a given wavelength and az/za grid Returns ------- ph_tile[N][az/za]: A list of the phases for each tile """ if no_delays: # Set cable delays to zeros so that they're not included delays = np.zeros_like(delays) #ph_tile = [] #for x, y, z in zip(xpos, ypos, zpos): #ph = kx * x + ky * y + kz * z # Equation 1 and 2 (TODO there is a minus in the ord paper so check if this is required) # /delta t * c = gx * x + gy * y + gz * z + L if coplanar: wl = list(chain(np.subtract(np.add(np.multiply(gx, x), np.multiply(gy, y)), l) for x, y, l in zip(xpos, ypos, delays))) else: wl = list(chain(np.subtract(np.add(np.add(np.multiply(gx, x), np.multiply(gy, y)), np.multiply(gz, z)), l) for x, y, z, l in zip(xpos, ypos, zpos, delays))) # Equation 3 ph_tile = 2 * np.pi * np.array(wl) * freq / c.value logger.debug("ph_tile.shape[0] : {}".format(ph_tile[0].shape)) return ph_tile
[docs]def calcArrayFactor(ph_tiles, ph_targets): """ Calculates array factor pointed at some target zenith angle (za) and azimuth (az) (equation 11) Parameters ---------- ph_tiles[N][az/za]: List of the sky phases for the tiles ph_targets[N]: List of the sky phases for the target Returns ------- array_factor[az/za]: The array factor for each za and az array_factor_power[az/za]: The array factor power for each za and az """ #array_factor = np.zeros(za.shape, dtype=np.complex_) #for i, _ in enumerate(xpos): #array_factor += np.cos(ph - ph_target) + 1.j * np.sin(ph - ph_target) #array_factor_tiles = list(chain(np.cos(ph_tile - ph_target) + 1.j * np.sin(ph_tile - ph_target) for ph_tile, ph_target in zip(ph_tiles, ph_targets))) array_factor_tiles = list(chain( np.multiply( np.cos(ph_tile) + 1.j * np.sin(ph_tile), np.cos(ph_target) - 1.j * np.sin(ph_target) ) for ph_tile, ph_target in zip(ph_tiles, ph_targets))) array_factor = np.sum(array_factor_tiles, axis=0) # normalise to unity at pointing position array_factor = np.divide(array_factor, len(ph_tiles)) array_factor_power = np.abs(array_factor)**2 return array_factor, array_factor_power
def partial_convolve_sky_map(az_grid, za_grid, pixel_area, phased_array_pattern, freq, time): # Get Haslam and interpolate onto grid # taken from https://github.com/MWATelescope/mwa_pb/blob/master/mwa_pb/primarybeammap_tant.py # then edited to include pixel size logger.debug("freq: {}".format(freq)) my_map = get_Haslam(freq) #mask = numpy.isnan(za_grid) #za_grid[numpy.isnan(za_grid)] = 90.0 # Replace nans as they break the interpolation #Supress print statements of the primary beam model functions sys.stdout = open(os.devnull, 'w') sky_grid = map_sky(my_map['skymap'], my_map['RA'], my_map['dec'], time, az_grid, za_grid) logger.debug("np.max(sky_grid): {}".format(np.max(sky_grid.flatten()))) logger.debug("sky_grid.shape: {}".format(sky_grid.flatten().shape)) logger.debug("phased_array_pattern.shape: {}".format(phased_array_pattern.shape)) sys.stdout = sys.__stdout__ logger.debug("np.average(sky_grid.flatten()): {}".format(np.average(sky_grid.flatten()))) sum_B_T = np.sum(phased_array_pattern * sky_grid.flatten() * pixel_area) sum_B = np.sum(phased_array_pattern * pixel_area) return sum_B_T, sum_B
[docs]def read_sefd_file(sefd_file, all_data=False): """ Read in the output sefd file from a pabeam.py simulation. Parameters ---------- sefd_file : `str` The location of the sefd file to be read in. all_data : `boolean` If True will return the freq, sefd, t_sys, t_ant, gain, effective_area. If False will only return the sefd. Default False """ input_array = np.loadtxt(sefd_file, dtype=float) if all_data: freq = input_array[:,0] time = input_array[:,1] sefd = input_array[:,2] t_sys = input_array[:,3] t_ant = input_array[:,4] gain = input_array[:,5] effective_area = input_array[:,6] beam_solid_angle = input_array[:,7] return freq, time, sefd, t_sys, t_ant, gain, effective_area, beam_solid_angle else: time = input_array[:,1] sefd_freq_time = np.array(input_array[:,2]) # Work out the number of time steps ntime = list(time).count(list(time)[0]) # Reshape into [freq][time] array sefd_freq_time = sefd_freq_time.reshape((sefd_freq_time.shape[0]//ntime, ntime)) sefd_time = np.average(sefd_freq_time, axis=0) # Calc mean and std sefd_mean = np.mean(sefd_time) u_sefd_mean = np.std(sefd_time) / np.sqrt(len(sefd_time)) return sefd_freq_time, sefd_mean, u_sefd_mean
[docs]def launch_pabeam_sim(obsid, pointing, begin, duration, source_name="noname", metafits_file=None, flagged_tiles=None, delays=None, efficiency=1, vcstools_version='master', args=None, common_metadata=None, output_dir=None): """Submit a job to run the pabeam code to estimate the system equivelent flux density and a dependent job to resume the submit_to_databse.py code if `args` is given. Parameters ---------- obsid : `int` The MWA observation ID. pointing : `str` The pointing of the simulation in the format HH:MM:SS.SS_DD:MM:SS.SS. begin : `int` The begining of the simulation in GPS time. duration : `int` The duration of the simulation in seconds (used to calculate the end of the simulation). source_name : `str`, optional The name of the source to be used to label output files. |br| Default: "noname". metafits_file : `str`, optional The location of the metafits file. If none given will assume the default location. flagged_tiles : `list`, optional A list of the flagged tiles. If none given will assume no tiles were flagged. efficiency : `float`, optional Frequency and pointing dependent array efficiency. |br| Default: 1. vcstools_version : `str`, optional VCSTools version to load in the job. args : `dict`, optional The argument parse dictionary from submit_to_database.py. If supplied will launch a dependedn job with submit_to_databse.py to complete the script. common_metadata : `list`, optional The list of common metadata generated from :py:meth:`vcstools.metadb_utils.get_common_obs_metadata`. output_dir : `str` The output directory of the simulation results. By default will put it in the VCS directory under <obsid>/sefd_simulations. Examples -------- A simple example: >>>launch_pabeam_sim(1206977296, "12:49:12_+27:12:00", 1206977300, 600, source_name="SEFD_test", output_dir=".") """ # Load computer dependant config file comp_config = load_config_file() # Ensure metafits file is there data_dir = "{}{}".format(comp_config['base_data_dir'], obsid) ensure_metafits(data_dir, obsid, "{0}_metafits_ppds.fits".format(obsid)) # Perform metadata calls if common_metadata is None: common_metadata = get_common_obs_metadata(obsid) # Get frequencies centre_freq = common_metadata[5] * 10e5 low_freq = common_metadata[6][0] * 1.28 * 10e5 high_freq = common_metadata[6][-1] * 1.28 * 10e5 sim_freqs = [str(low_freq), str(centre_freq), str(high_freq)] # Calculate required pixel res and cores/mem array_phase = get_obs_array_phase(obsid) fwhm = calc_ta_fwhm(high_freq / 10e5, array_phase=array_phase) #degrees phi_res = theta_res = fwhm / 3 if phi_res < 0.015: # Going any smaller causes memory errors phi_res = theta_res = 0.015 npixels = 360. // phi_res + 90. // theta_res cores_required = npixels * len(sim_freqs) // 600 nodes_required = cores_required // 24 + 1 # Make directories batch_dir = "{}/batch".format(data_dir) if output_dir is None: sefd_dir = "{}/sefd_simulations".format(data_dir) else: sefd_dir = output_dir if not os.path.exists(batch_dir): mdir(batch_dir, "Batch", gid=comp_config['gid']) if not os.path.exists(sefd_dir): mdir(sefd_dir, "SEFD", gid=comp_config['gid']) # Parse defaults if metafits_file is None: metafits_file = "{0}{1}/{1}_metafits_ppds.fits".format(comp_config['base_data_dir'], obsid) # Get delays if none given if delays is None: delays = get_common_obs_metadata(obsid)[4][0] print(delays) print(' '.join(np.array(delays, dtype=str))) # Set up pabeam command command = 'srun --export=all -u -n {} pabeam.py'.format(int(nodes_required*24)) command += ' -o {}'.format(obsid) command += ' -b {}'.format(begin) command += ' -d {}'.format(int(duration)) command += ' -s {}'.format(int(duration//4-1)) # force 4 time steps to get reasonable std command += ' -e {}'.format(efficiency) command += ' --metafits {}'.format(metafits_file) command += ' -p {}'.format(pointing) command += ' --grid_res {:.3f} {:.3f}'.format(theta_res, phi_res) command += ' --delays {}'.format(' '.join(np.array(delays, dtype=str))) command += ' --out_dir {}'.format(sefd_dir) command += ' --out_name {}'.format(source_name) command += ' --freq {}'.format(" ".join(sim_freqs)) if flagged_tiles is not None: logger.debug("flagged_tiles: {}".format(flagged_tiles)) command += ' --flagged_tiles {}'.format(' '.join(flagged_tiles)) # Set up and launch job batch_file_name = 'pabeam_{}_{}_{}'.format(obsid, source_name, pointing) job_id = submit_slurm(batch_file_name, [command], batch_dir=batch_dir, slurm_kwargs={"time": datetime.timedelta(seconds=10*60*60), "nodes":int(nodes_required)}, module_list=['hyperbeam-python'], queue='cpuq', cpu_threads=24, mem=12288, vcstools_version=vcstools_version) if args: # Set up dependant submit_to_database.py job submit_args = vars(args) # Add sefd_file argument submit_args['sefd_file'] = "{}/{}*stats".format(sefd_dir, source_name) command_str = "submit_to_database.py" for key, val in submit_args.items(): if val: if val == True: command_str += " --{}".format(key) else: command_str += " --{} {}".format(key, val) batch_file_name = 'submit_to_database_{}_{}_{}'.format(obsid, source_name, pointing) job_id_dependant = submit_slurm(batch_file_name, [command_str], batch_dir=batch_dir, slurm_kwargs={"time": datetime.timedelta(seconds=1*60*60)}, queue='cpuq', vcstools_version=vcstools_version, depend=[job_id]) return job_id, job_id_dependant