Source code for vasca.source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 19 10:17:54 2023

@author: buehler
"""

import os
import numpy as np
from astropy import constants as cc
from astropy import units as uu
from astropy.table import Table, vstack
from astropy.coordinates import SkyCoord
from loguru import logger
from astroquery.sdss import SDSS

from vasca.tables import TableCollection
from vasca.utils import (
    query_vizier_sed,
    dd_id2filter,
    dd_filter2id,
    dd_filter2wavelength,
    mag2flux,
    tgalex_to_astrotime,
    get_var_stat,
    get_lc_from_gphoton_npfile,
)
from vasca.resource_manager import ResourceManager

rm = ResourceManager()


[docs] class Source(TableCollection): """Class to store all VASCA information for one particular source. This class is for convenience, the same data is also found in the Field and Region classes containing this source. """ def __init__(self) -> None: """ Many class attributes are stored in :py:class:`~astropy.table.Table`. """ # Sets skeleton super().__init__()
[docs] def add_vizier_SED(self, vizier_radius: uu.Quantity = 1 * uu.arcsec) -> None: """Add spectral energy distribution table (tt_sed) with all spectral points from VizieR within given radius. Uses :py:func:`~vasca.utils.query_vizier_sed()` :param vizier_radius: Radius within which to add flux points from VizieR (Default value = 1 * uu.arcsec) :type vizier_radius: :py:class:`~astropy.units.Quantity` """ # Search for Vizier flux around source, or simbad associated source if present ra, dec = self.tt_sources["ra"][0], self.tt_sources["dec"][0] if "tt_simbad" in self._table_names: ra, dec = self.tt_simbad["ra"][0], self.tt_simbad["dec"][0] self.add_table(None, "region:tt_sed") try: tt_vizier = query_vizier_sed( ra, dec, radius=vizier_radius.to(uu.arcsec).value ) # Add columns in right formats for tt_sed later tt_vizier["wavelength"] = (cc.c / tt_vizier["sed_freq"]).to(uu.AA) tt_vizier["flux"] = tt_vizier["sed_flux"].quantity.to(uu.Unit("1e-6 Jy")) tt_vizier["flux_err"] = tt_vizier["sed_eflux"].quantity.to( uu.Unit("1e-6 Jy") ) # tt_vizier.sort("wavelength") # tt_vizier.pprint_all() # Add Vizier info to SED table for row in tt_vizier: ll_flt = str(row["sed_filter"]).split(":") # Check if Vizier data ok if ( np.isnan(row["flux"]) or np.isnan(row["flux_err"]) or len(ll_flt) != 2 or len(ll_flt[0]) == 0 or len(ll_flt[1]) == 0 ): logger.warning("Skipping row as flux contains nan") else: obs, flt = ll_flt dd_vdat = { "flux": row["flux"], "flux_err": row["flux_err"], "wavelength": row["wavelength"], "observatory": obs, "obs_filter": flt, "origin": row["_tabname"], } self.tt_sed.add_row(dd_vdat) except Exception: print("No entries found in Vizier database") # Add mean VASCA flux for all filters flt_ids = np.array(self.tt_sources["obs_filter_id"]).flatten() for flt_idx in range(len(flt_ids)): flt_id = self.tt_sources["obs_filter_id"][:, flt_idx][0] if self.tt_sources["flux"].quantity[:, flt_idx][0] > 0: obs_flt = dd_id2filter[flt_id] dd_vdat = { "flux": self.tt_sources["flux"][:, flt_idx][0], "flux_err": self.tt_sources["flux_err"][:, flt_idx][0], "wavelength": dd_filter2wavelength[obs_flt], "observatory": "GALEX", # TODO: make this general "obs_filter": obs_flt, "origin": "VASCA", } self.tt_sed.add_row(dd_vdat) # Sort by wavelength self.tt_sed.sort("wavelength")
[docs] def add_gphoton_lc(self, s2n_min: float = 3.0, tbin: int = -1) -> None: """Add light curve from gPhoton. Only include points with no flags. Assumes gPhoton flux is given for a 6 arcsec aperture. :param s2n_min: Minimum significance of points for selection in light curve. (Default value = 3.0) :type s2n_min: float, optional :param tbin_name: :type tbin_name: int, optional :param tbin: (Default value = -1) """ # Get location of gphoton files gphot_dir = rm.get_path("gal_gphoton", "sas_cloud") # Prepare info for file reading rg_src_id = self.tt_sources["rg_src_id"][0] ra_src = round(self.tt_sources["ra"][0], 3) dec_src = round(self.tt_sources["dec"][0], 3) tname = "" if tbin < 0 else "_" + str(tbin) # Check if NUV file is present and load it, this is requires fname_nuv = ( gphot_dir + "/gPhoton_ra" + str(ra_src) + "_dec" + str(dec_src) + "_nuv" + tname + "_app.npy" ) if os.path.exists(fname_nuv): tt_lc = get_lc_from_gphoton_npfile(fname_nuv, "NUV") else: logger.warning(f"gPhoton file not found {fname_nuv}") return # If FUV file present add it to table fname_fuv = fname_nuv.replace("_nuv", "_fuv") if os.path.exists(fname_fuv): tt_lc_fuv = get_lc_from_gphoton_npfile(fname_fuv, "FUV") tt_lc = vstack([tt_lc, tt_lc_fuv]) # Add light curve table self.add_table(tt_lc, "region:tt_gphoton_lc") # Modify selection sel = (self.tt_gphoton_lc["flags"] < 0.5) * ( self.tt_gphoton_lc["s2n"] > s2n_min ) self.tt_gphoton_lc["sel"] = sel # Calculate variability statistic for each filter and add it to tt_source tt_lc = tt_lc[sel] filter_ids = np.sort(np.unique(tt_lc["obs_filter_id"].data)) dd_gp_var = {"rg_src_id": [rg_src_id], "nr_det": [[]]} for flt_id in filter_ids: # Get stats variables and write them all to dictionary sel_flt = tt_lc["obs_filter_id"] == flt_id dd_var = get_var_stat(tt_lc["flux"][sel_flt], tt_lc["flux_err"][sel_flt]) for var, val in dd_var.items(): if var in dd_gp_var.keys(): dd_gp_var[var][0].append(val) else: dd_gp_var[var] = [[val]] # Add number of bins dd_gp_var["nr_det"][0].append(sel_flt.sum()) self.add_table(dd_gp_var, "tt_gphoton_stats")
[docs] def add_spectrum(self, search_radius: uu.Quantity = 2 * uu.arcsec) -> None: """Get spectrum from SDSS, if available :param search_radius: Search radius around multifrequency counterpart, or if this does not excist, around VASCA position. (Default value = 2 * uu.arcsec) :type search_radius: :py:class:`~astropy.units.Quantity`, optional """ # Prepare spectral query data ra, dec = self.tt_sources["ra"].quantity[0], self.tt_sources["dec"].quantity[0] if "tt_simbad" in self._table_names: ra, dec = ( self.tt_simbad["ra"].quantity[0], self.tt_simbad["dec"].quantity[0], ) pos = SkyCoord(ra, dec, frame="icrs") # Get spectrum from SDSS tt_xid = SDSS.query_region(pos, radius=search_radius, spectro=True) if type(tt_xid) != type(None): ll_sp = SDSS.get_spectra(matches=tt_xid) for ii in range(len(ll_sp)): hdu_spec = ll_sp[ii] tt_spec = Table(hdu_spec["COADD"].data) c_Aps = cc.c.to(uu.AA / uu.s) spec_flux = tt_spec["flux"] * 1e-17 * uu.erg / (uu.cm**2 * uu.s * uu.AA) model_flux = ( tt_spec["model"] * 1e-17 * uu.erg / (uu.cm**2 * uu.s * uu.AA) ) spec_wave = np.power(10, tt_spec["loglam"]) * uu.AA dd_spec = { "wavelength": spec_wave, "flux": (spec_flux * spec_wave**2 / c_Aps).to(uu.Unit("1e-6 Jy")), "flux_model": (model_flux * spec_wave**2 / c_Aps).to( uu.Unit("1e-6 Jy") ), "s2n": tt_spec["flux"] * np.sqrt(tt_spec["ivar"]), } # Add data to table collection tt_ii = self.table_from_template(dd_spec, "region:tt_spectrum") # Select only significant flux pponts tt_ii["sel"] = tt_ii["s2n"] > 2 self.add_table(tt_ii, "tt_spectrum_" + str(ii)) else: logger.warning("No spectrum found")