#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script that runs the VASCA pipeline.
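Typical command-line invocation (the configuration path and module location are
illustrative; see run_from_file below for the accepted arguments):

    python vasca/vasca_pipe.py path/to/vasca_cfg.yaml --umask 003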
"""
import argparse
import os
import sys
from multiprocessing import Pool
from pathlib import Path
from importlib.resources import files
import yaml
from astropy import units as uu
from astropy.table import unique
from loguru import logger
from vasca.region import Region
from vasca.tables_dict import dd_vasca_tables
from vasca.tables import TableCollection
from vasca.utils import dd_obs_id_add, get_config
from vasca.field import BaseField
import numpy as np
def set_pipe_dir(vasca_cfg: dict) -> Path:
"""Creates the pipeline output directory and necessary parent directories.
:returns: Path object pointing to the pipeline directory
:rtype: :py:class:`~pathlib.Path`
"""
# Setup root output directory
out_dir_base: Path = Path(
files("vasca").parent / vasca_cfg["general"]["out_dir_base"]
)
out_dir_base.mkdir(parents=True, exist_ok=True)
# Setup pipeline directory
pipe_dir: Path = out_dir_base / vasca_cfg["general"]["name"]
pipe_dir.mkdir(parents=True, exist_ok=True)
return pipe_dir
def set_logger(vasca_cfg):
"""
Setup logger. Gets configuration from global config variable set with get_config.
Parameters
----------
vasca_cfg: dict
Dictionary with the VASCA configuration, typically from a YAML file
Returns
-------
None
"""
log_dir = (
vasca_cfg["general"]["out_dir_base"] + "/" + vasca_cfg["general"]["name"] + "/"
)
os.makedirs(log_dir, exist_ok=True)
# create log file name
log_file_name = "log_" + vasca_cfg["general"]["name"] + ".log"
if vasca_cfg["general"]["log_file"] != "default":
log_file_name = vasca_cfg["general"]["log_file"]
log_cfg = {
"handlers": [
{
"sink": sys.stdout,
"format": "<green>{time:YYYY-MM-DD HH:mm:ss.SSSS}</green> "
"<cyan>{name}</cyan>:<cyan>{line}</cyan> |"
"<level>{level}:</level> {message}",
"level": vasca_cfg["general"]["log_level"],
"colorize": True,
"backtrace": True,
"diagnose": True,
},
],
}
logger.configure(**log_cfg)
logger.add(log_dir + log_file_name)
logger.enable("vasca")
logger.info("Runing '" + __file__ + "'")
logger.debug("Config. file: '" + vasca_cfg["cfg_file"] + "'")
logger.debug("Output log. file: '" + log_dir + log_file_name + "'")
def keep_base_field(field):
"""
Remove field data that is not needed for further analysis to save memory
Parameters
----------
field: vasca.field.BaseField
VASCA field
Returns
-------
None
"""
# Remove detections which did not pass selections and where not used in clustering
field.remove_unselected("tt_detections")
if hasattr(field, "tt_coadd_detections"):
field.remove_unselected("tt_coadd_detections")
# Remove image data
field.ref_img = None
field.ref_wcs = None
field.vis_img = None
# Keep only base class columns
for tt_name in field._table_names:
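        # dd_vasca_tables["base_field"][<table>]["names"] lists the columns defined for this
        # table in the BaseField template; any additional columns are dropped below.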
cols = dd_vasca_tables["base_field"][tt_name]["names"]
field.__dict__[tt_name] = field.__dict__[tt_name][cols]
def run_field(obs_nr, field_id, rg, vasca_cfg):
"""
Run analysis on a single field
Parameters
----------
obs_nr : int
Observation number in the config file.
field_id : str
Field ID of the field to run
rg : vasca.region.Region
Region of the VASCA pipeline.
vasca_cfg : dict
VASCA configuration
Returns
-------
vasca.field.BaseField
Modified field with results
"""
logger.info("Analysing field:" + str(field_id))
try:
field = rg.get_field(
field_id=field_id,
load_method="VASCA",
mast_products=vasca_cfg["resources"]["load_products"],
field_kwargs=vasca_cfg["resources"]["field_kwargs"],
)
except Exception as e:
logger.exception(
f"Faild to run pipeline for field '{field_id}'. "
f"Returning None. Exception:\n {e}"
)
return None
# Create directory structure for fields
field_out_dir = (
vasca_cfg["general"]["out_dir_base"]
+ "/"
+ vasca_cfg["general"]["name"]
+ "/fields/"
)
# Create folder
os.makedirs(field_out_dir, exist_ok=True)
# Get configuration for this field
obs_cfg = vasca_cfg["observations"][obs_nr]
# Apply detections selections
# Visit detections
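    # remove_unselected=False keeps the rejected rows in the table and only flags them as
    # unselected; they are removed later in keep_base_field.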
field.select_rows(obs_cfg["selection"]["det_quality"], remove_unselected=False)
# Coadd detections
if hasattr(field, "tt_coadd_detections"):
field.select_rows(
obs_cfg["selection"]["coadd_det_quality"], remove_unselected=False
)
# Cluster and do source stats
if len(field.tt_detections) > 0:
# Run clustering
logger.info("Clustering field detections")
field.cluster_meanshift(
**obs_cfg["cluster_det"]["meanshift"],
)
# Calculate source variables from light curve
# field.set_src_stats(src_id_name="fd_src_id")
# Select detections used in clustering
field.select_rows(obs_cfg["selection"]["det_association"], remove_unselected=False)
# Write out field
field.write_to_fits(field_out_dir + "field_" + field.field_id + ".fits")
# Keep only data needed for further analysis
keep_base_field(field)
logger.info("Done with field:" + str(field_id))
return field
def run_field_docs(
obs_nr: int, field_id: str, rg: Region, vasca_cfg: dict
) -> BaseField | None:
"""Run field-level analysis (docs-version)
:par obs_nr: Observation list index
:type obs_nr: int
:par field_id: Field ID
:type field_id: str, int
:par rg: VASCA region object from where the fields are loaded
:type rg: :py:class:`~vasca.region.Region`
:par vasca_cfg: Pipeline configuration
:type vasca_cfg: dict
"""
logger.info(f"Analysing field: {field_id}")
# Load field from VASCA-field file
try:
field: BaseField = rg.get_field(
field_id=field_id,
load_method="VASCA",
mast_products=vasca_cfg["resources"]["load_products"],
field_kwargs=vasca_cfg["resources"]["field_kwargs"],
)
except Exception as e:
logger.exception(
f"Faild to run pipeline for field '{field_id}'. "
f"Returning None. Exception:\n {e}"
)
return None
# Get configuration for this field
obs_cfg = vasca_cfg["observations"][obs_nr]
# Apply detections selections
# Visit detections
field.select_rows(obs_cfg["selection"]["det_quality"], remove_unselected=False)
# Coadd detections
if hasattr(field, "tt_coadd_detections"):
field.select_rows(
obs_cfg["selection"]["coadd_det_quality"], remove_unselected=False
)
# Clustering
if len(field.tt_detections) > 0:
# Run clustering
logger.info(f"Clustering field detections ({field_id})")
field.cluster_meanshift(
**obs_cfg["cluster_det"]["meanshift"],
)
# Select detections used in clustering
field.select_rows(obs_cfg["selection"]["det_association"], remove_unselected=False)
# Create directory structure for fields in pipeline output location
    field_out_dir: Path = set_pipe_dir(vasca_cfg) / "fields"
    field_out_dir.mkdir(parents=True, exist_ok=True)
# Write out VASCA-field
field.write_to_fits(field_out_dir / f"field_{field.field_id}.fits")
# Memory efficiency: keep only data needed for further analysis
keep_base_field(field)
logger.info(f"Done with field: {field_id}")
return field
def run_cluster_fields(meanshift_cfg, tt_fd_src, tt_fd_det=None, cluster_coadd=False):
"""
Helper function to do clustering for passed sources or detections.
The main purpose is to only keep the needed tables, to save memory usage
Parameters
----------
meanshift_cfg: dict
Mean shift parameters
tt_fd_src: astropy.table.Table
Table with sources
tt_fd_det: astropy.table.Table
Table with detections
cluster_coadd: bool
Clustering coadd sources?
Returns
-------
vasca.tables.TableCollection
Table collection with calculated clusters
"""
tc = TableCollection()
if cluster_coadd:
tc.add_table(tt_fd_src, "tt_coadd_detections")
else:
tc.add_table(tt_fd_src, "tt_sources")
tc.add_table(tt_fd_det, "tt_detections")
tc.cluster_meanshift(**meanshift_cfg)
return tc
def run(vasca_cfg):
"""
Runs the VASCA pipeline
Parameters
----------
vasca_cfg : dict
VASCA pipeline configuration dictionary
Returns
-------
None
"""
# Setup logger
set_logger(vasca_cfg)
# Load region fields
rg = Region.load_from_config(vasca_cfg)
# Setup output directory
os.makedirs(rg.region_path, exist_ok=True)
    # Set field selection to False; it is set back to True below only if the field has detections
rg.tt_fields["sel"] = np.zeros(len(rg.tt_fields), dtype=bool)
rg.tt_fields.add_index("field_id")
    # If running for the first time, or with new field settings, run the fields
if vasca_cfg["general"]["run_fields"]:
# Prepare fields to run in parallel
fd_pars = list() # List of observations and field_ids in the config file
vobs = vasca_cfg["observations"]
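        # The full VASCA field ID is built by prepending the observatory/filter-specific
        # prefix from dd_obs_id_add to the numeric observation field ID from the config.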
for obs_nr in range(len(vobs)):
for field_nr in vobs[obs_nr]["obs_field_ids"]:
field_id = dd_obs_id_add[
vobs[obs_nr]["observatory"] + vobs[obs_nr]["obs_filter"]
] + str(field_nr)
fd_pars.append([obs_nr, field_id, rg, vasca_cfg])
# Run each field in a separate process in parallel
nr_cpus = vasca_cfg["general"]["nr_cpus"]
logger.info(f"Analyzing {len(fd_pars)} fields on {nr_cpus} parallel threads.")
with Pool(processes=nr_cpus) as pool:
pool_return = pool.starmap(run_field, fd_pars)
pool.join()
logger.info("Done analyzing individual fields.")
# update region fields
for field in pool_return:
# Check if field was filled or is empty for any reason
if hasattr(field, "tt_detections") and len(field.tt_detections) > 0:
rg.fields[field.field_id] = field
fd_idx = rg.tt_fields.loc_indices["field_id", field.field_id]
rg.tt_fields["sel"][fd_idx] = True
logger.info(f"Added field {field.field_id} from pool to region")
else:
                logger.warning(
                    "Ignoring field from the pool, as it was empty or had no detections"
                )
else:
for rg_fd_id in rg.tt_fields["rg_fd_id"]:
field = rg.get_field(rg_fd_id=rg_fd_id, load_method="FITS", add_field=False)
if hasattr(field, "tt_detections") and len(field.tt_detections) > 0:
# Keep only data needed for further analysis
keep_base_field(field)
rg.fields[field.field_id] = field
fd_idx = rg.tt_fields.loc_indices["field_id", field.field_id]
rg.tt_fields["sel"][fd_idx] = True
logger.info(f"Added field: {field.field_id} from file")
else:
                logger.warning(
                    "Ignoring field from file, as it was empty or had no detections"
                )
# Add field tables to region
# For visits merge obs_filter_id and remove doubles
rg.add_table_from_fields("tt_visits")
rg.tt_visits = unique(rg.tt_visits, keys="vis_id")
rg.add_table_from_fields("tt_sources")
rg.add_table_from_fields("tt_detections", only_selected=False)
if vasca_cfg["resources"]["coadd_exists"]:
rg.add_table_from_fields("tt_coadd_detections")
del rg.fields # All that was needed has been transferred to region tables
# ---Run second clustering step
# Cluster field sources and co-adds in parallel
if vasca_cfg["resources"]["coadd_exists"]:
ll_cluster = [
[
vasca_cfg["cluster_src"]["meanshift"],
rg.tt_sources,
rg.tt_detections,
False,
],
[
vasca_cfg["cluster_coadd_dets"]["meanshift"],
rg.tt_coadd_detections,
False,
True,
],
]
with Pool(processes=2) as pool:
pool_return = pool.starmap(run_cluster_fields, ll_cluster)
pool.join()
# Copy parallelized results into original region
for pool_rg in pool_return:
if "tt_sources" in pool_rg._table_names:
rg.tt_sources = pool_rg.tt_sources
rg.tt_detections = pool_rg.tt_detections
else:
rg.add_table(pool_rg.tt_coadd_sources, "region:tt_coadd_sources")
rg.tt_coadd_detections = pool_rg.tt_coadd_detections
else:
rg.cluster_meanshift(**vasca_cfg["cluster_src"]["meanshift"])
# Calculate source statistics
rg.set_src_stats(src_id_name="rg_src_id")
if vasca_cfg["resources"]["coadd_exists"]:
rg.set_src_stats(src_id_name="coadd_src_id")
# Add hardness ratio (default obs_filter_id 1 & 2 for GALEX)
rg.set_hardness_ratio()
# Match sources to coadd sources
if vasca_cfg["resources"]["coadd_exists"]:
rg.cross_match(
dist_max=vasca_cfg["assoc_src_coadd"]["dist_max"] * uu.arcsec,
dist_s2n_max=vasca_cfg["assoc_src_coadd"]["dist_s2n_max"],
)
# Select variable sources, deselect all sources before
rg.tt_sources["sel"] = False
rg.select_from_config(vasca_cfg["selection_src"])
rg.synch_src_sel(remove_unselected=False)
# Set source id table
rg.set_src_id_info()
# Write out region
fname_base = rg.region_path + "/region_" + vasca_cfg["general"]["name"]
rg.write_to_fits(file_name=fname_base + ".fits")
# Write out reduced catalog region
rc = rg.get_region_catalog()
rc.write_to_fits(file_name=fname_base + "_cat.fits")
# Write used config file
yaml_out_name = (
rg.region_path + "/cfg_ran_" + vasca_cfg["general"]["name"] + ".yaml"
)
with open(yaml_out_name, "w") as yaml_file:
yaml.dump(vasca_cfg, yaml_file)
logger.info("Done running VASCA pipeline.")
def run_from_file():
"""
Run VASCA pipeline from a configuration file passed with the ArgumentParser
Returns
-------
None
"""
# Argument parsing
parser = argparse.ArgumentParser()
parser.add_argument(
"cfg",
metavar="cfg",
type=str,
default=os.getcwd() + "/vasca_cfg.yaml",
help="Path to the pipeline configuration file.",
)
parser.add_argument(
"-u",
"--umask",
type=str,
default="003",
help=(
"Set the umask. Default is 003, user and group have full permissions, "
"other can only read."
),
)
args = parser.parse_args()
# Sets the file and directory permissions
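    # The umask string is parsed as an octal literal, e.g. "003" -> 0o003, which keeps
    # full permissions for user and group and removes write/execute for others.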
os.umask(int(f"0o{args.umask}", 0))
print(
50 * "-"
+ f"\n Running vasca_pipe.py with configuration file:\n {args.cfg}\n"
+ 50 * "-"
)
vasca_cfg = get_config(args.cfg)
run(vasca_cfg)
if __name__ == "__main__":
run_from_file()