Source code for meersolar.pipeline.do_imaging
import os, glob, resource, traceback, psutil, time, copy, math, argparse
from meersolar.pipeline.basic_func import *
from dask import delayed, compute
from casatasks import casalog
try:
logfile = casalog.logfile()
os.system("rm -rf " + logfile)
except:
pass
[docs]
def rename_image(
imagename,
imagedir="",
pol="",
band="",
cutout_rsun=2.5,
make_overlay=True,
make_plots=True,
):
"""
Rename and move image to image directory
Parameters
----------
imagename : str
Image name
imagedir : str, optional
Image directory (default given image directory)
pol : str, optional
Stokes parameters
band : str, optional
Observing band
cutout_rsun : float, optional
Cutout in solar radii from center (default: 2.5 solar radii)
make_overlay : bool, optional
Make overlay on SUVI
make_plots : bool, optional
Make radio map plot in helioprojective coordinates
Returns
-------
str
New imagename with full path
"""
imagename = imagename.rstrip("/")
imagename = cutout_image(
imagename, imagename, x_deg=(cutout_rsun * 2 * 16.0) / 60.0
)
header = fits.getheader(imagename)
time = header["DATE-OBS"]
astro_time=Time(time,scale='utc')
sun_jpl = Horizons(id='10', location='500', epochs=astro_time.jd)
eph = sun_jpl.ephemerides()
sun_coords = SkyCoord(ra=eph['RA'][0]*u.deg, dec=eph['DEC'][0]*u.deg, frame='icrs')
maxval, minval, rms, total_val, mean_val, median_val, rms_dyn, minmax_dyn = calc_solar_image_stat(imagename,disc_size=18)
with fits.open(imagename, mode="update") as hdul:
hdr = hdul[0].header
hdr["AUTHOR"] = "DevojyotiKansabanik,DeepanPatra"
if band != "":
hdr["BAND"] = band
hdr["PIPELINE"]="MeerSOLAR"
hdr["CRVAL1"] = sun_coords.ra.deg
hdr["CRVAL2"] = sun_coords.dec.deg
hdr["MAX"]=maxval
hdr["MIN"]=minval
hdr["RMS"]=rms
hdr["SUM"]=total_val
hdr["MEAN"]=mean_val
hdr["MEDIAN"]=median_val
hdr["RMSDYN"]=rms_dyn
hdr["MIMADYN"]=minmax_dyn
freq = round(header["CRVAL3"] / 10**6, 2)
t_str = "".join(time.split("T")[0].split("-")) + (
"".join(time.split("T")[-1].split(":"))
)
new_name = "time_" + t_str + "_freq_" + str(freq)
if pol != "":
new_name += "_pol_" + str(pol)
if "MFS" in imagename:
new_name += "_MFS"
new_name = new_name + ".fits"
if imagedir=="":
imagedir=os.path.dirname(os.path.abspath(imagename))
new_name = imagedir + "/" + new_name
os.system("mv " + imagename + " " + new_name)
hpcdir=f"{os.path.dirname(imagedir)}/images/hpcs"
os.makedirs(hpcdir, exist_ok=True)
save_in_hpc(new_name,outdir=hpcdir)
if make_plots:
try:
pngdir = f"{os.path.dirname(imagedir)}/images/pngs"
pdfdir = f"{os.path.dirname(imagedir)}/images/pdfs"
os.makedirs(pngdir, exist_ok=True)
os.makedirs(pdfdir, exist_ok=True)
outimages,cropped_map=plot_in_hpc(
new_name,
draw_limb=True,
extensions=["png","pdf"],
outdirs=[pngdir,pdfdir],
)
except:
pass
if make_overlay:
try:
overlay_pngdir = f"{os.path.dirname(imagedir)}/overlays_pngs"
overlay_pdfdir = f"{os.path.dirname(imagedir)}/overlays_pdfs"
os.makedirs(overlay_pngdir, exist_ok=True)
os.makedirs(overlay_pdfdir, exist_ok=True)
outimages=make_meer_overlay(
new_name,
plot_file_prefix=os.path.basename(new_name).split(".fits")[0]
+ "_suvi_meerkat_overlay",
extensions=["png","pdf"],
outdirs=[overlay_pngdir,overlay_pdfdir]
)
except Exception as e:
traceback.print_exc()
pass
return new_name
[docs]
def perform_imaging(
msname="",
workdir="",
datacolumn="CORRECTED_DATA",
freqrange="",
timerange="",
imagedir="",
imsize=1024,
cellsize=2,
nchan=1,
ntime=1,
pol="I",
weight="briggs",
robust=0.0,
minuv=0,
threshold=1.0,
use_multiscale=True,
use_solar_mask=True,
mask_radius=25,
savemodel=True,
saveres=True,
ncpu=-1,
mem=-1,
band="",
cutout_rsun=2.5,
make_overlay=True,
make_plots=True,
logfile="imaging.log",
dry_run=False,
):
"""
Perform spectropolarimetric snapshot imaging of a ms
Parameters
----------
msname : str
Name of the measurement set
workdir : str
Work directory name
datacolumn : str, optional
Data column
freqrange : str, optional
Frequency range to image
imagedir : str, optional
Image directory name (default: workdir). Images, models, residuals will be saved in directories named images. models, residuals inside imagedir
imsize : int, optional
Image size in pixels
cellsize : float, optional
Cell size in arcseconds
nchan : int, optional
Number of spectral channels
ntime : int, optional
Number of temporal slices
pol : str, optional
Stokes parameters to image
weight : str, optional
Image weighting scheme
robust : float, optional
Briggs weighting robustness parameter
minuv : float, optional
Minimum UV-lambda to be used in imaging
threshold : float, optional
CLEAN threshold
use_multiscale : bool, optional
Use multiscale or not
use_solar_mask : bool, optional
Use solar mask
mask_radius : float, optional
Mask radius in arcminute
savemodel : bool, optional
Save model images or not
saveres : bool, optional
Save residual images or not
band : str, optional
Band name
cutout_rsun : float, optional
Cutout image size in solar radii from center (default: 2.5 solar radii)
make_overlay : bool, optional
Make SUVI MeerKAT overlay
make_plots : bool, optional
Make radio map helioprojective plots
logfile : str, optional
Log file name
ncpu : int, optional
Number of CPU threads to use
mem : float, optional
Memory in GB to use
Returns
-------
int
Success message
list
List of images [[images],[models],[residuals]]
"""
if os.path.exists(logfile):
os.system(f"rm -rf {logfile}")
if dry_run:
process = psutil.Process(os.getpid())
usemem = round(process.memory_info().rss / 1024**3, 2) # in GB
return usemem
logger, logfile = create_logger(
os.path.basename(logfile).split(".log")[0], logfile, verbose=False
)
sub_observer = None
if os.path.exists(f"{workdir}/jobname_password.npy") and logfile != None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
sub_observer = init_logger(
"remotelogger_imaging_{os.path.basename(msname).split('.ms')[0]}",
logfile,
jobname=jobname,
password=password,
)
try:
msname = msname.rstrip("/")
msname = os.path.abspath(msname)
if band == "":
band = get_band_name(msname)
logger.info(f"{os.path.basename(msname)} --Perform imaging...\n")
#########
# Imaging
#########
msmd = msmetadata()
msmd.open(msname)
freq = msmd.meanfreq(0, unit="MHz")
freqs = msmd.chanfreqs(0, unit="MHz")
times = msmd.timesforspws(0)
npol = msmd.ncorrforpol()[0]
msmd.close()
###################################
# Finding channel and time ranges
###################################
if freqrange != "":
start_chans = []
end_chans = []
freq_list = freqrange.split(",")
for f in freq_list:
start_freq = float(f.split("~")[0])
end_freq = float(f.split("~")[-1])
if start_freq >= np.nanmin(freqs) and end_freq <= np.nanmax(freqs):
start_chan = np.argmin(np.abs(start_freq - freqs))
end_chan = np.argmin(np.abs(end_freq - freqs))
start_chans.append(start_chan)
end_chans.append(end_chan)
else:
start_chans = [0]
end_chans = [len(freqs)]
if len(start_chans) == 0:
print(f"Please provide valid channel range between 0 and {len(freqs)}")
time.sleep(5)
clean_shutdown(sub_observer)
return 1, []
if timerange != "":
start_times = []
end_times = []
time_list = timerange.split(",")
for timerange in time_list:
start_time = timestamp_to_mjdsec(timerange.split("~")[0])
end_time = timestamp_to_mjdsec(timerange.split("~")[-1])
if start_time >= np.nanmin(times) and end_time <= np.nanmax(times):
start_times.append(np.argmin(np.abs(times - start_time)))
end_times.append(np.argmin(np.abs(times - end_time)))
else:
start_times = [0]
end_times = [len(times)]
if len(start_times) == 0:
print(
f"Please provide valid time range between {mjdsec_to_timestamp(times[0])} and {mjdsec_to_timestamp(times[-1])}"
)
time.sleep(5)
clean_shutdown(sub_observer)
return 1, []
if npol < 4 and pol == "IQUV":
pol = "I"
if ncpu < 1:
ncpu = psutil.cpu_count()
if mem < 0:
mem = psutil.virtual_memory().total / (1024**3)
prefix = workdir + "/imaging_" + os.path.basename(msname).split(".ms")[0]
if imagedir == "":
imagedir = workdir
os.makedirs(imagedir,exist_ok=True)
if weight == "briggs":
weight += " " + str(robust)
if threshold <= 1:
threshold = 1.1
wsclean_args = [
"-quiet",
"-scale " + str(cellsize) + "asec",
"-size " + str(imsize) + " " + str(imsize),
"-no-dirty",
"-gridder tuned-wgridder",
"-weight " + weight,
"-name " + prefix,
"-pol " + str(pol),
"-niter 10000",
"-mgain 0.85",
"-nmiter 5",
"-gain 0.1",
"-minuv-l " + str(minuv),
"-j " + str(ncpu),
"-abs-mem " + str(round(mem, 2)),
"-auto-threshold 1 -auto-mask " + str(threshold),
"-no-update-model-required",
]
if datacolumn != "CORRECTED_DATA" and datacolumn != "corrected":
wsclean_args.append("-data-column " + datacolumn)
ngrid = int(ncpu / 2)
if ngrid > 1:
wsclean_args.append("-parallel-gridding " + str(ngrid))
if pol == "I":
wsclean_args.append("-no-negative")
#####################################
# Spectral imaging configuration
#####################################
if nchan > 1:
wsclean_args.append(f"-channels-out {nchan}")
wsclean_args.append("-no-mf-weighting")
wsclean_args.append("-join-channels")
#####################################
# Temporal imaging configuration
#####################################
if ntime > 1:
wsclean_args.append(f"-intervals-out {ntime}")
################################################
# Creating and using a solar mask
################################################
if use_solar_mask:
fits_mask = prefix + "_solar-mask.fits"
if os.path.exists(fits_mask) == False:
logger.info(
f"{os.path.basename(msname)} -- Creating solar mask of size: {mask_radius} arcmin.\n",
)
fits_mask = create_circular_mask(
msname, cellsize, imsize, mask_radius=mask_radius
)
if fits_mask != None and os.path.exists(fits_mask):
wsclean_args.append("-fits-mask " + fits_mask)
final_list = []
for i in range(len(start_chans)):
for j in range(len(start_times)):
temp_wsclean_args = copy.deepcopy(wsclean_args)
temp_wsclean_args.append(
f"-channel-range {start_chans[i]} {end_chans[i]}"
)
temp_wsclean_args.append(f"-interval {start_times[j]} {end_times[j]}")
######################################
# Multiscale configuration
######################################
if use_multiscale:
num_pixel_in_psf = calc_npix_in_psf(weight, robust=robust)
multiscale_scales = calc_multiscale_scales(
msname,
num_pixel_in_psf,
chan_number=int((start_chans[i] + end_chans[i]) / 2),
)
temp_wsclean_args.append("-multiscale")
temp_wsclean_args.append("-multiscale-gain 0.1")
temp_wsclean_args.append(
"-multiscale-scales "
+ ",".join([str(s) for s in multiscale_scales])
)
mid_freq = np.nanmean(
freqs[int(start_chans[i]) : int(end_chans[i])]
)
scale_bias = get_multiscale_bias(mid_freq)
temp_wsclean_args.append(f"-multiscale-scale-bias {scale_bias}")
if imsize >= 2048 and 4 * max(multiscale_scales) < 1024:
temp_wsclean_args.append("-parallel-deconvolution 1024")
elif imsize >= 2048:
temp_wsclean_args.append("-parallel-deconvolution 1024")
######################################
# Running imaging
######################################
wsclean_cmd = "wsclean " + " ".join(temp_wsclean_args) + " " + msname
logger.info(
f"{os.path.basename(msname)} -- WSClean command: {wsclean_cmd}\n",
)
msg = run_wsclean(wsclean_cmd, "meerwsclean", verbose=False)
if msg == 0:
os.system("rm -rf " + prefix + "*psf.fits")
######################
# Making stokes cubes
######################
pollist = [i.upper() for i in list(pol)]
if len(pollist) == 1:
imagelist = sorted(glob.glob(prefix + "*image.fits"))
if savemodel == False:
os.system("rm -rf " + prefix + "*model.fits")
else:
modellist = sorted(glob.glob(prefix + "*model.fits"))
if saveres == False:
os.system("rm -rf " + prefix + "*residual.fits")
else:
reslist = sorted(glob.glob(prefix + "*residual.fits"))
else:
imagelist = []
stokeslist = []
for p in pollist:
stokeslist.append(
sorted(glob.glob(prefix + "*" + p + "-image.fits"))
)
for i in range(len(stokeslist[0])):
wsclean_images = sorted(
[stokeslist[k][i] for k in range(len(pollist))]
)
image_prefix = os.path.basename(wsclean_images[0]).split(
"-image"
)[0]
image_cube = make_stokes_wsclean_imagecube(
wsclean_images,
image_prefix + f"_{pol}_image.fits",
keep_wsclean_images=False,
)
imagelist.append(image_cube)
del stokeslist
if savemodel == False:
os.system("rm -rf " + prefix + "*model.fits")
else:
modellist = []
stokeslist = []
for p in pollist:
stokeslist.append(
sorted(glob.glob(prefix + f"*{p}*model.fits"))
)
for i in range(len(stokeslist[0])):
wsclean_models = sorted(
[stokeslist[k][i] for k in range(len(pollist))]
)
model_prefix = os.path.basename(
wsclean_models[0]
).split("-model")[0]
model_cube = make_stokes_wsclean_imagecube(
wsclean_models,
model_prefix + f"_{pol}_model.fits",
keep_wsclean_images=False,
)
modellist.append(model_cube)
del stokeslist
if saveres == False:
os.system("rm -rf " + prefix + "*residual.fits")
else:
reslist = []
stokeslist = []
for p in pollist:
stokeslist.append(
sorted(glob.glob(prefix + f"*{p}*residual.fits"))
)
for i in range(len(stokeslist[0])):
wsclean_residuals = sorted(
[stokeslist[k][i] for k in range(len(pollist))]
)
res_prefix = os.path.basename(
wsclean_residuals[0]
).split("-residual")[0]
residual_cube = make_stokes_wsclean_imagecube(
wsclean_residuals,
res_prefix + f"_{pol}_residual.fits",
keep_wsclean_images=False,
)
reslist.append(residual_cube)
del stokeslist
######################
# Renaming images
######################
if len(imagelist) > 0:
os.makedirs(imagedir + "/images",exist_ok=True)
final_image_list = []
for imagename in imagelist:
renamed_image = rename_image(
imagename,
imagedir=imagedir + "/images",
pol=pol,
band=band,
cutout_rsun=cutout_rsun,
make_overlay=make_overlay,
make_plots=make_plots,
)
final_image_list.append(renamed_image)
final_list.append(final_image_list)
if savemodel and len(modellist) > 0:
final_model_list = []
os.makedirs(imagedir + "/models",exist_ok=True)
for modelname in modellist:
renamed_model = rename_image(
modelname,
imagedir=imagedir + "/models",
pol=pol,
band=band,
cutout_rsun=cutout_rsun,
make_overlay=False,
make_plots=False,
)
final_model_list.append(renamed_model)
final_list.append(final_model_list)
if saveres and len(reslist) > 0:
final_res_list = []
os.makedirs(imagedir + "/residuals",exist_ok=True)
for resname in reslist:
renamed_res = rename_image(
resname,
imagedir=imagedir + "/residuals",
pol=pol,
band=band,
cutout_rsun=cutout_rsun,
make_overlay=False,
make_plots=False,
)
final_res_list.append(renamed_res)
final_list.append(final_res_list)
if use_solar_mask and os.path.exists(fits_mask):
os.system("rm -rf " + fits_mask)
if len(final_list) == 0 or len(final_list[0]) == 0:
logger.info(
f"{os.path.basename(msname)} -- No image is made.\n",
)
time.sleep(5)
clean_shutdown(sub_observer)
return 1, final_list
else:
logger.info(
f"{os.path.basename(msname)} -- Imaging is successfully done.\n",
)
time.sleep(5)
clean_shutdown(sub_observer)
return 0, final_list
else:
if use_solar_mask and os.path.exists(fits_mask):
os.system("rm -rf " + fits_mask)
logger.info(
f"{os.path.basename(msname)} -- No image is made.\n",
)
time.sleep(5)
clean_shutdown(sub_observer)
return 1, []
except Exception as e:
traceback.print_exc()
time.sleep(5)
clean_shutdown(sub_observer)
return 1, []
finally:
time.sleep(5)
drop_cache(msname)
[docs]
def run_all_imaging(
mslist=[],
mainlogger=None,
workdir="",
outdir="",
freqrange="",
timerange="",
datacolumn="CORRECTED_DATA",
freqres=-1,
timeres=-1,
weight="briggs",
robust=0.0,
minuv=0,
pol="I",
threshold=1.0,
use_multiscale=True,
use_solar_mask=True,
imaging_params={}, #TODO
savemodel=False,
saveres=False,
band="",
cutout_rsun=2.5,
make_overlay=True,
make_plots=True,
cpu_frac=0.8,
mem_frac=0.8,
logfile="imaging.log",
):
"""
Run spectropolarimetric snapshot imaging on a list of measurement sets
Parameters
----------
mslist : list
Measurement set list
mainlogger : str
Python logger
workdir : str
Work directory
outdir : str
Output directory
freqrange : str, optional
Frequency range to image
timerange : str, optional
Time range
datacolumn : str, optional
Data column
freqres : float, optional
Frequency resolution of spectral chunk in MHz
timeres : float, optional
Time resolution of temporal chunk in seconds
weight : str, optional
Image weighting
robust : float, optional
Briggs weighting robust parameter
minuv : float, optional
Minimum UV-lambda to use in imaging
pol : str, optional
Stokes parameters to image
threshold : float, optional
CLEAN threshold
use_multiscale : bool, optional
Use multiscale or not
use_solar_mask : bool, optional
Use solar mask
savemodel : bool, optional
Save model images or not
saveres : bool, optional
Save residual images or not
band : str, optional
Band name
cutout_rsun : float, optional
Cutout image size from center in solar radii (default : 2.5 solar radii)
make_overlay : bool, optional
Make SUVI MeerKAT overlay
make_plots : bool, optional
Make radio image helioprojective plots
cpu_frac : float, optional
CPU fraction to use
mem_frac : float, optional
Memory fraction to use
Returns
-------
int
Success message
"""
start_time = time.time()
mslist = sorted(mslist)
observer = None
if mainlogger == None:
mainlog_file = workdir + "/logs/imaging_targets.mainlog"
mainlogger, mainlog_file = create_logger(
os.path.basename(mainlog_file).split(".mainlog")[0],
mainlog_file,
verbose=False,
)
observer = None
if os.path.exists(f"{workdir}/jobname_password.npy") and mainlog_file != None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(mainlog_file):
print(f"Starting remote logger. Remote logger password: {password}")
observer = init_logger(
"all_imaging", mainlog_file, jobname=jobname, password=password
)
###########################
# WSClean container
###########################
container_name = "meerwsclean"
container_present = check_udocker_container(container_name)
if container_present == False:
container_name = initialize_wsclean_container(name=container_name)
if container_name == None:
print(
"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
try:
if len(mslist) == 0:
mainlogger.error("Provide valid measurement set list.")
time.sleep(5)
clean_shutdown(observer)
return 1
if weight == "briggs":
weight_str = f"{weight}_{robust}"
else:
weight_str = weight
if freqres == -1 and timeres == -1:
imagedir = outdir + f"/imagedir_f_all_t_all_w_{weight_str}"
elif freqres != -1 and timeres == -1:
imagedir = outdir + f"/imagedir_f_{freqres}_t_all_w_{weight_str}"
elif freqres == -1 and timeres != -1:
imagedir = outdir + f"/imagedir_f_all_t_{timeres}_w_{weight_str}"
else:
imagedir = outdir + f"/imagedir_f_{freqres}_t_{timeres}_w_{weight_str}"
os.makedirs(imagedir, exist_ok=True)
####################################
# Filtering any corrupted ms
#####################################
filtered_mslist = [] # Filtering in case any ms is corrupted
for ms in mslist:
checkcol = check_datacolumn_valid(ms)
if checkcol:
filtered_mslist.append(ms)
else:
mainlogger.warning(f"Issue in : {ms}")
os.system(f"rm -rf {ms}")
mslist = filtered_mslist
#####################################
# Determining spectro-temporal chunks
#####################################
if timeres < 0:
ntime_list = [1] * len(mslist)
else:
ntime_list = []
msmd = msmetadata()
for ms in mslist:
msmd.open(ms)
times = msmd.timesforspws(0)
msmd.close()
tw = max(times) - min(times)
ntime = max(1, int(tw / timeres))
ntime_list.append(ntime)
if freqres < 0:
nchan_list = [1] * len(mslist)
else:
nchan_list = []
msmd = msmetadata()
for ms in mslist:
msmd.open(ms)
freqs = msmd.chanfreqs(0, unit="MHz")
msmd.close()
bw = max(freqs) - min(freqs)
nchan = max(1, math.ceil(bw / freqres))
nchan_list.append(nchan)
######################################
# Resetting maximum file limit
######################################
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
new_soft_limit = max(soft_limit, int(0.8*hard_limit))
if soft_limit < new_soft_limit:
resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft_limit, hard_limit))
total_fd = 0
npol = len(pol)
for i in range(len(mslist)):
ms = mslist[i]
nchan = nchan_list[i]
ntime = ntime_list[i]
per_job_fd = (
npol * (nchan + 1) * ntime * 4 * 2
) # 4 types of images, 2 is fudge factor
total_fd+=per_job_fd
n_jobs = max(1, int(new_soft_limit / total_fd))
n_jobs = min(len(mslist), n_jobs)
#################################
# Dask client setup
#################################
task = delayed(perform_imaging)(dry_run=True)
mem_limit = run_limited_memory_task(task, dask_dir=workdir)
dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client(
n_jobs,
dask_dir=workdir,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
min_cpu_per_job=3,
min_mem_per_job=mem_limit / 0.6,
)
tasks = []
for i in range(len(mslist)):
ms = mslist[i]
nchan = nchan_list[i]
ntime = ntime_list[i]
num_pixel_in_psf = calc_npix_in_psf(weight, robust=robust)
cellsize = calc_cellsize(ms, num_pixel_in_psf)
instrument_fov = calc_field_of_view(ms, FWHM=False)
fov = min(instrument_fov, 32 * 3 * 60) # 3 solar radii
imsize = int(fov / cellsize)
pow2 = np.ceil(np.log2(imsize)).astype("int")
possible_sizes = []
for p in range(pow2):
for k in [3, 5]:
possible_sizes.append(k * 2**p)
possible_sizes = np.sort(np.array(possible_sizes))
possible_sizes = possible_sizes[possible_sizes >= imsize]
imsize = max(1024, int(possible_sizes[0]))
os.makedirs(workdir + "/logs",exist_ok=True)
logfile = (
workdir
+ "/logs/imaging_"
+ os.path.basename(ms).split(".ms")[0]
+ ".log"
)
mainlogger.info(
f"Starting imaging for ms : {ms}, Log file : {logfile}\n",
)
tasks.append(
delayed(perform_imaging)(
msname=ms,
workdir=workdir,
freqrange=freqrange,
timerange=timerange,
datacolumn=datacolumn,
imagedir=imagedir,
imsize=imsize,
cellsize=cellsize,
nchan=nchan,
ntime=ntime,
pol=pol,
weight=weight,
robust=robust,
minuv=minuv,
threshold=threshold,
use_multiscale=use_multiscale,
use_solar_mask=use_solar_mask,
savemodel=savemodel,
saveres=saveres,
band=band,
cutout_rsun=cutout_rsun,
make_overlay=make_overlay,
make_plots=make_plots,
ncpu=n_threads,
mem=mem_limit,
logfile=logfile,
)
)
results = compute(*tasks)
dask_client.close()
dask_cluster.close()
all_image_list = []
all_imaged_ms_list = []
for i in range(len(results)):
r = results[i]
if r[0] != 0:
mainlogger.info(
f"Imaging failed for ms : {mslist[i]}",
)
else:
all_imaged_ms_list.append(mslist[i])
for image in r[1][0]:
all_image_list.append(image)
mainlogger.info(
f"Numbers of input measurement sets : {len(mslist)}.",
)
mainlogger.info(
f"Imaging successfully done for: {len(all_imaged_ms_list)} measurement sets.",
)
mainlogger.info(f"Total images made: {len(all_image_list)}.")
mainlogger.info(
f"Total time taken: {round(time.time()-start_time,2)}s",
)
time.sleep(5)
clean_shutdown(observer)
return 0
except Exception as e:
traceback.print_exc()
mainlogger.info(
f"Total time taken: {round(time.time()-start_time,2)}s",
)
time.sleep(5)
clean_shutdown(observer)
return 1
finally:
time.sleep(5)
for ms in mslist:
drop_cache(ms)
drop_cache(workdir)
[docs]
def main():
parser = argparse.ArgumentParser(
description="Perform spectropolarimetric snapshot imaging",
formatter_class=SmartDefaultsHelpFormatter,
)
## Essential parameters
basic_args = parser.add_argument_group(
"###################\nEssential parameters\n###################"
)
basic_args.add_argument(
"mslist",
type=str,
help="Comma-separated list of measurement sets (required)",
)
basic_args.add_argument(
"--workdir",
type=str,
required=True,
default="",
help="Work directory for imaging",
)
basic_args.add_argument(
"--outdir",
type=str,
required=True,
default="",
help="Output directory for imaging products",
)
## Advanced parameters
adv_args = parser.add_argument_group(
"###################\nAdvanced imaging parameters\n###################"
)
adv_args.add_argument(
"--freqrange",
type=str,
default="",
help="Frequency range to image",
)
adv_args.add_argument(
"--timerange",
type=str,
default="",
help="Time range to image",
)
adv_args.add_argument(
"--datacolumn",
type=str,
default="CORRECTED_DATA",
help="Data column to use for imaging",
)
adv_args.add_argument(
"--pol",
type=str,
default="I",
help="Stokes parameters to image",
)
adv_args.add_argument(
"--freqres",
type=float,
default=-1,
help="Frequency resolution per chunk in MHz (-1 for full)",
)
adv_args.add_argument(
"--timeres",
type=float,
default=-1,
help="Time resolution per chunk in seconds (-1 for full)",
)
adv_args.add_argument(
"--weight",
type=str,
default="briggs",
help="Imaging weighting scheme",
)
adv_args.add_argument(
"--robust",
type=float,
default=0.0,
help="Briggs robust parameter",
)
adv_args.add_argument(
"--minuv_l",
dest="minuv",
type=float,
default=0.0,
help="Minimum UV distance in wavelengths",
)
adv_args.add_argument(
"--threshold",
type=float,
default=1.0,
help="CLEAN threshold in Jy",
)
adv_args.add_argument(
"--band",
type=str,
default="",
help="Band name label for output",
)
adv_args.add_argument(
"--cutout_rsun",
type=float,
default=2.5,
help="Cutout radius for images (solar radii)",
)
adv_args.add_argument(
"--no_multiscale",
action="store_false",
dest="use_multiscale",
help="Do not use multiscale CLEAN",
)
adv_args.add_argument(
"--no_solar_mask",
action="store_false",
dest="use_solar_mask",
help="Do not use solar disk mask for CLEANing",
)
adv_args.add_argument(
"--no_savemodel",
action="store_false",
dest="savemodel",
help="Do no save model images",
)
adv_args.add_argument(
"--no_saveres",
action="store_false",
dest="saveres",
help="Do not save residual images",
)
adv_args.add_argument(
"--no_make_overlay",
action="store_false",
dest="make_overlay",
help="Do not generate overlay with SUVI images",
)
adv_args.add_argument(
"--no_make_plots",
action="store_false",
dest="make_plots",
help="Do not make generate helioprojective plots",
)
## Resource management parameters
hard_args = parser.add_argument_group(
"###################\nHardware resource management parameters\n###################"
)
hard_args.add_argument(
"--cpu_frac",
type=float,
default=0.8,
help="Fraction of available CPU to use",
)
hard_args.add_argument(
"--mem_frac",
type=float,
default=0.8,
help="Fraction of available memory to use",
)
hard_args.add_argument(
"--jobid",
type=str,
default="0",
help="Job ID for process tracking and logging",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
pid = os.getpid()
save_pid(pid, datadir + f"/pids/pids_{args.jobid}.txt")
if args.workdir == "" or not os.path.exists(args.workdir):
first_ms = args.mslist.split(",")[0]
workdir = os.path.dirname(os.path.abspath(first_ms)) + "/workdir"
else:
workdir = args.workdir
os.makedirs(workdir,exist_ok=True)
if args.outdir == "" or not os.path.exists(args.outdir):
outdir=workdir
else:
outdir = args.outdir
os.makedirs(outdir,exist_ok=True)
os.makedirs(workdir + "/logs/",exist_ok=True)
mainlog_file = workdir + "/logs/imaging_targets.mainlog"
mainlogger, mainlog_file = create_logger(
os.path.basename(mainlog_file).split(".mainlog")[0], mainlog_file, verbose=False
)
observer = None
if os.path.exists(f"{workdir}/jobname_password.npy") and mainlog_file is not None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/jobname_password.npy", allow_pickle=True
)
if os.path.exists(mainlog_file):
observer = init_logger(
"all_imaging", mainlog_file, jobname=jobname, password=password
)
mslist = args.mslist.split(",")
try:
if len(mslist) == 0:
mainlogger.info("Please provide a valid measurement set list.")
msg = 1
else:
msg = run_all_imaging(
mslist=mslist,
mainlogger=mainlogger,
workdir=workdir,
outdir=outdir,
freqrange=args.freqrange,
timerange=args.timerange,
datacolumn=args.datacolumn,
freqres=args.freqres,
timeres=args.timeres,
weight=args.weight,
robust=args.robust,
minuv=args.minuv,
threshold=args.threshold,
use_multiscale=args.use_multiscale,
use_solar_mask=args.use_solar_mask,
pol=args.pol,
band=args.band,
make_plots=args.make_plots,
cutout_rsun=args.cutout_rsun,
make_overlay=args.make_overlay,
savemodel=args.savemodel,
saveres=args.saveres,
cpu_frac=args.cpu_frac,
mem_frac=args.mem_frac,
)
except Exception:
traceback.print_exc()
msg = 1
finally:
time.sleep(5)
for ms in mslist:
drop_cache(ms)
drop_cache(workdir)
drop_cache(outdir)
clean_shutdown(observer)
return msg
if __name__ == "__main__":
result = main()
if result > 0:
result = 1
print("\n###################\nImaging is done.\n###################\n")
os._exit(result)