Source code for EXOSIMS.SurveySimulation.SLSQPScheduler

from EXOSIMS.Prototypes.SurveySimulation import SurveySimulation
import astropy.units as u
import numpy as np
from ortools.linear_solver import pywraplp
from scipy.optimize import minimize, minimize_scalar
import os
import pickle
from astropy.time import Time


[docs] class SLSQPScheduler(SurveySimulation): """SLSQPScheduler This class implements a continuous optimization of integration times using the scipy minimize function with method SLSQP. ortools with the CBC linear solver is used to find an initial solution consistent with the constraints. For details see Keithly et al. 2019. Alternatively: Savransky et al. 2017 (SPIE). Args: **specs: user specified values Notes: Due to the time costs of the current comp_per_inttime calculation in GarrettCompleteness this should be used with BrownCompleteness. Requires ortools """ def __init__( self, cacheOptTimes=False, staticOptTimes=False, selectionMetric="maxC", Izod="current", maxiter=60, ftol=1e-3, **specs, ): # fZminObs=False, # initialize the prototype survey SurveySimulation.__init__(self, **specs) # Calculate fZmax self.valfZmax, self.absTimefZmax = self.ZodiacalLight.calcfZmax( np.arange(self.TargetList.nStars), self.Observatory, self.TargetList, self.TimeKeeping, list( filter( lambda mode: mode["detectionMode"], self.OpticalSystem.observingModes, ) )[0], self.cachefname, ) assert isinstance(staticOptTimes, bool), "staticOptTimes must be boolean." self.staticOptTimes = staticOptTimes self._outspec["staticOptTimes"] = self.staticOptTimes assert isinstance(cacheOptTimes, bool), "cacheOptTimes must be boolean." self._outspec["cacheOptTimes"] = cacheOptTimes # Informs what selection metric to use assert selectionMetric in [ "maxC", "Izod-Izodmin", "Izod-Izodmax", "(Izod-Izodmin)/(Izodmax-Izodmin)", "(Izod-Izodmin)/(Izodmax-Izodmin)/CIzod", "TauIzod/CIzod", "random", "priorityObs", ], "selectionMetric not valid input" self.selectionMetric = selectionMetric self._outspec["selectionMetric"] = self.selectionMetric # Informs what Izod to optimize integration times for # [fZmin, fZmin+45d, fZ0, fZmax, current] assert Izod in [ "fZmin", "fZ0", "fZmax", "current", ], "Izod not valid input" self.Izod = Izod self._outspec["Izod"] = self.Izod # maximum number of iterations to optimize integration times for assert isinstance(maxiter, int), "maxiter is not an int" assert maxiter >= 1, "maxiter must be positive real" self.maxiter = maxiter self._outspec["maxiter"] = self.maxiter # tolerance to place on optimization assert isinstance(ftol, float), "ftol must be boolean" assert ftol > 0, "ftol must be positive real" self.ftol = ftol self._outspec["ftol"] = self.ftol # some global defs self.detmode = list( filter( lambda mode: mode["detectionMode"], self.OpticalSystem.observingModes, ) )[0] self.ohTimeTot = ( self.Observatory.settlingTime + self.detmode["syst"]["ohTime"] ) # total overhead time per observation self.maxTime = ( self.TimeKeeping.missionLife * self.TimeKeeping.missionPortion ) # total mission time self.constraints = { "type": "ineq", "fun": lambda x: self.maxTime.to(u.d).value - np.sum(x[x * u.d > 0.1 * u.s]) - np.sum(x * u.d > 0.1 * u.s).astype(float) # maxTime less sum of intTimes * self.ohTimeTot.to(u.d).value, # sum of True -> goes to 1 x OHTime "jac": lambda x: np.ones(len(x)) * -1.0, } self.t0 = None if cacheOptTimes: # Generate cache Name cachefname = self.cachefname + "t0" if os.path.isfile(cachefname): self.vprint("Loading cached t0 from %s" % cachefname) with open(cachefname, "rb") as f: try: self.t0 = pickle.load(f) except UnicodeDecodeError: self.t0 = pickle.load(f, encoding="latin1") sInds = np.arange(self.TargetList.nStars) fZ = ( np.array([self.ZodiacalLight.fZ0.value] * len(sInds)) * self.ZodiacalLight.fZ0.unit ) self.sint_comp = -self.objfun(self.t0.to("day").value, sInds, fZ) if self.t0 is None: # 1. find nominal background counts for all targets in list int_dMag = 25.0 # this works fine for WFIRST JEZ0 = self.TargetList.JEZ0[self.detmode["hex"]] _, Cbs, Csps = self.OpticalSystem.Cp_Cb_Csp( self.TargetList, np.arange(self.TargetList.nStars), self.ZodiacalLight.fZ0, JEZ0, int_dMag, self.TargetList.int_WA, self.detmode, TK=self.TimeKeeping, ) # find baseline solution with intCutoff_dMag-based integration times # 3. t0 = self.OpticalSystem.calc_intTime( self.TargetList, np.arange(self.TargetList.nStars), self.ZodiacalLight.fZ0, JEZ0, self.TargetList.int_dMag, self.TargetList.int_WA, self.detmode, TK=self.TimeKeeping, ) # 4. int_comp = self.Completeness.comp_per_intTime( t0, self.TargetList, np.arange(self.TargetList.nStars), self.ZodiacalLight.fZ0, JEZ0, self.TargetList.int_WA, self.detmode, C_b=Cbs, C_sp=Csps, TK=self.TimeKeeping, ) # 5. Formulating MIP to filter out stars we can't or don't want to # reasonably observe solver = pywraplp.Solver( "SolveIntegerProblem", pywraplp.Solver.CBC_MIXED_INTEGER_PROGRAMMING ) # create solver instance xs = [ solver.IntVar(0.0, 1.0, "x" + str(j)) for j in np.arange(len(int_comp)) ] # define x_i variables for each star either 0 or 1 self.vprint("Finding baseline fixed-time optimal target set.") # constraint is x_i*t_i < maxtime constraint = solver.Constraint( -solver.infinity(), self.maxTime.to(u.day).value ) # hmmm I wonder if we could set this to 0,maxTime for j, x in enumerate(xs): constraint.SetCoefficient( x, t0[j].to("day").value + self.ohTimeTot.to(u.day).value ) # this forms x_i*(t_0i+OH) for all i # objective is max x_i*comp_i objective = solver.Objective() for j, x in enumerate(xs): objective.SetCoefficient(x, int_comp[j]) objective.SetMaximization() # this line enables output of the CBC MIXED INTEGER PROGRAM # (Was hard to find don't delete) # solver.EnableOutput() solver.SetTimeLimit(5 * 60 * 1000) # time limit for solver in milliseconds cpres = solver.Solve() # noqa: F841 actually solve MIP x0 = np.array([x.solution_value() for x in xs]) # convert output solutions self.sint_comp = np.sum(int_comp * x0) # calculate sum Comp from MIP self.t0 = t0 # assign calculated t0 # Observation num x0=0 @ int_dMag=25 is 1501 # Observation num x0=0 @ int_dMag=30 is 1501... # now find the optimal eps baseline and use whichever gives you the highest # starting completeness self.vprint("Finding baseline fixed-eps optimal target set.") def totCompfeps(eps): compstars, tstars, x = self.inttimesfeps( eps, Cbs.to("1/d").value, Csps.to("1/d").value ) return -np.sum(compstars * x) # Note: There is no way to seed an initial solution to minimize scalar # 0 and 1 are supposed to be the bounds on epsres. # I could define upper bound to be 0.01, However defining the bounds to be # 5 lets the solver converge epsres = minimize_scalar( totCompfeps, method="bounded", bounds=[0, 7], options={"disp": 3, "xatol": self.ftol, "maxiter": self.maxiter}, ) # https://docs.scipy.org/doc/scipy/reference/optimize.minimize_scalar- # bounded.html#optimize-minimize-scalar-bounded comp_epsmax, t_epsmax, x_epsmax = self.inttimesfeps( epsres["x"], Cbs.to("1/d").value, Csps.to("1/d").value ) if np.sum(comp_epsmax * x_epsmax) > self.sint_comp: x0 = x_epsmax self.sint_comp = np.sum(comp_epsmax * x_epsmax) self.t0 = t_epsmax * x_epsmax * u.day # Optimize the baseline solution self.vprint("Optimizing baseline integration times.") sInds = np.arange(self.TargetList.nStars) if self.Izod == "fZ0": # Use fZ0 to calculate integration times fZ = ( np.array([self.ZodiacalLight.fZ0.value] * len(sInds)) * self.ZodiacalLight.fZ0.unit ) elif self.Izod == "fZmin": # Use fZmin to calculate integration times fZ = self.valfZmin[sInds] elif self.Izod == "fZmax": # Use fZmax to calculate integration times fZ = self.valfZmax[sInds] elif ( self.Izod == "current" ): # Use current fZ to calculate integration times fZ = self.ZodiacalLight.fZ( self.Observatory, self.TargetList, sInds, self.TimeKeeping.currentTimeAbs.copy() + np.zeros(self.TargetList.nStars) * u.d, self.detmode, ) ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = self.TimeKeeping.get_ObsDetectionMaxIntTime( self.Observatory, self.detmode, self.TimeKeeping.currentTimeNorm.copy() ) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed bounds = [(0, maxIntTime.to(u.d).value) for i in np.arange(len(sInds))] initguess = x0 * self.t0.to(u.d).value self.save_initguess = initguess # While we use all sInds as input, theoretically, this can be solved faster # if we use the following lines: # sInds = np.asarray([sInd for sInd in sInds if np.bool(x0[sInd])]) # bounds = [(0,maxIntTime.to(u.d).value) for i in np.arange(len(sInds))] # and use initguess[sInds], fZ[sInds], and self.t0[sInds]. # There was no noticable performance improvement ires = minimize( self.objfun, initguess, jac=self.objfun_deriv, args=(sInds, fZ), constraints=self.constraints, method="SLSQP", bounds=bounds, options={"maxiter": self.maxiter, "ftol": self.ftol, "disp": True}, ) # original method assert ires["success"], "Initial time optimization failed." self.t0 = ires["x"] * u.d self.sint_comp = -ires["fun"] if cacheOptTimes: with open(cachefname, "wb") as f: pickle.dump(self.t0, f) self.vprint("Saved cached optimized t0 to %s" % cachefname) # Redefine filter inds self.intTimeFilterInds = np.where( (self.t0.value > 0.0) * (self.t0.value <= self.OpticalSystem.intCutoff.value) > 0.0 )[ 0 ] # These indices are acceptable for use simulating
[docs] def inttimesfeps(self, eps, Cb, Csp): """ Compute the optimal subset of targets for a given epsilon value where epsilon is the maximum completeness gradient. Everything is in units of days """ tstars = ( -Cb * eps * np.sqrt(np.log(10.0)) + np.sqrt((Cb * eps) ** 2.0 * np.log(10.0) + 5.0 * Cb * Csp**2.0 * eps) ) / ( 2.0 * Csp**2.0 * eps * np.log(10.0) ) # calculating Tau to achieve dC/dT #double check compstars = self.Completeness.comp_per_intTime( tstars * u.day, self.TargetList, np.arange(self.TargetList.nStars), self.ZodiacalLight.fZ0, self.TargetList.JEZ0[self.detmode["hex"]], self.TargetList.int_WA, self.detmode, C_b=Cb / u.d, C_sp=Csp / u.d, TK=self.TimeKeeping, ) solver = pywraplp.Solver( "SolveIntegerProblem", pywraplp.Solver.CBC_MIXED_INTEGER_PROGRAMMING ) xs = [solver.IntVar(0.0, 1.0, "x" + str(j)) for j in np.arange(len(compstars))] constraint = solver.Constraint(-solver.infinity(), self.maxTime.to(u.d).value) for j, x in enumerate(xs): constraint.SetCoefficient(x, tstars[j] + self.ohTimeTot.to(u.day).value) objective = solver.Objective() for j, x in enumerate(xs): objective.SetCoefficient(x, compstars[j]) objective.SetMaximization() # this line enables output of the CBC MIXED INTEGER PROGRAM # solver.EnableOutput() solver.SetTimeLimit(5 * 60 * 1000) # time limit for solver in milliseconds _ = solver.Solve() # self.vprint(solver.result_status()) x = np.array([x.solution_value() for x in xs]) # self.vprint('Solver is FEASIBLE: ' + str(solver.FEASIBLE)) # self.vprint('Solver is OPTIMAL: ' + str(solver.OPTIMAL)) # self.vprint('Solver is BASIC: ' + str(solver.BASIC)) return compstars, tstars, x
[docs] def objfun(self, t, sInds, fZ): """ Objective Function for SLSQP minimization. Purpose is to maximize summed completeness Args: t (ndarray): Integration times in days. NB: NOT an astropy quantity. sInds (ndarray): Target star indices (of same size as t) fZ (astropy Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 Same size as t """ good = t * u.d >= 0.1 * u.s # inds that were not downselected by initial MIP comp = self.Completeness.comp_per_intTime( t[good] * u.d, self.TargetList, sInds[good], fZ[good], self.TargetList.JEZ0[self.detmode["hex"]][sInds][good], self.TargetList.int_WA[sInds][good], self.detmode, ) # self.vprint(-comp.sum()) # for verifying SLSQP output return -comp.sum()
[docs] def objfun_deriv(self, t, sInds, fZ): """ Jacobian of objective Function for SLSQP minimization. Args: t (astropy Quantity): Integration times in days. NB: NOT an astropy quantity. sInds (ndarray): Target star indices (of same size as t) fZ (astropy Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 Same size as t """ good = t * u.d >= 0.1 * u.s # inds that were not downselected by initial MIP tmp = ( self.Completeness.dcomp_dt( t[good] * u.d, self.TargetList, sInds[good], fZ[good], self.TargetList.JEZ0[self.detmode["hex"]][sInds][good], self.TargetList.int_WA[sInds][good], self.detmode, TK=self.TimeKeeping, ) .to("1/d") .value ) jac = np.zeros(len(t)) jac[good] = tmp return -jac
[docs] def calc_targ_intTime(self, sInds, startTimes, mode): """ Given a subset of targets, calculate their integration times given the start of observation time. This implementation updates the optimized times based on current conditions and mission time left. Note: next_target filter will discard targets with zero integration times. Args: sInds (integer array): Indices of available targets startTimes (astropy quantity array): absolute start times of observations. must be of the same size as sInds mode (dict): Selected observing mode for detection Returns: astropy Quantity array: Integration times for detection. Same dimension as sInds """ if self.staticOptTimes: intTimes = self.t0[sInds] else: # assumed values for detection if self.Izod == "fZ0": # Use fZ0 to calculate integration times fZ = ( np.array([self.ZodiacalLight.fZ0.value] * len(sInds)) * self.ZodiacalLight.fZ0.unit ) elif self.Izod == "fZmin": # Use fZmin to calculate integration times fZ = self.valfZmin[sInds] elif self.Izod == "fZmax": # Use fZmax to calculate integration times fZ = self.valfZmax[sInds] elif ( self.Izod == "current" ): # Use current fZ to calculate integration times fZ = self.ZodiacalLight.fZ( self.Observatory, self.TargetList, sInds, startTimes, mode ) # instead of actual time left, try bounding by maxTime - detection time used # need to update time used in choose_next_target timeLeft = ( self.TimeKeeping.missionLife - self.TimeKeeping.currentTimeNorm.copy() ) * self.TimeKeeping.missionPortion bounds = [(0, timeLeft.to(u.d).value) for i in np.arange(len(sInds))] initguess = self.t0[sInds].to(u.d).value ires = minimize( self.objfun, initguess, jac=self.objfun_deriv, args=(sInds, fZ), constraints=self.constraints, method="SLSQP", bounds=bounds, options={"disp": True, "maxiter": self.maxiter, "ftol": self.ftol}, ) # update default times for these targets self.t0[sInds] = ires["x"] * u.d intTimes = ires["x"] * u.d intTimes[intTimes < 0.1 * u.s] = 0.0 * u.d return intTimes
[docs] def choose_next_target(self, old_sInd, sInds, slewTimes, intTimes): """ Given a subset of targets (pre-filtered by method next_target or some other means), select the best next one. Args: old_sInd (integer): Index of the previous target star sInds (integer array): Indices of available targets slewTimes (astropy quantity array): slew times to all stars (must be indexed by sInds) intTimes (astropy Quantity array): Integration times for detection in units of day Returns: tuple: sInd (integer): Index of next target star waitTime (astropy Quantity): the amount of time to wait (this method returns None) """ # Do Checking to Ensure There are Targetswith Positive Nonzero Integration Time # tmpsInds = sInds sInds = sInds[ np.where(intTimes.value > 1e-10)[0] ] # filter out any intTimes that are essentially 0 intTimes = intTimes[intTimes.value > 1e-10] # calcualte completeness values for current intTimes if self.Izod == "fZ0": # Use fZ0 to calculate integration times fZ = ( np.array([self.ZodiacalLight.fZ0.value] * len(sInds)) * self.ZodiacalLight.fZ0.unit ) elif self.Izod == "fZmin": # Use fZmin to calculate integration times fZ = self.valfZmin[sInds] elif self.Izod == "fZmax": # Use fZmax to calculate integration times fZ = self.valfZmax[sInds] elif self.Izod == "current": # Use current fZ to calculate integration times fZ = self.ZodiacalLight.fZ( self.Observatory, self.TargetList, sInds, self.TimeKeeping.currentTimeAbs.copy() + slewTimes[sInds], self.detmode, ) comps = self.Completeness.comp_per_intTime( intTimes, self.TargetList, sInds, fZ, self.TargetList.JEZ0[self.detmode["hex"]][sInds], self.TargetList.int_WA[sInds], self.detmode, TK=self.TimeKeeping, ) # Selection Metric Type valfZmax = self.valfZmax[sInds] valfZmin = self.valfZmin[sInds] if self.selectionMetric == "maxC": # A choose target with maximum completeness sInd = np.random.choice(sInds[comps == max(comps)]) elif ( self.selectionMetric == "Izod-Izodmin" ): # B choose target closest to its fZmin selectInd = np.argmin(fZ - valfZmin) sInd = sInds[selectInd] elif ( self.selectionMetric == "Izod-Izodmax" ): # C choose target furthest from fZmax selectInd = np.argmin( fZ - valfZmax ) # this is most negative when fZ is smallest sInd = sInds[selectInd] elif ( self.selectionMetric == "(Izod-Izodmin)/(Izodmax-Izodmin)" ): # D choose target closest to fZmin with largest fZmin-fZmax variation selectInd = np.argmin( (fZ - valfZmin) / (valfZmin - valfZmax) ) # this is most negative when fZ is smallest sInd = sInds[selectInd] elif ( self.selectionMetric == "(Izod-Izodmin)/(Izodmax-Izodmin)/CIzod" ): # E = D + current completeness at intTime optimized at selectInd = np.argmin( (fZ - valfZmin) / (valfZmin - valfZmax) * (1.0 / comps) ) sInd = sInds[selectInd] # F is simply E but where comp is calculated sing fZmin # elif self.selectionMetric == '(Izod-Izodmin)/(Izodmax-Izodmin)/CIzodmin': # # F = D + current completeness at Izodmin and intTime # selectInd = np.argmin((fZ - valfZmin)/(valfZmin - valfZmax)*(1./comps)) # sInd = sInds[selectInd] elif self.selectionMetric == "TauIzod/CIzod": # G maximum C/T selectInd = np.argmin(intTimes / comps) sInd = sInds[selectInd] elif self.selectionMetric == "random": # I random selection of available sInd = np.random.choice(sInds) elif self.selectionMetric == "priorityObs": # Advances time to # Apply same filters as in next_target (the issue here is that we might # want to make a target observation that is currently in keepout so we need # to "add back in those targets") sInds = np.arange(self.TargetList.nStars) sInds = sInds[np.where(self.t0.value > 1e-10)[0]] sInds = np.intersect1d(self.intTimeFilterInds, sInds) sInds = self.revisitFilter(sInds, self.TimeKeeping.currentTimeNorm.copy()) TK = self.TimeKeeping # Pick which absTime # We will readjust self.absTimefZmin later # we have to do this because "self.absTimefZmin does not support # item assignment" tmpabsTimefZmin = list() for i in np.arange(len(self.fZQuads)): fZarr = np.asarray( [ self.fZQuads[i][j][1].value for j in np.arange(len(self.fZQuads[i])) ] ) # create array of fZ for the Target Star fZarrInds = np.where( np.abs(fZarr - self.valfZmin[i].value) < 0.000001 * np.min(fZarr) )[0] dt = self.t0[ i ] # amount to subtract from points just about to enter keepout # Extract fZ Type assert not len(fZarrInds) == 0, "This should always be greater than 0" if len(fZarrInds) == 2: fZminType0 = self.fZQuads[i][fZarrInds[0]][0] fZminType1 = self.fZQuads[i][fZarrInds[1]][0] if ( fZminType0 == 2 and fZminType1 == 2 ): # Ah! we have a local minimum fZ! # which one occurs next? tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3], self.fZQuads[i][fZarrInds[1]][3], ] ) ) elif (fZminType0 == 0 and fZminType1 == 1) or ( fZminType0 == 1 and fZminType1 == 0 ): # we have an entering and exiting or exiting and entering if fZminType0 == 0: # and fZminType1 == 1 tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3] - dt, self.fZQuads[i][fZarrInds[1]][3], ] ) ) else: # fZminType0 == 1 and fZminType1 == 0 tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3], self.fZQuads[i][fZarrInds[1]][3] - dt, ] ) ) elif ( fZminType1 == 2 or fZminType0 == 2 ): # At least one is local minimum if fZminType0 == 2: tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3] - dt, self.fZQuads[i][fZarrInds[1]][3], ] ) ) else: # fZminType1 == 2 tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3], self.fZQuads[i][fZarrInds[1]][3] - dt, ] ) ) else: # Throw error raise Exception( "A fZminType was not assigned or handled correctly 1" ) elif len(fZarrInds) == 1: fZminType0 = self.fZQuads[i][fZarrInds[0]][0] if fZminType0 == 2: # only 1 local fZmin tmpabsTimefZmin.append(self.fZQuads[i][fZarrInds[0]][3]) elif fZminType0 == 0: # entering tmpabsTimefZmin.append(self.fZQuads[i][fZarrInds[0]][3] - dt) elif fZminType0 == 1: # exiting tmpabsTimefZmin.append(self.fZQuads[i][fZarrInds[0]][3]) else: # Throw error raise Exception( "A fZminType was not assigned or handled correctly 2" ) elif len(fZarrInds) == 3: # Not entirely sure why 3 is occuring. Looks like entering, # exiting, and local minima exist.... strange tmpdt = list() for k in np.arange(3): if self.fZQuads[i][fZarrInds[k]][0] == 0: tmpdt.append(dt) else: tmpdt.append(0.0 * u.d) tmpabsTimefZmin.append( self.whichTimeComesNext( [ self.fZQuads[i][fZarrInds[0]][3] - tmpdt[0], self.fZQuads[i][fZarrInds[1]][3] - tmpdt[1], self.fZQuads[i][fZarrInds[2]][3] - tmpdt[2], ] ) ) elif len(fZarrInds) >= 4: raise Exception("Unexpected Error: Number of fZarrInds was 4") # might check to see if local minimum and koentering/exiting # happened elif len(fZarrInds) == 0: raise Exception("Unexpected Error: Number of fZarrInds was 0") # reassign tmpabsTimefZmin = Time( np.asarray([tttt.value for tttt in tmpabsTimefZmin]), format="mjd", scale="tai", ) self.absTimefZmin = tmpabsTimefZmin # Time relative to now where fZmin occurs timeWherefZminOccursRelativeToNow = ( self.absTimefZmin.value - TK.currentTimeAbs.copy().value ) # of all targets indsLessThan0 = np.where((timeWherefZminOccursRelativeToNow < 0))[ 0 ] # find all inds that are less than 0 cnt = 0.0 # iterate until we know the next time in the future where fZmin occurs # for all targets while len(indsLessThan0) > 0: cnt += 1.0 # take original and add 365.25 until we get the right number of # years to add timeWherefZminOccursRelativeToNow[indsLessThan0] = ( self.absTimefZmin.copy().value[indsLessThan0] - TK.currentTimeAbs.copy().value + cnt * 365.25 ) indsLessThan0 = np.where((timeWherefZminOccursRelativeToNow < 0))[0] # contains all "next occuring" fZmins in absolute time timeToStartfZmins = timeWherefZminOccursRelativeToNow timefZminAfterNow = [ timeToStartfZmins[i] for i in sInds ] # filter by times in future and times not filtered timeToAdvance = np.min( np.asarray(timefZminAfterNow) ) # find the minimum time tsInds = np.where((timeToStartfZmins == timeToAdvance))[ 0 ] # find the index of the minimum time and return that sInd tsInds = [i for i in tsInds if i in sInds] if len(tsInds) > 1: sInd = tsInds[0] else: sInd = tsInds[0] del timefZminAfterNow # Advance To fZmin of Target _ = self.TimeKeeping.advanceToAbsTime( Time( timeToAdvance + TK.currentTimeAbs.copy().value, format="mjd", scale="tai", ), False, ) # Check if exoplanetObsTime would be exceeded OS = self.OpticalSystem Obs = self.Observatory TK = self.TimeKeeping allModes = OS.observingModes mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0] ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed intTimes2 = self.calc_targ_intTime(sInd, TK.currentTimeAbs.copy(), mode) if ( intTimes2 > maxIntTime ): # check if max allowed integration time would be exceeded self.vprint("max allowed integration time would be exceeded") sInd = None # waitTime = 1.0 * u.d # H is simply G but where comp and intTime are calculated using fZmin # elif self.selectionMetric == 'TauIzodmin/CIzodmin': # #H maximum C at fZmin / T at fZmin if sInd is not None: # We assume any observation with integration time of less than 1 second # is not a valid integration time if self.t0[sInd] < 1.0 * u.s: self.vprint("sInd to None is: " + str(sInd)) sInd = None return sInd, None
[docs] def arbitrary_time_advancement(self, dt): """Handles fully dynamically scheduled case where OBduration is infinite and missionPortion is less than 1. Input dt is the total amount of time, including all overheads and extras used for the previous observation. """ if self.selectionMetric == "priorityObs": pass else: self.TimeKeeping.allocate_time( dt * (1.0 - self.TimeKeeping.missionPortion) / self.TimeKeeping.missionPortion, addExoplanetObsTime=False, )
[docs] def whichTimeComesNext(self, absTs): """Determine which absolute time comes next from current time Specifically designed to determine when the next local zodiacal light event occurs form fZQuads Args: absTs (list): the absolute times of different events (list of absolute times) Returns: astropy time quantity: the absolute time which occurs next """ TK = self.TimeKeeping # Convert Abs Times to norm Time tabsTs = list() for i in np.arange(len(absTs)): tabsTs.append( (absTs[i] - TK.missionStart).value ) # all should be in first year tSinceStartOfThisYear = TK.currentTimeNorm.copy().value % 365.25 if len(tabsTs) == len( np.where(tSinceStartOfThisYear < np.asarray(tabsTs))[0] ): # time strictly less than all absTs absT = absTs[np.argmin(tabsTs)] elif len(tabsTs) == len( np.where(tSinceStartOfThisYear > np.asarray(tabsTs))[0] ): absT = absTs[np.argmin(tabsTs)] else: # Some are above and some are below tmptabsTsInds = np.where(tSinceStartOfThisYear < np.asarray(tabsTs))[0] absT = absTs[ np.argmin(np.asarray(tabsTs)[tmptabsTsInds]) ] # of times greater than current time, returns smallest return absT