Source code for meersolar.pipeline.do_sidereal_cor

import numpy as np, os, time, traceback, gc, argparse
from meersolar.pipeline.basic_func import *
from casatasks import casalog
from dask import delayed, compute

try:
    logfile = casalog.logfile()
    os.system("rm -rf " + logfile)
except:
    pass


[docs] def cor_sidereal_motion( mslist, workdir, cpu_frac=0.8, mem_frac=0.8, max_cpu_frac=0.8, max_mem_frac=0.8 ): """ Perform sidereal motion correction Parameters ---------- mslist : list Measurement set list workdir : str Work directory cpu_frac : float, optional CPU fraction to use mem_frac : float, optional Memory fraction to use max_cpu_frac : float, optional Maximum CPU fraction to use max_mem_frac : float, optional Maximum memory fraction to use Returns ------- int Success message list List of sidereal motion corrected measurement sets """ start_time = time.time() try: ############################################# # Memory limit ############################################# task = delayed(correct_solar_sidereal_motion)(dry_run=True) mem_limit = run_limited_memory_task(task, dask_dir=workdir) ############################################# tasks = [] for ms in mslist: tasks.append(delayed(correct_solar_sidereal_motion)(ms)) total_chunks = len(tasks) dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client( total_chunks, dask_dir=workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, min_mem_per_job=mem_limit / 0.6, ) results = compute(*tasks) dask_client.close() dask_cluster.close() splited_ms_list_phaserotated = [] for i in range(len(results)): msg = results[i] ms = mslist[i] if msg == 0: if os.path.exists(ms + "/.sidereal_cor"): splited_ms_list_phaserotated.append(ms) if len(splited_ms_list_phaserotated) == 0: print("##################") print( "Sidereal motion correction is not successful for any measurement set." ) print("Total time taken : ", time.time() - start_time) print("##################\n") return 1, [] else: print("##################") print("Sidereal motion corrections are done successfully.") print("Total time taken : ", time.time() - start_time) print("##################\n") return 0, splited_ms_list_phaserotated except Exception as e: traceback.print_exc() print("##################") print("Sidereal motion correction is not successful for any measurement set.") print("Total time taken : ", time.time() - start_time) print("##################\n") return 1, [] finally: time.sleep(5) for ms in mslist: drop_cache(ms) drop_cache(workdir)
[docs] def main(): parser = argparse.ArgumentParser( description="Correct measurement sets for sidereal motion", formatter_class=SmartDefaultsHelpFormatter, ) ## Essential parameters basic_args = parser.add_argument_group( "###################\nEssential parameters\n###################" ) basic_args.add_argument( "mslist", type=str, help="Comma-separated list of measurement sets (required positional argument)", ) basic_args.add_argument( "--workdir", type=str, default="", help="Working directory" ) ## Resource management parameters hard_args = parser.add_argument_group( "###################\nHardware resource management parameters\n###################" ) hard_args.add_argument( "--cpu_frac", type=float, default=0.8, help="CPU fraction to use", metavar="Float", ) hard_args.add_argument( "--mem_frac", type=float, default=0.8, help="Memory fraction to use", metavar="Float", ) hard_args.add_argument( "--max_cpu_frac", type=float, default=0.8, help="Maximum CPU fraction to use", metavar="Float", ) hard_args.add_argument( "--max_mem_frac", type=float, default=0.8, help="Maximum memory fraction to use", metavar="Float", ) hard_args.add_argument( "--logfile", type=str, default=None, help="Log file" ) hard_args.add_argument( "--jobid", type=int, default=0, help="Job ID" ) if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) args = parser.parse_args() pid = os.getpid() save_pid(pid, datadir + f"/pids/pids_{args.jobid}.txt") if args.workdir == "" or os.path.exists(args.workdir) == False: workdir = ( os.path.dirname(os.path.abspath(args.mslist.split(",")[0])) + "/workdir" ) else: workdir = args.workdir os.makedirs(workdir,exist_ok=True) logfile = args.logfile observer = None if os.path.exists(f"{workdir}/jobname_password.npy") and logfile != None: time.sleep(5) jobname, password = np.load( f"{workdir}/jobname_password.npy", allow_pickle=True ) if os.path.exists(logfile): observer = init_logger( "do_sidereal_cor", logfile, jobname=jobname, password=password ) mslist = args.mslist.split(",") try: if len(mslist) == 0: print("Please provide a list of measurement sets.") msg = 1 elif args.workdir == "" or os.path.exists(args.workdir) == False: print("Please provide a valid work directory.") msg = 1 else: msg, final_target_mslist = cor_sidereal_motion( mslist, args.workdir, cpu_frac=float(args.cpu_frac), mem_frac=float(args.mem_frac), max_cpu_frac=float(args.max_cpu_frac), max_mem_frac=float(args.max_mem_frac), ) except Exception as e: traceback.print_exc() msg = 1 finally: time.sleep(5) for ms in mslist: drop_cache(ms) drop_cache(args.workdir) clean_shutdown(observer) return msg
if __name__ == "__main__": result = main() print( "\n###################\Sidereal motion corrections are done.\n###################\n" ) os._exit(result)