Source code for meersolar.utils.selfcal_utils

import psutil
import numpy as np
import traceback
import copy
import glob
import os
from casatools import msmetadata, table
from .basic_utils import *
from .resource_utils import *
from .proc_manage_utils import *
from .ms_metadata import *
from .casatasks import *
from .flagging import *
from .calibration import *
from .imaging import *
from .image_utils import *
from .udocker_utils import *


[docs] def intensity_selfcal( msname, logger, selfcaldir, cellsize, imsize, round_number=0, uvrange="", minuv=0, calmode="ap", solint="60s", refant="1", solmode="", gaintype="T", applymode="calonly", threshold=3, weight="briggs", robust=0.0, use_previous_model=False, use_solar_mask=True, mask_radius=20, min_tol_factor=-1, ncpu=-1, mem=-1, ): """ A single self-calibration round Parameters ---------- msname : str Name of the measurement set logger : logger Python logger selfcaldir : str Self-calibration directory cellsize : float Cellsize in arcsec imsize : int Image pixel size round_number : int, optional Selfcal iteration number uvrange : float, optional UV range for calibration calmode : str, optional Calibration mode ('p' or 'ap') solint : str, optional Solution intervals refant : str, optional Reference antenna applymode : str, optional Solution apply mode (calonly or calflag) threshold : float, optional Imaging and auto-masking threshold weight : str, optional Image weighting robust : float, optional Robust parameter for briggs weighting use_previous_model : bool, optional Use previous model use_solar_mask : bool, optional Use solar disk mask or not mask_radius : float, optional Mask radius in arcminute min_tol_factor : float, optional Minimum tolerance factor ncpu : int, optional Number of CPUs to use in WSClean mem : float, optional Memory usage limit in WSClean Returns ------- int Success message str Caltable name float RMS based dynamic range float RMS of the image str Image name str Model image name str Residual image name """ limit_threads(n_threads=ncpu) from casatasks import gaincal, bandpass, applycal, flagdata, delmod, flagmanager try: ################################## # Setup wsclean params ################################## if ncpu < 1: ncpu = psutil.cpu_count() if mem < 0: mem = round(psutil.virtual_memory().available / (1024**3), 2) msname = msname.rstrip("/") if not use_previous_model: delmod(vis=msname, otf=True, scr=True) prefix = ( selfcaldir + "/" + os.path.basename(msname).split(".ms")[0] + "_selfcal_present" ) ############################ # Determining channel blocks ############################ msmd = msmetadata() msmd.open(msname) times = msmd.timesforspws(0) timeres = np.diff(times) pos = np.where(timeres > 3 * np.nanmedian(timeres))[0] max_intervals = min(1, len(pos)) freqs = msmd.chanfreqs(0, unit="MHz") freqMHz = np.nanmean(freqs) freqres = freqs[1] - freqs[0] freq_width = calc_bw_smearing_freqwidth(msname) nchan = int(freq_width / freqres) total_nchan = len(freqs) freq = msmd.meanfreq(0, unit="MHz") total_time = max(times) - min(times) msmd.close() chanres = np.diff(freqs) chanres /= np.nanmin(chanres) pos = np.where(chanres > 1)[0] chanrange_list = [] start_chan = 0 for i in range(len(pos) + 1): if i > len(pos) - 1: end_chan = total_nchan else: end_chan = pos[i] + 1 if end_chan - start_chan > 10 or len(chanrange_list) == 0: chanrange_list.append(f"{start_chan} {end_chan}") else: last_chanrange = chanrange_list[-1] chanrange_list.remove(last_chanrange) start_chan = last_chanrange.split(" ")[0] chanrange_list.append(f"{start_chan} {end_chan}") start_chan = end_chan + 1 if len(chanrange_list) == 0: unflag_chans, flag_chans = get_chans_flag(msname) chanrange_list = [f"{min(unflag_chans)} {max(unflag_chans)}"] ######################################## # Scale bias list and channel range list ######################################## scale_bias_list = [] for chanrange in chanrange_list: start_chan = int(chanrange.split(" ")[0]) end_chan = int(chanrange.split(" ")[-1]) mid_chan = int((start_chan + end_chan) / 2) mid_freq = freqs[mid_chan] scale_bias = round(get_multiscale_bias(mid_freq), 2) scale_bias_list.append(scale_bias) ############################################ # Merge channel ranges with identical scale bias ############################################ merged_channels = [] merged_biases = [] start, end = map(int, chanrange_list[0].split()) current_bias = scale_bias_list[0] for i in range(1, len(chanrange_list)): next_start, next_end = map(int, chanrange_list[i].split()) next_bias = scale_bias_list[i] if next_bias == current_bias: # Merge ranges (irrespective of contiguity) end = next_end else: # Finalize current group merged_channels.append(f"{start} {end}") merged_biases.append(current_bias) # Start new group start, end = next_start, next_end current_bias = next_bias # Final group merged_channels.append(f"{start} {end}") merged_biases.append(current_bias) chanrange_list = copy.deepcopy(merged_channels) scale_bias_list = copy.deepcopy(merged_biases) del merged_channels, merged_biases ############################### # Temporal chunking list ############################### nintervals = 1 nchan_list = [] nintervals_list = [] for i in range(len(chanrange_list)): chanrange = chanrange_list[i] ################################################################### # Spectral variation is kept fixed at 10% level. # Because selfcal is done along temporal axis, where variation matters most ################################################################### if min_tol_factor <= 0: min_tol_factor = 1.0 nintervals, _ = get_optimal_image_interval( msname, chan_range=f"{chanrange.replace(' ',',')}", temporal_tol_factor=float(min_tol_factor / 100.0), spectral_tol_factor=0.1, max_nchan=-1, max_ntime=max_intervals, ) nchan_list.append(nchan) nintervals_list.append(nintervals) os.system(f"rm -rf {prefix}*image.fits {prefix}*residual.fits") if weight == "briggs": weight += " " + str(robust) wsclean_args = [ "-quiet", "-scale " + str(cellsize) + "asec", "-size " + str(imsize) + " " + str(imsize), "-no-dirty", "-gridder wgridder", "-weight " + weight, "-niter 10000", "-mgain 0.85", "-nmiter 5", "-gain 0.1", "-minuv-l " + str(minuv), "-j " + str(ncpu), "-abs-mem " + str(mem), "-no-negative", "-auto-mask " + str(threshold + 0.1), "-auto-threshold " + str(threshold), ] ngrid = int(ncpu / 2) if ngrid > 1: wsclean_args.append("-parallel-gridding " + str(ngrid)) ################################################ # Creating and using solar mask ################################################ if use_solar_mask: fits_mask = msname.split(".ms")[0] + "_solar-mask.fits" if os.path.exists(fits_mask) == False: logger.info(f"Creating solar mask of size: {mask_radius} arcmin.\n") fits_mask = create_circular_mask( msname, cellsize, imsize, mask_radius=mask_radius ) if fits_mask is not None and os.path.exists(fits_mask): wsclean_args.append("-fits-mask " + fits_mask) ###################################### # Running imaging per channel range ###################################### final_image_list = [] final_model_list = [] final_residual_list = [] for i in range(len(chanrange_list)): chanrange = chanrange_list[i] per_chanrange_wsclean_args = copy.deepcopy(wsclean_args) ###################################### # Multiscale configuration ###################################### start_chan = int(chanrange.split(" ")[0]) end_chan = int(chanrange.split(" ")[-1]) chan_number = int((start_chan + end_chan) / 2) sun_dia = calc_sun_dia(freqMHz) # Sun diameter in arcmin sun_rad = sun_dia / 2 multiscale_scales = calc_multiscale_scales( msname, 3, chan_number=chan_number, max_scale=sun_rad ) per_chanrange_wsclean_args.append("-multiscale") per_chanrange_wsclean_args.append("-multiscale-gain 0.1") per_chanrange_wsclean_args.append( "-multiscale-scales " + ",".join([str(s) for s in multiscale_scales]) ) per_chanrange_wsclean_args.append( f"-multiscale-scale-bias {scale_bias_list[i]}" ) if imsize >= 2048 and 4 * max(multiscale_scales) < 1024: per_chanrange_wsclean_args.append("-parallel-deconvolution 1024") ################################################################### # Spectral variation is kept fixed at 10% level. # Because selfcal is done along temporal axis, where variation matters most ################################################################### if len(scale_bias_list) > 1: if min_tol_factor <= 0: min_tol_factor = 1.0 nintervals, _ = get_optimal_image_interval( msname, chan_range=f"{chanrange.replace(' ',',')}", temporal_tol_factor=float(min_tol_factor / 100.0), spectral_tol_factor=0.1, ) ##################################### # Spectral imaging configuration ##################################### if nchan > 1: per_chanrange_wsclean_args.append(f"-channels-out {nchan}") per_chanrange_wsclean_args.append("-no-mf-weighting") per_chanrange_wsclean_args.append("-join-channels") ##################################### # Temporal imaging configuration ##################################### if nintervals > 1: per_chanrange_wsclean_args.append(f"-intervals-out {nintervals}") logger.info(f"Spectral chunks: {nchan}, temporal chunks: {nintervals}.") temp_prefix = f"{prefix}_chan_{chanrange.replace(' ','_')}" per_chanrange_wsclean_args.append(f"-name {temp_prefix}") per_chanrange_wsclean_args.append(f"-channel-range {chanrange}") if use_previous_model: previous_models = glob.glob(f"{temp_prefix}*model.fits") if nchan > 1: total_models_expected = (nchan + 1) * nintervals else: total_models_expected = (nchan) * nintervals if len(previous_models) == total_models_expected: per_chanrange_wsclean_args.append("-continue") else: os.system(f"rm -rf {temp_prefix}*") wsclean_cmd = ( "wsclean " + " ".join(per_chanrange_wsclean_args) + " " + msname ) logger.info(f"WSClean command: {wsclean_cmd}\n") msg = run_wsclean(wsclean_cmd, "solarwsclean", verbose=False) if msg != 0: logger.info(f"Imaging is not successful.\n") else: ##################################### # Analyzing images ##################################### wsclean_files = {} for suffix in ["image", "model", "residual"]: files = glob.glob(temp_prefix + f"*MFS-{suffix}.fits") if not files: files = glob.glob(temp_prefix + f"*{suffix}.fits") wsclean_files[suffix] = files wsclean_images = wsclean_files["image"] wsclean_models = wsclean_files["model"] wsclean_residuals = wsclean_files["residual"] final_image = ( temp_prefix.replace("present", f"{round_number}") + "_I_image.fits" ) final_model = ( temp_prefix.replace("present", f"{round_number}") + "_I_model.fits" ) final_residual = ( temp_prefix.replace("present", f"{round_number}") + "_I_residual.fits" ) if len(wsclean_images) == 0: print("No image is made.") elif len(wsclean_images) == 1: os.system(f"cp -r {wsclean_images[0]} {final_image}") else: final_image = make_timeavg_image( wsclean_images, final_image, keep_wsclean_images=True ) final_image_list.append(final_image) if len(wsclean_models) == 1: os.system(f"cp -r {wsclean_models[0]} {final_model}") else: final_model = make_timeavg_image( wsclean_models, final_model, keep_wsclean_images=True ) final_model_list.append(final_model) if len(wsclean_residuals) == 1: os.system(f"cp -r {wsclean_residuals[0]} {final_residual}") else: final_residual = make_timeavg_image( wsclean_residuals, final_residual, keep_wsclean_images=True ) final_residual_list.append(final_residual) ######################################### # Restoring flags if applymode is calflag ######################################### if applymode == "calflag": with suppress_output(): flags = flagmanager(vis=msname, mode="list") keys = flags.keys() for k in keys: if k == "MS": pass else: version = flags[0]["name"] if "applycal" in version: try: with suppress_output(): flagmanager( vis=msname, mode="restore", versionname=version ) flagmanager( vis=msname, mode="delete", versionname=version ) except BaseException: pass ####################################################################### # Final frequency averaged images for backup or calculating dynamic ranges ####################################################################### final_image = prefix.replace("present", f"{round_number}") + "_I_image.fits" final_model = prefix.replace("present", f"{round_number}") + "_I_model.fits" final_residual = ( prefix.replace("present", f"{round_number}") + "_I_residual.fits" ) if len(final_image_list) == 1: os.system(f"mv {final_image_list[0]} {final_image}") else: final_image = make_freqavg_image( final_image_list, final_image, keep_wsclean_images=False ) if len(final_model_list) == 1: os.system(f"mv {final_model_list[0]} {final_model}") else: final_model = make_freqavg_image( final_model_list, final_model, keep_wsclean_images=False ) if len(final_residual_list) == 1: os.system(f"mv {final_residual_list[0]} {final_residual}") else: final_residual = make_freqavg_image( final_residual_list, final_residual, keep_wsclean_images=False ) os.system("rm -rf *psf.fits") ##################################### # Calculating dynamic ranges ###################################### model_flux, rms_DR, rms = calc_dyn_range( final_image, final_model, final_residual, fits_mask=fits_mask, ) if model_flux == 0: logger.info(f"No model flux.\n") return 1, "", 0, 0, "", "", "" ##################### # Perform calibration ##################### bpass_caltable = prefix.replace("present", f"{round_number}") + ".gcal" if os.path.exists(bpass_caltable): os.system("rm -rf " + bpass_caltable) if calmode == "p": logger.info( f"gaincal(vis='{msname}',caltable='{bpass_caltable}',uvrange='{uvrange}',refant='{refant}',solint='{solint}',calmode='p',minsnr=1,solnorm=True)\n" ) with suppress_output(): bandpass( vis=msname, caltable=bpass_caltable, uvrange=uvrange, refant=refant, minsnr=1, solint=f"{solint}", calmode="p", solnorm=True, ) else: logger.info( f"bandpass(vis='{msname}',caltable='{bpass_caltable}',uvrange='{uvrange}',refant='{refant}',solint='{solint},10MHz',minsnr=1,solnorm=True)\n" ) with suppress_output(): bandpass( vis=msname, caltable=bpass_caltable, uvrange=uvrange, refant=refant, minsnr=1, solint=f"{solint},10MHz", solnorm=True, ) if os.path.exists(bpass_caltable) == False: logger.info(f"No gain solutions are found.\n") return 2, "", 0, 0, "", "", "" ######################################### # Flagging bad gains ######################################### if calmode == "ap": with suppress_output(): flagdata( vis=bpass_caltable, mode="rflag", datacolumn="CPARAM", flagbackup=False, ) if applymode == "calonly": tb = table() tb.open(bpass_caltable, nomodify=False) gain = tb.getcol("CPARAM") flag = tb.getcol("FLAG") gain[flag] = 1.0 pos = np.where(np.abs(gain) == 0.0) gain[pos] = 1.0 flag *= False tb.putcol("CPARAM", gain) tb.putcol("FLAG", flag) tb.flush() tb.close() logger.info( f"applycal(vis={msname},gaintable=[{bpass_caltable}],interp=['linear,linearflag'],applymode='{applymode}',calwt=[False])\n" ) with suppress_output(): applycal( vis=msname, gaintable=[bpass_caltable], interp=["linear,linearflag"], applymode=applymode, calwt=[False], ) ##################################### # Flag zeros ##################################### with suppress_output(): flagdata( vis=msname, mode="clip", clipzeros=True, datacolumn="corrected", flagbackup=False, ) return ( 0, bpass_caltable, rms_DR, rms, final_image, final_model, final_residual, ) except Exception as e: print(e) traceback.print_exc() return 3, "", 0, 0, "", "", ""