"""
Functions used to calculate or simulate the MWA tile beam.
"""
import os
import sys
import numpy as np
from astropy.table import Table
import psrqpy
#MWA scripts
from mwa_pb import primary_beam
from mwa_pb import config
from vcstools import data_load
from vcstools.pointing_utils import sex2deg
from vcstools.metadb_utils import mwa_alt_az_za, get_common_obs_metadata,\
getmeta, obs_max_min
try:
import mwa_hyperbeam
except ImportError:
logger.warning('Could not import mwa_hyperbeam; using pure Python implementation')
import logging
logger = logging.getLogger(__name__)
[docs]def pixel_area(ra_min, ra_max, dec_min, dec_max):
"""Calculate the area of a pixel on the sky from the pixel borders
Parameters
----------
ra_min : `float`
The Right Acension minimum in degrees.
ra_max : `float`
The Right Acension maximum in degrees.
dec_min : `float`
The Declination minimum in degrees.
dec_max : `float`
The Declination maximum in degrees.
Returns
-------
area : `float`
Area of the pixel in square degrees.
"""
return (ra_max - ra_min) * np.degrees(np.sin(np.radians(dec_max)) - np.sin(np.radians(dec_min)))
[docs]def field_of_view(obsid,
common_metadata=None, dur=None):
"""Will find the field-of-view of the observation (including the drift) in square degrees.
Parameters
----------
obsid : `int`
The MWA observation ID.
common_metadata : `list`, optional
The list of common metadata generated from :py:meth:`vcstools.metadb_utils.get_common_obs_metadata`
dur : `int`, optional
Duration of observation to calculate for in seconds.
By default will use the entire observation duration.
Returns
-------
area : `float`
The field-of-view of the observation in square degrees.
"""
if common_metadata is None:
common_metadata = get_common_obs_metadata(obsid)
if dur is None:
dt = 296
else:
dt = 100
# Change the dur to the inpur dur
obsid, ra, dec, _, delays, centrefreq, channels = common_metadata
common_metadata = [obsid, ra, dec, dur, delays, centrefreq, channels]
# Make a pixel for each degree on the sky
names_ra_dec = []
for ra in range(0,360):
for dec in range(-90,90):
names_ra_dec.append(["sky_pos", ra+0.5, dec+0.5])
# Get tile beam power for all pixels
sky_powers = get_beam_power_over_time(names_ra_dec,
common_metadata=common_metadata,
degrees=True, dt=dt)
# Find the maximum power over all time
max_sky_powers = []
for pixel_power in sky_powers:
temp_power = 0.
for time_power in pixel_power:
if time_power[0] > temp_power:
temp_power = time_power[0]
max_sky_powers.append(temp_power)
# Find all pixels greater than the half power point and sum their area
half_power_point = max(max_sky_powers) / 2
i = 0
area_sum = 0
for ra in range(0,360):
for dec in range(-90,90):
if max_sky_powers[i] > half_power_point:
area_sum = area_sum + pixel_area(ra, ra+1, dec, dec+1)
i = i + 1
return area_sum
[docs]def from_power_to_gain(power, cfreq, n, coh=True):
"""Estimate the gain from the tile beam power.
Parameters
----------
power : `float`
The tile beam power at the source position.
cfreq : `float`
The centre frequency of the observation in Hz
n : `int`
The number of non-flagged MWA tiles.
coh : `boolean`, optional
`True` if the observation is coherent (tied-array beam) or `False` if it's incoherent. |br| Default: `True`.
Returns
-------
gain : `float`
Gain in K/Jy.
"""
from astropy.constants import c,k_B
from numpy import sqrt
obswl = c.value/cfreq
#for coherent
if coh:
coeff = obswl**2*16*n/(4*np.pi*k_B.value)
else:
coeff = obswl**2*16*sqrt(n)/(4*np.pi*k_B.value)
logger.debug("Wavelength {} m".format(obswl))
logger.debug("Gain coefficient: {}".format(coeff))
SI_to_Jy = 1e-26
return (power*coeff)*SI_to_Jy
[docs]def get_Trec(obsfreq, trcvr_file=None):
"""Get receiver temperature from the temperature receiver file.
Parameters
----------
obsfreq : `float`
The observing frequency in MHz.
trcvr_file : `str`, optional
The Trec file location to read in. If none is supplied, will use the Trec file in the data directory.
Returns
-------
Trec : `float`
The receiver temperature in K.
"""
if trcvr_file is None:
# Use trcvr file from vcstools data location
trcvr_file = data_load.TRCVR_FILE
tab = Table.read(data_load.TRCVR_FILE, format="csv")
Trec = 0.0
for r in range(len(tab)-1):
if tab[r][0]==obsfreq:
Trec = tab[r][1]
elif tab[r][0] < obsfreq < tab[r+1][0]:
Trec = ((tab[r][1] + tab[r+1][1])/2)
if Trec == 0.0:
logger.debug("ERROR getting Trec")
return Trec
[docs]def beam_enter_exit(powers, duration, dt=296, min_z_power=0.3):
"""Calculates when the source enters and exits the beam
Parameters
----------
powers : `list`, (ntimes, nfreqs)
Powers for the duration every dt and freq.
duration : `int`
Duration of the observation according to the metadata in seconds.
dt : `int`, optional
The time interval of how often powers are calculated. Default: 296.
min_z_power : `float`, optional
Zenith normalised power cut off. |br| Default: 0.3.
Returns
-------
dect_beg_norm, dect_end_norm : `float`
Fraction of the observation when the source enters and exits the beam respectively.
"""
from scipy.interpolate import UnivariateSpline
time_steps = np.array(range(0, duration, dt), dtype=float)
# For each time step record the min power so even if the source is in
# one freq channel it's recorded
powers_freq_min = []
for p in powers:
powers_freq_min.append(float(min(p) - min_z_power))
if min(powers_freq_min) > 0.:
enter_beam = 0.
exit_beam = 1.
else:
powers_freq_min = np.array(powers_freq_min)
logger.debug("time_steps: {}".format(time_steps))
logger.debug("powers: {}".format(powers_freq_min))
try:
spline = UnivariateSpline(time_steps, powers_freq_min , s=0.)
except:
return None, None
if len(spline.roots()) == 2:
enter_beam, exit_beam = spline.roots()
enter_beam /= duration
exit_beam /= duration
elif len(spline.roots()) == 1:
if powers_freq_min[0] > powers_freq_min[-1]:
#power declines so starts in beam then exits
enter_beam = 0.
exit_beam = spline.roots()[0]/duration
else:
enter_beam = spline.roots()[0]/duration
exit_beam = 1.
else:
enter_beam = 0.
exit_beam = 1.
return enter_beam, exit_beam
[docs]def get_beam_power_over_time(names_ra_dec,
common_metadata=None,
dt=296, centeronly=True, verbose=False,
option='analytic', degrees=False,
start_time=0):
"""Calculates the zenith normalised power for each source over time.
Parameters
----------
names_ra_dec : `list`
An array in the format [[source_name, RAJ, DecJ]]
common_metadata : `list`, optional
The list of common metadata generated from :py:meth:`vcstools.metadb_utils.get_common_obs_metadata`
dt : `int`, optional
The time interval of how often powers are calculated. |br| Default: 296.
centeronly : `boolean`, optional
Only calculates for the centre frequency. |br| Default: `True`.
verbose : `boolean`, optional
If `True` will not supress the output from mwa_pb. |br| Default: `False`.
option : `str`, optional
The primary beam model to use out of [analytic, advanced, full_EE, hyperbeam]. |br| Default: analytic.
degrees : `boolean`, optional
If true assumes RAJ and DecJ are in degrees. |br| Default: `False`.
start_time : `int`, optional
The time in seconds from the begining of the observation to start calculating at. |br| Default: 0.
Returns
-------
Powers : `numpy.array`, (len(names_ra_dec), ntimes, nfreqs)
The zenith normalised power for each source over time.
"""
if common_metadata is None:
common_metadata = get_common_obs_metadata(obsid)
obsid, _, _, time, delays, centrefreq, channels = common_metadata
names_ra_dec = np.array(names_ra_dec)
amps = [1.0] * 16
logger.debug("Calculating beam power for OBS ID: {0}".format(obsid))
if option == 'hyperbeam':
if "mwa_hyperbeam" not in sys.modules:
logger.error("mwa_hyperbeam not installed so can not use hyperbeam to create a beam model. Exiting")
sys.exit(1)
beam = mwa_hyperbeam.FEEBeam(config.h5file)
# Work out time steps to calculate over
starttimes = np.arange(start_time, time + start_time, dt)
stoptimes = starttimes + dt
stoptimes[stoptimes>time] = time
ntimes = len(starttimes)
midtimes = float(obsid) + 0.5 * (starttimes + stoptimes)
# Work out frequency steps
if centeronly:
if centrefreq > 1e6:
logger.warning("centrefreq is greater than 1e6, assuming input with units of Hz.")
frequencies=np.array([centrefreq])
else:
frequencies=np.array([centrefreq])*1e6
nfreqs = 1
else:
# in Hz
frequencies = np.array(channels) * 1.28e6
nfreqs = len(channels)
# Set up np power array
PowersX = np.zeros((len(names_ra_dec),
ntimes,
nfreqs))
PowersY = np.zeros((len(names_ra_dec),
ntimes,
nfreqs))
# Convert RA and Dec to desired units
if degrees:
RAs = np.array(names_ra_dec[:,1], dtype=float)
Decs = np.array(names_ra_dec[:,2], dtype=float)
else:
RAs, Decs = sex2deg(names_ra_dec[:,1], names_ra_dec[:,2])
# Then check if they're valid
if len(RAs) == 0:
sys.stderr.write('Must supply >=1 source positions\n')
return None
if not len(RAs) == len(Decs):
sys.stderr.write('Must supply equal numbers of RAs and Decs\n')
return None
if verbose is False:
#Supress print statements of the primary beam model functions
sys.stdout = open(os.devnull, 'w')
for itime in range(ntimes):
# this differ's from the previous ephem_utils method by 0.1 degrees
_, Azs, Zas = mwa_alt_az_za(midtimes[itime], ra=RAs, dec=Decs, degrees=True)
# go from altitude to zenith angle
theta = np.radians(Zas)
phi = np.radians(Azs)
for ifreq in range(nfreqs):
#Decide on beam model
if option == 'analytic':
rX, rY = primary_beam.MWA_Tile_analytic(theta, phi,
freq=frequencies[ifreq], delays=delays,
zenithnorm=True, power=True)
elif option == 'advanced':
rX, rY = primary_beam.MWA_Tile_advanced(theta, phi,
freq=frequencies[ifreq], delays=delays,
zenithnorm=True, power=True)
elif option == 'full_EE':
rX, rY = primary_beam.MWA_Tile_full_EE(theta, phi,
freq=frequencies[ifreq], delays=delays,
zenithnorm=True, power=True)
elif option == 'hyperbeam':
jones = beam.calc_jones_array(phi, theta, int(frequencies[ifreq]), delays[0], amps, True)
jones = jones.reshape(1, len(phi), 2, 2)
vis = primary_beam.mwa_tile.makeUnpolInstrumentalResponse(jones, jones)
rX, rY = (vis[:, :, 0, 0].real, vis[:, :, 1, 1].real)
PowersX[:,itime,ifreq] = rX
PowersY[:,itime,ifreq] = rY
if verbose is False:
sys.stdout = sys.__stdout__
Powers = 0.5 * (PowersX + PowersY)
return Powers
[docs]def source_beam_coverage(obs_list, names_ra_dec,
common_metadata_list=None,
dt_input=300, beam='analytic', min_z_power=0.3):
"""For a list of MWA observations and sources will find if the
sources are in the beam and when they enter and exit.
Parameters
----------
obs_list : `list`
A list of MWA Observation IDs.
names_ra_dec : `list`
An array in the format [[source_name, RAJ, DecJ]]
common_metadata_list : `list` of `list`, optional
A list of lists where each list is the common metadata generated from
:py:meth:`vcstools.metadb_utils.get_common_obs_metadata` for each obs in the obs_list.
dt_input : `int`, optional
The time interval in seconds of how often powers are calculated. |br| Default: 300.
beam : `str`, optional
The primary beam model to use out of [analytic, advanced, full_EE, hyperbeam]. |br| Default: analytic.
min_z_power : `float`, optional
Zenith normalised power cut off. |br| Default: 0.3.
Returns
-------
beam_coverage : `dict`
A dictionary where the first key is the observation ID and the second is the pulsar names like so: |br|
beam_coverage[obsid][name] = [dect_beg_norm, dect_end_norm, np.amax(source_ob_power)]
dect_beg_norm : `float`
Fraction of the observation when the source enters the beam.
dect_end_norm : `float`
Fraction of the observation when the source enters and exits the beam respectively.
"""
beam_coverage = {}
for on, obsid in enumerate(obs_list):
beam_coverage[obsid] = {}
if common_metadata_list:
common_metadata = common_metadata_list[on]
else:
# No metadata supplied so make the metadata call
common_metadata = get_common_obs_metadata(obsid)
#common_metadata = obsid,ra_obs,dec_obs,time_obs,delays,centrefreq,channels
obs_dur = common_metadata[3]
if dt_input * 4 > obs_dur:
# If the observation time is very short then a smaller dt time is required
# to get enough points to fit a curve
dt = int(obs_dur / 4.)
else:
dt = dt_input
logger.debug("obsid: {0}, time_obs {1} s, dt {2} s".format(obsid, obs_dur, dt))
logger.debug("names_ra_dec: {}".format(names_ra_dec))
powers = get_beam_power_over_time(names_ra_dec,
common_metadata=common_metadata,
dt=dt, centeronly=True,
option=beam)
for source_ob_power, name in zip(powers, np.array(names_ra_dec)[:,0]):
if max(source_ob_power) > min_z_power:
logger.debug("Running beam_enter_exit on obsid: {}".format(obsid))
dect_beg_norm, dect_end_norm = beam_enter_exit(source_ob_power, obs_dur,
dt=dt, min_z_power=min_z_power)
beam_coverage[obsid][name] = [dect_beg_norm, dect_end_norm, np.amax(source_ob_power)]
return beam_coverage
[docs]def source_beam_coverage_and_times(obsid, pulsar,
p_ra=None, p_dec=None,
obs_beg=None, obs_end=None,
files_beg=None, files_end=None,
min_z_power=0.3, dt_input=100,
common_metadata=None, query=None,
beam='analytic'):
"""Finds the normalised time that a pulsar is in the beam for a given obsid.
If pulsar is not in beam, returns None, None
Parameters
----------
obsid : `int`
The observation ID
pulsar : `str`
The pulsar's J name
p_ra, p_dec : `str`, optional
The target's right ascension and declination in sexidecimals.
If not supplied will use the values from the ANTF.
obs_beg, obs_end : `int`, optional
Beginning and end GPS time of the observation.
If not supplied will use :py:meth:`vcstools.metadb_utils.obs_max_min` to find it.
files_beg, files_end : `int`, optional
Beginning and end GPS time of the (fits of VCS) files.
If not supplied will assume the full observation is available.
min_z_power : `float`, optional
Zenith normalised power cut off. |br| Default: 0.3.
common_metadata : `list`, optional
The list of common metadata generated from :py:meth:`vcstools.metadb_utils.get_common_obs_metadata`
query : psrqpy object, optional
A previous psrqpy query. Can be supplied to prevent performing a new query.
beam : `str`, optional
The primary beam model to use out of [analytic, advanced, full_EE]. |br| Default: analytic.
Returns
-------
enter_files : `float`
A float between 0 and 1 that describes the normalised time that the pulsar enters the beam
exit_files : `float`
A float between 0 and 1 that describes the normalised time that the pulsar exits the beam
"""
# Perform required metadata calls
if query is None:
query = psrqpy.QueryATNF(psrs=pulsar).pandas
if p_ra is None or p_dec is None:
# Get some basic pulsar and obs info info
query_id = list(query['PSRJ']).index(pulsar)
p_ra = query["RAJ"][query_id]
p_dec = query["DECJ"][query_id]
if not common_metadata:
common_metadata = get_common_obs_metadata(obsid)
if obs_beg is None or obs_end is None:
obs_beg, obs_end = obs_max_min(obsid)
obs_dur = obs_end - obs_beg + 1
if not files_beg:
files_beg = obs_beg
if not files_end:
files_end = obs_end
files_dur = files_end - files_beg + 1
beam_coverage = source_beam_coverage([obsid], [[pulsar, p_ra, p_dec]],
common_metadata_list=[common_metadata],
dt_input=dt_input, beam=beam, min_z_power=min_z_power)
if pulsar not in beam_coverage[obsid].keys():
# Not in beam exiting
return None, None, None, None, None, None, None, None, None
dect_beg_norm, dect_end_norm, _ = beam_coverage[obsid][pulsar]
# GPS times the source enters and exits beam
dect_beg = obs_beg + obs_dur * dect_beg_norm
dect_end = obs_beg + obs_dur * dect_end_norm
# Normalised time the source enters/exits the beam in the files (used for Presto commands)
files_beg_norm = (dect_beg - files_beg) / files_dur
files_end_norm = (dect_end - files_beg) / files_dur
if files_beg_norm > 1. or files_end_norm < 0.:
logger.debug("source {0} is not in the beam for the files on disk".format(pulsar))
files_beg_norm = None
files_end_norm = None
else:
if files_beg_norm < 0.:
files_beg_norm = 0.
if files_end_norm > 1.:
files_end_norm = 1.
return dect_beg, dect_end, dect_beg_norm, dect_end_norm, files_beg_norm, files_end_norm, obs_beg, obs_end, obs_dur
[docs]def find_sources_in_obs(obsid_list, names_ra_dec,
obs_for_source=False, dt_input=300, beam='analytic',
min_z_power=0.3, cal_check=False, all_volt=False,
degrees_check=False, metadata_list=None):
"""Either creates text files for each MWA obs ID of each source within it or a text
file for each source with each MWA obs is that the source is in.
Parameters
----------
obsid_list : `list`
List of MWA observation IDs.
names_ra_dec : `list`
An array in the format [[source_name, RAJ, DecJ]]
obs_for_source : `boolean`, optional
If `True` creates a text file for each source with each MWA observation that the source is in.
If `False` creates text files for each MWA obs ID of each source within it. |br| Default: `False`.
dt_input : `int`, optional
The time interval in seconds of how often powers are calculated. |br| Default: 300.
beam : `str`, optional
The primary beam model to use out of [analytic, advanced, full_EE]. |br| Default: analytic.
min_z_power : `float`, optional
Zenith normalised power cut off. |br| Default: 0.3.
cal_check : `boolean`, optional
Checks the MWA pulsar database if there is a calibration suitable for the observation ID.
all_volt : `boolean`, optional
Included observations with missing or incorrect voltage files. |br| Default: `False`.
degrees_check : `boolean`, optional
If true assumes RAJ and DecJ are in degrees. |br| Default: `False`.
metadata_list : `list`
List of the outputs of vcstools.metadb_utils.get_common_obs_metadata.
If not provided, will make the metadata calls to find the data. |br| Default: `None`.
Returns
-------
output_data : `dict`
The format of output_data is dependant on obs_for_source.
|br| If obs_for_source is `True` :
|br| output_data = {jname:[[obsid, duration, enter, exit, max_power],
|br| [obsid, duration, enter, exit, max_power]]}
|br| If obs_for_source is `False` :
|br| ouput_data = {obsid:[[jname, enter, exit, max_power],
|br| [jname, enter, exit, max_power]]}
obsid_meta : `list`
A list of the output of get_common_obs_metadata for each obsid
"""
import urllib.request
#prepares metadata calls and calculates power
powers = []
#powers[obsid][source][time][freq]
common_metadata_list = []
obsid_to_remove = []
# Loop over observations to check if there are VCS files
for i, obsid in enumerate(obsid_list):
# Perform the file meta data call to ensure data is available, just ask for the first 10
try:
files_meta_data = getmeta(service='data_files', params={'obs_id':obsid, 'nocache':1, 'maxfiles':10})
except urllib.error.HTTPError as err:
files_meta_data = None
if files_meta_data is None:
logger.warning("No file metadata data found for obsid {}. Skipping".format(obsid))
obsid_to_remove.append(obsid)
continue
# Check raw voltage files
raw_available = False
raw_deleted = False
for file_name in files_meta_data.keys():
if file_name.endswith('dat'):
deleted = files_meta_data[file_name]['deleted']
if deleted:
raw_deleted = True
else:
raw_available = True
# Check combined voltage tar files
comb_available = False
comb_deleted = False
for file_name in files_meta_data.keys():
if file_name.endswith('tar'):
deleted = files_meta_data[file_name]['deleted']
if deleted:
comb_deleted = True
else:
comb_available = True
if raw_available or comb_available or all_volt:
if metadata_list:
common_metadata, _ = metadata_list[i]
else:
# No metadata supplied so make the metadata call
common_metadata = get_common_obs_metadata(obsid)
common_metadata_list.append(common_metadata)
elif raw_deleted and comb_deleted:
logger.warning('Raw and combined voltage files deleted for {}'.format(obsid))
obsid_to_remove.append(obsid)
elif raw_deleted:
logger.warning('Raw voltage files deleted for {}'.format(obsid))
obsid_to_remove.append(obsid)
elif comb_deleted:
logger.warning('Combined voltage files deleted for {}'.format(obsid))
obsid_to_remove.append(obsid)
else:
logger.warning('No raw or combined voltage files for {}'.format(obsid))
obsid_to_remove.append(obsid)
for otr in obsid_to_remove:
obsid_list.remove(otr)
# Calculate the power for all sources and obsids and find when they enter and exit the beam
beam_coverage = source_beam_coverage(obsid_list, names_ra_dec,
common_metadata_list=common_metadata_list,
dt_input=dt_input, beam=beam, min_z_power=min_z_power)
#chooses whether to list the source in each obs or the obs for each source
output_data = {}
if obs_for_source:
for source_name in np.array(names_ra_dec)[:,0]:
source_data = []
for on, obsid in enumerate(obsid_list):
if source_name in beam_coverage[obsid].keys():
# Source was in the beam so include it
_, _, _, duration, _, centre_freq, channels = common_metadata_list[on]
enter_beam_norm, exit_beam_norm, max_power = beam_coverage[obsid][source_name]
bandwidth = 1.28 * (channels[-1] - channels[0] + 1) # MHz
source_data.append([obsid, duration,
enter_beam_norm, exit_beam_norm,
max_power, centre_freq, bandwidth])
# For each source make a dictionary key that contains a list of
# lists of the data for each obsid
output_data[source_name] = source_data
else:
#output a list of sorces for each obs
for on, obsid in enumerate(obsid_list):
_, _, _, duration, _, centre_freq, channels = common_metadata_list[on]
obsid_data = []
for source_name in np.array(names_ra_dec)[:,0]:
if source_name in beam_coverage[obsid].keys():
enter_beam_norm, exit_beam_norm, max_power = beam_coverage[obsid][source_name]
obsid_data.append([source_name, enter_beam_norm, exit_beam_norm, max_power])
# For each obsid make a dictionary key that contains a list of
# lists of the data for each source/pulsar
output_data[obsid] = obsid_data
return output_data, common_metadata_list