Source code for meersolar.utils.flagging

import types
import psutil
import numpy as np
import traceback
import glob
import os
from datetime import datetime as dt, timezone
from .basic_utils import *
from .resource_utils import *


###############################
# Flagging related functions
################################
[docs] def flagsummary(msname, summary_file): """ Save flag summary Parameters ---------- msname : str Measurement set name summary_file : str Summary file name Returns ------- str Summary file """ from casatasks import flagdata with suppress_output(): s = flagdata(vis=msname, mode="summary") allkeys = s.keys() with open(summary_file, "w") as f: f.write(f"Flag summary of: {msname}\n") for x in allkeys: try: for y in s[x].keys(): try: flagged_percent = 100.0 * ( s[x][y]["flagged"] / s[x][y]["total"] ) logstring = f"{x} {y} {flagged_percent}\n" f.write(logstring) except: pass except: pass return summary_file
[docs] def do_flag_backup(msname, flagtype="flagdata"): """ Take a flag backup Parameters ---------- msname : str Measurement set name flagtype : str, optional Flag type """ from casatools import agentflagger af = agentflagger() af.open(msname) versionlist = af.getflagversionlist() if len(versionlist) != 0: for version_name in versionlist: if flagtype in version_name: try: version_num = ( int(version_name.split(":")[0].split(" ")[0].split("_")[-1]) + 1 ) except BaseException: version_num = 1 else: version_num = 1 else: version_num = 1 dt_string = dt.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") af.saveflagversion( flagtype + "_" + str(version_num), "Flags autosave on " + dt_string ) af.done()
[docs] def get_unflagged_antennas( msname="", scan="", n_threads=-1, ): """ Get unflagged antennas of a scan Parameters ---------- msname : str Name of the measurement set scan : str Scans Returns ------- numpy.array Unflagged antenna names numpy.array Flag fraction list """ limit_threads(n_threads=n_threads) from casatasks import flagdata msname = msname.rstrip("/") mspath = os.path.dirname(os.path.abspath(msname)) os.chdir(mspath) with suppress_output(): flag_summary = flagdata(vis=msname, scan=str(scan), mode="summary") antenna_flags = flag_summary["antenna"] unflagged_antenna_names = [] flag_frac_list = [] for ant in antenna_flags.keys(): flag_frac = antenna_flags[ant]["flagged"] / antenna_flags[ant]["total"] if flag_frac < 1.0: unflagged_antenna_names.append(ant) flag_frac_list.append(flag_frac) unflagged_antenna_names = np.array(unflagged_antenna_names) flag_frac_list = np.array(flag_frac_list) return unflagged_antenna_names, flag_frac_list
[docs] def get_chans_flag( msname="", field="", n_threads=-1, ): """ Get flag/unflag channel list Parameters ---------- msname : str Measurement set name field : str, optional Field name or ID Returns ------- list Unflag channel list list Flag channel list """ limit_threads(n_threads=n_threads) from casatasks import flagdata msname = msname.rstrip("/") mspath = os.path.dirname(os.path.abspath(msname)) os.chdir(mspath) with suppress_output(): summary = flagdata(vis=msname, field=field, mode="summary", spwchan=True) unflag_chans = [] flag_chans = [] for chan in summary["spw:channel"]: r = summary["spw:channel"][chan] chan_number = int(chan.split("0:")[-1]) flag_frac = r["flagged"] / r["total"] if flag_frac == 1: flag_chans.append(chan_number) else: unflag_chans.append(chan_number) return unflag_chans, flag_chans
[docs] def calc_flag_fraction( msname="", field="", scan="", n_threads=-1, ): """ Function to calculate the fraction of total data flagged. Parameters ---------- msname : str Name of the measurement set field : str, optional Field names scan : str, optional Scan names Returns ------- float Fraction of the total data flagged """ limit_threads(n_threads=n_threads) from casatasks import flagdata msname = msname.rstrip("/") mspath = os.path.dirname(os.path.abspath(msname)) os.chdir(mspath) with suppress_output(): summary = flagdata(vis=msname, field=field, scan=scan, mode="summary") flagged_fraction = summary["flagged"] / summary["total"] return flagged_fraction
[docs] def flag_outside_uvrange(vis, uvrange, n_threads=-1, flagbackup=True): """ Flag outside the given uv range Parameters ---------- vis : str Measurement set name uvrange : str UV-range flagbackup : bool, optional Flag backup """ limit_threads(n_threads=n_threads) from casatasks import flagdata try: if "lambda" in uvrange: islambda = True uvrange = uvrange.replace("lambda", "") else: islambda = False if "~" in uvrange: low, high = uvrange.split("~") if islambda: low = f"{low}lambda" high = f"{high}lambda" cmds = [ {"mode": "manual", "uvrange": f"<{low}", "flagbackup": flagbackup}, {"mode": "manual", "uvrange": f">{high}", "flagbackup": flagbackup}, ] elif ">" in uvrange: low = uvrange.split(">")[-1] if islambda: low = f"{low}lambda" cmds = [ {"mode": "manual", "uvrange": f"<{low}", "flagbackup": flagbackup}, ] elif "<" in uvrange: high = uvrange.split("<")[-1] if islambda: high = f"{high}lambda" cmds = [ {"mode": "manual", "uvrange": f">{high}", "flagbackup": flagbackup}, ] else: cmds = [] if len(cmds) > 0: for cmd in cmds: print(f"Flagging command: {cmd}") flagdata(vis=vis, **cmd) return 0 except Exception as e: traceback.print_exc() return 1
# Expose functions and classes __all__ = [ name for name, obj in globals().items() if ( (isinstance(obj, types.FunctionType) or isinstance(obj, type)) and obj.__module__ == __name__ ) ]