import types
import psutil
import numpy as np
import traceback
import warnings
import glob
import os
from casatools import msmetadata, ms as casamstool, table
from .basic_utils import *
from .ms_metadata import *
from .imaging import *
#####################################
# Calibration related
#####################################
[docs]
def merge_caltables(caltables, merged_caltable, append=False, keepcopy=False):
"""
Merge multiple same type of caltables
Parameters
----------
caltables : list
Caltable list
merged_caltable : str
Merged caltable name
append : bool, optional
Append with exisiting caltable
keepcopy : bool, opitonal
Keep input caltables or not
Returns
-------
str
Merged caltable
"""
if not isinstance(caltables, list) or len(caltables) == 0:
print("Please provide a list of caltable.")
return
if os.path.exists(merged_caltable) and append:
pass
else:
if os.path.exists(merged_caltable):
os.system("rm -rf " + merged_caltable)
if keepcopy:
os.system("cp -r " + caltables[0] + " " + merged_caltable)
else:
os.system("mv " + caltables[0] + " " + merged_caltable)
caltables.remove(caltables[0])
if len(caltables) > 0:
tb = table()
for caltable in caltables:
if os.path.exists(caltable):
tb.open(caltable)
tb.copyrows(merged_caltable)
tb.close()
if not keepcopy:
os.system("rm -rf " + caltable)
return merged_caltable
[docs]
def get_psf_size(msname, chan_number=-1):
"""
Function to calculate PSF size in arcsec
Parameters
----------
msname : str
Name of the measurement set
chan_number : int, optional
Channel number
Returns
-------
float
PSF size in arcsec
"""
maxuv_m, maxuv_l = calc_maxuv(msname, chan_number=chan_number)
psf = np.rad2deg(1.2 / maxuv_l) * 3600.0 # In arcsec
return round(psf, 2)
[docs]
def calc_bw_smearing_freqwidth(msname, full_FoV=False, FWHM=True):
"""
Function to calculate spectral width to procude bandwidth smearing
Parameters
----------
msname : str
Name of the measurement set
full_FoV : bool, optional
Consider smearing within solar disc or full FoV
FWHM : bool, optional
If using full FoV, consider upto FWHM or first null
Returns
-------
float
Spectral width in MHz
"""
tb = table()
tb.open(f"{msname}/SPECTRAL_WINDOW")
freq = float(tb.getcol("REF_FREQUENCY")[0]) / 10**6
freqres = float(tb.getcol("CHAN_WIDTH")[0][0]) / 10**6
tb.close()
R = 0.9
if full_FoV:
fov = calc_field_of_view(msname, FWHM=FWHM) # In arcsec
else:
fov = 2 * calc_sun_dia(np.nanmean(freq)) * 60 # 2 times sun size
psf = get_psf_size(msname)
delta_nu = np.sqrt((1 / R**2) - 1) * (psf / fov) * freq
delta_nu = ceil_to_multiple(delta_nu, freqres)
return round(delta_nu, 2)
[docs]
def calc_time_smearing_timewidth(msname, full_FoV=False, FWHM=True):
"""
Calculate maximum time averaging to avoid time smearing over full FoV.
Parameters
----------
msname : str
Measurement set name
full_FoV : bool, optional
Consider smearing within solar disc or full FoV
FWHM : bool, optional
If using full FoV, consider upto FWHM or first null
Returns
-------
delta_t_max : float
Maximum allowable time averaging in seconds.
"""
msmd = msmetadata()
msmd.open(msname)
freq_Hz = msmd.chanfreqs(0)[0]
times = msmd.timesforspws(0)
msmd.close()
timeres = times[1] - times[0]
c = 299792458.0 # speed of light in m/s
omega_E = 7.2921159e-5 # Earth rotation rate in rad/s
lam = c / freq_Hz # wavelength in meters
freq = freq_Hz / 10**6
if full_FoV:
fov = calc_field_of_view(msname, FWHM=FWHM) # In arcsec
else:
fov = 2 * calc_sun_dia(np.nanmean(freq)) * 60 # 2 times sun size
fov_deg = fov / 3600.0
fov_rad = np.deg2rad(fov_deg)
uv, uvlambda = calc_maxuv(msname)
# Approximate maximum allowable time to avoid >10% amplitude loss
delta_t_max = lam / (2 * np.pi * uv * omega_E * fov_rad)
delta_t_max = ceil_to_multiple(delta_t_max, timeres)
return round(delta_t_max, 2)
[docs]
def max_time_solar_smearing(msname):
"""
Max allowable time averaging to avoid solar motion smearing.
Parameters
----------
msname : str
Measurement set name
Returns
-------
t_max : float
Maximum time averaging in seconds.
"""
omega_sun = 2.5 / (60.0) # solar apparent motion (2.5 arcsec/min to arcsec/sec)
psf = get_psf_size(msname)
t_max = 0.5 * (psf / omega_sun) # seconds
return t_max
[docs]
def delaycal(
vis="",
caltable="",
spw="",
field="",
scan="",
uvrange="",
refant="",
refantmode="flex",
solint="inf",
combine="scan",
gaintable=[],
gainfield=[],
interp=[],
):
"""
General delay calibration using CASA, not assuming any point source
Parameters
----------
vis : str
Measurement set
caltable : str
Caltable name
spw : str, optional
Spectral window
field : str, optional
Field name
scan : str, optional
Scan number
uvrange : str, optional
UV-range
refant : str, optional
Reference antenna (require one reference antenna, can not keep black)
refantmode : str, optional
Refant mode
solint : str, optional
Solution interval
combine : str, optional
Combine data
gaintable : list, optional
Previous gaintables
gainfield : list, optional
Gain fields
interp : list, optional
Interpolation solutions
Returns
-------
str
Caltable name
"""
from casatasks import bandpass, gaincal, rerefant
try:
if refant == "":
print("Provide a reference antenna.")
return
warnings.filterwarnings("ignore")
vis = vis.rstrip("/")
os.system("rm -rf " + caltable + "*")
gaincal(
vis=vis,
caltable=caltable,
field=str(field),
scan=str(scan),
uvrange="",
refant=refant,
solint=solint,
combine=combine,
gaintype="K",
gaintable=gaintable,
gainfield=gainfield,
interp=interp,
)
bandpass(
vis=vis,
caltable=caltable + ".tempbcal",
spw=spw,
field=str(field),
scan=str(scan),
uvrange=uvrange,
refant=refant,
solint=solint,
combine=combine,
solnorm=True,
gaintable=gaintable,
gainfield=gainfield,
interp=interp,
)
rerefant(
vis=vis,
tablein=caltable + ".tempbcal",
refant=refant,
refantmode=refantmode,
)
tb = table()
tb.open(caltable + ".tempbcal/SPECTRAL_WINDOW")
freq = tb.getcol("CHAN_FREQ").flatten()
tb.close()
tb.open(caltable + ".tempbcal")
gain = tb.getcol("CPARAM")
flag = tb.getcol("FLAG")
gain[flag] = np.nan
tb.close()
tb.open(caltable, nomodify=False)
delay_gain = tb.getcol("FPARAM") * 0.0
delay_flag = tb.getcol("FLAG")
phase = np.angle(gain)
for i in range(delay_gain.shape[0]):
for j in range(delay_gain.shape[2]):
try:
pos = np.where(np.isnan(phase[i, :, j]) == False)[0]
if len(pos) > 0:
popt, pcov = np.polyfit(
2 * np.pi * freq[pos], phase[i, :, j][pos], deg=1
)
if np.isnan(popt):
delay = 0.0
else:
delay = popt / 10**-9 # Delay in nanosecond
else:
delay = 0.0
delay_gain[i, :, j] = delay
except Exception:
delay_gain[i, :, j] = 0.0
tb.putcol("FPARAM", delay_gain)
tb.putcol("FLAG", delay_flag)
tb.flush()
tb.close()
os.system("rm -rf " + caltable + ".tempbcal")
return caltable
except Exception as e:
traceback.print_exc()
return
# Expose functions and classes
__all__ = [
name
for name, obj in globals().items()
if (
(isinstance(obj, types.FunctionType) or isinstance(obj, type))
and obj.__module__ == __name__
)
]