Source code for vasca.field

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Field classes for VASCA
"""

import os
import time
import warnings
from datetime import datetime
from glob import glob

import numpy as np
from astropy import units as uu
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table, unique, vstack
from astropy.time import Time
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import wcs
from astroquery.mast import Observations
from loguru import logger
from regions import CircleSkyRegion
from requests.exceptions import HTTPError

from vasca.resource_manager import ResourceManager
from vasca.tables import TableCollection
from vasca.tables_dict import dd_vasca_columns
from vasca.utils import dd_filter2id, get_field_id, name2id

# global paths
# path to the dir. of this file
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = FILE_DIR + "/../"  # path to the root directory of the repository

dimless = uu.dimensionless_unscaled
# conf.replace_warnings = ["always"]


[docs] class BaseField(TableCollection): """ Class that defines the basic data structure for field-based analysis. One *field* is generally the area in the sky covered by a telescope in one observation. A field is generally composed of several *visits* of the telescope at different times. This class contains the main functionality for source detection and drawing. To be inherited by field analysis classes, which can then be tailored to the needs of the observatories supported by the VASCA pipeline. """ def __init__(self): """ Many class attributes are stored in astropy.table.Table_. To see a description of each of their columns run :meth: `~vasca.field.BaseField.info`. .. _astropy.table.Table: https://docs.astropy.org/en/stable/api/astropy.table.Table.html Returns ------- None """ # Sets skeleton super().__init__() #: Internal dictionary of important parameters #: with corresponding tables #: to be set as class attributes for convenience. self._dd_attr_names = { "tt_fields": [ "field_id", "field_name", "ra", "dec", "fov_diam", "observatory", "obs_filter", "center", ], "tt_visits": [ "nr_vis", "time_bin_size_sum", "time_start", "time_stop", ], } #: Reference image self.ref_img = None #: Reference WCS self.ref_wcs = None #: Visit images (assumes same WCS as reference image) self.vis_img = None
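A minimal usage sketch of the skeleton described above (illustrative only; ``info()`` is the method referenced in the docstring):

from vasca.field import BaseField

bf = BaseField()           # empty skeleton with the pre-defined table layout
bf.info()                  # prints a description of all table columns
print(bf.ref_img is None)  # no reference image loaded yet -> True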
    def load_sky_map(self, file_name, img_attr="ref_img"):
        """
        Load reference image and WCS from the passed FITS file

        Parameters
        ----------
        file_name : str
            File name of image FITS
        img_attr : str, optional
            Class attribute to store the image in, 'ref_img' or 'vis_img'.
            Default is 'ref_img'

        Returns
        -------
        None
        """
        logger.debug(f"Loading sky map from file: '{file_name}'")
        with fits.open(file_name) as ff:
            if img_attr == "ref_img":
                self.ref_img = np.array(ff[0].data, dtype=np.float32)
                self.ref_wcs = wcs.WCS(ff[0].header)
            if img_attr == "vis_img":
                img_data = np.array(ff[0].data, dtype=np.float16)
                if self.vis_img is None:
                    self.vis_img = img_data
                # If second or later image, append
                else:
                    if self.vis_img.ndim == 2:
                        self.vis_img = [self.vis_img]
                    self.vis_img = np.append(self.vis_img, [img_data], axis=0)
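A short sketch of how ``load_sky_map`` is typically used; the FITS file names below are hypothetical placeholders:

bf = BaseField()
bf.load_sky_map("coadd-int.fits")                       # fills bf.ref_img and bf.ref_wcs
bf.load_sky_map("visit1-int.fits", img_attr="vis_img")  # first visit image
bf.load_sky_map("visit2-int.fits", img_attr="vis_img")  # further visits stack along axis 0
print(bf.ref_img.shape, bf.vis_img.shape)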
    def get_upper_limits(self):
        """
        Calculates magnitude upper limits for non-detections in the visits
        listed in the tt_visits table.

        Returns
        -------
        float array
            Array of upper limits for each visit
        """
        observatory = (
            self.observatory
            if hasattr(self, "observatory")
            else self.get_field_par("observatory", "tt_fields")
        )
        upper_limit = None

        # Call upper limit function according to observatory.
        # String matching is case insensitive
        if observatory.casefold() == "GALEX".casefold():
            upper_limit = GALEXField.get_visit_upper_limits(self.tt_visits)

        return upper_limit
[docs] def set_light_curve(self, add_upper_limits=True): """ Adds detections information into ta_sources_lc for sources listed in self.tt_sources Parameters ---------- add_upper_limits : bool, optional Add upper limits to the tt_sources_lc, for visits with no detection. Upper limits are stored in the flux_err columns for none detections. The default is True. Returns ------- None """ logger.info( "Creating light curve table. " f"Upper limits option set to {add_upper_limits}." ) # Selection sel = self.tt_detections["sel"] # Prepare visit info nr_vis = len(self.tt_visits) self.tt_visits.add_index("vis_id") # Dictionary to collect lightcurve data tdata = { "fd_src_id": list(), "flux": list(), "flux_err": list(), "ul": list(), "time_bin_start": list(), "time_bin_size": list(), "ra": list(), "dec": list(), "pos_err": list(), } # Loop over sources and add them to dictionary tt_det_grp = self.tt_detections[sel].group_by(["fd_src_id"]) for tt_det in tt_det_grp.groups: # Add src id fd_src_id = tt_det["fd_src_id"][0] tdata["fd_src_id"].append(fd_src_id.tolist()) # Add flux and errors, array has length of Nr of visits. # Values is 0, except for detections. vis_idxs = self.tt_visits.loc_indices["vis_id", tt_det["vis_id"]] det_vars = ["flux", "flux_err", "ra", "dec", "pos_err"] for det_var in det_vars: np_var = np.zeros(nr_vis) - 1 np_var[vis_idxs] = tt_det[det_var] tdata[det_var].append(np_var.tolist()) tdata["time_bin_start"].append( np.array(self.tt_visits["time_bin_start"]).tolist() ) tdata["time_bin_size"].append( np.array(self.tt_visits["time_bin_size"]).tolist() ) # Store upper limits if no detection in a visit # TODO: make this more general and not GALEX specific if add_upper_limits: np_flux_ul = self.get_upper_limits() tdata["ul"].append(np_flux_ul.tolist()) # Add light curve table, convert type to allow for variable length array tdata["flux"] = np.array(tdata["flux"], dtype=np.object_) tdata["flux_err"] = np.array(tdata["flux_err"], dtype=np.object_) tdata["ul"] = np.array(tdata["ul"], dtype=np.object_) tdata["time_bin_start"] = np.array(tdata["time_bin_start"], dtype=np.object_) tdata["time_bin_size"] = np.array(tdata["time_bin_size"], dtype=np.object_) self.add_table(tdata, "base_field:ta_sources_lc")
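In the pipeline, ``set_light_curve`` is called after detections have been clustered into sources, so that ``fd_src_id`` is set in ``tt_detections``. A hedged sketch of that order of operations (the field ID is a placeholder):

gf = GALEXField.load(42, obs_filter="NUV", method="MAST_LOCAL")
# ... cluster detections into sources so that tt_sources / fd_src_id exist ...
gf.set_light_curve(add_upper_limits=True)
print(gf.ta_sources_lc[:3])  # per-source arrays: flux, flux_err, ul, time bins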
[docs] def remove_double_visit_detections(self): """ Remove multiple detections of one source in one visit from tt_detections. Keep only the closest detection. Returns ------- list List of removed detection IDs """ logger.debug("Scanning for doubled visit detections") # Selection sel = self.tt_detections["sel"] # Index tables self.tt_sources.add_index("fd_src_id") self.tt_detections.add_index("det_id") # Determine detection_id of srcs with more than one detection in one visit rm_det_ids = [] tt_det_grp = self.tt_detections[sel].group_by(["vis_id", "fd_src_id"]) for tt_det in tt_det_grp.groups: if len(tt_det) > 1: # Get source coordinate fd_src_id = tt_det["fd_src_id"].data[0] fd_src_idx = self.tt_sources.loc_indices["fd_src_id", fd_src_id] src_ra = self.tt_sources["ra"].quantity[fd_src_idx] src_dec = self.tt_sources["dec"].quantity[fd_src_idx] src_coord = SkyCoord(src_ra, src_dec, frame="icrs") # Determine distance det_coords = SkyCoord( tt_det["ra"].quantity, tt_det["dec"].quantity, frame="icrs" ) sep = det_coords.separation(src_coord).to(uu.arcsec) min_sep_idx = np.where(sep == np.amin(sep))[0][0] # Save all detection but the closest for deletion rm_det_ids.extend(tt_det["det_id"].data[:min_sep_idx]) rm_det_ids.extend(tt_det["det_id"].data[min_sep_idx + 1 :]) if len(rm_det_ids) > 0: nr_rm_det = len(rm_det_ids) perc_rm_det = 100 * nr_rm_det / len(self.tt_detections[sel]) logger.warning( f"Removed double-visit detections: {nr_rm_det} ({perc_rm_det: .4f} %)" ) # remove the doubled detections from tt_detections rm_idx = self.tt_detections.loc_indices["det_id", rm_det_ids] self.tt_detections.remove_rows(rm_idx) # Update detection count in tt_sources sel = self.tt_detections["sel"] fd_src_ids, det_cts = np.unique( self.tt_detections[sel]["fd_src_id"], return_counts=True ) self.tt_sources.replace_column("nr_det", det_cts) return rm_det_ids
    def set_field_attr(self, dd_names=None):
        """
        Sets the most important field parameters as class attributes.

        Parameters
        ----------
        dd_names : dict, optional
            Dictionary mapping table names to the lists of parameter names
            to be set as attributes. If None (default), a pre-defined set of
            parameters is used.

        Returns
        -------
        None
        """
        if dd_names is None:
            dd_names = self._dd_attr_names
        for table in dd_names:
            for par in dd_names[table]:
                setattr(self, par, self.get_field_par(par, table))

        logger.debug(f"Set attributes: {self._dd_attr_names}")
    def get_field_par(self, par_name, table_name):
        """
        Returns metadata parameter ``par_name`` derived from astropy table data
        ``table_name``.

        Parameters
        ----------
        par_name : str
            Name of the parameter to be read either directly or calculated
            from table data.
        table_name : str
            Name of table data. Must be initialized as class attribute.

        Returns
        -------
        str, None
            None is returned if either ``par_name`` or ``table_name`` are
            not available. In this case warnings are issued to logging.

        Raises
        ------
        ValueError
            If :attr:`~vasca.field.BaseField.tt_fields` does not have
            exactly one row
        """
        # Check data availability
        # Returns None if table is unknown
        if table_name not in self.__dict__:
            logger.warning(
                f"Data '{table_name}' not available. "
                f"Returning None for parameter '{par_name}'."
            )
            return None
        # Consistency checks
        # tt_fields must be single-rowed
        if table_name == "tt_fields" and len(self.tt_fields) != 1:
            raise ValueError(
                "Inconsistent data. Expected single-rowed "
                f"field metadata table 'tt_fields', got {len(self.tt_fields)}."
            )

        # Indirectly derived parameters
        if par_name == "center":
            par = SkyCoord(
                self.get_field_par("ra", "tt_fields"),
                self.get_field_par("dec", "tt_fields"),
                frame="icrs",
            )
        elif par_name == "nr_vis":
            par = len(self.tt_visits)
        elif par_name == "time_bin_size_sum":
            par = self.tt_visits["time_bin_size"].sum() * uu.s
        elif par_name == "time_start":
            par = Time(self.tt_visits["time_bin_start"][0], format="mjd")
        elif par_name == "time_stop":
            par = Time(
                self.tt_visits["time_bin_start"][-1]
                + self.tt_visits["time_bin_size"][-1],
                format="mjd",
            )
        # Directly derived parameters
        # Return None if parameter is not in table
        elif par_name not in getattr(self, table_name).colnames:
            logger.warning(
                f"Unknown parameter in data set '{table_name}'. "
                f"Known parameters are {self.__dict__[table_name].colnames}. "
                f"Returning None for parameter '{par_name}'."
            )
            par = None
        else:
            # Retrieve parameter, maintaining its unit
            par = self.tt_fields[par_name].data[0]

            # Some combinations of unit and dtype need special handling
            unit = self.tt_fields[par_name].unit
            dtype_kind = self.tt_fields[par_name].dtype.kind
            logger.debug(
                f"Getting parameter '{par_name}': {par}, {unit}, {dtype_kind}"
            )
            # Check (byte-)string or unicode
            if dtype_kind in ["U", "S"]:
                par = par.decode("utf-8")
            # Check uu.dimensionless_unscaled and integer
            elif (unit == "" or unit is None) and dtype_kind in ["i", "u"]:
                par = int(par)
            else:
                par = par * unit

        return par
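A sketch of the two ways ``get_field_par`` resolves a parameter, assuming ``gf`` is a field whose ``tt_fields`` and ``tt_visits`` tables are filled (e.g. from ``GALEXField.load``):

center = gf.get_field_par("center", "tt_fields")      # derived: SkyCoord built from ra/dec
n_vis = gf.get_field_par("nr_vis", "tt_visits")        # derived: number of visit rows
obs = gf.get_field_par("observatory", "tt_fields")     # read directly from the table
bad = gf.get_field_par("not_a_column", "tt_fields")    # logs a warning and returns None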
    def get_sky_region(self):
        """
        Get region on the sky of the field.

        Returns
        -------
        regions.SkyRegion
            Region on the sky of the field.
        """
        if self.observatory.casefold() in ["GALEX".casefold(), "GALEX_DS".casefold()]:
            return CircleSkyRegion(center=self.center, radius=self.fov_diam / 2.0)
        else:
            logger.warning(f"No region known for observatory {self.observatory}")
            return None
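The returned region can, for example, be used to test whether a coordinate lies inside the field. A sketch using only the circle's center and radius, continuing from a loaded field ``gf``; the target position is made up:

from astropy.coordinates import SkyCoord

region = gf.get_sky_region()  # CircleSkyRegion for GALEX / GALEX_DS fields
target = SkyCoord(150.1, 2.2, unit="deg", frame="icrs")
inside = region.center.separation(target) < region.radius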
    def load_from_fits(self, file_name="tables.fits"):
        """
        Loads field from a FITS file and sets field attributes.

        Parameters
        ----------
        file_name : str, optional
            File name. The default is "tables.fits".

        Returns
        -------
        None
        """
        super().load_from_fits(file_name)
        self.set_field_attr()
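Round-trip sketch: ``write_to_fits`` (used elsewhere in this module, e.g. in ``from_MAST``) stores the tables, ``load_from_fits`` restores them and re-creates the convenience attributes; the file name is a placeholder and ``gf`` is an already populated field:

gf.write_to_fits("my_field_data.fits")

bf = BaseField()
bf.load_from_fits("my_field_data.fits")  # also calls set_field_attr()
print(bf.field_id, bf.nr_vis, bf.center)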
[docs] class GALEXField(BaseField): """ Instance of one GALEX field """ def __init__(self, obs_id, obs_filter=None, data_path=None, visits_data_path=None): """ Initializes a new GALEXField instance with skeleton VASCA data structure. Parameters ---------- obs_id : int GALEX field ID obs_filter : str, optional Selects the GALEX obs_filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) data_path : str, optional Path to root location of cloud-synced data associated with the GALEX field. Defaults to a path given by the resource manager. visits_data_path : str, optional Path to a pre-downloaded table holding the complete list of GALEX visits. Defaults to a path given by the resource manager. Attributes ---------- data_path : str Path to root location of cloud-synced data associated with the GALEX field visits_data_path : str Path to a pre-downloaded table holding the complete list of GALEX visits vasca_file_prefix : str File name prefix following VASCA naming convention: 'VASCA_<observatory>_<filed_id>_<obs_filter>' """ # check obs_filter name, default is "NUV" if ( obs_filter is None ): # TODO: I think this is redundant, the default should then be "NUV" obs_filter = "NUV" elif not isinstance(obs_filter, str): raise TypeError( f"Expected argument of type 'str', got '{type(obs_filter).__name__}'." ) elif obs_filter not in ["NUV", "FUV"]: raise ValueError( f"Unknown obs_filter '{obs_filter}'. " "Available filters for GALEX are 'NUV' or 'FUV'." ) # Sets skeleton super().__init__() # Set root location of cloud-synced data associated with the field if data_path is not None: # Sets the data path following the pattern <root_path>/<obs_id> # If only <root_path> is passed, <obs_id> will be added self.data_path = ( data_path if data_path.rstrip(os.sep).endswith(str(obs_id)) else f"{data_path}/{obs_id}" ) if visits_data_path is not None: self.visits_data_path = visits_data_path if data_path is None or visits_data_path is None: with ResourceManager() as rm: # Path to read and write data relevant to the pipeline if data_path is None: self.data_path = ( rm.get_path("gal_fields", "sas_cloud") + "/" + str(obs_id) ) # Path to a pre-downloaded table holding # the complete list of GALEX visits if visits_data_path is None: self.visits_data_path = rm.get_path("gal_visits_list", "sas_cloud") # Create and check existence of directory # that holds field data and VASCA outputs os.makedirs(self.data_path, exist_ok=True) # File name prefix for VASCA/GALEXField outputs self.vasca_file_prefix = f"VASCA_GALEX_{obs_id}_{obs_filter}" logger.debug(f"Field data path set to: '{self.data_path}'") logger.debug(f"Visits data path set to: '{self.visits_data_path}'")
[docs] @classmethod def from_VASCA(cls, obs_id, obs_filter="NUV", fits_path=None, **kwargs): """ Constructor to initialize a GALEXField instance from a VASCA-generated FITS file Parameters ---------- obs_id : int GALEX field ID obs_filter : str, optional Selects the GALEX filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) fits_path : str, optional Path to the fits file. Defaults to a path handled by ResourceManager. **kwargs All additional keyword arguments are passed to `~GALEXField.__init__()` Returns ------- vasca.field.GALEXField """ # Bootstrap the initialization procedure using the base class gf = cls(obs_id, obs_filter, **kwargs) # new GALEXField instance if fits_path is None: # Construct the file name from field ID and filter fits_path = f"{gf.data_path}/{gf.vasca_file_prefix}_field_data.fits" # Check if file exists if not os.path.isfile(fits_path): raise FileNotFoundError( "Wrong file or file path to VASCA data " f"for GALEX field '{obs_id}' with obs_filter '{obs_filter}'." ) else: # Reads the VASCA-generated field data gf.load_from_fits(fits_path) # Sets convenience class attributes gf.set_field_attr() # field_id = get_field_id( # obs_field_id=obs_id, observatory="GALEX", obs_filter=obs_filter # ) # # Check consistency # if not gf.field_id == field_id: # raise ValueError( # "Inconsistent data: Missmatch for 'field_id'. " # f"Expected '{obs_id}' but got '{gf.field_id}' " # f"from file '{fits_path.split(os.sep)[-1]}." # ) # elif not gf.obs_filter == obs_filter: # raise ValueError( # "Inconsistent data: Missmatch for 'obs_filter'. " # f"Expected '{obs_filter}' but got '{gf.obs_filter}' " # f"from file '{fits_path.split(os.sep)[-1]}." # ) # logger.info( # f"Loaded VASCA data for GALEX field '{obs_id}' " # f"with obs_filter '{obs_filter}'." # ) return gf
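Usage sketch for the VASCA-file constructor; the field ID and the explicit path below are placeholders:

# default path built from data_path and vasca_file_prefix
gf = GALEXField.from_VASCA(6388191295067652096, obs_filter="NUV")

# or point to a specific file
gf = GALEXField.from_VASCA(
    6388191295067652096,
    obs_filter="NUV",
    fits_path="/data/VASCA_GALEX_6388191295067652096_NUV_field_data.fits",
)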
[docs] @classmethod def from_MAST( cls, obs_id, obs_filter="NUV", refresh=False, load_products="TABLES", write=True, **kwargs, ): """ Constructor to initialize a GALEXField instance either fresh from the MAST archive (refresh=True) or if available from cached raw data (refresh=False). The procedure uses vasca.resource_manager.ResourceManager to handle file locations. Parameters ---------- obs_id : int GALEX field ID obs_filter : str, optional Selects the GALEX obs_filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) refresh : bool, optional Selects if data is freshly loaded from MAST (refresh=True) or from cashed data on disc (refresh=False, default). load_products : str, optional Selects if data products shall be loaded: "NONE", "TABLES" for tables, and reference image, "ALL" for tables and visit images. write: bool, optional If load_products is enabled, stores the data as VASCA tables in the cloud for faster loading in the future. Default is True. **kwargs All additional keyword arguments are passed to `~GALEXField.__init__()` Returns ------- vasca.field.GALEXField Returns None if no entry is found """ # Checks if not isinstance(refresh, bool): raise TypeError(f"Expected boolean argument, got {type(refresh).__name__}.") # Bootstrap the initialization procedure using the base class gf = cls(obs_id, obs_filter, **kwargs) # new GALEXField instance # Sets ``gf.tt_fields`` gf._load_galex_field_info(obs_id, obs_filter, refresh=refresh) # If no field entries returned return None if len(gf.tt_fields) == 0: logger.warning(f"No field found for ID {obs_id} and filter {obs_filter}") return None # Sets ``gf.tt_visits`` gf._load_galex_visits_info(obs_id, obs_filter) # Sets convenience class attributes gf.set_field_attr() # Sets ``gf.tt_detections``, ``gf.tt_coadd_detections`` and loads the ref image if load_products != "NONE": if load_products == "ALL": gf._load_galex_archive_products( obs_id, obs_filter, refresh=refresh, ref_maps_only=False ) elif load_products == "TABLES": gf._load_galex_archive_products( obs_id, obs_filter, refresh=refresh, ref_maps_only=True ) else: logger.warning(f"load_product option not valid: {load_products}") if write: fits_name = f"{gf.data_path}/{gf.vasca_file_prefix}_field_data.fits" gf.write_to_fits(fits_name) meta_only = ", metadata-only." if load_products == "NONE" else "." logger.info( f"Loaded new GALEX field '{obs_id}' with obs_filter '{obs_filter}'" f"from MAST data {meta_only}" ) return gf
[docs] @staticmethod def load( gfield_id, obs_filter="NUV", method="MAST_LOCAL", load_products="TABLES", **field_kwargs, ): """ Loads GALEX field data according to a given method and returns a GALEXField instance. Parameters ---------- gfield_id : int GALEX field ID obs_filter : str, optional Selects the GALEX obs_filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) method : str, optional Specification of the load method. Four methods are implemented: MAST_REMOTE: Standard method to query data from MAST archives. This requires an internet connection to the MAST servers. Local data will be overwritten MAST_LOCAL: Dafault. Builds a new GALEXfield instance based on MAST archival data cached on local disc. If no data is found, the fallback is MAST_REMOTE. VASCA: Builds a new GALEXField based on a VASCA-generated field data file. AUTO: Attempts to load field data by using VASCA as method and falls back to MAST_LOCAL if no VASCA file is found on disc. The default directory where field data availability is checked is defined by the "data_path" attribute of GALEXField and can be passed explicitly via the "field_kwargs". load_products : str, optional Selects if data products shall be loaded: "NONE", "TABLES" for tables, and reference image, "ALL" for tables and visit images. field_kwargs All additional keyword arguments are passed to the load methods `~GALEXField.from_MAST()` and `~GALEXField.from_VASCA()`, as well as to `~GALEXField.__init__()`. Raises ------ TypeError If the specified load method is not a string. ValueError If the specified load method is not one of '["mast_remote", "mast_local", "vasca", "auto"]'. String matching is case insensitive. Returns ------- vasca.field.GALEXField """ logger.info( f"Loading field '{gfield_id}' with method '{method}' for filter '{obs_filter}' and load_products '{load_products}'." ) # Checks method argument # String matching is case insensitive (converts to all to lower case). if not isinstance(method, str): raise TypeError( "Expected string type for load method specification, " f"got '{type(method).__name__}'." ) method = method.casefold() # ensures all-lower-case string method_spec = ["mast_remote", "mast_local", "vasca", "auto"] if method not in method_spec: raise ValueError( f"Expected load method specification from {method_spec}, " f"got '{method}'." ) # Sets refresh option for MAST methods if method == "mast_remote": refresh = field_kwargs.pop("refresh", True) else: refresh = field_kwargs.pop("refresh", False) # Loads field according to load method specification if method in ["mast_remote", "mast_local"]: gf = GALEXField.from_MAST( obs_id=gfield_id, obs_filter=obs_filter, refresh=refresh, load_products=load_products, **field_kwargs, ) elif method == "vasca": # removes unused options field_kwargs.pop("refresh", None) field_kwargs.pop("write", None) try: gf = GALEXField.from_VASCA( obs_id=gfield_id, obs_filter=obs_filter, **field_kwargs, ) except FileNotFoundError as e: logger.warning( f"Load method '{method}' failed due to FileNotFoundError ('{e}'). " "Falling back to method 'mast_remote'." ) gf = GALEXField.from_MAST( obs_id=gfield_id, obs_filter=obs_filter, refresh=True, load_products=load_products, **field_kwargs, ) elif method == "auto": raise NotImplementedError(f"method '{method}' not operational.") # TODO: Lookahead via rm to check data availability. # Then "VASCA" is preferred for performance reasons. 
            # Fallback to "MAST" & refresh=True if "VASCA" fails for some reason
            # (e.g. not complete set of tables stored in the fits file).
            # Update (2023-01-11): Reevaluate if this option is still needed

        return gf
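Typical entry-point usage of the loader (the field ID is a placeholder; with MAST_LOCAL, cached data is used and a MAST query is only made if nothing is found on disc):

from vasca.field import GALEXField

gf = GALEXField.load(
    6388191295067652096,   # GALEX field ID (placeholder)
    obs_filter="NUV",
    method="MAST_LOCAL",
    load_products="TABLES",
)
print(gf.field_name, gf.nr_vis, gf.time_bin_size_sum)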
    @staticmethod
    def get_visit_upper_limits(tt_visits):
        """
        Calculates upper limits for non-detections in the tt_visits table,
        following the article_ of Gezari et al. ApJ 766 (2013) p. 4

        .. _article: https://iopscience.iop.org/article/10.1088/0004-637X/766/1/60

        Parameters
        ----------
        tt_visits : astropy.table.Table
            Table of visits with exposure times in the "time_bin_size" column.

        Returns
        -------
        float array
            Array of upper limits for each visit
        """
        B_sky = 3e-3
        N_pix = 16 * np.pi
        C_app = -0.23
        T_exp = tt_visits["time_bin_size"].data
        upper_limit = -2.5 * np.log(5 * (B_sky * N_pix / T_exp)) + C_app
        return upper_limit
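A self-contained sketch evaluating the upper-limit formula on a toy visits table; the exposure times are made up:

from astropy.table import Table
from vasca.field import GALEXField

tt_visits = Table({"time_bin_size": [100.0, 500.0, 1500.0]})  # exposure times in s
ul = GALEXField.get_visit_upper_limits(tt_visits)
print(ul)  # one upper limit per visit; longer exposures yield deeper (fainter) limits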
[docs] def _load_galex_field_info(self, obs_id, obs_filter, col_names=None, refresh=False): """ Loads the archival metadata associated to a given field ID. Parameters ---------- obs_id : int GALEX field ID obs_filter : str, optional Selects the GALEX filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) col_names : dict Dictionary with keys of MAST column names to load, and values the corresponding VASCA table column names. Default in None. refresh : bool, optional Selects if data is freshly loaded from MAST (refresh=True) or from cashed data on disc (refresh=False, default). """ # Uses default columns if not otherwise specified # Already sets the order in which columns are added later on if col_names is None: # values represent variables in VASCA, keys the variable names # in the MAST database col_names = { "obs_id": "field_id", "target_name": "field_name", "s_ra": "ra", "s_dec": "dec", "instrument_name": "observatory", "filters": "obs_filter", "project": "project", } elif not isinstance(col_names, dict): raise TypeError( "Expected list type for argument 'col_names', " f"got '{type(col_names).__name__}'." ) mast_col_names = list(col_names.keys()) # Path to store raw data path_tt_coadd = f"{self.data_path}/MAST_{obs_id}_{obs_filter}_coadd.fits" # Reads cached file if found on disc if not os.path.isfile(path_tt_coadd) or refresh: # download raw table logger.debug( f"Downloading archive field info and saving to {path_tt_coadd}." ) tt_coadd = Observations.query_criteria( obs_id=obs_id, dataRights="PUBLIC", instrument_name="GALEX", filters=obs_filter, dataproduct_type="image", ) # save to disc tt_coadd.write(path_tt_coadd, overwrite=True) else: # read cached logger.debug( f"Reading archive field info from cashed file '{path_tt_coadd}'" ) tt_coadd = Table.read(path_tt_coadd) # Constructs field info table logger.debug("Constructing 'tt_fields'.") # Selects columns and strip mask if tt_coadd.masked is True: tt_coadd_select = Table(tt_coadd[mast_col_names].as_array().data) else: tt_coadd_select = tt_coadd[mast_col_names] # Converts field id dtype from unicode ('U19') or S to bytestring ('S64') obsid_kind = tt_coadd_select["obs_id"].dtype.kind if obsid_kind == "U" or obsid_kind == "S": tt_coadd_select.replace_column( "obs_id", tt_coadd_select["obs_id"].astype(np.dtype("S64")), ) # Converts obs_id colimn into VASCA field_id for row in tt_coadd_select: row["obs_id"] = get_field_id( obs_field_id=row["obs_id"], observatory="GALEX", obs_filter=obs_filter ) # Convert into dictionary with correct VASCA column names dd_coadd_select = {} for col in mast_col_names: dd_coadd_select[col_names[col]] = tt_coadd_select[col].data # Add fov size info dd_coadd_select["fov_diam"] = 1.2 * np.ones(len(tt_coadd_select["obs_id"])) self.add_table(dd_coadd_select, "base_field:tt_fields")
[docs] def _load_galex_visits_info(self, obs_id, obs_filter, col_names=None): """ Loads the archival metadata associated to a given visit ID. Parameters ---------- obs_id : int GALEX field ID obs_filter : str, optional Selects the GALEX filter for which the corresponding observation data is loaded. Needs to be either from: 'FUV' -> 135-175 nm 'NUV' -> 175-280 nm (default) col_names : dict Dictionary with keys of MAST column names to load, and values the corresponding VASCA table column names. Default in None. """ logger.debug("Loading GALEX visit info :" f"{obs_id} {obs_filter} {col_names}") # Uses default columns if not otherwise specified # Already sets the order in which columns are added later on if col_names is None: mast_col_names = [ "imgRunID", "minPhotoObsDate", "nexptime" if obs_filter == "NUV" else "fexptime", "fexptime" if obs_filter == "NUV" else "nexptime", "RATileCenter", "DECTileCenter", ] vasca_col_names = [ "vis_id", "time_bin_start", "time_bin_size", "time_bin_size_alt_filt", "ra", "dec", ] col_names = dict(zip(mast_col_names, vasca_col_names)) elif not isinstance(col_names, dict): raise TypeError( "Expected list type for argument 'col_names', " f"got '{type(col_names).__name__}'." ) mast_col_names = list(col_names.keys()) # Reads cached file if found on disc logger.debug( "Reading archive visit info from cashed file " f"'{self.visits_data_path}'" ) tt_visits_raw = Table.read(self.visits_data_path) # Filters for visits corresponding to field id and selects specified columns tt_visits_raw_select = tt_visits_raw[tt_visits_raw["ParentImgRunID"] == obs_id][ mast_col_names ] # Remove zero exposure visits exp_col_name = "nexptime" if obs_filter == "NUV" else "fexptime" sel_visits = tt_visits_raw_select[exp_col_name] > 1e-10 tt_visits_raw_select = tt_visits_raw_select[sel_visits] if len(tt_visits_raw_select) == 0: logger.warning(f"No exposure for field {obs_id} and filter {obs_filter}") # Check which filters where observing obs_filter_ids = np.zeros(len(tt_visits_raw_select)) obs_filter_ids += (tt_visits_raw_select["nexptime"] > 1e-10) * dd_filter2id[ "NUV" ] obs_filter_ids += (tt_visits_raw_select["fexptime"] > 1e-10) * dd_filter2id[ "FUV" ] # Converts string time format to mjd ll_times = [ datetime.strptime(date_str.decode("utf-8"), "%m/%d/%Y %I:%M:%S %p") for date_str in tt_visits_raw_select["minPhotoObsDate"].data ] tt_visits_raw_select.update({"minPhotoObsDate": Time(ll_times).mjd}) # Convert into dictionary with correct VASCA column names dd_visits_raw_select = {} for col in mast_col_names: dd_visits_raw_select[col_names[col]] = tt_visits_raw_select[col].data dd_visits_raw_select["obs_filter_id"] = obs_filter_ids # Sets table as class attribute self.add_table(dd_visits_raw_select, "galex_field:tt_visits")
        # Add filter_id
        # flt_ids = dd_filter2id[obs_filter] * np.ones(len(self.tt_visits))
        # self.add_column("tt_visits", "obs_filter_id", flt_ids)
[docs] def _load_galex_archive_products( self, obs_id, obs_filter, col_names=None, dd_products=None, ref_maps_only=False, refresh=False, ): """ Loads the relevant data products from MAST servers and stores them using the ResourceManager. Parameters ---------- obs_id : int GALEX field ID. obs_filter : str Selects the GALEX filter for which the corresponding observation data is loaded. Needs to be either from ['NUV', 'FUV']. col_names : list, optional List of columns to store from raw data. dd_products : dict, optional Dictionary of listing the data products to be loaded. ref_maps_only : bool, optional If True, only the reference/coadd maps are loaded (default). If False, also the visit maps are included. refresh : bool, optional Set True to refresh the raw data via a MAST query. On default cashed raw data is used. """ # Path to MAST helper files path_tt_coadd = f"{self.data_path}/MAST_{obs_id}_{obs_filter}_coadd.fits" path_tt_data = f"{self.data_path}/MAST_{obs_id}_{obs_filter}_data.fits" path_tt_down = f"{self.data_path}/MAST_{obs_id}_{obs_filter}_down.ecsv" # tt_data: List of all archival data products # Reads cached file if found found on disc if not os.path.isfile(path_tt_data) or refresh: # Get cached field info for archive query input logger.debug( f"Reading archive field info from cashed file '{path_tt_coadd}'" ) tt_coadd = Table.read(path_tt_coadd) # Download logger.debug( f"Downloading archive data products list and saving to {path_tt_data}." ) # -> Todo: Create a generic decorator wrapper for Astroquery requests try: tt_data = Table( Observations.get_product_list(tt_coadd).as_array().data ) # Returns unmasked table except HTTPError as e: # Need to check its an 404, 503, 500, 403 etc. status_code = e.response.status_code # Try again if status_code == 503: logger.info( "HTTPError Astroquery response " "503 Server Error 'Service Unavailable'" ) sleep_time = 5 logger.info(f"Retrying Astroquery request in {sleep_time} s.") time.sleep(sleep_time) tt_data = Table( Observations.get_product_list(tt_coadd).as_array().data ) # Returns unmasked table # Saves to disc tt_data.write(path_tt_data, overwrite=True) else: logger.debug( f"Reading archive data products list from cashed file '{path_tt_data}'" ) tt_data = Table.read(path_tt_data) # Sets default products to download if a selection was not explicitly passed if dd_products is None: # Default: Catalog and NUV intensity map dd_products = { 0: { "name": "mcat", "file_name": "xd-mcat.fits.gz", "product_type": "catalog", }, 1: { "name": "int_map", "file_name": ( "nd-int.fits.gz" if obs_filter == "NUV" else "fd-int.fits.gz" ), "product_type": "map", }, } # Verifies if products are listed in products list product_list_missing = [ prod["file_name"] for _, prod in dd_products.items() if not any(prod["file_name"] in uri for uri in tt_data["dataURI"]) ] if not len(product_list_missing) == 0: raise ValueError( f"Missing entry for data products {product_list_missing} " "in product list." 
) # Filters for products of interest aa_sel_prod = np.full(len(tt_data), False) # bool array # Product files aa_data_uri = tt_data["dataURI"].data.astype(str) # string array # obs_id aa_obs_id = tt_data["obs_id"].data.astype(int) # int array for _, prod in dd_products.items(): if prod["product_type"] == "map" and ref_maps_only: aa_sel_prod += np.logical_and( np.char.endswith(aa_data_uri, prod["file_name"]), aa_obs_id == obs_id, ) else: aa_sel_prod += np.char.endswith( aa_data_uri, prod["file_name"] ) # Logical AND operation # Download data products # tt_down: manifest of files downloaded if not os.path.isfile(path_tt_down) or refresh: # Download logger.debug( f"Downloading archive data products. Manifest saved to {path_tt_down}." ) tt_down = Observations.download_products( tt_data, download_dir=self.data_path, cache=True, dataURI=tt_data[aa_sel_prod]["dataURI"], )["Local Path", "Status"] # Saves table to disc tt_down.replace_column( "Local Path", [str(s).split(self.data_path)[1] for s in tt_down["Local Path"]], ) # Keeps path only relative to self.data_path tt_down.write(path_tt_down, overwrite=True) else: # Reading cached manifest logger.debug(f"Reading archive data product manifest from {path_tt_down}.") tt_down = Table.read(path_tt_down) # Adds column with corresponding IDs tt_down["ID"] = [int(path.split(os.sep)[-2]) for path in tt_down["Local Path"]] # Verifies completed download aa_status_mask = np.where( tt_down["Status"].data.astype(str) == "COMPLETE", True, False ) # bool array if not aa_status_mask.all(): raise ValueError( "Data products download incomplete. " "Missing products are:\n" f"{tt_down[~aa_status_mask]}" ) # Collects data products # Source catalogs aa_sel_mcat = np.char.endswith( tt_down["Local Path"].data.astype(str), "xd-mcat.fits.gz" ) # Selects only the coadd path path_tt_ref = ( self.data_path + tt_down[np.logical_and(aa_sel_mcat, tt_down["ID"] == obs_id)][ "Local Path" ][0] ) # String # Selects everything else but the coadd path path_tt_detections = [ self.data_path + path for path in tt_down[np.logical_and(aa_sel_mcat, tt_down["ID"] != obs_id)][ "Local Path" ].data.astype(str) ] # list # Opens reference catalog with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) tt_coadd_detections_raw = Table.read(path_tt_ref) # Opens visit catalogs, add columns for visit and source IDs # and stack all catalogs tt_detections_raw = None # loop over visits for path, id in zip(path_tt_detections, self.tt_visits["vis_id"]): # read new visits catalog with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) tt_vis_mcat = Table.read(path) # set initial if tt_detections_raw is None: # column visit id tt_vis_mcat.add_column( np.full(len(tt_vis_mcat), id), name="visit_id", index=0 ) # column source id tt_vis_mcat.add_column( np.full(len(tt_vis_mcat), 999999999), name="source_id", index=1 ) tt_detections_raw = tt_vis_mcat # append else: # column visit id tt_vis_mcat.add_column( np.full(len(tt_vis_mcat), id), name="visit_id", index=0 ) # column source id tt_vis_mcat.add_column( np.full(len(tt_vis_mcat), 999999999), name="source_id", index=1 ) tt_detections_raw = vstack([tt_detections_raw, tt_vis_mcat]) # clean up del tt_vis_mcat obs_filter_l = obs_filter.lower() # lower case obs_filter name # Add to tables as class attributes if col_names is None: mast_col_names = [ "visit_id", "ggoid_dec", f"{obs_filter}_ALPHA_J2000", # take band-merged quantities? f"{obs_filter}_DELTA_J2000", # take band-merged quantities? 
f"{obs_filter_l}_poserr", f"{obs_filter_l}_flux", f"{obs_filter_l}_fluxerr", f"{obs_filter_l}_s2n", "fov_radius", f"{obs_filter_l}_artifact", f"{obs_filter}_CLASS_STAR", "chkobj_type", f"{obs_filter}_ELLIPTICITY", f"{obs_filter}_FLUX_APER_4", f"{obs_filter}_FLUXERR_APER_4", f"{obs_filter}_FLUX_APER_3", f"{obs_filter}_FLUXERR_APER_3", f"{obs_filter}_FLUX_AUTO", "E_bv", f"{obs_filter_l}_mag", f"{obs_filter_l}_magerr", ] vasca_col_names = [ "vis_id", "det_id", "ra", "dec", "pos_err", "flux_auto", "flux_auto_err", "s2n", "r_fov", "artifacts", "class_star", "chkobj_type", "ellip_world", "flux_r60", "flux_r60_err", "flux_r38", "flux_r38_err", "flux_auto_flt", "E_bv", "mag", "mag_err", ] col_names = dict(zip(mast_col_names, vasca_col_names)) elif not isinstance(col_names, dict): raise TypeError( "Expected dict type for argument 'col_names', " f"got '{type(col_names).__name__}'." ) mast_col_names = list(col_names.keys()) # set data as class attributes # Correct for flux outside of aperture, as listed here: # http://www.galex.caltech.edu/researcher/techdoc-ch5.html acorr60 = 1.116863247 if obs_filter == "NUV" else 1.09647819 # acorr38 = 1.21338885 if obs_filter == "NUV" else 1.202264 def get_det_dict(tt_det): """Retrieve dictionary with detection/coadd_detections data, including some variables derived from other MAST variables""" # Convert into dictionary with correct VASCA column names # *** Detections dd_det = {} # Keep only entries with detections sel_s2n = tt_det[f"{obs_filter_l}_s2n"] > 0 if len(tt_det[sel_s2n]) == 0: logger.warning( f"No detection with s2n>0 in MAST for field {obs_id} filter {obs_filter}" ) for col in mast_col_names: if col not in tt_det.colnames: continue dd_det[col_names[col]] = tt_det[col][sel_s2n].data # Add size_world in arcsec dd_det["size_world"] = ( 3600 * ( tt_det[f"{obs_filter}_A_WORLD"][sel_s2n].data + tt_det[f"{obs_filter}_B_WORLD"][sel_s2n].data ) / 2 ) # Add flux aperture ratio and covert counts to Jansky for r=3.8 arcsec aperture flux # Check to avoid divisions by zero idx_fauto = np.where(dd_det["flux_auto_flt"] > 0) cts2Jy = np.zeros(len(dd_det["flux_auto_flt"])) cts2Jy[idx_fauto] = ( dd_det["flux_auto"][idx_fauto] / dd_det["flux_auto_flt"][idx_fauto] ) # Use aperture flux instead of AUTO flux, as it is more reliable for variability studies dd_det["flux"] = dd_det["flux_r60"] * cts2Jy * acorr60 dd_det["flux_err"] = dd_det["flux_r60_err"] * cts2Jy * acorr60 # Add filter_id dd_det["obs_filter_id"] = np.array( dd_filter2id[obs_filter.upper()] + np.zeros(np.sum(sel_s2n)) ) # Ratio between the flux-counts, correcting for difference in aperture idx_fr60 = np.where(dd_det["flux_r60"] > 0) dd_det["flux_app_ratio"] = ( np.zeros( len(dd_det["flux_r60"]), dtype=dd_vasca_columns["flux_app_ratio"]["dtype"], ) + dd_vasca_columns["flux_app_ratio"]["default"] ) dd_det["flux_app_ratio"][idx_fr60] = ( dd_det["flux_r38"][idx_fr60] / dd_det["flux_r60"][idx_fr60] ) return dd_det # *** detections dd_detections_raw = get_det_dict(tt_detections_raw) self.add_table(dd_detections_raw, "galex_field:tt_detections") logger.debug("Constructed 'tt_detections'.") # *** Coadds dd_coadd_detections_raw = get_det_dict(tt_coadd_detections_raw) self.add_table(dd_coadd_detections_raw, "galex_field:tt_coadd_detections") logger.debug("Constructed 'tt_coadd_detections'.") # *** Intensity maps aa_sel_int_map = np.char.endswith( tt_down["Local Path"].data.astype(str), "nd-int.fits.gz" if obs_filter == "NUV" else "fd-int.fits.gz", ) # Selects the reference/coadd path path_int_map_ref = ( self.data_path 
+ tt_down[np.logical_and(aa_sel_int_map, tt_down["ID"] == obs_id)][ "Local Path" ][0] ) # string self.load_sky_map(path_int_map_ref) if not ref_maps_only: # Selects everything else but the coadd path path_int_map_visits = [ self.data_path + path for path in tt_down[ np.logical_and(aa_sel_int_map, tt_down["ID"] != obs_id) ]["Local Path"].data.astype(str) ] # list for vis_img_name in path_int_map_visits: self.load_sky_map(vis_img_name, "vis_img")
[docs] class GALEXDSField(BaseField): """ VASCA implementation of GALEX drift-scan mode observations The archival data is expected to be downloaded separately. """ def __init__(self, field_name, data_path=None, visits_data_path=None, **kwargs): """ Initializes a new GALEXDSField instance with skeleton VASCA data structure. Parameters ---------- field_name : str GALEX drift-scan field name data_path : str, optional Path to root location of data associated with the GALEX field. Defaults to a path given by the resource manager. visits_data_path : str, optional Path to a pre-downloaded table holding the complete list of GALEX DS visits. Defaults to a path given by the resource manager. Attributes ---------- data_path : str Path to root location of data associated with the GALEX field visits_data_path : str Path to a pre-downloaded table holding the complete list of GALEX DS visits vasca_file_prefix : str File name prefix following VASCA naming convention: 'VASCA_<observatory>_<filed_id>_<obs_filter>' """ # Sets skeleton super().__init__() # Parse field name (e.g. "29208-KEPLER_SCAN_009_sv03") scan_name = field_name.split("_sv")[0] field_id = name2id(field_name, bits=64) # Set root location of data associated with the field if data_path is None or visits_data_path is None: with ResourceManager() as rm: # Path to read and write data relevant to the pipeline if data_path is None: self.data_path = ( f'{rm.get_path("gal_ds_fields", "lustre")}/' f"{scan_name}/{field_name}" ) else: self.data_path = data_path # Path to a pre-downloaded table holding # the complete list of GALEX visits if visits_data_path is None: self.visits_data_path = rm.get_path("gal_ds_visits_list", "lustre") else: self.visits_data_path = visits_data_path # Checks if field name exists if not field_name in Table.read(self.visits_data_path)["field_name"]: raise ValueError( f"Unkown field name '{field_name}'. " f"No matching entry in visits table ('{self.visits_data_path}') " ) # Create and check existence of directory # that holds field data and VASCA outputs os.makedirs(self.data_path, exist_ok=True) # Checks if visits table is available if not os.path.isfile(self.visits_data_path): raise FileNotFoundError( "GALEX drift scan visits table not found at " f"'{self.visits_data_path}'." ) # Create name-ID map for field and visits logger.debug(f"Creating name-ID map for field '{field_name}'.") tt_name_id = Table.read(self.visits_data_path) tt_name_id = tt_name_id[tt_name_id["field_name"] == field_name][ "field_name", "field_id", "vis_name", "vis_id" ] self.tt_name_id = tt_name_id # File name prefix for VASCA/GALEXField outputs self.vasca_file_prefix = f"VASCA_GALEX-DS_{field_id}_NUV" logger.debug(f"Field data path set to: '{self.data_path}'") logger.debug(f"Visits data path set to: '{self.visits_data_path}'")
[docs] def name_id_map(self, a): """ Return the field/visit ID if the name is supplied and vice versa. Note its not necessary to specify if identifier corresponds to the field or visit since names/IDs are unique. Parameters ---------- a : str, int Input name (if string) or ID (if integer) Returns ------- str, int Output name or ID """ # Gets field ID and name field_id = np.unique(self.tt_name_id["field_id"])[0] field_name = np.unique(self.tt_name_id["field_name"])[0] # Checks if input is name or ID is_name = False is_id = False if isinstance(a, str): is_name = True elif isinstance(a, int): is_id = True out = None # Name -> return ID if is_name: # Field if a == field_name: out = field_id # Visit else: try: out = self.tt_name_id[self.tt_name_id["vis_name"] == a]["vis_id"][0] except IndexError as e: logger.exception( f"No match for name '{a}'. Returning None. Exception:\n" f"{type(e).__name__}: {e}" ) # ID -> return name elif is_id: # Field if a == field_id: out = field_name # Visit else: try: out = self.tt_name_id[self.tt_name_id["vis_id"] == a]["vis_name"][0] except IndexError as e: logger.exception( f"No match for ID '{a}'. Returning None. Exception:\n" f"{type(e).__name__}: {e}" ) # Wrong input type else: logger.warning( f"Expected string or integer type for input {a}," f" got ({type(a).__name__}'). Returning None." ) return out
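Usage sketch for ``name_id_map``; the field name is taken from the class docstring example, and constructing the field assumes the drift-scan visits table is available via the ResourceManager:

fd = GALEXDSField("29208-KEPLER_SCAN_009_sv03")
field_id = fd.name_id_map("29208-KEPLER_SCAN_009_sv03")  # name -> ID
field_name = fd.name_id_map(int(field_id))               # ID -> name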
[docs] def _load_info(self, field_name): """ Load field and associated visits information. Parameters ---------- field_name : str GALEX field name. """ # Central visits table tt_visits_full = Table.read(self.visits_data_path) # Constructs field info table logger.debug("Constructing 'tt_fields'.") # Create field information table # Map table keys to VASCA-table column names col_names_map = { "field_id": "field_id", "field_name": "field_name", "RA_CENT": "ra", "DEC_CENT": "dec", "observatory": "observatory", "obs_filter": "obs_filter", } col_names = list(col_names_map.keys()) # Selects columns and filters for field ID tt_visits_select = tt_visits_full[tt_visits_full["field_name"] == field_name] tt_visits_select = tt_visits_select[col_names] tt_fields = unique(tt_visits_select) # Checks if resulting table has only one row if not len(tt_fields) == 1: raise ValueError( "Expected single-row table for tt_fields, " f"got {len(tt_fields)}, indicating inconsistent gloabl visits info." ) # Converts field_id into VASCA field_id tt_fields.replace_column( "field_id", [ get_field_id( obs_field_id=tt_fields["field_id"][0], observatory="GALEX_DS", obs_filter="NUV", ) ], ) # Convert into dictionary with correct VASCA column names dd_fields = {} for col in col_names: dd_fields[col_names_map[col]] = tt_fields[col].data # Add FoV size info dd_fields["fov_diam"] = [3.0] # Not defined for drift scan fields # Add table as class attribute self.add_table(dd_fields, "base_field:tt_fields") # Constructs field info table logger.debug("Constructing 'tt_visits'.") # Map table keys to VASCA-table column names col_names_map = { "vis_id": "vis_id", "time_bin_start": "time_bin_start", "NEXPTIME": "time_bin_size", "RA_CENT": "ra", "DEC_CENT": "dec", } col_names = list(col_names_map.keys()) # Selects columns and filters for field ID tt_visits_select = tt_visits_full[tt_visits_full["field_name"] == field_name] tt_visits_select = tt_visits_select[col_names] # Convert into dictionary with correct VASCA column names dd_visits_select = {} for col in col_names: dd_visits_select[col_names_map[col]] = tt_visits_select[col].data # Add filter info n_visits = len(tt_visits_select) dd_visits_select["obs_filter_id"] = [dd_filter2id["NUV"]] * n_visits # Sets table as class attribute self.add_table(dd_visits_select, "galex_field:tt_visits") # Sets convenience class attributes self.set_field_attr() # Cross-check IDs # Field ID if not int(self.field_id[3:]) == name2id(field_name, bits=64): raise ValueError( "Inconsistent field ID. " f"Expected {name2id(field_name, bits=64)} for name '{field_name}', " f"got {self.field_id[3:]}." ) # Visit IDs if not all( [ vis_id0 == vis_id1 for vis_id0, vis_id1 in zip( self.tt_visits["vis_id"], tt_visits_select["vis_id"] ) ] ): raise ValueError( "Inconsistent visit IDs. Missmatch between GALEXDSField.tt_visit " "and global DS visits table." )
[docs] def _load_data_products( self, field_name, ref_maps_only=False, ): """ Loads the relevant data products form storage. Parameters ---------- field_name : str GALEX field name. ref_maps_only : bool, optional If True, only the reference/coadd maps are loaded (default). If False, also the visit maps are included. """ # Specifies required columns for VASCA # Maps table columns to VASCA-table column names col_names_map = { "vis_id": "vis_id", "ggoid_dec": "det_id", "alpha_j2000": "ra", "delta_j2000": "dec", "nuv_poserr": "pos_err", "nuv_flux": "flux_auto", "nuv_fluxerr": "flux_auto_err", "nuv_s2n": "s2n", "fov_radius": "r_fov", "nuv_artifact": "artifacts", "NUV_CLASS_STAR": "class_star", "chkobj_type": "chkobj_type", "NUV_ELLIPTICITY": "ellip_world", "NUV_FLUX_APER_4": "flux_r60", "NUV_FLUXERR_APER_4": "flux_r60_err", "NUV_FLUX_APER_3": "flux_r38", "NUV_FLUXERR_APER_3": "flux_r38_err", "NUV_FLUX_AUTO": "flux_auto_flt", "E_bv": "E_bv", "nuv_mag": "mag", "nuv_magerr": "mag_err", } col_names = list(col_names_map.keys()) # Visit detections # Opens and stacks all visit-detection catalogs, adds column for visit IDs, logger.debug(f"Loading visit-detection catalogs for field '{self.field_name}'.") # Gets paths to all mcat (visit-detection catalogs) files # via the global visits table. This ensures to exclude bad quality visits # Central visits info table tt_visits_full = Table.read(self.visits_data_path) tt_visits_select = tt_visits_full[ tt_visits_full["field_name"] == self.field_name ] mcat_vis_paths = [ glob(f"{self.data_path}{os.sep}{vis_name}/*xd-mcat.fits")[0] for vis_name in tt_visits_select["vis_name"] ] # Loops over visits to read mcat files for i, (mcat_path, vis_id) in enumerate( zip(mcat_vis_paths, self.tt_visits["vis_id"]) ): # Reads visit detections catalog with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) tt_mcat = Table.read(mcat_path) # Initializes table in first iteration # Column selection is applied if i == 0: tt_mcat.add_column( np.full(len(tt_mcat), vis_id), name="vis_id", index=0 ) tt_detections_raw = tt_mcat # Appends all catalogs that follow else: tt_mcat.add_column( np.full(len(tt_mcat), vis_id), name="vis_id", index=0 ) tt_detections_raw = vstack([tt_detections_raw, tt_mcat]) # Clean-up del tt_mcat # Modify and augment visit-detections table # and convert into dictionary with correct VASCA column names dd_det = {} # Keep only entries with detections, check that by looking at s2n column sel_s2n = tt_detections_raw["nuv_s2n"] > 0 # Warn in cases where there is a filed entry in MAST without detections if len(tt_detections_raw[sel_s2n]) == 0: logger.warning( "No detection with s2n>0 in MAST for " f"field {self.tt_visits['vis_name']}." 
) # Collect all columns that don't need modifications for mast_col in col_names: if mast_col not in tt_detections_raw.colnames: continue else: vasca_col = col_names_map[mast_col] dd_det[vasca_col] = tt_detections_raw[mast_col][sel_s2n].data # Correct flux flux outside of aperture, as listed here: # http://www.galex.caltech.edu/researcher/techdoc-ch5.html acorr60 = 1.116863247 # 6.0 arcsec correction factor for NUV filter # Add flux aperture ratio and covert counts to Jansky # for r=3.8 arcsec aperture flux # Check to avoid divisions by zero idx_fauto = np.where(dd_det["flux_auto_flt"] > 0) cts2Jy = np.zeros(len(dd_det["flux_auto_flt"])) cts2Jy[idx_fauto] = ( dd_det["flux_auto"][idx_fauto] / dd_det["flux_auto_flt"][idx_fauto] ) # Use aperture flux instead of AUTO flux, # as it is more reliable for variability studies dd_det["flux"] = dd_det["flux_r60"] * cts2Jy * acorr60 dd_det["flux_err"] = dd_det["flux_r60_err"] * cts2Jy * acorr60 # Add filter_id dd_det["obs_filter_id"] = np.array( dd_filter2id["NUV"] + np.zeros(np.sum(sel_s2n)) ) # Ratio between the flux-counts, correcting for difference in aperture idx_fr60 = np.where(dd_det["flux_r60"] > 0) dd_det["flux_app_ratio"] = ( np.zeros( len(dd_det["flux_r60"]), dtype=dd_vasca_columns["flux_app_ratio"]["dtype"], ) + dd_vasca_columns["flux_app_ratio"]["default"] ) dd_det["flux_app_ratio"][idx_fr60] = ( dd_det["flux_r38"][idx_fr60] / dd_det["flux_r60"][idx_fr60] ) # Compute geometric sizes of flux profile from average quantity in arcsec dd_det["size_world"] = ( 3600 * ( tt_detections_raw["NUV_A_WORLD"][sel_s2n].data + tt_detections_raw["NUV_B_WORLD"][sel_s2n].data ) / 2 ) # Create VASCA visit-detections table self.add_table(dd_det, "galex_field:tt_detections") del tt_detections_raw # Co-add/reference image logger.debug( "Creating reference/coadd sky-map for field " f"'{self.field_name}'" ) # Gets paths to intensity map FITS file coadd_int_file_name = glob(f"{self.data_path}{os.sep}*-nd-int-coadd.fits.gz")[0] # Loads the reference intensity map self.load_sky_map(coadd_int_file_name) # Loads visit intensity maps if requested logger.debug("Loading all visit-level sky maps.") if not ref_maps_only: visit_int_file_names = [ glob(f"{self.data_path}{os.sep}{vis_name}/*nd-int.fits.gz")[0] for vis_name in tt_visits_select["vis_name"] ] for path in visit_int_file_names: self.load_sky_map(path, "vis_img")
    @classmethod
    def from_VASCA(cls, field_name, fits_path=None, **kwargs):
        """
        Constructor to initialize a GALEXDSField instance from a
        VASCA-generated FITS file

        Parameters
        ----------
        field_name : str
            GALEX drift-scan field name
        fits_path : str, optional
            Path to the fits file. Defaults to a path handled by ResourceManager.
        **kwargs
            All additional keyword arguments are passed to `~GALEXDSField.__init__()`

        Returns
        -------
        vasca.field.GALEXDSField
        """
        # Bootstrap the initialization procedure using the base class
        fd = cls(field_name, **kwargs)  # new GALEXDSField instance

        if fits_path is None:
            # Construct the file name from the field's data path and VASCA prefix
            fits_path = f"{fd.data_path}/{fd.vasca_file_prefix}_field_data.fits"
        # Check if file exists
        if not os.path.isfile(fits_path):
            raise FileNotFoundError(
                "Wrong file or file path to VASCA data "
                f"for GALEX drift scan field '{field_name}'."
            )
        else:
            # Reads the VASCA-generated field data
            fd.load_from_fits(fits_path)

        # Sets convenience class attributes
        fd.set_field_attr()

        return fd
[docs] @classmethod def from_MAST( cls, field_name, load_products="TABLES", write=True, **kwargs, ): """ Constructor to initialize a GALEXDSField instance from pre-downloaded MAST archival data. The procedure uses vasca.resource_manager.ResourceManager to handle file locations. Parameters ---------- field_name : str GALEX drift scan field name load_products : str, optional Selects if data products shall be loaded: "NONE", "TABLES" for tables, and reference image, "ALL" for tables and visit images. write: bool, optional If load_products is enabled, stores the data as VASCA tables in the cloud for faster loading in the future. Default is True. **kwargs All additional keyword arguments are passed to `~GALEXField.__init__()` Returns ------- vasca.field.GALEXDSField Returns None if no entry is found """ # Initialize field instance fd = cls(field_name, **kwargs) # Gets basic information of field and associated visits try: fd._load_info(field_name) except Exception as e: logger.exception( f"Could not load GALEX drift scan field: {field_name}. Exception:\n" f"{type(e).__name__}: {e}" ) return None # Gets detections list and intensity maps if load_products != "NONE": if load_products == "ALL": fd._load_data_products(field_name, ref_maps_only=False) elif load_products == "TABLES": fd._load_data_products(field_name, ref_maps_only=True) else: logger.warning(f"load_product option not valid: {load_products}") if write: fits_name = f"{fd.data_path}/{fd.vasca_file_prefix}_field_data.fits" fd.write_to_fits(fits_name) meta_only = " (metadata only)" if load_products == "NONE" else "" logger.info( f"Finished loading new GALEX drift scan field '{field_name}'{meta_only} " f"from storage ({fd.data_path})." ) return fd
    @staticmethod
    def load(
        field_name,
        method="MAST_LOCAL",
        load_products="TABLES",
        **field_kwargs,
    ):
        """
        Loads GALEX drift-scan field data according to a given method and
        returns a GALEXDSField instance.

        Parameters
        ----------
        field_name : str
            GALEX drift scan field name
        method : str, optional
            Specification of the load method. Four methods are implemented:
            MAST_REMOTE: Standard method to query data from MAST archives. This
            requires an internet connection to the MAST servers. Local data will
            be overwritten
            MAST_LOCAL: Default. Builds a new GALEXDSField instance based on MAST
            archival data cached on local disc. If no data is found, the fallback
            is MAST_REMOTE.
            VASCA: Builds a new GALEXDSField based on a VASCA-generated field data file.
            AUTO: Attempts to load field data by using VASCA as method and
            falls back to MAST_LOCAL if no VASCA file is found on disc.

            The default directory where field data availability is checked is
            defined by the "data_path" attribute of GALEXDSField and can be passed
            explicitly via the "field_kwargs".
        load_products : str, optional
            Selects if data products shall be loaded: "NONE", "TABLES" for tables,
            and reference image, "ALL" for tables and visit images.
        field_kwargs
            All additional keyword arguments are passed to the load methods
            `~GALEXDSField.from_MAST()` and `~GALEXDSField.from_VASCA()`,
            as well as to `~GALEXDSField.__init__()`.

        Raises
        ------
        TypeError
            If the specified load method is not a string.
        ValueError
            If the specified load method is not one of
            '["mast_remote", "mast_local", "vasca", "auto"]'.
            String matching is case insensitive.

        Returns
        -------
        vasca.field.GALEXDSField
        """
        logger.info(
            f"Loading field '{field_name}' with method '{method}' and "
            f"load_products '{load_products}'."
        )

        # Checks method argument
        # String matching is case insensitive (converts all to lower case).
        if not isinstance(method, str):
            raise TypeError(
                "Expected string type for load method specification, "
                f"got '{type(method).__name__}'."
            )

        method = method.casefold()  # ensures all-lower-case string
        method_spec = ["mast_remote", "mast_local", "vasca", "auto"]
        if method not in method_spec:
            raise ValueError(
                f"Expected load method specification from {method_spec}, "
                f"got '{method}'."
            )

        # Removes parameters undefined by GALEXDSField
        for param in ["obs_filter", "refresh"]:
            field_kwargs.pop(param, None)

        # Loads field according to load method specification
        if method in ["mast_remote", "mast_local"]:
            gf = GALEXDSField.from_MAST(
                field_name=field_name,
                load_products=load_products,
                **field_kwargs,
            )
        elif method == "vasca":
            # removes unused options
            field_kwargs.pop("write", None)

            try:
                gf = GALEXDSField.from_VASCA(
                    field_name=field_name,
                    **field_kwargs,
                )
            except FileNotFoundError as e:
                logger.warning(
                    f"Load method '{method}' failed due to FileNotFoundError ('{e}'). "
                    "Falling back to method 'mast_remote'."
                )
                gf = GALEXDSField.from_MAST(
                    field_name=field_name,
                    load_products=load_products,
                    **field_kwargs,
                )
        elif method == "MAST-REMOTE":
            raise NotImplementedError(
                f"Method '{method}' not available for GALEX drift scan data."
            )
        elif method == "auto":
            raise NotImplementedError(f"Method '{method}' not operational.")

        return gf
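Usage sketch mirroring ``GALEXField.load`` for drift-scan data; the field name is the docstring example and assumes the pre-downloaded archival data is in place:

fd = GALEXDSField.load(
    "29208-KEPLER_SCAN_009_sv03",
    method="MAST_LOCAL",
    load_products="TABLES",
)
print(fd.field_id, fd.nr_vis)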