Source code for EXOSIMS.SurveySimulation.linearJScheduler

from EXOSIMS.Prototypes.SurveySimulation import SurveySimulation
import astropy.units as u
import numpy as np
import astropy.constants as const
from EXOSIMS.util._numpy_compat import copy_if_needed


[docs] class linearJScheduler(SurveySimulation): """linearJScheduler This class implements the linear cost function scheduler described in [Savransky2010]_ Args: coeffs (iterable 6x1): Cost function coefficients: slew distance, completeness, least visited planet ramp, unvisited known RV planet ramp, least visited ramp, unvisited ramp revisit_wait (float): The time required for the scheduler to wait before a target may be revisited find_known_RV (boolean): A flag that turns on the ability to identify known RV stars. The stars with known rocky planets have their int_comp value set to 1.0. specs (dict): :ref:`sec:inputspec` """ def __init__( self, coeffs=[1, 1, 1, 1, 2, 1], revisit_wait=91.25, find_known_RV=False, **specs, ): SurveySimulation.__init__(self, **specs) TL = self.TargetList # verify that coefficients input is iterable 6x1 if not (isinstance(coeffs, (list, tuple, np.ndarray))) or (len(coeffs) != 6): raise TypeError("coeffs must be a 6 element iterable") # Add to outspec self._outspec["coeffs"] = coeffs self._outspec["revisit_wait"] = revisit_wait # normalize coefficients coeffs = np.array(coeffs) coeffs = coeffs / np.linalg.norm(coeffs, ord=1) self.coeffs = coeffs self.find_known_RV = find_known_RV self.revisit_wait = revisit_wait * u.d self.earth_candidates = ( [] ) # list of detected earth-like planets aroung promoted stars self.no_dets = np.ones(self.TargetList.nStars, dtype=bool) self.known_stars = np.array([]) self.known_rocky = np.array([]) if self.find_known_RV: self.known_stars, self.known_rocky = self.find_known_plans() TL.int_comp[self.known_rocky] = 1.0
[docs] def next_target(self, old_sInd, mode): """Finds index of next target star and calculates its integration time. This method chooses the next target star index based on which stars are available, their integration time, and maximum completeness. Returns None if no target could be found. Args: old_sInd (integer): Index of the previous target star mode (dict): Selected observing mode for detection Returns: DRM (dict): Design Reference Mission, contains the results of one complete observation (detection and characterization) sInd (integer): Index of next target star. Defaults to None. intTime (astropy Quantity): Selected star integration time for detection in units of day. Defaults to None. waitTime (astropy Quantity): a strategically advantageous amount of time to wait in the case of an occulter for slew times """ OS = self.OpticalSystem TL = self.TargetList Obs = self.Observatory TK = self.TimeKeeping # create DRM DRM = {} # create appropriate koMap koMap = self.koMaps[mode["syst"]["name"]] # allocate settling time + overhead time tmpCurrentTimeAbs = ( TK.currentTimeAbs.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) tmpCurrentTimeNorm = ( TK.currentTimeNorm.copy() + Obs.settlingTime + mode["syst"]["ohTime"] ) # look for available targets # 1. initialize arrays slewTimes = np.zeros(TL.nStars) * u.d # fZs = np.zeros(TL.nStars) / u.arcsec**2 dV = np.zeros(TL.nStars) * u.m / u.s intTimes = np.zeros(TL.nStars) * u.d obsTimes = np.zeros([2, TL.nStars]) * u.d sInds = np.arange(TL.nStars) # 2. find spacecraft orbital START positions (if occulter, positions # differ for each star) and filter out unavailable targets sd = None if OS.haveOcculter: sd = Obs.star_angularSep(TL, old_sInd, sInds, tmpCurrentTimeAbs) obsTimes = Obs.calculate_observableTimes( TL, sInds, tmpCurrentTimeAbs, self.koMaps, self.koTimes, mode ) slewTimes = Obs.calculate_slewTimes( TL, old_sInd, sInds, sd, obsTimes, tmpCurrentTimeAbs ) # 2.1 filter out totTimes > integration cutoff if len(sInds.tolist()) > 0: sInds = np.intersect1d(self.intTimeFilterInds, sInds) # start times, including slew times startTimes = tmpCurrentTimeAbs.copy() + slewTimes startTimesNorm = tmpCurrentTimeNorm.copy() + slewTimes # 2.5 Filter stars not observable at startTimes try: tmpIndsbool = list() for i in np.arange(len(sInds)): koTimeInd = np.where( np.round(startTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is startTime[0] tmpIndsbool.append( koMap[sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 If there are no target stars to observe sInds = np.asarray([], dtype=int) # 3. filter out all previously (more-)visited targets, unless in if len(sInds.tolist()) > 0: sInds = self.revisitFilter(sInds, tmpCurrentTimeNorm) # 4.1 calculate integration times for ALL preselected targets ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed if len(sInds.tolist()) > 0: intTimes[sInds] = self.calc_targ_intTime(sInds, startTimes[sInds], mode) sInds = sInds[ np.where(intTimes[sInds] <= maxIntTime) ] # Filters targets exceeding end of OB endTimes = startTimes + intTimes if maxIntTime.value <= 0: sInds = np.asarray([], dtype=int) # 5.1 TODO Add filter to filter out stars entering and exiting keepout # between startTimes and endTimes # 5.2 find spacecraft orbital END positions (for each candidate target), # and filter out unavailable targets if len(sInds.tolist()) > 0 and Obs.checkKeepoutEnd: # endTimes may exist past koTimes so we have an exception to hand this case try: tmpIndsbool = list() for i in np.arange(len(sInds)): koTimeInd = np.where( np.round(endTimes[sInds[i]].value) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTime[0] tmpIndsbool.append( koMap[sInds[i]][koTimeInd].astype(bool) ) # Is star observable at time ind sInds = sInds[tmpIndsbool] del tmpIndsbool except: # noqa: E722 sInds = np.asarray([], dtype=int) # 6. choose best target from remaining if len(sInds.tolist()) > 0: # choose sInd of next target sInd, waitTime = self.choose_next_target( old_sInd, sInds, slewTimes, intTimes[sInds] ) # Should Choose Next Target decide there are no stars it wishes to # observe at this time. if (sInd is None) and (waitTime is not None): self.vprint( ( "There are no stars Choose Next Target would like to Observe. " "Waiting {}" ).format(waitTime) ) return DRM, None, None, waitTime elif (sInd is None) and (waitTime is not None): self.vprint( ( "There are no stars Choose Next Target would like to Observe " "and waitTime is None" ) ) return DRM, None, None, waitTime # store selected star integration time intTime = intTimes[sInd] # if no observable target, advanceTime to next Observable Target else: self.vprint( "No Observable Targets at currentTimeNorm= " + str(TK.currentTimeNorm.copy()) ) return DRM, None, None, None # update visited list for selected star self.starVisits[sInd] += 1 # store normalized start time for future completeness update self.lastObsTimes[sInd] = startTimesNorm[sInd] # populate DRM with occulter related values if OS.haveOcculter: DRM = Obs.log_occulterResults( DRM, slewTimes[sInd], sInd, sd[sInd], dV[sInd] ) return DRM, sInd, intTime, slewTimes[sInd] return DRM, sInd, intTime, waitTime
[docs] def choose_next_target(self, old_sInd, sInds, slewTimes, intTimes): """Choose next target based on truncated depth first search of linear cost function. Args: old_sInd (integer): Index of the previous target star sInds (integer array): Indices of available targets slewTimes (astropy quantity array): slew times to all stars (must be indexed by sInds) intTimes (astropy Quantity array): Integration times for detection in units of day Returns: tuple: sInd (int): Index of next target star waitTime (astropy.units.Quantity or None): the amount of time to wait (this method returns None) """ Comp = self.Completeness TL = self.TargetList TK = self.TimeKeeping OS = self.OpticalSystem Obs = self.Observatory allModes = OS.observingModes # cast sInds to array sInds = np.array(sInds, ndmin=1, copy=copy_if_needed) known_sInds = np.intersect1d(sInds, self.known_rocky) # current star has to be in the adjmat if (old_sInd is not None) and (old_sInd not in sInds): sInds = np.append(sInds, old_sInd) # calculate dt since previous observation dt = TK.currentTimeNorm.copy() + slewTimes[sInds] - self.lastObsTimes[sInds] # get dynamic completeness values comps = Comp.completeness_update(TL, sInds, self.starVisits[sInds], dt) for idx, sInd in enumerate(sInds): if sInd in known_sInds: comps[idx] = 1.0 # if first target, or if only 1 available target, # choose highest available completeness nStars = len(sInds) if (old_sInd is None) or (nStars == 1): sInd = np.random.choice(sInds[comps == max(comps)]) return sInd, slewTimes[sInd] # define adjacency matrix A = np.zeros((nStars, nStars)) # 0/ only consider slew distance when there's an occulter if OS.haveOcculter: r_ts = TL.starprop(sInds, TK.currentTimeAbs.copy()) u_ts = ( r_ts.to("AU").value.T / np.linalg.norm(r_ts.to("AU").value, axis=1) ).T angdists = np.arccos(np.clip(np.dot(u_ts, u_ts.T), -1, 1)) A[np.ones((nStars), dtype=bool)] = angdists A = self.coeffs[0] * (A) / np.pi # 1/ add factor due to completeness A = A + self.coeffs[1] * (1 - comps) # add factor for unvisited ramp for known stars if np.any(known_sInds): # 2/ add factor for least visited known stars f_uv = np.zeros(nStars) u1 = np.isin(sInds, known_sInds) u2 = self.starVisits[sInds] == min(self.starVisits[known_sInds]) unvisited = np.logical_and(u1, u2) f_uv[unvisited] = ( float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2 ) A = A - self.coeffs[2] * f_uv # 3/ add factor for unvisited known stars no_visits = np.zeros(nStars) u2 = self.starVisits[sInds] == 0 unvisited = np.logical_and(u1, u2) no_visits[unvisited] = 1.0 A = A - self.coeffs[3] * no_visits # 4/ add factor due to unvisited ramp f_uv = np.zeros(nStars) unvisited = self.starVisits[sInds] == 0 f_uv[unvisited] = float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2 A = A - self.coeffs[4] * f_uv # 5/ add factor due to revisited ramp if self.starRevisit.size != 0: f2_uv = 1 - (np.isin(sInds, self.starRevisit[:, 0])) A = A + self.coeffs[5] * f2_uv # kill diagonal A = A + np.diag(np.ones(nStars) * np.inf) # take two traversal steps step1 = np.tile(A[sInds == old_sInd, :], (nStars, 1)).flatten("F") step2 = A[np.array(np.ones((nStars, nStars)), dtype=bool)] tmp = np.nanargmin(step1 + step2) sInd = sInds[int(np.floor(tmp / float(nStars)))] waitTime = slewTimes[sInd] # Check if exoplanetObsTime would be exceeded mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0] ( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife, ) = TK.get_ObsDetectionMaxIntTime(Obs, mode) maxIntTime = min( maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife ) # Maximum intTime allowed intTimes2 = self.calc_targ_intTime( np.array([sInd]), TK.currentTimeAbs.copy(), mode ) if ( intTimes2 > maxIntTime ): # check if max allowed integration time would be exceeded self.vprint("max allowed integration time would be exceeded") sInd = None waitTime = 1.0 * u.d return sInd, waitTime
[docs] def revisitFilter(self, sInds, tmpCurrentTimeNorm): """Helper method for Overloading Revisit Filtering Args: sInds(int numpy.ndarray): indices of stars still in observation list tmpCurrentTimeNorm (MJD) the simulation time after overhead was added in MJD Returns: int numpy.ndarray: sInds - indices of stars still in observation list """ tovisit = np.zeros( self.TargetList.nStars, dtype=bool ) # tovisit is a boolean array containing the if len(sInds) > 0: # so long as there is at least 1 star left in sInds tovisit[sInds] = (self.starVisits[sInds] == min(self.starVisits[sInds])) & ( self.starVisits[sInds] < self.nVisitsMax ) # Checks that no star has exceeded the number of revisits if ( self.starRevisit.size != 0 ): # There is at least one revisit planned in starRevisit dt_rev = ( self.starRevisit[:, 1] * u.day - tmpCurrentTimeNorm ) # absolute temporal spacing between revisit and now. # return indices of all revisits within a threshold dt_max of revisit # day and indices of all revisits with no detections past the # revisit time ind_rev2 = [ int(x) for x in self.starRevisit[dt_rev < 0 * u.d, 0] if (x in sInds) ] tovisit[ind_rev2] = self.starVisits[ind_rev2] < self.nVisitsMax sInds = np.where(tovisit)[0] return sInds
[docs] def scheduleRevisit(self, sInd, smin, det, pInds): """A Helper Method for scheduling revisits after observation detection Args: sInd (int): sInd of the star just detected smin (float): minimum separation of the planet to star of planet just detected det (bool numpy.ndarray): detection array pInds (int numpy.ndarray): Indices of planets around target star Returns: None updates self.starRevisit attribute """ TK = self.TimeKeeping TL = self.TargetList SU = self.SimulatedUniverse # in both cases (detection or false alarm), schedule a revisit # based on minimum separation Ms = TL.MsTrue[sInd] if ( smin is not None and smin is not np.nan ): # smin is None if no planet was detected sp = smin if np.any(det): pInd_smin = pInds[det][np.argmin(SU.s[pInds[det]])] Mp = SU.Mp[pInd_smin] else: Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + T / 2.0 # otherwise, revisit based on average of population semi-major axis and mass else: sp = SU.s.mean() Mp = SU.Mp.mean() mu = const.G * (Mp + Ms) T = 2.0 * np.pi * np.sqrt(sp**3 / mu) t_rev = TK.currentTimeNorm.copy() + 0.75 * T # if no detections then schedule revisit based off of revisit_weight if not np.any(det): t_rev = TK.currentTimeNorm.copy() + self.revisit_wait self.no_dets[sInd] = True else: self.no_dets[sInd] = False t_rev = TK.currentTimeNorm.copy() + self.revisit_wait # finally, populate the revisit list (NOTE: sInd becomes a float) revisit = np.array([sInd, t_rev.to("day").value]) if self.starRevisit.size == 0: # If starRevisit has nothing in it self.starRevisit = np.array([revisit]) # initialize sterRevisit else: revInd = np.where(self.starRevisit[:, 0] == sInd)[ 0 ] # indices of the first column of the starRevisit list containing sInd if revInd.size == 0: self.starRevisit = np.vstack((self.starRevisit, revisit)) else: self.starRevisit[revInd, 1] = revisit[1] # over
[docs] def observation_characterization(self, sInd, mode): """Finds if characterizations are possible and relevant information Args: sInd (int): Integer index of the star of interest mode (dict): Selected observing mode for characterization Returns: tuple: characterized (integer list): Characterization status for each planet orbiting the observed target star including False Alarm if any, where 1 is full spectrum, -1 partial spectrum, and 0 not characterized fZ (astropy Quantity): Surface brightness of local zodiacal light in units of 1/arcsec2 JEZ (astropy.units.Quantity(numpy.ndarray(float))): Intensity of exo-zodiacal light in units of photons/s/m2/arcsec2 systemParams (dict): Dictionary of time-dependant planet properties averaged over the duration of the integration SNR (float ndarray): Characterization signal-to-noise ratio of the observable planets. Defaults to None. intTime (astropy Quantity): Selected star characterization time in units of day. Defaults to None. """ OS = self.OpticalSystem ZL = self.ZodiacalLight TL = self.TargetList SU = self.SimulatedUniverse Obs = self.Observatory TK = self.TimeKeeping # find indices of planets around the target pInds = np.where(SU.plan2star == sInd)[0] # get the detected status, and check if there was a FA det = self.lastDetected[sInd, 0] FA = len(det) == len(pInds) + 1 if FA: pIndsDet = np.append(pInds, -1)[det] else: pIndsDet = pInds[det] # initialize outputs, and check if there's anything (planet or FA) to # characterize characterized = np.zeros(len(det), dtype=int) fZ = 0.0 / u.arcsec**2 systemParams = SU.dump_system_params( sInd ) # write current system params by default JEZ = np.zeros(len(det)) * u.ph / u.s / u.m**2 / u.arcsec**2 SNR = np.zeros(len(det)) intTime = None if len(det) == 0: # nothing to characterize return characterized, fZ, JEZ, systemParams, SNR, intTime # look for last detected planets that have not been fully characterized if not (FA): # only true planets, no FA tochar = self.fullSpectra[pIndsDet] == 0 else: # mix of planets and a FA truePlans = pIndsDet[:-1] tochar = np.append((self.fullSpectra[truePlans] == 0), True) # 1/ find spacecraft orbital START position including overhead time, # and check keepout angle if np.any(tochar): # start times startTime = ( TK.currentTimeAbs.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) startTimeNorm = ( TK.currentTimeNorm.copy() + mode["syst"]["ohTime"] + Obs.settlingTime ) # planets to characterize koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[ 0 ][ 0 ] # find indice where koTime is startTime[0] # wherever koMap is 1, the target is observable koMap = self.koMaps[mode["syst"]["name"]] tochar[tochar] = koMap[sInd][koTimeInd] # 2/ if any planet to characterize, find the characterization times # at the detected JEZ, dMag, and WA if np.any(tochar): is_earthlike = np.logical_and( np.array([(p in self.earth_candidates) for p in pIndsDet]), tochar ) fZ = ZL.fZ(Obs, TL, np.array([sInd], ndmin=1), startTime.reshape(1), mode)[ 0 ] JEZ = self.lastDetected[sInd, 1][det][tochar] dMag = self.lastDetected[sInd, 2][det][tochar] WA = self.lastDetected[sInd, 3][det][tochar] * u.arcsec WA[is_earthlike[tochar]] = SU.WA[pIndsDet[is_earthlike]] dMag[is_earthlike[tochar]] = SU.dMag[pIndsDet[is_earthlike]] intTimes = np.zeros(len(tochar)) * u.day intTimes[tochar] = OS.calc_intTime( TL, sInd, fZ, JEZ, dMag, WA, mode, TK=self.TimeKeeping ) intTimes[~np.isfinite(intTimes)] = 0 * u.d # add a predetermined margin to the integration times intTimes = intTimes * (1.0 + self.charMargin) # apply time multiplier totTimes = intTimes * (mode["timeMultiplier"]) # end times endTimes = startTime + totTimes endTimesNorm = startTimeNorm + totTimes # planets to characterize tochar = ( (totTimes > 0) & (totTimes <= OS.intCutoff) & (endTimesNorm <= TK.OBendTimes[TK.OBnumber]) ) # 3/ is target still observable at the end of any char time? if np.any(tochar) and Obs.checkKeepoutEnd: koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int) # find index in koMap where each endTime is closest to koTimes for t, endTime in enumerate(endTimes.value[tochar]): if endTime > self.koTimes.value[-1]: # case where endTime exceeds largest koTimes element endTimeInBounds = np.where( np.floor(endTime) - self.koTimes.value == 0 )[0] koTimeInds[t] = ( endTimeInBounds[0] if endTimeInBounds.size != 0 else -1 ) else: koTimeInds[t] = np.where( np.round(endTime) - self.koTimes.value == 0 )[0][ 0 ] # find indice where koTime is endTimes[0] tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds] # 4/ if yes, allocate the overhead time, and perform the characterization # for the maximum char time if np.any(tochar): # Save Current Time before attempting time allocation currentTimeNorm = TK.currentTimeNorm.copy() currentTimeAbs = TK.currentTimeAbs.copy() # Allocate Time if np.any(np.logical_and(is_earthlike, tochar)): intTime = np.max(intTimes[np.logical_and(is_earthlike, tochar)]) else: intTime = np.max(intTimes[tochar]) extraTime = intTime * (mode["timeMultiplier"] - 1.0) # calculates extraTime success = TK.allocate_time( intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime, True ) # allocates time if not (success): # Time was not successfully allocated # Identical to when "if char_mode['SNR'] not in [0, np.inf]:" # in run_sim() char_intTime = None lenChar = len(pInds) + 1 if FA else len(pInds) characterized = np.zeros(lenChar, dtype=float) char_SNR = np.zeros(lenChar, dtype=float) char_fZ = 0.0 / u.arcsec**2 char_JEZ = np.zeros(lenChar) * u.ph / u.s / u.m**2 / u.arcsec**2 char_systemParams = SU.dump_system_params(sInd) return ( characterized, char_fZ, char_JEZ, char_systemParams, char_SNR, char_intTime, ) pIndsChar = pIndsDet[tochar] log_char = " - Charact. planet inds %s (%s/%s detected)" % ( pIndsChar, len(pIndsChar), len(pIndsDet), ) self.logger.info(log_char) self.vprint(log_char) # SNR CALCULATION: # first, calculate SNR for observable planets (without false alarm) planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar SNRplans = np.zeros(len(planinds)) if len(planinds) > 0: # initialize arrays for SNR integration fZs = np.zeros(self.ntFlux) / u.arcsec**2 systemParamss = np.empty(self.ntFlux, dtype="object") Ss = np.zeros((self.ntFlux, len(planinds))) Ns = np.zeros((self.ntFlux, len(planinds))) # integrate the signal (planet flux) and noise dt = intTime / float(self.ntFlux) timePlus = ( Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy() ) # accounts for the time since the current time for i in range(self.ntFlux): # allocate first half of dt timePlus += dt / 2.0 # calculate current zodiacal light brightness fZs[i] = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + timePlus).reshape(1), mode, )[0] # propagate the system to match up with current time SU.propag_system( sInd, currentTimeNorm + timePlus - self.propagTimes[sInd] ) self.propagTimes[sInd] = currentTimeNorm + timePlus # save planet parameters systemParamss[i] = SU.dump_system_params(sInd) # calculate signal and noise (electron count rates) Ss[i, :], Ns[i, :] = self.calc_signal_noise( sInd, planinds, dt, mode, fZ=fZs[i] ) # allocate second half of dt timePlus += dt / 2.0 # average output parameters fZ = np.mean(fZs) systemParams = { key: sum([systemParamss[x][key] for x in range(self.ntFlux)]) / float(self.ntFlux) for key in sorted(systemParamss[0]) } # calculate planets SNR S = Ss.sum(0) N = Ns.sum(0) SNRplans[N > 0] = S[N > 0] / N[N > 0] # allocate extra time for timeMultiplier # if only a FA, just save zodiacal brightness in the middle of the # integration else: totTime = intTime * (mode["timeMultiplier"]) fZ = ZL.fZ( Obs, TL, np.array([sInd], ndmin=1), (currentTimeAbs + totTime / 2.0).reshape(1), mode, )[0] # calculate the false alarm SNR (if any) SNRfa = [] if pIndsChar[-1] == -1: JEZ = self.lastDetected[sInd, 1][-1] dMag = self.lastDetected[sInd, 2][-1] WA = self.lastDetected[sInd, 3][-1] * u.arcsec C_p, C_b, C_sp = OS.Cp_Cb_Csp( TL, sInd, fZ, JEZ, dMag, WA, mode, TK=self.TimeKeeping ) S = (C_p * intTime).decompose().value N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2).decompose().value) SNRfa = S / N if N > 0 else 0.0 # save all SNRs (planets and FA) to one array SNRinds = np.where(det)[0][tochar] SNR[SNRinds] = np.append(SNRplans, SNRfa) # now, store characterization status: 1 for full spectrum, # -1 for partial spectrum, 0 for not characterized char = SNR >= mode["SNR"] # initialize with full spectra characterized = char.astype(int) WAchar = self.lastDetected[sInd, 3][char] * u.arcsec # find the current WAs of characterized planets WAs = systemParams["WA"] if FA: WAs = np.append(WAs, self.lastDetected[sInd, 3][-1] * u.arcsec) # check for partial spectra IWA_max = mode["IWA"] * (1 + mode["BW"] / 2.0) OWA_min = mode["OWA"] * (1 - mode["BW"] / 2.0) char[char] = (WAchar < IWA_max) | (WAchar > OWA_min) characterized[char] = -1 # encode results in spectra lists (only for planets, not FA) charplans = characterized[:-1] if FA else characterized self.fullSpectra[pInds[charplans == 1]] += 1 self.partialSpectra[pInds[charplans == -1]] += 1 return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime