Source code for meersolar.pipeline.meer_pbcor

import os, glob, traceback, argparse, time
from meersolar.pipeline.single_image_meerpbcor import get_pbcor_image
from meersolar.pipeline.basic_func import *
from dask import delayed, compute
from casatasks import casalog

try:
    casalogfile = casalog.logfile()
    os.system("rm -rf " + casalogfile)
except:
    pass


[docs] def get_fits_freq(image_file): hdr = fits.getheader(image_file) if hdr["CTYPE3"] == "FREQ": freq = hdr["CRVAL3"] return freq elif hdr["CTYPE4"] == "FREQ": freq = hdr["CRVAL4"] return freq else: print("No frequency axis in image.") return
[docs] def run_pbcor(imagename, pbdir, pbcor_dir, apply_parang, jobid=0, ncpu=8): cmd = f"run_single_meerpbcor {imagename} --pbdir {pbdir} --pbcor_dir {pbcor_dir} --ncpu {ncpu} --jobid {jobid}" print (cmd) if apply_parang == False: cmd += " --no_apply_parang" a = os.system(f"{cmd} > {imagename}.tmp") os.system(f"rm -rf {imagename}.tmp") return a
[docs] def pbcor_all_images( imagedir, make_TB=True, make_plots=True, apply_parang=True, jobid=0, cpu_frac=0.8, mem_frac=0.8, ): """ Correct primary beam of MeerKAT for images in a directory Parameters ---------- imagedir : str Name of the image directory make_TB : bool, optional Make brightness temperature map make_plots : bool, optional Make plots apply_parang : bool, optional Apply parallactic angle correction jobid : int, optional Job ID cpu_frac : float, optional CPU fraction to use mem_frac : float, optional Memory fraction to use Returns ------- int Success message """ imagedir=imagedir.rstrip("/") pbdir = f"{os.path.dirname(imagedir)}/pbdir" pbcor_dir = f"{os.path.dirname(imagedir)}/pbcor_images" os.makedirs(pbdir, exist_ok=True) os.makedirs(pbcor_dir, exist_ok=True) try: images = glob.glob(f"{imagedir}/*.fits") if make_TB: tb_dir = f"{os.path.dirname(imagedir)}/tb_images" os.makedirs(tb_dir, exist_ok=True) if len(images) == 0: print(f"No image is present in image directory: {imagedir}") return 1 first_set = [] remaining_set = [] freqs = [] for image in images: freq = get_fits_freq(image) if freq in freqs: remaining_set.append(image) else: freqs.append(freq) first_set.append(image) mem_limit = ( 16 * max([os.path.getsize(image) for image in images]) / 1024**3 ) # In GB if len(first_set) > 0: dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client( len(first_set), dask_dir=pbdir, cpu_frac=cpu_frac, mem_frac=mem_frac, min_mem_per_job=mem_limit / 0.6, ) tasks = [] for image in first_set: task = delayed(run_pbcor)( image, pbdir, pbcor_dir, apply_parang, jobid=jobid, ncpu=n_threads ) tasks.append(task) results = compute(*tasks) dask_client.close() dask_cluster.close() successful_pbcor = 0 for r in results: if r == 0: successful_pbcor += 1 if len(remaining_set) > 0: print(f"Correcting remaining images of different timestamps.") dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client( len(remaining_set), dask_dir=pbdir, cpu_frac=cpu_frac, mem_frac=mem_frac, min_mem_per_job=mem_limit / 0.6, ) tasks = [] for image in remaining_set: task = delayed(run_pbcor)( image, pbdir, pbcor_dir, apply_parang, jobid=jobid, ncpu=n_threads ) tasks.append(task) results = compute(*tasks) dask_client.close() dask_cluster.close() for r in results: if r == 0: successful_pbcor += 1 ############################################ # Saving fits in helioprojective coordinates ############################################ if successful_pbcor>0: hpcdir=f"{pbcor_dir}/hpcs" pbcor_images = glob.glob(f"{pbcor_dir}/*.fits") os.makedirs(hpcdir, exist_ok=True) print("Saving primary beam corrected images helioprojective coordinates...") for image in pbcor_images: save_in_hpc(image,outdir=hpcdir) if make_plots: print("Making plots of primary beam corrected images ...") pngdir = f"{pbcor_dir}/pngs" pdfdir = f"{pbcor_dir}/pdfs" os.makedirs(pngdir, exist_ok=True) os.makedirs(pdfdir, exist_ok=True) for image in pbcor_images: try: outimages=plot_in_hpc( image, draw_limb=True, extensions=["png","pdf"], outdirs=[pngdir,pdfdir] ) except: junkpng=f"{pngdir}/{os.path.basename(image).split('.fits')[0]}.png.junk" junkpdf=f"{pngdir}/{os.path.basename(image).split('.fits')[0]}.pdf.junk" os.system(f"touch {junkpng}") os.system(f"touch {junkpdf}") #################################### # Making brightness temperature maps #################################### if successful_pbcor> 0 and make_TB: for pbcor_image in pbcor_images: tb_image = ( tb_dir + "/" + os.path.basename(pbcor_image).split(".fits")[0] + "_TB.fits" ) generate_tb_map(pbcor_image, outfile=tb_image) ############################################ # Saving fits in helioprojective coordinates ########################################### hpcdir=f"{tb_dir}/hpcs" tb_images = glob.glob(f"{tb_dir}/*.fits") os.makedirs(hpcdir, exist_ok=True) print("Saving brightness temperature maps helioprojective coordinates...") for image in tb_images: save_in_hpc(image,outdir=hpcdir) if make_plots: print("Making plots of brightness temperature maps..") pngdir = f"{tb_dir}/pngs" pdfdir = f"{tb_dir}/pdfs" os.makedirs(pngdir, exist_ok=True) os.makedirs(pdfdir, exist_ok=True) for image in tb_images: outimages=plot_in_hpc( image, draw_limb=True, extensions=["png","pdf"], outdirs=[pngdir,pdfdir] ) ######################################### # Final calculations ######################################### print(f"Total input images: {len(images)}") print(f"Total corrected images: {len(pbcor_images)}") if make_TB: print(f"Total brightness temperatures maps: {len(tb_images)}") os.system(f"rm -rf {pbdir}/dask-scratch-space") return 0 except Exception as e: traceback.print_exc() return 1 finally: time.sleep(5) drop_cache(imagedir) drop_cache(pbcor_dir) os.system(f"rm -rf {pbdir}")
[docs] def main(): parser = argparse.ArgumentParser( description="Correct all images for MeerKAT full-pol averaged primary beam", formatter_class=SmartDefaultsHelpFormatter, ) ## Essential parameters basic_args = parser.add_argument_group( "###################\nEssential parameters\n###################" ) basic_args.add_argument("imagedir", help="Path to image directory") basic_args.add_argument("--workdir", default="", help="Path to work directory") ## Advanced parameters adv_args = parser.add_argument_group( "###################\nAdvanced parameters\n###################" ) adv_args.add_argument( "--no_make_TB", action="store_false", dest="make_TB", help="Do not generate brightness temperature map", ) adv_args.add_argument( "--no_make_plots", action="store_false", dest="make_plots", help="Do not make png and pdf plots", ) adv_args.add_argument( "--no_apply_parang", action="store_false", dest="apply_parang", help="Do not apply parallactic angle correction", ) ## Resource management parameters hard_args = parser.add_argument_group( "###################\nHardware resource management parameters\n###################" ) hard_args.add_argument( "--cpu_frac", type=float, default=0.8, help="CPU usage fraction" ) hard_args.add_argument( "--mem_frac", type=float, default=0.8, help="Memory usage fraction" ) hard_args.add_argument("--logfile", default=None, help="Path to log file") hard_args.add_argument( "--jobid", type=int, default=0, help="Job ID for logging and PID tracking" ) if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) args = parser.parse_args() if args.workdir == "" or not os.path.exists(args.workdir): workdir = os.path.dirname(os.path.abspath(args.msname)) + "/workdir" else: workdir = args.workdir os.makedirs(workdir,exist_ok=True) pid = os.getpid() meersolar_cachedir = get_meersolar_cachedir() save_pid(pid, f"{meersolar_cachedir}/pids/pids_{args.jobid}.txt") logfile = args.logfile observer = None if os.path.exists(f"{workdir}/jobname_password.npy") and logfile is not None: time.sleep(5) jobname, password = np.load( f"{workdir}/jobname_password.npy", allow_pickle=True ) if os.path.exists(logfile): observer = init_logger( "all_pbcor", logfile, jobname=jobname, password=password ) try: if os.path.exists(args.imagedir): msg = pbcor_all_images( args.imagedir, make_TB=args.make_TB, make_plots=args.make_plots, apply_parang=args.apply_parang, jobid=args.jobid, cpu_frac=args.cpu_frac, mem_frac=args.mem_frac, ) else: print("Please provide correct image directory path.") msg = 1 except Exception: traceback.print_exc() msg = 1 finally: time.sleep(5) drop_cache(args.imagedir) drop_cache(workdir) clean_shutdown(observer) return msg
if __name__ == "__main__": result = main() print( "\n###################\nPrimary beam corrections are done.\n###################\n" ) os._exit(result)