import os, numpy as np, copy, psutil, gc, traceback, resource, time, argparse
from astropy.io import fits
from meersolar.pipeline.basic_func import *
from dask import delayed, compute, config
from functools import partial
from casatasks import casalog
try:
casalogfile = casalog.logfile()
os.system("rm -rf " + casalogfile)
except:
pass
[docs]
def single_selfcal_iteration(
msname,
logger,
selfcaldir,
cellsize,
imsize,
round_number=0,
uvrange="",
minuv=0,
calmode="ap",
solint="60s",
refant="1",
solmode="",
gaintype="T",
applymode="calonly",
threshold=3,
weight="briggs",
robust=0.0,
use_previous_model=False,
use_solar_mask=True,
mask_radius=20,
min_tol_factor=-1,
ncpu=-1,
mem=-1,
):
"""
A single self-calibration round
Parameters
----------
msname : str
Name of the measurement set
logger : logger
Python logger
selfcaldir : str
Self-calibration directory
cellsize : float
Cellsize in arcsec
imsize : int
Image pixel size
round_number : int, optional
Selfcal iteration number
uvrange : float, optional
UV range for calibration
calmode : str, optional
Calibration mode ('p' or 'ap')
solint : str, optional
Solution intervals
refant : str, optional
Reference antenna
applymode : str, optional
Solution apply mode (calonly or calflag)
threshold : float, optional
Imaging and auto-masking threshold
weight : str, optional
Image weighting
robust : float, optional
Robust parameter for briggs weighting
use_previous_model : bool, optional
Use previous model
use_solar_mask : bool, optional
Use solar disk mask or not
mask_radius : float, optional
Mask radius in arcminute
min_tol_factor : float, optional
Minimum tolerance factor
ncpu : int, optional
Number of CPUs to use in WSClean
mem : float, optional
Memory usage limit in WSClean
Returns
-------
int
Success message
str
Caltable name
float
RMS based dynamic range
float
RMS of the image
str
Image name
str
Model image name
str
Residual image name
"""
limit_threads(n_threads=ncpu)
from casatasks import gaincal, bandpass, applycal, flagdata, delmod, flagmanager
from casatools import msmetadata, table
try:
##################################
# Setup wsclean params
##################################
if ncpu < 1:
ncpu = psutil.cpu_count()
if mem < 0:
mem = round(psutil.virtual_memory().available / (1024**3), 2)
msname = msname.rstrip("/")
if use_previous_model == False:
delmod(vis=msname, otf=True, scr=True)
prefix = (
selfcaldir
+ "/"
+ os.path.basename(msname).split(".ms")[0]
+ "_selfcal_present"
)
############################
# Determining channel blocks
############################
msmd = msmetadata()
msmd.open(msname)
times = msmd.timesforspws(0)
timeres = np.diff(times)
pos = np.where(timeres > 3 * np.nanmedian(timeres))[0]
max_intervals = min(1, len(pos))
freqs = msmd.chanfreqs(0, unit="MHz")
freqres = freqs[1] - freqs[0]
freq_width = calc_bw_smearing_freqwidth(msname)
nchan = int(freq_width / freqres)
total_nchan = len(freqs)
freq = msmd.meanfreq(0, unit="MHz")
total_time = max(times) - min(times)
msmd.close()
chanres = np.diff(freqs)
chanres /= np.nanmin(chanres)
pos = np.where(chanres > 1)[0]
chanrange_list = []
start_chan = 0
for i in range(len(pos) + 1):
if i > len(pos) - 1:
end_chan = total_nchan
else:
end_chan = pos[i] + 1
if end_chan - start_chan > 10 or len(chanrange_list) == 0:
chanrange_list.append(f"{start_chan} {end_chan}")
else:
last_chanrange = chanrange_list[-1]
chanrange_list.remove(last_chanrange)
start_chan = last_chanrange.split(" ")[0]
chanrange_list.append(f"{start_chan} {end_chan}")
start_chan = end_chan + 1
if len(chanrange_list) == 0:
unflag_chans, flag_chans = get_chans_flag(msname)
chanrange_list = [f"{min(unflag_chans)} {max(unflag_chans)}"]
########################################
# Scale bias list and channel range list
########################################
scale_bias_list = []
for chanrange in chanrange_list:
start_chan = int(chanrange.split(" ")[0])
end_chan = int(chanrange.split(" ")[-1])
mid_chan = int((start_chan + end_chan) / 2)
mid_freq = freqs[mid_chan]
scale_bias = round(get_multiscale_bias(mid_freq), 2)
scale_bias_list.append(scale_bias)
############################################
# Merge channel ranges with identical scale bias
############################################
merged_channels = []
merged_biases = []
start, end = map(int, chanrange_list[0].split())
current_bias = scale_bias_list[0]
for i in range(1, len(chanrange_list)):
next_start, next_end = map(int, chanrange_list[i].split())
next_bias = scale_bias_list[i]
if next_bias == current_bias:
# Merge ranges (irrespective of contiguity)
end = next_end
else:
# Finalize current group
merged_channels.append(f"{start} {end}")
merged_biases.append(current_bias)
# Start new group
start, end = next_start, next_end
current_bias = next_bias
# Final group
merged_channels.append(f"{start} {end}")
merged_biases.append(current_bias)
chanrange_list = copy.deepcopy(merged_channels)
scale_bias_list = copy.deepcopy(merged_biases)
del merged_channels, merged_biases
###############################
# Temporal chunking list
###############################
nintervals = 1
nchan_list = []
nintervals_list = []
for i in range(len(chanrange_list)):
chanrange = chanrange_list[i]
###########################################################################
# Spectral variation is kept fixed at 10% level.
# Because selfcal is done along temporal axis, where variation matters most
###########################################################################
if min_tol_factor <= 0:
min_tol_factor = 1.0
nintervals, _ = get_optimal_image_interval(
msname,
chan_range=f"{chanrange.replace(' ',',')}",
temporal_tol_factor=float(min_tol_factor / 100.0),
spectral_tol_factor=0.1,
max_nchan=-1,
max_ntime=max_intervals,
)
nchan_list.append(nchan)
nintervals_list.append(nintervals)
os.system(f"rm -rf {prefix}*image.fits {prefix}*residual.fits")
if weight == "briggs":
weight += " " + str(robust)
wsclean_args = [
"-quiet",
"-scale " + str(cellsize) + "asec",
"-size " + str(imsize) + " " + str(imsize),
"-no-dirty",
"-gridder tuned-wgridder",
"-weight " + weight,
"-niter 10000",
"-mgain 0.85",
"-nmiter 5",
"-gain 0.1",
"-minuv-l " + str(minuv),
"-j " + str(ncpu),
"-abs-mem " + str(mem),
"-no-negative",
"-auto-mask " + str(threshold + 0.1),
"-auto-threshold " + str(threshold),
]
ngrid = int(ncpu / 2)
if ngrid > 1:
wsclean_args.append("-parallel-gridding " + str(ngrid))
################################################
# Creating and using solar mask
################################################
if use_solar_mask:
fits_mask = msname.split(".ms")[0] + "_solar-mask.fits"
if os.path.exists(fits_mask) == False:
logger.info(f"Creating solar mask of size: {mask_radius} arcmin.\n")
fits_mask = create_circular_mask(
msname, cellsize, imsize, mask_radius=mask_radius
)
if fits_mask != None and os.path.exists(fits_mask):
wsclean_args.append("-fits-mask " + fits_mask)
######################################
# Running imaging per channel range
######################################
final_image_list = []
final_model_list = []
final_residual_list = []
for i in range(len(chanrange_list)):
chanrange = chanrange_list[i]
per_chanrange_wsclean_args = copy.deepcopy(wsclean_args)
######################################
# Multiscale configuration
######################################
start_chan = int(chanrange.split(" ")[0])
end_chan = int(chanrange.split(" ")[-1])
chan_number = int((start_chan + end_chan) / 2)
multiscale_scales = calc_multiscale_scales(
msname, 3, chan_number=chan_number
)
per_chanrange_wsclean_args.append("-multiscale")
per_chanrange_wsclean_args.append("-multiscale-gain 0.1")
per_chanrange_wsclean_args.append(
"-multiscale-scales " + ",".join([str(s) for s in multiscale_scales])
)
per_chanrange_wsclean_args.append(
f"-multiscale-scale-bias {scale_bias_list[i]}"
)
if imsize >= 2048 and 4 * max(multiscale_scales) < 1024:
per_chanrange_wsclean_args.append("-parallel-deconvolution 1024")
###########################################################################
# Spectral variation is kept fixed at 10% level.
# Because selfcal is done along temporal axis, where variation matters most
###########################################################################
if len(scale_bias_list) > 1:
if min_tol_factor <= 0:
min_tol_factor = 1.0
nintervals, _ = get_optimal_image_interval(
msname,
chan_range=f"{chanrange.replace(' ',',')}",
temporal_tol_factor=float(min_tol_factor / 100.0),
spectral_tol_factor=0.1,
)
#####################################
# Spectral imaging configuration
#####################################
if nchan > 1:
per_chanrange_wsclean_args.append(f"-channels-out {nchan}")
per_chanrange_wsclean_args.append("-no-mf-weighting")
per_chanrange_wsclean_args.append("-join-channels")
#####################################
# Temporal imaging configuration
#####################################
if nintervals > 1:
per_chanrange_wsclean_args.append(f"-intervals-out {nintervals}")
logger.info(f"Spectral chunks: {nchan}, temporal chunks: {nintervals}.")
temp_prefix = f"{prefix}_chan_{chanrange.replace(' ','_')}"
per_chanrange_wsclean_args.append(f"-name {temp_prefix}")
per_chanrange_wsclean_args.append(f"-channel-range {chanrange}")
if use_previous_model:
previous_models = glob.glob(f"{temp_prefix}*model.fits")
if nchan > 1:
total_models_expected = (nchan + 1) * nintervals
else:
total_models_expected = (nchan) * nintervals
if len(previous_models) == total_models_expected:
per_chanrange_wsclean_args.append("-continue")
else:
os.system(f"rm -rf {temp_prefix}*")
wsclean_cmd = (
"wsclean " + " ".join(per_chanrange_wsclean_args) + " " + msname
)
logger.info(f"WSClean command: {wsclean_cmd}\n")
msg = run_wsclean(wsclean_cmd, "meerwsclean", verbose=False)
if msg != 0:
gc.collect()
logger.info(f"Imaging is not successful.\n")
else:
#####################################
# Analyzing images
#####################################
wsclean_files = {}
for suffix in ["image", "model", "residual"]:
files = glob.glob(temp_prefix + f"*MFS-{suffix}.fits")
if not files:
files = glob.glob(temp_prefix + f"*{suffix}.fits")
wsclean_files[suffix] = files
wsclean_images = wsclean_files["image"]
wsclean_models = wsclean_files["model"]
wsclean_residuals = wsclean_files["residual"]
final_image = (
temp_prefix.replace("present", f"{round_number}") + "_I_image.fits"
)
final_model = (
temp_prefix.replace("present", f"{round_number}") + "_I_model.fits"
)
final_residual = (
temp_prefix.replace("present", f"{round_number}")
+ "_I_residual.fits"
)
if len(wsclean_images) == 0:
print("No image is made.")
elif len(wsclean_images) == 1:
os.system(f"cp -r {wsclean_images[0]} {final_image}")
else:
final_image = make_timeavg_image(
wsclean_images, final_image, keep_wsclean_images=True
)
final_image_list.append(final_image)
if len(wsclean_models) == 1:
os.system(f"cp -r {wsclean_models[0]} {final_model}")
else:
final_model = make_timeavg_image(
wsclean_models, final_model, keep_wsclean_images=True
)
final_model_list.append(final_model)
if len(wsclean_residuals) == 1:
os.system(f"cp -r {wsclean_residuals[0]} {final_residual}")
else:
final_residual = make_timeavg_image(
wsclean_residuals, final_residual, keep_wsclean_images=True
)
final_residual_list.append(final_residual)
#########################################
# Restoring flags if applymode is calflag
#########################################
if applymode == "calflag":
with suppress_casa_output():
flags = flagmanager(vis=msname, mode="list")
keys = flags.keys()
for k in keys:
if k == "MS":
pass
else:
version = flags[0]["name"]
if "applycal" in version:
try:
with suppress_casa_output():
flagmanager(vis=msname, mode="restore", versionname=version)
flagmanager(vis=msname, mode="delete", versionname=version)
except:
pass
##########################################################################
# Final frequency averaged images for backup or calculating dynamic ranges
##########################################################################
final_image = prefix.replace("present", f"{round_number}") + "_I_image.fits"
final_model = prefix.replace("present", f"{round_number}") + "_I_model.fits"
final_residual = (
prefix.replace("present", f"{round_number}") + "_I_residual.fits"
)
if len(final_image_list) == 1:
os.system(f"mv {final_image_list[0]} {final_image}")
else:
final_image = make_freqavg_image(
final_image_list, final_image, keep_wsclean_images=False
)
if len(final_model_list) == 1:
os.system(f"mv {final_model_list[0]} {final_model}")
else:
final_model = make_freqavg_image(
final_model_list, final_model, keep_wsclean_images=False
)
if len(final_residual_list) == 1:
os.system(f"mv {final_residual_list[0]} {final_residual}")
else:
final_residual = make_freqavg_image(
final_residual_list, final_residual, keep_wsclean_images=False
)
os.system("rm -rf *psf.fits")
#####################################
# Calculating dynamic ranges
######################################
model_flux, rms_DR, rms = calc_dyn_range(
final_image,
final_model,
final_residual,
fits_mask=fits_mask,
)
if model_flux == 0:
gc.collect()
logger.info(f"No model flux.\n")
return 1, "", 0, 0, "", "", ""
#####################
# Perform calibration
#####################
bpass_caltable = prefix.replace("present", f"{round_number}") + ".gcal"
if os.path.exists(bpass_caltable):
os.system("rm -rf " + bpass_caltable)
logger.info(
f"bandpass(vis='{msname}',caltable='{bpass_caltable}',uvrange='{uvrange}',refant='{refant}',solint='{solint},10MHz',minsnr=1,solnorm=True)\n"
)
with suppress_casa_output():
bandpass(
vis=msname,
caltable=bpass_caltable,
uvrange=uvrange,
refant=refant,
minsnr=1,
solint=f"{solint},10MHz",
solnorm=True,
)
if os.path.exists(bpass_caltable) == False:
logger.info(f"No gain solutions are found.\n")
gc.collect()
return 2, "", 0, 0, "", "", ""
#########################################
# Flagging bad gains
#########################################
with suppress_casa_output():
flagdata(
vis=bpass_caltable, mode="rflag", datacolumn="CPARAM", flagbackup=False
)
tb = table()
tb.open(bpass_caltable, nomodify=False)
gain = tb.getcol("CPARAM")
if calmode == "p":
gain /= np.abs(gain)
flag = tb.getcol("FLAG")
gain[flag] = 1.0
pos = np.where(np.abs(gain) == 0.0)
gain[pos] = 1.0
flag *= False
tb.putcol("CPARAM", gain)
tb.putcol("FLAG", flag)
tb.flush()
tb.close()
logger.info(
f"applycal(vis={msname},gaintable=[{bpass_caltable}],interp=['linear,linearflag'],applymode='{applymode}',calwt=[False])\n"
)
with suppress_casa_output():
applycal(
vis=msname,
gaintable=[bpass_caltable],
interp=["linear,linearflag"],
applymode=applymode,
calwt=[False],
)
#####################################
# Flag zeros
#####################################
with suppress_casa_output():
flagdata(
vis=msname,
mode="clip",
clipzeros=True,
datacolumn="corrected",
flagbackup=False,
)
gc.collect()
return (
0,
bpass_caltable,
rms_DR,
rms,
final_image,
final_model,
final_residual,
)
except Exception as e:
traceback.print_exc()
return 3, "", 0, 0, "", "", ""
[docs]
def do_selfcal(
msname="",
workdir="",
selfcaldir="",
start_threshold=5,
end_threshold=3,
max_iter=100,
max_DR=1000,
min_iter=2,
DR_convegerence_frac=0.3,
uvrange="",
minuv=0,
solint="60s",
weight="briggs",
robust=0.0,
do_apcal=True,
min_tol_factor=1.0,
applymode="calonly",
solar_selfcal=True,
ncpu=-1,
mem=-1,
dry_run=False,
logfile="selfcal.log",
):
"""
Do selfcal iterations and use convergence rules to stop
Parameters
----------
msname : str
Name of the measurement set
workdir : str
Work directory
selfcaldir : str
Working directory
start_threshold : int, optional
Start CLEAN threhold
end_threshold : int, optional
End CLEAN threshold
max_iter : int, optional
Maximum numbers of selfcal iterations
max_DR : float, optional
Maximum dynamic range
min_iter : int, optional
Minimum numbers of seflcal iterations at different stages
DR_convegerence_frac : float, optional
Dynamic range fractional change to consider as converged
uvrange : str, optional
UV-range for calibration
minuv : float, optionial
Minimum UV-lambda to use in imaging
solint : str, optional
Solutions interval
weight : str, optional
Imaging weighting
robust : float, optional
Briggs weighting robust parameter (-1 to 1)
do_apcal : bool, optional
Perform ap-selfcal or not
min_tol_factor : float, optional
Minimum tolerable variation in temporal direction in percentage
applymode : str, optional
Solution apply mode
solar_selfcal : bool, optional
Whether is is solar selfcal or not
ncpu : int, optional
Number of CPU threads to use
mem : float, optional
Memory in GB to use
logfile : str, optional
Log file name
Returns
-------
int
Success message
str
Final caltable
"""
limit_threads(n_threads=ncpu)
from casatasks import split, flagdata, initweights, flagmanager
from casatools import msmetadata
if dry_run:
process = psutil.Process(os.getpid())
mem = round(process.memory_info().rss / 1024**3, 2) # in GB
return mem
sub_observer = None
logger, logfile = create_logger(
os.path.basename(logfile).split(".log")[0], logfile, verbose=False
)
if os.path.exists(f"{workdir}/jobname_password.npy") and logfile != None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
sub_observer = init_logger(
"remotelogger_selfcal_{os.path.basename(msname).split('.ms')[0]}",
logfile,
jobname=jobname,
password=password,
)
try:
msname = os.path.abspath(msname.rstrip("/"))
selfcaldir = selfcaldir.rstrip("/")
os.makedirs(selfcaldir,exist_ok=True)
os.chdir(selfcaldir)
selfcalms = selfcaldir + "/selfcal_" + os.path.basename(msname)
if os.path.exists(selfcalms):
os.system("rm -rf " + selfcalms)
if os.path.exists(selfcalms + ".flagversions"):
os.system("rm -rf " + selfcalms + ".flagversions")
##############################
# Restoring any previous flags
##############################
with suppress_casa_output():
flags = flagmanager(vis=msname, mode="list")
keys = flags.keys()
for k in keys:
if k == "MS":
pass
else:
version = flags[0]["name"]
try:
with suppress_casa_output():
flagmanager(vis=msname, mode="restore", versionname=version)
flagmanager(vis=msname, mode="delete", versionname=version)
except:
pass
if os.path.exists(msname + ".flagversions"):
os.system("rm -rf " + msname + ".flagversions")
##############################
# Spliting corrected data
##############################
hascor = check_datacolumn_valid(msname, datacolumn="CORRECTED_DATA")
msmd = msmetadata()
msmd.open(msname)
scan = int(msmd.scannumbers()[0])
field = int(msmd.fieldsforscan(scan)[0])
msmd.close()
if hascor:
logger.info(f"Spliting corrected data to ms : {selfcalms}")
with suppress_casa_output():
split(
vis=msname,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="corrected",
)
else:
logger.info(f"Spliting data to ms : {selfcalms}")
with suppress_casa_output():
split(
vis=msname,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="data",
)
msname = selfcalms
##########################################
# Initiate proper weighting
##########################################
with suppress_casa_output():
flagdata(
vis=msname,
mode="clip",
clipzeros=True,
datacolumn="data",
flagbackup=False,
)
logger.info("Initiating weights ....")
with suppress_casa_output():
initweights(vis=msname, wtmode="ones", dowtsp=True)
############################################
# Imaging and calibration parameters
############################################
logger.info(f"Estimating imaging Parameters ...")
cellsize = calc_cellsize(msname, 3)
instrument_fov = calc_field_of_view(msname, FWHM=False)
fov = min(instrument_fov, 32 * 2 * 60) # 2 solar radii
imsize = int(fov / cellsize)
pow2 = np.ceil(np.log2(imsize)).astype("int")
possible_sizes = []
for p in range(pow2):
for k in [3, 5]:
possible_sizes.append(k * 2**p)
possible_sizes = np.sort(np.array(possible_sizes))
possible_sizes = possible_sizes[possible_sizes >= imsize]
imsize = max(1024, int(possible_sizes[0]))
unflagged_antenna_names, flag_frac_list = get_unflagged_antennas(msname)
refant = unflagged_antenna_names[0]
msmd = msmetadata()
msmd.open(msname)
msmd.close()
############################################
# Initiating selfcal Parameters
############################################
logger.info(f"Estimating self-calibration parameters...")
DR1 = 0.0
DR2 = 0.0
DR3 = 0.0
RMS1 = -1.0
RMS2 = -1.0
RMS3 = -1.0
num_iter = 0
num_iter_after_ap = 0
num_iter_fixed_sigma = 0
last_sigma_DR1 = 0
sigma_reduced_count = 0
calmode = "p"
threshold = start_threshold
last_round_gaintable = ""
use_previous_model = False
os.system("rm -rf *_selfcal_present*")
###########################################
# Starting selfcal loops
##########################################
while True:
##################################
# Selfcal round parameters
##################################
logger.info("######################################")
logger.info(
f"Selfcal iteration : "
+ str(num_iter)
+ ", Threshold: "
+ str(threshold)
+ ", Calibration mode: "
+ str(calmode)
)
msg, gaintable, dyn, rms, final_image, final_model, final_residual = (
single_selfcal_iteration(
msname,
logger,
selfcaldir,
cellsize,
imsize,
round_number=num_iter,
uvrange=uvrange,
minuv=minuv,
calmode=calmode,
solint=solint,
refant=str(refant),
applymode=applymode,
min_tol_factor=min_tol_factor,
threshold=threshold,
use_previous_model=use_previous_model,
weight=weight,
robust=robust,
use_solar_mask=solar_selfcal,
ncpu=ncpu,
mem=round(mem, 2),
)
)
if msg == 1:
if num_iter == 0:
logger.info(
f"No model flux is picked up in first round. Trying with lowest threshold.\n"
)
(
msg,
gaintable,
dyn,
rms,
final_image,
final_model,
final_residual,
) = single_selfcal_iteration(
msname,
logger,
selfcaldir,
cellsize,
imsize,
round_number=num_iter,
uvrange=uvrange,
minuv=minuv,
calmode=calmode,
solint=solint,
refant=str(refant),
applymode=applymode,
min_tol_factor=min_tol_factor,
threshold=end_threshold,
use_previous_model=False,
weight=weight,
robust=robust,
use_solar_mask=solar_selfcal,
ncpu=ncpu,
mem=round(mem, 2),
)
if msg == 1:
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, []
else:
threshold = end_threshold
else:
os.system("rm -rf *_selfcal_present*")
return msg, []
if msg == 2:
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, []
if num_iter == 0:
DR1 = DR3 = DR2 = dyn
RMS1 = RMS2 = RMS3 = rms
elif num_iter == 1:
DR3 = dyn
RMS2 = RMS1
RMS1 = rms
else:
DR1 = DR2
DR2 = DR3
DR3 = dyn
RMS3 = RMS2
RMS2 = RMS1
RMS1 = rms
logger.info(
f"RMS based dynamic ranges: "
+ str(DR1)
+ ","
+ str(DR2)
+ ","
+ str(DR3)
)
logger.info(
f"RMS of the images: " + str(RMS1) + "," + str(RMS2) + "," + str(RMS3)
)
if DR3 > 0.9 * DR2:
use_previous_model = True
else:
use_previous_model = False
#####################
# If DR is decreasing
#####################
if (
(DR3 < 0.85 * DR2 and DR3 < 0.9 * DR1 and DR2 > DR1)
and calmode == "p"
and num_iter > min_iter
):
logger.info(f"Dynamic range decreasing in phase-only self-cal.")
if do_apcal:
logger.info(f"Changed calmode to 'ap'.")
calmode = "ap"
if threshold > end_threshold and num_iter_fixed_sigma > min_iter:
threshold -= 1
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
else:
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, last_round_gaintable
elif (
(DR3 < 0.9 * DR2 and DR2 > 1.5 * DR1)
and calmode == "ap"
and num_iter_after_ap > min_iter
):
logger.info(
f"Dynamic range is decreasing after minimum numbers of 'ap' rounds.\n"
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, last_round_gaintable
###########################
# If maximum DR has reached
###########################
if DR3 >= max_DR and num_iter_after_ap > min_iter:
logger.info(f"Maximum dynamic range is reached.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, gaintable
###########################
# Checking DR convergence
###########################
# Condition 1
###########################
if (
((do_apcal and calmode == "ap") or do_apcal == False)
and num_iter_fixed_sigma > min_iter
and (
last_sigma_DR1 > 0
and abs(round(np.nanmedian([DR1, DR2, DR3]), 0) - last_sigma_DR1)
/ last_sigma_DR1
< DR_convegerence_frac
)
and sigma_reduced_count > 1
):
if threshold > end_threshold:
logger.info(
f"DR does not increase over last two changes in threshold, but minimum threshold has not reached yet.\n"
)
logger.info(
f"Starting final self-calibration rounds with threshold = "
+ str(end_threshold)
+ "sigma...\n"
)
threshold = end_threshold
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
continue
else:
logger.info(
f"Selfcal converged. DR does not increase over last two changes in threshold.\n"
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, gaintable
###############
# Condition 2
###############
else:
if (
abs(DR1 - DR2) / DR2 < DR_convegerence_frac
and num_iter > min_iter
and threshold == end_threshold + 1
):
if do_apcal and calmode == "p":
logger.info(
f"Dynamic range converged. Changing calmode to 'ap'.\n"
)
calmode = "ap"
if num_iter_fixed_sigma > min_iter:
threshold -= 1
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
elif (
do_apcal and num_iter_after_ap > min_iter
) or do_apcal == False:
logger.info(f"Self-calibration has converged.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, gaintable
elif (
abs(DR1 - DR2) / DR2 < DR_convegerence_frac
and threshold > end_threshold
and num_iter_fixed_sigma > min_iter
):
threshold -= 1
logger.info(f"Reducing threshold to : " + str(threshold))
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
if last_sigma_DR1 > 0:
last_sigma_DR1 = round(np.nanmean([DR1, DR2, DR3]), 0)
else:
last_sigma_DR1 = round(np.nanmean([DR1, DR2, DR3]), 0)
elif (
(
(do_apcal and calmode == "ap" and num_iter_after_ap > min_iter)
or (do_apcal == False and num_iter > min_iter)
)
and abs(DR1 - DR2) / DR2 < DR_convegerence_frac
and threshold <= end_threshold
) or (
(do_apcal == False or (do_apcal and calmode == "ap"))
and num_iter == max_iter
):
logger.info(
f"Self-calibration is finished. Maximum iteration is reached.\n"
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, gaintable
num_iter += 1
last_round_gaintable = gaintable
if calmode == "ap":
num_iter_after_ap += 1
num_iter_fixed_sigma += 1
except Exception as e:
traceback.print_exc()
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 1, []
finally:
time.sleep(5)
drop_cache(msname)
[docs]
def main():
starttime = time.time()
parser = argparse.ArgumentParser(description="Self-calibration",formatter_class=SmartDefaultsHelpFormatter)
## Essential parameters
basic_args = parser.add_argument_group(
"###################\nEssential parameters\n###################"
)
basic_args.add_argument(
"mslist",
type=str,
help="Comma-separated list of measurement sets (required positional argument)",
)
basic_args.add_argument(
"--workdir", type=str, default="", required=True, help="Working directory",
)
basic_args.add_argument(
"--caldir", type=str, default="", required=True, help="Caltable directory",
)
## Advanced parameters
adv_args = parser.add_argument_group(
"###################\nAdvanced calibration and imaging parameters\n###################"
)
adv_args.add_argument(
"--start_thresh",
type=float,
default=5,
help="Starting CLEANing threshold",
metavar="Float",
)
adv_args.add_argument(
"--stop_thresh",
type=float,
default=3,
help="Stop CLEANing threshold",
metavar="Float",
)
adv_args.add_argument(
"--max_iter",
type=int,
default=100,
help="Maximum number of selfcal iterations",
metavar="Integer",
)
adv_args.add_argument(
"--max_DR",
type=float,
default=1000,
help="Maximum dynamic range",
metavar="Float",
)
adv_args.add_argument(
"--min_iter",
type=int,
default=2,
help="Minimum number of selfcal iterations",
metavar="Integer",
)
adv_args.add_argument(
"--conv_frac",
type=float,
default=0.3,
help="Fractional change in DR to determine convergence",
metavar="Float",
)
adv_args.add_argument(
"--solint", type=str, default="60s", help="Solution interval"
)
adv_args.add_argument(
"--uvrange",
type=str,
default="",
help="Calibration UV-range (CASA format)",
)
adv_args.add_argument(
"--minuv",
type=float,
default=0,
help="Minimum UV-lambda used for imaging",
metavar="Float",
)
adv_args.add_argument(
"--weight", type=str, default="briggs", help="Imaging weight"
)
adv_args.add_argument(
"--robust",
type=float,
default=0.0,
help="Robust parameter for briggs weight",
metavar="Float",
)
adv_args.add_argument(
"--applymode",
type=str,
default="calonly",
help="Solution apply mode",
metavar="String",
)
adv_args.add_argument(
"--min_tol_factor",
type=float,
default=1.0,
help="Minimum tolerable variation in temporal direction in percentage",
metavar="Float",
)
adv_args.add_argument("--no_apcal", action="store_false", dest="do_apcal", help="Do not perform ap-selfcal")
adv_args.add_argument(
"--no_solar_selfcal", action="store_false", dest="solar_selfcal", help="Do not perform solar self-calibration"
)
adv_args.add_argument(
"--keep_backup",
action="store_true",
help="Keep backup of self-calibration rounds",
)
## Resource management parameters
hard_args = parser.add_argument_group(
"###################\nHardware resource management parameters\n###################"
)
hard_args.add_argument(
"--cpu_frac",
type=float,
default=0.8,
help="CPU fraction to use",
metavar="Float",
)
hard_args.add_argument(
"--mem_frac",
type=float,
default=0.8,
help="Memory fraction to use",
metavar="Float",
)
hard_args.add_argument(
"--jobid", type=int, default=0, help="Job ID"
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
pid = os.getpid()
save_pid(pid, datadir + f"/pids/pids_{args.jobid}.txt")
if args.workdir == "" or os.path.exists(args.workdir) == False:
workdir = os.path.dirname(os.path.abspath(args.msname)) + "/workdir"
else:
workdir = args.workdir
os.makedirs(workdir,exist_ok=True)
if args.caldir=="" or not os.path.exists(args.caldir):
caldir=f"{workdir}/caltables"
else:
caldir=args.caldir
os.makedirs(caldir,exist_ok=True)
os.makedirs(workdir + "/logs",exist_ok=True)
mainlog_file = workdir + "/logs/selfcal_targets.mainlog"
mainlogger, mainlog_file = create_logger(
os.path.basename(mainlog_file).split(".mainlog")[0], mainlog_file, verbose=False
)
observer = None
if os.path.exists(f"{workdir}/jobname_password.npy") and mainlog_file != None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(mainlog_file):
observer = init_logger(
"all_selfcal", mainlog_file, jobname=jobname, password=password
)
###########################
# WSClean container
###########################
container_name = "meerwsclean"
container_present = check_udocker_container(container_name)
if container_present == False:
container_name = initialize_wsclean_container(name=container_name)
if container_name == None:
print(
"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
mslist = str(args.mslist).split(",")
try:
if len(mslist) == 0:
mainlogger.info("Please provide at-least one measurement set.")
msg = 1
else:
task = delayed(do_selfcal)(dry_run=True)
mem_limit = run_limited_memory_task(task, dask_dir=args.workdir)
partial_do_selfcal = partial(
do_selfcal,
start_threshold=float(args.start_thresh),
end_threshold=float(args.stop_thresh),
max_iter=int(args.max_iter),
max_DR=float(args.max_DR),
min_iter=int(args.min_iter),
DR_convegerence_frac=float(args.conv_frac),
uvrange=str(args.uvrange),
minuv=float(args.minuv),
solint=str(args.solint),
weight=str(args.weight),
robust=float(args.robust),
do_apcal=args.do_apcal,
applymode=args.applymode,
min_tol_factor=float(args.min_tol_factor),
solar_selfcal=args.solar_selfcal,
)
####################################
# Filtering any corrupted ms
#####################################
filtered_mslist = [] # Filtering in case any ms is corrupted
for ms in mslist:
checkcol = check_datacolumn_valid(ms)
if checkcol:
filtered_mslist.append(ms)
else:
mainlogger.info(f"Issue in : {ms}")
os.system("rm -rf {ms}")
mslist = filtered_mslist
chanlist = []
for ms in mslist:
channame = (
os.path.basename(ms)
.split(".ms")[0]
.split("spw_")[-1]
.split("_time")[0]
)
if channame not in chanlist:
chanlist.append(channame)
available_mem = psutil.virtual_memory().available / 1024**3
if (mem_limit / 0.6) < 4 and available_mem > 4:
min_mem_per_job = 4
else:
min_mem_per_job = mem_limit / 0.6
######################################
# Resetting maximum file limit
######################################
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
new_soft_limit = max(soft_limit, int(0.8 * hard_limit))
if soft_limit < new_soft_limit:
resource.setrlimit(
resource.RLIMIT_NOFILE, (new_soft_limit, hard_limit)
)
num_fd_list = []
for ms in mslist:
msmd = msmetadata()
msmd.open(ms)
times = msmd.timesforspws(0)
timeres = np.diff(times)
pos = np.where(timeres > 3 * np.nanmedian(timeres))[0]
max_intervals = min(1, len(pos))
freqs = msmd.chanfreqs(0, unit="MHz")
freqres = np.diff(freqs)
pos = np.where(freqres > 3 * np.nanmedian(freqres))[0]
max_nchan = min(1, len(pos))
msmd.close()
per_job_fd = (
(max_nchan + 1) * max_intervals * 4 * 2
) # 4 types of images, 2 is fudge factor
num_fd_list.append(per_job_fd)
total_fd = max(num_fd_list) * len(mslist)
n_jobs = max(1, int(new_soft_limit / total_fd))
n_jobs = min(len(mslist), n_jobs)
dask_client, dask_cluster, n_jobs, n_threads, mem_limit = (
get_dask_client(
n_jobs,
dask_dir=args.workdir,
cpu_frac=float(args.cpu_frac),
mem_frac=float(args.mem_frac),
min_cpu_per_job=3,
min_mem_per_job=min_mem_per_job,
)
)
tasks = []
for ms in mslist:
logfile = (
args.workdir
+ "/logs/"
+ os.path.basename(ms).split(".ms")[0]
+ "_selfcal.log"
)
mainlogger.info(f"MS name: {ms}, Log file: {logfile}\n")
tasks.append(
delayed(partial_do_selfcal)(
ms,
args.workdir,
args.workdir
+ "/"
+ os.path.basename(ms).split(".ms")[0]
+ "_selfcal",
ncpu=n_threads,
mem=mem_limit,
logfile=logfile,
)
)
results = compute(*tasks)
dask_client.close()
dask_cluster.close()
gcal_list = []
for i in range(len(results)):
r = results[i]
msg = r[0]
if msg != 0:
mainlogger.info(
f"Self-calibration was not successful for ms: {mslist[i]}."
)
else:
gcal = r[1]
tb = table()
tb.open(gcal)
scan = np.unique(tb.getcol("SCAN_NUMBER"))[0]
tb.close()
final_gain_caltable = caldir + f"/selfcal_scan_{scan}.gcal"
os.system(f"cp -r {gcal} {final_gain_caltable}")
gcal_list.append(final_gain_caltable)
if args.keep_backup == False:
for ms in mslist:
selfcaldir = (
args.workdir
+ "/"
+ os.path.basename(ms).split(".ms")[0]
+ "_selfcal"
)
os.system("rm -rf " + selfcaldir)
if len(gcal_list) > 0:
mainlogger.info(f"Final selfcal caltables: {gcal_list}")
mainlogger.info("################################################")
mainlogger.info(
f"Total time taken: {round(time.time()-starttime,2)}s"
)
mainlogger.info("################################################")
msg = 0
else:
mainlogger.info("No self-calibration is successful.")
mainlogger.info("################################################")
mainlogger.info(
f"Total time taken: {round(time.time()-starttime,2)}s"
)
mainlogger.info("################################################")
msg = 1
except Exception as e:
traceback.print_exc()
msg = 1
finally:
time.sleep(5)
for ms in mslist:
drop_cache(ms)
drop_cache(workdir)
clean_shutdown(observer)
return msg
if __name__ == "__main__":
result = main()
if result > 0:
result = 1
print("\n###################\nSelf-calibration is done.\n###################\n")
os._exit(result)