Source code for meersolar.pipeline.import_model

import sys, psutil, traceback
import os, numpy as np, time, glob, argparse
from meersolar.pipeline.basic_func import *
from casatasks import casalog
from casatools import msmetadata
from dask import delayed, compute

try:
    logfile = casalog.logfile()
    os.system("rm -rf " + logfile)
except:
    pass

datadir = get_datadir()


[docs] def get_polmodel_coeff(model_file): """ Get polarization model coefficients Parameters ---------- model_file : str Model ascii file name Returns ------- float Reference frequency in GHz float Stokes I flux density in Jy at reference frequency list Stokes I spectrum log-polynomial coefficients list List of linear polarization fraction polynomial coefficients list List of linear polarization angle (in radians) coefficients """ freq, I, pfrac, pangle = np.loadtxt(model_file, unpack=True, skiprows=5) ref_freq = freq[0] x = (freq - ref_freq) / ref_freq polyI = np.polyfit(np.log10(x[1:]), np.log10(I[1:]), deg=5)[::-1] poly_pfrac = np.polyfit(x, pfrac, deg=5)[::-1] poly_pangle = np.polyfit(x, pangle, deg=5)[::-1] return ref_freq, I[0], polyI[1:], poly_pfrac, poly_pangle
[docs] def polcal_setjy(msname="", field_name="", n_threads=-1, ismms=True, dry_run=False): """ Setjy polcal fields (3C286 or 3C138) Parameters ---------- msname : str Name of measurement set field : str, optional Field name n_threads : int, optional Number of OpenMP threads """ limit_threads(n_threads=n_threads) from casatasks import setjy if dry_run: process = psutil.Process(os.getpid()) mem = round(process.memory_info().rss / 1024**3, 2) # in GB return mem try: print(f"Polarization calibrator field name : {field_name}") if field_name in ["3C286", "1328+307", "1331+305", "J1331+3030"]: ref_freq, I, polyI, poly_pfrac, poly_pangle = get_polmodel_coeff( datadir + "/3C286_pol_model.txt" ) elif field_name in ["3C138", "0518+165", "0521+166", "J0521+1638"]: ref_freq, I, polyI, poly_pfrac, poly_pangle = get_polmodel_coeff( datadir + "/3C138_pol_model.txt" ) else: print("Field name is not either of the polcal, 3C286 or 3C138.") return 1 print(f"Reference frequency: {ref_freq} GHz.") with suppress_casa_output(): setjy( vis=msname, field=field_name, scalebychan=True, standard="manual", fluxdensity=[I, 0.0, 0.0, 0.0], spix=polyI, reffreq=f"{ref_freq}GHz", polindex=poly_pfrac, polangle=poly_pangle, rotmeas=0, usescratch=True, ) except Exception as e: traceback.print_exc() return
[docs] def phasecal_setjy(msname="", field="", ismms=False, n_threads=-1, dry_run=False): """ Setjy phasecal fields Parameters ---------- msname : str Measurement set field : str, optional Phasecal fields ismms : bool, optional Is a multi-ms ot not n_threads : int, optional Number of OpenMP threads """ limit_threads(n_threads=n_threads) from casatasks import setjy if dry_run: process = psutil.Process(os.getpid()) mem = round(process.memory_info().rss / 1024**3, 2) # in GB return mem try: with suppress_casa_output(): setjy( vis=msname, field=field, standard="manual", fluxdensity=[1.0, 0, 0, 0], usescratch=True, ismms=ismms, ) except Exception as e: traceback.print_exc() return
[docs] def import_fluxcal_models(mslist, fluxcal_fields, fluxcal_scans, ncpus=1, mem_frac=0.8): """ Import model visibilities using crystalball Parameters ---------- mslist : list List of sub-MS fluxcal_fields : list Fluxcal fields fluxcal_scans : dict Fluxcal scans dict ncpus : int Number of CPU threads to use mem_frac : float Memory fraction to use Returns ------- int Success message """ try: if len(fluxcal_fields) == 0: print("No flux calibrator scan is present.\n") return 1 print("##############################") print("Import fluxcal models") print("##############################") bandname = get_band_name(mslist[0]) cpu_count = psutil.cpu_count() ncpus = max(1, ncpus) ncpus = min(ncpus, cpu_count - 1) mem_frac = max(mem_frac, 0.8) scans = [] for ms in mslist: ms_scans = get_ms_scans(ms) for s in ms_scans: scans.append(s) for fluxcal in fluxcal_fields: f_scan = fluxcal_scans[fluxcal] for s in f_scan: if s in scans: pos = scans.index(s) sub_msname = mslist[pos] modelname = datadir + "/" + fluxcal + "_" + bandname + "_model.txt" crys_cmd_args = [ "-sm " + modelname, "-f " + fluxcal, "-ns 1000", "-j " + str(ncpus), "-mf " + str(mem_frac), ] crys_cmd = ( "crystalball " + " ".join(crys_cmd_args) + " " + sub_msname ) print(crys_cmd) tmpfile = f"tmp_{os.path.basename(sub_msname).split('.ms')[0]}" with suppress_casa_output(): msg = os.system(crys_cmd + f" > {tmpfile}") os.system(f"rm -rf {tmpfile}") if msg == 0: print(f"Fluxcal model is imported successfully for scan: {s}.") else: print(f"Error in importing fluxcal model for scan: {s}.") return 0 except Exception as e: print("Error in importing model.") traceback.print_exc() return 1
[docs] def import_phasecal_models( mslist, phasecal_fields, phasecal_scans, workdir, cpu_frac=0.8, mem_frac=0.8 ): """ Import model visibilities for phasecal Parameters ---------- mslist : list List of sub-MS phasecal_fields : list Phasecal fields phasecal_scans : list Phasecal scans workdir : str Work directory cpu_frac : float, optional CPU fraction to use mem_frac : float, optional Memory fraction to use Returns ------- int Success message """ try: print("##########################") print("Import phasecal models") print("##########################") task = delayed(phasecal_setjy)(dry_run=True) mem_limit = run_limited_memory_task(task) dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client( len(mslist), dask_dir=workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, min_mem_per_job=mem_limit / 0.6, ) scans = [] for ms in mslist: ms_scans = get_ms_scans(ms) for s in ms_scans: scans.append(s) tasks = [] for phasecal in phasecal_fields: ph_scan = phasecal_scans[phasecal] for s in ph_scan: if s in scans: pos = scans.index(s) sub_msname = mslist[pos] tasks.append( delayed(phasecal_setjy)( sub_msname, field=phasecal, ismms=True, n_threads=n_threads, ) ) results = compute(*tasks) dask_client.close() dask_cluster.close() print("Phasecal models are initiated.\n") return 0 except Exception as e: print("Error in initiaing phasecal models.") traceback.print_exc() return 1
[docs] def import_polcal_model( mslist, polcal_fields, polcal_scans, workdir, cpu_frac=0.8, mem_frac=0.8 ): """ Import model for polarization calibrators (3C286 or 3C138) Parameters ---------- mslist : list List of sub-MS polcal_fields : list Polcal fields polcal_scans : list Polcal scans workdir : str Work directory n_threads : int, optional Number of OpenMP threads Returns ------- int Success message """ try: if len(polcal_scans) == 0: print("No polarization calibrator scans is present.") return 1 task = delayed(polcal_setjy)(dry_run=True) mem_limit = run_limited_memory_task(task) dask_client, dask_cluster, n_jobs, n_threads, mem_limit = get_dask_client( len(polcal_scans), dask_dir=workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, min_mem_per_job=mem_limit / 0.6, ) scans = [] for ms in mslist: ms_scans = get_ms_scans(ms) for s in ms_scans: scans.append(s) tasks = [] for polcal_field in polcal_fields: p_scan = polcal_scans[polcal_field] for s in p_scan: if s in scans: pos = scans.index(s) sub_msname = mslist[pos] tasks.append( delayed(polcal_setjy)( sub_msname, polcal_field, ismms=True, n_threads=n_threads ) ) results = compute(*tasks) dask_client.close() dask_cluster.close() return 0 except Exception as e: traceback.print_exc() return 1
[docs] def import_all_models(msname, workdir, cpu_frac=0.8, mem_frac=0.8): """ Import all calibrator models Parameters ---------- msname : str Measurement set name workdir : str Work directory cpu_frac : float, optional CPU fraction to use mem_frac : float, optional Memory fraction to use Returns ------- int Success message """ start_time = time.time() cpu_threads = psutil.cpu_count() ncpu = int(cpu_threads * cpu_frac) mem_frac = 1 - mem_frac try: msname = msname.rstrip("/") correct_missing_col_subms(msname) mspath = os.path.dirname(os.path.abspath(msname)) os.chdir(mspath) result = get_submsname_scans(msname) if result != None: # If multi-ms mslist, scans = result else: print("Please provide a multi-MS with scans partitioned.") return 1, [] fluxcal_fields, fluxcal_scans = get_fluxcals(msname) phasecal_fields, phasecal_scans, phasecal_flux_list = get_phasecals(msname) polcal_fields, polcal_scans = get_polcals(msname) fluxcal_result = import_fluxcal_models( mslist, fluxcal_fields, fluxcal_scans, ncpus=ncpu, mem_frac=mem_frac ) if fluxcal_result != 0: print("##################") print("Total time taken : " + str(time.time() - start_time) + "s") print("##################\n") return fluxcal_result, 1, 1 phasecal_result = import_phasecal_models( mslist, phasecal_fields, phasecal_scans, workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, ) polcal_result = import_polcal_model( mslist, polcal_fields, polcal_scans, workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, ) if phasecal_result != 0: print( "Phasecal model was not import, but fluxcal model import is successful." ) if polcal_result != 0: print( "Polcal model was not imported, but fluxcal model import is successful." ) print("##################") print("Total time taken : " + str(time.time() - start_time) + "s") print("##################\n") return fluxcal_result, phasecal_result, polcal_result except Exception as e: print("##################") print("Total time taken : " + str(time.time() - start_time) + "s") print("##################\n") traceback.print_exc() return 1, 1, 1 finally: time.sleep(5) drop_cache(msname) drop_cache(workdir)
[docs] def main(): usage = "Import calibrator models" parser = argparse.ArgumentParser( description=usage, formatter_class=SmartDefaultsHelpFormatter ) ## Essential parameters basic_args = parser.add_argument_group( "###################\nEssential parameters\n###################" ) basic_args.add_argument("msname", type=str, help="Name of measurement set") basic_args.add_argument( "--workdir", type=str, default="", help="Name of work directory" ) ## Resource management parameters hard_args = parser.add_argument_group( "###################\nHardware resource management parameters\n###################" ) hard_args.add_argument( "--cpu_frac", type=float, default=0.8, help="CPU fraction to use" ) hard_args.add_argument( "--mem_frac", type=float, default=0.8, help="Memory fraction to use" ) hard_args.add_argument("--logfile", type=str, default=None, help="Log file") hard_args.add_argument("--jobid", type=int, default=0, help="Job ID") if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) args = parser.parse_args() pid = os.getpid() save_pid(pid, datadir + f"/pids/pids_{args.jobid}.txt") if args.workdir == "" or not os.path.exists(args.workdir): workdir = os.path.dirname(os.path.abspath(args.msname)) + "/workdir" else: workdir = args.workdir os.makedirs(workdir, exist_ok=True) observer = None if os.path.exists(f"{workdir}/jobname_password.npy") and args.logfile: time.sleep(5) jobname, password = np.load( f"{workdir}/jobname_password.npy", allow_pickle=True ) if os.path.exists(args.logfile): observer = init_logger( "import_model", args.logfile, jobname=jobname, password=password ) try: if args.msname and os.path.exists(args.msname): fluxcal_result, phasecal_result, polcal_result = import_all_models( args.msname, workdir, cpu_frac=args.cpu_frac, mem_frac=args.mem_frac, ) os.system("touch " + workdir + "/.fluxcal_" + str(fluxcal_result)) os.system("touch " + workdir + "/.phasecal_" + str(phasecal_result)) os.system("touch " + workdir + "/.polcal_" + str(polcal_result)) if fluxcal_result == 1 or ( fluxcal_result == 1 and phasecal_result == 1 and polcal_result == 1 ): msg = 1 else: msg = 0 else: print("Please provide correct measurement set.\n") msg = 1 except Exception as e: traceback.print_exc() os.system("touch " + workdir + "/.fluxcal_1") os.system("touch " + workdir + "/.phasecal_1") os.system("touch " + workdir + "/.polcal_1") msg = 1 finally: time.sleep(5) drop_cache(args.msname) drop_cache(args.workdir) clean_shutdown(observer) return msg
if __name__ == "__main__": result = main() print( "\n###################\nVisibility simulation is finished.\n###################\n" ) os._exit(result)