Source code for EXOSIMS.SurveySimulation.occulterJScheduler

from EXOSIMS.SurveySimulation.linearJScheduler import linearJScheduler
import numpy as np
from EXOSIMS.util._numpy_compat import copy_if_needed


class occulterJScheduler(linearJScheduler):
    """occulterJScheduler

    This class inherits linearJScheduler and works best when paired with the
    SotoStarshade Observatory class.

    Args:
        nSteps (integer 1x1):
            Number of steps to take when calculating the cost function.
            Values above 1 add a look-ahead term to the cost of each candidate.
        useAngles (bool):
            If True, the slew cost of the first step is the angular separation
            to each candidate instead of the dV returned by the observatory's
            ``dV_interp``. Forced to True at run time when the observatory has
            no ``dV_interp`` attribute.
        **specs:
            user specified values

    """

    def __init__(self, nSteps=1, useAngles=False, **specs):
        linearJScheduler.__init__(self, **specs)

        # NOTE: TypeError (rather than ValueError) is kept so that existing
        # callers catching it keep working.
        if nSteps < 1:
            raise TypeError("nSteps must be 1 or greater")

        self.nSteps = int(nSteps)
        self.useAngles = useAngles

    def choose_next_target(self, old_sInd, sInds, slewTimes, intTimes):
        """Helper method for method next_target to simplify alternative
        implementations.

        Given a subset of targets (pre-filtered by method next_target or some
        other means), select the best next one. Each candidate gets a cost made
        of a slew term (only when the optical system has an occulter), a
        completeness term, an "unvisited" term and a "revisit" term, weighted
        by ``self.coeffs[0..3]``; the candidate with the lowest cost is chosen.
        When ``self.nSteps > 1`` a second cost matrix (from every candidate to
        every candidate), weighted by ``nSteps - 1``, is added before taking
        the minimum.

        Args:
            old_sInd (integer):
                Index of the previous target star
            sInds (integer array):
                Indices of available targets
            slewTimes (astropy quantity array):
                slew times to all stars (must be indexed by sInds)
            intTimes (astropy Quantity array):
                Integration times for detection in units of day
                (not used by this implementation)

        Returns:
            tuple:
                sInd (integer):
                    Index of next target star
                waitTime (astropy Quantity):
                    the slew time of the selected target, ``slewTimes[sInd]``

        """

        OS = self.OpticalSystem
        Comp = self.Completeness
        TL = self.TargetList
        Obs = self.Observatory
        TK = self.TimeKeeping

        # cast sInds to array
        sInds = np.array(sInds, ndmin=1, copy=copy_if_needed)
        nStars = len(sInds)

        # calculate dt since previous observation
        dt = TK.currentTimeAbs.copy() + slewTimes[sInds]
        # get dynamic completeness values
        comps = Comp.completeness_update(TL, sInds, self.starVisits[sInds], dt)

        # if first target, or if only 1 available target,
        # choose highest available completeness (ties broken at random)
        if (old_sInd is None) or (nStars == 1):
            sInd = np.random.choice(sInds[comps == max(comps)])
            return sInd, slewTimes[sInd]

        # Only probe the observatory when an occulter is present, as the slew
        # terms below are the only consumers of dV_interp.
        has_dv_interp = bool(OS.haveOcculter) and hasattr(Obs, "dV_interp")

        # --- factors that depend only on the candidate star -----------------
        visits = self.starVisits[sInds]

        # unvisited ramp: grows quadratically with elapsed mission fraction
        f_uv = np.zeros(nStars)
        f_uv[visits == 0] = float(TK.currentTimeNorm.copy() / TK.missionLife) ** 2

        # revisited ramp: 1 for stars already visited that are also listed in
        # the first column of self.starRevisit, 0 otherwise.
        # NOTE(review): presumably self.starRevisit is an empty 1-D array until
        # a revisit is scheduled (it is initialised outside this file); an
        # empty array is treated as "no entries" instead of raising IndexError
        # on the [:, 0] lookup.
        if np.size(self.starRevisit) == 0:
            revisit_ids = np.array([])
        else:
            revisit_ids = self.starRevisit[:, 0]
        f2_uv = np.where(visits > 0, 1, 0) * (
            1 - (np.isin(sInds, revisit_ids, invert=True))
        )

        def add_target_factors(cost):
            """Add the completeness, unvisited and revisit terms to ``cost``.

            The terms are applied one after another, in the same order for the
            1-D first-step cost and the 2-D look-ahead cost (where the
            per-candidate vectors broadcast along the last axis).
            """
            # add factor due to completeness
            cost = cost + self.coeffs[1] * (1 - comps)
            # add factor due to unvisited ramp
            cost = cost - self.coeffs[2] * f_uv
            # add factor due to revisited ramp
            cost = cost + self.coeffs[3] * f2_uv
            return cost

        # --- first step: cost from old_sInd to every candidate --------------
        A = np.zeros(nStars)
        # Full boolean mask; the original masked-assignment form is kept on
        # purpose so the value-to-array conversion is unchanged.
        all_stars = np.ones(nStars, dtype=bool)

        # only consider slew distance when there's an occulter
        if OS.haveOcculter:
            angdists = [
                Obs.star_angularSep(TL, old_sInd, s, t) for s, t in zip(sInds, dt)
            ]

            # without a dV interpolant the only available slew metric is the
            # angle; this switch is persistent, as in the original code
            if not has_dv_interp:
                self.useAngles = True

            if self.useAngles:
                A[all_stars] = angdists
                A = self.coeffs[0] * (A) / np.pi
            else:
                dVs = np.array(
                    [
                        Obs.dV_interp(slewTimes[s], ang.to("deg"))[0]
                        for s, ang in zip(sInds, angdists)
                    ]
                )
                A[all_stars] = dVs
                # normalised by 2.5% of the observatory's dVtot value
                A = self.coeffs[0] * (A) / (0.025 * Obs.dVtot.value)

        A = add_target_factors(A)

        if self.nSteps > 1:
            # --- look-ahead: cost from every candidate to every candidate ---
            A_ = np.zeros((nStars, nStars))

            # only consider slew distance when there's an occulter
            if OS.haveOcculter:
                angdists_ = np.array(
                    [Obs.star_angularSep(TL, s, sInds, t) for s, t in zip(sInds, dt)]
                )
                if has_dv_interp:
                    # NOTE(review): unlike the first step, the angles are not
                    # converted with .to("deg") before calling dV_interp here.
                    # Behaviour is left unchanged; confirm which unit
                    # dV_interp expects.
                    dVs_ = np.array(
                        [
                            Obs.dV_interp(slewTimes[s], angdists_[i, :])
                            for i, s in enumerate(sInds)
                        ]
                    )
                    A_ = (
                        self.coeffs[0]
                        * dVs_.reshape(nStars, nStars)
                        / (0.025 * Obs.dVtot.value)
                    )
                else:
                    # No dV interpolant on this observatory: fall back to the
                    # angle-based cost, mirroring the first-step
                    # normalisation, instead of raising AttributeError.
                    A_ = self.coeffs[0] * angdists_.reshape(nStars, nStars) / np.pi

            A_ = add_target_factors(A_)

            # step1[k] is the first-step cost of candidate k // nStars and
            # stepN[k] the look-ahead cost of pair (k // nStars, k % nStars)
            step1 = np.tile(A, (nStars, 1)).flatten("F")
            stepN = A_.flatten()
            tmp = np.argmin(step1 + stepN * (self.nSteps - 1))
            sInd = sInds[tmp // nStars]
        else:
            # take just one step
            sInd = sInds[int(np.argmin(A))]

        return (
            sInd,
            slewTimes[sInd],
        )  # if coronagraph or first sInd, waitTime will be 0 days