import types
import psutil
import traceback
import tempfile
import time
import glob
import os
from .basic_utils import *
####################
# uDOCKER related
####################
[docs]
def set_udocker_env():
datadir = get_datadir()
udocker_dir = datadir + "/udocker"
os.makedirs(udocker_dir, exist_ok=True)
os.environ["UDOCKER_DIR"] = udocker_dir
os.environ["UDOCKER_TARBALL"] = datadir + "/udocker-englib-1.2.11.tar.gz"
[docs]
def init_udocker():
set_udocker_env()
os.system("udocker install")
[docs]
def check_udocker_container(name):
"""
Check whether a docker container is present or not
Parameters
----------
name : str
Container name
Returns
-------
bool
Whether present or not
"""
set_udocker_env()
pid = os.getpid()
timestamp = int(time.time() * 1000)
tmp1 = f"tmp1_{pid}_{timestamp}.txt"
tmp2 = f"tmp2_{pid}_{timestamp}.txt"
b = os.system(
f"udocker --insecure --quiet inspect " + name + f" >> {tmp1} >> {tmp2}"
)
os.system(f"rm -rf {tmp1} {tmp2}")
if b != 0:
return False
else:
return True
[docs]
def initialize_wsclean_container(name="solarwsclean", update=False):
"""
Initialize WSClean container
Parameters
----------
name : str, optional
Name of the container
update : bool, optional
Update container
Returns
-------
bool
Whether initialized successfully or not
"""
set_udocker_env()
image_name = "devojyoti96/wsclean-solar:latest"
check_cmd = f"udocker images | grep -q '{image_name}'"
image_exists = os.system(check_cmd) == 0
if not image_exists:
with suppress_output():
a = os.system(f"udocker pull {image_name}")
else:
if update:
with suppress_output():
os.system(f"udocker rm {name}")
os.system(f"udocker rmi {image_name}")
print("Re-downloading docker image.")
a = os.system(f"udocker pull {image_name}")
if a == 0:
print("Re-downloaded docker image.")
else:
print("Re-downloading container image is failed.")
return
else:
print(f"Image '{image_name}' already present.")
a = 0
if a == 0:
with suppress_output():
a = os.system(f"udocker create --name={name} {image_name}")
print(f"Container started with name : {name}")
return name
else:
print(f"Container could not be created with name : {name}")
return
[docs]
def initialize_shadems_container(name="solarshadems", update=False):
"""
Initialize shadems container
Parameters
----------
name : str, optional
Name of the container
update : bool, optional
Update container
Returns
-------
bool
Whether initialized successfully or not
"""
set_udocker_env()
image_name = "devojyoti96/shadems:v0.5.4"
check_cmd = f"udocker images | grep -q '{image_name}'"
image_exists = os.system(check_cmd) == 0
if not image_exists:
with suppress_output():
a = os.system(f"udocker pull {image_name}")
else:
if update:
with suppress_output():
os.system(f"udocker rm {name}")
os.system(f"udocker rmi {image_name}")
print("Re-downloading docker image.")
a = os.system(f"udocker pull {image_name}")
if a == 0:
print("Re-downloaded docker image.")
else:
print("Re-downloading container image is failed.")
return
else:
print(f"Image '{image_name}' already present.")
a = 0
if a == 0:
with suppress_output():
a = os.system(f"udocker create --name={name} {image_name}")
print(f"Container started with name : {name}")
return name
else:
print(f"Container could not be created with name : {name}")
return
[docs]
def run_wsclean(
wsclean_cmd,
container_name="solarwsclean",
check_container=False,
verbose=False,
):
"""
Run WSClean inside a udocker container (no root permission required).
Parameters
----------
wsclean_cmd : str
Full WSClean command as a string.
container_name : str, optional
Container name
check_container : bool, optional
Check container presence or not
verbose : bool, optional
Verbose output or not
Returns
-------
int
Success message
"""
set_udocker_env()
pid = os.getpid()
timestamp = int(time.time() * 1000)
tmp1 = f"tmp1_{pid}_{timestamp}.txt"
tmp2 = f"tmp2_{pid}_{timestamp}.txt"
def show_file(path):
try:
print(open(path).read())
except Exception as e:
print(f"Error: {e}")
if check_container:
container_present = check_udocker_container(container_name)
if not container_present:
container_name = initialize_wsclean_container(name=container_name)
if container_name is None:
print(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
msname = wsclean_cmd.split(" ")[-1]
msname = os.path.abspath(msname)
mspath = os.path.dirname(msname)
temp_docker_path = tempfile.mkdtemp(prefix="wsclean_udocker_", dir=mspath)
wsclean_cmd_args = wsclean_cmd.split(" ")[:-1]
if "-fits-mask" in wsclean_cmd_args:
index = wsclean_cmd_args.index("-fits-mask")
name = wsclean_cmd_args[index + 1]
namedir = os.path.dirname(os.path.abspath(name))
basename = os.path.basename(os.path.abspath(name))
wsclean_cmd_args.remove(name)
wsclean_cmd_args.insert(index + 1, temp_docker_path + "/" + basename)
if "-name" not in wsclean_cmd_args:
wsclean_cmd_args.append(
"-name " + temp_docker_path + "/" + os.path.basename(msname).split(".ms")[0]
)
else:
index = wsclean_cmd_args.index("-name")
name = wsclean_cmd_args[index + 1]
namedir = os.path.dirname(os.path.abspath(name))
basename = os.path.basename(os.path.abspath(name))
wsclean_cmd_args.remove(name)
wsclean_cmd_args.insert(index + 1, temp_docker_path + "/" + basename)
if "-temp-dir" not in wsclean_cmd_args:
wsclean_cmd_args.append("-temp-dir " + temp_docker_path)
else:
index = wsclean_cmd_args.index("-temp-dir")
name = os.path.abspath(wsclean_cmd_args[index + 1])
wsclean_cmd_args.remove(name)
wsclean_cmd_args.insert(index + 1, temp_docker_path)
wsclean_cmd = (
" ".join(wsclean_cmd_args)
+ " "
+ temp_docker_path
+ "/"
+ os.path.basename(msname)
)
try:
full_command = f"udocker run --nobanner --volume={mspath}:{temp_docker_path} --workdir {temp_docker_path} {container_name} {wsclean_cmd}"
if not verbose:
full_command += f" >> {mspath}/{tmp1} "
else:
print(wsclean_cmd + "\n")
exit_code = os.system(full_command)
if exit_code != 0:
print("##########################")
print(os.path.basename(msname))
print("##########################")
show_file(f"{mspath}/{tmp1}")
os.system(f"rm -rf {temp_docker_path} {mspath}/{tmp1}")
return 0 if exit_code == 0 else 1
except Exception as e:
os.system(f"rm -rf {temp_docker_path}")
traceback.print_exc()
return 1
[docs]
def run_solar_sidereal_cor(
msname="",
only_uvw=False,
container_name="solarwsclean",
check_container=False,
verbose=False,
):
"""
Run chgcenter inside a udocker container to correct solar sidereal motion (no root permission required).
Parameters
----------
msname : str
Name of the measurement set
only_uvw : bool, optional
Update only UVW values
Note: This is required when visibilities are properly phase rotated in correlator to track the Sun,
but while creating the MS, UVW values are estimated using the first phasecenter of the Sun.
check_container : bool, optional
Check container
container_name : str, optional
Container name
verbose : bool, optional
Verbose output or not
Returns
-------
int
Success message
"""
set_udocker_env()
pid = os.getpid()
timestamp = int(time.time() * 1000)
tmp1 = f"tmp1_{pid}_{timestamp}.txt"
tmp2 = f"tmp2_{pid}_{timestamp}.txt"
if check_container:
container_present = check_udocker_container(container_name)
if not container_present:
container_name = initialize_wsclean_container(name=container_name)
if container_name is None:
print(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
msname = os.path.abspath(msname)
mspath = os.path.dirname(msname)
temp_docker_path = tempfile.mkdtemp(prefix="chgcenter_udocker_", dir=mspath)
if only_uvw:
cmd = (
"chgcentre -only-uvw -solarcenter "
+ temp_docker_path
+ "/"
+ os.path.basename(msname)
)
else:
cmd = (
"chgcentre -solarcenter "
+ temp_docker_path
+ "/"
+ os.path.basename(msname)
)
try:
full_command = f"udocker --quiet run --nobanner --volume={mspath}:{temp_docker_path} --workdir {temp_docker_path} solarwsclean {cmd}"
if not verbose:
full_command += f" >> {tmp1} >> {tmp2}"
else:
print(cmd)
with suppress_output():
exit_code = os.system(full_command)
os.system(f"rm -rf {temp_docker_path} {tmp1} {tmp2}")
return 0 if exit_code == 0 else 1
except Exception as e:
os.system(f"rm -rf {temp_docker_path} {tmp1} {tmp2}")
traceback.print_exc()
return 1
[docs]
def run_chgcenter(
msname,
ra,
dec,
only_uvw=False,
container_name="solarwsclean",
check_container=False,
verbose=False,
):
"""
Run chgcenter inside a udocker container (no root permission required).
Parameters
----------
msname : str
Name of the measurement set
ra : str
RA can either be 00h00m00.0s or 00:00:00.0
dec : str
Dec can either be 00d00m00.0s or 00.00.00.0
only_uvw : bool, optional
Update only UVW values
Note: This is required when visibilities are properly phase rotated in correlator,
but while creating the MS, UVW values are estimated using a wrong phase center.
check_container : bool, optional
Check container
container_name : str, optional
Container name
verbose : bool, optional
Verbose output
Returns
-------
int
Success message
"""
set_udocker_env()
pid = os.getpid()
timestamp = int(time.time() * 1000)
tmp1 = f"tmp1_{pid}_{timestamp}.txt"
tmp2 = f"tmp2_{pid}_{timestamp}.txt"
if check_container:
container_present = check_udocker_container(container_name)
if not container_present:
container_name = initialize_wsclean_container(name=container_name)
if container_name is None:
print(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
msname = os.path.abspath(msname)
mspath = os.path.dirname(msname)
temp_docker_path = tempfile.mkdtemp(prefix="chgcenter_udocker_", dir=mspath)
if only_uvw:
cmd = (
"chgcentre -only-uvw "
+ temp_docker_path
+ "/"
+ os.path.basename(msname)
+ " "
+ ra
+ " "
+ dec
)
else:
cmd = (
"chgcentre "
+ temp_docker_path
+ "/"
+ os.path.basename(msname)
+ " "
+ ra
+ " "
+ dec
)
try:
full_command = f"udocker --quiet run --nobanner --volume={mspath}:{temp_docker_path} --workdir {temp_docker_path} {container_name} {cmd}"
if not verbose:
full_command += f" >> {tmp1} >> {tmp2}"
else:
print(cmd)
exit_code = os.system(full_command)
os.system(f"rm -rf {temp_docker_path} {tmp1} {tmp2}")
return 0 if exit_code == 0 else 1
except Exception as e:
os.system(f"rm -rf {temp_docker_path} {tmp1} {tmp2}")
traceback.print_exc()
return 1
[docs]
def run_shadems(
cmd,
container_name="solarshadems",
check_container=False,
verbose=False,
):
"""
Run shadems inside a udocker container (no root permission required).
Parameters
----------
cmd : str
Shadems command
container_name : str, optional
Container name
check_container : bool, optional
Check container
verbose : bool, optional
Verbose output
Returns
-------
int
Success message
"""
set_udocker_env()
pid = os.getpid()
if check_container:
container_present = check_udocker_container(container_name)
if not container_present:
container_name = initialize_shadems_container(name=container_name)
if container_name is None:
print(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1
splited_cmd = cmd.split(" ")
if splited_cmd[-1] in ["-h", "--help"]:
verbose = True
datapath = os.getcwd()
else:
msname = splited_cmd[-1]
datapath = os.path.dirname(os.path.abspath(msname))
temp_docker_path = tempfile.mkdtemp(prefix="shadems_udocker_", dir=datapath)
if splited_cmd[-1] not in ["-h", "--help"]:
cmd = f"{' '.join(splited_cmd[:-1])} {temp_docker_path}/{os.path.basename(msname)}"
try:
full_command = f"udocker --quiet run --nobanner --volume={datapath}:{temp_docker_path} --workdir {temp_docker_path} {container_name} {cmd}"
if not verbose:
with suppress_output():
exit_code = os.system(full_command)
else:
print(cmd)
exit_code = os.system(full_command)
return 0 if exit_code == 0 else 1
except Exception as e:
traceback.print_exc()
return 1
finally:
os.system(f"rm -rf {temp_docker_path}")
return
# Expose functions and classes
__all__ = [
name
for name, obj in globals().items()
if (
(isinstance(obj, types.FunctionType) or isinstance(obj, type))
and obj.__module__ == __name__
)
]