Source code for vcstools.gfit

from vcstools.prof_utils import ProfileLengthError, BadFitError
from vcstools.prof_utils import estimate_components_onpulse, error_in_x_pos, profile_region_from_pairs, filled_profile_region_between_pairs, normamlise_prof, get_off_pulse
import itertools
from scipy.stats import vonmises
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
import numpy as np
import logging
from scipy.interpolate import UnivariateSpline
from scipy.special import erf
import matplotlib
matplotlib.use('Agg')

# Error imports

logger = logging.getLogger(__name__)


# Main class
[docs]class gfit: """ This class is used to fit multiple Gaussians to a pulse profile. Parameters ---------- raw_profile : `list` The pulse profile to be fit. max_N : `int` The maximum number of gaussians to attempt to fit. |br| Default: 10. plot_name : `str` The name of the output plot. Can be set with gfit.plot_name. If unsupplied, will use a generic name. |br| Default: None. conponent_plot_name : `str` The name of the plot for the component plot that illustrates how profile components are chosen. If unsupplied, will not plot. Only applicable if on_pulse_range unsupplied. |br| Default: None. scattering_threshold : `float` The threshold for which any tau (scattering pulse width) value greater will be deemed scattered (in phase). |br| Default: 0.7. on_pulse_ranges : `list` A list of two-lists/tuples that describes the on pulse region in phase. e.g. [[0.1, 0.2], [0.6, 0.7]] If not supplied, will attempt to find on-pulse region |br| Default: None. Methods ------- auto_fit: Runs a gaussian fit evaluation using up to max_N gaussians. The quality of the fit is decided via a chi-square evaluation. The best of these is saved and used to determine widths and maxima location. Part of the preprocessing of the profile will also determine the on-pulse regions and subsequently the noise level and signal to noise ratio. plot_fit: Plots the best chosen gaussian fit to a file whose name is gfit.plot_name. This can only be run after the fit_dict dictionary has been filled (presumably by auto_gfit). Returns ------- fit_dict : `dict` contains the following keys: W10 : `float` The W10 width of the profile measured in phase. W10_e : `float` The uncertainty in the W10. W50 : `float` The W50 width of the profile measured in phase. W50_e : `float` The uncertainty in the W50. Weq : `float` The equivalent width of the profile measured in phase. Weq_e : `float` The uncertainty in the equivalent width. Wscat : `float` The scattering width of the profile measured in phaseprofile_region_from_pairs. The number of distinguishable profile components. on_pulse_estimates : `list` The output of prof_utils.estimate_components_onpulse(). Will be the rolled on_pulse_range input instead if supplied. fit_params : `list` A list of length 5 + 3*(N-1) there N is num_gauss. Each set of 3 parameters corresponds to the amp, centre and width of a guassian component. The first five are baseline, tau, amp, centre and width. cov_mat : np.matrix The covariance matrix output from curve_fit. profile : `list` The normalised and rolled input profile. fit : `list` The best fit made into a list form. sn : `float` The estimated signal to noise ratio, obtained from the profile. sn_e : `float` The uncertainty in sn. scattering : `float` The tau value of the profile fit. scattered : `boolean` Whether or not the final profile's tau value is greater than the sattering threshold. """ def __init__(self, raw_profile, max_N=10, plot_name=None, component_plot_name=None, scattering_threshold=0.7, on_pulse_ranges=None): # Initialise inputs self._raw_profile = raw_profile self._max_N = max_N self._plot_name = plot_name self._component_plot_name = component_plot_name self._scattering_threshold = scattering_threshold self._on_pulse_ranges = on_pulse_ranges # The return dictionary self._fit_dict = {} # Stuff to put in the dictionary self._fit_profile = None self._best_chi = None self._std_profile = None self._on_pulse_prof = None self._noise_std = None self._noise_mean = None self._n_off_pulse = None self._n_gaussians = None self._n_prof_components = None self._popt = None self._pcov = None self._W10 = None self._W50 = None self._Weq = None self._Wscat = None self._W10_e = None self._W50_e = None self._Weq_e = None self._Wscat_e = None self._minima = None self._maxima = None self._minima_e = None self._maxima_e = None # Setters and Getters @property def plot_name(self): return self._plot_name @plot_name.setter def plot_name(self, val): self._plot_name = val @property def component_plot_name(self): return self._component_plot_name @component_plot_name.setter def component_plot_name(self, val): self._component_plot_name = val @property def max_N(self): return self._max_N @max_N.setter def max_N(self, val): self._max_N = val @property def fit_dict(self): return self._fit_dict # Main function - intended for use by the user
[docs] def auto_fit(self): """ Fits multiple gaussian profiles and finds the best combination of N_Gaussians and alpha. Uses """ #if len(self._raw_profile) < 100: # raise ProfileLengthError("Profile must have length > 100") if len(self._raw_profile) < 64: raise ProfileLengthError("Profile must have length > 64") if self._on_pulse_ranges: # Use the on-pulse region to get noise estimates self._standardise_raw_profile(roll_phase=None) # Convert phases to bins on_pulse_ranges = [] for phase_range in self._on_pulse_ranges: lower = int(phase_range[0] * len(self._std_profile)) upper = int(phase_range[1] * len(self._std_profile)) on_pulse_ranges.append([lower, upper]) self._on_pulse_ranges = on_pulse_ranges # Get the off-pulse region and calculate noise off_pulse_range = get_off_pulse(self._on_pulse_ranges) off_pulse_region = profile_region_from_pairs(self._std_profile, off_pulse_range) self._noise_std = np.nanstd(off_pulse_region) self._noise_mean = np.nanmean(off_pulse_region) self._n_prof_components = len(self._on_pulse_ranges) # Roll the profile while preserving the known on-pulse region profile_bins = np.linspace( 0, len(self._std_profile)-1, len(self._std_profile)) roll_i = self._roll_to_ideal_phase(off_pulse_range) rolled_profile_bins = np.roll(profile_bins, -roll_i) on_pulse_ranges = [] for phase_range in self._on_pulse_ranges: lower = rolled_profile_bins[phase_range[0]] upper = rolled_profile_bins[phase_range[1]] on_pulse_ranges.append([int(lower), int(upper)]) self._on_pulse_ranges = on_pulse_ranges #quit() else: # We need to estimate what the on-pulse region is self._standardise_raw_profile() comp_est_dict = estimate_components_onpulse(self._std_profile) self._n_prof_components = len(comp_est_dict["overest_on_pulse"]) # Roll the profile such that an off-pulse region is on the phase edge self._roll_to_ideal_phase(comp_est_dict["overest_off_pulse"]) # This has ruined our component estimates so we need get them back comp_est_dict = estimate_components_onpulse( self._std_profile, plot_name=self._component_plot_name) self._on_pulse_ranges = comp_est_dict["overest_on_pulse"] self._n_prof_components = len(comp_est_dict["overest_on_pulse"]) self._noise_std = comp_est_dict["norm_noise_std"] self._noise_mean = comp_est_dict["norm_noise_mean"] # Create the on-pulse profile where the noise is set to the mean noise value self._on_pulse_prof = filled_profile_region_between_pairs( self._std_profile, self._on_pulse_ranges, fill_value=self._noise_mean) self._on_pulse_bool = [] for opp in self._on_pulse_prof: if opp == self._noise_mean: self._on_pulse_bool.append(False) else: self._on_pulse_bool.append(True) # Attempt to fit gaussians to the profile self._fit_gaussian() # Initial fit self._est_noise_sn() # Initial Estimate SN estimate from the fit # Force no scattering if the fit isn't scattered now no_scat = True if self._scattered and self._popt[1] <= self._scattering_threshold else False # Redo the fit with updated scattering and noise information self._fit_gaussian(force_no_scattering=no_scat) # Do all the fancy things with the fit self._find_widths() # Find widths + error self._find_minima_maxima_gauss() # Find turning points self._est_noise_sn() # Estimate SN # Update scattering information self._scattered = True if self._Wscat >= self._scattering_threshold else False # Dump to dictionary self._fit_dict = {} self._fit_dict["W10"] = self._W10 self._fit_dict["W10_e"] = self._W10_e self._fit_dict["W50"] = self._W50 self._fit_dict["W50_e"] = self._W50_e self._fit_dict["Wscat"] = self._Wscat self._fit_dict["Wscat_e"] = self._Wscat_e self._fit_dict["Weq"] = self._Weq self._fit_dict["Weq_e"] = self._Weq_e self._fit_dict["maxima"] = self._maxima self._fit_dict["maxima_e"] = self._maxima_e self._fit_dict["minima"] = self._minima self._fit_dict["minima_e"] = self._minima_e self._fit_dict["redchisq"] = self._best_chi self._fit_dict["n_components"] = self._n_gaussians self._fit_dict["fit_params"] = self._popt self._fit_dict["cov_mat"] = self._pcov self._fit_dict["profile"] = self._std_profile self._fit_dict["on_pulse_bool"] = self._on_pulse_bool self._fit_dict["fit"] = self._fit_profile self._fit_dict["sn"] = self._sn self._fit_dict["sn_e"] = self._sn_e self._fit_dict["noise_std"] = self._noise_std self._fit_dict["noise_mean"] = self._noise_mean self._fit_dict["scattering"] = self._popt[1] # tau self._fit_dict["scattered"] = self._scattered self._fit_dict["n_profile_components"] = self._n_prof_components # Log all the important stuff logger.info("### Best fit results ###") logger.info( f"Best model found with reduced Chi square of {self._fit_dict['redchisq']} using {self._n_gaussians} Gaussian components") logger.info( f"W10: {self._fit_dict['W10']} +/- {self._fit_dict['W10_e']}") logger.info( f"W50: {self._fit_dict['W50']} +/- {self._fit_dict['W50_e']}") logger.info( f"Wscat: {self._fit_dict['Wscat']} +/- {self._fit_dict['Wscat_e']}") logger.info( f"Weq: {self._fit_dict['Weq']} +/- {self._fit_dict['Weq_e']}") logger.info( f"S/N: {self._fit_dict['sn']} +/- {self._fit_dict['sn_e']}") logger.info( f"Noise estiamte: {self._noise_std}") logger.info( f"Scattering Timescale: {self._popt[1]}") logger.info(f"popt: {self._fit_dict['fit_params']}")
# And we're done # Plotter for the resulting fit
[docs] def plot_fit(self): """Plots the best fit set of Gaussians. Default output is "Gaussian_fit.png". """ if not self._plot_name: self._plot_name = "Gaussian_fit.png" popt = self._fit_dict["fit_params"] # We need an x range for the plot - which will be phase and for # Recreating the Gaussians - which will be -pi to pi plot_x = np.linspace(0, 1, len(self._fit_profile), endpoint=False) gaussian_x = np.linspace(-np.pi, np.pi, len(self._fit_profile), endpoint=False) fig = plt.figure(figsize=(20, 12)) # Plot the convolved gaussian z = self._exp_vm_gauss_conv(gaussian_x, *self._popt[0:5]) plt_label = "Convolved Gaussian Component" if self._n_prof_components == 1 else f"Convolved Gaussian Component 1" plt.plot(plot_x, z, "--", label=plt_label) # Plot any other convoled gaussians - one for each component past the first for i in range(1, self._n_prof_components): base = 0 tau = self._popt[1] popt_idx = 5 + (i-1)*3 z = self._exp_vm_gauss_conv( gaussian_x, base, tau, *self._popt[popt_idx:popt_idx + 3]) plt.plot(plot_x, z, "--", label=f"Convolved Gaussian Component {i+1}") # Plot the other gaussian components start_j = 5 + (self._n_prof_components-1)*3 for i, j in list(enumerate(range(start_j, len(self._popt), 3))): z = self._vm_gauss(gaussian_x, *self._popt[j:j+3]) plt.plot(plot_x, z, "--", label=f"Gaussian Component {i+1}") if self._maxima: for mx in self._maxima: plt.axvline(x=mx, ls=":", lw=2, color="gray") # Error bar - for noise and SN plt.text(0.03, 0.94, f"S/N: {round(self._sn, 2)} +/- \n {round(self._sn_e, 3)}", fontsize=14) plt.text( 0.03, 0.91 - self._noise_std/2, f"Noise: {round(self._noise_std, 5)}", fontsize=14) plt.errorbar(0.02, 0.91 - self._noise_std/2, yerr=self._noise_std, color="gray", capsize=10, markersize=0, label="Noise") # All the other plotting stuff formats = tuple(list(fig.canvas.get_supported_filetypes().keys())) for f in formats: if self._plot_name.endswith(f): title = self.plot_name[:-len(f)-1] if not self.plot_name.endswith(formats): title = self._plot_name self._plot_name += ".png" plt.title(title, fontsize=26) plt.xticks(fontsize=20) plt.yticks(fontsize=20) plt.xlim(0, 1) plt.xlabel("Phase", fontsize=24) plt.ylabel("Intensity", fontsize=24) plt.plot(plot_x, self._std_profile, label="Original Profile", color="black") plt.plot(plot_x, self._fit_profile, label="Gaussian Model", color="red") plt.legend(loc="best", prop={'size': 14}) plt.savefig(self._plot_name, bbox_inches="tight") plt.close()
def _standardise_raw_profile(self, roll_phase=0.25): """Normalises and rolls the raw profile to 0.25 in phase""" self._std_profile = self._raw_profile.copy() # Roll the profile such that the max point is at 0.25 phase (default) if roll_phase: roll_from = np.argmax(self._std_profile) roll_to = round(roll_phase*len(self._std_profile)) roll_i = roll_to - roll_from self._std_profile = np.roll(self._std_profile, roll_i) # Normalise profile to max=1, min=0 self._std_profile = normamlise_prof(self._std_profile) def _roll_to_ideal_phase(self, off_pulse_ranges): """Finds the largest off-pulse region from the off pulse ranges and rolls the profile to that index""" # Find the largest off-pulse range off_pulse_lens = [] for off_range in off_pulse_ranges: start_off = off_range[0] end_off = off_range[1] if start_off > end_off: # Wrapped around phase end off_pulse_lens.append( len(self._std_profile) - start_off + end_off) else: off_pulse_lens.append(end_off - start_off) longest_range, _ = sorted(zip(off_pulse_ranges, off_pulse_lens), reverse=True)[ 0] # Grabs the longest one start_off, end_off = longest_range # Find the centre index of this range if start_off > end_off: # off pulse is wrapped over phase end off_pulse_len = (len(self._std_profile) - start_off + end_off) centre_of_off_pulse = start_off + off_pulse_len/2 if centre_of_off_pulse > len(self._std_profile): centre_of_off_pulse = centre_of_off_pulse - \ len(self._std_profile) roll_from = int(centre_of_off_pulse) else: # Easy case - off pulse is not wrapped roll_from = int((end_off - start_off)/2 + start_off) # Do the rolling roll_to = len(self._std_profile) roll_i = roll_to - roll_from self._std_profile = np.roll(self._std_profile, roll_i) return roll_i # The function that does the actual fitting def _fit_gaussian(self, force_no_scattering=False): """ Fits multiple gaussian components to a pulse profile and finds the best number to use for a fit. Will always fit at least one gaussian per profile component. Profile components are defined by prof_utils.estimate_components_onpulse(). Will fit a number of gaussians convoled with an exponential tail equal to the number of profile components Will then attempt to successively fit one more regular gaussian component until max_N is reached The number of gaussians fit that yields the lowes Bayesian Information Criterion is deemed the most successful """ # Initiate x - we will be working in 1D circular coordinates x = np.linspace(-np.pi, np.pi, len(self._std_profile), endpoint=False) # Estimate gaussian parameters based on profile components comp_locs = [] comp_amp = [] comp_width = [] components = [] for pair in self._on_pulse_ranges: lower_bound = pair[0] upper_bound = pair[1] # Amplitude # Amp guess is half of max amplitude of component on_pulse_region = profile_region_from_pairs( self._std_profile, [pair]) comp_amp.append(max(on_pulse_region)*0.5) # Gaussian width (kappa/sigma) width_est = len(profile_region_from_pairs( self._std_profile, [pair]))/2 # halve the width est because it always looks too big width_est = 2*np.pi * width_est/len(self._std_profile) width_est = 1/width_est**2 # Convert this to kappa comp_width.append(width_est) # The entire component profile will be useful to know about components.append(profile_region_from_pairs( np.linspace(-np.pi, np.pi, len(self._std_profile), endpoint=False), [pair])) # Centre location - loop over on-pulse. This estimate only makes sense if the edge of the phase is # off-pulse. Which should always be the case by this point. for on_range in self._on_pulse_ranges: middle = (on_range[0] + on_range[1])/2 # Convert m from 0 -> len(x) to -pi -> pi coordinates comp_locs.append((middle/len(x)) * 2*np.pi - np.pi) # Turn guesses into generators that cycle between profile components amp_gen = itertools.cycle(comp_amp) width_gen = itertools.cycle(comp_width) loc_gen = itertools.cycle(comp_locs) components_gen = itertools.cycle(components) # Check if the profile is scattered - this will determine which profile to use for curve_fit() if not force_no_scattering: scattering_bounds = [[], []] scattering_guess = [] amp_gen_sc = itertools.cycle(comp_amp) width_gen_sc = itertools.cycle(comp_width) loc_gen_sc = itertools.cycle(comp_locs) components_gen_sc = itertools.cycle(components) for i in range(self._n_prof_components): # Add the baseline and tau to a single gaussian is_first = True if i == 0 else False scattering_guess, scattering_bounds = self._add_bounds_guess( amp_gen_sc, width_gen_sc, loc_gen_sc, components_gen_sc, scattering_guess, scattering_bounds, base=is_first, tau=is_first) # Bounds needs to be in tuple form for curve_fit scattering_bounds_tuple = ( tuple(scattering_bounds[0]), tuple(scattering_bounds[1])) try: popt, _ = curve_fit(self._multi_gauss_exp_with_base, x, self._std_profile, bounds=scattering_bounds_tuple, p0=scattering_guess, maxfev=10000) # TODO: make this estimate better self._scattered = True if popt[1] >= self._scattering_threshold else False except (RuntimeError, ValueError) as e: self._scattered = True # The profile to fit - scattered profiles will use the entire standardised profile as we expect that the # on-pulse region is not accurate fitting_profile = self._std_profile if self._scattered else self._on_pulse_prof else: fitting_profile = self._on_pulse_prof # Do the fittingloop fit_dict = self._fit_loop( fitting_profile, amp_gen, width_gen, loc_gen, components_gen) if not fit_dict: raise BadFitError( f"No suitable fit could be found for any provided values of N_gaussians") # Find the best fit according to reduced chi square best_chi_diff = np.inf for n_gaussians in fit_dict.keys(): chi_diff = abs(1 - fit_dict[n_gaussians]["redchisq"]) if chi_diff < best_chi_diff: best_chi_diff = chi_diff best_fit = n_gaussians # Record the best things in the object self._popt = fit_dict[best_fit]["popt"] self._pcov = fit_dict[best_fit]["pcov"] self._fit_profile = fit_dict[best_fit]["fit"] self._best_chi = fit_dict[best_fit]["redchisq"] self._n_gaussians = int(best_fit) def _fit_loop(self, profile, amp_gen, width_gen, loc_gen, components_gen, reduce_chi_threshold=0.05): """ A loop that attempts to fit multiple gaussians, some with exponential tails, to a profile. If scattered=True, """ # Initiate x - we will be working in 1D circular coordinates x = np.linspace(-np.pi, np.pi, len(self._std_profile), endpoint=False) # Initiate bounds, guesses and fit dictionary bounds_arr = [[], []] guess = [] fit_dict = {} # Fit from n_components to max_N gaussians to the profile. Evaluate profile fit using bayesian information criterion for num in range(self._max_N): # Add the baseline and tau to a single gaussian is_first = True if num == 0 else False guess, bounds_arr = self._add_bounds_guess( amp_gen, width_gen, loc_gen, components_gen, guess, bounds_arr, base=is_first, tau=is_first) # Bounds needs to be in tuple form for curve_fit bounds_tuple = (tuple(bounds_arr[0]), tuple(bounds_arr[1])) if num+1 < self._n_prof_components: # Skip until we have at least num = profile_components continue # Do the fitting popt = None pcov = None try: popt, pcov = curve_fit(self._multi_gauss_exp_with_base, x, profile, bounds=bounds_tuple, p0=guess, maxfev=10000) except (RuntimeError, ValueError) as e: # No fit found in maxfev logger.warn(e) for i, g in enumerate(guess): logger.debug( f"Guess: {g} | bounds: {bounds_tuple[0][i]} -> {bounds_tuple[1][i]}") # continue break # Work out some stuff if popt is not None and pcov is not None: fit = self._multi_gauss_exp_with_base(x, *popt) chisq = self._calculate_chisq( self._std_profile, fit, self._noise_std) fit_dict[str(num+1)] = {"popt": [], "pcov": [], "fit": [], "chisq": []} fit_dict[str(num+1)]["popt"] = popt fit_dict[str(num+1)]["pcov"] = pcov fit_dict[str(num+1)]["fit"] = fit fit_dict[str(num+1)]["redchisq"] = chisq / \ (len(self._std_profile)-len(popt)) logger.debug( f"Reduced chi squared for {num+1} components: {fit_dict[str(num+1)]['redchisq']}") # function exit condition if abs(fit_dict[str(num+1)]["redchisq"]-1) <= reduce_chi_threshold: return fit_dict return fit_dict def _add_bounds_guess(self, amp_gen, width_gen, loc_gen, component_gen, current_guess, current_bounds, base=False, tau=False): if base: # Base current_guess += [0.1] # Guess at 10% current_bounds[0].append(0) # Noise should definitely not be above 50% max anyway current_bounds[1].append(0.5) if tau: # Tau current_guess += [1] current_bounds[0].append(1e-3) current_bounds[1].append(200) # Add the guesses amp_guess = next(amp_gen) width_guess = next(width_gen) loc_guess = next(loc_gen) component_guess = next(component_gen) # Add the bounds amp_lo = 3*self._noise_std if self._noise_std < 0.2 else 0.6 amp_hi = 1.0 # width bounds is hard - need to reverse engineer how we got here in the first place # kappa_sigma (width_est) = (2pi * og_width / prof_len)^-2 # og_width = (kappa_sigma^-1/2 * prof_len) / 2pi og_width = (width_guess**(-1/2) * len(self._std_profile)) / (2*np.pi) # Profiles are not allowed to be much wider than their component estimates width_lo = og_width * 3 width_lo = (2*np.pi * width_lo / len(self._std_profile))**-2 width_hi = 1e6 # Components are allowed to be very sharp # Locations should be confined to their respective component loc_lo = component_guess[0] loc_hi = component_guess[-1] if loc_lo > loc_hi: loc_lo = -np.pi loc_hi = np.pi # Fix guesses if they're out of bounds amp_guess = amp_lo if amp_guess < amp_lo else amp_guess amp_guess = amp_hi if amp_guess > amp_hi else amp_guess width_guess = width_lo if width_guess < width_lo else width_guess width_guess = width_hi if width_guess > width_hi else width_guess loc_guess = loc_lo if loc_guess < loc_lo else loc_guess loc_guess = loc_hi if loc_guess > loc_hi else loc_guess # [Amp, kappa, loc] current_guess += [amp_guess, width_guess, loc_guess] # Add the bounds current_bounds[0].append(amp_lo) # Amplitude current_bounds[1].append(amp_hi) current_bounds[0].append(width_lo) # kappa current_bounds[1].append(width_hi) current_bounds[0].append(loc_lo) # Location current_bounds[1].append(loc_hi) return current_guess, current_bounds # Find min and max points def _find_minima_maxima_gauss(self): """ Finds all roots of a gaussian function. Measured in Phase. """ # Create the derivative list and spline it to find roots x = np.linspace(0, len(self._fit_profile)-1, # Work in indexes len(self._fit_profile)) spline_prof = UnivariateSpline(x, self._fit_profile, s=0, k=4) dy_spline = spline_prof.derivative() d2y_spline = UnivariateSpline( x, self._fit_profile, s=0, k=5).derivative().derivative() d2y_profile = d2y_spline(x) roots = dy_spline.roots() logger.debug(roots) roots = [int(round(i)) for i in roots] logger.debug(roots) roots = list(set(roots)) # Remove duplicates logger.debug(roots) # Find which are max, min, and false maxima = [] minima = [] for root in roots: # Set valid maxima to be 2 times noise if (self._fit_profile[root] - self._popt[0]) > 2*self._noise_std: if d2y_profile[root] < 0: maxima.append(root) else: minima.append(root) # Get errors minima_e = [] maxima_e = [] for m in minima: minima_e.append(error_in_x_pos( x, self._fit_profile, self._noise_std, round(m))) for m in maxima: maxima_e.append(error_in_x_pos( x, self._fit_profile, self._noise_std, round(m))) # Convert to phase, we're currently in indexes self._maxima = list(np.array(maxima)/len(self._fit_profile)) self._minima = list(np.array(minima)/len(self._fit_profile)) self._maxima_e = list(np.array(maxima_e)/len(self._fit_profile)) self._minima_e = list(np.array(minima_e)/len(self._fit_profile)) # Find the width of profile components def _find_widths(self): """ Attempts to find the W_10, W_50 and equivalent width of a profile by using a spline approach. Weq errors are estimated by finding the average difference in Weq when you add and subtract the std from the on-pulse profile """ # Perform spline operations on the fit x = np.linspace(0, len(self._fit_profile)-1, # Work in indexes len(self._fit_profile), endpoint=False) # Profile needs to be at zero baseline profile_for_widths = self._fit_profile - min(self._fit_profile) amp_fit = max(profile_for_widths) - min(profile_for_widths) spline10 = UnivariateSpline( x, profile_for_widths - np.full(len(x), 0.1*amp_fit), s=0) spline50 = UnivariateSpline( x, profile_for_widths - np.full(len(x), 0.5*amp_fit), s=0) spline_s = UnivariateSpline( x, profile_for_widths - np.full(len(x), 1/np.exp(1)*amp_fit), s=0) # Find Weq on_pulse = [] for pair in self._on_pulse_ranges: if pair[0] < pair[1]: on_pulse.extend(self._std_profile[pair[0]:pair[1]]) else: on_pulse.extend(self._std_profile[pair[1]:]) on_pulse.extend(self._std_profile[0:pair[0]]) x_eq = np.linspace(0, len(on_pulse)-1, len(on_pulse)) spline0 = UnivariateSpline(x_eq, on_pulse, s=0) integral = spline0.integral(0, len(on_pulse)-1) Weq = integral/max(on_pulse) # Find W10, W50 and Wscat W10_roots = spline10.roots() W10_roots = [int(round(i)) for i in W10_roots] W50_roots = spline50.roots() W50_roots = W50_roots = [int(round(i)) for i in W50_roots] Wscat_roots = spline_s.roots() Wscat_roots = [int(round(i)) for i in Wscat_roots] # If any of the widths roots don't exist, this fit sucks if not list(W10_roots) or not list(W50_roots) or not list(Wscat_roots): raise BadFitError("This fit is not suitable for use") W10 = W10_roots[-1] - W10_roots[0] W50 = W50_roots[-1] - W50_roots[0] Wscat = Wscat_roots[-1] - Wscat_roots[0] # W10 root errors err_10_1 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(W10_roots[0])) err_10_2 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(W10_roots[-1])) W10_e = np.sqrt(err_10_1**2 + err_10_2**2) # W50 root errors err_50_1 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(W50_roots[0])) err_50_2 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(W50_roots[-1])) W50_e = np.sqrt(err_50_1**2 + err_50_2**2) # Wscat root errors err_scat_1 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(Wscat_roots[0])) err_scat_2 = error_in_x_pos( x, self._fit_profile, self._noise_std, round(Wscat_roots[-1])) Wscat_e = np.sqrt(err_scat_1**2 + err_scat_2**2) # Weq errors - using covariance formula on_pulse_less = (on_pulse - self._noise_std).clip(min=0) spline0 = UnivariateSpline(x_eq, on_pulse_less, s=0) integral = spline0.integral(0, len(self._std_profile)-1) dwdint = 1/max(on_pulse)**2 dwdmax = -integral/max(on_pulse)**2 int_e = abs(integral/max(on_pulse - self._noise_std) - integral/max(on_pulse)) max_e = self._noise_std Weq_e = np.sqrt(dwdint**2 * int_e**2 + dwdmax**2 * max_e**2 + 2*dwdint*dwdmax*int_e*max_e) # Convert to phases self._W10 = W10/len(self._fit_profile) self._W10_e = W10_e/len(self._fit_profile) self._W50 = W50/len(self._fit_profile) self._W50_e = W50_e/len(self._fit_profile) self._Wscat = Wscat/len(self._fit_profile) self._Wscat_e = Wscat_e/len(self._fit_profile) self._Weq = Weq/len(self._fit_profile) self._Weq_e = Weq_e/len(self._fit_profile) def _est_noise_sn(self): """ Estimates the SN of a profile using the fit profile to determine an off-pulse region. """ sorted_profile = [i for _, i in sorted( zip(self._fit_profile, self._std_profile), key=lambda pair: pair[0])] ten_percent = round(0.1*len(self._std_profile)) self._noise_std = np.std(sorted_profile[:ten_percent]) self._noise_mean = np.mean(sorted_profile[:ten_percent]) self._sn = 1/self._noise_std # TODO: make this estimate better self._sn_e = 1/(self._noise_std * np.sqrt(2 * ten_percent - 2)) #################################### ###### A bunch of maths stuff ###### #################################### def _exp_tail(self, tau, x): return np.exp(-x/tau)/tau def _exp_vm_gauss_conv(self, x, base, tau, amp, kappa, loc): e_tail = self._exp_tail(tau/100, x) e_tail = e_tail/max(e_tail) g = self._vm_gauss(x, amp, kappa, loc) y = np.real(np.fft.ifft(np.fft.fft(g) * np.fft.fft(e_tail))) # Convolve y = amp*y/max(y) y += base return y def _vm_gauss(self, x, amp, kappa, loc): if kappa <= 10: # Use vonmises g = vonmises.pdf(x, kappa, loc) g = amp * g/max(g) else: # Use regular gaussian - vonmises can't handle large kappa sigma = np.sqrt(1/kappa) # This is true for kappa approx > 10 g = amp * np.exp(-(((x-loc)**2) / (2*sigma**2))) return g def _multi_gauss_exp_with_base(self, x, base, tau, *params): """ Where x is a sequenetial numpy array of floats from -pi to pi of any length This function will employ a vonmises distribution where kappa is small and convolve the first gaussian/vonmises with an exponential tail described by tau. base is the baseline noise The parameters are passed as a list of len%3 = 0 with values: amp: The amplitude of the gaussian kappa: The kappa value of the vonmises distribution. For k>10 we will use sigma=sqrt(1/k) instead loc: The location of the peak of the gaussian. Should be between -pi and pi """ y = np.zeros_like(x) for i in range(0, len(params), 3): amp = params[i] kappa = params[i+1] loc = params[i+2] if i < 3*self._n_prof_components: # Make a gauss with exp tail if i != 0: # Only use the base for one component base = 0 conv = self._exp_vm_gauss_conv(x, base, tau, amp, kappa, loc) y += conv else: # Make a vonmises/gaussian g = self._vm_gauss(x, amp, kappa, loc) y += g return y def _integral_multi_gauss(self, *params): y = 0 for i in range(0, len(params), 3): a = params[i] c = params[i+2] y = y + a*c*np.sqrt(2*np.pi) return y def _multi_gauss(self, x, *params): y = np.zeros_like(x) for i in range(0, len(params), 3): a = params[i] b = params[i+1] c = params[i+2] y = y + a * np.exp(-(((x-b)**2) / (2*c**2))) return y def _multi_gauss_ddx(self, x, *params): # Derivative of gaussian y = np.zeros_like(x) for i in range(0, len(params), 3): a = params[i] b = params[i+1] c = params[i+2] y = y - a/c**2 * (x - b) * np.exp(-(((x-b)**2) / (2*c**2))) return y def _multi_gauss_d2dx2(self, x, *params): # Double derivative of gaussian y = np.zeros_like(x) for i in range(0, len(params), 3): a = params[i] b = params[i+1] c = params[i+2] y = y + (self._multi_gauss(x, a, b, c) / c**2) * \ (((x - b)**2)/(c**2) - 1) return y def _partial_gauss_dda(x, a, b, c): return np.exp((-(b - x)**2)/(2*c**2)) def _partial_gauss_ddb(x, a, b, c): return a*(x - b) * np.exp((-(b - x)**2)/(2*c**2))/c**2 def _partial_gauss_ddc(x, a, b, c): return a*(x - b)**2 * np.exp((-(b - x)**2)/(2*c**2))/c**3 # Chi sqaured evaluation def _calculate_chisq(self, observed_values, expected_values, err): test_statistic = 0 for observed, expected in zip(observed_values, expected_values): test_statistic += ((float(observed)-float(expected))/float(err))**2 return test_statistic