Source code for EXOSIMS.SurveySimulation.coroOnlyScheduler

from EXOSIMS.Prototypes.SurveySimulation import SurveySimulation
import astropy.units as u
import astropy.constants as const
from astropy.time import Time
import numpy as np
import time
import copy
from EXOSIMS.util.deltaMag import deltaMag


[docs] class coroOnlyScheduler(SurveySimulation): """coroOnlyScheduler - Coronograph Only Scheduler This scheduler inherits directly from the prototype SurveySimulation module. The coronlyScheduler operates using only a coronograph. The scheduler makes detections until stars can be promoted into a characterization list, at which point they are charcterized. Args: revisit_wait (float): Wait time threshold for star revisits. The value given is the fraction of a characterized planet's period that must be waited before scheduling a revisit. revisit_weight (float): Weight used to increase preference for coronograph revisits. n_det_remove (integer): Number of failed detections before a star is removed from the target list. n_det_min (integer): Minimum number of detections required for promotion to char target. max_successful_chars (integer): Maximum number of successful characterizions before star is taken off target list. max_successful_dets (integer): Maximum number of successful detections before star is taken off target list. lum_exp (int): The exponent to use for luminosity weighting on coronograph targets. promote_by_time (bool): Only promote stars that have had detections that span longer than half a period. detMargin (float): Acts in the same way a charMargin. Adds a multiplyer to the calculated detection time. **specs: user specified values """ def __init__( self, revisit_wait=0.5, revisit_weight=1.0, n_det_remove=3, n_det_min=3, max_successful_chars=1, max_successful_dets=4, lum_exp=1, promote_by_time=False, detMargin=0.0, **specs, ): SurveySimulation.__init__(self, **specs) TL = self.TargetList OS = self.OpticalSystem SU = self.SimulatedUniverse # Add to outspec self._outspec["revisit_wait"] = revisit_wait self._outspec["revisit_weight"] = revisit_weight self._outspec["n_det_remove"] = n_det_remove self._outspec["max_successful_chars"] = max_successful_chars self._outspec["lum_exp"] = lum_exp self._outspec["n_det_min"] = n_det_min self.FA_status = np.zeros(TL.nStars, dtype=bool) # False Alarm status array # The exponent to use for luminosity weighting on coronograph targets self.lum_exp = lum_exp self.sInd_charcounts = {} # Number of characterizations by star index self.sInd_detcounts = np.zeros( TL.nStars, dtype=int ) # Number of detections by star index self.sInd_dettimes = {} # Minimum number of visits with no detections required to filter off star self.n_det_remove = n_det_remove # Minimum number of detections required for promotio self.n_det_min = n_det_min # Maximum allowed number of successful chars of deep dive targets before # removal from target list self.max_successful_chars = max_successful_chars self.max_successful_dets = max_successful_dets self.char_starRevisit = np.array([]) # Array of star revisit times # The number of times each star was visited by the occulter self.char_starVisits = np.zeros(TL.nStars, dtype=int) self.promote_by_time = promote_by_time self.detMargin = detMargin # self.revisit_wait = revisit_wait * u.d EEID = 1 * u.AU * np.sqrt(TL.L) mu = const.G * (TL.MsTrue) T = (2.0 * np.pi * np.sqrt(EEID**3 / mu)).to(u.d) self.revisit_wait = revisit_wait * T self.revisit_weight = revisit_weight self.no_dets = np.ones(self.TargetList.nStars, dtype=bool) # list of stars promoted from the detection list to the characterization list self.promoted_stars = self.known_rocky # list of stars that have been removed from the occ_sInd list self.ignore_stars = [] self.t_char_earths = np.array([]) # corresponding integration times for earths allModes = OS.observingModes num_char_modes = len( list(filter(lambda mode: "spec" in mode["inst"]["name"], allModes)) ) self.fullSpectra = np.zeros((num_char_modes, SU.nPlans), dtype=int) self.partialSpectra = np.zeros((num_char_modes, SU.nPlans), dtype=int) # Promote all stars assuming they have known earths char_sInds_with_earths = [] if TL.earths_only: OS = self.OpticalSystem TL = self.TargetList SU = self.SimulatedUniverse # char_modes = list( # filter(lambda mode: "spec" in mode["inst"]["name"], OS.observingModes) # ) # check for earths around the available stars for sInd in np.arange(TL.nStars): pInds = np.where(SU.plan2star == sInd)[0] pinds_earthlike = self.is_earthlike(pInds, sInd) if np.any(pinds_earthlike): self.known_earths = np.union1d( self.known_earths, pInds[pinds_earthlike] ).astype(int) char_sInds_with_earths.append(sInd) self.promoted_stars = np.union1d( self.promoted_stars, char_sInds_with_earths ).astype(int)
[docs] def initializeStorageArrays(self): """ Initialize all storage arrays based on # of stars and targets """ self.DRM = [] OS = self.OpticalSystem SU = self.SimulatedUniverse allModes = OS.observingModes num_char_modes = len( list(filter(lambda mode: "spec" in mode["inst"]["name"], allModes)) ) self.fullSpectra = np.zeros((num_char_modes, SU.nPlans), dtype=int) self.partialSpectra = np.zeros((num_char_modes, SU.nPlans), dtype=int) self.propagTimes = np.zeros(self.TargetList.nStars) << u.d self.lastObsTimes = np.zeros(self.TargetList.nStars) << u.d self.starVisits = np.zeros( self.TargetList.nStars, dtype=int ) # contains the number of times each star was visited self.starRevisit = np.array([]) self.starExtended = np.array([], dtype=int) self.lastDetected = np.empty((self.TargetList.nStars, 4), dtype=object)
[docs] def run_sim(self): """Performs the survey simulation""" OS = self.OpticalSystem TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping Comp = self.Completeness # choose observing modes selected for detection (default marked with a flag) allModes = OS.observingModes det_modes = list( filter(lambda mode: "imag" in mode["inst"]["name"], OS.observingModes) ) base_det_mode = list( filter(lambda mode: mode["detectionMode"], OS.observingModes) )[0] # and for characterization (default is first spectro/IFS mode) spectroModes = list( filter(lambda mode: "spec" in mode["inst"]["name"], allModes) ) if np.any(spectroModes): char_modes = spectroModes # if no spectro mode, default char mode is first observing mode else: char_modes = [allModes[0]] # begin Survey, and loop until mission is finished log_begin = "OB%s: survey beginning." % (TK.OBnumber) self.logger.info(log_begin) self.vprint(log_begin) t0 = time.time() sInd = None ObsNum = 0 while not TK.mission_is_over(OS, Obs, det_modes[0]): # acquire the NEXT TARGET star index and create DRM old_sInd = sInd # used to save sInd if returned sInd is None DRM, sInd, det_intTime, waitTime, det_mode = self.next_target( sInd, det_modes, char_modes ) if sInd is not None: # beginning of observation, start to populate DRM pInds = np.where(SU.plan2star == sInd)[0] log_obs = ( " Observation #%s, star ind %s (of %s) with %s planet(s), " + "mission time at Obs start: %s, exoplanetObsTime: %s" ) % ( ObsNum, sInd, TL.nStars, len(pInds), TK.currentTimeNorm.to("day").copy().round(2), TK.exoplanetObsTime.to("day").copy().round(2), ) self.logger.info(log_obs) self.vprint(log_obs) FA = False if sInd not in self.promoted_stars: ObsNum += ( 1 # we're making an observation so increment observation number ) pInds = np.where(SU.plan2star == sInd)[0] DRM["star_ind"] = sInd DRM["star_name"] = TL.Name[sInd] DRM["arrival_time"] = TK.currentTimeNorm.to("day").copy() DRM["OB_nb"] = TK.OBnumber DRM["ObsNum"] = ObsNum DRM["plan_inds"] = pInds.astype(int) # update visited list for selected star self.starVisits[sInd] += 1 # PERFORM DETECTION and populate revisit list attribute ( detected, det_fZ, det_JEZ, det_systemParams, det_SNR, FA, ) = self.observation_detection(sInd, det_intTime.copy(), det_mode) if np.any(detected > 0): self.sInd_detcounts[sInd] += 1 self.sInd_dettimes[sInd] = ( self.sInd_dettimes.get(sInd) or [] ) + [TK.currentTimeNorm.copy().to("day")] self.vprint(" Det. results are: %s" % (detected)) if ( np.any(self.is_earthlike(pInds.astype(int), sInd)) and self.sInd_detcounts[sInd] >= self.n_det_min ): good_2_promote = False if not self.promote_by_time: good_2_promote = True else: sp = SU.s[pInds] Ms = TL.MsTrue[sInd] Mp = SU.Mp[pInds] mu = const.G * (Mp + Ms) T = (2.0 * np.pi * np.sqrt(sp**3 / mu)).to("d") # star must have detections that span longer than half a # period and be in the habitable zone # and have a smaller radius that a sub-neptune if np.any( ( T / 2.0 < ( self.sInd_dettimes[sInd][-1] - self.sInd_dettimes[sInd][0] ) ) ): good_2_promote = True if sInd not in self.promoted_stars and good_2_promote: self.promoted_stars = np.union1d( self.promoted_stars, sInd ).astype(int) self.known_earths = np.union1d( self.known_earths, pInds[self.is_earthlike(pInds.astype(int), sInd)], ).astype(int) # populate the DRM with detection results DRM["det_time"] = det_intTime.to("day") DRM["det_status"] = detected DRM["det_SNR"] = det_SNR DRM["det_fZ"] = det_fZ.to("1/arcsec2") if np.any(pInds): DRM["det_JEZ"] = det_JEZ DRM["det_dMag"] = SU.dMag[pInds].tolist() DRM["det_WA"] = SU.WA[pInds].to("mas").value.tolist() DRM["det_params"] = det_systemParams DRM["det_mode"] = dict(det_mode) if det_intTime is not None: det_comp = Comp.comp_per_intTime( det_intTime, TL, sInd, det_fZ, TL.JEZ0[det_mode["hex"]][sInd], TL.int_WA[sInd], det_mode, )[0] DRM["det_comp"] = det_comp else: DRM["det_comp"] = 0.0 del DRM["det_mode"]["inst"], DRM["det_mode"]["syst"] # append result values to self.DRM self.DRM.append(DRM) # handle case of inf OBs and missionPortion < 1 if np.isinf(TK.OBduration) and (TK.missionPortion < 1.0): self.arbitrary_time_advancement( TK.currentTimeNorm.to("day").copy() - DRM["arrival_time"] ) else: self.char_starVisits[sInd] += 1 # PERFORM CHARACTERIZATION and populate spectra list attribute do_char = True for mode_index, char_mode in enumerate(char_modes): ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) = self.test_observation_characterization( sInd, char_mode, mode_index ) if char_intTime is None: char_intTime = self.zero_d if char_intTime == self.zero_d: do_char = False TK.advanceToAbsTime(TK.currentTimeAbs.copy() + 0.5 * u.d) if do_char is True: # we're making an observation so increment observation number ObsNum += 1 pInds = np.where(SU.plan2star == sInd)[0] DRM["star_ind"] = sInd DRM["star_name"] = TL.Name[sInd] DRM["arrival_time"] = TK.currentTimeNorm.to("day").copy() DRM["OB_nb"] = TK.OBnumber DRM["ObsNum"] = ObsNum DRM["plan_inds"] = pInds.astype(int) DRM["char_info"] = [] for mode_index, char_mode in enumerate(char_modes): char_data = {} if char_mode["SNR"] not in [0, np.inf]: ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) = self.observation_characterization( sInd, char_mode, mode_index ) if np.any(characterized): self.vprint( " Char. results are: %s" % (characterized.T) ) else: char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=float) char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 / u.arcsec**2 char_systemParams = SU.dump_system_params(sInd) assert char_intTime != 0, "Integration time can't be 0." # populate the DRM with characterization results char_data["char_time"] = ( char_intTime.to("day") if char_intTime is not None else self.zero_d ) char_data["char_status"] = ( characterized[:-1] if FA else characterized ) char_data["char_SNR"] = char_SNR[:-1] if FA else char_SNR char_data["char_fZ"] = char_fZ.to("1/arcsec2") char_data["char_params"] = char_systemParams if char_intTime is not None and np.any(characterized): char_comp = Comp.comp_per_intTime( char_intTime, TL, sInd, char_fZ, TL.JEZ0[det_mode["hex"]][sInd], TL.int_WA[sInd], char_mode, )[0] DRM["char_comp"] = char_comp else: DRM["char_comp"] = 0.0 # populate the DRM with FA results char_data["FA_det_status"] = int(FA) char_data["FA_char_status"] = characterized[-1] if FA else 0 char_data["FA_char_SNR"] = char_SNR[-1] if FA else 0.0 char_data["FA_char_JEZ"] = ( self.lastDetected[sInd, 1][-1] / u.arcsec**2 if FA else 0.0 * self.JEZ_unit ) char_data["FA_char_dMag"] = ( self.lastDetected[sInd, 2][-1] if FA else 0.0 ) char_data["FA_char_WA"] = ( self.lastDetected[sInd, 3][-1] * u.arcsec if FA else self.zero_arcsec ) # populate the DRM with observation modes char_data["char_mode"] = dict(char_mode) del ( char_data["char_mode"]["inst"], char_data["char_mode"]["syst"], ) char_data["exoplanetObsTime"] = TK.exoplanetObsTime.copy() DRM["char_info"].append(char_data) # do not revisit partial char if lucky_planets if SU.lucky_planets: self.char_starVisits[sInd] = self.nVisitsMax # append result values to self.DRM self.DRM.append(DRM) # handle case of inf OBs and missionPortion < 1 if np.isinf(TK.OBduration) and (TK.missionPortion < 1.0): self.arbitrary_time_advancement( TK.currentTimeNorm.to("day").copy() - DRM["arrival_time"] ) else: # sInd == None sInd = old_sInd # Retain the last observed star if ( TK.currentTimeNorm.copy() >= TK.OBendTimes[TK.OBnumber] ): # currentTime is at end of OB # Conditional Advance To Start of Next OB if not TK.mission_is_over( OS, Obs, det_mode ): # as long as the mission is not over TK.advancetToStartOfNextOB() # Advance To Start of Next OB elif waitTime is not None: # CASE 1: Advance specific wait time _ = TK.advanceToAbsTime(TK.currentTimeAbs.copy() + waitTime) self.vprint("waitTime is not None") else: startTimes = ( TK.currentTimeAbs.copy() + np.zeros(TL.nStars) * u.d ) # Start Times of Observations observableTimes = Obs.calculate_observableTimes( TL, np.arange(TL.nStars), startTimes, self.koMaps, self.koTimes, base_det_mode, )[0] # CASE 2 If There are no observable targets for the rest # of the mission if ( observableTimes[ ( TK.missionFinishAbs.copy().value * u.d > observableTimes.value * u.d ) * ( observableTimes.value * u.d >= TK.currentTimeAbs.copy().value * u.d ) ].shape[0] ) == 0: self.vprint( ( "No Observable Targets for Remainder of mission at " "currentTimeNorm = {}" ).format(TK.currentTimeNorm.copy()) ) # Manually advancing time to mission end TK.currentTimeNorm = TK.missionLife TK.currentTimeAbs = TK.missionFinishAbs else: # CASE 3 nominal wait time if at least 1 target is still in # list and observable # TODO: ADD ADVANCE TO WHEN FZMIN OCURS inds1 = np.arange(TL.nStars)[ observableTimes.value * u.d > TK.currentTimeAbs.copy().value * u.d ] # apply intTime filter inds2 = np.intersect1d(self.intTimeFilterInds, inds1) # apply revisit Filter #NOTE this means stars you added to # the revisit list inds3 = self.revisitFilter( inds2, TK.currentTimeNorm.copy() + self.dt_max.to(u.d) ) self.vprint( "Filtering %d stars from advanceToAbsTime" % (TL.nStars - len(inds3)) ) oTnowToEnd = observableTimes[inds3] # there is at least one observableTime between now and the # end of the mission if not oTnowToEnd.value.shape[0] == 0: tAbs = np.min(oTnowToEnd) # advance to that observable time else: tAbs = ( TK.missionStart + TK.missionLife ) # advance to end of mission tmpcurrentTimeNorm = TK.currentTimeNorm.copy() # Advance Time to this time OR start of next OB following # this time _ = TK.advanceToAbsTime(tAbs) self.vprint( ( "No Observable Targets a currentTimeNorm = {:.2f} " "Advanced To currentTimeNorm= {:.2f}" ).format( tmpcurrentTimeNorm.to("day"), TK.currentTimeNorm.to("day"), ) ) else: # TK.mission_is_over() dtsim = (time.time() - t0) * u.s log_end = ( "Mission complete: no more time available.\n" + "Simulation duration: %s.\n" % dtsim.astype("int") + "Results stored in SurveySimulation.DRM (Design Reference Mission)." ) self.logger.info(log_end) self.vprint(log_end)
[docs] def next_target(self, old_sInd, det_modes, char_modes): """Finds index of next target star and calculates its integration time. This method chooses the next target star index based on which stars are available, their integration time, and maximum completeness. Returns None if no target could be found. Args: old_sInd (integer): Index of the previous target star mode (dict): Selected observing mode for detection Returns: tuple: DRM (dict): Design Reference Mission, contains the results of one complete observation (detection and characterization) sInd (integer): Index of next target star. Defaults to None. intTime (astropy Quantity): Selected star integration time for detection in units of day. Defaults to None. waitTime (astropy Quantity): a strategically advantageous amount of time to wait in the case of an occulter for slew times """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList Obs = self.Observatory TK = self.TimeKeeping SU = self.SimulatedUniverse # create DRM DRM = {} # allocate settling time + overhead time tmpCurrentTimeAbs = ( TK.currentTimeAbs.copy() + Obs.settlingTime + det_modes[0]["syst"]["ohTime"] ) tmpCurrentTimeNorm = ( TK.currentTimeNorm.copy() + Obs.settlingTime + det_modes[0]["syst"]["ohTime"] ) # create appropriate koMap koMap = self.koMaps[det_modes[0]["syst"]["name"]] char_koMap = self.koMaps[char_modes[0]["syst"]["name"]] # look for available targets # 1. initialize arrays slewTimes = np.zeros(TL.nStars) << u.d # fZs = np.zeros(TL.nStars) / u.arcsec**2.0 # dV = np.zeros(TL.nStars) * u.m / u.s intTimes = np.zeros(TL.nStars) << u.d char_intTimes = np.zeros(TL.nStars) << u.d char_intTimes_no_oh_d = np.zeros(TL.nStars) # obsTimes = np.zeros([2, TL.nStars]) * u.d char_tovisit = np.zeros(TL.nStars, dtype=bool) sInds = np.arange(TL.nStars) # 2. find spacecraft orbital START positions (if occulter, positions # differ for each star) and filter out unavailable targets # sd = None # 2.1 filter out totTimes > integration cutoff if len(sInds) > 0: char_sInds = np.intersect1d(sInds, self.promoted_stars).astype(int) sInds = np.intersect1d(self.intTimeFilterInds, sInds).astype(int) # start times, including slew times # startTimes = tmpCurrentTimeAbs.copy() + slewTimes startTimes = Time( np.full(TL.nStars, tmpCurrentTimeAbs.mjd), format="mjd", scale="tai" ) # startTimesNorm = tmpCurrentTimeNorm.copy() + slewTimes startTimesNorm = np.full(TL.nStars, tmpCurrentTimeNorm.to_value(u.d)) << u.d startTimes_mjd = startTimes.mjd rounded_start_times = np.round(startTimes_mjd) # 2.5 Filter stars not observable at startTimes koTimeInds = np.searchsorted(self.koTimes_mjd, rounded_start_times[sInds]) mask_valid = koTimeInds < len(self.koTimes_mjd) valid_matches = np.zeros_like(koTimeInds, dtype=bool) valid_matches[mask_valid] = ( self.koTimes_mjd[koTimeInds[mask_valid]] == rounded_start_times[sInds[mask_valid]] ) if np.any(valid_matches): sInds = sInds[valid_matches] koTimeInds = koTimeInds[valid_matches] tmpIndsbool = koMap[sInds, koTimeInds].astype(bool) sInds = sInds[tmpIndsbool] else: sInds = np.asarray([], dtype=int) # Get char stars observable at startTimes koTimeInds = np.searchsorted(self.koTimes_mjd, rounded_start_times[char_sInds]) mask_valid = koTimeInds < len(self.koTimes_mjd) valid_matches = np.zeros_like(koTimeInds, dtype=bool) valid_matches[mask_valid] = ( self.koTimes_mjd[koTimeInds[mask_valid]] == rounded_start_times[char_sInds[mask_valid]] ) if np.any(valid_matches): tmpIndsbool = char_koMap[char_sInds, koTimeInds].astype(bool) char_sInds = char_sInds[tmpIndsbool] else: char_sInds = np.asarray([], dtype=int) # 3. filter out all previously (more-)visited targets, unless in if len(sInds) > 0: sInds = self.revisitFilter(sInds, tmpCurrentTimeNorm) # revisit list, with time after start if np.any(char_sInds): char_tovisit[char_sInds] = ( self.char_starVisits[char_sInds] < self.nVisitsMax ) if self.char_starRevisit.size != 0: dt_rev = TK.currentTimeNorm.copy() - self.char_starRevisit[:, 1] * u.day ind_rev = [ int(x) for x in self.char_starRevisit[dt_rev > self.zero_d, 0] if x in char_sInds ] char_tovisit[ind_rev] = self.char_starVisits[ind_rev] < self.nVisitsMax char_sInds = np.where(char_tovisit)[0] # 4.1 calculate integration times for ALL preselected targets ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, det_modes[0]) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, OS.intCutoff, ) # Maximum intTime allowed if len(sInds) > 0: intTimes[sInds] = self.calc_targ_intTime( sInds, startTimes[sInds], det_modes[0] ) * (1 + self.detMargin) sInds = sInds[ (intTimes[sInds] <= maxIntTime) ] # Filters targets exceeding end of OB endTimes_mjd = startTimes_mjd + intTimes.to_value(u.d) endTimes_rounded = np.round(endTimes_mjd) if maxIntTime.value <= 0: sInds = np.asarray([], dtype=int) if len(char_sInds) > 0: for char_mode in char_modes: ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, char_mode) char_maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, OS.intCutoff, ) # Maximum intTime allowed char_mode_intTimes = np.zeros(TL.nStars) << u.d char_mode_intTimes[char_sInds] = self.calc_targ_intTime( char_sInds, startTimes[char_sInds], char_mode ) * (1 + self.charMargin) char_mode_intTimes[np.isnan(char_mode_intTimes)] = 0 * u.d # Adjust integration time for stars with known earths around them for char_star in char_sInds: char_earths = np.intersect1d( np.where(SU.plan2star == char_star)[0], self.known_earths ).astype(int) if np.any(char_earths): fZ = ZL.fZ( Obs, TL, np.array([char_star], ndmin=1), startTimes[char_star].reshape(1), char_mode, ) JEZ = TL.JEZ0[char_mode["hex"]][char_star] if SU.lucky_planets: phi = (1 / np.pi) * np.ones(len(SU.d)) dMag = deltaMag(SU.p, SU.Rp, SU.d, phi)[ char_earths ] # delta magnitude WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to("arcsec")[ char_earths ] # working angle else: dMag = SU.dMag[char_earths] WA = SU.WA[char_earths] if np.all((WA < char_mode["IWA"]) | (WA > char_mode["OWA"])): char_mode_intTimes[char_star] = self.zero_d else: earthlike_inttimes = OS.calc_intTime( TL, char_star, fZ, JEZ, dMag, WA, char_mode ) * (1 + self.charMargin) earthlike_inttimes[~np.isfinite(earthlike_inttimes)] = ( self.zero_d ) earthlike_inttime = earthlike_inttimes[ (earthlike_inttimes < char_maxIntTime) ] if len(earthlike_inttime) > 0: char_mode_intTimes[char_star] = np.max( earthlike_inttime ) char_intTimes_no_oh_d += char_mode_intTimes.to_value(u.d) char_intTimes += char_mode_intTimes + char_mode["syst"]["ohTime"] char_endTimes_mjd = ( startTimes_mjd + (char_intTimes.to_value(u.d) * char_mode["timeMultiplier"]) + Obs.settlingTime.to_value(u.d) ) char_endTimes_rounded = np.round(char_endTimes_mjd) # Filters with an inttime of 0 char_sInds = char_sInds[char_intTimes_no_oh_d[char_sInds] > 0] if char_maxIntTime.value <= 0: char_sInds = np.asarray([], dtype=int) # 5 remove char targets on ignore_stars list sInds = np.setdiff1d( sInds, np.intersect1d(sInds, self.promoted_stars).astype(int) ) char_sInds = np.setdiff1d( char_sInds, np.intersect1d(char_sInds, self.ignore_stars) ) # 6.2 Filter off coronograph stars with too many visits and no detections no_dets = np.logical_and( (self.starVisits[sInds] > self.n_det_remove), (self.sInd_detcounts[sInds] == 0), ) sInds = sInds[np.where(np.invert(no_dets))[0]] max_dets = np.where(self.sInd_detcounts[sInds] < self.max_successful_dets)[0] sInds = sInds[max_dets] # 5.1 TODO Add filter to filter out stars entering and exiting keepout # between startTimes and endTimes # 5.2 find spacecraft orbital END positions (for each candidate target), # and filter out unavailable targets if len(sInds) > 0 and Obs.checkKeepoutEnd: end_times = endTimes_rounded[sInds] # This finds where each end_time would be inserted to maintain order koTimeInds = np.searchsorted(self.koTimes_mjd, end_times) # Check that the found indices actually match the end_times mask_valid = koTimeInds < len(self.koTimes_mjd) valid_matches = np.zeros_like(koTimeInds, dtype=bool) valid_matches[mask_valid] = ( self.koTimes_mjd[koTimeInds[mask_valid]] == end_times[mask_valid] ) # Handle invalid matches if np.any(valid_matches): # Remove invalid matches from the koTimeInds list koTimeInds = koTimeInds[valid_matches] sInds = sInds[valid_matches] tmpIndsbool = koMap[sInds, koTimeInds].astype(bool) if np.any(tmpIndsbool): sInds = sInds[tmpIndsbool] else: sInds = np.asarray([], dtype=int) else: # If no star has a matching koTimeInd, set sInds to empty sInds = np.asarray([], dtype=int) if len(char_sInds) > 0 and Obs.checkKeepoutEnd: end_times = char_endTimes_rounded[char_sInds] # Use searchsorted to find indices in koTimes_mjd koTimeInds = np.searchsorted(self.koTimes_mjd, end_times) # Check if the found indices actually match the end_times mask_valid = koTimeInds < len(self.koTimes_mjd) valid_matches = np.zeros_like(koTimeInds, dtype=bool) valid_matches[mask_valid] = ( self.koTimes_mjd[koTimeInds[mask_valid]] == end_times[mask_valid] ) tmpIndsbool = np.zeros(len(char_sInds), dtype=bool) if np.any(valid_matches): # Retrieve observable status for valid matches tmpIndsbool[valid_matches] = char_koMap[ char_sInds[valid_matches], koTimeInds[valid_matches] ].astype(bool) # Filter char_sInds based on the boolean mask if np.any(tmpIndsbool): char_sInds = char_sInds[tmpIndsbool] else: char_sInds = np.asarray([], dtype=int) # 6. choose best target from remaining if len(sInds) > 0: # choose sInd of next target if np.any(char_sInds): sInd, waitTime = self.choose_next_target( old_sInd, char_sInds, slewTimes, char_intTimes[char_sInds] ) # store selected star integration time intTime = char_intTimes[sInd] else: sInd, waitTime = self.choose_next_target( old_sInd, sInds, slewTimes, intTimes[sInds] ) # store selected star integration time intTime = intTimes[sInd] # Should Choose Next Target decide there are no stars it wishes to # observe at this time. if (sInd is None) and (waitTime is not None): self.vprint( ( "There are no stars Choose Next Target would like to Observe. " "Waiting {}" ).format(waitTime) ) return DRM, None, None, waitTime, det_modes[0] elif (sInd is None) and (waitTime is None): self.vprint( ( "There are no stars Choose Next Target would like to Observe " "and waitTime is None" ) ) return DRM, None, None, waitTime, det_modes[0] # Perform dual band detections if necessary if (len(det_modes) > 1) and ( TL.int_WA[sInd] > det_modes[1]["IWA"] and TL.int_WA[sInd] < det_modes[1]["OWA"] ): # Only make a deep copy when we need to modify det_mode # to create a combined mode det_mode = copy.deepcopy(det_modes[0]) modifying_det_mode = True det_mode["BW"] = det_mode["BW"] + det_modes[1]["BW"] det_mode["inst"]["sread"] = ( det_mode["inst"]["sread"] + det_modes[1]["inst"]["sread"] ) det_mode["inst"]["idark"] = ( det_mode["inst"]["idark"] + det_modes[1]["inst"]["idark"] ) det_mode["inst"]["CIC"] = ( det_mode["inst"]["CIC"] + det_modes[1]["inst"]["CIC"] ) det_mode["syst"]["optics"] = np.mean( (det_mode["syst"]["optics"], det_modes[1]["syst"]["optics"]) ) det_mode["instName"] = "combined" else: # In the case where we don't need to modify the det_mode, we # can just use the original det_mode det_mode = det_modes[0] intTime = self.calc_targ_intTime( np.array([sInd]), startTimes[sInd].reshape(1), det_mode )[0] * (1 + self.detMargin) if intTime > maxIntTime and maxIntTime > 0 * u.d: intTime = maxIntTime # if no observable target, advanceTime to next Observable Target else: self.vprint( "No Observable Targets at currentTimeNorm= " + str(TK.currentTimeNorm.copy()) ) return DRM, None, None, None, det_modes[0] # store normalized start time for future completeness update self.lastObsTimes[sInd] = startTimesNorm[sInd] return DRM, sInd, intTime, waitTime, det_modes[0]
[docs] def choose_next_target(self, old_sInd, sInds, slewTimes, t_dets): """Choose next telescope target based on star completeness and integration time. Args: old_sInd (integer): Index of the previous target star sInds (integer array): Indices of available targets t_dets (astropy Quantity array): Integration times for detection in units of day Returns: sInd (integer): Index of next target star """ Comp = self.Completeness TL = self.TargetList TK = self.TimeKeeping # reshape sInds sInds = np.array(sInds, ndmin=1) # 1/ Choose next telescope target comps = Comp.completeness_update( TL, sInds, self.starVisits[sInds], TK.currentTimeNorm.copy() ) # add weight for star revisits ind_rev = [] if self.starRevisit.size != 0: dt_rev = self.starRevisit[:, 1] - TK.currentTimeNorm.to_value(u.d) # Vectorized version candidates = self.starRevisit[dt_rev < 0, 0].astype(int) mask = np.isin(candidates, sInds) ind_rev = candidates[mask] _starVisits = self.starVisits[sInds] f2_uv = np.where( (_starVisits > 0) & (_starVisits < self.nVisitsMax), _starVisits, 0, ) * (1 - (np.isin(sInds, ind_rev, invert=True))) l_extreme = max( [ np.abs(np.log10(np.min(TL.L[sInds]))), np.abs(np.log10(np.max(TL.L[sInds]))), ] ) if l_extreme == 0.0: l_weight = 1 else: l_weight = 1 - np.abs(np.log10(TL.L[sInds]) / l_extreme) ** self.lum_exp t_dets_val = t_dets.to_value(u.d) t_weight = t_dets_val / np.max(t_dets_val) weights = ( (comps + self.revisit_weight * f2_uv / float(self.nVisitsMax)) / t_weight ) * l_weight sInd = np.random.choice(sInds[weights == max(weights)]) return sInd, slewTimes[sInd]
[docs] def observation_characterization(self, sInd, mode, mode_index): """Finds if characterizations are possible and relevant information Args: sInd (integer): Integer index of the star of interest mode (dict): Selected observing mode for characterization Returns: characterized (integer list): Characterization status for each planet orbiting the observed target star including False Alarm if any, where 1 is full spectrum, -1 partial spectrum, and 0 not characterized fZ (astropy Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (astropy Quantity): Intensity of exo-zodiacal light in units of ph/s/m2/arcsec2 systemParams (dict): Dictionary of time-dependant planet properties averaged over the duration of the integration SNR (float ndarray): Characterization signal-to-noise ratio of the observable planets. Defaults to None. intTime (astropy Quantity): Selected star characterization time in units of day. Defaults to None. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # find indices of planets around the target pInds = np.where(SU.plan2star == sInd)[0] JEZs = SU.scale_JEZ(sInd, mode) dMags = SU.dMag[pInds] if SU.lucky_planets: # used in the "partial char" check below WAs = np.arctan(SU.a[pInds] / TL.dist[sInd]).to("arcsec").value else: WAs = SU.WA[pInds].to("arcsec").value # get the detected status, and check if there was a FA # det = self.lastDetected[sInd,0] det = np.ones(pInds.size, dtype=bool) FA = len(det) == len(pInds) + 1 if FA: pIndsDet = np.append(pInds, -1)[det] else: pIndsDet = pInds[det] # initialize outputs, and check if there's anything (planet or FA) # to characterize characterized = np.zeros(len(det), dtype=int) fZ = 0.0 << self.inv_arcsec2 JEZ = 0.0 << self.JEZ_unit systemParams = SU.dump_system_params( sInd ) # write current system params by default SNR = np.zeros(len(det)) intTime = None if len(det) == 0: # nothing to characterize return characterized, fZ, JEZ, systemParams, SNR, intTime # look for last detected planets that have not been fully characterized if not (FA): # only true planets, no FA tochar = self.fullSpectra[mode_index][pIndsDet] == 0 else: # mix of planets and a FA truePlans = pIndsDet[:-1] tochar = np.append((self.fullSpectra[mode_index][truePlans] == 0), True) # 1/ find spacecraft orbital START position including overhead time, # and check keepout angle if np.any(tochar): # start times startTime = ( TK.currentTimeAbs.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) startTimeNorm = ( TK.currentTimeNorm.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) # planets to characterize koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[ 0 ][ 0 ] # find indice where koTime is startTime[0] # wherever koMap is 1, the target is observable koMap = self.koMaps[mode["syst"]["name"]] tochar[tochar] = koMap[sInd][koTimeInd] # 2/ if any planet to characterize, find the characterization times if np.any(tochar): # propagate the whole system to match up with current time # calculate characterization times at the detected JEZ, dMag, and WA pinds_earthlike = np.logical_and( np.array([(p in self.known_earths) for p in pIndsDet]), tochar ) fZ = ZL.fZ(Obs, TL, np.array([sInd], ndmin=1), startTime.reshape(1), mode) WAp = TL.int_WA[sInd] * np.ones(len(tochar)) dMag = TL.int_dMag[sInd] * np.ones(len(tochar)) # if lucky_planets, use lucky planet params for dMag and WA if SU.lucky_planets: phi = (1 / np.pi) * np.ones(len(SU.d)) e_dMag = deltaMag(SU.p, SU.Rp, SU.d, phi) # delta magnitude e_WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to( "arcsec" ) # working angle else: e_dMag = SU.dMag e_WA = SU.WA WAp[((pinds_earthlike) & (tochar))] = e_WA[pIndsDet[pinds_earthlike]] dMag[((pinds_earthlike) & (tochar))] = e_dMag[pIndsDet[pinds_earthlike]] intTimes = np.zeros(len(tochar)) * u.day intTimes[tochar] = OS.calc_intTime( TL, sInd, fZ, JEZs[tochar], dMag[tochar], WAp[tochar], mode ) intTimes[~np.isfinite(intTimes)] = 0 * u.d # add a predetermined margin to the integration times intTimes = intTimes * (1 + self.charMargin) # apply time multiplier totTimes = intTimes * (mode["timeMultiplier"]) # end times endTimes = startTime + totTimes endTimesNorm = startTimeNorm + totTimes # planets to characterize tochar = ( (totTimes > 0) & (totTimes <= OS.intCutoff) & (endTimesNorm <= TK.OBendTimes[TK.OBnumber]) ) # 3/ is target still observable at the end of any char time? if np.any(tochar) and Obs.checkKeepoutEnd: koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int) # find index in koMap where each endTime is closest to koTimes for t, endTime in enumerate(endTimes.value[tochar]): if endTime > self.koTimes.value[-1]: # case where endTime exceeds largest koTimes element endTimeInBounds = np.where( np.floor(endTime) - self.koTimes.value == 0 )[0] koTimeInds[t] = ( endTimeInBounds[0] if endTimeInBounds.size != 0 else -1 ) else: koTimeInds[t] = np.where( np.round(endTime) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTimes[0] tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds] # 4/ if yes, perform the characterization for the maximum char time if np.any(tochar): # Save Current Time before attempting time allocation currentTimeNorm = TK.currentTimeNorm.copy() currentTimeAbs = TK.currentTimeAbs.copy() if np.any(np.logical_and(pinds_earthlike, tochar)): intTime = np.max(intTimes[np.logical_and(pinds_earthlike, tochar)]) else: intTime = np.max(intTimes[tochar]) extraTime = intTime * (mode["timeMultiplier"] - 1.0) # calculates extraTime success = TK.allocate_time( intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime, True ) # allocates time if not (success): # Time was not successfully allocated char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=int) char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 << self.inv_arcsec2 char_JEZ = 0.0 << self.JEZ_unit char_systemParams = SU.dump_system_params(sInd) # finally, populate the revisit list (NOTE: sInd becomes a float) t_rev = TK.currentTimeNorm.copy() + self.revisit_wait[sInd] revisit = np.array([sInd, t_rev.to("day").value]) if self.char_starRevisit.size == 0: self.char_starRevisit = np.array([revisit]) else: revInd = np.where(self.char_starRevisit[:, 0] == sInd)[0] if revInd.size == 0: self.char_starRevisit = np.vstack( (self.char_starRevisit, revisit) ) else: self.char_starRevisit[revInd, 1] = revisit[1] return ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) pIndsChar = pIndsDet[tochar] log_char = " - Charact. planet(s) %s (%s/%s detected)" % ( pIndsChar, len(pIndsChar), len(pIndsDet), ) self.logger.info(log_char) self.vprint(log_char) # SNR CALCULATION: # first, calculate SNR for observable planets (without false alarm) planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar SNRplans = np.zeros(len(planinds)) if len(planinds) > 0: # initialize arrays for SNR integration fZs = np.zeros(self.ntFlux) << self.inv_arcsec2 JEZs = np.zeros((self.ntFlux, len(planinds))) << self.JEZ_unit systemParamss = np.empty(self.ntFlux, dtype="object") Ss = np.zeros((self.ntFlux, len(planinds))) Ns = np.zeros((self.ntFlux, len(planinds))) # integrate the signal (planet flux) and noise dt = intTime / float(self.ntFlux) timePlus = ( Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy() ) # accounts for the time since the current time for i in range(self.ntFlux): # calculate signal and noise (electron count rates) if SU.lucky_planets: fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), currentTimeAbs.reshape(1), mode, )[0] Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i] ) # allocate first half of dt timePlus += dt / 2.0 # calculate current zodiacal light brightness fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + timePlus).reshape(1), mode, )[0] # propagate the system to match up with current time SU.propag_system( sInd, currentTimeNorm + timePlus - self.propagTimes[sInd] ) self.propagTimes[sInd] = currentTimeNorm + timePlus # Calculate the exozodi intensity JEZs[i] = SU.scale_JEZ(sInd, mode, pInds=planinds) # save planet parameters systemParamss[i] = SU.dump_system_params(sInd) # calculate signal and noise (electron count rates) if not SU.lucky_planets: Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i], JEZ=JEZs[i] ) # allocate second half of dt timePlus += dt / 2.0 # average output parameters fZ = np.mean(fZs) JEZ = np.mean(JEZs, axis=0) systemParams = { key: sum([systemParamss[x][key] for x in range(self.ntFlux)]) / float(self.ntFlux) for key in sorted(systemParamss[0]) } # calculate planets SNR S = Ss.sum(0) N = Ns.sum(0) SNRplans[N > 0] = S[N > 0] / N[N > 0] # allocate extra time for timeMultiplier # if only a FA, just save zodiacal brightness in the middle of the # integration else: # totTime = intTime * (mode["timeMultiplier"]) fZ = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), TK.currentTimeAbs.copy().reshape(1), mode, )[0] # Use the default star value if no planets JEZ = TL.JEZ0[mode["hex"]][sInd] # calculate the false alarm SNR (if any) SNRfa = [] if pIndsChar[-1] == -1: JEZ = JEZs[-1] dMag = dMags[-1] WA = WAs[-1] << u.arcsec C_p, C_b, C_sp = OS.Cp_Cb_Csp(TL, sInd, fZ, JEZ, dMag, WA, mode) S = (C_p * intTime).decompose().value N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2).decompose().value) SNRfa = S / N if N > 0 else 0.0 # save all SNRs (planets and FA) to one array SNRinds = np.where(det)[0][tochar] SNR[SNRinds] = np.append(SNRplans, SNRfa) # now, store characterization status: 1 for full spectrum, # -1 for partial spectrum, 0 for not characterized char = SNR >= mode["SNR"] # initialize with full spectra characterized = char.astype(int) WAchar = WAs[char] * u.arcsec # find the current WAs of characterized planets if SU.lucky_planets: # keep original WAs (note, the dump_system_params() above, whence comes # systemParams, does not understand lucky_planets) pass else: WAs = systemParams["WA"] if FA: WAs = np.append(WAs, WAs[-1] * u.arcsec) # check for partial spectra IWA_max = mode["IWA"] * (1.0 + mode["BW"] / 2.0) OWA_min = mode["OWA"] * (1.0 - mode["BW"] / 2.0) char[char] = (WAchar < IWA_max) | (WAchar > OWA_min) characterized[char] = -1 all_full = np.copy(characterized) all_full[char] = 0 if sInd not in self.sInd_charcounts.keys(): self.sInd_charcounts[sInd] = all_full else: self.sInd_charcounts[sInd] = self.sInd_charcounts[sInd] + all_full # encode results in spectra lists (only for planets, not FA) charplans = characterized[:-1] if FA else characterized self.fullSpectra[mode_index][pInds[charplans == 1]] += 1 self.partialSpectra[mode_index][pInds[charplans == -1]] += 1 # in both cases (detection or false alarm), schedule a revisit smin = np.min(SU.s[pInds[det]]) Ms = TL.MsTrue[sInd] # if target in promoted_stars list, schedule revisit based off of # semi-major axis if sInd in self.promoted_stars: sp = np.min(SU.a[pInds[det]]).to("AU") if np.any(det): pInd_smin = pInds[det][np.argmin(SU.a[pInds[det]])] Mp = SU.Mp[pInd_smin] else: Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + T / 3.0 # otherwise schedule revisit based off of seperation elif smin is not None: sp = smin if np.any(det): pInd_smin = pInds[det][np.argmin(SU.s[pInds[det]])] Mp = SU.Mp[pInd_smin] else: Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + T / 2.0 # otherwise, revisit based on average of population semi-major axis and mass else: sp = SU.s.mean() Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + 0.75 * T # finally, populate the revisit list (NOTE: sInd becomes a float) revisit = np.array([sInd, t_rev.to("day").value]) if self.char_starRevisit.size == 0: self.char_starRevisit = np.array([revisit]) else: revInd = np.where(self.char_starRevisit[:, 0] == sInd)[0] if revInd.size == 0: self.char_starRevisit = np.vstack((self.char_starRevisit, revisit)) else: self.char_starRevisit[revInd, 1] = revisit[1] # add stars to filter list if np.any(characterized.astype(int) == 1): if np.any(self.sInd_charcounts[sInd] >= self.max_successful_chars): self.ignore_stars = np.union1d(self.ignore_stars, [sInd]).astype(int) return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime
[docs] def test_observation_characterization(self, sInd, mode, mode_index): """Finds if characterizations are possible and relevant information Args: sInd (integer): Integer index of the star of interest mode (dict): Selected observing mode for characterization Returns: characterized (integer list): Characterization status for each planet orbiting the observed target star including False Alarm if any, where 1 is full spectrum, -1 partial spectrum, and 0 not characterized fZ (astropy Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (astropy Quantity): Intensity of exo-zodiacal light in units of ph/s/m2/arcsec2 systemParams (dict): Dictionary of time-dependant planet properties averaged over the duration of the integration SNR (float ndarray): Characterization signal-to-noise ratio of the observable planets. Defaults to None. intTime (astropy Quantity): Selected star characterization time in units of day. Defaults to None. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # find indices of planets around the target pInds = np.where(SU.plan2star == sInd)[0] JEZs = SU.scale_JEZ(sInd, mode) dMags = SU.dMag[pInds] # WAs = SU.WA[pInds].to("arcsec").value # get the detected status, and check if there was a FA # det = self.lastDetected[sInd,0] det = np.ones(pInds.size, dtype=bool) FA = len(det) == len(pInds) + 1 if FA: pIndsDet = np.append(pInds, -1)[det] else: pIndsDet = pInds[det] # initialize outputs, and check if there's anything (planet or FA) # to characterize characterized = np.zeros(len(det), dtype=int) fZ = 0.0 << self.inv_arcsec2 JEZ = 0.0 << self.JEZ_unit systemParams = SU.dump_system_params( sInd ) # write current system params by default SNR = np.zeros(len(det)) intTime = None if len(det) == 0: # nothing to characterize return characterized, fZ, JEZ, systemParams, SNR, intTime # look for last detected planets that have not been fully characterized if not (FA): # only true planets, no FA tochar = self.fullSpectra[mode_index][pIndsDet] == 0 else: # mix of planets and a FA truePlans = pIndsDet[:-1] tochar = np.append((self.fullSpectra[mode_index][truePlans] == 0), True) # 1/ find spacecraft orbital START position including overhead time, # and check keepout angle if np.any(tochar): # start times startTime = ( TK.currentTimeAbs.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) startTimeNorm = ( TK.currentTimeNorm.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) # planets to characterize koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[ 0 ][ 0 ] # find indice where koTime is startTime[0] # wherever koMap is 1, the target is observable koMap = self.koMaps[mode["syst"]["name"]] tochar[tochar] = koMap[sInd][koTimeInd] # 2/ if any planet to characterize, find the characterization times if np.any(tochar): # propagate the whole system to match up with current time # calculate characterization times at the detected JEZ, dMag, and WA pinds_earthlike = np.logical_and( np.array([(p in self.known_earths) for p in pIndsDet]), tochar ) fZ = ZL.fZ(Obs, TL, np.array([sInd], ndmin=1), startTime.reshape(1), mode) JEZ = SU.scale_JEZ(sInd, mode) dMag = dMags[tochar] WAp = TL.int_WA[sInd] * np.ones(len(tochar)) dMag = TL.int_dMag[sInd] * np.ones(len(tochar)) # if lucky_planets, use lucky planet params for dMag and WA if SU.lucky_planets: phi = (1 / np.pi) * np.ones(len(SU.d)) e_dMag = deltaMag(SU.p, SU.Rp, SU.d, phi) # delta magnitude e_WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to( "arcsec" ) # working angle else: e_dMag = SU.dMag e_WA = SU.WA WAp[((pinds_earthlike) & (tochar))] = e_WA[pIndsDet[pinds_earthlike]] dMag[((pinds_earthlike) & (tochar))] = e_dMag[pIndsDet[pinds_earthlike]] intTimes = np.zeros(len(tochar)) << u.d intTimes[tochar] = OS.calc_intTime( TL, sInd, fZ, JEZ[tochar], dMag[tochar], WAp[tochar], mode ) intTimes[~np.isfinite(intTimes)] = self.zero_d # add a predetermined margin to the integration times intTimes = intTimes * (1 + self.charMargin) # apply time multiplier totTimes = intTimes * (mode["timeMultiplier"]) # end times endTimes = startTime + totTimes endTimesNorm = startTimeNorm + totTimes # planets to characterize tochar = ( (totTimes > 0) & (totTimes <= OS.intCutoff) & (endTimesNorm <= TK.OBendTimes[TK.OBnumber]) ) # 3/ is target still observable at the end of any char time? if np.any(tochar) and Obs.checkKeepoutEnd: koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int) # find index in koMap where each endTime is closest to koTimes for t, endTime in enumerate(endTimes.value[tochar]): if endTime > self.koTimes.value[-1]: # case where endTime exceeds largest koTimes element endTimeInBounds = np.where( np.floor(endTime) - self.koTimes.value == 0 )[0] koTimeInds[t] = ( endTimeInBounds[0] if endTimeInBounds.size != 0 else -1 ) else: koTimeInds[t] = np.where( np.round(endTime) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTimes[0] tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds] # 4/ if yes, perform the characterization for the maximum char time if np.any(tochar): if np.any(np.logical_and(pinds_earthlike, tochar)): intTime = np.max(intTimes[np.logical_and(pinds_earthlike, tochar)]) else: intTime = np.max(intTimes[tochar]) extraTime = intTime * (mode["timeMultiplier"] - 1.0) # calculates extraTime dt = intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime if ( (dt.value <= 0 or dt.value == np.inf) or (TK.currentTimeNorm.copy() + dt > TK.missionLife.to("day")) or (TK.currentTimeNorm.copy() + dt > TK.OBendTimes[TK.OBnumber]) ): success = ( False # The temporal block to allocate is not positive nonzero ) else: success = True # success = TK.allocate_time(intTime + extraTime + mode['syst']['ohTime'] # + Obs.settlingTime, True)#allocates time if not (success): # Time was not successfully allocated char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=float) char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 << self.inv_arcsec2 char_JEZ = 0.0 << self.JEZ_unit char_systemParams = SU.dump_system_params(sInd) return ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) # pIndsChar = pIndsDet[tochar] return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime
[docs] def scheduleRevisit(self, sInd, smin, det, pInds): """A Helper Method for scheduling revisits after observation detection Args: sInd - sInd of the star just detected smin - minimum separation of the planet to star of planet just detected det - pInds - Indices of planets around target star Return: updates self.starRevisit attribute """ TK = self.TimeKeeping t_rev = TK.currentTimeNorm.copy() + self.revisit_wait[sInd] # finally, populate the revisit list (NOTE: sInd becomes a float) revisit = np.array([sInd, t_rev.to_value(u.d)]) if self.starRevisit.size == 0: # If starRevisit has nothing in it self.starRevisit = np.array([revisit]) # initialize sterRevisit else: revInd = np.where(self.starRevisit[:, 0] == sInd)[ 0 ] # indices of the first column of the starRevisit list containing sInd if revInd.size == 0: self.starRevisit = np.vstack((self.starRevisit, revisit)) else: self.starRevisit[revInd, 1] = revisit[1] # over
[docs] def revisitFilter(self, sInds, tmpCurrentTimeNorm): """Helper method for Overloading Revisit Filtering Args: sInds - indices of stars still in observation list tmpCurrentTimeNorm (MJD) - the simulation time after overhead was added in MJD form Returns: ~numpy.ndarray(int): sInds - indices of stars still in observation list """ tovisit = np.zeros( self.TargetList.nStars, dtype=bool ) # tovisit is a boolean array containing the if len(sInds) > 0: # so long as there is at least 1 star left in sInds tovisit[sInds] = (self.starVisits[sInds] == min(self.starVisits[sInds])) & ( self.starVisits[sInds] < self.nVisitsMax ) # Checks that no star has exceeded the number of revisits if ( self.starRevisit.size != 0 ): # There is at least one revisit planned in starRevisit dt_rev = ( self.starRevisit[:, 1] * u.day - tmpCurrentTimeNorm ) # absolute temporal spacing between revisit and now. # return indices of all revisits within a threshold dt_max of # revisit day and indices of all revisits with no detections # past the revisit time ind_rev2 = [ int(x) for x in self.starRevisit[dt_rev < self.zero_d, 0] if (x in sInds) ] tovisit[ind_rev2] = self.starVisits[ind_rev2] < self.nVisitsMax sInds = np.where(tovisit)[0] return sInds