import os, glob, psutil, copy, time, traceback, argparse, socket
from multiprocessing import Process, Event
from meersolar.pipeline.basic_func import *
from meersolar.pipeline.init_data import init_meersolar_data
from meersolar.data.sendmail import send_paircars_notification as send_notification
from casatasks import casalog
try:
logfile = casalog.logfile()
os.system("rm -rf " + logfile)
except:
pass
[docs]
def run_flag(
msname, workdir, flag_calibrators=True, jobid=0, cpu_frac=0.8, mem_frac=0.8
):
"""
Run flagging jobs
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
flag_calibrators : bool, optional
Flag calibrator fields
jobid : int, optional
Job ID
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
"""
print("###########################")
print("Flagging ....")
print("###########################\n")
########################
# Calibrator ms flagging
########################
msname = msname.rstrip("/")
if flag_calibrators:
flagfield_type = "cal"
flagging_cmd = (
f"run_flag {msname}"
+ " --datacolumn DATA --use_tfcrop"
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --workdir "
+ str(workdir)
+ " --jobid "
+ str(jobid)
)
else:
flagfield_type = "target"
flagging_cmd = (
f"run_flag {msname}"
+ " --datacolumn DATA --use_tfcrop --flagdimension freq"
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --workdir "
+ str(workdir)
+ " --jobid "
+ str(jobid)
)
flag_basename = (
f"flagging_{flagfield_type}_" + os.path.basename(msname).split(".ms")[0]
)
logfile = workdir + "/logs/" + flag_basename + ".log"
flagging_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(flagging_cmd, workdir, flag_basename)
print(flagging_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish flagging...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + flag_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index = int(finished_file[0].split("_")[-1])
if success_index == 0:
print("Initial flagging is done successfully.\n")
return 0
else:
print("Initial flagging is not done successfully.\n")
return 1
[docs]
def run_import_model(msname, workdir, jobid=0, cpu_frac=0.8, mem_frac=0.8):
"""
Importing calibrator models
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
"""
print("###########################")
print("Importing model of calibrators....")
print("###########################\n")
msname = msname.rstrip("/")
import_model_cmd = (
f"import_model {msname}"
+ " --workdir "
+ str(workdir)
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --jobid "
+ str(jobid)
)
model_basename = "modeling_" + os.path.basename(msname).split(".ms")[0]
logfile = workdir + "/logs/" + model_basename + ".log"
import_model_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(import_model_cmd, workdir, model_basename)
print(import_model_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish visibility simulation for calibrators...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + model_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index = int(finished_file[0].split("_")[-1])
if success_index == 0:
print("Visibility simulation for calibrator fields are done successfully.\n")
return 0
else:
print(
"Visibility simulation for calibrator fields are not done successfully.\n"
)
return 1
[docs]
def run_basic_cal_jobs(
msname,
workdir,
caldir,
perform_polcal=False,
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
keep_backup=False,
):
"""
Perform basic calibration
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
caldir : str
Caltable directory
perform_polcal : bool, optional
Perform full polarization calibration
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
keep_backup : bool, optional
Keep backups
Returns
-------
int
Success message for basic calibration
"""
print("###########################")
print("Performing basic calibration .....")
print("###########################\n")
msname = msname.rstrip("/")
cal_basename = "basic_cal"
basic_cal_cmd = (
f"run_basic_cal {msname}"
+ " --workdir "
+ workdir
+ " --caldir "
+ caldir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --jobid "
+ str(jobid)
)
if perform_polcal:
basic_cal_cmd += " --perform_polcal"
if keep_backup:
basic_cal_cmd += " --keep_backup"
logfile = workdir + "/logs/" + cal_basename + ".log"
basic_cal_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(basic_cal_cmd, workdir, cal_basename)
print(basic_cal_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish calibration...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + cal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Basic calibration is done successfully.\n")
else:
print("Basic calibration is unsuccessful.\n")
return success_index_cal
[docs]
def run_noise_diode_cal(
msname, workdir, caldir, keep_backup=False, jobid=0, cpu_frac=0.8, mem_frac=0.8
):
"""
Perform noise diode based flux calibration
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
caldir : str
Caltable directory
keep_backup : bool, optional
Keep backup
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message for noise diode based flux calibration
"""
try:
print("###########################")
print("Performing noise diode based flux calibration .....")
print("###########################\n")
msname = msname.rstrip("/")
noisecal_basename = "noise_cal"
noise_cal_cmd = (
f"run_fluxcal {msname}"
+ " --workdir "
+ workdir
+ " --caldir "
+ caldir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --jobid "
+ str(jobid)
)
logfile = workdir + "/logs/" + noisecal_basename + ".log"
noise_cal_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(
noise_cal_cmd, workdir, noisecal_basename
)
print(noise_cal_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish noise-diode based flux calibration...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + noisecal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_noisecal = int(finished_file[0].split("_")[-1])
if success_index_noisecal == 0:
print("Noise-diode based flux-calibration is done successfully.\n")
return 0
else:
return 1
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_partion(
msname, workdir, split_fullpol=False, jobid=0, cpu_frac=0.8, mem_frac=0.8
):
"""
Perform basic calibration
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
split_fullpol : bool, optional
Perform full polar split
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
"""
print("###########################")
print("Partitioning measurement set ...")
print("###########################\n")
msname = msname.rstrip("/")
msmd = msmetadata()
msmd.open(msname)
nchan = msmd.nchan(0)
times = msmd.timesforfield(0)
msmd.close()
if len(times) == 1:
timeres = msmd.exposuretime(scan)["value"]
else:
timeres = times[1] - times[0]
if nchan > 1024:
width = int(nchan / 1024)
if width < 1:
width = 1
else:
width = 1
if timeres < 8:
timebin = "8s"
else:
timebin = ""
target_scans, cal_scans, f_scans, g_scans, p_scans = get_cal_target_scans(msname)
cal_scans_copy = copy.deepcopy(cal_scans)
for s in cal_scans:
noise_cal_scan = determine_noise_diode_cal_scan(msname, s)
if noise_cal_scan:
print(f"Removing noise-diode scan: {s}")
cal_scans_copy.remove(s)
cal_scans = copy.deepcopy(cal_scans_copy)
cal_scans = ",".join([str(s) for s in cal_scans])
calibrator_ms = workdir + "/calibrator.ms"
split_cmd = f"run_partition {msname} --outputms {calibrator_ms} --scans {cal_scans} --timebin {timebin} --width {width} --cpu_frac {cpu_frac} --mem_frac {mem_frac} --workdir {workdir} --jobid {jobid}"
if split_fullpol:
split_cmd += " --split_fullpol"
####################################
# Partition fields
####################################
print("\n########################################")
partition_basename = f"partition_cal"
logfile = workdir + "/logs/" + partition_basename + ".log"
split_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(split_cmd, workdir, partition_basename)
print(split_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish partitioning...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + partition_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
print("Partitioning is finished.\n")
if os.path.exists(calibrator_ms):
print(f"Calibrator ms: {calibrator_ms}")
return 0
else:
print(f"Calibrator fields could not be partitioned.")
return 1
[docs]
def run_target_split_jobs(
msname,
workdir,
datacolumn="data",
spw="",
timeres=-1,
freqres=-1,
target_freq_chunk=-1,
n_spectral_chunk=-1,
target_scans=[],
prefix="targets",
split_fullpol=False,
merge_spws=False,
time_window=-1,
time_interval=-1,
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
max_cpu_frac=0.8,
max_mem_frac=0.8,
):
"""
Split target scans
Parameters
----------
msname: str
Name of the measurement set
workdir : str
Working directory
datacolumn : str, optional
Data column
spw : str, optional
Spectral window to split
timeres : float, optional
Time bin to average in seconds
freqres : float, optional
Frequency averaging in MHz
target_freq_chunk : float, optional
Target frequency chunk in MHz
n_spectral_chunk : int, optional
Number of spectral chunks to split
target_scans : list, optional
Target scans
prefix : str, optional
Prefix of splited targets
split_fullpol : bool, optional
Split full polar data or not
merge_spws : bool, optional
Merge spectral windows
time_window : float, optional
Time window in seconds
time_interval : float, optional
Time interval in seconds
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message for spliting target scans
"""
try:
print("###########################")
print("spliting target scans .....")
print("###########################\n")
msname = msname.rstrip("/")
split_basename = f"split_{prefix}"
split_cmd = (
f"run_target_split {msname}"
+ " --workdir "
+ workdir
+ " --datacolumn "
+ datacolumn
+ " --freqres "
+ str(freqres)
+ " --timeres "
+ str(timeres)
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --spectral_chunk "
+ str(target_freq_chunk)
+ " --max_cpu_frac "
+ str(max_cpu_frac)
+ " --max_mem_frac "
+ str(max_mem_frac)
+ " --n_spectral_chunk "
+ str(n_spectral_chunk)
+ " --prefix "
+ str(prefix)
+ " --time_window "
+ str(time_window)
+ " --time_interval "
+ str(time_interval)
+ " --jobid "
+ str(jobid)
)
if split_fullpol:
split_cmd += " --split_fullpol"
if merge_spws:
split_cmd += " --merge_spws"
if spw != "":
split_cmd = split_cmd + " --spw " + str(spw)
if len(target_scans) > 0:
split_cmd = (
split_cmd + " --scans " + ",".join([str(s) for s in target_scans])
)
logfile = workdir + "/logs/" + split_basename + ".log"
split_cmd += f" --logfile {logfile}"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(split_cmd, workdir, split_basename)
print(split_cmd + "\n")
os.system("bash " + batch_file)
return 0
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_solar_siderealcor_jobs(
mslist,
workdir,
prefix="targets",
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
max_cpu_frac=0.8,
max_mem_frac=0.8,
):
"""
Apply sidereal motion correction of the Sun
Parameters
----------
mslist: str
List of the measurement sets
workdir : str
Work directory
prefix : str, optional
Measurement set prefix
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
max_cpu_frac : float, optional
Maximum CPU fraction to use
max_mem_frac : float, optional
Maximum memory fraction to use
Returns
-------
int
Success message
"""
try:
print("###########################")
print("Correcting sidereal motion .....")
print("###########################\n")
mslist = ",".join(mslist)
sidereal_basename = f"cor_sidereal_{prefix}"
sidereal_cor_cmd = (
f"run_solar_siderealcor {mslist}"
+ " --workdir "
+ str(workdir)
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --max_cpu_frac "
+ str(max_cpu_frac)
+ " --max_mem_frac "
+ str(max_mem_frac)
+ " --jobid "
+ str(jobid)
)
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = workdir + "/logs/" + sidereal_basename + ".log"
sidereal_cor_cmd += f" --logfile {logfile}"
batch_file = create_batch_script_nonhpc(
sidereal_cor_cmd, workdir, sidereal_basename
)
print(sidereal_cor_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish sidereal correction...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + sidereal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Sidereal motion correction is done successfully.\n")
else:
print("Sidereal motion correction is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_apply_pbcor(
imagedir,
workdir,
apply_parang=True,
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Apply primary beam corrections on all images
Parameters
----------
imagedir: str
Image directory name
workdir : str
Work directory
apply_parang : bool, optional
Apply parallactic angle correction
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message for applying primary beam correction on all images
"""
try:
print("###########################")
print("Applying primary beam corrections on all images .....")
print("###########################\n")
applypbcor_basename = "apply_pbcor"
applypbcor_cmd = (
f"run_meerpbcor {imagedir}"
+ " --workdir "
+ str(workdir)
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --jobid "
+ str(jobid)
)
if apply_parang == False:
applypbcor_cmd += " --no_apply_parang"
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = workdir + "/logs/" + applypbcor_basename + ".log"
applypbcor_cmd += f" --logfile {logfile}"
batch_file = create_batch_script_nonhpc(
applypbcor_cmd, workdir, applypbcor_basename
)
print(applypbcor_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish apply primary beam correction...\n")
while True:
finished_file = glob.glob(
workdir + "/.Finished_" + applypbcor_basename + "*"
)
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Applying primary beam correction is done successfully.\n")
else:
print("Applying primary beam correction is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_apply_basiccal_sol(
target_mslist,
workdir,
caldir,
use_only_bandpass=False,
overwrite_datacolumn=True,
applymode="calflag",
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Apply basic calibration solutions on splited target scans
Parameters
----------
target_mslist: list
Target measurement set list
workdir : str
Working directory
caldir : str
Caltable directory
use_only_bandpass : bool
Use only bandpass solutions
applymode : str, optional
Applycal mode
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
overwrite_datacolumn : bool
Overwrite data column or not
Returns
-------
int
Success message for applying calibration solutions and spliting target scans
"""
try:
print("###########################")
print("Applying basic calibration solutions on target scans .....")
print("###########################\n")
applycal_basename = "apply_basiccal"
mslist = ",".join(target_mslist)
applycal_cmd = (
f"run_apply_basiccal {mslist}"
+ " --workdir "
+ workdir
+ " --caldir "
+ caldir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --applymode "
+ str(applymode)
+ " --jobid "
+ str(jobid)
)
if use_only_bandpass:
applycal_cmd += " --use_only_bandpass"
if overwrite_datacolumn:
applycal_cmd += " --overwrite_datacolumn"
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = workdir + "/logs/" + applycal_basename + ".log"
applycal_cmd += f" --logfile {logfile}"
batch_file = create_batch_script_nonhpc(
applycal_cmd, workdir, applycal_basename
)
print(applycal_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish applycal...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + applycal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Applying basic calibration is done successfully.\n")
else:
print("Applying basic calibration is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_apply_selfcal_sol(
target_mslist,
workdir,
caldir,
overwrite_datacolumn=True,
applymode="calflag",
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Apply self-calibration solutions on splited target scans
Parameters
----------
target_mslist: list
Target measurement set list
workdir : str
Working directory
caldir : str
Caltable directory
use_only_bandpass : bool
Use only bandpass solutions
applymode : str, optional
Applycal mode
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
overwrite_datacolumn : bool
Overwrite data column or not
Returns
-------
int
Success message for applying calibration solutions and spliting target scans
"""
try:
print("###########################")
print("Applying self-calibration solutions on target scans .....")
print("###########################\n")
applycal_basename = "apply_selfcal"
mslist = ",".join(target_mslist)
applycal_cmd = (
f"run_apply_selfcal {mslist}"
+ " --workdir "
+ workdir
+ " --caldir "
+ caldir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --applymode "
+ str(applymode)
+ " --jobid "
+ str(jobid)
)
if overwrite_datacolumn:
applycal_cmd += " --overwrite_datacolumn"
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = workdir + "/logs/" + applycal_basename + ".log"
applycal_cmd += f" --logfile {logfile}"
batch_file = create_batch_script_nonhpc(
applycal_cmd, workdir, applycal_basename
)
print(applycal_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish applycal...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + applycal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Applying self-calibration is done successfully.\n")
else:
print("Applying self-calibration is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_selfcal_jobs(
mslist,
workdir,
caldir,
start_thresh=5.0,
stop_thresh=3.0,
max_iter=100,
max_DR=1000,
min_iter=2,
conv_frac=0.3,
solint="60s",
do_apcal=True,
solar_selfcal=True,
keep_backup=False,
uvrange="",
minuv=0,
weight="briggs",
robust=0.0,
applymode="calonly",
min_tol_factor=10.0,
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Self-calibration on target scans
Parameters
----------
mslist: list
Target measurement set list
workdir : str
Working directory
caldir : str
Caltable directory
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
start_threshold : int, optional
Start CLEAN threhold
end_threshold : int, optional
End CLEAN threshold
max_iter : int, optional
Maximum numbers of selfcal iterations
max_DR : float, optional
Maximum dynamic range
min_iter : int, optional
Minimum numbers of seflcal iterations at different stages
conv_frac : float, optional
Dynamic range fractional change to consider as converged
uvrange : str, optional
UV-range for calibration
minuv : float, optionial
Minimum UV-lambda to use in imaging
weight : str, optional
Image weighitng scheme
robust : float, optional
Robustness parameter for briggs weighting
solint : str, optional
Solutions interval
do_apcal : bool, optional
Perform ap-selfcal or not
min_tol_factor : float, optional
Minimum tolerance in temporal variation in imaging
applymode : str, optional
Solution apply mode
solar_selfcal : bool, optional
Whether is is solar selfcal or not
Returns
-------
int
Success message for self-calibration
"""
try:
print("###########################")
print("Performing self-calibration of target scans .....")
print("###########################\n")
selfcal_basename = "selfcal_targets"
mslist = ",".join(mslist)
selfcal_cmd = (
f"run_selfcal {mslist}"
+ " --workdir "
+ workdir
+ " --caldir "
+ caldir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --jobid "
+ str(jobid)
)
if start_thresh > 0:
selfcal_cmd += " --start_thresh " + str(start_thresh)
if stop_thresh > 0 and stop_thresh < start_thresh:
selfcal_cmd += " --stop_thresh " + str(stop_thresh)
if max_iter > 0:
selfcal_cmd += " --max_iter " + str(max_iter)
if max_DR > 0:
selfcal_cmd += " --max_DR " + str(max_DR)
if min_iter > 0:
selfcal_cmd += " --min_iter " + str(min_iter)
if conv_frac > 0:
selfcal_cmd += " --conv_frac " + str(conv_frac)
if solint != "":
selfcal_cmd += " --solint " + solint
if uvrange != "":
selfcal_cmd += " --uvrange " + str(uvrange)
if minuv > 0:
selfcal_cmd += " --minuv " + str(minuv)
if applymode != "":
selfcal_cmd += " --applymode " + str(applymode)
if min_tol_factor > 0:
selfcal_cmd += " --min_tol_factor " + str(min_tol_factor)
if weight != "":
selfcal_cmd += " --weight " + str(weight)
if robust != "":
selfcal_cmd += " --robust " + str(robust)
if do_apcal == False:
selfcal_cmd += " --no_apcal"
if solar_selfcal == False:
selfcal_cmd += " --no_solar_selfcal"
if keep_backup:
selfcal_cmd += " --keep_backup"
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(selfcal_cmd, workdir, selfcal_basename)
print(selfcal_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish self-calibration...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + selfcal_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Self-calibration is done successfully.\n")
else:
print("Self-calibration is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_imaging_jobs(
mslist,
workdir,
outdir,
freqrange="",
timerange="",
minuv=-1,
weight="briggs",
robust=0.0,
pol="IQUV",
freqres=-1,
timeres=-1,
band="",
threshold=1.0,
use_multiscale=True,
use_solar_mask=True,
cutout_rsun=2.5,
make_overlay=True,
savemodel=False,
saveres=False,
jobid=0,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Self-calibration on target scans
Parameters
----------
mslist: list
Target measurement set list
workdir : str
Working directory
outdir : str
Output image directory
freqrange : str, optional
Frequency range to image in MHz
timerange : str, optional
Time range to image (YYYY/MM/DD/hh:mm:ss~YYYY/MM/DD/hh:mm:ss)
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
minuv : float, optionial
Minimum UV-lambda to use in imaging
weight : str, optional
Imaging weighting
robust : float, optional
Briggs weighting robust parameter (-1 to 1)
pol : str, optional
Stokes parameters to image
freqres : float, optional
Frequency resolution of spectral chunk in MHz (default : -1, no spectral chunking)
timeres : float, optional
Time resolution of temporal chunks in MHz (default : -1, no temporal chunking)
band : str, optional
Band name
threshold : float, optional
CLEAN threshold
use_multiscale : bool, optional
Use multiscale or not
use_solar_mask : bool, optional
Use solar mask or not
cutout_rsun : float, optional
Cutout image size from center in solar radii (default : 2.5 solar radii)
make_overlay : bool, optional
Make SUVI MeerKAT overlay
savemodel : bool, optional
Save model images or not
saveres : bool, optional
Save residual images or not
Returns
-------
int
Success message for imaging
"""
try:
print("###########################")
print("Performing imaging of target scans .....")
print("###########################\n")
imaging_basename = "imaging_targets"
mslist = ",".join(mslist)
imaging_cmd = (
f"run_imaging {mslist}"
+ " --workdir "
+ workdir
+ " --outdir "
+ outdir
+ " --cpu_frac "
+ str(cpu_frac)
+ " --mem_frac "
+ str(mem_frac)
+ " --pol "
+ str(pol)
+ " --freqres "
+ str(freqres)
+ " --timeres "
+ str(timeres)
+ " --weight "
+ weight
+ " --robust "
+ str(robust)
+ " --minuv_l "
+ str(minuv)
+ " --threshold "
+ str(threshold)
+ " --cutout_rsun "
+ str(cutout_rsun)
+ " --jobid "
+ str(jobid)
)
if use_solar_mask == False:
imaging_cmd += " --no_solar_mask"
if savemodel == False:
imaging_cmd += " --no_savemodel"
if saveres == False:
imaging_cmd += " --no_saveres"
if use_multiscale == False:
imaging_cmd += " --no_multiscale"
if make_overlay == False:
imaging_cmd += " --no_make_overlay"
if freqrange != "":
imaging_cmd += " --freqrange " + str(freqrange)
if timerange != "":
imaging_cmd += " --timerange " + str(timerange)
if band != "":
imaging_cmd += " --band " + str(band)
os.makedirs(workdir + "/logs",exist_ok=True)
batch_file = create_batch_script_nonhpc(imaging_cmd, workdir, imaging_basename)
print(imaging_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish imaging...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + imaging_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Imaging is done successfully.\n")
else:
print("Imaging is unsuccessful.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def run_ds_jobs(msname, workdir, outdir, target_scans=[], jobid=0, cpu_frac=0.8, mem_frac=0.8):
"""
Make dynamic spectra of the target scans
Parameters
----------
msname : str
Name of the measurement set
workdir : str
Name of the work directory
outdir : str
Output directory
target_scans : list, optional
Target scans
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
"""
try:
print("###########################")
print("Making dynamic spectra of target scans .....")
print("###########################\n")
ds_basename = "ds_targets"
target_scans = " ".join([str(s) for s in target_scans])
ds_cmd = f"run_makeds {msname} --workdir {workdir} --outdir {outdir} --cpu_frac {cpu_frac} --mem_frac {mem_frac} --jobid {jobid} --target_scans {target_scans}"
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = workdir + "/logs/" + ds_basename + ".log"
ds_cmd += f" --logfile {logfile}"
batch_file = create_batch_script_nonhpc(ds_cmd, workdir, ds_basename)
print(ds_cmd + "\n")
os.system("bash " + batch_file)
print("Waiting to finish making of dynamic spectra...\n")
while True:
finished_file = glob.glob(workdir + "/.Finished_" + ds_basename + "*")
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_cal = int(finished_file[0].split("_")[-1])
if success_index_cal == 0:
print("Dynamic spectra are successfully made.\n")
else:
print("Dynamic spectra are not made successfully.\n")
return success_index_cal
except Exception as e:
traceback.print_exc()
return 1
[docs]
def check_status(workdir, basename):
"""
Check job status
Parameters
----------
workdir : str
Work directory
basename : str
Basename
Returns
-------
bool
Finished or not
"""
finished_file = glob.glob(workdir + "/.Finished_" + basename + "*")
if len(finished_file) > 0:
success_index_split_target = int(finished_file[0].split("_")[-1])
if success_index_split_target == 0:
return True
else:
return False
else:
return False
[docs]
def start_ping_logger(jobid, remote_jobid, waittime, remote_link=""):
"""
Run ping remote logger in background
Parameters
----------
jobid : int
Job ID
remote_jobid : str
Remote log job ID
waittime : float
Wait time before process ends in hour
remote_link : str, optional
Remote logger link
Returns
-------
int
PID of the background job
"""
if remote_link != "" and waittime > 0:
stop_event = Event()
proc = Process(
target=ping_logger,
args=(jobid, remote_jobid, stop_event, remote_link),
daemon=False,
)
proc.start()
def stop_after_delay():
time.sleep(waittime * 3600)
stop_event.set()
from threading import Thread
Thread(target=stop_after_delay, daemon=True).start()
return int(proc.pid)
else:
return
[docs]
def exit_job(start_time, mspath="", workdir=""):
print(f"Total time taken: {round(time.time()-start_time,2)}s.")
time.sleep(10)
gc.collect()
if mspath != "" and os.path.exists(mspath + "/dask-scratch-space"):
os.system("rm -rf " + mspath + "/dask-scratch-space " + mspath + "/tmp")
if workdir != "" and os.path.exists(workdir + "/dask-scratch-space"):
os.system("rm -rf " + workdir + "/dask-scratch-space " + workdir + "/tmp")
[docs]
def master_control(
msname,
workdir,
outdir,
solar_data=True,
# Pre-calibration
do_forcereset_weightflag=False,
do_cal_partition=True,
do_cal_flag=True,
do_import_model=True,
# Basic calibration
do_basic_cal=True,
do_noise_cal=True,
do_applycal=True,
# Target data preparation
do_target_split=True,
target_scans=[],
freqrange="",
timerange="",
uvrange="",
# Polarization calibration
do_polcal=False,
# Self-calibration
do_selfcal=True,
do_selfcal_split=True,
do_apply_selfcal=True,
do_ap_selfcal=True,
solar_selfcal=True,
solint="5min",
# Sidereal correction
do_sidereal_cor=False,
# Dynamic spectra
make_ds=True,
# Imaging
do_imaging=True,
do_pbcor=True,
weight="briggs",
robust=0.0,
minuv=0,
image_freqres=-1,
image_timeres=-1,
pol="IQUV",
apply_parang=True,
clean_threshold=1.0,
use_multiscale=True,
use_solar_mask=True,
cutout_rsun=2.5,
make_overlay=True,
# Resource settings
cpu_frac=0.8,
mem_frac=0.8,
n_nodes=0,
keep_backup=False,
# Remote logging
remote_logger=False,
remote_logger_waittime=0,
):
"""
Master controller of the entire pipeline
Parameters
----------
msname : str
Measurement set name
workdir : str
Work directory path
outdir : str
Output directory
solar_data : bool, optional
Whether it is solar data or not
do_forcereset_weightflag : bool, optional
Reset weights and flags of the input ms
do_cal_partition : bool, optional
Make calibrator multi-MS
do_cal_flag : bool, optional
Perform flagging on calibrator
do_import_model : bool, optional
Import model visibilities of flux and polarization calibrators
do_basic_cal : bool, optional
Perform basic calibration
do_noise_cal : bool, optional
Peform calibration of solar attenuators using noise diode (only used if solar_data=True)
do_applycal : bool, optional
Apply basic calibration on target scans
do_target_split : bool, optional
Split target scans into chunks
target_scans : list, optional
Target scans to self-cal and image
freqrange : str, optional
Frequency range to image in MHz (xx1~xx2,xx3~xx4,)
timerange : str, optional
Time range to image in YYYY/MM/DD/hh:mm:ss format (tt1~tt2,tt3~tt4,...)
uvrange : str, optional
UV-range for calibration
do_polcal : bool, optional
Perform full-polarization calibration and imaging
do_selfcal : bool, optional
Perform self-calibration
do_selfcal_split : bool, optional
Split data after each round of self-calibration
do_apply_selfcal : bool, optional
Apply self-calibration solutions
do_ap_selfcal : bool, optional
Perform amplitude-phase self-cal or not
solar_selfcal : bool, optional
Whether self-calibration is performing on solar observation or not
solint : str, optional
Solution intervals in self-cal
do_sidereal_cor : bool, optional
Perform solar sidereal motion correction or not
make_ds : bool, optional
Make dynamic spectra
do_imaging : bool, optional
Perform final imaging
do_pbcor : bool, optional
Perform primary beam correction
weight : str, optional
Image weighting
robust : float, optional
Robust parameter for briggs weighting (-1 to 1)
minuv : float, optional
Minimum UV-lambda for final imaging
image_freqres : float, optional
Image frequency resolution in MHz (-1 means full bandwidth)
image_timeres : float, optional
Image temporal resolution in seconds (-1 means full scan duration)
pol : str, optional
Stokes parameters of final imaging
apply_parang : bool, optional
Apply parallactic angle correction
clean_threshold : float, optional
CLEAN threshold of final imaging
use_multiscale : bool, optional
Use multiscale scales or not
use_solar_mask : bool, optional
Use solar mask
cutout_rsun : float, optional
Cutout image size from center in solar radii (default : 2.5 solar radii)
make_overlay : bool, optional
Make SUVI MeerKAT overlay
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
n_nodes: int, optional
Number of nodes to use (Only for cluster architecture)
keep_backup : bool, optional
Keep backup of self-cal rounds and final models and residual images
remote_logger : bool, optional
Enable remote logging of the pipeline status
remote_logger_waittime : int, optional
Wait time (in seconds) before connecting to remote logger
Returns
-------
int
Success message
"""
init_meersolar_data()
msname = os.path.abspath(msname.rstrip("/"))
if os.path.exists(msname) == False:
print("Please provide a valid measurement set location.\n")
exit_job(start_time)
return 1
mspath = os.path.dirname(msname)
###################################
# Preparing working directories
###################################
if workdir == "":
workdir = os.path.dirname(os.path.abspath(msname)) + "/workdir"
workdir=workdir.rstrip("/")
if outdir=="":
outdir=workdir
outdir=outdir.rstrip("/")
caldir=outdir+"/caltables"
caldir=caldir.rstrip("/")
os.makedirs(workdir,exist_ok=True)
os.makedirs(outdir,exist_ok=True)
os.makedirs(caldir,exist_ok=True)
if mspath != "" and os.path.exists(mspath + "/dask-scratch-space"):
os.system("rm -rf " + mspath + "/dask-scratch-space " + mspath + "/tmp")
if workdir != "" and os.path.exists(workdir + "/dask-scratch-space"):
os.system("rm -rf " + workdir + "/dask-scratch-space " + workdir + "/tmp")
try:
####################################
# Job and process IDs
####################################
pid = os.getpid()
jobid = get_jobid()
main_job_file = save_main_process_info(
pid,
jobid,
os.path.abspath(msname),
os.path.abspath(workdir),
os.path.abspath(outdir),
cpu_frac,
mem_frac,
)
start_time = time.time()
print("###########################")
print(f"MeerSOLAR Job ID: {jobid}")
print(f"Work directory: {workdir}")
print(f"Final product directory: {outdir}")
print("###########################\n")
#####################################
# Moving into work directory
#####################################
os.chdir(workdir)
cpu_frac_bkp = copy.deepcopy(cpu_frac)
mem_frac_bkp = copy.deepcopy(mem_frac)
if remote_logger:
remote_link = get_remote_logger_link()
if remote_link == "":
print("Please provide a valid remote link.")
remote_logger = False
if remote_logger == False:
emails = get_emails()
timestamp = dt.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
if emails != "":
email_subject = f"MeerSOLAR Logger Details: {timestamp}"
email_msg = (
f"MeerSOLAR user,\n\n"
f"MeerSOLAR Job ID: {jobid}\n\n"
f"Best,\n"
f"MeerSOLAR"
)
success_msg, error_msg = send_notification(emails, email_subject, email_msg)
else:
####################################
# Job name and logging password
####################################
hostname = socket.gethostname()
timestamp = dt.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
job_name = f"{hostname} :: {timestamp} :: {os.path.basename(msname).split('.ms')[0]}"
timestamp1 = dt.utcnow().strftime("%Y%m%dT%H%M%S")
remote_job_id = (
f"{hostname}_{timestamp1}_{os.path.basename(msname).split('.ms')[0]}"
)
password = generate_password()
np.save(
f"{workdir}/jobname_password.npy",
np.array([job_name, password], dtype="object"),
)
print(
"############################################################################"
)
print(remote_link)
print(f"Job ID: {job_name}")
print(f"Remote access password: {password}")
print(
"#############################################################################"
)
emails = get_emails()
if emails != "":
email_subject = f"MeerSOLAR Logger Details: {timestamp}"
email_msg = (
f"MeerSOLAR user,\n\n"
f"MeerSOLAR Job ID: {jobid}\n\n"
f"Remote logger Job ID: {job_name}\n"
f"Remote access password: {password}\n\n"
f"Best,\n"
f"MeerSOLAR"
)
success_msg, error_msg = send_notification(emails, email_subject, email_msg)
#####################################
# Settings for solar data
#####################################
if solar_data:
if use_solar_mask == False:
print("Use solar mask during CLEANing.")
use_solar_mask = True
if solar_selfcal == False:
solar_selfcal = True
full_FoV=False
else:
if do_noise_cal:
print(
"Turning off noise diode based calibration for non-solar observation."
)
do_noise_cal = False
if use_solar_mask:
print("Stop using solar mask during CLEANing.")
use_solar_mask = False
if solar_selfcal:
solar_selfcal = False
full_FoV=True
###################################################
# Target spliting spectral and temporal chunks
##################################################
if image_timeres > (2 * 3660): # If more than 2 hours
print(
f"Image time integration is more than 2 hours, which may cause smearing due to solar differential rotation."
)
#################################################
# Determining maximum allowed frequency averaging
#################################################
max_freqres = calc_bw_smearing_freqwidth(msname,full_FoV=full_FoV)
if image_freqres>0:
freqavg = round(min(image_freqres, max_freqres), 1)
else:
freqavg = round(max_freqres,1)
################################################
# Determining maximum allowed temporal averaging
################################################
if solar_data: # For solar data, it is assumed Sun is tracked.
max_timeres = calc_time_smearing_timewidth(msname)
else:
max_timeres = min(
calc_time_smearing_timewidth(msname), max_time_solar_smearing(msname)
)
if image_timeres>0:
timeavg = round(min(image_timeres, max_timeres), 1)
else:
timeavg = round(max_timeres,1)
#########################################
# Target ms frequency chunk based on band
#########################################
bad_spws = get_bad_chans(msname).split("0:")[-1].split(";")
good_start = []
good_end = []
for i in range(len(bad_spws) - 1):
start_chan = int(bad_spws[i].split("~")[-1]) + 1
end_chan = int(bad_spws[i + 1].split("~")[0]) - 1
good_start.append(start_chan)
good_end.append(end_chan)
start_chan = min(good_start)
end_chan = max(good_end)
spw = f"0:{start_chan}~{end_chan}"
#############################################################
# Determining numbers of spectral chunks for parallel imaging
#############################################################
if image_freqres < 0:
target_freq_chunk = -1
nchunk = 1
else:
msmd = msmetadata()
msmd.open(msname)
chanres = msmd.chanres(0, unit="MHz")[0]
msmd.close()
total_bw = chanres * (end_chan - start_chan)
nchunk=int(total_bw/image_freqres)
if nchunk>max(4,n_nodes): # Maximum 4 chunking or number of compute nodes
nchunk=max(4,n_nodes)
target_freq_chunk=total_bw/nchunk
else:
nchnuk=1
target_freq_chunk=image_freqres
#############################
# Reset any previous weights
############################
cpu_usage = psutil.cpu_percent(interval=1) # Average over 1 second
total_cpus = psutil.cpu_count(logical=True)
available_cpus = int(total_cpus * (1 - cpu_usage / 100.0))
available_cpus = max(1, available_cpus) # Avoid zero workers
reset_weights_and_flags(
msname, n_threads=available_cpus, force_reset=do_forcereset_weightflag
)
#######################################
# Run dynamic spectra making
#######################################
if make_ds:
msg = run_ds_jobs(
msname,
workdir,
outdir,
jobid=jobid,
target_scans=target_scans,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print("!!!! WARNING: Dynamic spectra could not be made. !!!!")
########################################
# Run noise-diode based flux calibration
########################################
if do_noise_cal:
msg = run_noise_diode_cal(
msname,
workdir,
caldir,
jobid=jobid,
keep_backup=keep_backup,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
) # Run noise diode based flux calibration
if msg != 0:
print(
"!!!! WARNING: Error in running noise-diode based flux calibration. Flux density calibration may not be correct. !!!!"
)
##############################
# Run partitioning jobs
##############################
# If do partition or calibrator ms is not present in case of basic calibration is requested
calibrator_msname = workdir + "/calibrator.ms"
if do_basic_cal and (
do_cal_partition or os.path.exists(calibrator_msname) == False
):
msg = run_partion(
msname,
workdir,
split_fullpol=do_polcal,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print("!!!! WARNING: Error in partitioning calibrator fields. !!!!")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
##################################
# Run flagging jobs on calibrators
##################################
# Only if basic calibration is requested
if do_cal_flag and do_basic_cal:
if os.path.exists(calibrator_msname) == False:
print(f"Calibrator ms: {calibrator_ms} is not present.")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
msg = run_flag(
calibrator_msname,
workdir,
flag_calibrators=True,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Flagging error. Examine calibration solutions with caution. !!!!"
)
#################################
# Import model
#################################
# Only if basic calibration is requested
if do_import_model and do_basic_cal:
if os.path.exists(calibrator_msname) == False:
print(f"Calibrator ms: {calibrator_ms} is not present.")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
fluxcal_fields, fluxcal_scans = get_fluxcals(calibrator_msname)
phasecal_fields, phasecal_scans, phasecal_fluxes = get_phasecals(
calibrator_msname
)
calibrator_field = fluxcal_fields + phasecal_fields
msg = run_import_model(
calibrator_msname,
workdir,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
) # Run model import
if msg != 0:
print(
"!!!! WARNING: Error in importing calibrator models. Not continuing calibration. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
###############################
# Run basic calibration
###############################
use_only_bandpass = False
if do_basic_cal:
if os.path.exists(calibrator_msname) == False:
print(f"Calibrator ms: {calibrator_ms} is not present.")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
msg = run_basic_cal_jobs(
calibrator_msname,
workdir,
caldir,
perform_polcal=do_polcal,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
keep_backup=keep_backup,
) # Run basic calibration
if msg != 0:
print(
"!!!! WARNING: Error in basic calibration. Not continuing further. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
##########################################
# Checking presence of necessary caltables
##########################################
if len(glob.glob(caldir + "/*.bcal")) == 0:
print(f"No bandpass table is present in calibration directory : {caldir}.")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
if len(glob.glob(caldir + "/*.gcal")) == 0:
print(
f"No time-dependent gaintable is present in calibration directory : {caldir}. Applying only bandpass solutions."
)
use_only_bandpass = True
############################################
# Spliting for self-cals
############################################
# Spliting only if self-cal is requested
if do_selfcal:
if do_selfcal_split == False:
selfcal_target_mslist = glob.glob(workdir + "/selfcals_scan*.ms")
if len(selfcal_target_mslist) == 0:
print(
"No measurement set is present for self-calibration. Spliting them.."
)
do_selfcal_split = True
if do_selfcal_split:
prefix = "selfcals"
try:
time_interval = float(solint)
except:
if "s" in solint:
time_interval = float(solint.split("s")[0])
elif "min" in solint:
time_interval = float(solint.split("min")[0]) * 60
elif solint == "int":
time_interval = image_timeres
else:
time_interval = -1
msg = run_target_split_jobs(
msname,
workdir,
datacolumn="data",
freqres=freqavg,
timeres=timeavg,
target_freq_chunk=25,
n_spectral_chunk=nchunk, # Number of target spectral chunk
target_scans=target_scans,
prefix=prefix,
merge_spws=True,
time_window=min(60, time_interval),
time_interval=time_interval,
split_fullpol=do_polcal,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
max_cpu_frac=round(cpu_frac, 2),
max_mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Error in running spliting target scans for selfcal. !!!!"
)
do_selfcal = False
else:
print("Waiting to finish spliting of target scans for selfcal...\n")
split_basename = f"split_{prefix}"
while True:
finished_file = glob.glob(
workdir + "/.Finished_" + split_basename + "*"
)
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_split_target = int(finished_file[0].split("_")[-1])
if success_index_split_target == 0:
print(
"Spliting target scans are done successfully for selfcal.\n"
)
else:
print(
"!!!! WARNING: Error in spliting target scans for selfcal. Not continuing further for selfcal. !!!!"
)
do_selfcal = False
####################################
# Filtering any corrupted ms
#####################################
if do_selfcal:
selfcal_target_mslist = glob.glob(workdir + "/selfcals_scan*.ms")
filtered_mslist = [] # Filtering in case any ms is corrupted
for ms in selfcal_target_mslist:
checkcol = check_datacolumn_valid(ms)
if checkcol:
filtered_mslist.append(ms)
else:
print(f"Issue in : {ms}")
os.system(f"rm -rf {ms}")
selfcal_mslist = filtered_mslist
if len(selfcal_mslist) == 0:
print(
"No splited target scan ms are available in work directory for selfcal. Not continuing further for selfcal."
)
do_selfcal = False
print(f"Selfcal mslist : {[os.path.basename(i) for i in selfcal_mslist]}")
#########################################################
# Applying solutions on target scans for self-calibration
#########################################################
if do_selfcal: # Applying solutions for selfcal
msg = run_apply_basiccal_sol(
selfcal_mslist,
workdir,
caldir,
use_only_bandpass=use_only_bandpass,
overwrite_datacolumn=False,
applymode="calflag",
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Error in applying basic calibration solutions on target scans. Not continuing further for selfcal.!!!!"
)
do_selfcal = False
########################################
# Performing self-calibration
########################################
if do_selfcal:
os.system(
"rm -rf " + workdir + "/*selfcal " + workdir + "/caltables/*selfcal*"
)
if do_sidereal_cor:
msg = run_solar_siderealcor_jobs(
selfcal_mslist,
workdir,
prefix="selfcals",
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
max_cpu_frac=round(cpu_frac, 2),
max_mem_frac=round(mem_frac, 2),
)
if msg != 0:
print("Sidereal correction is not successful.")
msg = run_selfcal_jobs(
selfcal_mslist,
workdir,
caldir,
solint=solint,
do_apcal=do_ap_selfcal,
solar_selfcal=solar_selfcal,
keep_backup=keep_backup,
uvrange=uvrange,
weight="briggs",
robust=0.0,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Error in self-calibration on target scans. Not applying self-calibration. !!!!"
)
do_apply_selfcal = False
########################################
# Checking self-cal caltables
########################################
selfcal_tables = glob.glob(caldir + "/*selfcal*")
if len(selfcal_tables) == 0:
print(
"Self-calibration is not performed and no self-calibration caltable is available."
)
do_apply_selfcal = False
#############################################
# Check spliting target scans finished or not
#############################################
# If corrected data is requested or imaging is requested
prefix = "targets"
if do_target_split and (do_applycal or do_imaging):
msg = run_target_split_jobs(
msname,
workdir,
datacolumn="data",
spw=spw,
target_freq_chunk=target_freq_chunk,
freqres=freqavg,
timeres=timeavg,
n_spectral_chunk=-1,
target_scans=target_scans,
prefix=prefix,
split_fullpol=do_polcal,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
max_cpu_frac=round(cpu_frac, 2),
max_mem_frac=round(mem_frac, 2),
)
if msg != 0:
print("!!!! WARNING: Error in running spliting target scans. !!!!")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
##################################
# Waiting only spliting is started
##################################
print("Waiting to finish spliting of target scans...\n")
split_basename = f"split_{prefix}"
while True:
finished_file = glob.glob(
workdir + "/.Finished_" + split_basename + "*"
)
if len(finished_file) > 0:
break
else:
time.sleep(1)
success_index_split_target = int(finished_file[0].split("_")[-1])
if success_index_split_target == 0:
print("Spliting target scans are done successfully.\n")
else:
print(
"!!!! WARNING: Error in spliting target scans. Not continuing further. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
cpu_frac = copy.deepcopy(cpu_frac_bkp)
mem_frac = copy.deepcopy(mem_frac_bkp)
target_mslist = glob.glob(workdir + "/targets_scan*.ms")
####################################
# Filtering any corrupted ms
#####################################
filtered_mslist = [] # Filtering in case any ms is corrupted
for ms in target_mslist:
checkcol = check_datacolumn_valid(ms)
if checkcol:
filtered_mslist.append(ms)
else:
print(f"Issue in : {ms}")
os.system("rm -rf {ms}")
target_mslist = filtered_mslist
if len(target_mslist) == 0:
print("No splited target scan ms are available in work directory.")
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
if do_applycal or do_imaging:
print(
f"Target scan mslist : {[os.path.basename(i) for i in target_mslist]}"
)
#########################################################
# Applying basic solutions on target scans
#########################################################
if do_applycal:
if len(target_mslist) > 0:
msg = run_apply_basiccal_sol(
target_mslist,
workdir,
caldir,
use_only_bandpass=use_only_bandpass,
overwrite_datacolumn=True,
applymode="calflag",
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Error in applying basic calibration solutions on target scans. Not continuing further.!!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
else:
print(
"!!!! WARNING: No measurement set is present for basic calibration applying solutions. Not continuing further. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
if do_sidereal_cor:
msg = run_solar_siderealcor_jobs(
target_mslist,
workdir,
prefix="targets",
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
max_cpu_frac=round(cpu_frac, 2),
max_mem_frac=round(mem_frac, 2),
)
########################################
# Apply self-calibration
########################################
if do_apply_selfcal:
target_mslist = sorted(target_mslist)
msg = run_apply_selfcal_sol(
target_mslist,
workdir,
caldir,
overwrite_datacolumn=False,
applymode="calonly",
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Error in applying self-calibration solutions on target scans. !!!!"
)
#####################################
# Imaging
######################################
if do_imaging:
if do_polcal == False: # Only if do_polcal is False, overwrite to make only Stokes I
pol = "I"
band = get_band_name(target_mslist[0])
msg = run_imaging_jobs(
target_mslist,
workdir,
outdir,
freqrange=freqrange,
timerange=timerange,
minuv=minuv,
weight=weight,
robust=float(robust),
pol=pol,
band=band,
freqres=image_freqres,
timeres=image_timeres,
threshold=float(clean_threshold),
use_multiscale=use_multiscale,
use_solar_mask=use_solar_mask,
cutout_rsun=cutout_rsun,
make_overlay=make_overlay,
savemodel=keep_backup,
saveres=keep_backup,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Final imaging on all measurement sets is not successful. Check the image directory. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
###########################
# Primary beam correction
###########################
if do_pbcor:
if weight == "briggs":
weight_str = f"{weight}_{robust}"
else:
weight_str = weight
if image_freqres == -1 and image_timeres == -1:
imagedir = outdir + f"/imagedir_f_all_t_all_w_{weight_str}"
elif image_freqres != -1 and image_timeres == -1:
imagedir = (
outdir
+ f"/imagedir_f_{round(float(image_freqres),1)}_t_all_w_{weight_str}"
)
elif image_freqres == -1 and image_timeres != -1:
imagedir = (
outdir
+ f"/imagedir_f_all_t_{round(float(image_timeres),1)}_w_{weight_str}"
)
else:
imagedir = (
outdir
+ f"/imagedir_f_{round(float(image_freqres),1)}_t_{round(float(image_timeres),1)}_w_{weight_str}"
)
imagedir = imagedir + "/images"
images = glob.glob(imagedir + "/*.fits")
if len(images) == 0:
print(f"No image is present in image directory: {imagedir}")
else:
msg = run_apply_pbcor(
imagedir,
workdir,
apply_parang=apply_parang,
jobid=jobid,
cpu_frac=round(cpu_frac, 2),
mem_frac=round(mem_frac, 2),
)
if msg != 0:
print(
"!!!! WARNING: Primary beam corrections of the final images are not successful. !!!!"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid,
remote_job_id,
remote_logger_waittime,
remote_link=remote_link,
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 1
print(f"Final image directory: {os.path.dirname(imagedir)}\n")
###########################################
# Successful exit
###########################################
print(
f"Calibration and imaging pipeline is successfully run on measurement set : {msname}\n"
)
exit_job(start_time, mspath, workdir)
if remote_logger:
pid = start_ping_logger(
jobid, remote_job_id, remote_logger_waittime, remote_link=remote_link
)
meersolar_cachedir = get_meersolar_cachedir()
if pid is not None:
save_pid(pid, f"{meersolar_cachedir}/pids/pids_{jobid}.txt")
return 0
except Exception as e:
traceback.print_exc()
return 1
finally:
drop_cache(msname)
drop_cache(workdir)
drop_cache(outdir)
[docs]
def main():
parser = argparse.ArgumentParser(
description="Run MeerSOLAR for calibration and imaging of solar observations.",
formatter_class=SmartDefaultsHelpFormatter,
)
# === Essential parameters ===
essential = parser.add_argument_group(
"###################\nEssential parameters\n###################"
)
essential.add_argument("msname", type=str, help="Measurement set name")
essential.add_argument(
"--workdir",
type=str,
dest="workdir",
required=True,
help="Working directory",
)
essential.add_argument(
"--outdir",
type=str,
dest="outdir",
required=True,
help="Output products directory",
)
# === Advanced calibration parameters ===
advanced_cal = parser.add_argument_group(
"###################\nAdvanced calibration parameters\n###################"
)
advanced_cal.add_argument(
"--solint",
type=str,
default="5min",
help="Solution interval for calibration (e.g. 'int', '10s', '5min', 'inf')",
)
advanced_cal.add_argument(
"--cal_uvrange",
type=str,
default="",
help="UV range to filter data for calibration (e.g. '>100klambda', '100~10000lambda')",
)
advanced_cal.add_argument(
"--no_polcal",
action="store_false",
dest="do_polcal",
help="Disable polarization calibration",
)
# === Advanced imaging and calibration parameters ===
advanced_image = parser.add_argument_group(
"###################\nAdvanced imaging parameters\n###################"
)
advanced_image.add_argument(
"--target_scans",
nargs="*",
type=str,
default=[],
help="List of target scans to process (space-separated, e.g. 3 5 7)",
)
advanced_image.add_argument(
"--freqrange",
type=str,
default="",
help="Frequency range in MHz to select during imaging (comma-seperate, e.g. '100~110,130~140')",
)
advanced_image.add_argument(
"--timerange",
type=str,
default="",
help="Time range to select during imaging (comma-seperated, e.g. '2014/09/06/09:30:00~2014/09/06/09:45:00,2014/09/06/10:30:00~2014/09/06/10:45:00')",
)
advanced_image.add_argument(
"--image_freqres",
type=int,
default=-1,
help="Output image frequency resolution in MHz (-1 = full)",
)
advanced_image.add_argument(
"--image_timeres",
type=int,
default=-1,
help="Output image time resolution in seconds (-1 = full)",
)
advanced_image.add_argument(
"--pol",
type=str,
default="IQUV",
help="Stokes parameter(s) to image (e.g. 'I', 'XX', 'RR', 'IQUV')",
)
advanced_image.add_argument(
"--minuv",
type=float,
default=0,
help="Minimum baseline length (in wavelengths) to include in imaging",
)
advanced_image.add_argument(
"--weight",
type=str,
default="briggs",
help="Imaging weighting scheme (e.g. 'briggs', 'natural', 'uniform')",
)
advanced_image.add_argument(
"--robust",
type=float,
default=0.0,
help="Robust parameter for Briggs weighting (-2 to +2)",
)
advanced_image.add_argument(
"--no_multiscale",
action="store_false",
dest="use_multiscale",
help="Disable multiscale CLEAN for extended structures",
)
advanced_image.add_argument(
"--clean_threshold",
type=float,
default=1.0,
help="Clean threshold in sigma for final deconvolution (Note this is not auto-mask)",
)
advanced_image.add_argument(
"--do_pbcor",
action="store_true",
help="Apply primary beam correction after imaging",
)
advanced_image.add_argument(
"--no_apply_parang",
action="store_false",
dest="apply_parang",
help="Disable parallactic angle rotation during imaging",
)
advanced_image.add_argument(
"--cutout_rsun",
type=float,
default=2.5,
help="Field of view cutout radius in solar radii",
)
advanced_image.add_argument(
"--no_solar_mask",
action="store_false",
dest="use_solar_mask",
help="Disable use solar disk mask during deconvolution",
)
advanced_image.add_argument(
"--no_overlay",
action="store_false",
dest="make_overlay",
help="Disable overlay plot on GOES SUVI after imaging",
)
# === Advanced options ===
advanced = parser.add_argument_group(
"###################\nAdvanced pipeline parameters\n###################"
)
advanced.add_argument(
"--non_solar_data",
action="store_false",
dest="solar_data",
help="Disable solar data mode",
)
advanced.add_argument(
"--non_ds",
action="store_false",
dest="make_ds",
help="Disable making solar dynamic spectra",
)
advanced.add_argument(
"--do_forcereset_weightflag",
action="store_true",
help="Force reset of weights and flags (disabled by default)",
)
advanced.add_argument(
"--no_noise_cal",
action="store_false",
dest="do_noise_cal",
help="Disable noise calibration",
)
advanced.add_argument(
"--no_cal_partition",
action="store_false",
dest="do_cal_partition",
help="Disable calibrator MS partitioning",
)
advanced.add_argument(
"--no_cal_flag",
action="store_false",
dest="do_cal_flag",
help="Disable initial flagging of calibrators",
)
advanced.add_argument(
"--no_import_model",
action="store_false",
dest="do_import_model",
help="Disable model import",
)
advanced.add_argument(
"--no_basic_cal",
action="store_false",
dest="do_basic_cal",
help="Disable basic gain calibration",
)
advanced.add_argument(
"--do_sidereal_cor",
action="store_true",
dest="do_sidereal_cor",
help="Sidereal motion correction for Sun (disabled by default)",
)
advanced.add_argument(
"--no_selfcal_split",
action="store_false",
dest="do_selfcal_split",
help="Disable split for self-calibration",
)
advanced.add_argument(
"--no_selfcal",
action="store_false",
dest="do_selfcal",
help="Disable self-calibration",
)
advanced.add_argument(
"--no_ap_selfcal",
action="store_false",
dest="do_ap_selfcal",
help="Disable amplitude-phase self-calibration",
)
advanced.add_argument(
"--no_solar_selfcal",
action="store_false",
dest="solar_selfcal",
help="Disable solar-specific self-calibration parameters",
)
advanced.add_argument(
"--no_target_split",
action="store_false",
dest="do_target_split",
help="Disable target data split",
)
advanced.add_argument(
"--no_applycal",
action="store_false",
dest="do_applycal",
help="Disable application of basic calibration solutions",
)
advanced.add_argument(
"--no_apply_selfcal",
action="store_false",
dest="do_apply_selfcal",
help="Disable application of self-calibration solutions",
)
advanced.add_argument(
"--no_imaging",
action="store_false",
dest="do_imaging",
help="Disable final imaging",
)
# === Advanced local system/ per node hardware resource parameters ===
advanced_resource = parser.add_argument_group(
"###################\nAdvanced hardware resource parameters for local system or per node on HPC cluster\n###################"
)
advanced_resource.add_argument(
"--cpu_frac",
type=float,
default=0.8,
help="Fraction of CPU usuage per node",
)
advanced_resource.add_argument(
"--mem_frac",
type=float,
default=0.8,
help="Fraction of memory usuage per node",
)
advanced_resource.add_argument(
"--keep_backup",
action="store_true",
help="Keep backup of intermediate steps",
)
advanced_resource.add_argument(
"--no_remote_logger",
action="store_false",
dest="remote_logger",
help="Disable remote logger",
)
advanced_resource.add_argument(
"--logger_alivetime",
type=float,
default=0,
help="Keep remote logger alive for this many hours (Otherwise, logger will be removed after 15 minutes of inactivity)",
)
# === Advanced local system/ per node hardware resource parameters ===
advanced_hpc = parser.add_argument_group(
"###################\nAdvanced HPC settings\n###################"
)
advanced_hpc.add_argument(
"--n_nodes",
type=int,
default=0,
help="Number of compute nodes to use (0 means local cluster)",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
try:
print("\n########################################")
print("Starting MeerSOLAR Pipeline....")
print("#########################################\n")
# TODO: multi specification imaging parameters as json dic
msg = master_control(
msname=args.msname,
workdir=args.workdir,
outdir=args.outdir,
solar_data=args.solar_data,
# Pre-calibration
do_forcereset_weightflag=args.do_forcereset_weightflag,
do_cal_partition=args.do_cal_partition,
do_cal_flag=args.do_cal_flag,
do_import_model=args.do_import_model,
# Basic calibration
do_basic_cal=args.do_basic_cal,
do_noise_cal=args.do_noise_cal,
do_applycal=args.do_applycal,
# Target data preparation
do_target_split=args.do_target_split,
target_scans=args.target_scans,
freqrange=args.freqrange,
timerange=args.timerange,
uvrange=args.cal_uvrange,
# Polarization calibration
do_polcal=args.do_polcal,
# Self-calibration
do_selfcal=args.do_selfcal,
do_selfcal_split=args.do_selfcal_split,
do_apply_selfcal=args.do_apply_selfcal,
do_ap_selfcal=args.do_ap_selfcal,
solar_selfcal=args.solar_selfcal,
solint=args.solint,
# Sidereal correction
do_sidereal_cor=args.do_sidereal_cor,
# Dynamic spectra
make_ds=args.make_ds,
# Imaging
do_imaging=args.do_imaging,
do_pbcor=args.do_pbcor,
weight=args.weight,
robust=args.robust,
minuv=args.minuv,
image_freqres=args.image_freqres,
image_timeres=args.image_timeres,
pol=args.pol,
apply_parang=args.apply_parang,
clean_threshold=args.clean_threshold,
use_multiscale=args.use_multiscale,
use_solar_mask=args.use_solar_mask,
cutout_rsun=args.cutout_rsun,
make_overlay=args.make_overlay,
# Resource settings
cpu_frac=args.cpu_frac,
mem_frac=args.mem_frac,
n_nodes=args.n_nodes,
keep_backup=args.keep_backup,
# Remote logging
remote_logger=args.remote_logger,
remote_logger_waittime=args.logger_alivetime,
)
except Exception as e:
traceback.print_exc()
finally:
drop_cache(args.msname)
drop_cache(args.workdir)
drop_cache(args.outdir)
if __name__ == "__main__":
main()