Source code for meersolar.utils.meer_ploting_utils

import types
import astropy.units as u
import logging
import psutil
import numpy as np
import warnings
import glob
import dask
import os
import traceback
import matplotlib
import matplotlib.pyplot as plt
from dask import delayed, compute
from multiprocessing.pool import ThreadPool
from sunpy.net import Fido, attrs as a
from sunpy.map import Map
from astropy.visualization import ImageNormalize, PowerStretch, LogStretch
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.wcs import FITSFixedWarning
from casatasks import casalog
from casatools import msmetadata, ms as casamstool
from datetime import datetime as dt, timedelta
from .basic_utils import *
from .proc_manage_utils import *
from .ms_metadata import *
from .meer_utils import *

warnings.simplefilter("ignore", category=FITSFixedWarning)

try:
    logfile = casalog.logfile()
    os.system("rm -rf " + logfile)
except BaseException:
    pass


#################################
# Plotting related functions
#################################


[docs] def get_meermap(fits_image, band="", do_sharpen=False): """ Make MeerKAT sunpy map Parameters ---------- fits_image : str MeerKAT fits image band : str, optional Band name do_sharpen : bool, optional Sharpen the image Returns ------- sunpy.map Sunpy map """ from scipy.ndimage import gaussian_filter from sunpy.map import make_fitswcs_header from sunpy.coordinates import frames, sun logging.getLogger("sunpy").setLevel(logging.ERROR) MEERLAT = -30.7133 MEERLON = 21.4429 MEERALT = 1086.6 meer_hdu = fits.open(fits_image) # Opening MeerKAT fits file meer_header = meer_hdu[0].header # meer header meer_data = meer_hdu[0].data if len(meer_data.shape) > 2: meer_data = meer_data[0, 0, :, :] # meer data if meer_header["CTYPE3"] == "FREQ": frequency = meer_header["CRVAL3"] * u.Hz elif meer_header["CTYPE4"] == "FREQ": frequency = meer_header["CRVAL4"] * u.Hz else: frequency = "" if band == "": try: band = meer_header["BAND"] except BaseException: band = "" try: pixel_unit = meer_header["BUNIT"] except BaseException: pixel_nuit = "" obstime = Time(meer_header["date-obs"]) meerpos = EarthLocation( lat=MEERLAT * u.deg, lon=MEERLON * u.deg, height=MEERALT * u.m ) # Converting into GCRS coordinate meer_gcrs = SkyCoord(meerpos.get_gcrs(obstime)) reference_coord = SkyCoord( meer_header["crval1"] * u.Unit(meer_header["cunit1"]), meer_header["crval2"] * u.Unit(meer_header["cunit2"]), frame="gcrs", obstime=obstime, obsgeoloc=meer_gcrs.cartesian, obsgeovel=meer_gcrs.velocity.to_cartesian(), distance=meer_gcrs.hcrs.distance, ) reference_coord_arcsec = reference_coord.transform_to( frames.Helioprojective(observer=meer_gcrs) ) cdelt1 = (np.abs(meer_header["cdelt1"]) * u.deg).to(u.arcsec) cdelt2 = (np.abs(meer_header["cdelt2"]) * u.deg).to(u.arcsec) P1 = sun.P(obstime) # Relative rotation angle new_meer_header = make_fitswcs_header( meer_data, reference_coord_arcsec, reference_pixel=u.Quantity( [meer_header["crpix1"] - 1, meer_header["crpix2"] - 1] * u.pixel ), scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), rotation_angle=-P1, wavelength=frequency.to(u.MHz).round(2), observatory="MeerKAT", ) if do_sharpen: blurred = gaussian_filter(meer_data, sigma=10) meer_data = meer_data + (meer_data - blurred) meer_map = Map(meer_data, new_meer_header) meer_map_rotate = meer_map.rotate() return meer_map_rotate
[docs] def save_in_hpc(fits_image, outdir="", xlim=[-1600, 1600], ylim=[-1600, 1600]): """ Save solar image in helioprojective coordinates Parameters ---------- fits_image : str FITS image name outdir : str, optional Output directory xlim : list X axis limit in arcsecond ylim : list Y axis limit in arcsecond Returns ------- str FITS image in helioprojective coordinate """ logging.getLogger("sunpy").setLevel(logging.ERROR) fits_header = fits.getheader(fits_image) meermap = get_meermap(fits_image) if len(xlim) == 2 and len(ylim) == 2: top_right = SkyCoord( xlim[1] * u.arcsec, ylim[1] * u.arcsec, frame=meermap.coordinate_frame ) bottom_left = SkyCoord( xlim[0] * u.arcsec, ylim[0] * u.arcsec, frame=meermap.coordinate_frame ) meermap = meermap.submap(bottom_left, top_right=top_right) if outdir == "": outdir = os.path.dirname(os.path.abspath(fits_image)) outfile = f"{outdir}/{os.path.basename(fits_image).split('.fits')[0]}_HPC.fits" if os.path.exists(outfile): os.system(f"rm -rf {outfile}") meermap.save(outfile, filetype="fits") data = fits.getdata(outfile) data = data[np.newaxis, np.newaxis, ...] hpc_header = fits.getheader(outfile) for key in [ "NAXIS", "NAXIS3", "NAXIS4", "BUNIT", "CTYPE3", "CRPIX3", "CRVAL3", "CDELT3", "CUNIT3", "CTYPE4", "CRPIX4", "CRVAL4", "CDELT4", "CUNIT4", "AUTHOR", "PIPELINE", "BAND", "MAX", "MIN", "RMS", "SUM", "MEAN", "MEDIAN", "RMSDYN", "MIMADYN", ]: if key in fits_header: hpc_header[key] = fits_header[key] fits.writeto(outfile, data=data, header=hpc_header, overwrite=True) return outfile
[docs] def plot_in_hpc( fits_image, draw_limb=False, extensions=["png"], outdirs=[], plot_range=[], power=0.5, xlim=[-1600, 1600], ylim=[-1600, 1600], contour_levels=[], band="", showgui=False, ): """ Function to convert MeerKAT image into Helioprojective co-ordinate Parameters ---------- fits_image : str Name of the fits image draw_limb : bool, optional Draw solar limb or not extensions : list, optional Output file extensions outdirs : list, optional Output directories for each extensions plot_range : list, optional Plot range power : float, optional Power stretch xlim : list X axis limit in arcsecond ylim : list Y axis limit in arcsecond contour_levels : list, optional Contour levels in fraction of peak, both positive and negative values allowed band : str, optional Band name showgui : bool, optional Show GUI Returns ------- outfiles Saved plot file names sunpy.Map MeerKAT image in helioprojective co-ordinate """ import matplotlib.ticker as ticker from matplotlib.patches import Ellipse, Rectangle from matplotlib.colors import ListedColormap from matplotlib import cm from sunpy.coordinates import sun logging.getLogger("sunpy").setLevel(logging.ERROR) if not showgui: matplotlib.use("Agg") else: matplotlib.use("TkAgg") matplotlib.rcParams.update({"font.size": 12}) fits_image = fits_image.rstrip("/") meer_header = fits.getheader(fits_image) # Opening MeerKAT fits file if meer_header["CTYPE3"] == "FREQ": frequency = meer_header["CRVAL3"] * u.Hz elif meer_header["CTYPE4"] == "FREQ": frequency = meer_header["CRVAL4"] * u.Hz else: frequency = "" if band == "": try: band = meer_header["BAND"] except BaseException: band = "" try: pixel_unit = meer_header["BUNIT"] except BaseException: pixel_nuit = "" obstime = Time(meer_header["date-obs"]) meer_map_rotate = get_meermap(fits_image, band=band) top_right = SkyCoord( xlim[1] * u.arcsec, ylim[1] * u.arcsec, frame=meer_map_rotate.coordinate_frame ) bottom_left = SkyCoord( xlim[0] * u.arcsec, ylim[0] * u.arcsec, frame=meer_map_rotate.coordinate_frame ) cropped_map = meer_map_rotate.submap(bottom_left, top_right=top_right) meer_data = cropped_map.data if len(plot_range) < 2: norm = ImageNormalize( meer_data, vmin=0.03 * np.nanmax(meer_data), vmax=0.99 * np.nanmax(meer_data), stretch=PowerStretch(power), ) else: norm = ImageNormalize( meer_data, vmin=np.nanmin(plot_range), vmax=np.nanmax(plot_range), stretch=PowerStretch(power), ) if band == "U": cmap = "inferno" pos_color = "white" neg_color = "cyan" elif band == "L": pos_color = "hotpink" neg_color = "yellow" if "YlGnBu_inferno" not in plt.colormaps(): # Sample YlGnBu_r colormap with 256 colors cmap_ylgnbu = cm.get_cmap("YlGnBu_r", 256) colors = cmap_ylgnbu(np.linspace(0, 1, 256)) # Create perceptually linear spacing using inferno luminance cmap_inferno = cm.get_cmap("inferno", 256) # Sort YlGnBu colors by the inferred brightness from inferno luminance_ranks = np.argsort( np.mean(cmap_inferno(np.linspace(0, 1, 256))[:, :3], axis=1) ) colors_uniform = colors[luminance_ranks] # New perceptual-YlGnBu-inspired colormap YlGnBu_inferno = ListedColormap(colors_uniform, name="YlGnBu_inferno") plt.colormaps.register(name="YlGnBu_inferno", cmap=YlGnBu_inferno) cmap = "YlGnBu_inferno" else: cmap = "cubehelix" pos_color = "cyan" neg_color = "gold" try: fig = plt.figure() ax = plt.subplot(projection=cropped_map) cropped_map.plot(norm=norm, cmap=cmap, axes=ax) if len(contour_levels) > 0: contour_levels = np.array(contour_levels) pos_cont = contour_levels[contour_levels >= 0] neg_cont = contour_levels[contour_levels < 0] if len(pos_cont) > 0: cropped_map.draw_contours( np.sort(pos_cont) * np.nanmax(meer_data), colors=pos_color ) if len(neg_cont) > 0: cropped_map.draw_contours( np.sort(neg_cont) * np.nanmax(meer_data), colors=neg_color ) ax.coords.grid(False) rgba_vmin = plt.get_cmap(cmap)(norm(norm.vmin)) ax.set_facecolor(rgba_vmin) # Read synthesized beam from header try: bmaj = meer_header["BMAJ"] * u.deg.to(u.arcsec) # in arcsec bmin = meer_header["BMIN"] * u.deg.to(u.arcsec) bpa = meer_header["BPA"] - sun.P(obstime).deg # in degrees except KeyError: bmaj = bmin = bpa = None # Plot PSF ellipse in bottom-left if all values are present if bmaj and bmin and bpa is not None: # Coordinates where to place the beam (e.g., 5% above bottom-left # corner) x0, x1 = ax.get_xlim() y0, y1 = ax.get_ylim() beam_center = SkyCoord( x0 + 0.08 * (x1 - x0), y0 + 0.08 * (y1 - y0), unit=u.arcsec, frame=cropped_map.coordinate_frame, ) # Add ellipse patch beam_ellipse = Ellipse( (beam_center.Tx.value, beam_center.Ty.value), # center in arcsec width=bmin, height=bmaj, angle=bpa, edgecolor="white", facecolor="white", lw=1, ) ax.add_patch(beam_ellipse) # Draw square box around the ellipse box_size = 100 # slightly bigger than beam rect = Rectangle( (beam_center.Tx.value - box_size / 2, beam_center.Ty.value - box_size / 2), width=box_size, height=box_size, edgecolor="white", facecolor="none", lw=1.2, linestyle="solid", ) ax.add_patch(rect) if draw_limb: cropped_map.draw_limb() formatter = ticker.FuncFormatter(lambda x, _: f"{int(x):.0e}") cbar = plt.colorbar(format=formatter) # Optional: set max 5 ticks to prevent clutter cbar.locator = ticker.MaxNLocator(nbins=5) cbar.update_ticks() if pixel_unit == "K": cbar.set_label("Brightness temperature (K)") elif pixel_unit == "JY/BEAM": cbar.set_label("Flux density (Jy/beam)") fig.tight_layout() output_image_list = [] for i in range(len(extensions)): ext = extensions[i] try: outdir = outdirs[i] except BaseException: outdir = os.path.dirname(os.path.abspath(fits_image)) if len(contour_levels) > 0: output_image = ( outdir + "/" + os.path.basename(fits_image).split(".fits")[0] + f"_contour.{ext}" ) else: output_image = ( outdir + "/" + os.path.basename(fits_image).split(".fits")[0] + f".{ext}" ) output_image_list.append(output_image) for output_image in output_image_list: fig.savefig(output_image) if showgui: plt.show() plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return output_image_list, cropped_map
[docs] def get_suvi_map(obs_date, obs_time, workdir, wavelength=195): """ Get GOES SUVI map Parameters ---------- obs_date : str Observation date in yyyy-mm-dd format obs_time : str Observation time in hh:mm format workdir : str Work directory wavelength : float, optional Wavelength, options: 94, 131, 171, 195, 284, 304 Å Returns ------- sunpy.map Sunpy SUVIMap """ logging.getLogger("sunpy").setLevel(logging.ERROR) warnings.filterwarnings( "ignore", message="This download has been started in a thread which is not the main thread", ) os.makedirs(workdir, exist_ok=True) start_time = dt.fromisoformat(f"{obs_date}T{obs_time}") t_start = (start_time - timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M") t_end = (start_time + timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M") time = a.Time(t_start, t_end) instrument = a.Instrument("suvi") wavelength = a.Wavelength(wavelength * u.angstrom) results = Fido.search(time, instrument, wavelength, a.Level(2)) downloaded_files = Fido.fetch(results, path=workdir, progress=False) obs_times = [] for image in downloaded_files: suvimap = Map(image) dateobs = suvimap.meta["date-obs"].split(".")[0] obs_times.append(dateobs) times_dt = [dt.strptime(t, "%Y-%m-%dT%H:%M:%S") for t in obs_times] closest_time = min(times_dt, key=lambda t: abs(t - start_time)) pos = times_dt.index(closest_time) closest_time_str = closest_time.strftime("%Y-%m-%dT%H:%M") final_image = downloaded_files[pos] suvi_map = Map(final_image) for f in downloaded_files: os.system(f"rm -rf {f}") return suvi_map
[docs] def enhance_offlimb(sunpy_map, do_sharpen=True): """ Enhance off-disk emission Parameters ---------- sunpy_map : sunpy.map Sunpy map do_sharpen : bool, optional Sharpen images Returns ------- sunpy.map Off-disk enhanced emission """ from scipy.ndimage import gaussian_filter from sunpy.map.maputils import all_coordinates_from_map logging.getLogger("sunpy").setLevel(logging.ERROR) hpc_coords = all_coordinates_from_map(sunpy_map) r = np.sqrt(hpc_coords.Tx**2 + hpc_coords.Ty**2) / sunpy_map.rsun_obs rsun_step_size = 0.01 rsun_array = np.arange(1, r.max(), rsun_step_size) y = np.array( [ sunpy_map.data[(r > this_r) * (r < this_r + rsun_step_size)].mean() for this_r in rsun_array ] ) pos = np.where(y < 10e-3)[0][0] r_lim = round(rsun_array[pos], 2) params = np.polyfit( rsun_array[rsun_array < r_lim], np.log(y[rsun_array < r_lim]), 1 ) scale_factor = np.exp((r - 1) * -params[0]) scale_factor[r < 1] = 1 if do_sharpen: blurred = gaussian_filter(sunpy_map.data, sigma=3) data = sunpy_map.data + (sunpy_map.data - blurred) else: data = sunpy_map.data scaled_map = Map(data * scale_factor, sunpy_map.meta) scaled_map.plot_settings["norm"] = ImageNormalize(stretch=LogStretch(10)) return scaled_map
[docs] def make_meer_overlay( meerkat_image, suvi_wavelength=195, plot_file_prefix=None, plot_meer_colormap=True, enhance_offdisk=True, contour_levels=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8], do_sharpen_suvi=True, xlim=[-1600, 1600], ylim=[-1600, 1600], extensions=["png"], outdirs=[], ncpu=-1, showgui=False, verbose=False, ): """ Make overlay of MeerKAT image on GOES SUVI image Parameters ---------- meerkat_image : str MeerKAT image suvi_wavelength : float, optional GOES SUVI wavelength, options: 94, 131, 171, 195, 284, 304 Å plot_file_prefix : str, optional Plot file prefix name plot_meer_colormap : bool, optional Plot MeerKAT map colormap enhance_offdisk : bool, optional Enhance off-disk emission contour_levels : list, optional Contour levels in fraction of peak do_sharpen_suvi : bool, optional Do sharpen SUVI images xlim : list, optional X-axis limit in arcsec tlim : list, optional Y-axis limit in arcsec extensions : list, optional Image file extensions outdirs : list, optional Output directories for each extensions ncpu : int, optional Number of CPUs to use showgui : bool, optional Show GUI verbose: bool, optinal Verbose output Returns ------- list Plot file names """ import matplotlib import matplotlib.ticker as ticker import matplotlib.pyplot as plt from sunpy.coordinates import SphericalScreen from matplotlib.colors import ListedColormap from matplotlib import cm from sunpy.map import make_fitswcs_header logging.getLogger("sunpy").setLevel(logging.ERROR) logging.getLogger("reproject.common").setLevel(logging.WARNING) @delayed def reproject_map(smap, target_header): with SphericalScreen(smap.observer_coordinate): return smap.reproject_to(target_header) if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") workdir = os.path.dirname(os.path.abspath(meerkat_image)) meermap = get_meermap(meerkat_image) obs_datetime = fits.getheader(meerkat_image)["DATE-OBS"] obs_date = obs_datetime.split("T")[0] obs_time = ":".join(obs_datetime.split("T")[-1].split(":")[:2]) suvi_map = get_suvi_map(obs_date, obs_time, workdir, wavelength=suvi_wavelength) if enhance_offdisk: suvi_map = enhance_offlimb(suvi_map, do_sharpen=do_sharpen_suvi) projected_coord = SkyCoord( 0 * u.arcsec, 0 * u.arcsec, obstime=suvi_map.observer_coordinate.obstime, frame="helioprojective", observer=suvi_map.observer_coordinate, rsun=suvi_map.coordinate_frame.rsun, ) projected_header = make_fitswcs_header( suvi_map.data.shape, projected_coord, scale=u.Quantity(suvi_map.scale), instrument=suvi_map.instrument, wavelength=suvi_map.wavelength, ) reprojected = [ reproject_map(meermap, projected_header), reproject_map(suvi_map, projected_header), ] if ncpu < 1: ncpu = 1 pool = ThreadPool(processes=ncpu) with dask.config.set(pool=pool): meer_reprojected, suvi_reprojected = compute(*reprojected, scheduler="threads") meertime = meermap.meta["date-obs"].split(".")[0] suvitime = suvi_map.meta["date-obs"].split(".")[0] try: if plot_meer_colormap and len(contour_levels) > 0: matplotlib.rcParams.update({"font.size": 18}) fig = plt.figure(figsize=(16, 8)) ax_colormap = fig.add_subplot(1, 2, 1, projection=suvi_reprojected) ax_contour = fig.add_subplot(1, 2, 2, projection=suvi_reprojected) elif plot_meer_colormap: matplotlib.rcParams.update({"font.size": 14}) fig = plt.figure(figsize=(10, 8)) ax_colormap = fig.add_subplot(projection=suvi_reprojected) elif len(contour_levels) > 0: matplotlib.rcParams.update({"font.size": 14}) fig = plt.figure(figsize=(10, 8)) ax_contour = fig.add_subplot(projection=suvi_reprojected) else: print("No overlay is plotting.") return title = f"SUVI time: {suvitime}\n MeerKAT time: {meertime}" if "transparent_inferno" not in plt.colormaps(): cmap = cm.get_cmap("inferno", 256) colors = cmap(np.linspace(0, 1, 256)) x = np.linspace(0, 1, 256) alpha = 0.8 * (1 - np.exp(-3 * x)) colors[:, -1] = alpha # Update the alpha channel transparent_inferno = ListedColormap(colors) plt.colormaps.register(name="transparent_inferno", cmap=transparent_inferno) if plot_meer_colormap and len(contour_levels) > 0: suptitle = title.replace("\n", ",") title = "" fig.suptitle(suptitle) if plot_meer_colormap: z = 0 suvi_reprojected.plot( axes=ax_colormap, title=title, autoalign=True, clip_interval=(3, 99.9) * u.percent, zorder=z, ) z += 1 meer_reprojected.plot( axes=ax_colormap, title=title, clip_interval=(3, 99.9) * u.percent, cmap="transparent_inferno", zorder=z, ) if len(contour_levels) > 0: z = 0 suvi_reprojected.plot( axes=ax_contour, title=title, autoalign=True, clip_interval=(3, 99.9) * u.percent, zorder=z, ) z += 1 contour_levels = np.array(contour_levels) * np.nanmax(meer_reprojected.data) meer_reprojected.draw_contours( contour_levels, axes=ax_contour, cmap="YlGnBu", zorder=z ) ax_contour.set_facecolor("black") if len(xlim) > 0: x_pix_limits = [] for x in xlim: sky = SkyCoord( x * u.arcsec, 0 * u.arcsec, frame=suvi_reprojected.coordinate_frame ) x_pix = suvi_reprojected.world_to_pixel(sky)[0].value x_pix_limits.append(x_pix) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.set_xlim(x_pix_limits) ax_contour.set_xlim(x_pix_limits) elif plot_meer_colormap: ax_colormap.set_xlim(x_pix_limits) elif len(contour_levels) > 0: ax_contour.set_xlim(x_pix_limits) if len(ylim) > 0: y_pix_limits = [] for y in ylim: sky = SkyCoord( 0 * u.arcsec, y * u.arcsec, frame=suvi_reprojected.coordinate_frame ) y_pix = suvi_reprojected.world_to_pixel(sky)[1].value y_pix_limits.append(y_pix) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.set_ylim(y_pix_limits) ax_contour.set_ylim(y_pix_limits) elif plot_meer_colormap: ax_colormap.set_ylim(y_pix_limits) elif len(contour_levels) > 0: ax_contour.set_ylim(y_pix_limits) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.coords.grid(False) ax_contour.coords.grid(False) elif plot_meer_colormap: ax_colormap.coords.grid(False) elif len(contour_levels) > 0: ax_contour.coords.grid(False) fig.tight_layout() plot_file_list = [] if verbose: print("#######################") if plot_file_prefix: for i in range(len(extensions)): ext = extensions[i] try: savedir = outdirs[i] except BaseException: savedir = workdir plot_file = f"{savedir}/{plot_file_prefix}.{ext}" plt.savefig(plot_file, bbox_inches="tight") if verbose: print(f"Plot saved: {plot_file}") plot_file_list.append(plot_file) if verbose: print("#######################\n") else: plot_file = None if showgui: plt.show() plt.close(fig) else: plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return plot_file_list
############################## # Extract dynamic spectrum ##############################
[docs] def make_ds_file_per_scan(msname, save_file, scan, datacolumn): """ Extract dynamic spectrum from measurement set Parameters ---------- msname : str Measurement set name save_file : str File name to save dynamic spectrum scan : int Scan number datacolumn : str Data column name Returns ------- str Dynamic spectrum file """ if os.path.exists(f"{save_file}.npy") == False: mstool = casamstool() try: all_data = [] for ant in range(5): print(f"Extracting data for antenna :{ant}, scan: {scan}") mstool.open(msname) mstool.selectpolarization(["I"]) mstool.select( {"antenna1": ant, "antenna2": ant, "scan_number": int(scan)} ) data_dic = mstool.getdata(datacolumn) mstool.close() if datacolumn == "CORRECTED_DATA": data = np.abs(data_dic["corrected_data"][0, ...]) else: data = np.abs(data_dic["data"][0, ...]) del data_dic m = np.nanmedian(data, axis=1) data = data / m[:, None] all_data.append(data) del data except Exception as e: print("Auto-corrrelations are not present. Using short baselines.") count = 0 all_data = [] while count <= 5: for i in range(5): for j in range(5): if i != j: print( f"Extracting data for antennas :{i} and {j}, scan: {scan}" ) mstool.open(msname) mstool.selectpolarization(["I"]) mstool.select( { "antenna1": i, "antenna2": j, "scan_number": int(scan), } ) data_dic = mstool.getdata(datacolumn) mstool.close() if datacolumn == "CORRECTED_DATA": data = np.abs(data_dic["corrected_data"][0, ...]) else: data = np.abs(data_dic["data"][0, ...]) del data_dic m = np.nanmedian(data, axis=1) data = data / m[:, None] all_data.append(data) del data count += 1 finally: try: mstool.close() except: pass all_data = np.array(all_data) data = np.nanmedian(all_data, axis=0) bad_chans = get_bad_chans(msname) if bad_chans != "": bad_chans = bad_chans.replace("0:", "").split(";") for bad_chan in bad_chans: s = int(bad_chan.split("~")[0]) e = int(bad_chan.split("~")[-1]) + 1 data[s:e, :] = np.nan msmd = msmetadata() msmd.open(msname) freqs = msmd.chanfreqs(0, unit="MHz") times = msmd.timesforscans(int(scan)) timestamps = [mjdsec_to_timestamp(mjdsec, str_format=0) for mjdsec in times] msmd.close() np.save( save_file, np.array([freqs, times, timestamps, data], dtype="object"), ) if ".npy" in save_file: return save_file else: return f"{save_file}.npy"
[docs] def make_ds_plot(dsfiles, plot_file=None, showgui=False): """ Make dynamic spectrum plot Parameters ---------- dsfile : list DS files list plot_file : str, optional Plot file name to save the plot showgui : bool, optional Show GUI Returns ------- str Plot name """ from matplotlib.gridspec import GridSpec if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") matplotlib.rcParams.update({"font.size": 18}) if type(dsfiles) == str: dsfiles = [dsfiles] for i, dsfile in enumerate(dsfiles): freqs_i, times_i, timestamps_i, data_i = np.load(dsfile, allow_pickle=True) if i == 0: freqs = freqs_i times = times_i timestamps = timestamps_i data = data_i else: gapsize = int( (np.nanmin(times_i) - np.nanmax(times)) / (times[1] - times[0]) ) if gapsize < 10: last_time_median = np.nanmedian(data[:, -1], axis=0) new_time_median = np.nanmedian(data_i[:, 0], axis=0) data_i = (data_i / new_time_median) * last_time_median # Insert vertical NaN gap (1 column wide) gap = np.full((data.shape[0], gapsize), np.nan) data = np.concatenate([data, gap, data_i], axis=1) # Insert dummy time and timestamp times = np.append(times, np.nan) timestamps = np.append(timestamps, "GAP") # Append new values times = np.append(times, times_i) timestamps = np.append(timestamps, timestamps_i) # (Optional) Check or merge freqs if needed — assuming same across files # Normalize by median bandshape median_bandshape = np.nanmedian(data, axis=-1) pos = np.where(np.isnan(median_bandshape) == False)[0] data /= median_bandshape[:, None] data = data[min(pos) : max(pos), :] freqs = freqs[min(pos) : max(pos)] temp_times = times[np.isnan(times) == False] maxtimepos = np.argmax(temp_times) mintimepos = np.argmin(temp_times) datestamp = f"{timestamps[mintimepos].split('T')[0]}" tstart = f"{timestamps[mintimepos].split('T')[0]} {':'.join(timestamps[mintimepos].split('T')[-1].split(':')[:2])}" tend = f"{timestamps[maxtimepos].split('T')[0]} {':'.join(timestamps[maxtimepos].split('T')[-1].split(':')[:2])}" print(f"Time range : {tstart}~{tend}") results = Fido.search( a.Time(tstart, tend), a.Instrument("XRS"), a.Resolution("avg1m") ) files = Fido.fetch(results, path=os.path.dirname(dsfiles[0]), overwrite=False) goes_tseries = TimeSeries(files, concatenate=True) for goes_f in files: os.system(f"rm -rf {goes_f}") goes_tseries = goes_tseries.truncate(tstart, tend) timeseries = np.nanmean(data, axis=0) # Normalization data_std = np.nanstd(data) data_median = np.nanmedian(data) norm = ImageNormalize( data, stretch=LogStretch(1), vmin=0.99 * np.nanmin(data), vmax=0.99 * np.nanmax(data), ) try: # Create figure and GridSpec layout fig = plt.figure(figsize=(18, 10)) gs = GridSpec(nrows=3, ncols=2, width_ratios=[1, 0.03], height_ratios=[4, 1.5, 2]) # Axes ax_spec = fig.add_subplot(gs[0, 0]) ax_ts = fig.add_subplot(gs[1, 0]) ax_goes = fig.add_subplot(gs[2, 0]) cax = fig.add_subplot(gs[:, 1]) # colorbar spans both rows # Plot dynamic spectrum im = ax_spec.imshow(data, aspect="auto", origin="lower", norm=norm, cmap="magma") ax_spec.set_ylabel("Frequency (MHz)") ax_spec.set_xticklabels([]) # Remove x-axis labels from top plot # Y-ticks yticks = ax_spec.get_yticks() yticks = yticks[(yticks >= 0) & (yticks < len(freqs))] ax_spec.set_yticks(yticks) ax_spec.set_yticklabels([f"{freqs[int(i)]:.1f}" for i in yticks]) # Plot time series ax_ts.plot(timeseries) ax_ts.set_xlim(0, len(timeseries) - 1) ax_ts.set_ylabel("Mean \n flux density") goes_tseries.plot(axes=ax_goes) goes_times = goes_tseries.time times_dt = goes_times.to_datetime() ax_goes.set_xlim(times_dt[0], times_dt[-1]) ax_goes.set_ylabel(r"Flux ($\frac{W}{m^2}$)") ax_goes.legend(ncol=2, loc="upper right") ax_goes.set_title("GOES light curve", fontsize=14) ax_ts.set_title("MeerKAT light curve", fontsize=14) ax_spec.set_title("MeerKAT dynamic spectrum", fontsize=14) ax_goes.set_xlabel("Time (UTC)") # Format x-ticks ax_ts.set_xticks([]) ax_ts.set_xticklabels([]) # Colorbar cbar = fig.colorbar(im, cax=cax) cbar.set_label("Flux density (arb. unit)") plt.tight_layout() # Save or show if plot_file: plt.savefig(plot_file, bbox_inches="tight") print(f"Plot saved: {plot_file}") if showgui: plt.show() plt.close(fig) else: plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return plot_file
[docs] def plot_goes_full_timeseries( msname, workdir, plot_file_prefix=None, extension="png", showgui=False ): """ Plot GOES full time series on the day of observation Parameters ---------- msname : str Measurement set workdir : str Work directory plot_file_prefix : str, optional Plot file name prefix extension : str, optional Save file extension showgui : bool, optional Show GUI Returns ------- str Plot file name """ os.makedirs(workdir, exist_ok=True) if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") matplotlib.rcParams.update({"font.size": 14}) scans, cal_scans, f_scans, g_scans, p_scans = get_cal_target_scans(msname) valid_scans = get_valid_scans(msname) filtered_scans = [] for scan in scans: if scan in valid_scans: filtered_scans.append(scan) msmd = msmetadata() msmd.open(msname) tstart_mjd = min(msmd.timesforscan(int(min(filtered_scans)))) tend_mjd = max(msmd.timesforscan(int(max(filtered_scans)))) msmd.close() tstart = mjdsec_to_timestamp(tstart_mjd, str_format=2) tend = mjdsec_to_timestamp(tend_mjd, str_format=2) print(f"Time range: {tstart}~{tend}") results = Fido.search( a.Time(tstart, tend), a.Instrument("XRS"), a.Resolution("avg1m") ) files = Fido.fetch(results, path=workdir, overwrite=False) goes_tseries = TimeSeries(files, concatenate=True) for f in files: os.system(f"rm -rf {f}") fig, ax = plt.subplots(figsize=(15, 5), constrained_layout=True) goes_tseries.plot(axes=ax) times = goes_tseries.time times_dt = times.to_datetime() ax.axvspan(tstart, tend, alpha=0.2) ax.set_xlim(times_dt[0], times_dt[-1]) plt.tight_layout() # Save or show if plot_file_prefix: plot_file = f"{workdir}/{plot_file_prefix}.{extension}" plt.savefig(plot_file, bbox_inches="tight") print(f"Plot saved: {plot_file}") else: plot_file = None if showgui: plt.show() plt.close(fig) plt.close("all") else: plt.close(fig) return plot_file
[docs] def rename_meersolar_image( imagename, imagedir="", pol="", band="", attcal="NOINFO", cutout_rsun=2.5, make_overlay=True, make_plots=True, ): """ Rename and move image to image directory Parameters ---------- imagename : str Image name imagedir : str, optional Image directory (default given image directory) pol : str, optional Stokes parameters band : str, optional Observing band attcal : str, optional Solar attenuation calibrated or not cutout_rsun : float, optional Cutout in solar radii from center (default: 2.5 solar radii) make_overlay : bool, optional Make overlay on SUVI make_plots : bool, optional Make radio map plot in helioprojective coordinates Returns ------- str New imagename with full path """ imagename = imagename.rstrip("/") imagename = cutout_image( imagename, imagename, x_deg=(cutout_rsun * 2 * 16.0) / 60.0 ) header = fits.getheader(imagename) time = header["DATE-OBS"] astro_time = Time(time, scale="utc") sun_jpl = Horizons(id="10", location="500", epochs=astro_time.jd) eph = sun_jpl.ephemerides() sun_coords = SkyCoord( ra=eph["RA"][0] * u.deg, dec=eph["DEC"][0] * u.deg, frame="icrs" ) maxval, minval, rms, total_val, mean_val, median_val, rms_dyn, minmax_dyn = ( calc_solar_image_stat(imagename, disc_size=18) ) with fits.open(imagename, mode="update") as hdul: hdr = hdul[0].header hdr["AUTHOR"] = "DevojyotiKansabanik,DeepanPatra" if band != "": hdr["BAND"] = band hdr["PIPELINE"] = "MeerSOLAR" hdr["CRVAL1"] = sun_coords.ra.deg hdr["CRVAL2"] = sun_coords.dec.deg hdr["MAX"] = maxval hdr["MIN"] = minval hdr["RMS"] = rms hdr["SUM"] = total_val hdr["MEAN"] = mean_val hdr["MEDIAN"] = median_val hdr["RMSDYN"] = rms_dyn hdr["MIMADYN"] = minmax_dyn hdr["ATTCAL"] = str(attcal) freq = round(header["CRVAL3"] / 10**6, 2) t_str = "".join(time.split("T")[0].split("-")) + ( "".join(time.split("T")[-1].split(":")) ) new_name = "time_" + t_str + "_freq_" + str(freq) if pol != "": new_name += "_pol_" + str(pol) if "MFS" in imagename: new_name += "_MFS" new_name = new_name + ".fits" if imagedir == "": imagedir = os.path.dirname(os.path.abspath(imagename)) new_name = imagedir + "/" + new_name os.system("mv " + imagename + " " + new_name) hpcdir = f"{os.path.dirname(imagedir)}/images/hpcs" os.makedirs(hpcdir, exist_ok=True) save_in_hpc(new_name, outdir=hpcdir) if make_plots: try: pngdir = f"{os.path.dirname(imagedir)}/images/pngs" os.makedirs(pngdir, exist_ok=True) outimages, cropped_map = plot_in_hpc( new_name, draw_limb=True, extensions=["png"], outdirs=[pngdir], ) except Exception: pass if make_overlay: try: overlay_pngdir = f"{os.path.dirname(imagedir)}/overlays_pngs" os.makedirs(overlay_pngdir, exist_ok=True) outimages = make_meer_overlay( new_name, plot_file_prefix=os.path.basename(new_name).split(".fits")[0] + "_suvi_meerkat_overlay", extensions=["png"], outdirs=[overlay_pngdir], verbose=False, ) except Exception: pass return new_name
# Expose functions and classes __all__ = [ name for name, obj in globals().items() if ( (isinstance(obj, types.FunctionType) or isinstance(obj, type)) and obj.__module__ == __name__ ) ]