import os, time, psutil, numpy as np, glob, traceback, gc, copy
import dask.array as da, argparse
from meersolar.pipeline.basic_func import *
from meersolar.pipeline.flagging import single_ms_flag
from meersolar.pipeline.import_model import import_fluxcal_models
from casatools import table, ms as casamstool, msmetadata
from dask import delayed, compute, config
from casatasks import casalog
try:
logfile = casalog.logfile()
os.system("rm -rf " + logfile)
except:
pass
[docs]
def split_casatask(
msname="", outputvis="", scan="", time_range="", n_threads=-1, dry_run=False
):
limit_threads(n_threads=n_threads)
if dry_run:
process = psutil.Process(os.getpid())
mem = round(process.memory_info().rss / 1024**3, 2) # in GB
return mem
with suppress_casa_output():
from casatasks import split
split(
vis=msname,
outputvis=outputvis,
scan=scan,
timerange=time_range,
datacolumn="data",
uvrange="0",
correlation="XX,YY",
)
return outputvis
[docs]
def split_autocorr(
msname, workdir, scan_list, time_window=-1, cpu_frac=0.8, mem_frac=0.8
):
"""
Split auto-correlations
Parameters
----------
msname : str
Measurement set
workdir : str
Working directory
scan_list : list
Scan list
time_window : float, optional
Time window in seconds from start time of the scan
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
list
Splited ms list
"""
msname = msname.rstrip("/")
task = delayed(split_casatask)(dry_run=True)
mem_limit = run_limited_memory_task(task, dask_dir=workdir)
dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client(
len(scan_list),
dask_dir=workdir,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
min_mem_per_job=mem_limit / 0.6,
)
tasks = []
for scan in scan_list:
if time_window > 0:
tb = table()
tb.open(msname)
tbsel = tb.query(f"SCAN_NUMBER=={scan}")
times = tbsel.getcol("TIME")
tbsel.close()
tb.close()
if len(times) == 0:
continue
start_time = times[0]
end_time = start_time + time_window # add in seconds
if end_time > max(times):
end_time = max(times)
start_time = mjdsec_to_timestamp(start_time, str_format=1)
end_time = mjdsec_to_timestamp(end_time, str_format=1)
time_range = f"{start_time}~{end_time}"
else:
time_range = ""
outputvis = workdir + "/autocorr_scan_" + str(scan) + ".ms"
if os.path.exists(outputvis):
os.system("rm -rf " + outputvis)
if os.path.exists(outputvis + ".flagversions"):
os.system("rm -rf " + outputvis + ".flagversions")
tasks.append(
delayed(split_casatask)(
msname, outputvis, scan, time_range, n_threads=n_threads
)
)
if len(tasks) > 0:
autocorr_mslist = compute(*tasks)
else:
autocorr_mslist = []
dask_client.close()
dask_cluster.close()
return autocorr_mslist
[docs]
def get_on_off_power(msname="", scale_factor="", ant_list=[], dry_run=False):
"""
Get noise diode on and off power averaged over antennas
Parameters
----------
msname : str
Measurement set name
ant_list : list
Antenna id list
Returns
-------
numpy.array
Spectra of power difference
"""
if dry_run:
process = psutil.Process(os.getpid())
mem = round(process.memory_info().rss / 1024**3, 2) # in GB
return mem
######################
mstool = casamstool()
mstool.open(msname)
mstool.select({"antenna1": ant_list, "antenna2": ant_list, "uvdist": [0.0, 0.0]})
mstool.selectpolarization(["XX", "YY"])
data_dict = mstool.getdata(["DATA", "FLAG"], ifraxis=True)
mstool.close()
del mstool
gc.collect()
data_source = np.abs(data_dict["data"]).astype(np.float32)
data_source[data_dict["flag"]] = np.nan
del data_dict
gc.collect()
n_total = data_source.shape[-1]
n_tstamps = (n_total // 2) * 2
antslice = slice(min(ant_list), max(ant_list) + 1)
if data_source[0, 0, 0, 0] > data_source[0, 0, 0, 1]:
on_idx = slice(0, n_tstamps, 2)
off_idx = slice(1, n_tstamps, 2)
else:
on_idx = slice(1, n_tstamps, 2)
off_idx = slice(0, n_tstamps, 2)
###################################################
# Averaging along time axis in the antenna chunk
###################################################
diff_source = np.nanmean(
scale_factor[..., None, None] * data_source[..., on_idx]
- data_source[..., off_idx],
axis=-1,
)
del data_source
gc.collect()
return diff_source
[docs]
def get_att_per_ant(cal_msname, source_msname, scale_factor, ant_list=[]):
"""
Get per antenna attenuatioin array
Parameters
----------
cal_msname : str
Fluxcal scan with noise diode
source_msname : str
Source scan with noise diode
scale_factor : numpy.array
Scaling factor for on-off gain offset in fluxcal scan
ant_list : list, optional
Antenna list, default: all antennas
Returns
-------
numpy.array
Attenuation per antenna array. Shape : npol, nchan, nantenna
"""
if len(ant_list) == 0:
msmd = msmetadata()
msmd.open(cal_msname)
nant = msmd.nantennas()
msmd.close()
del msmd
ant_list = [i for i in range(nant)]
cal_diff = get_on_off_power(cal_msname, scale_factor, ant_list=ant_list)
gc.collect()
source_diff = get_on_off_power(
source_msname, scale_factor * 0 + 1, ant_list=ant_list
)
gc.collect()
att = source_diff / cal_diff
del source_diff, cal_diff
return att
[docs]
def get_power_diff(
cal_msname="",
source_msname="",
on_cal="",
off_cal="",
n_threads=-1,
memory_limit=-1,
dry_run=False,
):
"""
Estimate power level difference between alternative correlator dumps.
Parameters
----------
cal_msname : str
Fluxcal measurement set
source_msname : str
Source measurement set
on_cal : str
Noise diode on caltable
off_cal : str
Noise diode off caltable
n_threads : int, optional
Number of OpenMP threads
memory_limit : float, optional
Memory limit in GB
Returns
-------
numpy.array
Attenuation spectra for both polarizations avergaed over all antennas
numpy.array
Attenuation spectra for both polarizations for all antennas
"""
if dry_run:
process = psutil.Process(os.getpid())
mem = round(process.memory_info().rss / 1024**3, 2) # in GB
return mem
import warnings
warnings.filterwarnings("ignore")
starttime = time.time()
limit_threads(n_threads=n_threads)
if memory_limit == -1:
memory_limit = psutil.virtual_memory().available / 1024**3 # GB
cal_msname = cal_msname.rstrip("/")
source_msname = source_msname.rstrip("/")
print(
f"Estimating power difference from {os.path.basename(cal_msname)} and {os.path.basename(source_msname)}....\n"
)
# Get MS metadata
msmd = msmetadata()
msmd.open(source_msname)
nrow = int(msmd.nrows())
nchan = msmd.nchan(0)
npol = msmd.ncorrforpol(0)
nant = msmd.nantennas()
nbaselines = msmd.nbaselines()
if nbaselines == 0 or nrow % nbaselines != 0:
nbaselines += nant
ntime = int(nrow / nbaselines)
msmd.close()
del msmd
gc.collect()
####################################
# Calculate mean on-off gain offset
####################################
tb = table()
tb.open(on_cal)
ongain = np.abs(tb.getcol("CPARAM")) ** 2
tb.close()
tb.open(off_cal)
offgain = np.abs(tb.getcol("CPARAM")) ** 2
tb.close()
del tb
G = (ongain - offgain) / offgain # Power offset
del ongain, offgain
gc.collect()
G_mean = np.nanmean(G, axis=-1) # Averaged over antennas
del G
gc.collect()
scale_factor = 1 / (1 + G_mean)
del G_mean
gc.collect()
########################################
# Determining chunk size
########################################
cal_mssize = get_ms_size(cal_msname,only_autocorr=True)
source_mssize = get_ms_size(source_msname,only_autocorr=True)
total_mssize = cal_mssize + source_mssize
scale_factor_size = nant * ntime * scale_factor.nbytes / (1024.0**3)
att_ant_array_size = (npol * nchan * nant * 16) / (1024.0**3)
func_mem = get_on_off_power(dry_run=True)
per_ant_memory = (scale_factor_size + total_mssize) / nant
per_ant_memory += att_ant_array_size + func_mem
nant_per_chunk = min(nant, max(2, int(memory_limit / per_ant_memory)))
##############################################
# Estimating per antenna attenuation in chunks
##############################################
ant_blocks = [
list(range(i, min(i + nant_per_chunk, nant)))
for i in range(0, nant, nant_per_chunk)
]
for i, ant_block in enumerate(ant_blocks):
if i == 0:
att_ant_array = get_att_per_ant(
cal_msname, source_msname, scale_factor, ant_list=ant_block
)
else:
att_ant_array = np.append(
att_ant_array,
get_att_per_ant(
cal_msname, source_msname, scale_factor, ant_list=ant_block
),
axis=-1,
)
######################################
# Averaging over all antennas
######################################
att = np.nanmedian(att_ant_array, axis=-1)
return att, att_ant_array
[docs]
def estimate_att(
msname,
workdir,
noise_on_caltable,
noise_off_caltable,
noise_diode_flux_scan,
valid_target_scans,
time_window=900,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Estimate attenaution scaling
Parameters
----------
msname : str
Measurement set name
workdir : str
Working directory
noise_on_caltable : int
Caltable with noise diode on
noise_off_caltable : str
Caltable with noise diode off
noise_diode_flux_scan : float
Fluxcal scan with noise diode
valid_target_scans : list
Valid list of target scans
time_window : float, optional
Time window in second to use
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
dict
Dictionary with attenuation spectra for each solar scan
list
Attenuation spectra file list
"""
try:
print("Estimating attenuation ...")
###########################################
# All auto-corr scans spliting
###########################################
all_scans = [noise_diode_flux_scan] + valid_target_scans
scan_sizes = []
for scan in all_scans:
scan_sizes.append(get_ms_scan_size(msname, int(scan), only_autocorr=True))
print("Spliting auto-correlation in different scans ...")
autocorr_mslist = split_autocorr(
msname,
workdir,
all_scans,
time_window=time_window,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
)
drop_cache(msname)
drop_cache(workdir)
if len(autocorr_mslist) == 0:
print("No scans splited.")
return 1, None, None
##########################################
# Flagging on corrected data
##########################################
fluxcal_fields, fluxcal_scans = get_fluxcals(msname)
badspw = get_bad_chans(msname)
bad_ants, bad_ants_str = get_bad_ants(msname, fieldnames=fluxcal_fields)
print("Flagging auto-correlation measurement sets ...")
task = delayed(single_ms_flag)(dry_run=True)
mem_limit = run_limited_memory_task(task, dask_dir=workdir)
mem_limit += max(scan_sizes)
dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client(
len(autocorr_mslist),
dask_dir=workdir,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
min_mem_per_job=mem_limit / 0.6,
)
tasks = []
for autocorr_msname in autocorr_mslist:
tasks.append(
delayed(single_ms_flag)(
autocorr_msname,
badspw=badspw,
bad_ants_str=bad_ants_str,
datacolumn="data",
use_tfcrop=True,
use_rflag=False,
flagdimension="freq",
flag_autocorr=False,
n_threads=n_threads,
memory_limit=mem_limit,
)
)
results = compute(*tasks)
dask_client.close()
dask_cluster.close()
for autocorr_msname in autocorr_mslist:
drop_cache(autocorr_msname)
att_level = {}
########################################
# Calculating per scan level
########################################
print("Calculating noise-diode power difference ...")
task = delayed(get_power_diff)(dry_run=True)
mem_limit = run_limited_memory_task(task, dask_dir=workdir)
dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client(
len(valid_target_scans),
dask_dir=workdir,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
min_mem_per_job=mem_limit / 0.6,
)
all_scaling_files = []
filtered_scans = []
tasks = []
for scan in valid_target_scans:
autocorr_msname = f"{workdir}/autocorr_scan_{scan}.ms"
if autocorr_msname not in autocorr_mslist:
pass
tasks.append(
delayed(get_power_diff)(
f"{workdir}/autocorr_scan_{noise_diode_flux_scan}.ms",
f"{workdir}/autocorr_scan_{scan}.ms",
noise_on_caltable,
noise_off_caltable,
n_threads=n_threads,
memory_limit=mem_limit,
)
)
filtered_scans.append(scan)
results = compute(*tasks)
dask_client.close()
dask_cluster.close()
##########################################
# Determining frequencies
##########################################
msmd = msmetadata()
msmd.open(msname)
freqs = msmd.chanfreqs(0)
msmd.close()
msmd.done()
del msmd
########################
for i in range(len(filtered_scans)):
att_value = results[i][0]
att_ant_array = results[i][1]
if np.nanmean(att_value) < 0:
att_value *= -1
att_ant_array *= -1
scan = filtered_scans[i]
att_level[scan] = att_value
filename = (
workdir
+ "/"
+ os.path.basename(msname).split(".ms")[0]
+ "_attval_scan_"
+ str(scan)
)
att_ant_array_percentage_change = (
att_value[..., None] - att_ant_array
) / att_value[..., None]
flag_ants = []
for pol in range(2):
mean_percentage_change = np.nanmean(
att_ant_array_percentage_change[pol, ...], axis=0
)
std_percentage_change = np.nanstd(
att_ant_array_percentage_change[pol, ...], axis=0
)
pos = np.where(
np.abs(mean_percentage_change) > 3 * std_percentage_change
)[0]
if len(pos) > 0:
for i in range(len(pos)):
if pos[i] not in flag_ants:
flag_ants.append(pos[i])
np.save(
filename,
np.array(
[scan, freqs, att_value, flag_ants, att_ant_array], dtype="object"
),
)
all_scaling_files.append(filename + ".npy")
return 0, att_level, all_scaling_files
except Exception as e:
traceback.print_exc()
return 1, None, None
finally:
time.sleep(5)
drop_cache(msname)
drop_cache(workdir)
[docs]
def run_noise_cal(
msname,
workdir,
keep_backup=False,
cpu_frac=0.8,
mem_frac=0.8,
):
"""
Perform flux calibration using noise diode
Parameters
----------
msname : str
Measurement set
workdir : str
Working directory
keep_backup : bool, optional
Keep backup
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
dict
Attenuation values for different scans
list
File list saved attenuation values for different scans
"""
start_time = time.time()
ncpus = int(psutil.cpu_count() * (1 - cpu_frac))
limit_threads(n_threads=ncpus)
from casatasks import split, bandpass
msname = msname.rstrip("/")
workdir = workdir.rstrip("/")
try:
os.chdir(workdir)
print("##############################################")
print("Performing flux calibration using noise-diode.")
print("##############################################")
###################################
# Determining noise diode cal scans
###################################
fluxcal_fields, fluxcal_scans = get_fluxcals(msname)
target_scans, cal_scans, f_scans, g_scans, p_scans = get_cal_target_scans(
msname
)
valid_scans = get_valid_scans(msname)
noise_diode_cal_scan = ""
for scan in f_scans:
if scan in valid_scans:
noise_cal = determine_noise_diode_cal_scan(msname, scan)
if noise_cal:
noise_diode_cal_scan = scan
break
valid_target_scans = []
for scan in target_scans:
if scan in valid_scans:
valid_target_scans.append(scan)
if noise_diode_cal_scan == "":
print("No noise diode cal scan is present.")
print("##################")
print("Total time taken : ", time.time() - start_time)
print("##################\n")
return 1, None, None
##############################
# Split noise cal scan
##############################
print("Spliting auto-correlation in different scans ...")
noisecal_ms = workdir + "/noisecal.ms"
if os.path.exists(noisecal_ms):
os.system("rm -rf " + noisecal_ms)
if os.path.exists(noisecal_ms + ".flagversions"):
os.system("rm -rf " + noisecal_ms + ".flagversions")
with suppress_casa_output():
split(
vis=msname,
outputvis=noisecal_ms,
scan=noise_diode_cal_scan,
datacolumn="data",
)
###############################
# Flagging
###############################
print("Flagging noise cal ms....")
fluxcal_fields, fluxcal_scans = get_fluxcals(msname)
badspw = get_bad_chans(msname)
bad_ants, bad_ants_str = get_bad_ants(msname, fieldnames=fluxcal_fields)
single_ms_flag(
noisecal_ms,
badspw=badspw,
bad_ants_str=bad_ants_str,
datacolumn="data",
use_tfcrop=True,
use_rflag=False,
flagdimension="freq",
flag_autocorr=False,
)
##################################
# Import models
##################################
print("Importing calibrator models ....")
fluxcal_result = import_fluxcal_models(
[noisecal_ms],
fluxcal_fields,
fluxcal_scans,
ncpus=ncpus,
mem_frac=1 - cpu_frac,
)
##################################
# Bandpass calibration
##################################
print("Peforming bandpass with noise diode on and off....")
msmd = msmetadata()
msmd.open(noisecal_ms)
times = msmd.timesforscan(noise_diode_cal_scan)
msmd.close()
even_times = times[::2] # Even-indexed timestamps
odd_times = times[1::2] # Odd-indexed timestamps
even_timerange = ",".join(
[mjdsec_to_timestamp(t, str_format=1) for t in even_times]
)
odd_timerange = ",".join(
[mjdsec_to_timestamp(t, str_format=1) for t in odd_times]
)
mstool = casamstool()
mstool.open(noisecal_ms)
mstool.select({"antenna1": 1, "antenna2": 1, "time": [times[0], times[1]]})
dataeven = np.abs(mstool.getdata("DATA", ifraxis=True)["data"])
mstool.close()
mstool.open(noisecal_ms)
mstool.select({"antenna1": 1, "antenna2": 1, "time": [times[1], times[2]]})
dataodd = np.abs(mstool.getdata("DATA", ifraxis=True)["data"])
mstool.close()
if np.nansum(dataeven) > np.nansum(dataodd):
on_timerange = even_timerange
off_timerange = odd_timerange
else:
on_timerange = odd_timerange
off_timerange = even_timerange
oncal = noisecal_ms.split(".ms")[0] + "_on.bcal"
offcal = noisecal_ms.split(".ms")[0] + "_off.bcal"
with suppress_casa_output():
bandpass(
vis=noisecal_ms,
caltable=oncal,
timerange=on_timerange,
uvrange=">200lambda",
minsnr=1,
)
bandpass(
vis=noisecal_ms,
caltable=offcal,
timerange=off_timerange,
uvrange=">200lambda",
minsnr=1,
)
#####################################
# Determine attenuation scaling
#####################################
msg, att_level, all_scaling_files = estimate_att(
msname,
workdir,
oncal,
offcal,
noise_diode_cal_scan,
valid_target_scans,
time_window=900,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
)
if keep_backup:
print("Backup directory: " + workdir + "/backup")
os.makedirs(workdir + "/backup",exist_ok=True)
os.system(
"mv "
+ noisecal_ms
+ " "
+ oncal
+ " "
+ offcal
+ " "
+ workdir
+ "/autocorr_scan_*.ms* "
+ workdir
+ "/backup/"
)
else:
os.system(
"rm -rf "
+ noisecal_ms
+ " "
+ oncal
+ " "
+ offcal
+ " "
+ workdir
+ "/autocorr_scan_*.ms*"
)
print("##################")
print("Total time taken : ", time.time() - start_time)
print("##################\n")
return msg, att_level, all_scaling_files
except Exception as e:
traceback.print_exc()
print("##################")
print("Total time taken : ", time.time() - start_time)
print("##################\n")
return 1, None, None
finally:
time.sleep(5)
drop_cache(msname)
drop_cache(workdir)
[docs]
def main():
parser = argparse.ArgumentParser(
description="Basic calibration using calibrators",
formatter_class=SmartDefaultsHelpFormatter,
)
## Essential parameters
basic_args = parser.add_argument_group(
"###################\nEssential parameters\n###################"
)
basic_args.add_argument(
"msname",
type=str,
help="Name of measurement set (required positional argument)",
)
basic_args.add_argument(
"--workdir",
type=str,
required=True,
default="",
help="Working directory (default: auto-created next to MS)",
)
basic_args.add_argument(
"--caldir",
type=str,
required=True,
default="",
help="Directory for calibration products (default: auto-created in the workdir MS)",
)
## Advanced parameters
adv_args = parser.add_argument_group(
"###################\nAdvanced parameters\n###################"
)
adv_args.add_argument(
"--keep_backup",
action="store_true",
help="Keep backup of measurement set after each round",
)
## Resource management parameters
hard_args = parser.add_argument_group(
"###################\nHardware resource management parameters\n###################"
)
hard_args.add_argument(
"--cpu_frac", type=float, default=0.8, help="CPU fraction to use"
)
hard_args.add_argument(
"--mem_frac", type=float, default=0.8, help="Memory fraction to use"
)
hard_args.add_argument("--logfile", type=str, default=None, help="Path to log file")
hard_args.add_argument(
"--jobid", type=str, default="0", help="Job ID for logging and tracking"
)
# === Show help if no arguments ===
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
pid = os.getpid()
save_pid(pid, datadir + f"/pids/pids_{args.jobid}.txt")
# === Set up workdir ===
if args.workdir == "" or not os.path.exists(args.workdir):
workdir = os.path.dirname(os.path.abspath(args.msname)) + "/workdir"
else:
workdir = args.workdir
os.makedirs(workdir,exist_ok=True)
if args.caldir=="" or not os.path.exists(args.caldir):
caldir=f"{workdir}/caltables"
else:
caldir=args.caldir
os.makedirs(caldir,exist_ok=True)
logfile = args.logfile
observer = None
if os.path.exists(f"{workdir}/jobname_password.npy") and logfile is not None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
observer = init_logger(
"do_fluxcal", logfile, jobname=jobname, password=password
)
try:
if os.path.exists(args.msname):
print("\n###################################")
print("Starting flux calibration using noise-diode.")
print("###################################\n")
msg, att_level, all_scaling_files = run_noise_cal(
args.msname,
workdir,
keep_backup=args.keep_backup,
cpu_frac=args.cpu_frac,
mem_frac=args.mem_frac,
)
if msg == 0 and all_scaling_files is not None:
for att_file in all_scaling_files:
os.system("mv " + att_file + " " + caldir)
else:
print("Please provide a valid measurement set.")
msg = 1
except Exception:
traceback.print_exc()
msg = 1
finally:
time.sleep(5)
drop_cache(args.msname)
drop_cache(workdir)
drop_cache(caldir)
clean_shutdown(observer)
return msg
if __name__ == "__main__":
result = main()
print(
"\n###################\nNoise diode based flux calibration is finished.\n###################\n"
)
os._exit(result)