Source code for meersolar.utils.casatasks
import types
import psutil
import numpy as np
import glob
import os
import traceback
import time
from casatools import msmetadata, ms as casamstool, table
from .basic_utils import *
from .resource_utils import *
#############################
# General CASA tasks
#############################
[docs]
def check_scan_in_caltable(caltable, scan):
"""
Check scan number available in caltable or not
Parameters
----------
caltable : str
Name of the caltable
scan : int
Scan number
Returns
-------
bool
Whether scan is present in the caltable or not
"""
tb = table()
tb.open(caltable)
scans = tb.getcol("SCAN_NUMBER")
tb.close()
if int(scan) in scans:
return True
else:
return False
[docs]
def reset_weights_and_flags(
msname="",
restore_flag=True,
force_reset=False,
n_threads=-1,
):
"""
Reset weights and flags for the ms
Parameters
----------
msname : str
Measurement set
restore_flag : bool, optional
Restore flags or not
force_reset : bool, optional
Force reset
"""
limit_threads(n_threads=n_threads)
from casatasks import flagdata
msname = msname.rstrip("/")
if os.path.exists(f"{msname}/.reset") == False or force_reset:
mspath = os.path.dirname(os.path.abspath(msname))
os.chdir(mspath)
if restore_flag:
print(f"Restoring flags of measurement set : {msname}")
if os.path.exists(msname + ".flagversions"):
os.system("rm -rf " + msname + ".flagversions")
flagdata(vis=msname, mode="unflag", flagbackup=False)
print(f"Resetting previous weights of the measurement set: {msname}")
msmd = msmetadata()
msmd.open(msname)
npol = msmd.ncorrforpol()[0]
msmd.close()
tb = table()
tb.open(msname, nomodify=False)
colnames = tb.colnames()
nrows = tb.nrows()
if "WEIGHT" in colnames:
print(f"Resetting weight column to ones of measurement set : {msname}.")
weight = np.ones((npol, nrows))
tb.putcol("WEIGHT", weight)
if "SIGMA" in colnames:
print(f"Resetting sigma column to ones of measurement set: {msname}.")
sigma = np.ones((npol, nrows))
tb.putcol("SIGMA", sigma)
if "WEIGHT_SPECTRUM" in colnames:
print(f"Removing weight spectrum of measurement set: {msname}.")
tb.removecols("WEIGHT_SPECTRUM")
if "SIGMA_SPECTRUM" in colnames:
print(f"Removing sigma spectrum of measurement set: {msname}.")
tb.removecols("SIGMA_SPECTRUM")
tb.flush()
tb.close()
os.system(f"touch {msname}/.reset")
return
[docs]
def correct_missing_col_subms(msname):
"""
Correct for missing colurmns in sub-MSs
Parameters
----------
msname : str
Name of the measurement set
"""
tb = table()
colname_list = []
sub_mslist = glob.glob(msname + "/SUBMSS/*.ms")
for ms in sub_mslist:
tb.open(ms)
colname_list.append(tb.colnames())
tb.close()
sets = [set(sublist) for sublist in colname_list]
if len(sets) > 0:
common_elements = set.intersection(*sets)
unique_elements = set.union(*sets) - common_elements
for ms in sub_mslist:
tb.open(ms, nomodify=False)
colnames = tb.colnames()
for colname in unique_elements:
if colname in colnames:
print(f"Removing column: {colname} from sub-MS: {ms}")
tb.removecols(colname)
tb.flush()
tb.close()
return
[docs]
def single_mstransform(
msname="",
outputms="",
field="",
scan="",
width=1,
timebin="",
datacolumn="DATA",
spw="",
corr="",
timerange="",
numsubms="auto",
n_threads=-1,
):
"""
Perform mstransform of a single scan
Parameters
----------
msname : str
Name of the measurement set
outputms : str
Output ms name
scan : int
Scan to split (a single scan)
field : str, optional
Field name
width : int, optional
Number of channels to average
timebin : str, optional
Time to average
datacolumn : str, optional
Data column to split
spw : str, optional
Spectral window
corr : str, optional
Correlation to split
timerange : str, optional
Time range
numsubms : str, optional
Number of subms
n_threads : int, optional
Number of CPU threads
Returns
-------
str
Output measurement set name
"""
limit_threads(n_threads=n_threads)
from casatasks import mstransform, initweights, flagdata
if timebin == "" or timebin is None:
timeaverage = False
else:
timeaverage = True
if width > 1:
chanaverage = True
else:
chanaverage = False
outputms = outputms.rstrip("/")
if os.path.exists(outputms):
os.system("rm -rf " + outputms)
if os.path.exists(outputms + ".flagversions"):
os.system("rm -rf " + outputms + ".flagversions")
try:
if n_threads < 1:
n_threads = 2
else:
n_threads = min(n_threads, 2)
if field == "":
msmd = msmetadata()
msmd.open(msname)
field = str(msmd.fieldsforscan(int(scan))[0])
msmd.close()
with suppress_output():
mstransform(
vis=msname,
outputvis=outputms,
spw=spw,
timerange=timerange,
field=field,
scan=scan,
datacolumn=datacolumn,
createmms=True,
correlation=corr,
timeaverage=timeaverage,
timebin=timebin,
chanaverage=chanaverage,
chanbin=int(width),
nthreads=n_threads,
separationaxis="scan",
numsubms=numsubms,
)
time.sleep(5)
if os.path.exists(outputms):
with suppress_output():
initweights(vis=outputms, wtmode="ones", dowtsp=True)
flagdata(
vis=outputms,
mode="clip",
clipzeros=True,
datacolumn="data",
flagbackup=False,
)
os.system(f"touch {outputms}/.splited")
return outputms
except Exception as e:
traceback.print_exc()
if os.path.exists(outputms):
os.system("rm -rf " + outputms)
return
# Expose functions and classes
__all__ = [
name
for name, obj in globals().items()
if (
(isinstance(obj, types.FunctionType) or isinstance(obj, type))
and obj.__module__ == __name__
)
]