Source code for EXOSIMS.Prototypes.SurveySimulation

# -*- coding: utf-8 -*-
import copy
import hashlib
import inspect
import json
import logging
import os
import random as py_random
import re
import sys
import time
from pathlib import Path
from typing import Any, Dict, Optional

import astropy.constants as const
import astropy.units as u
import numpy as np
from astropy.time import Time

from EXOSIMS.util._numpy_compat import copy_if_needed
from EXOSIMS.util.deltaMag import deltaMag
from EXOSIMS.util.get_dirs import get_cache_dir
from EXOSIMS.util.get_module import get_module
from EXOSIMS.util.version_util import get_version
from EXOSIMS.util.vprint import vprint

Logger = logging.getLogger(__name__)


[docs] class SurveySimulation(object): """:ref:`SurveySimulation` Prototype Args: scriptfile (str, optional): JSON script file. If not set, assumes that dictionary has been passed through specs. Defaults to None\ ntFlux (int): Number of intervals to split integration into for computing total SNR. When greater than 1, SNR is effectively computed as a Reimann sum. Defaults to 1 nVisitsMax (int): Maximum number of observations (in detection mode) per star. Defaults to 5 charMargin (float): Integration time margin for characterization. Defaults to 0.15 dt_max (float): Maximum time for revisit window (in days). Defaults to 1. record_counts_path (str, optional): If set, write out photon count info to file specified by this keyword. Defaults to None. revisit_wait (float, optional): If set, it is the minimum time (in days) to wait before revisiting current target. Defaults to None. nokoMap (bool): Skip generating keepout map. Only useful if you're not planning on actually running a mission simulation. Defaults to False. nofZ (bool): Skip precomputing zodical light minima. Only useful if you're not planning on actually running a mission simulation. Defaults to False. cachedir (str, optional): Full path to cachedir. If None (default) use default (see :ref:`EXOSIMSCACHE`) defaultAddExoplanetObsTime (bool): If True, time advancement when no targets are observable will add to exoplanetObsTime (i.e., wasting time is counted against you). Defaults to True find_known_RV (bool): Identify stars with known planets. Defaults to False include_known_RV (str, optional): Path to file including known planets to include. Defaults to None make_debug_bird_plots (bool, optional): If True, makes completeness bird plots for every observation that are saved in the cache directory debug_plot_path (str, optional): Path to save the debug plots in, must be set if make_debug_bird_plots is True **specs: :ref:`sec:inputspec` Attributes: _outspec (dict): :ref:`sec:outspec` absTimefZmin (astropy.time.core.Time): Absolute time of local zodi minima BackgroundSources (:ref:`BackgroundSources`): BackgroundSources object cachedir (str): Path to the EXOSIMS cache directory (see :ref:`EXOSIMSCACHE`) cachefname (str): Base filename for cache files. charMargin (float): Integration time margin for characterization. Completeness (:ref:`Completeness`): Completeness object count_lines (list): Photon counts. Only used when ``record_counts_path`` is set defaultAddExoplanetObsTime (bool): If True, time advancement when no targets are observable will add to exoplanetObsTime (i.e., wasting time is counted against you). DRM (list): The mission simulation. List of observation dictionaries. dt_max (astropy.units.quantity.Quantity): Maximum time for revisit window. revisit_wait (astropy.units.quantity.Quantity): Minimum time (in days) to wait before revisiting. find_known_RV (bool): Identify stars with known planets. fullSpectra (numpy.ndarray(bool)): Array of booleans indicating whether a planet's spectrum has been fully observed. fZmins (dict): Dictionary of local zodi minimum value candidates for each observing mode fZtypes (dict): Dictionary of type of local zodi minimum candidates for each observing mode include_known_RV (str, optional): Path to file including known planets to include. intTimeFilterInds (numpy.ndarray(ind)): Indices of targets where integration times fall below cutoff value intTimesIntTimeFilter (astropy.units.quantity.Quantity): Default integration times for pre-filtering targets. known_earths (numpy.ndarray): Indices of Earth-like planets known_rocky (list): Indices of rocky planets known_stars (list): Stars with known planets koMaps (dict): Keepout Maps koTimes (astropy.time.core.Time): Times corresponding to keepout map array entries. lastDetected (numpy.ndarray): ntarg x 4. For each target, contains 4 lists with planets' detected status (boolean), exozodi brightness (in units of 1/arcsec2), delta magnitude, and working angles (in units of arcsec) lastObsTimes (astropy.units.quantity.Quantity): Contains the last observation start time for future completeness update in units of day logger (logging.Logger): Logger object modules (dict): Modules dictionary. ntFlux (int): Number of intervals to split integration into for computing total SNR. When greater than 1, SNR is effectively computed as a Reimann sum. nVisitsMax (int): Maximum number of observations (in detection mode) per star. Observatory (:ref:`Observatory`): Observatory object. OpticalSystem (:ref:`OpticalSystem`): Optical system object. partialSpectra (numpy.ndarray): Array of booleans indicating whether a planet's spectrum has been partially observed. PlanetPhysicalModel (:ref:`PlanetPhysicalModel`): Planet pysical model object PlanetPopulation (:ref:`PlanetPopulation`): Planet population object PostProcessing (:ref:`PostProcessing`): Postprocessing object propagTimes (astropy.units.quantity.Quantity): Contains the current time at each target system. record_counts_path (str, optional): If set, write out photon count info to file specified by this keyword. seed (int): Seed for random number generation SimulatedUniverse (:ref:`SimulatedUniverse`): Simulated universe object StarCatalog (:ref:`StarCatalog`): Star catalog object starExtended (numpy.ndarray): TBD starRevisit (numpy.ndarray): ntargs x 2. Contains indices of targets to revisit and revisit times of these targets in units of day starVisits (numpy.ndarray): ntargs x 1. Contains the number of times each target was visited TargetList (:ref:`TargetList`): TargetList object. TimeKeeping (:ref:`TimeKeeping`): Timekeeping object valfZmin (astropy.units.quantity.Quantity): Minimum local zodi for each target. ZodiacalLight (:ref:`ZodiacalLight`): Zodiacal light object. """ _modtype = "SurveySimulation" def __init__( self, scriptfile=None, ntFlux=1, nVisitsMax=5, charMargin=0.15, dt_max=1.0, record_counts_path=None, revisit_wait=None, nokoMap=False, nofZ=False, cachedir=None, defaultAddExoplanetObsTime=True, find_known_RV=False, include_known_RV=None, make_debug_bird_plots=False, debug_plot_path=None, **specs, ): # Commonly used units self.zero_d = 0 << u.d self.zero_arcsec = 0 * u.arcsec self.inv_arcsec2 = 1 / u.arcsec**2 self.inv_s = 1 / u.s self.JEZ_unit = u.ph / u.s / u.m**2 / u.arcsec**2 self.fZ_unit = 1 / u.arcsec**2 self.AU2pc = (1 * u.AU).to_value(u.pc) self.rad2arcsec = (1 * u.rad).to_value(u.arcsec) self.day2sec = (1 * u.d).to_value(u.s) self.m_per_s = u.m / u.s # start the outspec self._outspec = {} # if a script file is provided read it in. If not set, assumes that # dictionary has been passed through specs. if scriptfile is not None: assert os.path.isfile(scriptfile), "%s is not a file." % scriptfile try: with open(scriptfile) as ff: script = ff.read() specs.update(json.loads(script)) except ValueError: sys.stderr.write("Script file `%s' is not valid JSON." % scriptfile) # must re-raise, or the error will be masked raise # modules array must be present if "modules" not in specs: raise ValueError("No modules field found in script.") # load the vprint function (same line in all prototype module constructors) self.vprint = vprint(specs.get("verbose", True)) # count dict contains all of the C info for each star index self.record_counts_path = record_counts_path self.count_lines = [] self._outspec["record_counts_path"] = record_counts_path # mission simulation logger self.logger = specs.get("logger", logging.getLogger(__name__)) # set up numpy random number (generate it if not in specs) self.seed = int(specs.get("seed", py_random.randint(1, int(1e9)))) self.vprint("Numpy random seed is: %s" % self.seed) np.random.seed(self.seed) self._outspec["seed"] = self.seed # cache directory self.cachedir = get_cache_dir(cachedir) self._outspec["cachedir"] = self.cachedir # N.B.: cachedir is going to be used by everything, so let's make sure that # it doesn't get popped out of specs specs["cachedir"] = self.cachedir # if any of the modules is a string, assume that they are all strings # and we need to initalize if isinstance(next(iter(specs["modules"].values())), str): # import desired module names (prototype or specific) self.SimulatedUniverse = get_module( specs["modules"]["SimulatedUniverse"], "SimulatedUniverse" )(**specs) self.Observatory = get_module( specs["modules"]["Observatory"], "Observatory" )(**specs) self.TimeKeeping = get_module( specs["modules"]["TimeKeeping"], "TimeKeeping" )(**specs) # bring inherited class objects to top level of Survey Simulation SU = self.SimulatedUniverse self.StarCatalog = SU.StarCatalog self.PlanetPopulation = SU.PlanetPopulation self.PlanetPhysicalModel = SU.PlanetPhysicalModel self.OpticalSystem = SU.OpticalSystem self.ZodiacalLight = SU.ZodiacalLight self.BackgroundSources = SU.BackgroundSources self.PostProcessing = SU.PostProcessing self.Completeness = SU.Completeness self.TargetList = SU.TargetList else: # these are the modules that must be present if passing instantiated objects neededObjMods = [ "PlanetPopulation", "PlanetPhysicalModel", "OpticalSystem", "ZodiacalLight", "BackgroundSources", "PostProcessing", "Completeness", "TargetList", "SimulatedUniverse", "Observatory", "TimeKeeping", ] # ensure that you have the minimal set for modName in neededObjMods: if modName not in specs["modules"]: raise ValueError( "%s module is required but was not provided." % modName ) for modName in specs["modules"]: assert specs["modules"][modName]._modtype == modName, ( "Provided instance of %s has incorrect modtype." % modName ) setattr(self, modName, specs["modules"][modName]) # create a dictionary of all modules, except StarCatalog self.modules = {} self.modules["PlanetPopulation"] = self.PlanetPopulation self.modules["PlanetPhysicalModel"] = self.PlanetPhysicalModel self.modules["OpticalSystem"] = self.OpticalSystem self.modules["ZodiacalLight"] = self.ZodiacalLight self.modules["BackgroundSources"] = self.BackgroundSources self.modules["PostProcessing"] = self.PostProcessing self.modules["Completeness"] = self.Completeness self.modules["TargetList"] = self.TargetList self.modules["SimulatedUniverse"] = self.SimulatedUniverse self.modules["Observatory"] = self.Observatory self.modules["TimeKeeping"] = self.TimeKeeping # add yourself to modules list for bookkeeping purposes self.modules["SurveySimulation"] = self # observation time sampling self.ntFlux = int(ntFlux) self._outspec["ntFlux"] = self.ntFlux # maximum number of observations per star self.nVisitsMax = int(nVisitsMax) self._outspec["nVisitsMax"] = self.nVisitsMax # integration time margin for characterization self.charMargin = float(charMargin) self._outspec["charMargin"] = self.charMargin # maximum time for revisit window self.dt_max = float(dt_max) * u.week self._outspec["dt_max"] = self.dt_max.value # minimum time for revisit window if revisit_wait is not None: self.revisit_wait = revisit_wait * u.d else: self.revisit_wait = revisit_wait self._outspec["revisit_wait"] = revisit_wait # list of detected earth-like planets aroung promoted stars self.known_earths = np.array([]) self.find_known_RV = find_known_RV self._outspec["find_known_RV"] = find_known_RV self._outspec["include_known_RV"] = include_known_RV if self.find_known_RV: # select specific knonw RV stars if a file exists if include_known_RV is not None: if os.path.isfile(include_known_RV): with open(include_known_RV, "r") as rv_file: self.include_known_RV = [ hip.strip() for hip in rv_file.read().split(",") ] self.vprint( "Including known RV stars: {}".format(self.include_known_RV) ) else: self.include_known_RV = None self.vprint( "WARNING: Known RV file: {} does not exist!!".format( include_known_RV ) ) else: self.include_known_RV = None self.known_stars, self.known_rocky = self.find_known_plans() else: self.include_known_RV = None self.known_stars = [] self.known_rocky = [] # defaultAddExoplanetObsTime Tells us time advanced when no targets available # counts agains exoplanetObsTime (when True) self.defaultAddExoplanetObsTime = defaultAddExoplanetObsTime self._outspec["defaultAddExoplanetObsTime"] = defaultAddExoplanetObsTime # If inputs are scalars, save scalars to outspec, otherwise save full lists OS = self.OpticalSystem TL = self.TargetList SU = self.SimulatedUniverse TK = self.TimeKeeping # initialize arrays updated in run_sim() self.initializeStorageArrays() # Generate File Hashnames and loction self.cachefname = self.generateHashfName(specs) # choose observing modes selected for detection (default marked with a flag) allModes = OS.observingModes det_mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0] # getting keepout map for entire mission startTime = self.TimeKeeping.missionStart.copy() endTime = self.TimeKeeping.missionFinishAbs.copy() nSystems = len(allModes) systNames = np.unique( [allModes[x]["syst"]["name"] for x in np.arange(nSystems)] ) systOrder = np.argsort(systNames) koStr = ["koAngles_Sun", "koAngles_Moon", "koAngles_Earth", "koAngles_Small"] koangles = np.zeros([len(systNames), 4, 2]) for x in systOrder: rel_mode = list( filter(lambda mode: mode["syst"]["name"] == systNames[x], allModes) )[0] koangles[x] = np.asarray([rel_mode["syst"][k] for k in koStr]) # Precalculate Earth positions and create a cubic spline interpolator # to save time during orbit calculations dt = 0.1 # days self.Observatory.generate_Earth_interpolator(startTime, endTime, dt) self._outspec["nokoMap"] = nokoMap if not (nokoMap): koMaps, self.koTimes = self.Observatory.generate_koMap( TL, startTime, endTime, koangles ) self.koTimes_mjd = self.koTimes.mjd self.koMaps = {} for x, n in zip(systOrder, systNames[systOrder]): print(n) self.koMaps[n] = koMaps[x, :, :] self._outspec["nofZ"] = nofZ # TODO: do we still want a nofZ option? probably not. self.fZmins = {} self.fZtypes = {} for x, n in zip(systOrder, systNames[systOrder]): self.fZmins[n] = np.array([]) self.fZtypes[n] = np.array([]) # TODO: do we need to do this for all modes? doing det only breaks other # schedulers, but maybe there's a better approach here. sInds = np.arange(TL.nStars) # Initialize some sInds array for mode in allModes: # This instantiates fZMap arrays for every starlight suppresion system # that is actually used in a mode modeHashName = ( f'{self.cachefname[0:-1]}_{TL.nStars}_{mode["syst"]["name"]}' f"_{startTime.mjd:0}_{endTime.mjd:0}." ) if (np.size(self.fZmins[mode["syst"]["name"]]) == 0) or ( np.size(self.fZtypes[mode["syst"]["name"]]) == 0 ): self.ZodiacalLight.generate_fZ( self.Observatory, TL, self.TimeKeeping, mode, modeHashName, self.koTimes, ) ( self.fZmins[mode["syst"]["name"]], self.fZtypes[mode["syst"]["name"]], ) = self.ZodiacalLight.calcfZmin( sInds, self.Observatory, TL, self.TimeKeeping, mode, modeHashName, self.koMaps[mode["syst"]["name"]], self.koTimes, ) # Precalculating intTimeFilter for coronagraph # find fZmin to use in intTimeFilter self.valfZmin, self.absTimefZmin = self.ZodiacalLight.extractfZmin( self.fZmins[det_mode["syst"]["name"]], sInds, self.koTimes ) JEZ0 = self.TargetList.JEZ0[det_mode["hex"]] dMag = TL.int_dMag[sInds] # grabbing dMag WA = TL.int_WA[sInds] # grabbing WA self.intTimesIntTimeFilter = ( self.OpticalSystem.calc_intTime( TL, sInds, self.valfZmin, JEZ0, dMag, WA, det_mode, TK=TK ) * det_mode["timeMultiplier"] ) # intTimes to filter by # These indices are acceptable for use simulating self.intTimeFilterInds = np.where( ( (self.intTimesIntTimeFilter > 0) & (self.intTimesIntTimeFilter <= self.OpticalSystem.intCutoff) ) )[0] self.make_debug_bird_plots = make_debug_bird_plots if self.make_debug_bird_plots: assert ( debug_plot_path is not None ), "debug_plot_path must be set by input if make_debug_bird_plots is True" self.obs_plot_path = Path(f"{debug_plot_path}/{self.seed}") # Make directory if it doesn't exist if not self.obs_plot_path.exists(): vprint(f"Making plot directory: {self.obs_plot_path}") self.obs_plot_path.mkdir(parents=True, exist_ok=True) self.obs_n_counter = 0 self._outspec["make_debug_bird_plots"] = self.make_debug_bird_plots self._outspec["debug_plot_path"] = debug_plot_path
[docs] def initializeStorageArrays(self): """ Initialize all storage arrays based on # of stars and targets """ self.DRM = [] self.fullSpectra = np.zeros(self.SimulatedUniverse.nPlans, dtype=int) self.partialSpectra = np.zeros(self.SimulatedUniverse.nPlans, dtype=int) self.propagTimes = np.zeros(self.TargetList.nStars) * u.d self.lastObsTimes = np.zeros(self.TargetList.nStars) * u.d self.starVisits = np.zeros( self.TargetList.nStars, dtype=int ) # contains the number of times each star was visited self.starRevisit = np.array([]) self.starExtended = np.array([], dtype=int) self.lastDetected = np.empty((self.TargetList.nStars, 4), dtype=object)
def __str__(self): """String representation of the Survey Simulation object When the command 'print' is used on the Survey Simulation object, this method will return the values contained in the object """ for att in self.__dict__: print("%s: %r" % (att, getattr(self, att))) return "Survey Simulation class object attributes"
[docs] def run_sim(self): """Performs the survey simulation""" OS = self.OpticalSystem TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # TODO: start using this self.currentSep # set occulter separation if haveOcculter if OS.haveOcculter: self.currentSep = Obs.occulterSep # choose observing modes selected for detection (default marked with a flag) allModes = OS.observingModes det_mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0] # and for characterization (default is first spectro/IFS mode) spectroModes = list( filter(lambda mode: "spec" in mode["inst"]["name"], allModes) ) if np.any(spectroModes): char_mode = spectroModes[0] # if no spectro mode, default char mode is first observing mode else: char_mode = allModes[0] # begin Survey, and loop until mission is finished log_begin = "OB%s: survey beginning." % (TK.OBnumber) self.logger.info(log_begin) self.vprint(log_begin) t0 = time.time() sInd = None ObsNum = 0 while not TK.mission_is_over(OS, Obs, det_mode): # acquire the NEXT TARGET star index and create DRM old_sInd = sInd # used to save sInd if returned sInd is None DRM, sInd, det_intTime, waitTime = self.next_target(sInd, det_mode) if sInd is not None: ObsNum += ( 1 # we're making an observation so increment observation number ) if OS.haveOcculter: # advance to start of observation (add slew time for selected target _ = TK.advanceToAbsTime(TK.currentTimeAbs.copy() + waitTime) # beginning of observation, start to populate DRM DRM["star_ind"] = sInd DRM["star_name"] = TL.Name[sInd] DRM["arrival_time"] = TK.currentTimeNorm.to("day").copy() DRM["OB_nb"] = TK.OBnumber DRM["ObsNum"] = ObsNum pInds = np.where(SU.plan2star == sInd)[0] DRM["plan_inds"] = pInds.astype(int) log_obs = ( " Observation #%s, star ind %s (of %s) with %s planet(s), " + "mission time at Obs start: %s, exoplanetObsTime: %s" ) % ( ObsNum, sInd, TL.nStars, len(pInds), TK.currentTimeNorm.to("day").copy().round(2), TK.exoplanetObsTime.to("day").copy().round(2), ) self.logger.info(log_obs) self.vprint(log_obs) # PERFORM DETECTION and populate revisit list attribute ( detected, det_fZ, det_JEZ, det_systemParams, det_SNR, FA, ) = self.observation_detection(sInd, det_intTime.copy(), det_mode) # update the occulter wet mass if OS.haveOcculter: DRM = self.update_occulter_mass( DRM, sInd, det_intTime.copy(), "det" ) # populate the DRM with detection results DRM["det_time"] = det_intTime.to("day") DRM["det_status"] = detected DRM["det_SNR"] = det_SNR DRM["det_fZ"] = det_fZ.to(self.fZ_unit) DRM["det_JEZ"] = det_JEZ.to(self.JEZ_unit) DRM["det_params"] = det_systemParams # PERFORM CHARACTERIZATION and populate spectra list attribute if char_mode["SNR"] not in [0, np.inf]: ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) = self.observation_characterization(sInd, char_mode) else: char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=float) char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 / u.arcsec**2 char_systemParams = SU.dump_system_params(sInd) assert char_intTime != 0, "Integration time can't be 0." # update the occulter wet mass if OS.haveOcculter and (char_intTime is not None): DRM = self.update_occulter_mass(DRM, sInd, char_intTime, "char") # populate the DRM with characterization results DRM["char_time"] = ( char_intTime.to("day") if char_intTime is not None else 0.0 * u.day ) DRM["char_status"] = characterized[:-1] if FA else characterized DRM["char_SNR"] = char_SNR[:-1] if FA else char_SNR DRM["char_fZ"] = char_fZ.to(self.fZ_unit) DRM["char_params"] = char_systemParams # populate the DRM with FA results DRM["FA_det_status"] = int(FA) DRM["FA_char_status"] = characterized[-1] if FA else 0 DRM["FA_char_SNR"] = char_SNR[-1] if FA else 0.0 DRM["FA_char_JEZ"] = ( self.lastDetected[sInd, 1][-1] if FA else 0.0 * self.JEZ_unit ) DRM["FA_char_dMag"] = self.lastDetected[sInd, 2][-1] if FA else 0.0 DRM["FA_char_WA"] = ( self.lastDetected[sInd, 3][-1] * u.arcsec if FA else 0.0 * u.arcsec ) # populate the DRM with observation modes DRM["det_mode"] = dict(det_mode) del DRM["det_mode"]["inst"], DRM["det_mode"]["syst"] DRM["char_mode"] = dict(char_mode) del DRM["char_mode"]["inst"], DRM["char_mode"]["syst"] DRM["exoplanetObsTime"] = TK.exoplanetObsTime.copy() # append result values to self.DRM self.DRM.append(DRM) # handle case of inf OBs and missionPortion < 1 if np.isinf(TK.OBduration) and (TK.missionPortion < 1.0): self.arbitrary_time_advancement( TK.currentTimeNorm.to("day").copy() - DRM["arrival_time"] ) else: # sInd == None sInd = old_sInd # Retain the last observed star if ( TK.currentTimeNorm.copy() >= TK.OBendTimes[TK.OBnumber] ): # currentTime is at end of OB # Conditional Advance To Start of Next OB if not TK.mission_is_over( OS, Obs, det_mode ): # as long as the mission is not over TK.advancetToStartOfNextOB() # Advance To Start of Next OB elif waitTime is not None: # CASE 1: Advance specific wait time _ = TK.advanceToAbsTime( TK.currentTimeAbs.copy() + waitTime, self.defaultAddExoplanetObsTime, ) self.vprint("waitTime is not None") else: startTimes = ( TK.currentTimeAbs.copy() + np.zeros(TL.nStars) * u.d ) # Start Times of Observations observableTimes = Obs.calculate_observableTimes( TL, np.arange(TL.nStars), startTimes, self.koMaps, self.koTimes, det_mode, )[0] # CASE 2 If There are no observable targets for the rest of the # mission if ( observableTimes[ ( TK.missionFinishAbs.copy().value * u.d > observableTimes.value * u.d ) * ( observableTimes.value * u.d >= TK.currentTimeAbs.copy().value * u.d ) ].shape[0] ) == 0: self.vprint( ( "No Observable Targets for Remainder of mission at " "currentTimeNorm = {}" ).format(TK.currentTimeNorm) ) # Manually advancing time to mission end TK.currentTimeNorm = TK.missionLife TK.currentTimeAbs = TK.missionFinishAbs # CASE 3 nominal wait time if at least 1 target is still in list # and observable else: # TODO: ADD ADVANCE TO WHEN FZMIN OCURS inds1 = np.arange(TL.nStars)[ observableTimes.value * u.d > TK.currentTimeAbs.copy().value * u.d ] # apply intTime filter inds2 = np.intersect1d(self.intTimeFilterInds, inds1) # apply revisit Filter #NOTE this means stars you added to the # revisit list inds3 = self.revisitFilter( inds2, TK.currentTimeNorm.copy() + self.dt_max.to(u.d) ) self.vprint( "Filtering %d stars from advanceToAbsTime" % (TL.nStars - len(inds3)) ) oTnowToEnd = observableTimes[inds3] # there is at least one observableTime between now and the end # of the mission if not oTnowToEnd.value.shape[0] == 0: # advance to that observable time tAbs = np.min(oTnowToEnd) else: tAbs = ( TK.missionStart + TK.missionLife ) # advance to end of mission tmpcurrentTimeNorm = TK.currentTimeNorm.copy() # Advance Time to this time OR start of next # OB following this time _ = TK.advanceToAbsTime(tAbs, self.defaultAddExoplanetObsTime) self.vprint( ( "No Observable Targets a currentTimeNorm= {:.2f}. " "Advanced To {:.2f}" ).format( tmpcurrentTimeNorm.to("day"), TK.currentTimeNorm.to("day"), ) ) else: # TK.mission_is_over() dtsim = (time.time() - t0) * u.s log_end = ( "Mission complete: no more time available.\n" + "Simulation duration: %s.\n" % dtsim.astype("int") + "Results stored in SurveySimulation.DRM (Design Reference Mission)." ) self.logger.info(log_end) self.vprint(log_end)
[docs] def arbitrary_time_advancement(self, dt): """Handles fully dynamically scheduled case where OBduration is infinite and missionPortion is less than 1. Args: dt (~astropy.units.quantity.Quantity): Total amount of time, including all overheads and extras used for the previous observation. Returns: None """ self.TimeKeeping.allocate_time( dt * (1.0 - self.TimeKeeping.missionPortion) / self.TimeKeeping.missionPortion, addExoplanetObsTime=False, )
[docs] def next_target(self, old_sInd, mode): """Finds index of next target star and calculates its integration time. This method chooses the next target star index based on which stars are available, their integration time, and maximum completeness. Returns None if no target could be found. Args: old_sInd (int): Index of the previous target star mode (dict): Selected observing mode for detection Returns: tuple: DRM (dict): Design Reference Mission, contains the results of one complete observation (detection and characterization) sInd (int): Index of next target star. Defaults to None. intTime (astropy.units.Quantity): Selected star integration time for detection in units of day. Defaults to None. waitTime (astropy.units.Quantity): a strategically advantageous amount of time to wait in the case of an occulter for slew times """ OS = self.OpticalSystem TL = self.TargetList Obs = self.Observatory TK = self.TimeKeeping # create DRM DRM = {} # allocate settling time + overhead time tmpCurrentTimeAbs = ( TK.currentTimeAbs.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) tmpCurrentTimeNorm = ( TK.currentTimeNorm.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) # create appropriate koMap koMap = self.koMaps[mode["syst"]["name"]] # look for available targets # 1. initialize arrays slewTimes = np.zeros(TL.nStars) << u.d # fZs = np.zeros(TL.nStars) / u.arcsec**2.0 dV = np.zeros(TL.nStars) << self.m_per_s intTimes = np.zeros(TL.nStars) << u.d obsTimes = np.zeros([2, TL.nStars]) << u.d sInds = np.arange(TL.nStars) # 2. find spacecraft orbital START positions (if occulter, positions # differ for each star) and filter out unavailable targets sd = None if OS.haveOcculter: sd = Obs.star_angularSep(TL, old_sInd, sInds, tmpCurrentTimeAbs) obsTimes = Obs.calculate_observableTimes( TL, sInds, tmpCurrentTimeAbs, self.koMaps, self.koTimes, mode ) slewTimes = Obs.calculate_slewTimes( TL, old_sInd, sInds, sd, obsTimes, tmpCurrentTimeAbs ) # 2.1 filter out totTimes > integration cutoff if len(sInds.tolist()) > 0: sInds = np.intersect1d(self.intTimeFilterInds, sInds) # start times, including slew times startTimes = tmpCurrentTimeAbs.copy() + slewTimes startTimesNorm = tmpCurrentTimeNorm.copy() + slewTimes # 2.5 Filter stars not observable at startTimes try: tmpIndsbool = list() for i in np.arange(len(sInds)): koTimeInd = np.where( np.round(startTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is startTime[0] tmpIndsbool.append( koMap[sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 # If there are no target stars to observe sInds = np.asarray([], dtype=int) # 3. filter out all previously (more-)visited targets, unless in if len(sInds.tolist()) > 0: sInds = self.revisitFilter(sInds, tmpCurrentTimeNorm) # 4.1 calculate integration times for ALL preselected targets ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed if len(sInds.tolist()) > 0: if OS.haveOcculter and (old_sInd is not None): ( sInds, slewTimes[sInds], intTimes[sInds], dV[sInds], ) = self.refineOcculterSlews( old_sInd, sInds, slewTimes, obsTimes, sd, mode ) endTimes = tmpCurrentTimeAbs.copy() + intTimes + slewTimes else: intTimes[sInds] = self.calc_targ_intTime(sInds, startTimes[sInds], mode) sInds = sInds[ np.where(intTimes[sInds] <= maxIntTime) ] # Filters targets exceeding end of OB endTimes = tmpCurrentTimeAbs.copy() + intTimes if maxIntTime.value <= 0: sInds = np.asarray([], dtype=int) # 5.1 TODO Add filter to filter out stars entering and exiting keepout # between startTimes and endTimes # 5.2 find spacecraft orbital END positions (for each candidate target), # and filter out unavailable targets if len(sInds.tolist()) > 0 and Obs.checkKeepoutEnd: # endTimes may exist past koTimes so we have an exception to hand this case try: tmpIndsbool = list() for i in np.arange(len(sInds)): koTimeInd = np.where( np.round(endTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( koMap[sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 sInds = np.asarray([], dtype=int) # 6. choose best target from remaining if len(sInds.tolist()) > 0: # choose sInd of next target sInd, waitTime = self.choose_next_target( old_sInd, sInds, slewTimes, intTimes[sInds] ) # Should Choose Next Target decide there are no stars it wishes to # observe at this time. if sInd is None and (waitTime is not None): self.vprint( "There are no stars available to observe. Waiting {}".format( waitTime ) ) return DRM, None, None, waitTime elif (sInd is None) and (waitTime is None): self.vprint( "There are no stars available to observe and waitTime is None." ) return DRM, None, None, waitTime # store selected star integration time intTime = intTimes[sInd] # if no observable target, advanceTime to next Observable Target else: self.vprint( "No Observable Targets at currentTimeNorm= " + str(TK.currentTimeNorm.copy()) ) return DRM, None, None, None # update visited list for selected star self.starVisits[sInd] += 1 # store normalized start time for future completeness update self.lastObsTimes[sInd] = startTimesNorm[sInd] # populate DRM with occulter related values if OS.haveOcculter: DRM = Obs.log_occulterResults( DRM, slewTimes[sInd], sInd, sd[sInd], dV[sInd] ) return DRM, sInd, intTime, slewTimes[sInd] return DRM, sInd, intTime, waitTime
[docs] def calc_targ_intTime(self, sInds, startTimes, mode): """Helper method for next_target to aid in overloading for alternative implementations. Given a subset of targets, calculate their integration times given the start of observation time. Prototype just calculates integration times for fixed contrast depth. Args: sInds (~numpy.ndarray(int)): Indices of available targets startTimes (astropy quantity array): absolute start times of observations. must be of the same size as sInds mode (dict): Selected observing mode for detection Returns: ~astropy.units.Quantity(~numpy.ndarray(float)): Integration times for detection same dimension as sInds .. note:: next_target filter will discard targets with zero integration times. """ TL = self.TargetList # assumed values for detection fZ = self.ZodiacalLight.fZ( self.Observatory, self.TargetList, sInds, startTimes, mode ) # Use the default exozodi intensities JEZ = TL.JEZ0[mode["hex"]][sInds] dMag = TL.int_dMag[sInds] WA = TL.int_WA[sInds] # save out file containing photon count info if self.record_counts_path is not None and len(self.count_lines) == 0: C_p, C_b, C_sp, C_extra = self.OpticalSystem.Cp_Cb_Csp( self.TargetList, sInds, fZ, JEZ, dMag, WA, mode, returnExtra=True ) import csv count_fpath = os.path.join(self.record_counts_path, "counts") if not os.path.exists(count_fpath): os.mkdir(count_fpath) outfile = os.path.join(count_fpath, str(self.seed) + ".csv") self.count_lines.append( [ "sInd", "HIPs", "C_F0", "C_p0", "C_sr", "C_z", "C_ez", "C_dc", "C_cc", "C_rn", "C_p", "C_b", "C_sp", ] ) for i, sInd in enumerate(sInds): self.count_lines.append( [ sInd, self.TargetList.Name[sInd], C_extra["C_F0"][0].value, C_extra["C_sr"][i].value, C_extra["C_z"][i].value, C_extra["C_ez"][i].value, C_extra["C_dc"][i].value, C_extra["C_cc"][i].value, C_extra["C_rn"][i].value, C_p[i].value, C_b[i].value, C_sp[i].value, ] ) with open(outfile, "w") as csvfile: c = csv.writer(csvfile) c.writerows(self.count_lines) intTimes = self.OpticalSystem.calc_intTime( self.TargetList, sInds, fZ, JEZ, dMag, WA, mode ) intTimes[~np.isfinite(intTimes)] = 0 * u.d return intTimes
[docs] def choose_next_target(self, old_sInd, sInds, slewTimes, intTimes): """Helper method for method next_target to simplify alternative implementations. Given a subset of targets (pre-filtered by method next_target or some other means), select the best next one. The prototype uses completeness as the sole heuristic. Args: old_sInd (int): Index of the previous target star sInds (~numpy.ndarray(int)): Indices of available targets slewTimes (~astropy.units.Quantity(~numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) intTimes (~astropy.units.Quantity(~numpy.ndarray(float))): Integration times for detection in units of day Returns: tuple: sInd (int): Index of next target star waitTime (:py:class:`~astropy.units.Quantity`): Some strategic amount of time to wait in case an occulter slew is desired (default is None) """ Comp = self.Completeness TL = self.TargetList TK = self.TimeKeeping OS = self.OpticalSystem Obs = self.Observatory allModes = OS.observingModes # cast sInds to array sInds = np.array(sInds, ndmin=1, copy=copy_if_needed) # calculate dt since previous observation dt = TK.currentTimeNorm.copy() + slewTimes[sInds] - self.lastObsTimes[sInds] # get dynamic completeness values comps = Comp.completeness_update(TL, sInds, self.starVisits[sInds], dt) # choose target with maximum completeness sInd = np.random.choice(sInds[comps == max(comps)]) # Check if exoplanetObsTime would be exceeded mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0] ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed intTimes2 = self.calc_targ_intTime( np.array([sInd]), TK.currentTimeAbs.copy(), mode ) if ( intTimes2 > maxIntTime ): # check if max allowed integration time would be exceeded self.vprint("max allowed integration time would be exceeded") sInd = None waitTime = 1.0 * u.d return sInd, waitTime return ( sInd, slewTimes[sInd], ) # if coronagraph or first sInd, waitTime will be 0 days
[docs] def refineOcculterSlews(self, old_sInd, sInds, slewTimes, obsTimes, sd, mode): """Refines/filters/chooses occulter slews based on time constraints Refines the selection of occulter slew times by filtering based on mission time constraints and selecting the best slew time for each star. This method calls on other occulter methods within SurveySimulation depending on how slew times were calculated prior to calling this function (i.e. depending on which implementation of the Observatory module is used). Args: old_sInd (int): Index of the previous target star sInds (~numpy.ndarray(int)): Indices of available targets slewTimes (astropy quantity array): slew times to all stars (must be indexed by sInds) obsTimes (~astropy.units.Quantity(~numpy.ndarray(float))): A binary array with TargetList.nStars rows and (missionFinishAbs-missionStart)/dt columns where dt is 1 day by default. A value of 1 indicates the star is in keepout (and therefore cannot be observed). A value of 0 indicates the star is not in keepout and may be observed. sd (~astropy.units.Quantity(~numpy.ndarray(float))): Angular separation between stars in rad mode (dict): Selected observing mode for detection Returns: tuple: sInds (int): Indeces of next target star slewTimes (astropy.units.Quantity(numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) intTimes (astropy.units.Quantity(numpy.ndarray(float))): Integration times for detection in units of day dV (astropy.units.Quantity(numpy.ndarray(float))): Delta-V used to transfer to new star line of sight in unis of m/s """ Obs = self.Observatory TL = self.TargetList # initializing arrays obsTimeArray = np.zeros([TL.nStars, 50]) << u.d intTimeArray = np.zeros([TL.nStars, 2]) << u.d for n in sInds: obsTimeArray[n, :] = ( np.linspace(obsTimes[0, n].value, obsTimes[1, n].value, 50) << u.d ) intTimeArray[sInds, 0] = self.calc_targ_intTime( sInds, Time(obsTimeArray[sInds, 0], format="mjd", scale="tai"), mode ) intTimeArray[sInds, 1] = self.calc_targ_intTime( sInds, Time(obsTimeArray[sInds, -1], format="mjd", scale="tai"), mode ) # determining which scheme to use to filter slews obsModName = Obs.__class__.__name__ # slew times have not been calculated/decided yet (SotoStarshade) if obsModName == "SotoStarshade": sInds, intTimes, slewTimes, dV = self.findAllowableOcculterSlews( sInds, old_sInd, sd[sInds], slewTimes[sInds], obsTimeArray[sInds, :], intTimeArray[sInds, :], mode, ) # slew times were calculated/decided beforehand (Observatory Prototype) else: sInds, intTimes, slewTimes = self.filterOcculterSlews( sInds, slewTimes[sInds], obsTimeArray[sInds, :], intTimeArray[sInds, :], mode, ) dV = np.zeros(len(sInds)) << self.m_per_s return sInds, slewTimes, intTimes, dV
[docs] def filterOcculterSlews(self, sInds, slewTimes, obsTimeArray, intTimeArray, mode): """Filters occulter slews that have already been calculated/selected. Used by the refineOcculterSlews method when slew times have been selected a priori. This method filters out slews that are not within desired observing blocks, the maximum allowed integration time, and are outside of future keepouts. Args: sInds (~numpy.ndarray(int)): Indices of available targets slewTimes (~astropy.units.Quantity(~numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) obsTimeArray (~astropy.units.Quantity(~numpy.ndarray(float))): Array of times during which a star is out of keepout, has shape nx50 where n is the number of stars in sInds. Unit of days intTimeArray (~astropy.units.Quantity(~numpy.ndarray(float))): Array of integration times for each time in obsTimeArray, has shape nx2 where n is the number of stars in sInds. Unit of days mode (dict): Selected observing mode for detection Returns: tuple: sInds (int): Indeces of next target star intTimes (astropy.units.Quantity(numpy.ndarray(float))): Integration times for detection in units of day slewTimes (astropy.units.Quantity(numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) """ TK = self.TimeKeeping Obs = self.Observatory # allocate settling time + overhead time tmpCurrentTimeAbs = ( TK.currentTimeAbs.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) tmpCurrentTimeNorm = ( TK.currentTimeNorm.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) # 0. lambda function that linearly interpolates Integration Time # between obsTimes linearInterp = lambda y, x, t: np.diff(y) / np.diff(x) * ( # noqa: E731 t - np.array(x[:, 0]).reshape(len(t), 1) ) + np.array(y[:, 0]).reshape(len(t), 1) # 1. initializing arrays obsTimesRange = np.array( [obsTimeArray[:, 0], obsTimeArray[:, -1]] ) # nx2 array with start and end times of obsTimes for each star intTimesRange = np.array([intTimeArray[:, 0], intTimeArray[:, -1]]) OBnumbers = np.zeros( [len(sInds), 1] ) # for each sInd, will show during which OB observations will take place maxIntTimes = np.zeros([len(sInds), 1]) << u.d intTimes = ( linearInterp( intTimesRange.T, obsTimesRange.T, (tmpCurrentTimeAbs + slewTimes).reshape(len(sInds), 1).value, ) << u.d ) # calculate intTimes for each slew time minObsTimeNorm = (obsTimesRange[0, :] - tmpCurrentTimeAbs.value).reshape( [len(sInds), 1] ) maxObsTimeNorm = (obsTimesRange[1, :] - tmpCurrentTimeAbs.value).reshape( [len(sInds), 1] ) ObsTimeRange = maxObsTimeNorm - minObsTimeNorm # 2. find OBnumber for each sInd's slew time if len(TK.OBendTimes) > 1: for i in range(len(sInds)): S = np.where( TK.OBstartTimes.value - tmpCurrentTimeNorm.value < slewTimes[i].value )[0][-1] F = np.where( TK.OBendTimes.value - tmpCurrentTimeNorm.value < slewTimes[i].value )[0] # case when slews are in the first OB if F.shape[0] == 0: F = -1 else: F = F[-1] # slew occurs within an OB (nth OB has started but hasn't ended) if S != F: OBnumbers[i] = S ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode, TK.OBstartTimes[S], S) maxIntTimes[i] = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) # Maximum intTime allowed # slew occurs between OBs, badbadnotgood else: OBnumbers[i] = -1 maxIntTimes[i] = 0 * u.d OBstartTimeNorm = ( TK.OBstartTimes[np.array(OBnumbers, dtype=int)].value - tmpCurrentTimeNorm.value ) else: ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode, tmpCurrentTimeNorm) maxIntTimes[:] = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed OBstartTimeNorm = np.zeros(OBnumbers.shape) # finding the minimum possible slew time # (either OBstart or when star JUST leaves keepout) minAllowedSlewTime = np.max([OBstartTimeNorm, minObsTimeNorm], axis=0) # 3. start filtering process good_inds = np.where((OBnumbers >= 0) & (ObsTimeRange > intTimes.value))[0] # star slew within OB -AND- can finish observing # before it goes back into keepout if good_inds.shape[0] > 0: # the good ones sInds = sInds[good_inds] slewTimes = slewTimes[good_inds] intTimes = intTimes[good_inds] OBstartTimeNorm = OBstartTimeNorm[good_inds] minAllowedSlewTime = minAllowedSlewTime[good_inds] # maximum allowed slew time based on integration times maxAllowedSlewTime = maxIntTimes[good_inds].value - intTimes.value maxAllowedSlewTime[maxAllowedSlewTime < 0] = -np.inf maxAllowedSlewTime += OBstartTimeNorm # calculated rel to currentTime norm # checking to see if slewTimes are allowed good_inds = np.where( (slewTimes.reshape([len(sInds), 1]).value > minAllowedSlewTime) & (slewTimes.reshape([len(sInds), 1]).value < maxAllowedSlewTime) )[0] slewTimes = slewTimes[good_inds] else: slewTimes = slewTimes[good_inds] return sInds[good_inds], intTimes[good_inds].flatten(), slewTimes
[docs] def findAllowableOcculterSlews( self, sInds, old_sInd, sd, slewTimes, obsTimeArray, intTimeArray, mode ): """Finds an array of allowable slew times for each star Used by the refineOcculterSlews method when slew times have NOT been selected a priori. This method creates nx50 arrays (where the row corresponds to a specific star and the column corresponds to a future point in time relative to currentTime). These arrays are initially zero but are populated with the corresponding values (slews, intTimes, etc) if slewing to that time point (i.e. beginning an observation) would lead to a successful observation. A "successful observation" is defined by certain conditions relating to keepout and the komap, observing blocks, mission lifetime, and some constraints on the dVmap calculation in SotoStarshade. Each star will likely have a range of slewTimes that would lead to a successful observation -- another method is then called to select the best of these slewTimes. Args: sInds (~numpy.ndarray(int)): Indices of available targets old_sInd (int): Index of the previous target star sd (~astropy.units.Quantity(~numpy.ndarray(float))): Angular separation between stars in rad slewTimes (astropy quantity array): slew times to all stars (must be indexed by sInds) obsTimeArray (~astropy.units.Quantity(~numpy.ndarray(float))): Array of times during which a star is out of keepout, has shape nx50 where n is the number of stars in sInds intTimeArray (~astropy.units.Quantity(~numpy.ndarray(float))): Array of integration times for each time in obsTimeArray, has shape nx50 where n is the number of stars in sInds mode (dict): Selected observing mode for detection Returns: tuple: sInds (numpy.ndarray(int)): Indices of next target star slewTimes (astropy.units.Quantity(numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) intTimes (astropy.units.Quantity(numpy.ndarray(float))): Integration times for detection in units of day dV (astropy.units.Quantity(numpy.ndarray(float))): Delta-V used to transfer to new star line of sight in unis of m/s """ TK = self.TimeKeeping Obs = self.Observatory TL = self.TargetList # 0. lambda function that linearly interpolates Integration # Time between obsTimes linearInterp = lambda y, x, t: np.diff(y) / np.diff(x) * ( # noqa: E731 t - np.array(x[:, 0]).reshape(len(t), 1) ) + np.array(y[:, 0]).reshape(len(t), 1) # allocate settling time + overhead time tmpCurrentTimeAbs = ( TK.currentTimeAbs.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) tmpCurrentTimeNorm = ( TK.currentTimeNorm.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) # 1. initializing arrays obsTimes = np.array( [obsTimeArray[:, 0], obsTimeArray[:, -1]] ) # nx2 array with start and end times of obsTimes for each star intTimes_int = ( np.zeros(obsTimeArray.shape) << u.d ) # initializing intTimes of shape nx50 then interpolating intTimes_int = ( np.hstack( [ intTimeArray[:, 0].reshape(len(sInds), 1).value, linearInterp( intTimeArray.value, obsTimes.T, obsTimeArray[:, 1:].value ), ] ) << u.d ) allowedSlewTimes = np.zeros(obsTimeArray.shape) << u.d allowedintTimes = np.zeros(obsTimeArray.shape) << u.d allowedCharTimes = np.zeros(obsTimeArray.shape) << u.d obsTimeArrayNorm = obsTimeArray.value - tmpCurrentTimeAbs.value # obsTimes -> relative to current Time try: minObsTimeNorm = np.array([np.min(v[v > 0]) for v in obsTimeArrayNorm]) except: # noqa: E722 # an error pops up sometimes at the end of the mission, this fixes it # TODO: define the error type that occurs # rewrite to avoid a try/except if possible minObsTimeNorm = obsTimes[1, :].T - tmpCurrentTimeAbs.value maxObsTimeNorm = obsTimes[1, :].T - tmpCurrentTimeAbs.value ObsTimeRange = maxObsTimeNorm - minObsTimeNorm # getting max possible intTime ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode, tmpCurrentTimeNorm, TK.OBnumber) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed # 2. giant array of min and max slew times, starts at current time, ends when # stars enter keepout (all same size). Each entry either has a slew time value # if a slew is allowed at that date or 0 if slewing is not allowed # first filled in for the current OB minAllowedSlewTimes = np.array( [minObsTimeNorm.T] * len(intTimes_int.T) ).T # just to make it nx50 so it plays nice with the other arrays maxAllowedSlewTimes = maxIntTime.value - intTimes_int.value maxAllowedSlewTimes[maxAllowedSlewTimes > Obs.occ_dtmax.value] = ( Obs.occ_dtmax.value ) # conditions that must be met to define an allowable slew time cond1 = ( minAllowedSlewTimes >= Obs.occ_dtmin.value ) # minimum dt time in dV map interpolant cond2 = ( maxAllowedSlewTimes <= Obs.occ_dtmax.value ) # maximum dt time in dV map interpolant cond3 = maxAllowedSlewTimes > minAllowedSlewTimes cond4 = intTimes_int.value < ObsTimeRange.reshape(len(sInds), 1) conds = cond1 & cond2 & cond3 & cond4 minAllowedSlewTimes[np.invert(conds)] = ( np.inf ) # these are filtered during the next filter maxAllowedSlewTimes[np.invert(conds)] = -np.inf # one last condition to meet map_i, map_j = np.where( (obsTimeArrayNorm > minAllowedSlewTimes) & (obsTimeArrayNorm < maxAllowedSlewTimes) ) # 2.5 if any stars are slew-able to within this OB block, populate # "allowedSlewTimes", a running tally of possible slews # within the time range a star is observable (out of keepout) if map_i.shape[0] > 0 and map_j.shape[0] > 0: allowedSlewTimes[map_i, map_j] = obsTimeArrayNorm[map_i, map_j] * u.d allowedintTimes[map_i, map_j] = intTimes_int[map_i, map_j] allowedCharTimes[map_i, map_j] = maxIntTime - intTimes_int[map_i, map_j] # 3. search future OBs OB_withObsStars = ( TK.OBstartTimes.value - np.min(obsTimeArrayNorm) - tmpCurrentTimeNorm.value ) # OBs within which any star is observable if any(OB_withObsStars > 0): nOBstart = np.argmin(np.abs(OB_withObsStars)) nOBend = np.argmax(OB_withObsStars) # loop through the next 5 OBs (or until mission is over if there are less # than 5 OBs in the future) for i in np.arange(nOBstart, np.min([nOBend, nOBstart + 5])): # max int Times for the next OB ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime( Obs, mode, TK.OBstartTimes[i + 1], i + 1 ) maxIntTime_nOB = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) # Maximum intTime allowed # min and max slew times rel to current Time (norm) nOBstartTimeNorm = np.array( [TK.OBstartTimes[i + 1].value - tmpCurrentTimeNorm.value] * len(sInds) ) # min slew times for stars start either whenever the star first leaves # keepout or when next OB stars, whichever comes last minAllowedSlewTimes_nOB = np.array( [np.max([minObsTimeNorm, nOBstartTimeNorm], axis=0).T] * len(maxAllowedSlewTimes.T) ).T maxAllowedSlewTimes_nOB = ( nOBstartTimeNorm.reshape(len(sInds), 1) + maxIntTime_nOB.value - intTimes_int.value ) maxAllowedSlewTimes_nOB[ maxAllowedSlewTimes_nOB > Obs.occ_dtmax.value ] = Obs.occ_dtmax.value # amount of time left when the stars are still out of keepout ObsTimeRange_nOB = ( maxObsTimeNorm - np.max([minObsTimeNorm, nOBstartTimeNorm], axis=0).T ) # condition to be met for an allowable slew time cond1 = minAllowedSlewTimes_nOB >= Obs.occ_dtmin.value cond2 = maxAllowedSlewTimes_nOB <= Obs.occ_dtmax.value cond3 = maxAllowedSlewTimes_nOB > minAllowedSlewTimes_nOB cond4 = intTimes_int.value < ObsTimeRange_nOB.reshape(len(sInds), 1) cond5 = intTimes_int.value < maxIntTime_nOB.value conds = cond1 & cond2 & cond3 & cond4 & cond5 minAllowedSlewTimes_nOB[np.invert(conds)] = np.inf maxAllowedSlewTimes_nOB[np.invert(conds)] = -np.inf # one last condition map_i, map_j = np.where( (obsTimeArrayNorm > minAllowedSlewTimes_nOB) & (obsTimeArrayNorm < maxAllowedSlewTimes_nOB) ) # 3.33 populate the running tally of allowable slew times if it meets # all conditions if map_i.shape[0] > 0 and map_j.shape[0] > 0: allowedSlewTimes[map_i, map_j] = ( obsTimeArrayNorm[map_i, map_j] * u.d ) allowedintTimes[map_i, map_j] = intTimes_int[map_i, map_j] allowedCharTimes[map_i, map_j] = ( maxIntTime_nOB - intTimes_int[map_i, map_j] ) # 3.67 filter out any stars that are not observable at all filterDuds = np.sum(allowedSlewTimes, axis=1) > 0.0 sInds = sInds[filterDuds] # 4. choose a slew time for each available star # calculate dVs for each possible slew time for each star allowed_dVs = Obs.calculate_dV( TL, old_sInd, sInds, sd[filterDuds], allowedSlewTimes[filterDuds, :], tmpCurrentTimeAbs, ) if len(sInds.tolist()) > 0: # select slew time for each star dV_inds = np.arange(0, len(sInds)) sInds, intTime, slewTime, dV = self.chooseOcculterSlewTimes( sInds, allowedSlewTimes[filterDuds, :], allowed_dVs[dV_inds, :], allowedintTimes[filterDuds, :], allowedCharTimes[filterDuds, :], ) return sInds, intTime, slewTime, dV else: empty = np.asarray([], dtype=int) return empty, empty << u.d, empty << u.d, empty << self.m_per_s
[docs] def chooseOcculterSlewTimes(self, sInds, slewTimes, dV, intTimes, charTimes): """Selects the best slew time for each star This method searches through an array of permissible slew times for each star and chooses the best slew time for the occulter based on maximizing possible characterization time for that particular star (as a default). Args: sInds (~numpy.ndarray(int)): Indices of available targets slewTimes (astropy quantity array): slew times to all stars (must be indexed by sInds) dV (~astropy.units.Quantity(~numpy.ndarray(float))): Delta-V used to transfer to new star line of sight in unis of m/s intTimes (~astropy.units.Quantity(~numpy.ndarray(float))): Integration times for detection in units of day charTimes (~astropy.units.Quantity(~numpy.ndarray(float))): Time left over after integration which could be used for characterization in units of day Returns: tuple: sInds (int): Indeces of next target star slewTimes (astropy.units.Quantity(numpy.ndarray(float))): slew times to all stars (must be indexed by sInds) intTimes (astropy.units.Quantity(numpy.ndarray(float))): Integration times for detection in units of day dV (astropy.units.Quantity(numpy.ndarray(float))): Delta-V used to transfer to new star line of sight in unis of m/s """ # selection criteria for each star slew good_j = np.argmax( charTimes, axis=1 ) # maximum possible characterization time available good_i = np.arange(0, len(sInds)) dV = dV[good_i, good_j] intTime = intTimes[good_i, good_j] slewTime = slewTimes[good_i, good_j] return sInds, intTime, slewTime, dV
[docs] def observation_detection(self, sInd, intTime, mode): """Determines SNR and detection status for a given integration time for detection. Also updates the lastDetected and starRevisit lists. Args: sInd (int): Integer index of the star of interest intTime (~astropy.units.Quantity(~numpy.ndarray(float))): Selected star integration time for detection in units of day. Defaults to None. mode (dict): Selected observing mode for detection Returns: tuple: detected (numpy.ndarray(int)): Detection status for each planet orbiting the observed target star: 1 is detection, 0 missed detection, -1 below IWA, and -2 beyond OWA fZ (astropy.units.Quantity(numpy.ndarray(float))): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (astropy.units.Quantity(numpy.ndarray(float))): Intensity of exo-zodiacal light in units of photons/s/m2/arcsec2 systemParams (dict): Dictionary of time-dependant planet properties averaged over the duration of the integration SNR (numpy.darray(float)): Detection signal-to-noise ratio of the observable planets FA (bool): False alarm (false positive) boolean """ PPop = self.PlanetPopulation ZL = self.ZodiacalLight PPro = self.PostProcessing TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # Save Current Time before attempting time allocation currentTimeNorm = TK.currentTimeNorm.copy() currentTimeAbs = TK.currentTimeAbs.copy() # Allocate Time extraTime = intTime * (mode["timeMultiplier"] - 1.0) # calculates extraTime success = TK.allocate_time( intTime + extraTime + Obs.settlingTime + mode["syst"]["ohTime"], True ) assert success, "Could not allocate observation detection time ({}).".format( intTime + extraTime + Obs.settlingTime + mode["syst"]["ohTime"] ) # calculates partial time to be added for every ntFlux dt = intTime / float(self.ntFlux) # find indices of planets around the target pInds = np.where(SU.plan2star == sInd)[0] # initialize outputs detected = np.array([], dtype=int) # write current system params by default systemParams = SU.dump_system_params(sInd) SNR = np.zeros(len(pInds)) # if any planet, calculate SNR if len(pInds) > 0: # initialize arrays for SNR integration fZs = np.zeros(self.ntFlux) << self.fZ_unit systemParamss = np.empty(self.ntFlux, dtype="object") JEZs = np.zeros((self.ntFlux, len(pInds))) << self.JEZ_unit Ss = np.zeros((self.ntFlux, len(pInds))) Ns = np.zeros((self.ntFlux, len(pInds))) # accounts for the time since the current time timePlus = Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy() # integrate the signal (planet flux) and noise for i in range(self.ntFlux): # allocate first half of dt timePlus += dt / 2.0 # calculate current zodiacal light brightness fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + timePlus).reshape(1), mode, )[0] # propagate the system to match up with current time SU.propag_system( sInd, currentTimeNorm + timePlus - self.propagTimes[sInd] ) # Calculate the exozodi intensity JEZs[i] = SU.scale_JEZ(sInd, mode) self.propagTimes[sInd] = currentTimeNorm + timePlus # save planet parameters systemParamss[i] = SU.dump_system_params(sInd) # calculate signal and noise (electron count rates) Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, pInds, dt, mode, fZ=fZs[i], JEZ=JEZs[i] ) # allocate second half of dt timePlus += dt / 2.0 # average output parameters fZ = np.mean(fZs) JEZ = np.mean(JEZs, axis=0) systemParams = { key: sum([systemParamss[x][key] for x in range(self.ntFlux)]) / float(self.ntFlux) for key in sorted(systemParamss[0]) } # calculate SNR S = Ss.sum(0) N = Ns.sum(0) SNR[N > 0] = S[N > 0] / N[N > 0] # if no planet, just save zodiacal brightness in the middle of the integration else: totTime = intTime * (mode["timeMultiplier"]) fZ = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + totTime / 2.0).reshape(1), mode, )[0] # Use the default star value if no planets JEZ = TL.JEZ0[mode["hex"]][sInd] # find out if a false positive (false alarm) or any false negative # (missed detections) have occurred FA, MD = PPro.det_occur(SNR, mode, TL, sInd, intTime) # populate detection status array # 1:detected, 0:missed, -1:below IWA, -2:beyond OWA if len(pInds) > 0: detected = (~MD).astype(int) WA = ( np.array( [ systemParamss[x]["WA"].to_value(u.arcsec) for x in range(len(systemParamss)) ] ) << u.arcsec ) detected[np.all(WA < mode["IWA"], 0)] = -1 detected[np.all(WA > mode["OWA"], 0)] = -2 # if planets are detected, calculate the minimum apparent separation smin = None det = detected == 1 # If any of the planets around the star have been detected if np.any(det): smin = np.min(SU.s[pInds[det]]) log_det = " - Detected planet inds %s (%s/%s)" % ( pInds[det], len(pInds[det]), len(pInds), ) self.logger.info(log_det) self.vprint(log_det) # populate the lastDetected array by storing det, JEZ, dMag, and WA self.lastDetected[sInd, :] = [ det, JEZ.flatten(), systemParams["dMag"], systemParams["WA"].to("arcsec").value, ] # in case of a FA, generate a random delta mag (between PPro.FAdMag0 and # TL.intCutoff_dMag) and working angle (between IWA and min(OWA, a_max)) if FA: WA = ( np.random.uniform( mode["IWA"].to_value(u.arcsec), np.minimum( mode["OWA"], np.arctan(max(PPop.arange) / TL.dist[sInd]) ).to_value(u.arcsec), ) << u.arcsec ) dMag = np.random.uniform(PPro.FAdMag0(WA), TL.intCutoff_dMag) self.lastDetected[sInd, 0] = np.append(self.lastDetected[sInd, 0], True) self.lastDetected[sInd, 1] = np.append( self.lastDetected[sInd, 1], TL.JEZ0[mode["hex"]][sInd].flatten() ) self.lastDetected[sInd, 2] = np.append(self.lastDetected[sInd, 2], dMag) self.lastDetected[sInd, 3] = np.append( self.lastDetected[sInd, 3], WA.to_value(u.arcsec) ) sminFA = np.tan(WA) * TL.dist[sInd].to("AU") smin = np.minimum(smin, sminFA) if smin is not None else sminFA log_FA = " - False Alarm (WA=%s, dMag=%s)" % ( np.round(WA, 3), np.round(dMag, 1), ) self.logger.info(log_FA) self.vprint(log_FA) # Schedule Target Revisit self.scheduleRevisit(sInd, smin, det, pInds) if self.make_debug_bird_plots: from tools.obs_plot import obs_plot obs_plot(self, systemParams, mode, sInd, pInds, SNR, detected) return detected.astype(int), fZ, JEZ, systemParams, SNR, FA
[docs] def scheduleRevisit(self, sInd, smin, det, pInds): """A Helper Method for scheduling revisits after observation detection Updates self.starRevisit attribute only Args: sInd (int): sInd of the star just detected smin (~astropy.units.Quantity): minimum separation of the planet to star of planet just detected det (~np.ndarray(bool)): Detection status of all planets in target system pInds (~np.ndarray(int)): Indices of planets in the target system Returns: None """ TK = self.TimeKeeping TL = self.TargetList SU = self.SimulatedUniverse # in both cases (detection or false alarm), schedule a revisit # based on minimum separation Ms = TL.MsTrue[sInd] if smin is not None: # smin is None if no planet was detected sp = smin if np.any(det): pInd_smin = pInds[det][np.argmin(SU.s[pInds[det]])] Mp = SU.Mp[pInd_smin] else: Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3.0 / mu) t_rev = TK.currentTimeNorm.copy() + T / 2.0 # otherwise, revisit based on average of population semi-major axis and mass else: sp = SU.s.mean() Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3.0 / mu) t_rev = TK.currentTimeNorm.copy() + 0.75 * T if self.revisit_wait is not None: revisit_wait = self.revisit_wait if np.ndim(revisit_wait) > 0: revisit_wait = revisit_wait[sInd] t_rev = TK.currentTimeNorm.copy() + revisit_wait # finally, populate the revisit list (NOTE: sInd becomes a float) revisit = np.array([sInd, float(t_rev.to_value(u.day))]) if self.starRevisit.size == 0: # If starRevisit has nothing in it self.starRevisit = np.array([revisit]) # initialize sterRevisit else: revInd = np.where(self.starRevisit[:, 0] == sInd)[ 0 ] # indices of the first column of the starRevisit list containing sInd if revInd.size == 0: self.starRevisit = np.vstack((self.starRevisit, revisit)) else: self.starRevisit[revInd, 1] = revisit[1]
[docs] def observation_characterization(self, sInd, mode): """Finds if characterizations are possible and relevant information Args: sInd (int): Integer index of the star of interest mode (dict): Selected observing mode for characterization Returns: tuple: characterized (list(int)): Characterization status for each planet orbiting the observed target star including False Alarm if any, where 1 is full spectrum, -1 partial spectrum, and 0 not characterized fZ (astropy.units.Quantity(numpy.ndarray(float))): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (astropy.units.Quantity(numpy.ndarray(float))): Intensity of exo-zodiacal light in units of photons/s/m2/arcsec systemParams (dict): Dictionary of time-dependant planet properties averaged over the duration of the integration SNR (numpy.ndarray(float)): Characterization signal-to-noise ratio of the observable planets. Defaults to None. intTime (astropy.units.Quantity(numpy.ndarray(float))): Selected star characterization time in units of day. Defaults to None. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # selecting appropriate koMap koMap = self.koMaps[mode["syst"]["name"]] # find indices of planets around the target pInds = np.where(SU.plan2star == sInd)[0] # get the detected status, and check if there was a FA det = self.lastDetected[sInd, 0] FA = len(det) == len(pInds) + 1 if FA: pIndsDet = np.append(pInds, -1)[det] else: pIndsDet = pInds[det] # initialize outputs, and check if there's anything (planet or FA) # to characterize characterized = np.zeros(len(det), dtype=int) fZ = 0.0 * self.inv_arcsec2 JEZ = 0.0 * self.JEZ_unit # write current system params by default systemParams = SU.dump_system_params(sInd) SNR = np.zeros(len(det)) intTime = None if len(det) == 0: # nothing to characterize return characterized, fZ, JEZ, systemParams, SNR, intTime # look for last detected planets that have not been fully characterized if not (FA): # only true planets, no FA tochar = self.fullSpectra[pIndsDet] == 0 else: # mix of planets and a FA truePlans = pIndsDet[:-1] tochar = np.append((self.fullSpectra[truePlans] == 0), True) # 1/ find spacecraft orbital START position including overhead time, # and check keepout angle if np.any(tochar): # start times startTime = ( TK.currentTimeAbs.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) # if we're beyond mission end, bail out if startTime >= TK.missionFinishAbs: return characterized, fZ, systemParams, SNR, intTime startTimeNorm = ( TK.currentTimeNorm.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) # planets to characterize koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[ 0 ][ 0 ] # find indice where koTime is startTime[0] # wherever koMap is 1, the target is observable tochar[tochar] = koMap[sInd][koTimeInd] # 2/ if any planet to characterize, find the characterization times # at the detected JEZ, dMag, and WA if np.any(tochar): fZ = ZL.fZ(Obs, TL, np.array([sInd], ndmin=1), startTime.reshape(1), mode)[ 0 ] JEZ = self.lastDetected[sInd, 1][det][tochar] dMag = self.lastDetected[sInd, 2][det][tochar] WA = self.lastDetected[sInd, 3][det][tochar] * u.arcsec intTimes = np.zeros(len(tochar)) << u.day intTimes[tochar] = OS.calc_intTime(TL, sInd, fZ, JEZ, dMag, WA, mode, TK=TK) intTimes[~np.isfinite(intTimes)] = self.zero_d # add a predetermined margin to the integration times intTimes = intTimes * (1.0 + self.charMargin) # apply time multiplier totTimes = intTimes * (mode["timeMultiplier"]) # Filter totTimes to make nan integration times correspond to the # maximum float value because Time cannot handle nan values totTimes[np.where(np.isnan(totTimes))[0]] = np.finfo(np.float64).max * u.d # end times endTimes = startTime + totTimes endTimesNorm = startTimeNorm + totTimes # planets to characterize tochar = ( (totTimes > 0) & (totTimes <= OS.intCutoff) & (endTimesNorm <= TK.OBendTimes[TK.OBnumber]) ) # 3/ is target still observable at the end of any char time? if np.any(tochar) and Obs.checkKeepoutEnd: koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int) # find index in koMap where each endTime is closest to koTimes for t, endTime in enumerate(endTimes.value[tochar]): if endTime > self.koTimes.value[-1]: # case where endTime exceeds largest koTimes element endTimeInBounds = np.where( np.floor(endTime) - self.koTimes.value == 0 )[0] koTimeInds[t] = ( endTimeInBounds[0] if endTimeInBounds.size != 0 else -1 ) else: koTimeInds[t] = np.where( np.round(endTime) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTimes[0] tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds] # 4/ if yes, allocate the overhead time, and perform the characterization # for the maximum char time if np.any(tochar): # Save Current Time before attempting time allocation currentTimeNorm = TK.currentTimeNorm.copy() currentTimeAbs = TK.currentTimeAbs.copy() # Allocate Time intTime = np.max(intTimes[tochar]) extraTime = intTime * (mode["timeMultiplier"] - 1.0) success = TK.allocate_time( intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime, True ) # allocates time if not (success): # Time was not successfully allocated char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=float) char_JEZ = np.zeros(lenChar, dtype=float) << self.JEZ_unit char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 * self.inv_arcsec2 char_systemParams = SU.dump_system_params(sInd) return ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) pIndsChar = pIndsDet[tochar] log_char = " - Charact. planet inds %s (%s/%s detected)" % ( pIndsChar, len(pIndsChar), len(pIndsDet), ) self.logger.info(log_char) self.vprint(log_char) # SNR CALCULATION: # first, calculate SNR for observable planets (without false alarm) planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar SNRplans = np.zeros(len(planinds)) if len(planinds) > 0: # initialize arrays for SNR integration fZs = np.zeros(self.ntFlux) << self.inv_arcsec2 systemParamss = np.empty(self.ntFlux, dtype="object") JEZs = np.zeros((self.ntFlux, len(planinds))) << self.JEZ_unit Ss = np.zeros((self.ntFlux, len(planinds))) Ns = np.zeros((self.ntFlux, len(planinds))) # integrate the signal (planet flux) and noise dt = intTime / float(self.ntFlux) timePlus = ( Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy() ) # accounts for the time since the current time for i in range(self.ntFlux): # allocate first half of dt timePlus += dt / 2.0 # calculate current zodiacal light brightness fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + timePlus).reshape(1), mode, )[0] # propagate the system to match up with current time SU.propag_system( sInd, currentTimeNorm + timePlus - self.propagTimes[sInd] ) self.propagTimes[sInd] = currentTimeNorm + timePlus # Calculate the exozodi intensity JEZs[i] = SU.scale_JEZ(sInd, mode, pInds=planinds) # save planet parameters systemParamss[i] = SU.dump_system_params(sInd) # calculate signal and noise (electron count rates) Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i], JEZ=JEZs[i] ) # allocate second half of dt timePlus += dt / 2.0 # average output parameters fZ = np.mean(fZs) JEZ = np.mean(JEZs, axis=0) systemParams = { key: sum([systemParamss[x][key] for x in range(self.ntFlux)]) / float(self.ntFlux) for key in sorted(systemParamss[0]) } # calculate planets SNR S = Ss.sum(0) N = Ns.sum(0) SNRplans[N > 0] = S[N > 0] / N[N > 0] # allocate extra time for timeMultiplier # if only a FA, just save zodiacal brightness in the middle of the # integration else: totTime = intTime * (mode["timeMultiplier"]) fZ = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + totTime / 2.0).reshape(1), mode, )[0] # Use the default star value if no planets JEZ = TL.JEZ0[mode["hex"]][sInd] # calculate the false alarm SNR (if any) SNRfa = [] if pIndsChar[-1] == -1: JEZ = self.lastDetected[sInd, 1][-1] dMag = self.lastDetected[sInd, 2][-1] WA = self.lastDetected[sInd, 3][-1] * u.arcsec C_p, C_b, C_sp = OS.Cp_Cb_Csp(TL, sInd, fZ, JEZ, dMag, WA, mode, TK=TK) S = (C_p * intTime).decompose().value N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2.0).decompose().value) SNRfa = S / N if N > 0.0 else 0.0 # save all SNRs (planets and FA) to one array SNRinds = np.where(det)[0][tochar] SNR[SNRinds] = np.append(SNRplans, SNRfa) # now, store characterization status: 1 for full spectrum, # -1 for partial spectrum, 0 for not characterized char = SNR >= mode["SNR"] # initialize with full spectra characterized = char.astype(int) WAchar = self.lastDetected[sInd, 3][char] * u.arcsec # find the current WAs of characterized planets WAs = systemParams["WA"] if FA: WAs = np.append(WAs, self.lastDetected[sInd, 3][-1] * u.arcsec) # check for partial spectra (for coronagraphs only) if not (mode["syst"]["occulter"]): IWA_max = mode["IWA"] * (1.0 + mode["BW"] / 2.0) OWA_min = mode["OWA"] * (1.0 - mode["BW"] / 2.0) char[char] = (WAchar < IWA_max) | (WAchar > OWA_min) characterized[char] = -1 # encode results in spectra lists (only for planets, not FA) charplans = characterized[:-1] if FA else characterized self.fullSpectra[pInds[charplans == 1]] += 1 self.partialSpectra[pInds[charplans == -1]] += 1 if self.make_debug_bird_plots: from tools.obs_plot import obs_plot obs_plot(self, systemParams, mode, sInd, pInds, SNR, characterized) return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime
[docs] def calc_signal_noise( self, sInd, pInds, t_int, mode, fZ=None, JEZ=None, dMag=None, WA=None ): """Calculates the signal and noise fluxes for a given time interval. Called by observation_detection and observation_characterization methods in the SurveySimulation module. Args: sInd (int): Integer index of the star of interest t_int (~astropy.units.Quantity(~numpy.ndarray(float))): Integration time interval in units of day pInds (int): Integer indices of the planets of interest mode (dict): Selected observing mode (from OpticalSystem) fZ (~astropy.units.Quantity(~numpy.ndarray(float))): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (~astropy.units.Quantity(~numpy.ndarray(float))): Intensity of exo-zodiacal light in units of ph/s/m2/arcsec2 dMag (~numpy.ndarray(float)): Differences in magnitude between planets and their host star WA (~astropy.units.Quantity(~numpy.ndarray(float))): Working angles of the planets of interest in units of arcsec Returns: tuple: Signal (float): Counts of signal Noise (float): Counts of background noise variance """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # calculate optional parameters if not provided fZ = ( fZ if (fZ is not None) else ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), TK.currentTimeAbs.copy().reshape(1), mode, )[0] ) JEZ = ( u.Quantity(JEZ, ndmin=1) if (JEZ is not None) else SU.scale_JEZ(sInd, mode, pInds=pInds) ) # if lucky_planets, use lucky planet params for dMag and WA if SU.lucky_planets and mode in list( filter(lambda mode: "spec" in mode["inst"]["name"], OS.observingModes) ): phi = (1 / np.pi) * np.ones(len(SU.d)) dMag = deltaMag(SU.p, SU.Rp, SU.d, phi)[pInds] # delta magnitude # working angle WA = ( np.arctan( SU.a.to_value(u.AU) * self.AU2pc / TL.dist[SU.plan2star].to_value(u.pc) ) * self.rad2arcsec << u.arcsec )[pInds] else: dMag = dMag if (dMag is not None) else SU.dMag[pInds] WA = WA if (WA is not None) else SU.WA[pInds] # initialize Signal and Noise arrays Signal = np.zeros(len(pInds)) Noise = np.zeros(len(pInds)) # find observable planets wrt IWA-OWA range obs = (WA > mode["IWA"]) & (WA < mode["OWA"]) if np.any(obs): # find electron counts C_p, C_b, C_sp = OS.Cp_Cb_Csp( TL, sInd, fZ, JEZ[obs], dMag[obs], WA[obs], mode, TK=TK ) # calculate signal and noise levels (based on Nemati14 formula) _t_int_s = t_int.to_value(u.d) * self.day2sec Signal[obs] = C_p.to_value(self.inv_s) * _t_int_s Noise[obs] = np.sqrt( ( C_b.to_value(self.inv_s) * _t_int_s + (C_sp.to_value(self.inv_s) * _t_int_s) ** 2 ) ) return Signal, Noise
[docs] def update_occulter_mass(self, DRM, sInd, t_int, skMode): """Updates the occulter wet mass in the Observatory module, and stores all the occulter related values in the DRM array. Args: DRM (dict): Design Reference Mission, contains the results of one complete observation (detection and characterization) sInd (int): Integer index of the star of interest t_int (~astropy.units.Quantity(~numpy.ndarray(float))): Selected star integration time (for detection or characterization) in units of day skMode (str): Station keeping observing mode type ('det' or 'char') Returns: dict: Design Reference Mission dictionary, contains the results of one complete observation """ TL = self.TargetList Obs = self.Observatory TK = self.TimeKeeping assert skMode in ("det", "char"), "Observing mode type must be 'det' or 'char'." # decrement mass for station-keeping dF_lateral, dF_axial, intMdot, mass_used, deltaV = Obs.mass_dec_sk( TL, sInd, TK.currentTimeAbs.copy(), t_int ) DRM[skMode + "_dV"] = deltaV.to("m/s") DRM[skMode + "_mass_used"] = mass_used.to("kg") DRM[skMode + "_dF_lateral"] = dF_lateral.to("N") DRM[skMode + "_dF_axial"] = dF_axial.to("N") # update spacecraft mass Obs.scMass = Obs.scMass - mass_used DRM["scMass"] = Obs.scMass.to("kg") if Obs.twotanks: Obs.skMass = Obs.skMass - mass_used DRM["skMass"] = Obs.skMass.to("kg") return DRM
[docs] def reset_sim(self, genNewPlanets=True, rewindPlanets=True, seed=None): """Performs a full reset of the current simulation. This will reinitialize the TimeKeeping, Observatory, and SurveySimulation objects with their own outspecs. Args: genNewPlanets (bool): Generate all new planets based on the original input specification. If False, then the original planets will remain. Setting to True forces ``rewindPlanets`` to be True as well. Defaults True. rewindPlanets (bool): Reset the current set of planets to their original orbital phases. If both genNewPlanets and rewindPlanet are False, the original planets will be retained and will not be rewound to their initial starting locations (i.e., all systems will remain at the times they were at the end of the last run, thereby effectively randomizing planet phases. Defaults True. seed (int, optional): Random seed to use for all random number generation. If None (default) a new random seed will be generated when re-initializing the SurveySimulation. """ SU = self.SimulatedUniverse TK = self.TimeKeeping TL = self.TargetList # re-initialize SurveySimulation arrays specs = self._outspec specs["modules"] = self.modules if seed is None: # pop the seed so a new one is generated if "seed" in specs: specs.pop("seed") else: # if seed is provided, replace seed with provided seed specs["seed"] = seed # reset mission time, re-init surveysim and observatory TK.__init__(**TK._outspec) self.__init__(**specs) self.Observatory.__init__(**self.Observatory._outspec) # generate new planets if requested (default) if genNewPlanets: TL.stellar_mass() TL.I = TL.gen_inclinations(TL.PlanetPopulation.Irange) # noqa: E741 SU.gen_physical_properties(**SU._outspec) rewindPlanets = True # re-initialize systems if requested (default) if rewindPlanets: SU.init_systems() # reset helper arrays self.initializeStorageArrays() self.vprint("Simulation reset.")
[docs] def genOutSpec( self, tofile: Optional[str] = None, starting_outspec: Optional[Dict[str, Any]] = None, modnames: bool = False, ) -> Dict[str, Any]: """Join all _outspec dicts from all modules into one output dict and optionally write out to JSON file on disk. Args: tofile (str, optional): Name of the file containing all output specifications (outspecs). Defaults to None. starting_outspec (dict, optional): Initial outspec (from MissionSim). Defaults to None. modnames (bool): If True, populate outspec dictionary with the module it originated from, instead of the actual value of the keyword. Defaults False. Returns: dict: Dictionary containing the full :ref:`sec:inputspec`, including all filled-in default values. Combination of all individual module _outspec attributes. """ # start with our own outspec if modnames: out = copy.copy(self._outspec) for k in out: out[k] = self.__class__.__name__ else: out = copy.deepcopy(self._outspec) # Add any provided other outspec if starting_outspec is not None: out.update(starting_outspec) # add in all modules _outspec's for module in self.modules.values(): if modnames: tmp = copy.copy(module._outspec) for k in tmp: tmp[k] = module.__class__.__name__ else: tmp = module._outspec out.update(tmp) # add in the specific module names used out["modules"] = {} for mod_name, module in self.modules.items(): # find the module file mod_name_full = module.__module__ if mod_name_full.startswith("EXOSIMS"): # take just its short name if it is in EXOSIMS mod_name_short = mod_name_full.split(".")[-1] else: # take its full path if it is not in EXOSIMS - changing .pyc -> .py mod_name_short = re.sub( r"\.pyc$", ".py", inspect.getfile(module.__class__) ) out["modules"][mod_name] = mod_name_short # add catalog name module = self.TargetList.StarCatalog mod_name_full = module.__module__ if mod_name_full.startswith("EXOSIMS"): # take just its short name if it is in EXOSIMS mod_name_short = mod_name_full.split(".")[-1] else: # take its full path if it is not in EXOSIMS - changing .pyc -> .py mod_name_short = re.sub(r"\.pyc$", ".py", inspect.getfile(module.__class__)) out["modules"]["StarCatalog"] = mod_name_short # if we don't know about the SurveyEnsemble, just write a blank to the output if "SurveyEnsemble" not in out["modules"]: out["modules"]["SurveyEnsemble"] = " " # get version and Git information out["Version"] = get_version() # dump to file if tofile is not None: with open(tofile, "w") as outfile: json.dump( out, outfile, sort_keys=True, indent=4, ensure_ascii=False, separators=(",", ": "), default=array_encoder, ) return out
[docs] def generateHashfName(self, specs): """Generate cached file Hashname Requires a .XXX appended to end of hashname for each individual use case Args: specs (dict): :ref:`sec:inputspec` Returns: str: Unique indentifier string for cahce products from this set of modules and inputs """ # Allows cachefname to be predefined if "cachefname" in specs: return specs["cachefname"] cachefname = "" # declares cachefname mods = ["Completeness", "TargetList", "OpticalSystem"] # modules to look at tmp = ( self.Completeness.PlanetPopulation.__class__.__name__ + self.Completeness.PlanetPhysicalModel.__class__.__name__ + self.PlanetPopulation.__class__.__name__ + self.SimulatedUniverse.__class__.__name__ + self.PlanetPhysicalModel.__class__.__name__ ) if "selectionMetric" in specs: tmp += specs["selectionMetric"] if "Izod" in specs: tmp += specs["Izod"] if "maxiter" in specs: tmp += str(specs["maxiter"]) if "ftol" in specs: tmp += str(specs["ftol"]) if "missionLife" in specs: tmp += str(specs["missionLife"]) if "missionPortion" in specs: tmp += str(specs["missionPortion"]) if "smaknee" in specs: tmp += str(specs["smaknee"]) if "koAngleMax" in specs: tmp += str(specs["koAngleMax"]) tmp += str(np.sum(self.Completeness.PlanetPopulation.arange.value)) tmp += str(np.sum(self.Completeness.PlanetPopulation.Rprange.value)) tmp += str(np.sum(self.Completeness.PlanetPopulation.erange)) tmp += str( self.Completeness.PlanetPopulation.PlanetPhysicalModel.whichPlanetPhaseFunction # noqa: E501 ) tmp += str(np.sum(self.PlanetPopulation.arange.value)) tmp += str(np.sum(self.PlanetPopulation.Rprange.value)) tmp += str(np.sum(self.PlanetPopulation.erange)) tmp += str(self.PlanetPopulation.PlanetPhysicalModel.whichPlanetPhaseFunction) for mod in mods: cachefname += self.modules[mod].__module__.split(".")[ -1 ] # add module name to end of cachefname cachefname += hashlib.md5( (str(self.TargetList.Name) + tmp).encode("utf-8") ).hexdigest() # turn cachefname into hashlib cachefname = os.path.join( self.cachedir, cachefname + os.extsep ) # join into filepath and fname # Needs file terminator (.starkt0, .t0, etc) appended done by each individual # use case. return cachefname
[docs] def revisitFilter(self, sInds, tmpCurrentTimeNorm): """Helper method for Overloading Revisit Filtering Args: sInds (~numpy.ndarray(int)): Indices of stars still in observation list tmpCurrentTimeNorm (astropy.units.Quantity): Normalized simulation time Returns: ~numpy.ndarray(int): indices of stars still in observation list """ tovisit = np.zeros(self.TargetList.nStars, dtype=bool) if len(sInds) > 0: # Check that no star has exceeded the number of revisits and the indicies # of all considered stars have minimum number of observations # This should prevent revisits so long as all stars have not # been observed tovisit[sInds] = (self.starVisits[sInds] == min(self.starVisits[sInds])) & ( self.starVisits[sInds] < self.nVisitsMax ) if self.starRevisit.size != 0: ind_rev = self.revisit_inds(sInds, tmpCurrentTimeNorm) tovisit[ind_rev] = self.starVisits[ind_rev] < self.nVisitsMax sInds = np.where(tovisit)[0] return sInds
[docs] def is_earthlike(self, plan_inds, sInd): """Is the planet earthlike? Returns boolean array that's True for Earth-like planets. Args: plan_inds(~numpy.ndarray(int)): Planet indices sInd (int): Star index Returns: ~numpy.ndarray(bool): Array of same dimension as plan_inds input that's True for Earthlike planets and false otherwise. """ TL = self.TargetList SU = self.SimulatedUniverse PPop = self.PlanetPopulation # extract planet and star properties Rp_plan = SU.Rp[plan_inds].value L_star = TL.L[sInd] if PPop.scaleOrbits: a_plan = (SU.a[plan_inds] / np.sqrt(L_star)).value else: a_plan = (SU.a[plan_inds]).value # Definition: planet radius (in earth radii) and solar-equivalent luminosity # must be between the given bounds. Rp_plan_lo = 0.80 / np.sqrt(a_plan) # We use the numpy versions so that plan_ind can be a numpy vector. return np.logical_and( np.logical_and(Rp_plan >= Rp_plan_lo, Rp_plan <= 1.4), np.logical_and(a_plan >= 0.95, a_plan <= 1.67), )
[docs] def find_known_plans(self): """ Find and return list of known RV stars and list of stars with earthlike planets based on info from David Plavchan dated 12/24/2018 """ TL = self.TargetList SU = self.SimulatedUniverse PPop = self.PlanetPopulation L_star = TL.L[SU.plan2star] c = 28.4 * u.m / u.s Mj = 317.8 * u.earthMass Mpj = SU.Mp / Mj # planet masses in jupiter mass units Ms = TL.MsTrue[SU.plan2star] Teff = TL.Teff[SU.plan2star] mu = const.G * (SU.Mp + Ms) T = (2.0 * np.pi * np.sqrt(SU.a**3 / mu)).to(u.yr) e = SU.e # pinds in correct temp range t_filt = np.where((Teff.value > 3000) & (Teff.value < 6800))[0] K = ( (c / np.sqrt(1 - e[t_filt])) * Mpj[t_filt] * np.sin(SU.I[t_filt]) * Ms[t_filt] ** (-2 / 3) * T[t_filt] ** (-1 / 3) ) K_filter = (T[t_filt].to(u.d) / 10**4).value # create period-filter # if period-filter value is lower than .03, set to .03 K_filter[np.where(K_filter < 0.03)[0]] = 0.03 k_filt = t_filt[np.where(K.value > K_filter)[0]] # pinds in the correct K range if PPop.scaleOrbits: a_plan = (SU.a / np.sqrt(L_star)).value else: a_plan = SU.a.value Rp_plan_lo = 0.80 / np.sqrt(a_plan) # pinds in habitable zone a_filt = k_filt[np.where((a_plan[k_filt] > 0.95) & (a_plan[k_filt] < 1.67))[0]] # rocky planets r_filt = a_filt[ np.where( (SU.Rp.value[a_filt] >= Rp_plan_lo[a_filt]) & (SU.Rp.value[a_filt] < 1.4) )[0] ] self.known_earths = np.union1d(self.known_earths, r_filt).astype(int) # these are known_rv stars with earths around them known_stars = np.unique(SU.plan2star[k_filt]) # these are known_rv stars known_rocky = np.unique(SU.plan2star[r_filt]) # if include_known_RV, then filter out all other sInds if self.include_known_RV is not None: HIP_sInds = np.where(np.isin(TL.Name, self.include_known_RV))[0] known_stars = np.intersect1d(HIP_sInds, known_stars) known_rocky = np.intersect1d(HIP_sInds, known_rocky) return known_stars.astype(int), known_rocky.astype(int)
[docs] def find_char_SNR(self, sInd, pIndsChar, startTime, intTime, mode): """Finds the SNR achieved by an observing mode after intTime days The observation window (which includes settling and overhead times) is a superset of the integration window (in which photons are collected). The observation window begins at startTime. The integration window begins at startTime + Obs.settlingTime + mode["ohTime"], and the integration itself has a duration of intTime. Args: sInd (int): Integer index of the star of interest pIndsChar (int numpy.ndarray): Observable planets to characterize. Place (-1) at end to put False Alarm parameters at end of returned tuples. startTime (astropy.units.Quantity): Beginning of observation window in units of day. intTime (astropy.units.Quantity): Selected star characterization integration time in units of day. mode (dict): Observing mode for the characterization Returns: tuple: SNR (float numpy.ndarray): Characterization signal-to-noise ratio of the observable planets. Defaults to None. [TBD] fZ (astropy.units.Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 systemParams (dict): Dictionary of time-dependent planet properties averaged over the duration of the integration. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # time at start of integration window currentTimeNorm = startTime currentTimeAbs = TK.missionStart + startTime # first, calculate SNR for observable planets (without false alarm) planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar SNRplans = np.zeros(len(planinds)) if len(planinds) > 0: # initialize arrays for SNR integration fZs = np.zeros(self.ntFlux) << self.fZ_unit systemParamss = np.empty(self.ntFlux, dtype="object") Ss = np.zeros((self.ntFlux, len(planinds))) Ns = np.zeros((self.ntFlux, len(planinds))) # integrate the signal (planet flux) and noise dt = intTime / float(self.ntFlux) timePlus = ( Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy() ) # accounts for the time since the current time for i in range(self.ntFlux): # calculate signal and noise (electron count rates) if SU.lucky_planets: fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), currentTimeAbs.reshape(1), mode, )[0] Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i] ) # allocate first half of dt timePlus += dt / 2.0 # calculate current zodiacal light brightness fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + timePlus).reshape(1), mode, )[0] # propagate the system to match up with current time SU.propag_system( sInd, currentTimeNorm + timePlus - self.propagTimes[sInd] ) self.propagTimes[sInd] = currentTimeNorm + timePlus # time-varying planet params (WA, dMag, phi, fEZ, d) systemParamss[i] = SU.dump_system_params(sInd) # calculate signal and noise (electron count rates) if not SU.lucky_planets: Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i] ) # allocate second half of dt timePlus += dt / 2.0 # average output parameters fZ = np.mean(fZs) systemParams = { key: sum([systemParamss[x][key] for x in range(self.ntFlux)]) / float(self.ntFlux) for key in sorted(systemParamss[0]) } # calculate planets SNR S = Ss.sum(0) N = Ns.sum(0) SNRplans[N > 0] = S[N > 0] / N[N > 0] # allocate extra time for timeMultiplier # if only a FA, just save zodiacal brightness # in the middle of the integration else: totTime = intTime * (mode["timeMultiplier"]) fZ = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs.copy() + totTime / 2.0).reshape(1), mode, )[0] # calculate the false alarm SNR (if any) SNRfa = [] if pIndsChar[-1] == -1: # Note: these attributes may not exist for all schedulers fEZ = self.lastDetected[sInd, 1][-1] << self.fZ_unit dMag = self.lastDetected[sInd, 2][-1] WA = self.lastDetected[sInd, 3][-1] * u.arcsec C_p, C_b, C_sp = OS.Cp_Cb_Csp(TL, sInd, fZ, fEZ, dMag, WA, mode) S = (C_p * intTime).decompose().value N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2.0).decompose().value) SNRfa = S / N if N > 0.0 else 0.0 # SNR vector is of length (#planets), plus 1 if FA # This routine leaves SNR = 0 if unknown or not found pInds = np.where(SU.plan2star == sInd)[0] FA_present = pIndsChar[-1] == -1 # there will be one SNR for each entry of pInds_FA pInds_FA = np.append(pInds, [-1] if FA_present else np.zeros(0, dtype=int)) # boolean index vector into SNR # True iff we have computed a SNR for that planet # False iff we didn't find an SNR (and will leave 0 there) # if FA_present, SNR_plug_in[-1] = True SNR_plug_in = np.isin(pInds_FA, pIndsChar) # save all SNRs (planets and FA) to one array SNR = np.zeros(len(pInds_FA)) # plug in the SNR's we computed (pIndsChar and optional FA) SNR[SNR_plug_in] = np.append(SNRplans, SNRfa) return SNR, fZ, systemParams
[docs] def revisit_inds(self, sInds, tmpCurrentTimeNorm): """Return subset of star indices that are scheduled for revisit. Args: sInds (~numpy.ndarray(int)): Indices of stars to consider tmpCurrentTimeNorm (astropy.units.Quantity): Normalized simulation time Returns: ~numpy.ndarray(int): Indices of stars whose revisit is scheduled for within `self.dt_max` of the current time """ dt_rev = np.abs(self.starRevisit[:, 1] * u.day - tmpCurrentTimeNorm) ind_rev = [ int(x) for x in self.starRevisit[dt_rev < self.dt_max, 0] if x in sInds ] return ind_rev
[docs] def keepout_filter(self, sInds, startTimes, endTimes, koMap): """Filters stars not observable due to keepout Args: sInds (~numpy.ndarray(int)): indices of stars still in observation list startTimes (~numpy.ndarray(float)): start times of observations endTimes (~numpy.ndarray(float)): end times of observations koMap (~numpy.ndarray(bool)): map where True is for a target unobstructed and observable, False is for a target obstructed and unobservable due to keepout zone Returns: ~numpy.ndarray(int): sInds - filtered indices of stars still in observation list """ # find the indices of keepout times that pertain to the start and end times of # observation startInds = np.searchsorted(self.koTimes.value, startTimes) endInds = np.searchsorted(self.koTimes.value, endTimes) # boolean array of available targets (unobstructed between observation start and # end time) we include a -1 in the start and +1 in the end to include keepout # days enclosing start and end times availableTargets = np.array( [ np.all( koMap[ sInd, max(startInds[sInd] - 1, 0) : min( endInds[sInd] + 1, len(self.koTimes.value) + 1 ), ] ) for sInd in sInds ], dtype="bool", ) return sInds[availableTargets]
[docs] def array_encoder(obj): r"""Encodes numpy arrays, astropy Times, and astropy Quantities, into JSON. Called from json.dump for types that it does not already know how to represent, like astropy Quantity's, numpy arrays, etc. The json.dump() method encodes types like ints, strings, and lists itself, so this code does not see these types. Likewise, this routine can and does return such objects, which is OK as long as they unpack recursively into types for which encoding is known Args: obj (Any): Object to encode. Returns: Any: Encoded object """ from astropy.coordinates import SkyCoord from astropy.time import Time if isinstance(obj, Time): # astropy Time -> time string return obj.fits # isot also makes sense here if isinstance(obj, u.quantity.Quantity): # note: it is possible to have a numpy ndarray wrapped in a Quantity. # NB: alternatively, can return (obj.value, obj.unit.name) return obj.value if isinstance(obj, SkyCoord): return dict( lon=obj.heliocentrictrueecliptic.lon.value, lat=obj.heliocentrictrueecliptic.lat.value, distance=obj.heliocentrictrueecliptic.distance.value, ) if isinstance(obj, (np.ndarray, np.number)): # ndarray -> list of numbers return obj.tolist() if isinstance(obj, complex): # complex -> (real, imag) pair return [obj.real, obj.imag] if callable(obj): # this case occurs for interpolants like PSF and QE # We cannot simply "write" the function to JSON, so we make up a string # to keep from throwing an error. # The fix is simple: when generating the interpolant, add a _outspec attribute # to the function (or the lambda), containing (e.g.) the fits filename, or the # explicit number -- whatever string was used. Then, here, check for that # attribute and write it out instead of this dummy string. (Attributes can # be transparently attached to python functions, even lambda's.) return "interpolant_function" if isinstance(obj, set): return list(obj) if isinstance(obj, bytes): return obj.decode() # an EXOSIMS object if hasattr(obj, "_modtype"): return obj.__dict__ # an object for which no encoding is defined yet # as noted above, ordinary types (lists, ints, floats) do not take this path raise ValueError("Could not JSON-encode an object of type %s" % type(obj))