Source code for vasca.visualization

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
Visualization related methods for VASCA

from collections import OrderedDict
from itertools import cycle
import warnings

import astropy.units as uu
from astropy import constants as cc
import healpy as hpy
import matplotlib.pyplot as plt
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.nddata import Cutout2D
from astropy.time import Time
from astropy.visualization.wcsaxes import SphericalCircle
from loguru import logger
from matplotlib.colors import LogNorm
from matplotlib.ticker import ScalarFormatter
import as cm
from astropy.modeling import models, fitting
from astropy.utils.exceptions import AstropyUserWarning
import astropy.constants as const

from vasca.utils import (

# %% sky plotting

[docs] def plot_sky_sources( tt_src, tt_det=None, only_selected=True, ax=None, src_id="rg_src_id", sky_region_wcs=None, draw_labels=True, src_kwargs=None, det_kwargs=None, ): """ Plot the selected sources and (optinally) the visit detections on the sky. Parameters ---------- tt_src : astropy.table.Table Source list to plot. Has to contain "ra", "dec" and srd id columns. tt_det : astropy.table.Table, optional Detection list to plot. Has to contain "ra", "dec" and src_id columns. Default is None. only_selected: bool, optional Show only selected sources. tt_src must have a "sel" column. default is True. ax : axes, optional Matplotlib axes to plot on. The default is None. src_id: str, optional Write the source ID next to its marker, saved in the passed column name, typically "rg_src_id", "fd_src_id" or None. Default is ""rg_src_id"" sky_region_wcs: (regions.SkyRegion, WCS) , optional Plot only sources within the sky region. A WCS has to be passed along ina tuple. Default is None. draw_labels : bool, optional Draw labels next to sources showing "src_iud" entry. The default is True. src_kwargs : dict, optional Keyword arguments for pyplot.plot of the sources. The default is None. det_kwargs : dict, optional Keyword arguments for pyplot.patches.Polygon of the detections. The default is None. Returns ------- matplotlib.axes Used Matplotlib axes. astropy.table.Table Sources plotted from tt_src """ logger.debug("Plotting sky sources") if ax is None: ax = plt.gca() # Show only selected sources sel = np.ones(len(tt_src), dtype=bool) if only_selected and "sel" in tt_src.colnames: sel = tt_src["sel"] tt_src = tt_src[sel] # select sources within the region sel_reg = np.ones(len(tt_src), dtype=bool) if type(sky_region_wcs) is not type(None): src_coords = SkyCoord(tt_src["ra"], tt_src["dec"], frame="icrs") sel_reg = sky_region_wcs[0].contains(src_coords, sky_region_wcs[1]) logger.debug(f"Limiting to region {sky_region_wcs[0]}") # Set marker properties for sources plt_src_kwargs = { "marker": "+", "markersize": 6.5, "alpha": 0.5, "lw": 0, "markeredgewidth": 2.0, "fillstyle": "none", "transform": ax.get_transform("world"), } if src_kwargs is not None: plt_src_kwargs.update(src_kwargs) # Set marker properties for detections plt_det_kwargs = { "fill": False, "lw": 0.8, "transform": ax.get_transform("world"), } if det_kwargs is not None: plt_det_kwargs.update(det_kwargs) plt_txt_kwargs = { "xycoords": ax.get_transform("world"), "fontsize": 12, } if type(tt_det) is not type(None): tt_det.add_index(src_id) # Loop over all srcs and plot colors = cycle("rbgcmrbgcmrbgcmrbgcm") lss = [":", "-", "--", "_."] for src, col in zip(tt_src[sel_reg], colors): # Set colors in tandem for srcs, det and label, unless specific color passed if src_kwargs is not None and "color" in src_kwargs.keys(): col = plt_src_kwargs["color"] plt_src_kwargs["color"] = col plt_det_kwargs["color"] = col plt_txt_kwargs["color"] = col if type(tt_det) is not type(None) and src[src_id] in tt_det[src_id]: det_idxs = np.array(tt_det.loc_indices[src_id, src[src_id]]).flatten() for det_idx in det_idxs: coord_det = SkyCoord( tt_det[det_idx]["ra"] * uu.deg, tt_det[det_idx]["dec"] * uu.deg, frame="icrs", ) s_det = SphericalCircle( coord_det, tt_det[det_idx]["pos_err"] * uu.arcsec, ls=lss[int(tt_det[det_idx]["obs_filter_id"] % 4)], **plt_det_kwargs, ) ax.add_patch(s_det) ax.plot(src["ra"], src["dec"], **plt_src_kwargs) # Add labels if src_id was passed if type(src_id) is not type(None) and draw_labels: ax.annotate( str(src[src_id]), (src["ra"], src["dec"]), **plt_txt_kwargs, ) return ax, tt_src[sel_reg]
[docs] def plot_field_sky_map( field, fig=None, ax=None, img_idx=-1, sky_region=None, **img_kwargs ): """ Plot the reference sky map. Parameters ---------- field: vasca.BaseField VASCA field to be plotted. fig: figure, optional Matplotlib figure to draw on, if None a new figure is created. The default is None. ax : axes, optional Matplotlib axes to plot on. The default is None. img_idx : int, optional Index nr of the visit in the tt_visits table. If -1 is passed the reference image is shown. Default is -1. sky_region: regions.SkyRegion , optional Plot only within the sky region. The field WCS will be used Default is None. **img_kwargs : dict Key word arguments for pyplot.imshow plotting. Returns ------- matplotlib.figure Matplotlib figure used to draw matplotlib.graph.AxesImage Matplotlib axes of 2D image. """ logger.debug("Plotting sky map'") if field.ref_img is None and field.vis_img is None: logger.error("No map to draw") # Setup imshow parameters plt_img_kwargs = { "interpolation": "None", "cmap": "gray_r", "origin": "lower", "norm": LogNorm(), } if img_kwargs is not None: plt_img_kwargs.update(img_kwargs) # Check if reference or visit images should be plotted fig_title = f"Field {field.field_id} reference image" plot_img = field.ref_img if img_idx > -1: # Check if visit images are there and get the right image from the cube if img_idx > len(field.tt_visits) - 2: logger.error(f"Requested image index {img_idx} is out of range") elif type(field.vis_img) == type(None): raise TypeError( f"Visit images not set, got image type '{type(field.vis_img)}'." ) elif field.vis_img.ndim == 3: plot_img = field.vis_img[img_idx, :, :] else: plot_img = field.vis_img fig_title = f"Visit {field.tt_visits[img_idx]['vis_id']} image" # Check if plotting should be restricted to one region only wcs = field.ref_wcs if type(sky_region) is not type(None): box = sky_region.to_pixel(field.ref_wcs).bounding_box cutout = Cutout2D( plot_img, position=[[1],[0]], size=[box.shape[1], box.shape[0]], wcs=field.ref_wcs, copy=True, ) plot_img = wcs = cutout.wcs # Start drawing # Check if figure was passed if type(fig) is type(None): fig = plt.figure(figsize=(8, 7)) # , constrained_layout=True if type(ax) is type(None): ax = plt.subplot(projection=wcs) ax.coords["ra"].set_major_formatter("d.dd") ax.coords["dec"].set_major_formatter("d.dd") ax.set_xlabel("RA") ax.set_ylabel("Dec") else: plt.gcf() # Check if axis was passed and setup axis if ax is None: ax = plt.gca() # Add coordinate grid ax.coords.grid(True, color="grey", ls="-", lw=0.5) ax.imshow(plot_img, **plt_img_kwargs) # if type(pix_patch) is not type(None): # graph.set_clip_path(pix_patch) ax.set_title(fig_title) return ax, wcs
# TODO: Add coordinate system match check between tt_coverage_hp and this function
[docs] def plot_region_sky_gnomeview( region, ra, dec, sel_srcs=True, gw_kwargs=None, ps_kwargs=None ): """ Plot the nr visits and optionally (selected) sources (optinally) on the sky. Parameters ---------- region : vasca.region.Region Region for which is plotted. ra : float RA of the center of the sky figure. dec: float DEC of the center of the sky figure. sel_srcs : bool, optional Show only selected sources. The default is True. gw_kwargs : dict, optional Keyword arguments for healpy.gnomview. The default is None. ps_kwargs : dict, optional Keyword arguments for healpy.projscatter. The default is None. Returns ------- matplotlib.axes Used Matplotlib axes. """ if not hasattr(region, "tt_coverage_hp"): logger.warning( "Create healpix coverage table first with region.add_coverage_hp. " "Creating it now with default values." ) region.add_coverage_hp() # Get healpix map of Nr of visits nside = region.tt_coverage_hp.meta["NSIDE"] npix = hpy.nside2npix(nside) hp_map = np.zeros(npix, dtype=np.float64) pix_ids = region.tt_coverage_hp["pix_id"].data.astype(np.int64) hp_map[pix_ids] = region.tt_coverage_hp["nr_vis"] # Plot background map plt_gw_kwargs = { "title": "Nr. of visits", "coord": "C", "reso": 10 / 60.0, "xsize": 1400, "ysize": 1400, "cmap": "gray", } if gw_kwargs is not None: plt_gw_kwargs.update(gw_kwargs) hpy.gnomview(hp_map, rot=[ra, dec], **plt_gw_kwargs) # Plots selected sources plt_ps_kwargs = {"lonlat": True, "marker": "o", "s": 0.2} if ps_kwargs is not None: plt_ps_kwargs.update(ps_kwargs) tt_srcs = region.tt_sources.group_by("rg_fd_id") for tt in tt_srcs.groups: sel = tt["sel"] if not sel_srcs: sel = np.ones(len(sel)).astype(bool) hpy.projscatter(tt[sel]["ra"], tt[sel]["dec"], **plt_ps_kwargs) # hpy.projscatter( # [ra], [dec], lonlat=True, marker="o", s=4.0 # ) # Mark center of gnomeview return plt.gca()
[docs] def plot_region_sky_mollview(region, var="nr_vis", mw_kwargs=None): """ Plot the nr visits, fields or exposure on the sky. Coverage table has to be previously created with region.add_coverage_hp. Parameters ---------- region : vasca.region.Region Region for which is plotted. coord : str Coordinate system, Galactic or ICKS var: str Variable to plot, exposure "exp", visits "nr_vis" or or fields "nr_fds" mw_kwargs : dict, optional Keyword arguments for healpy.mollview. The default is None. Returns ------- matplotlib.axes Used Matplotlib axes. """ if not hasattr(region, "tt_coverage_hp"): logger.warning( "Create healpix coverage table first with region.add_coverage_hp. " "Creating it now with default values." ) region.add_coverage_hp() # Get healpix map of Nr of visits nside = region.tt_coverage_hp.meta["NSIDE"] npix = hpy.nside2npix(nside) hp_map = np.zeros(npix, dtype=np.float32) pix_ids = region.tt_coverage_hp["pix_id"].data hp_map[pix_ids] = region.tt_coverage_hp[var] coord_sys = str(region.tt_coverage_hp.meta["COOR_SYS"]) dd_title = { "nr_vis": "Number of vists in " + coord_sys + " coord.", "nr_fds": "Number of fields in " + coord_sys + " coord.", "exp": "Exposure in " + coord_sys + " coord.", } # Plot background map plt_mw_kwargs = { "title": dd_title[var], "nest": False, "xsize": 4800, "cmap": "gist_stern", } if mw_kwargs is not None: plt_mw_kwargs.update(mw_kwargs) hpy.mollview(hp_map, **plt_mw_kwargs) return plt.gca()
# %% table variable plotting
[docs] def plot_table_hist(tt, var, ax=None, logx=False, obs_filter_id=None, **hist_kwargs): """ Plot histogram for passed astropy.table.Table and variable Parameters ---------- tt : astropy.table.Table Table containing a column with the plotted variable. var : str, optional Variable name ax : matplotlib.axes, optional Axes to draw on. The default is None. logx : bool, optional Histogram of log10(var) instead of var. The default is False. obs_filter_id: int, optional Observation filter ID Nr., if None all filters are shown. THe default is None. **hist_kwargs : dict Key word arguments passed tu plt.hist Returns ------- matplotlib.axes Axes that where used to draw. list of float Histogram bin values. list of float Histogram bins, default is [selected, all] events. """ logger.debug(f"Plotting histogram of variable '{var}'") if ax is None: ax = plt.gca() # Set marker properties for sources plot_kwargs = { "bins": "auto", "histtype": "step", "density": True, "log": True, "alpha": 0.5, } if hist_kwargs is not None: plot_kwargs.update(hist_kwargs) if obs_filter_id is not None: tt = select_obs_filter(tt, obs_filter_id) sel = tt["sel"] col = tt[var] str_nrsel = str(sel.sum()) str_nrnotsel = str((~sel).sum()) data = [np.array(col[~sel]).flatten(), np.array(col[sel]).flatten()] xlabel = var + " [" + str(col.unit) + "]" if str(col.unit) == "None" or str(col.unit) == "": xlabel = var if logx: with np.errstate(divide="ignore", invalid="ignore"): data = [np.log10(data[0]), np.log10(data[1])] xlabel = "log10( " + xlabel + " )" vals, bins, *_ = ax.hist( data, label=["unselected_" + str_nrnotsel, "selected_" + str_nrsel], **plot_kwargs, ) if obs_filter_id is not None: xlabel = xlabel + " - " + dd_id2filter[obs_filter_id] ax.set_xlabel(xlabel) ax.set_ylabel("Counts") return ax, vals, bins
[docs] def scatter_hist(x, y, ax, ax_histx, ax_histy): # no labels ax_histx.tick_params(axis="x", labelbottom=False) ax_histy.tick_params(axis="y", labelleft=False) # the scatter plot: ax.scatter(x, y) # now determine nice limits by hand: binwidth = 0.25 xymax = max(np.max(np.abs(x)), np.max(np.abs(y))) lim = (int(xymax / binwidth) + 1) * binwidth bins = np.arange(-lim, lim + binwidth, binwidth) ax_histx.hist(x, bins=bins) ax_histy.hist(y, bins=bins, orientation="horizontal")
[docs] def plot_table_scatter( tt, varx, vary, ax=None, xlim=None, ylim=None, invert_xaxis=None, invert_yaxis=None, xscale="linear", yscale="linear", obs_filter_id=None, grp_var="sel", grp_vals=None, add_projection=False, **scatter_kwargs, ): """ Plot scatter plot for passed astropy.table.Table and variables Parameters ---------- tt : astropy.table.Table Table containing columns with the plotted variables. varx : str Variable name on X-axis vary : str Variable name on Y-axis ax : matplotlib.axes, optional Axes to draw on. The default is None. xlim : list, optional List with [xmin, xmax] axis value. Default is None. ylim : list, optional List with [ymin, ymax] axis value. Default is None. xscale : str, optional Type of x-scale ("log", "linear"). Default is "linear". yscale : str, optional Type of y-scale ("log", "linear"). Default is "linear". obs_filter_id: int, optional Observation filter ID Nr., if None all filters are shown. The default is None. grp_var: str, optional Group scatter plot by colors based on this table variable. The default is "sel". If None is passed no groups will be done. add_projection: bool, optional Add histogram with projection on the sides **plot_kwargs : dict Key word arguments passed tu plt.plot Returns ------- ax : matplotlix.axes Axes that where used to draw. """ logger.debug(f"Plotting of variables '{varx}' and '{vary}'") if ax is None: ax = plt.gca() if add_projection: ax_histx = ax.inset_axes([0, 1.05, 1, 0.25], sharex=ax) ax_histy = ax.inset_axes([1.05, 0, 0.25, 1], sharey=ax) ax_histx.tick_params(axis="x", labelbottom=False) ax_histy.tick_params(axis="y", labelleft=False) if obs_filter_id is not None: tt = select_obs_filter(tt, obs_filter_id) # Set marker properties for sources plot_kwargs = { "markersize": 2.0, "linewidth": 0, "marker": "o", "markeredgewidth": 0, } if scatter_kwargs is not None: plot_kwargs.update(scatter_kwargs) if type(grp_var) == type(None): ax.plot( tt[varx], tt[vary], **plot_kwargs, ) else: tt_grp = tt.group_by(grp_var) if type(grp_vals) == type(None): grp_vals = tt_grp.groups.keys[grp_var] for grp in grp_vals: mask = ( tt_grp.groups.keys[grp_var] == grp ) # obs_by_name.groups.keys['name'] == 'M101' t_grp = tt_grp.groups[mask] pp = ax.plot( t_grp[varx], t_grp[vary], label=grp, **plot_kwargs, ) if add_projection: ccol = pp[0].get_color() binsx = 10 ** np.linspace(np.log10(xlim[0]), np.log10(xlim[1]), 30) ax_histx.hist( t_grp[varx], bins=binsx, log=True, histtype="step", color=ccol ) # binsy = 10 ** np.linspace(np.log10(ylim[0]), np.log10(ylim[1]), 30) ax_histy.hist( t_grp[vary], bins=binsy, orientation="horizontal", log=True, histtype="step", color=ccol, ) # Set labels xlabel = varx + " [" + str(tt[varx].unit) + "]" if str(tt[varx].unit) == "None" or str(tt[varx].unit) == "": xlabel = varx ylabel = vary + " [" + str(tt[vary].unit) + "]" if str(tt[vary].unit) == "None" or str(tt[vary].unit) == "": ylabel = vary if obs_filter_id is not None: xlabel = xlabel + " - " + dd_id2filter[obs_filter_id] ylabel = ylabel + " - " + dd_id2filter[obs_filter_id] ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) # Set axis limits if xlim is not None: ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) ax.set_xscale(xscale) ax.set_yscale(yscale) if invert_xaxis: ax.invert_xaxis() if invert_yaxis: ax.invert_yaxis() return ax
# %% pipeline diagnostic plotting # TODO: there are some GALEX specific variables # will need to be adapted to other missions
[docs] def plot_pipe_diagnostic( tc, table_name, plot_type, fig_size=(12, 8), obs_filter_id=None ): """ Diagnotic plots for VASCA pipe Parameters ---------- tc : vasca.TableCollection Table collection, either field or region table_name : str Name of the table, tt_detections or tt_sources plot_type : str Type of plot, either "hist" or "scatter" fig_size: (float,float) Matplotlib figure (x,y) size in inches. obs_filter_id: int, optional Observation filter ID. Default is None. Returns ------- matplotlib.figure Figure used for plotting dict Dictionary with plot variables keys and plot settings values """ flt_name = "" if obs_filter_id is not None: flt_name = dd_id2filter[obs_filter_id] var_plt = OrderedDict() if plot_type == "hist": # Detections diagnostic if table_name == "tt_detections": var_plt["s2n"] = {"logx": True} var_plt["flux"] = {"logx": True} var_plt["r_fov"] = {"range": [0.0, 0.7]} var_plt["class_star"] = {} var_plt["artifacts"] = {"histtype": "step"} var_plt["chkobj_type"] = {} var_plt["ellip_world"] = {} # "range": [0.0, 5] var_plt["size_world"] = {} # "logx": True var_plt["flux_app_ratio"] = {"range": [0.1, 5]} # "logx": True fig, axs = plt.subplots( 3, 3, figsize=(9, 9), squeeze=False, num="Detections Histograms " + flt_name, ) elif table_name == "tt_sources": var_plt["nr_det"] = {} var_plt["flux_cpval"] = {"range": [0, 1]} var_plt["flux_nxv"] = {"range": [-0.01, 0.01]} # "logx": True var_plt["coadd_fdiff_s2n"] = {"range": [-10, 25]} var_plt["coadd_ffactor"] = {"range": [-2, 5]} var_plt["pos_cpval"] = {} var_plt["flux"] = {"logx": True} var_plt["nr_fd_srcs"] = {} fig, axs = plt.subplots( 2, 4, figsize=(12, 6), squeeze=False, num="Sources Histograms " + flt_name, ) elif table_name == "tt_coadd_sources": var_plt["nr_det"] = {} var_plt["flux_cpval"] = {} var_plt["pos_cpval"] = {} var_plt["flux"] = {"logx": True} fig, axs = plt.subplots( 1, 4, figsize=(12, 3), squeeze=False, num="Co-add Sources Histograms " + flt_name, ) else: logger.warning("Diagnostic for table '{table_name}' not defined") elif plot_type == "scatter": # Detections diagnostic if table_name == "tt_detections": var_plt[("flux_app_ratio", "flux")] = { "yscale": "log", "xlim": [0, 5], } var_plt[("artifacts", "flux")] = { "yscale": "log", } var_plt[("class_star", "flux")] = {"yscale": "log"} var_plt[("size_world", "flux")] = {"yscale": "log"} # {"yscale": "log"} var_plt[("ellip_world", "flux")] = {"yscale": "log"} var_plt[("class_star", "flux_app_ratio")] = {"ylim": [0, 5]} fig, axs = plt.subplots( 2, 3, figsize=(9, 6), squeeze=False, num="Detections Scatter " + flt_name, ) elif table_name == "tt_sources": var_plt[("flux_cpval", "flux")] = { "xscale": "log", "xlim": [1e-23, 1.0], "yscale": "log", } var_plt[("pos_cpval", "flux")] = { "yscale": "log", } var_plt[("coadd_ffactor", "flux")] = { "yscale": "log", "xlim": [0.1, 100], "xscale": "log", } var_plt[("nr_det", "flux")] = { "yscale": "log", } var_plt[("coadd_fdiff_s2n", "coadd_ffactor")] = { "xlim": [-10, 25], "yscale": "log", "ylim": [0.1, 100], } var_plt[("flux_cpval", "nr_det")] = { "xscale": "log", "xlim": [1e-23, 1.0], } fig, axs = plt.subplots( 2, 3, figsize=(9, 6), squeeze=False, num="Sources Scatter " + flt_name, ) elif table_name == "tt_coadd_sources": var_plt[("flux_cpval", "flux")] = { "xscale": "log", "yscale": "log", } var_plt[("pos_cpval", "flux")] = { "yscale": "log", } var_plt[("nr_det", "flux")] = { "yscale": "log", } fig, axs = plt.subplots( 1, 3, figsize=(9, 3), squeeze=False, num="Co-add Sources Scatter " + flt_name, ) else: logger.warning("Diegnostic for table '{table_name}' not defined") else: logger.warning("Plot type '{plot_type}' unknown") # Select filter tt = tc.__dict__[table_name] if obs_filter_id is not None: tt = select_obs_filter(tt, obs_filter_id) # Start plotting axs = axs.flatten() ax_ctr = 0 for var, plot_arg in var_plt.items(): if plot_type == "hist": plot_table_hist(tt, var, axs[ax_ctr], **plot_arg) elif plot_type == "scatter": plot_table_scatter(tt, var[0], var[1], axs[ax_ctr], **plot_arg) ax_ctr += 1 plt.tight_layout() plt.legend() return fig, var_plt
# %% light curve plotting
[docs] def plot_light_curves( tc, fd_src_ids=None, rg_src_ids=None, fig=None, ax=None, ylim=None, plot_upper_limits=True, flux_var="flux", **errorbar_kwargs, ): """ Plot the light curves of the passed sources. Parameters ---------- tc : VASCA.table.TableCollection Either field, region or source that contains the light curves. fd_src_ids : list or int List or single field source IDs to plot. Default is None. rg_src_ids : list or int List or single region source IDs to plot. Default is None. fig: figure, optional Matplotlib figure to draw on, if None a new figure is created. The default is None. ax : axes, optional Matplotlib axes to plot on. The default is None. ylim : list, optional Limits of the y axis. Default is None plot_upper_limits : bool Plot upper limits to the lightcurve. The default is True. flux_var: str, optional Variable in table to be used to get flux Jy **errorbar_kwargs : dict Key word arguments for pyplot.errorbars plotting. Returns ------- matplotlib.figure Matplotlib figure used to draw matplotlib.axes Used Matplotlib axes. """ logger.debug("Plotting lightcurves ") # Check if figure was passed if type(fig) is type(None) and type(ax) is type(None): fig = plt.figure(figsize=(6, 6)) # , constrained_layout=True else: fig = plt.gcf() # Check if axis was passed if ax is None: ax = plt.gca() # ax.invert_yaxis() ax.set_yscale("log") if hasattr(ylim, "__iter__"): ax.set_ylim(ylim) # Setup plotting parameters plt_errorbar_kwargs = { "markersize": 4, "alpha": 0.6, "capsize": 0, "lw": 0.1, "linestyle": "dotted", "elinewidth": 0.7, } if errorbar_kwargs is not None: plt_errorbar_kwargs.update(errorbar_kwargs) # Loop over selected sources and plot colors = cycle("bgrcmykbgrcmykbgrcmykbgrcmyk") markers = cycle("osDd<>^v") ctr = 0 # Get light curves dictionary dd_lcs = tc.get_light_curve(fd_src_ids, rg_src_ids, flux_var=flux_var) src_ids = list(dd_lcs.keys()) for src_id, col, mar in zip(src_ids, colors, markers): ctr += 1 # Arrays to plot filters differently mfc = ["None", col] ls = ["--", ":"] # Get light curve lc = dd_lcs[src_id] fluxs = lc["flux"] fluxs_err = lc["flux_err"] # Check if one one filter present filter_ids = np.sort(np.unique(lc["obs_filter_id"].data)) for flt_id in filter_ids: flt_plot = flt_id % 2 # Select filter to show sel = (lc["flux"] > 0) * (lc["obs_filter_id"] == flt_id) src_lab = str(src_id) + " " + str(lc[sel]["obs_filter"][0]) # Draw mean value t_mean = [np.min(lc["time"][sel]), np.max(lc["time"][sel])] flux_weight = 1.0 / lc["flux_err"][sel] ** 2 flux_mean = np.average(lc["flux"][sel], weights=flux_weight) ax.plot( t_mean, [flux_mean, flux_mean], ls=ls[flt_plot], color=col, linewidth=0.5, ) # Plot ax.errorbar( lc["time"][sel], fluxs[sel], yerr=fluxs_err[sel], color=col, markeredgecolor=col, markerfacecolor=mfc[flt_plot], marker=mar, label=src_lab, **plt_errorbar_kwargs, ) ax.legend(fontsize="small") # bbox_to_anchor=(1.04, 1), ax.set_xlabel("MJD") ax.set_ylabel(r"Flux [$\mu$Jy]") # Add a second time axis on top showing years def mjd2yr(mjd): return Time(mjd, format="mjd").jyear def yr2mjd(jyr): return Time(jyr, format="jyear").mjd secax = ax.secondary_xaxis("top", functions=(mjd2yr, yr2mjd)) secax.set_xlabel("Year") secay = ax.secondary_yaxis("right", functions=(flux2mag_np, mag2flux_np)) # Avoid scientific notation for magnitudes # formatter.set_scientific(False) # secay.yaxis.set_minor_formatter(formatter) secay.set_ylabel("AB magnitude") return fig, ax
[docs] def plot_light_curve( tc_src, fig=None, ax=None, show_gphoton=True, add_axes=True, **errorbar_kwargs ): """ Plots light curve Parameters ---------- tc_src: vasca.TableCollection Table collection containing tt_sed table. fig: figure, optional Matplotlib figure to draw on, if None a new figure is created. The default is None. ax : axes, optional Matplotlib axes to plot on. The default is None. show_gphoton: bool, optional Show gphoton light curve too, if present in table collection?. The default is True. add_axes: bool, optional Add additional x- and y-axis on the empty sides **errorbar_kwargs : dict Key word arguments for pyplot.errorbars plotting. Returns ------- matplotlib.figure Matplotlib figure used to draw matplotlib.axes Used Matplotlib axes. """ logger.debug("Plotting spectral energy distribution ") # Check if lightcurve table exists if "tt_source_lc" not in tc_src._table_names: logger.warning("No light curve table found") return # Consider only selected points else: tt_lc = tc_src.tt_source_lc[tc_src.tt_source_lc["sel"]] # Get also gPhoton light curve if it excists if "tt_gphoton_lc" in tc_src._table_names: tt_gp_lc = tc_src.tt_gphoton_lc[tc_src.tt_gphoton_lc["sel"]] # Check if figure was passed if type(fig) is type(None) and type(ax) is type(None): fig = plt.figure(figsize=(12, 6)) # , constrained_layout=True else: plt.gcf() # Check if axis was passed if ax is None: ax = plt.gca() ax.set_yscale("log") # Setup plotting parameters plt_errorbar_kwargs = { "markersize": 6, "alpha": 0.6, "capsize": 0, "lw": 0.2, "linestyle": "None", # "dotted", "elinewidth": 0.7, } if errorbar_kwargs is not None: plt_errorbar_kwargs.update(errorbar_kwargs) filter_ids = np.sort(np.unique(tt_lc["obs_filter_id"].data)) colors = cycle("brgcmykbgrcmykbgrcmykbgrcmyk") markers = cycle("osDd<>^v") for flt_id, mar, col in zip(filter_ids, markers, colors): # Plot gPhoton light curve first, if present if show_gphoton and "tt_gphoton_lc" in tc_src._table_names: sel_gp = tt_gp_lc["obs_filter_id"] == flt_id if sel_gp.sum() > 0: ax.errorbar( tt_gp_lc["time"][sel_gp], tt_gp_lc["flux"][sel_gp], yerr=tt_gp_lc["flux_err"][sel_gp], marker=mar, color="0.5", **plt_errorbar_kwargs, ) # Select filter to show sel = tt_lc["obs_filter_id"] == flt_id src_lab = str(tt_lc[sel]["obs_filter"][0]) # Plot if sel.sum() > 0: t_mean = [np.min(tt_lc["time"][sel]), np.max(tt_lc["time"][sel])] flux_weight = 1.0 / tt_lc["flux_err"][sel] ** 2 flux_mean = np.average(tt_lc["flux"][sel], weights=flux_weight) ax.plot( t_mean, [flux_mean, flux_mean], ls="--", color=col, linewidth=0.5, ) ax.errorbar( tt_lc["time"][sel], tt_lc["flux"][sel], yerr=tt_lc["flux_err"][sel], marker=mar, label=src_lab, color=col, **plt_errorbar_kwargs, ) ax.legend(fontsize=12, frameon=False) # bbox_to_anchor=(1.04, 1), ax.set_xlabel("MJD", fontsize=18) ax.set_ylabel(r"Flux [$\mu$Jy]", fontsize=18) ax.text( 0.3, 0.02, tc_src.tt_sources["src_name"][0], size=22, transform=ax.transAxes, ha="left", va="bottom", ) # , color='purple' # Avoid scientific notation formatter = ScalarFormatter() formatter.set_scientific(False) ax.yaxis.set_minor_formatter(formatter) ax.yaxis.set_major_formatter(formatter) if add_axes: # Add a second time axis on top showing years secax = ax.secondary_xaxis("top", functions=(mjd2yr, yr2mjd)) secax.ticklabel_format(useOffset=False, style="plain") secax.set_xlabel("Year", fontsize=18) secay = ax.secondary_yaxis("right", functions=(flux2mag_np, mag2flux_np)) # Avoid scientific notation for magnitudes secay.yaxis.set_minor_formatter(formatter) secay.set_ylabel("AB magnitude", fontsize=18) return fig, ax
# %% LombScargle
[docs] def plot_lombscargle( tt_lc, fig=None, ax=None, ax_phase=None, ax_lc=None, obs_filter="NUV", nbins_min=10, logy=False, freq_range=[0.03, 2] / uu.d, plot_dtbins=True, ): """ Runs and plots Lomb Scargle diagram Parameters ---------- tc_src: vasca.TableCollection Table collection containing tt_sed table. fig: figure, optional Matplotlib figure to draw on, if None a new figure is created. The default is None. ax : axes, optional Matplotlib axes to plot LombScargle diagram on. The default is None. ax_lc : axes, optional Matplotlib axes to plot with the light curve to plot peak frequency model on. The default is None. ax_phase : axes, optional Matplotlib axes to plot phase diagram. The default is None. obs_filter : str, optional Observational filter to perform LombScargle on. The default is "NUV" nbins_min : int, optional Minimum number of time bins to perform LombScargle. The default is 20. logy : bool, optional Plot LombScargle diagram in log(Power). The default is True. freq_range : list Minimum and maximum Frequency. If None calculated automatically. Returns ------- matplotlib.figure Matplotlib figure used to draw matplotlib.axes Used Matplotlib axes. dict Lomb Scargle results """ logger.debug("Plotting Lomb-Scargle diagram ") label_size = 18 # Prepare light curve sel = np.array( (tt_lc["obs_filter"] == obs_filter) * (tt_lc["sel"] == True), dtype=bool ) if sel.sum() < nbins_min: return tt_lc = tt_lc[sel] tt_lc.sort("time") # Check if figure was passed if type(fig) is type(None) and type(ax) is type(None): fig = plt.figure(figsize=(6, 6)) # , constrained_layout=True else: plt.gcf() # Check if axis was passed if ax is None: ax = plt.gca() # Setup plotting parameters plt_plot_kwargs = {"alpha": 0.5} dd_ls_results = run_LombScargle(tt_lc, nbins_min=nbins_min, freq_range=freq_range) # Set labels ax.set_xscale("log") if logy: ax.set_yscale("log") ax.set_xlabel("Frequency [1/day]", fontsize=label_size) ax.set_ylabel("LS power", fontsize=label_size) secax = ax.secondary_xaxis("top", functions=(freq2period, period2freq)) secax.set_xlabel("Period [day]", fontsize=label_size) # Plot frequency distribution of time bins if plot_dtbins: fbinsize = dd_ls_results["ls_freq"][1:] - dd_ls_results["ls_freq"][:-1] fedges = dd_ls_results["ls_freq"][:-1] + fbinsize / 2.0 dtimes = tt_lc["time"][1:].data[None, :] - tt_lc["time"][:-1].data[:, None] dfs = 1 / dtimes.flatten() hist_f, _ = np.histogram(dfs, hist_f = 0.5 * hist_f / np.max(hist_f) # normalize maximum to 0.5 ax.plot( dd_ls_results["ls_freq"][1:-1], hist_f, label="Frequency between visits", **plt_plot_kwargs, ) # Plot LS pl = ax.plot( dd_ls_results["ls_freq"], dd_ls_results["ls_power"], **plt_plot_kwargs, ) # label="data", col = pl[0].get_color() probabilities = [ 0.95449973610364, 0.002699796063, 0.000063342484, 0.000000573303, ] # 2-5 sigma # Get confidence interval conf = dd_ls_results["ls"].false_alarm_level( probabilities, method="baluev" ) # "bootstrap" ax.axhline( conf[1], linewidth=0.5, ls="--", color=col, label=r"$3 \sigma$ - $5 \sigma$ confidence levels", ) # col ax.axhline(conf[2], linewidth=0.5, ls="--", color=col) # col ax.axhline(conf[3], linewidth=0.5, ls="--", color=col) # ax.legend() # Plot model on lc if type(ax_lc) != type(None): t_min = np.min(tt_lc["time"].quantity) t_max = np.max(tt_lc["time"].quantity) nbins = int( 16 * (t_max.value - t_min.value) * dd_ls_results["ls_peak_freq"].value ) t_fit = np.linspace(t_min, t_max, nbins) flux_fit = dd_ls_results["ls"].model(t_fit, dd_ls_results["ls_peak_freq"]) ax_lc.plot(t_fit, flux_fit, alpha=0.5) # Plot phase diagram if type(ax_phase) != type(None): period_peak = float(1 / dd_ls_results["ls_peak_freq"].value) # period_peak = 220.44/(24*60*60) times_phased = tt_lc["time"] % period_peak t_fit = np.linspace(0, period_peak, 40) flux_fit = dd_ls_results["ls"].model( t_fit * uu.d, dd_ls_results["ls_peak_freq"] ) ax_phase.errorbar( times_phased / period_peak, tt_lc["flux"], yerr=tt_lc["flux_err"], linestyle="none", marker="o", color=col, ) ax_phase.plot(t_fit / period_peak, flux_fit, **plt_plot_kwargs) ax_phase.set_xlabel("Phase ", fontsize=label_size) ax_phase.set_ylabel("Flux [Jy]", fontsize=label_size) return fig, ax, dd_ls_results
[docs] def plot_sed( tc_src, fig=None, ax=None, plot_spec_lines=False, plot_spec=False, **errorbar_kwargs ): """ Plots spectral energy distribution Parameters ---------- tc_src: vasca.TableCollection Table collection containing tt_sed table. fig: figure, optional Matplotlib figure to draw on, if None a new figure is created. The default is None. ax : axes, optional Matplotlib axes to plot on. The default is None. plot_spec_lines: bool Plot typical spectral lines in White Dwarfs. The default is False. plot_spec: bool Plot typical spectral lines in White Dwarfs. The default is False. **errorbar_kwargs : dict Key word arguments for pyplot.errorbars plotting. Returns ------- matplotlib.figure Matplotlib figure used to draw matplotlib.axes Used Matplotlib axes. dict Dictionary with fit information """ logger.debug("Plotting spectral energy distribution ") # Check if SED table exists if "tt_sed" not in tc_src._table_names: logger.warning("No SED table found") return else: tt_sed = tc_src.tt_sed # Remove SkyMapper data, as it seems off sel_obs = np.ones(len(tt_sed), dtype=bool) for obs in ["SkyMapper/SkyMapper", "GAIA/GAIA2", "Gaia", "GALEX", "SDSS"]: sel_obs = sel_obs * ~(tt_sed["observatory"] == obs) sel_obs = sel_obs + (tt_sed["origin"] == "VASCA") sel_obs = sel_obs + (tt_sed["origin"] == "J/MNRAS/486/2169/table3") # sel_obs = sel_obs + (tt_sed["origin"] == "V/154/sdss16") tt_sed = tt_sed[sel_obs] # Check if figure was passed if type(fig) is type(None) and type(ax) is type(None): fig = plt.figure(figsize=(6, 6)) # , constrained_layout=True else: plt.gcf() # Check if axis was passed if ax is None: ax = plt.gca() ax.set_xscale("log") ax.set_yscale("log") # Setup plotting parameters plt_errorbar_kwargs = { "capsize": 0, "lw": 0.1, "linestyle": "dotted", "elinewidth": 0.7, } if errorbar_kwargs is not None: plt_errorbar_kwargs.update(errorbar_kwargs) # Loop over selected sources and plot colors = cycle("bgcmykbgcmykbgcmykbgcmyk") markers = cycle("sDd<>^v") # Prepare tables, to plot vasca point separatelly sel = tt_sed["origin"] == "VASCA" # If ony VASCA point present, do not plot SED if sel.sum() == len(tt_sed): print("Only VASCA points in SED, not plotting") return tt_grp = tt_sed[~sel].group_by("observatory") # ********** Plot spectral lines in background first if "tt_spectrum_0" in tc_src._table_names and plot_spec_lines: for lwavs, llabel, lls, lcol in zip( dd_spec_lines.values(), dd_spec_lines.keys(), [":", "--", "-"], ["0.7", "0.8", "0.9"], ): valabel = llabel for lwav in lwavs: ax.axvline( lwav.value, 0.3, 0.9, linestyle=lls, color=lcol, linewidth=1.0, label=valabel, ) valabel = None # ********** Plot spectra, if present. Up to 5 if plot_spec: for ii, col in zip(range(0, 5), colors): spec_name = "tt_spectrum_" + str(ii) lines_draw = True if spec_name in tc_src._table_names: tt_spec_ii = tc_src.__dict__[spec_name] sel_spec = tt_spec_ii["sel"] if sel_spec.sum() > 0: ax.plot( tt_spec_ii["wavelength"][sel_spec], tt_spec_ii["flux"][sel_spec], label="SDSS spectrum", # + str(ii), alpha=0.5, color=col, ) ax.plot( tt_spec_ii["wavelength"][sel_spec], tt_spec_ii["flux_model"][sel_spec], label="SDSS model", # + str(ii) # alpha=0.5, color=col, ) else: break # *********** Plot all none-VASCA points for tt, grp, col, mar in zip(tt_grp.groups, tt_grp.groups.keys, colors, markers): # Plot ax.errorbar( tt["wavelength"], tt["flux"], yerr=tt["flux_err"], color=col, markeredgecolor=col, marker=mar, label=str(grp[0]), markersize=6, alpha=0.4, **plt_errorbar_kwargs, ) # ************* Plot VASCA points ax.errorbar( tt_sed[sel]["wavelength"], tt_sed[sel]["flux"], yerr=tt_sed[sel]["flux_err"], color="r", markeredgecolor="r", marker="o", markersize=6, alpha=1.0, label="GALEX/VASCA", **plt_errorbar_kwargs, ) # ************* Plot Black Body # Fit Black Body spectrum uscale = uu.Unit("1e-6 Jy/sr") BB = models.BlackBody(temperature=2e4 * uu.K, scale=1e-24 * uscale) # # fit = fitting.LMLSQFitter() fit = fitting.LevMarLSQFitter() # Restrict fit range of BB fit to reasonable range bb_flux = (tt_sed["flux"].quantity / uu.Unit("sr")).to(uscale) selfit = ( (tt_sed["wavelength"] > 0) * (tt_sed["wavelength"] < 1e20) * (bb_flux.value > 1e-10) * (bb_flux.value < 1e10) ) with warnings.catch_warnings(): # Ignore model linearity warning from the fitter warnings.filterwarnings( "ignore", message="Model is linear in parameters", category=AstropyUserWarning, ) bb_bins = ( np.linspace( np.min(tt_sed["wavelength"][selfit]), np.max(tt_sed["wavelength"][selfit]), 100, ) * tt_sed["wavelength"].unit ) fitted_bb = fit( BB, tt_sed["wavelength"].quantity[selfit], bb_flux[selfit], weights=None, # 1 / tt_sed["flux_err"][selfit] maxiter=100, acc=1e-07, epsilon=1.4901161193847656e-08, estimate_jacobian=False, filter_non_finite=True, ) if fit.fit_info["ierr"] <= 4 and fit.fit_info["ierr"] >= 1: bb_bins = ( np.linspace( np.min(tt_sed["wavelength"][selfit]), np.max(tt_sed["wavelength"][selfit]), 100, ) * tt_sed["wavelength"].unit ) fit_flux = (fitted_bb(bb_bins) * uu.Unit("sr")).to(uu.Unit("1e-6 Jy")) sel_fit = tt_sed fit_temp = ( np.round(fitted_bb.temperature.value, 0) * fitted_bb.temperature.unit ) ax.plot( bb_bins, fit_flux, color="0.5", ls="-", # label="BB " + str(fit_temp), ) print("BB fit temperature:", fit_temp) else: print("Black body fit did not converge.") # Plot star black body if distance is available if "tt_gaiadr3" in tc_src._table_names: gaia_match_id = tc_src.tt_sources["gaiadr3_match_id"][0] tc_src.tt_gaiadr3.add_index("gaiadr3_match_id") gaia_idx = tc_src.tt_gaiadr3.loc_indices["gaiadr3_match_id", gaia_match_id] if tc_src.tt_gaiadr3["Plx_dist"][gaia_idx] > 1: TS = 2700 * uu.K rS = (0.1 * const.R_sun).to( dS = (tc_src.tt_gaiadr3["Plx_dist"].quantity[gaia_idx]).to( BBS_scale = np.pi * (rS / dS) ** 2 BBS = models.BlackBody(temperature=TS, scale=BBS_scale) wavS = np.linspace(7000, 110000, 100) * uu.AA fluxS = (BBS(wavS) * uu.Unit("sr")).to(uu.Unit("1e-6 Jy")) ax.plot( wavS, fluxS, color="0.5", ls="--", # label="BB star " + str(TS), ) # Helper functions to define second axis def AA2ev_np(wave): return (cc.h * cc.c / (wave * uu.AA)).to(uu.eV).value def ev2AA_np(ener): return (cc.h * cc.c / (ener * uu.eV)).to(uu.AA).value # In units of Jy the Wien-displacement constant is given by the factor below # w_kb_ev = / uu.K).value * 2.82 def AA2K_np(wave): return AA2ev_np(wave) / w_kb_ev def K2AA_np(temp): return ev2AA_np(temp * w_kb_ev) # Axis and labels # secax = ax.secondary_xaxis("top", functions=(AA2ev_np, ev2AA_np)) # secax.set_xlabel("eV") secax = ax.secondary_xaxis("top", functions=(AA2K_np, K2AA_np)) secax.set_xlabel("Temperature [K]", fontsize=16) secay = ax.secondary_yaxis("right", functions=(flux2mag_np, mag2flux_np)) # Avoid scientific notation for magnitudes formatter = ScalarFormatter() formatter.set_scientific(False) secay.yaxis.set_minor_formatter(formatter) secay.set_ylabel("AB magnitude", fontsize=16) ax.set_ylabel("Flux [$\mu$Jy]", fontsize=16) ax.set_xlabel("Wavelength [Angstom]", fontsize=16) ax.legend(frameon=False, loc="upper right") return fig, ax, fit.fit_info