Source code for EXOSIMS.SurveySimulation.tieredScheduler_DD

from EXOSIMS.SurveySimulation.tieredScheduler import tieredScheduler
import astropy.units as u
import astropy.constants as const
import numpy as np
import time
import copy
from EXOSIMS.util.deltaMag import deltaMag


[docs] class tieredScheduler_DD(tieredScheduler): """tieredScheduler_DD - tieredScheduler Dual Detection This class implements a version of the tieredScheduler that performs dual-band detections """ def __init__(self, **specs): tieredScheduler.__init__(self, **specs) self.inttime_predict = 0 * u.d
[docs] def run_sim(self): """Performs the survey simulation Returns: mission_end (string): Message printed at the end of a survey simulation. """ OS = self.OpticalSystem TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping Comp = self.Completeness # TODO: start using this self.currentSep # set occulter separation if haveOcculter self.currentSep = Obs.occulterSep # Choose observing modes selected for detection (default marked with a flag), det_modes = list( filter(lambda mode: "imag" in mode["inst"]["name"], OS.observingModes) ) base_det_mode = list( filter(lambda mode: mode["detectionMode"], OS.observingModes) )[0] # and for characterization (default is first spectro/IFS mode) spectroModes = list( filter(lambda mode: "spec" in mode["inst"]["name"], OS.observingModes) ) if np.any(spectroModes): char_mode = spectroModes[0] # if no spectro mode, default char mode is first observing mode else: char_mode = OS.observingModes[0] # Begin Survey, and loop until mission is finished self.logger.info("OB{}: survey beginning.".format(TK.OBnumber + 1)) self.vprint("OB{}: survey beginning.".format(TK.OBnumber + 1)) t0 = time.time() sInd = None occ_sInd = None cnt = 0 while not TK.mission_is_over(OS, Obs, det_modes[0]): # Acquire the NEXT TARGET star index and create DRM # prev_occ_sInd = occ_sInd old_sInd = sInd # used to save sInd if returned sInd is None waitTime = None DRM, sInd, occ_sInd, t_det, sd, occ_sInds, det_mode = self.next_target( sInd, occ_sInd, det_modes, char_mode ) if det_mode is not None: true_t_det = ( t_det * det_mode["timeMultiplier"] + Obs.settlingTime + det_mode["syst"]["ohTime"] ) else: true_t_det = t_det if sInd != occ_sInd and sInd is not None: assert t_det != 0, "Integration time can't be 0." if ( sInd is not None and (TK.currentTimeAbs.copy() + true_t_det) >= self.occ_arrives and occ_sInd != self.last_chard ): sInd = occ_sInd if sInd == occ_sInd: self.ready_to_update = True time2arrive = self.occ_arrives - TK.currentTimeAbs.copy() if sInd is not None: cnt += 1 # clean up revisit list when one occurs to prevent repeats if np.any(self.starRevisit) and np.any( np.where(self.starRevisit[:, 0] == float(sInd)) ): s_revs = np.where(self.starRevisit[:, 0] == float(sInd))[0] t_revs = np.where( self.starRevisit[:, 1] * u.day - TK.currentTimeNorm.copy() < 0 * u.d )[0] self.starRevisit = np.delete( self.starRevisit, np.intersect1d(s_revs, t_revs), 0 ) # get the index of the selected target for the extended list if ( TK.currentTimeNorm.copy() > TK.missionLife and self.starExtended.shape[0] == 0 ): for i in range(len(self.DRM)): if np.any([x == 1 for x in self.DRM[i]["plan_detected"]]): self.starExtended = np.hstack( (self.starExtended, self.DRM[i]["star_ind"]) ) self.starExtended = np.unique(self.starExtended) # Beginning of observation, start to populate DRM DRM["OB_nb"] = TK.OBnumber + 1 DRM["ObsNum"] = cnt DRM["star_ind"] = sInd pInds = np.where(SU.plan2star == sInd)[0] DRM["plan_inds"] = pInds.astype(int).tolist() if sInd == occ_sInd: # wait until expected arrival time is observed if time2arrive > 0 * u.d: TK.advanceToAbsTime(self.occ_arrives) if time2arrive > 1 * u.d: self.GAtime = self.GAtime + time2arrive.to("day") TK.obsStart = TK.currentTimeNorm.copy().to("day") self.logger.info( "Observation #%s, target #%s/%s with %s planet(s), mission time: %s" % (cnt, sInd + 1, TL.nStars, len(pInds), TK.obsStart.round(2)) ) self.vprint( "Observation #%s, target #%s/%s with %s planet(s), mission time: %s" % (cnt, sInd + 1, TL.nStars, len(pInds), TK.obsStart.round(2)) ) DRM["arrival_time"] = TK.currentTimeNorm.copy().to("day") if sInd != occ_sInd: self.starVisits[sInd] += 1 # PERFORM DETECTION and populate revisit list attribute. # First store dMag, WA if np.any(pInds): DRM["det_dMag"] = SU.dMag[pInds].tolist() DRM["det_WA"] = SU.WA[pInds].to("mas").value.tolist() ( detected, det_fZ, det_JEZ, det_systemParams, det_SNR, FA, ) = self.observation_detection(sInd, t_det, det_mode) if np.any(pInds): DRM["det_JEZ"] = det_JEZ if np.any(detected > 0): self.sInd_detcounts[sInd] += 1 self.sInd_dettimes[sInd] = ( self.sInd_dettimes.get(sInd) or [] ) + [TK.currentTimeNorm.copy().to("day")] self.vprint(" Det. results are: %s" % (detected)) # update GAtime self.GAtime = ( self.GAtime + t_det.to("day") * self.GA_simult_det_fraction ) self.tot_dettime += t_det.to("day") # populate the DRM with detection results DRM["det_time"] = t_det.to("day") DRM["det_status"] = detected DRM["det_SNR"] = det_SNR DRM["det_fZ"] = det_fZ.to("1/arcsec2") DRM["det_params"] = det_systemParams DRM["FA_det_status"] = int(FA) det_comp = Comp.comp_per_intTime( t_det, TL, sInd, det_fZ, TL.JEZ0[det_mode["hex"]][sInd], TL.int_WA[sInd], det_mode, )[0] DRM["det_comp"] = det_comp DRM["det_mode"] = dict(det_mode) del DRM["det_mode"]["inst"], DRM["det_mode"]["syst"] elif sInd == occ_sInd: self.occ_starVisits[occ_sInd] += 1 self.last_chard = occ_sInd # PERFORM CHARACTERIZATION and populate spectra list attribute. occ_pInds = np.where(SU.plan2star == occ_sInd)[0] DRM["slew_time"] = self.occ_slewTime.to("day").value DRM["slew_angle"] = self.occ_sd.to("deg").value slew_mass_used = ( self.occ_slewTime * Obs.defburnPortion * Obs.flowRate ) DRM["slew_dV"] = ( (self.occ_slewTime * self.ao * Obs.defburnPortion) .to("m/s") .value ) DRM["slew_mass_used"] = slew_mass_used.to("kg") Obs.scMass = Obs.scMass - slew_mass_used DRM["scMass"] = Obs.scMass.to("kg") if Obs.twotanks: Obs.slewMass = Obs.slewMass - slew_mass_used DRM["slewMass"] = Obs.slewMass.to("kg") self.logger.info(" Starshade and telescope aligned at target star") self.vprint(" Starshade and telescope aligned at target star") # PERFORM CHARACTERIZATION and populate spectra list attribute ( characterized, char_fZ, char_systemParams, char_SNR, char_intTime, ) = self.observation_characterization(sInd, char_mode) if np.any(characterized): self.vprint(" Char. results are: %s" % (characterized)) else: # make sure we don't accidentally double characterize TK.advanceToAbsTime(TK.currentTimeAbs.copy() + 0.01 * u.d) assert char_intTime != 0.0 * u.d, "Integration time can't be 0." if np.any(occ_pInds): DRM["char_fEZ"] = ( SU.fEZ[occ_pInds].to("1/arcsec2").value.tolist() ) DRM["char_dMag"] = SU.dMag[occ_pInds].tolist() DRM["char_WA"] = SU.WA[occ_pInds].to("mas").value.tolist() DRM["char_mode"] = dict(char_mode) del DRM["char_mode"]["inst"], DRM["char_mode"]["syst"] # update the occulter wet mass if OS.haveOcculter and char_intTime is not None: DRM = self.update_occulter_mass(DRM, sInd, char_intTime, "char") char_comp = Comp.comp_per_intTime( char_intTime, TL, occ_sInd, char_fZ, self.ZodiacalLight.fEZ0, TL.int_WA[occ_sInd], char_mode, )[0] DRM["char_comp"] = char_comp FA = False # populate the DRM with characterization results DRM["char_time"] = ( char_intTime.to("day") if char_intTime is not None else 0.0 * u.day ) # DRM['char_counts'] = self.sInd_charcounts[sInd] DRM["char_status"] = characterized[:-1] if FA else characterized DRM["char_SNR"] = char_SNR[:-1] if FA else char_SNR DRM["char_fZ"] = char_fZ.to("1/arcsec2") DRM["char_params"] = char_systemParams # populate the DRM with FA results DRM["FA_det_status"] = int(FA) DRM["FA_char_status"] = characterized[-1] if FA else 0 DRM["FA_char_SNR"] = char_SNR[-1] if FA else 0.0 DRM["FA_char_fEZ"] = ( self.lastDetected[sInd, 1][-1] / u.arcsec**2 if FA else 0.0 / u.arcsec**2 ) DRM["FA_char_dMag"] = self.lastDetected[sInd, 2][-1] if FA else 0.0 DRM["FA_char_WA"] = ( self.lastDetected[sInd, 3][-1] * u.arcsec if FA else 0.0 * u.arcsec ) # add star back into the revisit list if np.any(characterized): char = np.where(characterized)[0] pInds = np.where(SU.plan2star == sInd)[0] smin = np.min(SU.s[pInds[char]]) pInd_smin = pInds[np.argmin(SU.s[pInds[char]])] Ms = TL.MsTrue[sInd] sp = smin Mp = SU.Mp[pInd_smin] mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + T / 2.0 # noqa: F841 self.goal_GAtime = self.GA_percentage * TK.currentTimeNorm.copy().to( "day" ) goal_GAdiff = self.goal_GAtime - self.GAtime # allocate extra time to GA if we are falling behind if ( goal_GAdiff > 1 * u.d and TK.currentTimeAbs.copy() < self.occ_arrives ): GA_diff = min( self.occ_arrives - TK.currentTimeAbs.copy(), goal_GAdiff ) self.vprint( "Allocating time %s to general astrophysics" % (GA_diff) ) self.GAtime = self.GAtime + GA_diff TK.advanceToAbsTime(TK.currentTimeAbs.copy() + GA_diff) # allocate time if there is no target for the starshade elif ( goal_GAdiff > 1 * u.d and (self.occ_arrives - TK.currentTimeAbs.copy()) < -5 * u.d and not np.any(occ_sInds) ): self.vprint( ( "No Available Occulter Targets. Allocating time " "{} to general astrophysics".format(goal_GAdiff) ) ) self.GAtime = self.GAtime + goal_GAdiff TK.advanceToAbsTime(TK.currentTimeAbs.copy() + goal_GAdiff) DRM["exoplanetObsTime"] = TK.exoplanetObsTime.copy() # Append result values to self.DRM self.DRM.append(DRM) # Calculate observation end time TK.obsEnd = TK.currentTimeNorm.copy().to("day") # With prototype TimeKeeping, if no OB duration was specified, advance # to the next OB with timestep equivalent to time spent on one target if np.isinf(TK.OBduration) and (TK.missionPortion < 1): self.arbitrary_time_advancement( TK.currentTimeNorm.to("day").copy() - DRM["arrival_time"] ) else: # sInd == None sInd = old_sInd # Retain the last observed star if ( TK.currentTimeNorm.copy() >= TK.OBendTimes[TK.OBnumber] ): # currentTime is at end of OB # Conditional Advance To Start of Next OB if not TK.mission_is_over( OS, Obs, det_mode ): # as long as the mission is not over TK.advancetToStartOfNextOB() # Advance To Start of Next OB elif waitTime is not None: # CASE 1: Advance specific wait time _ = TK.advanceToAbsTime(TK.currentTimeAbs.copy() + waitTime) self.vprint("waitTime is not None") else: startTimes = ( TK.currentTimeAbs.copy() + np.zeros(TL.nStars) * u.d ) # Start Times of Observations observableTimes = Obs.calculate_observableTimes( TL, np.arange(TL.nStars), startTimes, self.koMaps, self.koTimes, base_det_mode, )[0] # CASE 2 If There are no observable targets for the rest of # the mission # Are there any stars coming out of keepout before end of mission if ( observableTimes[ ( TK.missionFinishAbs.copy().value * u.d > observableTimes.value * u.d ) * ( observableTimes.value * u.d >= TK.currentTimeAbs.copy().value * u.d ) ].shape[0] ) == 0: self.vprint( ( "No Observable Targets for Remainder of mission at " "currentTimeNorm = {}".format(TK.currentTimeNorm.copy) ) ) # Manually advancing time to mission end TK.currentTimeNorm = TK.missionLife TK.currentTimeAbs = TK.missionFinishAbs else: # CASE 3 nominal wait time if at least 1 target is still # in list and observable # TODO: ADD ADVANCE TO WHEN FZMIN OCURS inds1 = np.arange(TL.nStars)[ observableTimes.value * u.d > TK.currentTimeAbs.copy().value * u.d ] # apply intTime filter inds2 = np.intersect1d(self.intTimeFilterInds, inds1) # apply revisit Filter #NOTE this means stars you added to # the revisit list inds3 = self.revisitFilter( inds2, TK.currentTimeNorm.copy() + self.dt_max.to(u.d) ) self.vprint( "Filtering %d stars from advanceToAbsTime" % (TL.nStars - len(inds3)) ) # there is at least one observableTime between now and the # end of the mission oTnowToEnd = observableTimes[inds3] if not oTnowToEnd.value.shape[0] == 0: # advance to that observable time tAbs = np.min(oTnowToEnd) else: # advance to end of mission tAbs = TK.missionStart + TK.missionLife tmpcurrentTimeNorm = TK.currentTimeNorm.copy() # Advance Time to this time OR start of next OB following # this time _ = TK.advanceToAbsTime(tAbs) self.vprint( ( "No Observable Targets a currentTimeNorm = {:.2f} " "Advanced To currentTimeNorm = {:.2f}" ).format( tmpcurrentTimeNorm.to("day").value, TK.currentTimeNorm.to("day").value, ) ) else: dtsim = (time.time() - t0) * u.s mission_end = ( "Mission complete: no more time available.\n" + "Simulation duration: %s.\n" % dtsim.astype("int") + "Results stored in SurveySimulation.DRM (Design Reference Mission)." ) self.logger.info(mission_end) self.vprint(mission_end) return mission_end
[docs] def next_target(self, old_sInd, old_occ_sInd, det_modes, char_mode): """Finds index of next target star and calculates its integration time. This method chooses the next target star index based on which stars are available, their integration time, and maximum completeness. Returns None if no target could be found. Args: old_sInd (integer): Index of the previous target star for the telescope old_occ_sInd (integer): Index of the previous target star for the occulter det_modes (dict array): Selected observing mode for detection char_mode (dict): Selected observing mode for characterization Returns: tuple: DRM (dicts): Contains the results of survey simulation sInd (integer): Index of next target star. Defaults to None. occ_sInd (integer): Index of next occulter target star. Defaults to None. t_det (astropy Quantity): Selected star integration time for detection in units of day. Defaults to None. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList Obs = self.Observatory TK = self.TimeKeeping SU = self.SimulatedUniverse # Create DRM DRM = {} # selecting appropriate koMap occ_koMap = self.koMaps[char_mode["syst"]["name"]] koMap = self.koMaps[det_modes[0]["syst"]["name"]] # In case of an occulter, initialize slew time factor # (add transit time and reduce starshade mass) assert OS.haveOcculter self.ao = Obs.thrust / Obs.scMass # Star indices that correspond with the given HIPs numbers for the occulter # TODO: print out HIPs that don't show up in TL HIP_sInds = np.where(np.isin(TL.Name, self.occHIPs))[0] if TL.earths_only: HIP_sInds = np.union1d(HIP_sInds, self.promoted_stars).astype(int) sInd = None # Now, start to look for available targets while not TK.mission_is_over(OS, Obs, det_modes[0]): # allocate settling time + overhead time tmpCurrentTimeAbs = TK.currentTimeAbs.copy() # tmpCurrentTimeNorm = TK.currentTimeNorm.copy() occ_tmpCurrentTimeAbs = TK.currentTimeAbs.copy() # occ_tmpCurrentTimeNorm = TK.currentTimeNorm.copy() # 0 initialize arrays slewTimes = np.zeros(TL.nStars) * u.d # fZs = np.zeros(TL.nStars) / u.arcsec**2 # dV = np.zeros(TL.nStars) * u.m / u.s intTimes = np.zeros(TL.nStars) * u.d occ_intTimes = np.zeros(TL.nStars) * u.d # tovisit = np.zeros(TL.nStars, dtype=bool) occ_tovisit = np.zeros(TL.nStars, dtype=bool) sInds = np.arange(TL.nStars) # 1 Find spacecraft orbital START positions and filter out unavailable # targets. If occulter, each target has its own START position. sd = Obs.star_angularSep(TL, old_occ_sInd, sInds, tmpCurrentTimeAbs) obsTimes = Obs.calculate_observableTimes( TL, sInds, tmpCurrentTimeAbs, self.koMaps, self.koTimes, char_mode ) slewTimes = Obs.calculate_slewTimes( TL, old_occ_sInd, sInds, sd, obsTimes, tmpCurrentTimeAbs ) # 2.1 filter out totTimes > integration cutoff if len(sInds) > 0: occ_sInds = np.intersect1d(self.occ_intTimeFilterInds, sInds) if len(sInds) > 0: sInds = np.intersect1d(self.intTimeFilterInds, sInds) # Starttimes based off of slewtime occ_startTimes = occ_tmpCurrentTimeAbs.copy() + slewTimes # occ_startTimesNorm = occ_tmpCurrentTimeNorm.copy() + slewTimes startTimes = tmpCurrentTimeAbs.copy() + np.zeros(TL.nStars) * u.d # startTimesNorm = tmpCurrentTimeNorm.copy() # 2.5 Filter stars not observable at startTimes try: tmpIndsbool = list() for i in np.arange(len(occ_sInds)): koTimeInd = np.where( np.round(occ_startTimes[occ_sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( occ_koMap[occ_sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds_occ_ko = occ_sInds[tmpIndsbool] occ_sInds = sInds_occ_ko[np.where(np.isin(sInds_occ_ko, HIP_sInds))[0]] del tmpIndsbool except: # noqa: E722 If there are no target stars to observe sInds_occ_ko = np.asarray([], dtype=int) occ_sInds = np.asarray([], dtype=int) try: tmpIndsbool = list() for i in np.arange(len(sInds)): koTimeInd = np.where( np.round(startTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( koMap[sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 If there are no target stars to observe sInds = np.asarray([], dtype=int) # 2.9 Occulter target promotion step occ_sInds = self.promote_coro_targets(occ_sInds, sInds_occ_ko) # 3 Filter out all previously (more-)visited targets, unless in # revisit list, with time within some dt of start (+- 1 week) if len(sInds.tolist()) > 0: sInds = self.revisitFilter(sInds, TK.currentTimeNorm.copy()) # revisit list, with time after start if np.any(occ_sInds): occ_tovisit[occ_sInds] = ( self.occ_starVisits[occ_sInds] == self.occ_starVisits[occ_sInds].min() ) if self.occ_starRevisit.size != 0: dt_rev = ( TK.currentTimeNorm.copy() - self.occ_starRevisit[:, 1] * u.day ) ind_rev = [ int(x) for x in self.occ_starRevisit[dt_rev > 0, 0] if x in occ_sInds ] occ_tovisit[ind_rev] = True occ_sInds = np.where(occ_tovisit)[0] # 4 calculate integration times for ALL preselected targets, # and filter out totTimes > integration cutoff ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, det_modes[0]) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, OS.intCutoff, ) # Maximum intTime allowed ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, char_mode) occ_maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, OS.intCutoff, ) # Maximum intTime allowed if len(occ_sInds) > 0: if self.int_inflection: fEZ = ZL.fEZ0 WA = TL.int_WA occ_intTimes[occ_sInds] = self.calc_int_inflection( occ_sInds, fEZ, occ_startTimes, WA[occ_sInds], char_mode, ischar=True, ) totTimes = occ_intTimes * char_mode["timeMultiplier"] occ_endTimes = occ_startTimes + totTimes else: # characterization_start = occ_startTimes occ_intTimes[occ_sInds] = self.calc_targ_intTime( occ_sInds, occ_startTimes[occ_sInds], char_mode ) * (1 + self.charMargin) # Adjust integration time for stars with known earths around them for occ_star in occ_sInds: if occ_star in self.promoted_stars: occ_earths = np.intersect1d( np.where(SU.plan2star == occ_star)[0], self.known_earths ).astype(int) if np.any(occ_earths): fZ = ZL.fZ( Obs, TL, occ_star, occ_startTimes[occ_star], char_mode, ) fEZ = ( SU.fEZ[occ_earths].to("1/arcsec2").value / u.arcsec**2 ) if SU.lucky_planets: phi = (1 / np.pi) * np.ones(len(SU.d)) dMag = deltaMag(SU.p, SU.Rp, SU.d, phi)[ occ_earths ] # delta magnitude WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to( "arcsec" )[ occ_earths ] # working angle else: dMag = SU.dMag[occ_earths] WA = SU.WA[occ_earths] if np.all( (WA < char_mode["IWA"]) | (WA > char_mode["OWA"]) ): occ_intTimes[occ_star] = 0.0 * u.d else: earthlike_inttimes = OS.calc_intTime( TL, occ_star, fZ, fEZ, dMag, WA, char_mode ) * (1 + self.charMargin) earthlike_inttimes[ ~np.isfinite(earthlike_inttimes) ] = (0 * u.d) earthlike_inttime = earthlike_inttimes[ (earthlike_inttimes < occ_maxIntTime) ] if len(earthlike_inttime) > 0: occ_intTimes[occ_star] = np.max( earthlike_inttime ) else: occ_intTimes[occ_star] = np.max( earthlike_inttimes ) occ_endTimes = ( occ_startTimes + (occ_intTimes * char_mode["timeMultiplier"]) + Obs.settlingTime + char_mode["syst"]["ohTime"] ) occ_sInds = occ_sInds[ (occ_intTimes[occ_sInds] <= occ_maxIntTime) ] # Filters targets exceeding maximum intTime occ_sInds = occ_sInds[ (occ_intTimes[occ_sInds] > 0.0 * u.d) ] # Filters with an inttime of 0 if occ_maxIntTime.value <= 0: occ_sInds = np.asarray([], dtype=int) if len(sInds.tolist()) > 0: intTimes[sInds] = self.calc_targ_intTime( sInds, startTimes[sInds], det_modes[0] ) sInds = sInds[ (intTimes[sInds] <= maxIntTime) ] # Filters targets exceeding end of OB endTimes = ( startTimes.value * u.d + intTimes + Obs.settlingTime + det_modes[0]["syst"]["ohTime"] ) if maxIntTime.value <= 0: sInds = np.asarray([], dtype=int) # 5.2 find spacecraft orbital END positions (for each candidate target), # and filter out unavailable targets if len(occ_sInds.tolist()) > 0 and Obs.checkKeepoutEnd: # endTimes may exist past koTimes so we have an exception to hand # this case try: tmpIndsbool = list() for i in np.arange(len(occ_sInds)): koTimeInd = np.where( np.round(occ_endTimes[occ_sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( occ_koMap[occ_sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind occ_sInds = occ_sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 occ_sInds = np.asarray([], dtype=int) if len(sInds.tolist()) > 0 and Obs.checkKeepoutEnd: # endTimes may exist past koTimes so we have an exception to # hand this case try: tmpIndsbool = list() for i in np.arange(len(sInds)): # find indice where koTime is endTime[0] koTimeInd = np.where( np.round(endTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][0] # Is star observable at time ind tmpIndsbool.append(koMap[sInds[i]][koTimeInd].astype(bool)) sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 sInds = np.asarray([], dtype=int) # 5.3 Filter off current occulter target star from detection list if old_occ_sInd is not None: sInds = sInds[(sInds != old_occ_sInd)] occ_sInds = occ_sInds[(occ_sInds != old_occ_sInd)] # 6.1 Filter off any stars visited by the occulter 3 or more times if np.any(occ_sInds): occ_sInds = occ_sInds[ (self.occ_starVisits[occ_sInds] < self.occ_max_visits) ] # 6.2 Filter off coronograph stars with > 3 visits and no detections no_dets = np.logical_and( (self.starVisits[sInds] > self.n_det_remove), (self.sInd_detcounts[sInds] == 0), ) sInds = sInds[np.where(np.invert(no_dets))[0]] max_dets = np.where(self.sInd_detcounts[sInds] < self.max_successful_dets)[ 0 ] sInds = sInds[max_dets] # 7 Filter off cornograph stars with too-long inttimes available_time = None if self.occ_arrives > TK.currentTimeAbs: available_time = self.occ_arrives - TK.currentTimeAbs.copy() if np.any(sInds[intTimes[sInds] < available_time]): sInds = sInds[intTimes[sInds] < available_time] # 8 remove occ targets on ignore_stars list occ_sInds = np.setdiff1d( occ_sInds, np.intersect1d(occ_sInds, self.ignore_stars) ) tmpIndsbool = list() for i in np.arange(len(occ_sInds)): koTimeInd = np.where( np.round(occ_startTimes[occ_sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( occ_koMap[occ_sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind t_det = 0 * u.d det_mode = copy.deepcopy(det_modes[0]) occ_sInd = old_occ_sInd # 9 Choose best target from remaining # if the starshade has arrived at its destination, or it is the # first observation if np.any(occ_sInds): if old_occ_sInd is None or ( (TK.currentTimeAbs.copy() + t_det) >= self.occ_arrives and self.ready_to_update ): occ_sInd = self.choose_next_occulter_target( old_occ_sInd, occ_sInds, occ_intTimes ) if old_occ_sInd is None: self.occ_arrives = TK.currentTimeAbs.copy() else: self.occ_arrives = occ_startTimes[occ_sInd] self.occ_slewTime = slewTimes[occ_sInd] self.occ_sd = sd[occ_sInd] self.inttime_predict = occ_intTimes[occ_sInd] self.ready_to_update = False elif not np.any(sInds): TK.advanceToAbsTime(TK.currentTimeAbs.copy() + 1 * u.d) continue if occ_sInd is not None: sInds = sInds[np.where(sInds != occ_sInd)] if self.tot_det_int_cutoff < self.tot_dettime: sInds = np.array([]) if np.any(sInds): # choose sInd of next target sInd = self.choose_next_telescope_target( old_sInd, sInds, intTimes[sInds] ) # Perform dual band detections if necessary if ( TL.int_WA[sInd] > det_modes[1]["IWA"] and TL.int_WA[sInd] < det_modes[1]["OWA"] ): det_mode["BW"] = det_mode["BW"] + det_modes[1]["BW"] det_mode["inst"]["sread"] = ( det_mode["inst"]["sread"] + det_modes[1]["inst"]["sread"] ) det_mode["inst"]["idark"] = ( det_mode["inst"]["idark"] + det_modes[1]["inst"]["idark"] ) det_mode["inst"]["CIC"] = ( det_mode["inst"]["CIC"] + det_modes[1]["inst"]["CIC"] ) det_mode["syst"]["optics"] = np.mean( (det_mode["syst"]["optics"], det_modes[1]["syst"]["optics"]) ) det_mode["instName"] = "combined" t_det = self.calc_targ_intTime( np.array([sInd]), startTimes[sInd], det_mode )[0] if t_det > maxIntTime and maxIntTime > 0 * u.d: t_det = maxIntTime if available_time is not None and available_time > 0 * u.d: if t_det > available_time: t_det = available_time.copy().value * u.d # if no observable target, call the TimeKeeping.wait() method if not np.any(sInds) and not np.any(occ_sInds): self.vprint( "No Observable Targets at currentTimeNorm= " + str(TK.currentTimeNorm.copy()) ) return DRM, None, None, None, None, None, None break else: self.logger.info("Mission complete: no more time available") self.vprint("Mission complete: no more time available") return DRM, None, None, None, None, None, None if TK.mission_is_over(OS, Obs, det_mode): self.logger.info("Mission complete: no more time available") self.vprint("Mission complete: no more time available") return DRM, None, None, None, None, None, None return DRM, sInd, occ_sInd, t_det, sd, occ_sInds, det_mode