Source code for meersolar.utils.meer_ploting_utils

import types
import astropy.units as u
import logging
import psutil
import numpy as np
import warnings
import glob
import dask
import os
import traceback
import requests
import matplotlib
import matplotlib.pyplot as plt
from parfive import Downloader
from bs4 import BeautifulSoup
from dask import delayed, compute
from multiprocessing.pool import ThreadPool
from sunpy.net import Fido, attrs as a
from sunpy.map import Map
from sunpy.timeseries import TimeSeries
from astropy.visualization import ImageNormalize, PowerStretch, LogStretch
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.wcs import FITSFixedWarning
from astroquery.jplhorizons import Horizons
from casatools import msmetadata, ms as casamstool, table
from datetime import datetime as dt, timedelta
from dask import delayed
from PIL import Image
from .basic_utils import *
from .proc_manage_utils import *
from .ms_metadata import *
from .meer_utils import *

warnings.simplefilter("ignore", category=FITSFixedWarning)


#################################
# Plotting related functions
#################################
[docs] def plot_ms_diagnostics( msname, outdir="", dask_client=None, cpu_frac=0.8, mem_frac=0.8 ): """ Plot diagonistics plots for measurement set Parameters ---------- msname : str Measurement set outdir : str, optional Output directory dask_client : dask.client Dask client cpu_frac : float, optional CPU fraction mem_frac : float, optional Memory fraction Returns ------- int Success message list Output plot file list """ if outdir == "": outdir = os.getcwd() os.makedirs(outdir, exist_ok=True) output_pdf = f"{outdir}/{os.path.basename(msname).split('.ms')[0]}_plots" output_pdf_list = glob.glob(f"{output_pdf}*.pdf") if len(output_pdf_list) > 0: return 0, output_pdf_list msname = msname.rstrip("/") mstool = casamstool() mstool.open(msname) nrow = mstool.nrow() mstool.close() msmd = msmetadata() msmd.open(msname) npol = msmd.ncorrforpol()[0] scan_list = msmd.scannumbers() msmd.close() scan_sizes = [get_ms_scan_size(msname, scan) for scan in scan_list] if cpu_frac > 0.8: cpu_frac = 0.8 ncpu = max(1, int(psutil.cpu_count() * cpu_frac)) if mem_frac > 0.8: mem_frac = 0.8 total_mem = (psutil.virtual_memory().available * mem_frac) / (1024**3) # In GB max_scan_size = max(scan_sizes) frac_chunk = min(1, total_mem / max_scan_size) nchunk = int(nrow * frac_chunk) output_pdf_list = [] try: ####################### # Commands to run ###################### cmds = [] # Define correlation groups corr_sets = [ ("XX,YY", True), # parallel hands, always plotted ("XY,YX", npol == 4), # cross hands, only if 4 pols ] # Define y-axis modes and labels plot_types = { "amp": "Amplitude", "phase": "Phase (deg)", "real": "Real", "imag": "Imaginary", } # Define x-axis settings xaxes = {"uv": ("UV(m)",), "FREQ": ("Frequency (GHz)",), "TIME": ("Time",)} for corr, do_plot in corr_sets: if not do_plot: continue for yaxis, ylabel in plot_types.items(): for xaxis, (xlabel,) in xaxes.items(): for col in ["CORRECTED_DATA", "CORRECTED_DATA-MODEL_DATA"]: cmds.append( f"shadems --no-lim-save --xaxis {xaxis} --yaxis {yaxis} " f"--col {col} -j {ncpu} -z {nchunk} " f"--xlabel '{xlabel}' --ylabel '{ylabel}' " f"--corr {corr} --colour-by CORR --iter-scan --iter-field " f"--dmap tab10 {msname}" ) print(f"Making plots of: {msname}") for cmd in cmds: run_shadems(cmd, verbose=False) for yaxis, ylabel in plot_types.items(): ######################### # Making plots ######################### pngs = glob.glob(f"*{yaxis}*.png") outfile = f"{output_pdf}_{yaxis}.pdf" if len(pngs) > 0: images = [] for image in pngs: images.append(Image.open(image).convert("RGB")) images[0].save(outfile, save_all=True, append_images=images[1:]) output_pdf_list.append(outfile) for png in pngs: os.system(f"rm -rf {png}") else: print(f"No plot for {ylabel} is made.") if len(output_pdf_list) > 0: return 0, output_pdf_list else: print("No plot is made.") return 1, [] except Exception: traceback.print_exc() finally: drop_cache(msname) os.system(f"rm -rf log-shadems.txt")
[docs] def plot_caltable_diagnostics(caltable, outdir=""): """ Plot diagonistic plot of a caltable Parameters ---------- caltable : str Caltable name outdir : str, optional Output directory Returns ------- int Success messsage str Output file """ caltable = caltable.rstrip("/") if outdir == "": outdir = os.getcwd() os.makedirs(outdir, exist_ok=True) output_pdf = f"{outdir}/{os.path.basename(caltable)}_plots.pdf" if os.path.exists(output_pdf): return 0, output_pdf pols = ["X", "Y"] ncols = 3 nrows = 3 plots_per_fig = ncols * nrows out_files = [] try: tb = table() tb.open(f"{caltable}/SPECTRAL_WINDOW") freqs = tb.getcol("CHAN_FREQ") / 10**9 # In GHz tb.close() tb.open(caltable) cal_type = tb.getkeywords()["VisCal"] if cal_type == "K Jones": gain = tb.getcol("FPARAM") flag = tb.getcol("FLAG") else: gain = tb.getcol("CPARAM") flag = tb.getcol("FLAG") gain[flag] = np.nan ants = np.unique(tb.getcol("ANTENNA1")) times = np.unique(tb.getcol("TIME")) nant = np.nanmax(ants) + 1 tb.close() print(f"Ploting {cal_type}") if cal_type == "K Jones": plt.figure(figsize=(15, 10)) gain = np.nanmean(gain, axis=1) for i in range(2): plt.scatter( range(gain.shape[-1]), gain[i, ...], label=f"Pol: {pols[i]}" ) plt.xlabel("Antenna index", fontsize=14) plt.ylabel("Delay (ns)", fontsize=14) plt.title("Antenna vs Delay", fontsize=14) plt.legend() plt.tight_layout() savefile = f"{caltable}.png" plt.savefig(savefile) plt.clf() out_files.append(savefile) else: if cal_type == "G Jones": ntime = int(gain.shape[-1] / nant) gain = gain.reshape(gain.shape[0], gain.shape[1], nant, ntime) gain = gain[:, 0, ...] elif cal_type == "T Jones": ntime = int(gain.shape[-1] / nant) gain = gain.reshape(gain.shape[0], gain.shape[1], nant, ntime) gain = gain[0, 0, ...] elif cal_type == "B Jones" or cal_type == "Df Jones": ntime = int(gain.shape[-1] / nant) gain = gain.reshape(gain.shape[0], gain.shape[1], nant, ntime) gain = np.nanmean(gain, axis=-1) else: print(f"{cal_type} is not implemented yet.") return for quantity in ["amp", "phase"]: if cal_type == "G Jones": for idx in range(0, len(ants), plots_per_fig): fig, axes = plt.subplots(nrows, ncols, figsize=(15, 10)) if quantity == "amp": fig.suptitle("Time vs Gain Amplitude", fontsize=14) else: fig.suptitle("Time vs Gain Phase", fontsize=14) axes = axes.flatten() for i, ant in enumerate(ants[idx : idx + plots_per_fig]): ax = axes[i] for j in range(2): # loop over polarizations if quantity == "amp": ax.scatter( times - np.nanmin(times), np.abs(gain[j, ant, :]), label=f"Pol: {pols[j]}", s=14, ) ax.set_ylabel("Gain Amplitude", fontsize=14) else: ax.scatter( times - np.nanmin(times), np.angle(gain[j, ant, :], deg=True), label=f"Pol: {pols[j]}", s=14, ) ax.set_ylabel("Gain Phase (degree)", fontsize=14) ax.set_title(f"Antenna {ant+1}", fontsize=14) ax.set_xlabel("Time (s)", fontsize=14) ax.legend(fontsize=10) for j in range(i + 1, plots_per_fig): fig.delaxes(axes[j]) plt.tight_layout(rect=[0, 0, 1, 0.99]) savefile = f"{caltable}_gain_{quantity}_batch_{idx // plots_per_fig + 1}.png" plt.savefig(savefile) plt.close() out_files.append(savefile) elif cal_type == "T Jones": for idx in range(0, len(ants), plots_per_fig): fig, axes = plt.subplots(nrows, ncols, figsize=(15, 10)) if quantity == "amp": fig.suptitle("Time vs Gain Amplitude", fontsize=14) else: fig.suptitle("Time vs Gain Phase", fontsize=14) axes = axes.flatten() for i, ant in enumerate(ants[idx : idx + plots_per_fig]): ax = axes[i] if quantity == "amp": ax.scatter( times - np.nanmin(times), np.abs(gain[ant, :]) ) ax.set_ylabel("Gain Amplitude", fontsize=14) else: ax.scatter( times - np.nanmin(times), np.angle(gain[ant, :], deg=True), ) ax.set_ylabel("Gain Phase", fontsize=14) ax.set_title(f"Antenna {ant+1}", fontsize=14) ax.set_xlabel("Time (s)", fontsize=14) for j in range(i + 1, plots_per_fig): fig.delaxes(axes[j]) plt.tight_layout(rect=[0, 0, 1, 0.99]) savefile = f"{caltable}_gain_{quantity}_batch_{idx // plots_per_fig + 1}.png" plt.savefig(savefile) plt.close() out_files.append(savefile) elif cal_type == "B Jones" or cal_type == "Df Jones": for idx in range(0, len(ants), plots_per_fig): fig, axes = plt.subplots(nrows, ncols, figsize=(15, 10)) if quantity == "amp": fig.suptitle("Frequency vs Gain Amplitude", fontsize=14) else: fig.suptitle("Frequency vs Gain Phase", fontsize=14) axes = axes.flatten() for i, ant in enumerate(ants[idx : idx + plots_per_fig]): ax = axes[i] for j in range(2): if quantity == "amp": ax.scatter( freqs, np.abs(gain[j, :, ant]), label=f"Pol: {pols[j]}", s=14, ) ax.set_ylabel("Gain Amplitude", fontsize=14) else: ax.scatter( freqs, np.angle(gain[j, :, ant], deg=True), label=f"Pol: {pols[j]}", s=14, ) ax.set_ylabel("Gain Phase (degree)", fontsize=14) ax.set_title(f"Antenna {ant+1}", fontsize=14) ax.set_xlabel("Frequency (GHz)", fontsize=14) ax.legend(fontsize=10) for j in range(i + 1, plots_per_fig): fig.delaxes(axes[j]) plt.tight_layout(rect=[0, 0, 1, 0.99]) savefile = f"{caltable}_gain_{quantity}_batch_{idx // plots_per_fig + 1}.png" plt.savefig(savefile) plt.close() out_files.append(savefile) images = [] for image in out_files: images.append(Image.open(image).convert("RGB")) images[0].save(output_pdf, save_all=True, append_images=images[1:]) return 0, output_pdf except Exception: traceback.print_exc() return 1, "" finally: drop_cache(caltable) for png in out_files: os.system(f"rm -rf {png}")
[docs] def get_meermap(fits_image, band="", do_sharpen=False): """ Make MeerKAT sunpy map Parameters ---------- fits_image : str MeerKAT fits image band : str, optional Band name do_sharpen : bool, optional Sharpen the image Returns ------- sunpy.map Sunpy map """ from scipy.ndimage import gaussian_filter from sunpy.map import make_fitswcs_header from sunpy.coordinates import frames, sun logging.getLogger("sunpy").setLevel(logging.ERROR) MEERLAT = -30.7133 MEERLON = 21.4429 MEERALT = 1086.6 meer_hdu = fits.open(fits_image) # Opening MeerKAT fits file meer_header = meer_hdu[0].header # meer header meer_data = meer_hdu[0].data if len(meer_data.shape) > 2: meer_data = meer_data[0, 0, :, :] # meer data if meer_header["CTYPE3"] == "FREQ": frequency = meer_header["CRVAL3"] * u.Hz elif meer_header["CTYPE4"] == "FREQ": frequency = meer_header["CRVAL4"] * u.Hz else: frequency = "" if band == "": try: band = meer_header["BAND"] except BaseException: band = "" try: pixel_unit = meer_header["BUNIT"] except BaseException: pixel_nuit = "" obstime = Time(meer_header["date-obs"]) meerpos = EarthLocation( lat=MEERLAT * u.deg, lon=MEERLON * u.deg, height=MEERALT * u.m ) # Converting into GCRS coordinate meer_gcrs = SkyCoord(meerpos.get_gcrs(obstime)) reference_coord = SkyCoord( meer_header["crval1"] * u.Unit(meer_header["cunit1"]), meer_header["crval2"] * u.Unit(meer_header["cunit2"]), frame="gcrs", obstime=obstime, obsgeoloc=meer_gcrs.cartesian, obsgeovel=meer_gcrs.velocity.to_cartesian(), distance=meer_gcrs.hcrs.distance, ) reference_coord_arcsec = reference_coord.transform_to( frames.Helioprojective(observer=meer_gcrs) ) cdelt1 = (np.abs(meer_header["cdelt1"]) * u.deg).to(u.arcsec) cdelt2 = (np.abs(meer_header["cdelt2"]) * u.deg).to(u.arcsec) P1 = sun.P(obstime) # Relative rotation angle new_meer_header = make_fitswcs_header( meer_data, reference_coord_arcsec, reference_pixel=u.Quantity( [meer_header["crpix1"] - 1, meer_header["crpix2"] - 1] * u.pixel ), scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), rotation_angle=-P1, wavelength=frequency.to(u.MHz).round(2), observatory="MeerKAT", ) if do_sharpen: blurred = gaussian_filter(meer_data, sigma=10) meer_data = meer_data + (meer_data - blurred) meer_map = Map(meer_data, new_meer_header) meer_map_rotate = meer_map.rotate() return meer_map_rotate
[docs] def save_in_hpc(fits_image, outdir="", xlim=[-1600, 1600], ylim=[-1600, 1600]): """ Save solar image in helioprojective coordinates Parameters ---------- fits_image : str FITS image name outdir : str, optional Output directory xlim : list X axis limit in arcsecond ylim : list Y axis limit in arcsecond Returns ------- str FITS image in helioprojective coordinate """ logging.getLogger("sunpy").setLevel(logging.ERROR) fits_header = fits.getheader(fits_image) meermap = get_meermap(fits_image) if len(xlim) == 2 and len(ylim) == 2: top_right = SkyCoord( xlim[1] * u.arcsec, ylim[1] * u.arcsec, frame=meermap.coordinate_frame ) bottom_left = SkyCoord( xlim[0] * u.arcsec, ylim[0] * u.arcsec, frame=meermap.coordinate_frame ) meermap = meermap.submap(bottom_left, top_right=top_right) if outdir == "": outdir = os.path.dirname(os.path.abspath(fits_image)) outfile = f"{outdir}/{os.path.basename(fits_image).split('.fits')[0]}_HPC.fits" if os.path.exists(outfile): os.system(f"rm -rf {outfile}") meermap.save(outfile, filetype="fits") data = fits.getdata(outfile) data = data[np.newaxis, np.newaxis, ...] hpc_header = fits.getheader(outfile) for key in [ "NAXIS", "NAXIS3", "NAXIS4", "BUNIT", "CTYPE3", "CRPIX3", "CRVAL3", "CDELT3", "CUNIT3", "CTYPE4", "CRPIX4", "CRVAL4", "CDELT4", "CUNIT4", "AUTHOR", "PIPELINE", "BAND", "MAX", "MIN", "RMS", "SUM", "MEAN", "MEDIAN", "RMSDYN", "MIMADYN", ]: if key in fits_header: hpc_header[key] = fits_header[key] fits.writeto(outfile, data=data, header=hpc_header, overwrite=True) return outfile
[docs] def plot_in_hpc( fits_image, draw_limb=False, extensions=["png"], outdirs=[], plot_range=[], power=0.5, xlim=[-1600, 1600], ylim=[-1600, 1600], contour_levels=[], band="", showgui=False, ): """ Function to convert MeerKAT image into Helioprojective co-ordinate Parameters ---------- fits_image : str Name of the fits image draw_limb : bool, optional Draw solar limb or not extensions : list, optional Output file extensions outdirs : list, optional Output directories for each extensions plot_range : list, optional Plot range power : float, optional Power stretch xlim : list X axis limit in arcsecond ylim : list Y axis limit in arcsecond contour_levels : list, optional Contour levels in fraction of peak, both positive and negative values allowed band : str, optional Band name showgui : bool, optional Show GUI Returns ------- outfiles Saved plot file names sunpy.Map MeerKAT image in helioprojective co-ordinate """ import matplotlib.ticker as ticker from matplotlib.patches import Ellipse, Rectangle from matplotlib.colors import ListedColormap from matplotlib import cm from sunpy.coordinates import sun logging.getLogger("sunpy").setLevel(logging.ERROR) if not showgui: matplotlib.use("Agg") else: matplotlib.use("TkAgg") matplotlib.rcParams.update({"font.size": 12}) fits_image = fits_image.rstrip("/") meer_header = fits.getheader(fits_image) # Opening MeerKAT fits file if meer_header["CTYPE3"] == "FREQ": frequency = meer_header["CRVAL3"] * u.Hz elif meer_header["CTYPE4"] == "FREQ": frequency = meer_header["CRVAL4"] * u.Hz else: frequency = "" if band == "": try: band = meer_header["BAND"] except BaseException: band = "" try: pixel_unit = meer_header["BUNIT"] except BaseException: pixel_nuit = "" pixel_scale = abs(meer_header["CDELT1"]) * 3600.0 # In arcsec obstime = Time(meer_header["date-obs"]) meer_map_rotate = get_meermap(fits_image, band=band) top_right = SkyCoord( xlim[1] * u.arcsec, ylim[1] * u.arcsec, frame=meer_map_rotate.coordinate_frame ) bottom_left = SkyCoord( xlim[0] * u.arcsec, ylim[0] * u.arcsec, frame=meer_map_rotate.coordinate_frame ) cropped_map = meer_map_rotate.submap(bottom_left, top_right=top_right) meer_data = cropped_map.data if len(plot_range) < 2: norm = ImageNormalize( meer_data, vmin=0.03 * np.nanmax(meer_data), vmax=0.99 * np.nanmax(meer_data), stretch=PowerStretch(power), ) else: norm = ImageNormalize( meer_data, vmin=np.nanmin(plot_range), vmax=np.nanmax(plot_range), stretch=PowerStretch(power), ) if band == "U": cmap = "inferno" pos_color = "white" neg_color = "cyan" elif band == "L": pos_color = "hotpink" neg_color = "yellow" if "YlGnBu_inferno" not in plt.colormaps(): # Sample YlGnBu_r colormap with 256 colors cmap_ylgnbu = cm.get_cmap("YlGnBu_r", 256) colors = cmap_ylgnbu(np.linspace(0, 1, 256)) # Create perceptually linear spacing using inferno luminance cmap_inferno = cm.get_cmap("inferno", 256) # Sort YlGnBu colors by the inferred brightness from inferno luminance_ranks = np.argsort( np.mean(cmap_inferno(np.linspace(0, 1, 256))[:, :3], axis=1) ) colors_uniform = colors[luminance_ranks] # New perceptual-YlGnBu-inspired colormap YlGnBu_inferno = ListedColormap(colors_uniform, name="YlGnBu_inferno") plt.colormaps.register(name="YlGnBu_inferno", cmap=YlGnBu_inferno) cmap = "YlGnBu_inferno" else: cmap = "cubehelix" pos_color = "cyan" neg_color = "gold" try: fig = plt.figure() ax = plt.subplot(projection=cropped_map) cropped_map.plot(norm=norm, cmap=cmap, axes=ax) if len(contour_levels) > 0: contour_levels = np.array(contour_levels) pos_cont = contour_levels[contour_levels >= 0] neg_cont = contour_levels[contour_levels < 0] if len(pos_cont) > 0: cropped_map.draw_contours( np.sort(pos_cont) * np.nanmax(meer_data), colors=pos_color ) if len(neg_cont) > 0: cropped_map.draw_contours( np.sort(neg_cont) * np.nanmax(meer_data), colors=neg_color ) ax.coords.grid(False) rgba_vmin = plt.get_cmap(cmap)(norm(norm.vmin)) ax.set_facecolor(rgba_vmin) # Read synthesized beam from header try: bmaj = meer_header["BMAJ"] * u.deg.to(u.arcsec) # in arcsec bmin = meer_header["BMIN"] * u.deg.to(u.arcsec) bpa = meer_header["BPA"] - sun.P(obstime).deg # in degrees except KeyError: bmaj = bmin = bpa = None # Plot PSF ellipse in bottom-left if all values are present if bmaj and bmin and bpa is not None: # Coordinates where to place the beam (e.g., 5% above bottom-left # corner) x0, x1 = ax.get_xlim() y0, y1 = ax.get_ylim() beam_center = SkyCoord( x0 + 0.08 * (x1 - x0), y0 + 0.08 * (y1 - y0), unit=u.arcsec, frame=cropped_map.coordinate_frame, ) # Add ellipse patch beam_ellipse = Ellipse( (beam_center.Tx.value, beam_center.Ty.value), # center in arcsec width=bmin / pixel_scale, height=bmaj / pixel_scale, angle=bpa, edgecolor="white", facecolor="white", lw=1, ) ax.add_patch(beam_ellipse) # Draw square box around the ellipse box_size = ( max(0.2 * (x1 - x0), 1.5 * max(bmin, bmaj)) / pixel_scale ) # slightly bigger than beam # slightly bigger than beam rect = Rectangle( ( beam_center.Tx.value - box_size / 2, beam_center.Ty.value - box_size / 2, ), width=box_size, height=box_size, edgecolor="white", facecolor="none", lw=1.2, linestyle="solid", ) ax.add_patch(rect) if draw_limb: cropped_map.draw_limb() formatter = ticker.FuncFormatter(lambda x, _: f"{int(x):.0e}") cbar = plt.colorbar(format=formatter) # Optional: set max 5 ticks to prevent clutter cbar.locator = ticker.MaxNLocator(nbins=5) cbar.update_ticks() if pixel_unit == "K": cbar.set_label("Brightness temperature (K)") elif pixel_unit == "JY/BEAM": cbar.set_label("Flux density (Jy/beam)") fig.tight_layout() output_image_list = [] for i in range(len(extensions)): ext = extensions[i] try: outdir = outdirs[i] except BaseException: outdir = os.path.dirname(os.path.abspath(fits_image)) if len(contour_levels) > 0: output_image = ( outdir + "/" + os.path.basename(fits_image).split(".fits")[0] + f"_contour.{ext}" ) else: output_image = ( outdir + "/" + os.path.basename(fits_image).split(".fits")[0] + f".{ext}" ) output_image_list.append(output_image) for output_image in output_image_list: fig.savefig(output_image) if showgui: plt.show() plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return output_image_list, cropped_map
[docs] def get_aia_map(obs_date, obs_time, workdir, wavelength=193, keep_aia_fits=False): """ Get SDO AIA map Parameters ---------- obs_date : str Observation date in yyyy-mm-dd format obs_time : str Observation time in hh:mm format workdir : str Work directory wavelength : float, optional Wavelength, options: 94, 131, 171, 193, 211, 304, 335 Å keep_aia_fits : bool, optional Keep AIA fits file or not Returns ------- sunpy.map Sunpy AIAMap """ logging.getLogger("sunpy").setLevel(logging.ERROR) warnings.filterwarnings( "ignore", message="This download has been started in a thread which is not the main thread", ) aia_wavelengths = [94, 131, 171, 193, 211, 304, 335] if wavelength not in aia_wavelengths: print("Please provide correct AIA wavelength from : {aia_wavelengths}.") return os.makedirs(workdir, exist_ok=True) start_time = dt.fromisoformat(f"{obs_date}T{obs_time}") t_start = (start_time - timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M") t_end = (start_time + timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M") time = a.Time(t_start, t_end) instrument = a.Instrument("aia") wavelength = a.Wavelength(wavelength * u.angstrom) results = Fido.search(time, instrument, wavelength) obs_times = results[0]["Start Time"].value.tolist() times_dt = [dt.strptime(t, "%Y-%m-%d %H:%M:%S.%f") for t in obs_times] closest_time = min(times_dt, key=lambda t: abs(t - start_time)) pos = times_dt.index(closest_time) downloaded_files = Fido.fetch( results[0][pos], path=workdir, progress=False, overwrite=False ) final_image = downloaded_files[0] aia_map = Map(final_image) # Step 1: Pointing correction try: pointing_corrected_map = update_pointing(aia_map) except: pointing_corrected_map = aia_map # Step 2: register (we are skipping PSF deconvolution) registered_map = register(pointing_corrected_map) # Step 3: instrument degradation correction try: corrected_map = correct_degradation(registered_map) except: corrected_map = registered_map # Step 4: Normalize by exposure time normalized_data = corrected_map.data / corrected_map.exposure_time.to(u.s).value normalized_map = Map(normalized_data, corrected_map.meta) if keep_aia_fits is False: os.system(f"rm -rf {final_image}") return normalized_map
[docs] def get_suvi_map(obs_date, obs_time, workdir, wavelength=195, keep_suvi_fits=False): """ Get GOES SUVI map Parameters ---------- obs_date : str Observation date in yyyy-mm-dd format obs_time : str Observation time in hh:mm format workdir : str Work directory wavelength : float, optional Wavelength, options: 94, 131, 171, 195, 284, 304 Å keep_suvi_fits : bool, optional Keep SUVI fits file or not Returns ------- sunpy.map Sunpy SUVIMap """ def list_url_directory(url, ext=""): page = requests.get(url).text soup = BeautifulSoup(page, "html.parser") return [ url + node.get("href") for node in soup.find_all("a") if node.get("href").endswith(ext) ] logging.getLogger("sunpy").setLevel(logging.ERROR) warnings.filterwarnings( "ignore", message="This download has been started in a thread which is not the main thread", ) suvi_wavelengths = [94, 131, 171, 195, 284, 304] if wavelength not in suvi_wavelengths: print("Please provide correct SUVI wavelength from : {suvi_wavelengths}.") return os.makedirs(workdir, exist_ok=True) baseurl1 = "https://data.ngdc.noaa.gov/platforms/solar-space-observing-satellites/goes/goes" baseurl2 = "l2/data" ext = ".fits" spacecraft_numbers = [16, 18] wvln_path = dict( { 94: "suvi-l2-ci094", 131: "suvi-l2-ci131", 171: "suvi-l2-ci171", 195: "suvi-l2-ci195", 284: "suvi-l2-ci284", 304: "suvi-l2-ci304", } ) date_str = "/".join(obs_date.split("-")) urls = [] for spacecraft in spacecraft_numbers: url = f"{baseurl1}{spacecraft}/{baseurl2}/{wvln_path[wavelength]}/{date_str}/" urls.append(url) all_files = [] start_times = [] out_files = [] for url in urls: request = requests.get(url) if not request.status_code == 200: raise Exception("Website not found: " + url) else: for file_name in list_url_directory(url, ext): all_files.append(file_name) file_base = os.path.basename(file_name) out_files.append(file_base) start_times.append(file_base.split("_")[-3]) times_dt = [dt.strptime(t, "s%Y%m%dT%H%M%Sz") for t in start_times] start_time = dt.fromisoformat(f"{obs_date}T{obs_time}") closest_time = min(times_dt, key=lambda t: abs(t - start_time)) pos = times_dt.index(closest_time) download_url = all_files[pos] out_file = out_files[pos] if os.path.exists(out_file) is False: dl = Downloader() dl.enqueue_file(download_url, path=out_file) downloaded_files = dl.download() if len(downloaded_files) > 0: final_image = downloaded_files[0] else: final_image = out_file suvi_map = Map(final_image) if keep_suvi_fits is False: os.system(f"rm -rf {final_image}") return suvi_map return
[docs] def enhance_offlimb(sunpy_map, do_sharpen=True): """ Enhance off-disk emission Parameters ---------- sunpy_map : sunpy.map Sunpy map do_sharpen : bool, optional Sharpen images Returns ------- sunpy.map Off-disk enhanced emission """ from scipy.ndimage import gaussian_filter from sunpy.map.maputils import all_coordinates_from_map logging.getLogger("sunpy").setLevel(logging.ERROR) hpc_coords = all_coordinates_from_map(sunpy_map) r = np.sqrt(hpc_coords.Tx**2 + hpc_coords.Ty**2) / sunpy_map.rsun_obs rsun_step_size = 0.01 rsun_array = np.arange(1, r.max(), rsun_step_size) y = np.array( [ sunpy_map.data[(r > this_r) * (r < this_r + rsun_step_size)].mean() for this_r in rsun_array ] ) pos = np.where(y < 10e-3)[0][0] r_lim = round(rsun_array[pos], 2) params = np.polyfit( rsun_array[rsun_array < r_lim], np.log(y[rsun_array < r_lim]), 1 ) scale_factor = np.exp((r - 1) * -params[0]) scale_factor[r < 1] = 1 if do_sharpen: blurred = gaussian_filter(sunpy_map.data, sigma=3) data = sunpy_map.data + (sunpy_map.data - blurred) else: data = sunpy_map.data scaled_map = Map(data * scale_factor, sunpy_map.meta) scaled_map.plot_settings["norm"] = ImageNormalize(stretch=LogStretch(10)) return scaled_map
[docs] def make_meer_overlay( meerkat_image, suvi_wavelength=195, plot_file_prefix=None, plot_meer_colormap=True, enhance_offdisk=True, contour_levels=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8], do_sharpen_suvi=True, xlim=[-1600, 1600], ylim=[-1600, 1600], extensions=["png"], outdirs=[], ncpu=-1, showgui=False, verbose=False, ): """ Make overlay of MeerKAT image on GOES SUVI image Parameters ---------- meerkat_image : str MeerKAT image suvi_wavelength : float, optional GOES SUVI wavelength, options: 94, 131, 171, 195, 284, 304 Å plot_file_prefix : str, optional Plot file prefix name plot_meer_colormap : bool, optional Plot MeerKAT map colormap enhance_offdisk : bool, optional Enhance off-disk emission contour_levels : list, optional Contour levels in fraction of peak do_sharpen_suvi : bool, optional Do sharpen SUVI images xlim : list, optional X-axis limit in arcsec tlim : list, optional Y-axis limit in arcsec extensions : list, optional Image file extensions outdirs : list, optional Output directories for each extensions ncpu : int, optional Number of CPUs to use showgui : bool, optional Show GUI verbose: bool, optinal Verbose output Returns ------- list Plot file names """ import matplotlib import matplotlib.ticker as ticker import matplotlib.pyplot as plt from sunpy.coordinates import SphericalScreen from matplotlib.colors import ListedColormap from matplotlib import cm from sunpy.map import make_fitswcs_header logging.getLogger("sunpy").setLevel(logging.ERROR) logging.getLogger("reproject.common").setLevel(logging.WARNING) @delayed def reproject_map(smap, target_header): with SphericalScreen(smap.observer_coordinate): return smap.reproject_to(target_header) if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") workdir = os.path.dirname(os.path.abspath(meerkat_image)) meermap = get_meermap(meerkat_image) obs_datetime = fits.getheader(meerkat_image)["DATE-OBS"] obs_date = obs_datetime.split("T")[0] obs_time = ":".join(obs_datetime.split("T")[-1].split(":")[:2]) suvi_map = get_suvi_map(obs_date, obs_time, workdir, wavelength=suvi_wavelength) if enhance_offdisk: suvi_map = enhance_offlimb(suvi_map, do_sharpen=do_sharpen_suvi) projected_coord = SkyCoord( 0 * u.arcsec, 0 * u.arcsec, obstime=suvi_map.observer_coordinate.obstime, frame="helioprojective", observer=suvi_map.observer_coordinate, rsun=suvi_map.coordinate_frame.rsun, ) projected_header = make_fitswcs_header( suvi_map.data.shape, projected_coord, scale=u.Quantity(suvi_map.scale), instrument=suvi_map.instrument, wavelength=suvi_map.wavelength, ) reprojected = [ reproject_map(meermap, projected_header), reproject_map(suvi_map, projected_header), ] if ncpu < 1: ncpu = 1 pool = ThreadPool(processes=ncpu) with dask.config.set(pool=pool): meer_reprojected, suvi_reprojected = compute(*reprojected, scheduler="threads") meertime = meermap.meta["date-obs"].split(".")[0] suvitime = suvi_map.meta["date-obs"].split(".")[0] try: if plot_meer_colormap and len(contour_levels) > 0: matplotlib.rcParams.update({"font.size": 18}) fig = plt.figure(figsize=(16, 8)) ax_colormap = fig.add_subplot(1, 2, 1, projection=suvi_reprojected) ax_contour = fig.add_subplot(1, 2, 2, projection=suvi_reprojected) elif plot_meer_colormap: matplotlib.rcParams.update({"font.size": 14}) fig = plt.figure(figsize=(10, 8)) ax_colormap = fig.add_subplot(projection=suvi_reprojected) elif len(contour_levels) > 0: matplotlib.rcParams.update({"font.size": 14}) fig = plt.figure(figsize=(10, 8)) ax_contour = fig.add_subplot(projection=suvi_reprojected) else: print("No overlay is plotting.") return title = f"SUVI time: {suvitime}\n MeerKAT time: {meertime}" if "transparent_inferno" not in plt.colormaps(): cmap = cm.get_cmap("inferno", 256) colors = cmap(np.linspace(0, 1, 256)) x = np.linspace(0, 1, 256) alpha = 0.8 * (1 - np.exp(-3 * x)) colors[:, -1] = alpha # Update the alpha channel transparent_inferno = ListedColormap(colors) plt.colormaps.register(name="transparent_inferno", cmap=transparent_inferno) if plot_meer_colormap and len(contour_levels) > 0: suptitle = title.replace("\n", ",") title = "" fig.suptitle(suptitle) if plot_meer_colormap: z = 0 suvi_reprojected.plot( axes=ax_colormap, title=title, autoalign=True, clip_interval=(3, 99.9) * u.percent, zorder=z, ) z += 1 meer_reprojected.plot( axes=ax_colormap, title=title, clip_interval=(3, 99.9) * u.percent, cmap="transparent_inferno", zorder=z, ) if len(contour_levels) > 0: z = 0 suvi_reprojected.plot( axes=ax_contour, title=title, autoalign=True, clip_interval=(3, 99.9) * u.percent, zorder=z, ) z += 1 contour_levels = np.array(contour_levels) * np.nanmax(meer_reprojected.data) meer_reprojected.draw_contours( contour_levels, axes=ax_contour, cmap="YlGnBu", zorder=z ) ax_contour.set_facecolor("black") if len(xlim) > 0: x_pix_limits = [] for x in xlim: sky = SkyCoord( x * u.arcsec, 0 * u.arcsec, frame=suvi_reprojected.coordinate_frame ) x_pix = suvi_reprojected.world_to_pixel(sky)[0].value x_pix_limits.append(x_pix) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.set_xlim(x_pix_limits) ax_contour.set_xlim(x_pix_limits) elif plot_meer_colormap: ax_colormap.set_xlim(x_pix_limits) elif len(contour_levels) > 0: ax_contour.set_xlim(x_pix_limits) if len(ylim) > 0: y_pix_limits = [] for y in ylim: sky = SkyCoord( 0 * u.arcsec, y * u.arcsec, frame=suvi_reprojected.coordinate_frame ) y_pix = suvi_reprojected.world_to_pixel(sky)[1].value y_pix_limits.append(y_pix) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.set_ylim(y_pix_limits) ax_contour.set_ylim(y_pix_limits) elif plot_meer_colormap: ax_colormap.set_ylim(y_pix_limits) elif len(contour_levels) > 0: ax_contour.set_ylim(y_pix_limits) if plot_meer_colormap and len(contour_levels) > 0: ax_colormap.coords.grid(False) ax_contour.coords.grid(False) elif plot_meer_colormap: ax_colormap.coords.grid(False) elif len(contour_levels) > 0: ax_contour.coords.grid(False) fig.tight_layout() plot_file_list = [] if verbose: print("#######################") if plot_file_prefix: for i in range(len(extensions)): ext = extensions[i] try: savedir = outdirs[i] except BaseException: savedir = workdir plot_file = f"{savedir}/{plot_file_prefix}.{ext}" plt.savefig(plot_file, bbox_inches="tight") if verbose: print(f"Plot saved: {plot_file}") plot_file_list.append(plot_file) if verbose: print("#######################\n") else: plot_file = None if showgui: plt.show() plt.close(fig) else: plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return plot_file_list
############################## # Extract dynamic spectrum ##############################
[docs] def make_ds_file_per_scan(msname, save_file, scan, datacolumn): """ Extract dynamic spectrum from measurement set Parameters ---------- msname : str Measurement set name save_file : str File name to save dynamic spectrum scan : int Scan number datacolumn : str Data column name Returns ------- str Dynamic spectrum file """ if os.path.exists(f"{save_file}.npy") == False: mstool = casamstool() try: all_data = [] for ant in range(5): mstool.open(msname) mstool.selectpolarization(["I"]) mstool.select( {"antenna1": ant, "antenna2": ant, "scan_number": int(scan)} ) data_dic = mstool.getdata(datacolumn) mstool.close() if datacolumn == "CORRECTED_DATA,CORRECTED_DATA-MODEL_DATA": data = np.abs( data_dic["CORRECTED_DATA,CORRECTED_DATA-MODEL_DATA"][0, ...] ) else: data = np.abs(data_dic["data"][0, ...]) del data_dic m = np.nanmedian(data, axis=1) data = data / m[:, None] all_data.append(data) del data except Exception as e: print("Auto-corrrelations are not present. Using short baselines.") count = 0 all_data = [] while count <= 5: for i in range(5): for j in range(5): if i != j: mstool.open(msname) mstool.selectpolarization(["I"]) mstool.select( { "antenna1": i, "antenna2": j, "scan_number": int(scan), } ) data_dic = mstool.getdata(datacolumn) mstool.close() if datacolumn == "CORRECTED_DATA,CORRECTED_DATA-MODEL_DATA": data = np.abs( data_dic[ "CORRECTED_DATA,CORRECTED_DATA-MODEL_DATA" ][0, ...] ) else: data = np.abs(data_dic["data"][0, ...]) del data_dic m = np.nanmedian(data, axis=1) data = data / m[:, None] all_data.append(data) del data count += 1 finally: try: mstool.close() except: pass all_data = np.array(all_data) data = np.nanmedian(all_data, axis=0) bad_chans = get_bad_chans(msname) if bad_chans != "": bad_chans = bad_chans.replace("0:", "").split(";") for bad_chan in bad_chans: s = int(bad_chan.split("~")[0]) e = int(bad_chan.split("~")[-1]) + 1 data[s:e, :] = np.nan msmd = msmetadata() msmd.open(msname) freqs = msmd.chanfreqs(0, unit="MHz") times = msmd.timesforscans(int(scan)) timestamps = [mjdsec_to_timestamp(mjdsec, str_format=0) for mjdsec in times] msmd.close() np.save( save_file, np.array([freqs, times, timestamps, data], dtype="object"), ) if ".npy" in save_file: return save_file else: return f"{save_file}.npy"
[docs] def make_ds_plot(dsfiles, plot_file=None, showgui=False): """ Make dynamic spectrum plot Parameters ---------- dsfile : list DS files list plot_file : str, optional Plot file name to save the plot showgui : bool, optional Show GUI Returns ------- str Plot name """ from matplotlib.gridspec import GridSpec if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") matplotlib.rcParams.update({"font.size": 18}) if type(dsfiles) == str: dsfiles = [dsfiles] for i, dsfile in enumerate(dsfiles): freqs_i, times_i, timestamps_i, data_i = np.load(dsfile, allow_pickle=True) if i == 0: freqs = freqs_i times = times_i timestamps = timestamps_i data = data_i else: gapsize = int( (np.nanmin(times_i) - np.nanmax(times)) / (times[1] - times[0]) ) if gapsize < 10: last_time_median = np.nanmedian(data[:, -1], axis=0) new_time_median = np.nanmedian(data_i[:, 0], axis=0) data_i = (data_i / new_time_median) * last_time_median # Insert vertical NaN gap (1 column wide) gap = np.full((data.shape[0], gapsize), np.nan) data = np.concatenate([data, gap, data_i], axis=1) # Insert dummy time and timestamp times = np.append(times, np.nan) timestamps = np.append(timestamps, "GAP") # Append new values times = np.append(times, times_i) timestamps = np.append(timestamps, timestamps_i) # (Optional) Check or merge freqs if needed — assuming same across files # Normalize by median bandshape median_bandshape = np.nanmedian(data, axis=-1) pos = np.where(np.isnan(median_bandshape) == False)[0] data /= median_bandshape[:, None] data = data[min(pos) : max(pos), :] freqs = freqs[min(pos) : max(pos)] temp_times = times[np.isnan(times) == False] maxtimepos = np.argmax(temp_times) mintimepos = np.argmin(temp_times) datestamp = f"{timestamps[mintimepos].split('T')[0]}" tstart = f"{timestamps[mintimepos].split('T')[0]} {':'.join(timestamps[mintimepos].split('T')[-1].split(':')[:2])}" tend = f"{timestamps[maxtimepos].split('T')[0]} {':'.join(timestamps[maxtimepos].split('T')[-1].split(':')[:2])}" print(f"Time range : {tstart}~{tend}") results = Fido.search( a.Time(tstart, tend), a.Instrument("XRS"), a.Resolution("avg1m") ) files = Fido.fetch(results, path=os.path.dirname(dsfiles[0]), overwrite=False) goes_tseries = TimeSeries(files, concatenate=True) for goes_f in files: os.system(f"rm -rf {goes_f}") goes_tseries = goes_tseries.truncate(tstart, tend) timeseries = np.nanmean(data, axis=0) # Normalization data_std = np.nanstd(data) data_median = np.nanmedian(data) norm = ImageNormalize( data, stretch=LogStretch(1), vmin=0.99 * np.nanmin(data), vmax=0.99 * np.nanmax(data), ) try: # Create figure and GridSpec layout fig = plt.figure(figsize=(18, 10)) gs = GridSpec( nrows=3, ncols=2, width_ratios=[1, 0.03], height_ratios=[4, 1.5, 2] ) # Axes ax_spec = fig.add_subplot(gs[0, 0]) ax_ts = fig.add_subplot(gs[1, 0]) ax_goes = fig.add_subplot(gs[2, 0]) cax = fig.add_subplot(gs[:, 1]) # colorbar spans both rows # Plot dynamic spectrum im = ax_spec.imshow( data, aspect="auto", origin="lower", norm=norm, cmap="magma" ) ax_spec.set_ylabel("Frequency (MHz)") ax_spec.set_xticklabels([]) # Remove x-axis labels from top plot # Y-ticks yticks = ax_spec.get_yticks() yticks = yticks[(yticks >= 0) & (yticks < len(freqs))] ax_spec.set_yticks(yticks) ax_spec.set_yticklabels([f"{freqs[int(i)]:.1f}" for i in yticks]) # Plot time series ax_ts.plot(timeseries) ax_ts.set_xlim(0, len(timeseries) - 1) ax_ts.set_ylabel("Mean \n flux density") goes_tseries.plot(axes=ax_goes) goes_times = goes_tseries.time times_dt = goes_times.to_datetime() ax_goes.set_xlim(times_dt[0], times_dt[-1]) ax_goes.set_ylabel(r"Flux ($\frac{W}{m^2}$)") ax_goes.legend(ncol=2, loc="upper right") ax_goes.set_title("GOES light curve", fontsize=14) ax_ts.set_title("MeerKAT light curve", fontsize=14) ax_spec.set_title("MeerKAT dynamic spectrum", fontsize=14) ax_goes.set_xlabel("Time (UTC)") # Format x-ticks ax_ts.set_xticks([]) ax_ts.set_xticklabels([]) # Colorbar cbar = fig.colorbar(im, cax=cax) cbar.set_label("Flux density (arb. unit)") plt.tight_layout() # Save or show if plot_file: plt.savefig(plot_file, bbox_inches="tight") print(f"Plot saved: {plot_file}") if showgui: plt.show() plt.close(fig) else: plt.close(fig) except Exception: traceback.print_exc() finally: plt.close("all") return plot_file
[docs] def plot_goes_full_timeseries( msname, workdir, plot_file_prefix=None, extension="png", showgui=False ): """ Plot GOES full time series on the day of observation Parameters ---------- msname : str Measurement set workdir : str Work directory plot_file_prefix : str, optional Plot file name prefix extension : str, optional Save file extension showgui : bool, optional Show GUI Returns ------- str Plot file name """ os.makedirs(workdir, exist_ok=True) if showgui: matplotlib.use("TkAgg") else: matplotlib.use("Agg") matplotlib.rcParams.update({"font.size": 14}) scans, cal_scans, f_scans, g_scans, p_scans = get_cal_target_scans(msname) valid_scans = get_valid_scans(msname) filtered_scans = [] for scan in scans: if scan in valid_scans: filtered_scans.append(scan) msmd = msmetadata() msmd.open(msname) tstart_mjd = min(msmd.timesforscan(int(min(filtered_scans)))) tend_mjd = max(msmd.timesforscan(int(max(filtered_scans)))) msmd.close() tstart = mjdsec_to_timestamp(tstart_mjd, str_format=2) tend = mjdsec_to_timestamp(tend_mjd, str_format=2) print(f"Time range: {tstart}~{tend}") results = Fido.search( a.Time(tstart, tend), a.Instrument("XRS"), a.Resolution("avg1m") ) files = Fido.fetch(results, path=workdir, overwrite=False) goes_tseries = TimeSeries(files, concatenate=True) for f in files: os.system(f"rm -rf {f}") fig, ax = plt.subplots(figsize=(15, 5), constrained_layout=True) goes_tseries.plot(axes=ax) times = goes_tseries.time times_dt = times.to_datetime() ax.axvspan(tstart, tend, alpha=0.2) ax.set_xlim(times_dt[0], times_dt[-1]) plt.tight_layout() # Save or show if plot_file_prefix: plot_file = f"{workdir}/{plot_file_prefix}.{extension}" plt.savefig(plot_file, bbox_inches="tight") print(f"Plot saved: {plot_file}") else: plot_file = None if showgui: plt.show() plt.close(fig) plt.close("all") else: plt.close(fig) return plot_file
[docs] def rename_meersolar_image( imagename, imagedir="", pol="", band="", attcal="NOINFO", cutout_rsun=2.5, make_overlay=True, make_plots=True, ): """ Rename and move image to image directory Parameters ---------- imagename : str Image name imagedir : str, optional Image directory (default given image directory) pol : str, optional Stokes parameters band : str, optional Observing band attcal : str, optional Solar attenuation calibrated or not cutout_rsun : float, optional Cutout in solar radii from center (default: 2.5 solar radii) make_overlay : bool, optional Make overlay on SUVI make_plots : bool, optional Make radio map plot in helioprojective coordinates Returns ------- str New imagename with full path """ imagename = imagename.rstrip("/") imagename = cutout_image( imagename, imagename, x_deg=(cutout_rsun * 2 * 16.0) / 60.0 ) header = fits.getheader(imagename) time = header["DATE-OBS"] astro_time = Time(time, scale="utc") sun_jpl = Horizons(id="10", location="500", epochs=astro_time.jd) eph = sun_jpl.ephemerides() sun_coords = SkyCoord( ra=eph["RA"][0] * u.deg, dec=eph["DEC"][0] * u.deg, frame="icrs" ) maxval, minval, rms, total_val, mean_val, median_val, rms_dyn, minmax_dyn = ( calc_solar_image_stat(imagename, disc_size=18) ) with fits.open(imagename, mode="update") as hdul: hdr = hdul[0].header hdr["AUTHOR"] = "DevojyotiKansabanik,DeepanPatra" if band != "": hdr["BAND"] = band hdr["PIPELINE"] = "MeerSOLAR" hdr["CRVAL1"] = sun_coords.ra.deg hdr["CRVAL2"] = sun_coords.dec.deg hdr["MAX"] = maxval hdr["MIN"] = minval hdr["RMS"] = rms hdr["SUM"] = total_val hdr["MEAN"] = mean_val hdr["MEDIAN"] = median_val hdr["RMSDYN"] = rms_dyn hdr["MIMADYN"] = minmax_dyn hdr["ATTCAL"] = str(attcal) freq = round(header["CRVAL3"] / 10**6, 2) t_str = "".join(time.split("T")[0].split("-")) + ( "".join(time.split("T")[-1].split(":")) ) new_name = "time_" + t_str + "_freq_" + str(freq) if pol != "": new_name += "_pol_" + str(pol) if "MFS" in imagename: new_name += "_MFS" new_name = new_name + ".fits" if imagedir == "": imagedir = os.path.dirname(os.path.abspath(imagename)) new_name = imagedir + "/" + new_name os.system("mv " + imagename + " " + new_name) hpcdir = f"{os.path.dirname(imagedir)}/images/hpcs" os.makedirs(hpcdir, exist_ok=True) save_in_hpc(new_name, outdir=hpcdir) if make_plots: try: pngdir = f"{os.path.dirname(imagedir)}/images/pngs" os.makedirs(pngdir, exist_ok=True) outimages, cropped_map = plot_in_hpc( new_name, draw_limb=True, extensions=["png"], outdirs=[pngdir], ) except Exception: pass if make_overlay: try: overlay_pngdir = f"{os.path.dirname(imagedir)}/overlays_pngs" os.makedirs(overlay_pngdir, exist_ok=True) outimages = make_meer_overlay( new_name, plot_file_prefix=os.path.basename(new_name).split(".fits")[0] + "_suvi_meerkat_overlay", extensions=["png"], outdirs=[overlay_pngdir], verbose=False, ) except Exception: pass return new_name
# Expose functions and classes __all__ = [ name for name, obj in globals().items() if ( (isinstance(obj, types.FunctionType) or isinstance(obj, type)) and obj.__module__ == __name__ ) ]