from EXOSIMS.Prototypes.SurveySimulation import SurveySimulation
import EXOSIMS
import os
import astropy.units as u
import astropy.constants as const
import numpy as np
import pickle
import time
from EXOSIMS.util.deltaMag import deltaMag
[docs]
class tieredScheduler(SurveySimulation):
"""tieredScheduler
This class implements a tiered scheduler that independently schedules the
observatory while the starshade slews to its next target.
Args:
coeffs (iterable 7x1):
Cost function coefficients: slew distance, completeness, intTime,
deep-dive least visited ramp, deep-dive unvisited ramp, unvisited ramp,
and least-visited ramp
occHIPs (iterable nx1):
List of star HIP numbers to initialize occulter target list.
topstars (integer):
Number of HIP numbers to recieve preferential treatment.
revisit_wait (float):
Wait time threshold for star revisits. The value given is the fraction of a
characterized planet's period that must be waited before scheduling a
revisit.
revisit_weight (float):
Weight used to increase preference for coronograph revisits.
GAPortion (float):
Portion of mission time used for general astrophysics.
int_inflection (boolean):
Calculate integration time using the pre-calculated integration time curves.
Default is False.
GA_simult_det_fraction (float):
Fraction of detection time to be considered as GA time.
promote_hz_stars (boolean):
Flag that allows promotion of targets with planets in the habitable zone
to the occulter target list.
phase1_end (int):
Number of days to wait before the end of phase 1, when phase 1 ends,
target promotion begins.
n_det_remove (int):
Minimum number of visits with no detections required to filter off star
n_det_min (int):
Minimum number of detections required for promotion
occ_max_visits (int):
Number of maximum visits to a star allowed by the occulter.
max_successful_chars (int):
Maximum number of successful characterizations on a given star before
it is removed from the target list.
max_successful_dets (int):
Maximum number of successful detections on a given star before
it is removed from the target list.
nmax_promo_det (int):
Number of detection on a star required to be promoted regardless of
detection occurance times.
lum_exp (int):
Exponent used in the luminosity weighting function.
tot_det_int_cutoff (float):
Number of total days the scheduler is allowed to spend on detections.
prom_stop_dets (boolean):
Remove an sInd from the revisit list upon promotion regardeless of
the value of 'max_successful_dets'.
disable_dets (boolean):
It sets 'max_successful_dets' to 0 forcing 'choose_next_telescope_target'
to return None. Therefore it sets sInd equal to occ_sInd at every iteration.
**specs:
user specified values
"""
def __init__(
self,
coeffs=[2, 1, 1, 8, 4, 1, 1],
occHIPs=[],
topstars=0,
revisit_wait=0.5,
revisit_weight=1.0,
GAPortion=0.25,
int_inflection=False,
GA_simult_det_fraction=0.07,
promote_hz_stars=False,
phase1_end=365,
n_det_remove=3,
n_det_min=3,
occ_max_visits=3,
max_successful_chars=1,
max_successful_dets=4,
nmax_promo_det=4,
lum_exp=1,
tot_det_int_cutoff=None,
prom_stop_dets=True,
disable_dets=False,
**specs,
):
SurveySimulation.__init__(self, **specs)
# verify that coefficients input is iterable 4x1
if not (isinstance(coeffs, (list, tuple, np.ndarray))) or (len(coeffs) != 7):
raise TypeError("coeffs must be a 7 element iterable")
TK = self.TimeKeeping
TL = self.TargetList
OS = self.OpticalSystem
SU = self.SimulatedUniverse
# Add to outspec
self._outspec["coeffs"] = coeffs
self._outspec["occHIPs"] = occHIPs
self._outspec["topstars"] = topstars
self._outspec["revisit_wait"] = revisit_wait
self._outspec["revisit_weight"] = revisit_weight
self._outspec["GAPortion"] = GAPortion
self._outspec["int_inflection"] = int_inflection
self._outspec["GA_simult_det_fraction"] = GA_simult_det_fraction
self._outspec["promote_hz_stars"] = promote_hz_stars
self._outspec["phase1_end"] = phase1_end
self._outspec["n_det_remove"] = n_det_remove
self._outspec["n_det_min"] = n_det_min
self._outspec["occ_max_visits"] = occ_max_visits
self._outspec["max_successful_chars"] = max_successful_chars
self._outspec["lum_exp"] = lum_exp
# normalize coefficients
coeffs = np.array(coeffs)
coeffs = coeffs / np.linalg.norm(coeffs, ord=1)
self.coeffs = coeffs
if occHIPs != []:
occHIPs_path = os.path.join(EXOSIMS.__path__[0], "Scripts", occHIPs)
assert os.path.isfile(occHIPs_path), "%s is not a file." % occHIPs_path
with open(occHIPs_path, "r") as ofile:
HIPsfile = ofile.read()
self.occHIPs = HIPsfile.split(",")
if len(self.occHIPs) <= 1:
self.occHIPs = HIPsfile.split("\n")
else:
# assert occHIPs != [], "occHIPs target list is empty, occHIPs
# file must be specified in script file"
self.occHIPs = occHIPs
self.occHIPs = [hip.strip() for hip in self.occHIPs]
# The timestamp at which the occulter finishes slewing
self.occ_arrives = TK.currentTimeAbs.copy()
self.occ_starRevisit = np.array([]) # Array of star revisit times
# The number of times each star was visited by the occulter
self.occ_starVisits = np.zeros(TL.nStars, dtype=int)
# Flag that determines whether or not we are in phase 1
self.is_phase1 = True
# The designated end time for the first observing phase
self.phase1_end = TK.missionStart.copy() + phase1_end * u.d
self.FA_status = np.zeros(TL.nStars, dtype=bool) # False Alarm status array
# Percentage of mission devoted to general astrophysics
self.GA_percentage = GAPortion
self.GAtime = 0.0 * u.d # Current amount of time devoted to GA
# Fraction of detection time allocated to GA
self.GA_simult_det_fraction = GA_simult_det_fraction
# The desired amount of GA time based off of mission time
self.goal_GAtime = None
self.curves = None
self.ao = None
# Use int_inflection to calculate int times
self.int_inflection = int_inflection
self.promote_hz_stars = promote_hz_stars # Flag to promote hz stars
# Keeps track of last characterized star to avoid repeats
self.last_chard = None
# The exponent to use for luminosity weighting on coronograph targets
self.lum_exp = lum_exp
self.ready_to_update = False
self.occ_slewTime = 0.0 * u.d
self.occ_sd = 0.0 * u.rad
self.sInd_charcounts = {} # Number of characterizations by star index
# Number of detections by star index
self.sInd_detcounts = np.zeros(TL.nStars, dtype=int)
self.sInd_dettimes = {}
# Minimum number of visits with no detections required to filter off star
self.n_det_remove = n_det_remove
# Minimum number of detections required for promotion
self.n_det_min = n_det_min
# Maximum number of allowed occulter visits
self.occ_max_visits = occ_max_visits
# Maximum allowed number of successful chars of deep dive targets
# before removal from target list
self.max_successful_chars = max_successful_chars
self.disable_dets = disable_dets
if self.disable_dets:
self.max_successful_dets = 0
else:
self.max_successful_dets = max_successful_dets
self.nmax_promo_det = nmax_promo_det
if tot_det_int_cutoff is None:
self.tot_det_int_cutoff = np.inf
else:
self.tot_det_int_cutoff = tot_det_int_cutoff * u.d
self.tot_dettime = 0.0 * u.d
# Allow preferential treatment of top n stars in occ_sInds target list
self.topstars = topstars
self.coeff_data_a3 = []
self.coeff_data_a4 = []
self.coeff_time = []
# self.revisit_wait = revisit_wait * u.d
EEID = 1 * u.AU * np.sqrt(TL.L)
mu = const.G * (TL.MsTrue)
T = (2.0 * np.pi * np.sqrt(EEID**3 / mu)).to("d")
self.revisit_wait = revisit_wait * T
self.revisit_weight = revisit_weight
self.no_dets = np.ones(self.TargetList.nStars, dtype=bool)
# list of stars promoted from the coronograph list to the starshade list
self.promoted_stars = []
# list of stars that have been removed from the occ_sInd list
self.ignore_stars = []
# corresponding integration times for earths
self.t_char_earths = np.array([])
# Precalculating intTimeFilter
allModes = OS.observingModes
char_mode = list(filter(lambda mode: "spec" in mode["inst"]["name"], allModes))[
0
]
sInds = np.arange(TL.nStars) # Initialize some sInds array
(
self.occ_valfZmin,
self.occ_absTimefZmin,
) = self.ZodiacalLight.extractfZmin(
self.fZmins[char_mode["syst"]["name"]], sInds, self.koTimes
)
JEZ = TL.JEZ0[char_mode["hex"]][sInds] # Get the default values per star
dMag = TL.int_dMag[sInds] # grabbing dMag
WA = TL.int_WA[sInds] # grabbing WA
self.occ_intTimesIntTimeFilter = (
self.OpticalSystem.calc_intTime(
TL, sInds, self.occ_valfZmin, JEZ, dMag, WA, char_mode
)
* char_mode["timeMultiplier"]
)
# These indices are acceptable for use simulating
self.occ_intTimeFilterInds = np.where(
(
(self.occ_intTimesIntTimeFilter > 0)
& (self.occ_intTimesIntTimeFilter <= self.OpticalSystem.intCutoff)
)
)[0]
# Promote all stars assuming they have known earths
occ_sInds_with_earths = []
if TL.earths_only:
char_mode = list(
filter(lambda mode: "spec" in mode["inst"]["name"], OS.observingModes)
)[0]
# check for earths around the available stars
for sInd in np.arange(TL.nStars):
pInds = np.where(SU.plan2star == sInd)[0]
pinds_earthlike = self.is_earthlike(pInds, sInd)
if np.any(pinds_earthlike):
self.known_earths = np.union1d(
self.known_earths, pInds[pinds_earthlike]
).astype(int)
occ_sInds_with_earths.append(sInd)
self.promoted_stars = np.union1d(
self.promoted_stars, occ_sInds_with_earths
).astype(int)
# calculate example integration times
sInds = SU.plan2star[self.known_earths]
fZ = self.occ_valfZmin[sInds]
# fZ = ZL.fZ(Obs, TL, sInds, TK.currentTimeAbs.copy(), char_mode)
# Walker previous version.
JEZ = TL.JEZ0[char_mode["hex"]][sInds][self.known_earths]
if SU.lucky_planets:
phi = (1 / np.pi) * np.ones(len(SU.d))
dMag = deltaMag(SU.p, SU.Rp, SU.d, phi)[
self.known_earths
] # delta magnitude
WAp = np.arctan(SU.a / TL.dist[SU.plan2star]).to("arcsec")[
self.known_earths
] # working angle
else:
dMag = SU.dMag[self.known_earths]
WAp = SU.WA[self.known_earths]
# WAp = SU.WA[self.known_earths]
# dMag = SU.dMag[self.known_earths]
self.t_char_earths = OS.calc_intTime(
TL, sInds, fZ, JEZ, dMag, WAp, char_mode
)
self.t_char_earths[~np.isfinite(self.t_char_earths)] = 0 * u.d
# occ_sInds = occ_sInds[(occ_intTimes[occ_sInds] > 0.0*u.d)]
sInds = sInds[
(self.t_char_earths > 0)
& (self.t_char_earths <= self.OpticalSystem.intCutoff)
]
# sInds = sInds[(self.t_char_earths <= self.OpticalSystem.intCutoff)]
self.occ_intTimeFilterInds = np.intersect1d(sInds, np.arange(TL.nStars))
[docs]
def run_sim(self):
"""Performs the survey simulation
Returns:
mission_end (string):
Message printed at the end of a survey simulation.
"""
OS = self.OpticalSystem
TL = self.TargetList
SU = self.SimulatedUniverse
Obs = self.Observatory
TK = self.TimeKeeping
Comp = self.Completeness
# TODO: start using this self.currentSep
# set occulter separation if haveOcculter
self.currentSep = Obs.occulterSep
# Choose observing modes selected for detection (default marked with a flag),
det_mode = list(filter(lambda mode: mode["detectionMode"], OS.observingModes))[
0
]
# and for characterization (default is first spectro/IFS mode)
spectroModes = list(
filter(lambda mode: "spec" in mode["inst"]["name"], OS.observingModes)
)
if np.any(spectroModes):
char_mode = spectroModes[0]
# if no spectro mode, default char mode is first observing mode
else:
char_mode = OS.observingModes[0]
# Begin Survey, and loop until mission is finished
self.logger.info("OB{}: survey beginning.".format(TK.OBnumber + 1))
self.vprint("OB{}: survey beginning.".format(TK.OBnumber + 1))
t0 = time.time()
sInd = None
occ_sInd = None
cnt = 0
while not TK.mission_is_over(OS, Obs, det_mode):
# Acquire the NEXT TARGET star index and create DRM
# prev_occ_sInd = occ_sInd
old_sInd = sInd # used to save sInd if returned sInd is None
waitTime = None
DRM, sInd, occ_sInd, t_det, sd, occ_sInds = self.next_target(
sInd, occ_sInd, det_mode, char_mode
)
# print(TK.currentTimeAbs.copy())
if not self.disable_dets:
true_t_det = (
t_det * det_mode["timeMultiplier"]
+ Obs.settlingTime
+ det_mode["syst"]["ohTime"]
)
if sInd != occ_sInd and sInd is not None:
assert t_det != 0, "Integration time can't be 0."
if (
sInd is not None
and (TK.currentTimeAbs.copy() + true_t_det) >= self.occ_arrives
and occ_sInd != self.last_chard
):
sInd = occ_sInd
if sInd == occ_sInd:
self.ready_to_update = True
else:
t_det = 0 * u.d # not used for char, but explicit
if (
occ_sInd is not None
and TK.currentTimeAbs.copy() <= self.occ_arrives
and occ_sInd != self.last_chard
):
sInd = occ_sInd
self.ready_to_update = True
else:
sInd, occ_sInd = None, None
# elif (
# occ_sInd is not None
# and occ_sInd == self.last_chard
# ):
# sInd, occ_sInd = None, None
# self.ready_to_update = False
if (occ_sInd is not None) and (
TK.currentTimeAbs.copy() > self.occ_arrives
):
self.logger.warning(
"Past occ_arrives with occ_sInd set — missed arrival window"
)
time2arrive = self.occ_arrives - TK.currentTimeAbs.copy()
# print(sInd, occ_sInd, time2arrive, self.occ_arrives, TK.currentTimeAbs.copy())
if sInd is not None:
cnt += 1
# clean up revisit list when one occurs to prevent repeats
if np.any(self.starRevisit) and np.any(
np.where(self.starRevisit[:, 0] == float(sInd))
):
s_revs = np.where(self.starRevisit[:, 0] == float(sInd))[0]
t_revs = np.where(
self.starRevisit[:, 1] * u.day - TK.currentTimeNorm.copy()
< 0 * u.d
)[0]
self.starRevisit = np.delete(
self.starRevisit, np.intersect1d(s_revs, t_revs), 0
)
# get the index of the selected target for the extended list
if (
TK.currentTimeNorm.copy() > TK.missionLife
and self.starExtended.shape[0] == 0
):
for i in range(len(self.DRM)):
if np.any([x == 1 for x in self.DRM[i]["plan_detected"]]):
self.starExtended = np.hstack(
(self.starExtended, self.DRM[i]["star_ind"])
)
self.starExtended = np.unique(self.starExtended)
# Beginning of observation, start to populate DRM
DRM["OB_nb"] = TK.OBnumber + 1
DRM["ObsNum"] = cnt
DRM["star_ind"] = sInd
pInds = np.where(SU.plan2star == sInd)[0]
DRM["plan_inds"] = pInds.astype(int).tolist()
if sInd == occ_sInd:
# wait until expected arrival time is observed
if time2arrive > 0 * u.d:
TK.advanceToAbsTime(self.occ_arrives)
if time2arrive > 1 * u.d:
self.GAtime = self.GAtime + time2arrive.to("day")
TK.obsStart = TK.currentTimeNorm.copy().to("day")
self.logger.info(
"Observation #%s, target #%s/%s with %s planet(s), mission time: %s"
% (cnt, sInd + 1, TL.nStars, len(pInds), TK.obsStart.round(2))
)
self.vprint(
"Observation #%s, target #%s/%s with %s planet(s), mission time: %s"
% (cnt, sInd + 1, TL.nStars, len(pInds), TK.obsStart.round(2))
)
DRM["arrival_time"] = TK.currentTimeNorm.copy().to("day")
if sInd != occ_sInd and not self.disable_dets:
self.starVisits[sInd] += 1
# PERFORM DETECTION and populate revisit list attribute.
# First store dMag, WA
if np.any(pInds):
DRM["det_dMag"] = SU.dMag[pInds].tolist()
DRM["det_WA"] = SU.WA[pInds].to("mas").value.tolist()
(
detected,
det_fZ,
det_JEZ,
det_systemParams,
det_SNR,
FA,
) = self.observation_detection(sInd, t_det, det_mode)
if np.any(pInds):
DRM["det_JEZ"] = det_JEZ
if np.any(detected > 0):
self.sInd_detcounts[sInd] += 1
self.sInd_dettimes[sInd] = (
self.sInd_dettimes.get(sInd) or []
) + [TK.currentTimeNorm.copy().to("day")]
self.vprint(" Det. results are: %s" % (detected))
# update GAtime
self.GAtime = (
self.GAtime + t_det.to("day") * self.GA_simult_det_fraction
)
self.tot_dettime += t_det.to("day")
# populate the DRM with detection results
DRM["det_time"] = t_det.to("day")
DRM["det_status"] = detected
DRM["det_SNR"] = det_SNR
DRM["det_fZ"] = det_fZ.to("1/arcsec2")
DRM["det_params"] = det_systemParams
DRM["FA_det_status"] = int(FA)
det_comp = Comp.comp_per_intTime(
t_det,
TL,
sInd,
det_fZ,
TL.JEZ0[det_mode["hex"]][sInd],
TL.int_WA[sInd],
det_mode,
)[0]
DRM["det_comp"] = det_comp
DRM["det_mode"] = dict(det_mode)
del DRM["det_mode"]["inst"], DRM["det_mode"]["syst"]
elif sInd == occ_sInd:
self.occ_starVisits[occ_sInd] += 1
self.last_chard = occ_sInd
# PERFORM CHARACTERIZATION and populate spectra list attribute.
occ_pInds = np.where(SU.plan2star == occ_sInd)[0]
sInd = occ_sInd
DRM["slew_time"] = self.occ_slewTime.to("day").value
DRM["slew_angle"] = self.occ_sd.to("deg").value
slew_mass_used = (
self.occ_slewTime * Obs.defburnPortion * Obs.flowRate
)
DRM["slew_dV"] = (
(self.occ_slewTime * self.ao * Obs.defburnPortion)
.to("m/s")
.value
)
DRM["slew_mass_used"] = slew_mass_used.to("kg")
Obs.scMass = Obs.scMass - slew_mass_used
DRM["scMass"] = Obs.scMass.to("kg")
if Obs.twotanks:
Obs.slewMass = Obs.slewMass - slew_mass_used
DRM["slewMass"] = Obs.slewMass.to("kg")
self.logger.info(" Starshade and telescope aligned at target star")
self.vprint(" Starshade and telescope aligned at target star")
# PERFORM CHARACTERIZATION and populate spectra list attribute
(
characterized,
char_fZ,
char_JEZ,
char_systemParams,
char_SNR,
char_intTime,
) = self.observation_characterization(sInd, char_mode)
if np.any(characterized):
self.vprint(" Char. results are: %s" % (characterized.T))
else:
# make sure we don't accidentally double characterize
TK.advanceToAbsTime(TK.currentTimeAbs.copy() + 0.01 * u.d)
assert char_intTime != 0, "Integration time can't be 0."
if np.any(occ_pInds):
DRM["char_JEZ"] = char_JEZ.tolist()
DRM["char_dMag"] = SU.dMag[occ_pInds].tolist()
DRM["char_WA"] = SU.WA[occ_pInds].to("mas").value.tolist()
DRM["char_mode"] = dict(char_mode)
del DRM["char_mode"]["inst"], DRM["char_mode"]["syst"]
# update the occulter wet mass
if OS.haveOcculter and char_intTime is not None:
DRM = self.update_occulter_mass(DRM, sInd, char_intTime, "char")
char_comp = Comp.comp_per_intTime(
char_intTime,
TL,
occ_sInd,
char_fZ,
char_JEZ,
TL.int_WA[occ_sInd],
char_mode,
)[0]
DRM["char_comp"] = char_comp
FA = False
# populate the DRM with characterization results
DRM["char_time"] = (
char_intTime.to("day")
if char_intTime is not None
else 0.0 * u.day
)
# DRM['char_counts'] = self.sInd_charcounts[sInd]
DRM["char_status"] = characterized[:-1] if FA else characterized
DRM["char_SNR"] = char_SNR[:-1] if FA else char_SNR
DRM["char_fZ"] = char_fZ.to("1/arcsec2")
DRM["char_params"] = char_systemParams
# populate the DRM with FA results
DRM["FA_det_status"] = int(FA)
DRM["FA_char_status"] = characterized[-1] if FA else 0
DRM["FA_char_SNR"] = char_SNR[-1] if FA else 0.0
DRM["FA_char_JEZ"] = (
self.lastDetected[sInd, 1][-1] / u.arcsec**2
if FA
else 0.0 * u.ph / u.s / u.m**2 / u.arcsec**2
)
DRM["FA_char_dMag"] = self.lastDetected[sInd, 2][-1] if FA else 0.0
DRM["FA_char_WA"] = (
self.lastDetected[sInd, 3][-1] * u.arcsec
if FA
else 0.0 * u.arcsec
)
# add star back into the revisit list
if np.any(characterized):
char = np.where(characterized)[0]
pInds = np.where(SU.plan2star == sInd)[0]
smin = np.min(SU.s[pInds[char]])
pInd_smin = pInds[np.argmin(SU.s[pInds[char]])]
Ms = TL.MsTrue[sInd]
sp = smin
Mp = SU.Mp[pInd_smin]
mu = const.G * (Mp + Ms)
T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
t_rev = TK.currentTimeNorm.copy() + T / 2.0 # noqa: F841
# do not revisit char if lucky_planets (occ_max_visits = 3, default)
if SU.lucky_planets and -1 in characterized:
self.occ_starVisits[occ_sInds] = self.occ_max_visits + 0
self.goal_GAtime = self.GA_percentage * TK.currentTimeNorm.copy().to(
"day"
)
goal_GAdiff = self.goal_GAtime - self.GAtime
# allocate extra time to GA if we are falling behind
if (
goal_GAdiff > 1 * u.d
and TK.currentTimeAbs.copy() < self.occ_arrives
):
GA_diff = min(
self.occ_arrives - TK.currentTimeAbs.copy(), goal_GAdiff
)
self.vprint(
"Allocating time %s to general astrophysics" % (GA_diff)
)
self.GAtime = self.GAtime + GA_diff
TK.advanceToAbsTime(TK.currentTimeAbs.copy() + GA_diff)
# allocate time if there is no target for the starshade
elif (
goal_GAdiff > 1 * u.d
and (self.occ_arrives - TK.currentTimeAbs.copy()) < -5 * u.d
and not np.any(occ_sInds)
):
self.vprint(
(
"No Available Occulter Targets: Allocating time "
"{} to general astrophysics"
).format(goal_GAdiff)
)
self.GAtime = self.GAtime + goal_GAdiff
TK.advanceToAbsTime(TK.currentTimeAbs.copy() + goal_GAdiff)
DRM["exoplanetObsTime"] = TK.exoplanetObsTime.copy()
# Append result values to self.DRM
self.DRM.append(DRM)
# Calculate observation end time
TK.obsEnd = TK.currentTimeNorm.copy().to("day")
# With prototype TimeKeeping, if no OB duration was specified, advance
# to the next OB with timestep equivalent to time spent on one target
if np.isinf(TK.OBduration) and (TK.missionPortion < 1):
self.arbitrary_time_advancement(
TK.currentTimeNorm.to("day").copy() - DRM["arrival_time"]
)
else: # sInd == None
sInd = old_sInd # Retain the last observed star
if (
TK.currentTimeNorm.copy() >= TK.OBendTimes[TK.OBnumber]
): # currentTime is at end of OB
# Conditional Advance To Start of Next OB
if not TK.mission_is_over(
OS, Obs, det_mode
): # as long as the mission is not over
TK.advancetToStartOfNextOB() # Advance To Start of Next OB
elif waitTime is not None:
# CASE 1: Advance specific wait time
_ = TK.advanceToAbsTime(TK.currentTimeAbs.copy() + waitTime)
self.vprint("waitTime is not None")
else:
startTimes = (
TK.currentTimeAbs.copy() + np.zeros(TL.nStars) * u.d
) # Start Times of Observations
observableTimes = Obs.calculate_observableTimes(
TL,
np.arange(TL.nStars),
startTimes,
self.koMaps,
self.koTimes,
det_mode,
)[0]
# CASE 2 If There are no observable targets for the rest of
# the mission
if (
observableTimes[
(
TK.missionFinishAbs.copy().value * u.d
> observableTimes.value * u.d
)
* (
observableTimes.value * u.d
>= TK.currentTimeAbs.copy().value * u.d
)
].shape[0]
) == 0:
# Are there any stars coming out of keepout before
# end of mission
self.vprint(
(
"No Observable Targets for Remainder of mission at "
"currentTimeNorm = {}".format(TK.currentTimeNorm)
)
)
# Manually advancing time to mission end
TK.currentTimeNorm = TK.missionLife.copy()
TK.currentTimeAbs = TK.missionFinishAbs.copy()
else:
# CASE 3 nominal wait time if at least 1 target is still in
# list and observable
# TODO: ADD ADVANCE TO WHEN FZMIN OCURS
inds1 = np.arange(TL.nStars)[
observableTimes.value * u.d
> TK.currentTimeAbs.copy().value * u.d
]
# apply intTime filter
inds2 = np.intersect1d(self.intTimeFilterInds, inds1)
# apply revisit Filter #NOTE this means stars you added
# to the revisit list
inds3 = self.revisitFilter(
inds2, TK.currentTimeNorm.copy() + self.dt_max.to(u.d)
)
self.vprint(
"Filtering %d stars from advanceToAbsTime"
% (TL.nStars - len(inds3))
)
oTnowToEnd = observableTimes[inds3]
# there is at least one observableTime between now and
# the end of the mission
if not oTnowToEnd.value.shape[0] == 0:
# advance to that observable time
tAbs = np.min(oTnowToEnd)
else:
# advance to end of mission
tAbs = TK.missionStart + TK.missionLife
tmpcurrentTimeNorm = TK.currentTimeNorm.copy()
# Advance Time to this time OR start of next OB following
# this time
_ = TK.advanceToAbsTime(tAbs)
self.vprint(
(
"No Observable Targets a currentTimeNorm = {:.2f} "
"Advanced To currentTimeNorm = {:.2f}"
).format(
tmpcurrentTimeNorm.to("day").value,
TK.currentTimeNorm.to("day").value,
)
)
else:
dtsim = (time.time() - t0) * u.s
mission_end = (
"Mission complete: no more time available.\n"
+ "Simulation duration: %s.\n" % dtsim.astype("int")
+ "Results stored in SurveySimulation.DRM (Design Reference Mission).\n"
+ "Time allocated to General Astrophysics = "
+ str(round(self.GAtime.value, 2))
+ "\n"
)
self.logger.info(mission_end)
self.vprint(mission_end)
return mission_end
[docs]
def next_target(self, old_sInd, old_occ_sInd, det_mode, char_mode):
"""Finds index of next target star and calculates its integration time.
This method chooses the next target star index based on which
stars are available, their integration time, and maximum completeness.
Returns None if no target could be found.
Args:
old_sInd (integer):
Index of the previous target star for the telescope
old_occ_sInd (integer):
Index of the previous target star for the occulter
det_mode (dict):
Selected observing mode for detection
char_mode (dict):
Selected observing mode for characterization
Returns:
tuple:
DRM (dicts):
Contains the results of survey simulation
sInd (integer):
Index of next target star. Defaults to None.
occ_sInd (integer):
Index of next occulter target star. Defaults to None.
t_det (astropy Quantity):
Selected star integration time for detection in units of day.
Defaults to None.
"""
OS = self.OpticalSystem
ZL = self.ZodiacalLight
TL = self.TargetList
Obs = self.Observatory
TK = self.TimeKeeping
SU = self.SimulatedUniverse
# selecting appropriate koMap
occ_koMap = self.koMaps[char_mode["syst"]["name"]]
koMap = self.koMaps[det_mode["syst"]["name"]]
# Create DRM
DRM = {}
# In case of an occulter, initialize slew time factor
# (add transit time and reduce starshade mass)
assert OS.haveOcculter
self.ao = Obs.thrust / Obs.scMass
# Star indices that correspond with the given HIPs numbers for the occulter
# XXX ToDo: print out HIPs that don't show up in TL
HIP_sInds = np.where(np.isin(TL.Name, self.occHIPs))[0]
if TL.earths_only:
HIP_sInds = np.union1d(HIP_sInds, self.promoted_stars).astype(int)
sInd = None
# Now, start to look for available targets
while not TK.mission_is_over(OS, Obs, det_mode):
# allocate settling time + overhead time
tmpCurrentTimeAbs = TK.currentTimeAbs.copy()
# tmpCurrentTimeNorm = TK.currentTimeNorm.copy()
occ_tmpCurrentTimeAbs = TK.currentTimeAbs.copy()
# occ_tmpCurrentTimeNorm = TK.currentTimeNorm.copy()
# 0 initialize arrays
slewTimes = np.zeros(TL.nStars) * u.d
# fZs = np.zeros(TL.nStars) / u.arcsec**2
# dV = np.zeros(TL.nStars) * u.m / u.s
intTimes = np.zeros(TL.nStars) * u.d
occ_intTimes = np.zeros(TL.nStars) * u.d
occ_tovisit = np.zeros(TL.nStars, dtype=bool)
sInds = np.arange(TL.nStars)
# 1 Find spacecraft orbital START positions and filter out unavailable
# targets. If occulter, each target has its own START position.
sd = Obs.star_angularSep(TL, old_occ_sInd, sInds, tmpCurrentTimeAbs)
obsTimes = Obs.calculate_observableTimes(
TL, sInds, tmpCurrentTimeAbs, self.koMaps, self.koTimes, char_mode
)
slewTimes = Obs.calculate_slewTimes(
TL, old_occ_sInd, sInds, sd, obsTimes, tmpCurrentTimeAbs
)
# 2.1 filter out totTimes > integration cutoff
if len(sInds) > 0:
occ_sInds = np.intersect1d(self.occ_intTimeFilterInds, sInds)
if len(sInds) > 0:
sInds = np.intersect1d(self.intTimeFilterInds, sInds)
# Starttimes based off of slewtime
occ_startTimes = occ_tmpCurrentTimeAbs.copy() + slewTimes
# occ_startTimesNorm = occ_tmpCurrentTimeNorm.copy() + slewTimes
startTimes = tmpCurrentTimeAbs.copy() + np.zeros(TL.nStars) * u.d
# startTimesNorm = tmpCurrentTimeNorm.copy()
# 2.5 Filter stars not observable at startTimes
try:
tmpIndsbool = list()
for i in np.arange(len(occ_sInds)):
koTimeInd = np.where(
np.round(occ_startTimes[occ_sInds[i]].value)
- self.koTimes.value
== 0
)[0][
0
] # find indice where koTime is endTime[0]
tmpIndsbool.append(
occ_koMap[occ_sInds[i]][koTimeInd].astype(bool)
) # Is star observable at time ind
sInds_occ_ko = occ_sInds[tmpIndsbool]
occ_sInds = sInds_occ_ko[np.where(np.isin(sInds_occ_ko, HIP_sInds))[0]]
del tmpIndsbool
except: # noqa: E722 If there are no target stars to observe
sInds_occ_ko = np.asarray([], dtype=int)
occ_sInds = np.asarray([], dtype=int)
try:
tmpIndsbool = list()
for i in np.arange(len(sInds)):
koTimeInd = np.where(
np.round(startTimes[sInds[i]].value) - self.koTimes.value == 0
)[0][
0
] # find indice where koTime is endTime[0]
tmpIndsbool.append(
koMap[sInds[i]][koTimeInd].astype(bool)
) # Is star observable at time ind
sInds = sInds[tmpIndsbool]
del tmpIndsbool
except: # noqa: E722 If there are no target stars to observe
sInds = np.asarray([], dtype=int)
# 2.9 Occulter target promotion step
occ_sInds = self.promote_coro_targets(occ_sInds, sInds_occ_ko)
# 3 Filter out all previously (more-)visited targets, unless in
# revisit list
if len(sInds.tolist()) > 0:
sInds = self.revisitFilter(sInds, TK.currentTimeNorm.copy())
# revisit list, with time after start
if np.any(occ_sInds):
occ_tovisit[occ_sInds] = (
self.occ_starVisits[occ_sInds]
== self.occ_starVisits[occ_sInds].min()
)
if self.occ_starRevisit.size != 0:
dt_rev = (
TK.currentTimeNorm.copy() - self.occ_starRevisit[:, 1] * u.day
)
ind_rev = [
int(x)
for x in self.occ_starRevisit[dt_rev > 0, 0]
if x in occ_sInds
]
occ_tovisit[ind_rev] = True
occ_sInds = np.where(occ_tovisit)[0]
# 4 calculate integration times for ALL preselected targets,
# and filter out totTimes > integration cutoff
(
maxIntTimeOBendTime,
maxIntTimeExoplanetObsTime,
maxIntTimeMissionLife,
) = TK.get_ObsDetectionMaxIntTime(Obs, det_mode)
maxIntTime = min(
maxIntTimeOBendTime,
maxIntTimeExoplanetObsTime,
maxIntTimeMissionLife,
OS.intCutoff,
) # Maximum intTime allowed
(
maxIntTimeOBendTime,
maxIntTimeExoplanetObsTime,
maxIntTimeMissionLife,
) = TK.get_ObsDetectionMaxIntTime(Obs, char_mode)
occ_maxIntTime = min(
maxIntTimeOBendTime,
maxIntTimeExoplanetObsTime,
maxIntTimeMissionLife,
OS.intCutoff,
) # Maximum intTime allowed
if len(occ_sInds) > 0:
if self.int_inflection:
JEZ = TL.JEZ0[char_mode["hex"]][occ_sInds]
WA = TL.int_WA
occ_intTimes[occ_sInds] = self.calc_int_inflection(
occ_sInds,
JEZ,
occ_startTimes,
WA[occ_sInds],
char_mode,
ischar=True,
)
totTimes = occ_intTimes * char_mode["timeMultiplier"]
occ_endTimes = occ_startTimes + totTimes
else:
# characterization_start = occ_startTimes
occ_intTimes[occ_sInds] = self.calc_targ_intTime(
occ_sInds, occ_startTimes[occ_sInds], char_mode
) * (1 + self.charMargin)
# Adjust integration time for stars with known earths around them
for occ_star in occ_sInds:
if occ_star in self.promoted_stars:
occ_earths = np.intersect1d(
np.where(SU.plan2star == occ_star)[0], self.known_earths
).astype(int)
if np.any(occ_earths):
fZ = ZL.fZ(
Obs,
TL,
occ_star,
occ_startTimes[occ_star],
char_mode,
)
JEZ = SU.scale_JEZ(occ_star, char_mode)
if SU.lucky_planets:
phi = (1 / np.pi) * np.ones(len(SU.d))
dMag = deltaMag(SU.p, SU.Rp, SU.d, phi)[
occ_earths
] # delta magnitude
WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to(
"arcsec"
)[
occ_earths
] # working angle
else:
dMag = SU.dMag[occ_earths]
WA = SU.WA[occ_earths]
if np.all(
(WA < char_mode["IWA"]) | (WA > char_mode["OWA"])
):
occ_intTimes[occ_star] = 0.0 * u.d
else:
earthlike_inttimes = OS.calc_intTime(
TL, occ_star, fZ, JEZ, dMag, WA, char_mode
) * (1 + self.charMargin)
earthlike_inttimes[
~np.isfinite(earthlike_inttimes)
] = (0 * u.d)
earthlike_inttime = earthlike_inttimes[
(earthlike_inttimes < occ_maxIntTime)
]
if len(earthlike_inttime) > 0:
occ_intTimes[occ_star] = np.max(
earthlike_inttime
)
else:
occ_intTimes[occ_star] = np.max(
earthlike_inttimes
)
occ_endTimes = (
occ_startTimes
+ (occ_intTimes * char_mode["timeMultiplier"])
+ Obs.settlingTime
+ char_mode["syst"]["ohTime"]
)
occ_sInds = occ_sInds[
(occ_intTimes[occ_sInds] <= occ_maxIntTime)
] # Filters targets exceeding maximum intTime
occ_sInds = occ_sInds[
(occ_intTimes[occ_sInds] > 0.0 * u.d)
] # Filters with an inttime of 0
if occ_maxIntTime.value <= 0:
occ_sInds = np.asarray([], dtype=int)
if len(sInds.tolist()) > 0:
intTimes[sInds] = self.calc_targ_intTime(
sInds, startTimes[sInds], det_mode
)
sInds = sInds[
(intTimes[sInds] <= maxIntTime)
] # Filters targets exceeding end of OB
endTimes = startTimes.value * u.d + intTimes
if maxIntTime.value <= 0:
sInds = np.asarray([], dtype=int)
# 5.2 find spacecraft orbital END positions (for each candidate target),
# and filter out unavailable targets
if len(occ_sInds.tolist()) > 0 and Obs.checkKeepoutEnd:
try:
# endTimes may exist past koTimes so we have an exception
# to hand this case
tmpIndsbool = list()
for i in np.arange(len(occ_sInds)):
koTimeInd = np.where(
np.round(occ_endTimes[occ_sInds[i]].value)
- self.koTimes.value
== 0
)[0][
0
] # find indice where koTime is endTime[0]
tmpIndsbool.append(
occ_koMap[occ_sInds[i]][koTimeInd].astype(bool)
) # Is star observable at time ind
occ_sInds = occ_sInds[tmpIndsbool]
del tmpIndsbool
except: # noqa: E722
occ_sInds = np.asarray([], dtype=int)
if len(sInds.tolist()) > 0 and Obs.checkKeepoutEnd:
try:
# endTimes may exist past koTimes so we have an exception
# to hand this case
tmpIndsbool = list()
for i in np.arange(len(sInds)):
koTimeInd = np.where(
np.round(endTimes[sInds[i]].value) - self.koTimes.value == 0
)[0][
0
] # find indice where koTime is endTime[0]
tmpIndsbool.append(
koMap[sInds[i]][koTimeInd].astype(bool)
) # Is star observable at time ind
sInds = sInds[tmpIndsbool]
del tmpIndsbool
except: # noqa: E722
sInds = np.asarray([], dtype=int)
# 5.3 Filter off current occulter target star from detection list
if old_occ_sInd is not None:
sInds = sInds[np.where(sInds != old_occ_sInd)]
occ_sInds = occ_sInds[(occ_sInds != old_occ_sInd)]
# 6.1 Filter off any stars visited by the occulter more than the
# max number of times
if np.any(occ_sInds):
occ_sInds = occ_sInds[
(self.occ_starVisits[occ_sInds] < self.occ_max_visits)
]
# 6.2 Filter off coronograph stars with too many visits and no detections
no_dets = np.logical_and(
(self.starVisits[sInds] > self.n_det_remove),
(self.sInd_detcounts[sInds] == 0),
)
sInds = sInds[np.where(np.invert(no_dets))[0]]
max_dets = np.where(self.sInd_detcounts[sInds] < self.max_successful_dets)[
0
]
sInds = sInds[max_dets]
# 7 Filter off cornograph stars with too-long inttimes
if self.occ_arrives > TK.currentTimeAbs:
available_time = self.occ_arrives - TK.currentTimeAbs.copy()
if np.any(sInds[intTimes[sInds] < available_time]):
sInds = sInds[intTimes[sInds] < available_time]
# 8 remove occ targets on ignore_stars list
occ_sInds = np.setdiff1d(
occ_sInds, np.intersect1d(occ_sInds, self.ignore_stars)
)
t_det = 0 * u.d
occ_sInd = old_occ_sInd
if np.any(sInds):
# choose sInd of next target
sInd = self.choose_next_telescope_target(
old_sInd, sInds, intTimes[sInds]
)
# store relevant values
t_det = intTimes[sInd]
# 8 Choose best target from remaining
# if the starshade has arrived at its destination, or it is
# the first observation
if np.any(occ_sInds):
if old_occ_sInd is None or (
(TK.currentTimeAbs.copy() + t_det) >= self.occ_arrives
and self.ready_to_update
):
occ_sInd = self.choose_next_occulter_target(
old_occ_sInd, occ_sInds, occ_intTimes
)
if old_occ_sInd is None:
self.occ_arrives = TK.currentTimeAbs.copy()
else:
self.occ_arrives = occ_startTimes[occ_sInd]
self.occ_slewTime = slewTimes[occ_sInd]
self.occ_sd = sd[occ_sInd]
self.ready_to_update = False
elif not np.any(sInds):
TK.advanceToAbsTime(TK.currentTimeAbs.copy() + 1 * u.d)
continue
if occ_sInd is not None:
sInds = sInds[(sInds != occ_sInd)]
if self.tot_det_int_cutoff < self.tot_dettime:
sInds = np.array([])
if np.any(sInds):
# choose sInd of next target
sInd = self.choose_next_telescope_target(
old_sInd, sInds, intTimes[sInds]
)
# store relevant values
t_det = intTimes[sInd]
else:
sInd = None
# if no observable target, call the TimeKeeping.wait() method
if not np.any(sInds) and not np.any(occ_sInds):
self.vprint(
"No Observable Targets at currentTimeNorm = "
+ str(TK.currentTimeNorm.copy())
)
return DRM, None, None, None, None, None
break
else:
self.logger.info("Mission complete: no more time available")
self.vprint("Mission complete: no more time available")
return DRM, None, None, None, None, None
if TK.mission_is_over(OS, Obs, det_mode):
self.logger.info("Mission complete: no more time available")
self.vprint("Mission complete: no more time available")
return DRM, None, None, None, None, None
return DRM, sInd, occ_sInd, t_det, sd, occ_sInds
[docs]
def choose_next_occulter_target(self, old_occ_sInd, occ_sInds, intTimes):
"""Choose next target for the occulter based on truncated
depth first search of linear cost function.
Args:
old_occ_sInd (integer):
Index of the previous target star
occ_sInds (integer array):
Indices of available targets
intTimes (astropy Quantity array):
Integration times for detection in units of day
Returns:
sInd (int):
Index of next target star
"""
# Choose next Occulter target
Comp = self.Completeness
TL = self.TargetList
TK = self.TimeKeeping
OS = self.OpticalSystem
# reshape sInds, store available top9 sInds
occ_sInds = np.array(occ_sInds, ndmin=1)
top_HIPs = self.occHIPs[: self.topstars]
top_sInds = np.intersect1d(np.where(np.isin(TL.Name, top_HIPs))[0], occ_sInds)
# current stars have to be in the adjmat
if (old_occ_sInd is not None) and (old_occ_sInd not in occ_sInds):
occ_sInds = np.append(occ_sInds, old_occ_sInd)
# get completeness values
comps = Comp.completeness_update(
TL, occ_sInds, self.occ_starVisits[occ_sInds], TK.currentTimeNorm.copy()
)
# if first target, or if only 1 available target,
# choose highest available completeness
nStars = len(occ_sInds)
if (old_occ_sInd is None) or (nStars == 1):
occ_sInd = np.random.choice(occ_sInds[comps == max(comps)])
return occ_sInd
# define adjacency matrix
A = np.zeros((nStars, nStars))
# consider slew distance when there's an occulter
r_ts = TL.starprop(occ_sInds, TK.currentTimeAbs.copy())
u_ts = (r_ts.to("AU").value.T / np.linalg.norm(r_ts.to("AU").value, axis=1)).T
angdists = np.arccos(np.clip(np.dot(u_ts, u_ts.T), -1, 1))
A[np.ones((nStars), dtype=bool)] = angdists
A = self.coeffs[0] * (A) / np.pi
# add factor due to completeness
A = A + self.coeffs[1] * (1 - comps)
# add factor due to intTime
intTimes[old_occ_sInd] = np.inf
A = A + self.coeffs[2] * (intTimes[occ_sInds] / OS.intCutoff)
# add factor for unvisited ramp for deep dive stars
if np.any(top_sInds):
# add factor for least visited deep dive stars
f_uv = np.zeros(nStars)
u1 = np.isin(occ_sInds, top_sInds)
u2 = self.occ_starVisits[occ_sInds] == min(self.occ_starVisits[top_sInds])
unvisited = np.logical_and(u1, u2)
f_uv[unvisited] = (
float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2
)
A = A - self.coeffs[3] * f_uv
self.coeff_data_a3.append([occ_sInds, f_uv])
# add factor for unvisited deep dive stars
no_visits = np.zeros(nStars)
# no_visits[u1] = np.ones(len(top_sInds))
u2 = self.occ_starVisits[occ_sInds] == 0
unvisited = np.logical_and(u1, u2)
no_visits[unvisited] = 1.0
A = A - self.coeffs[4] * no_visits
self.coeff_data_a4.append([occ_sInds, no_visits])
self.coeff_time.append(TK.currentTimeNorm.copy().value)
# add factor due to unvisited ramp
f_uv = np.zeros(nStars)
unvisited = self.occ_starVisits[occ_sInds] == 0
f_uv[unvisited] = float(TK.currentTimeNorm.copy() / TK.missionLife.copy()) ** 2
A = A - self.coeffs[5] * f_uv
# add factor due to revisited ramp
if self.occ_starRevisit.size != 0:
f2_uv = 1 - (np.isin(occ_sInds, self.occ_starRevisit[:, 0]))
A = A + self.coeffs[6] * f2_uv
# kill diagonal
A = A + np.diag(np.ones(nStars) * np.inf)
# take two traversal steps
step1 = np.tile(A[occ_sInds == old_occ_sInd, :], (nStars, 1)).flatten("F")
step2 = A[np.array(np.ones((nStars, nStars)), dtype=bool)]
tmp = np.nanargmin(step1 + step2)
occ_sInd = occ_sInds[int(np.floor(tmp / float(nStars)))]
return occ_sInd
[docs]
def choose_next_telescope_target(self, old_sInd, sInds, t_dets):
"""Choose next telescope target based on star completeness and integration time.
Args:
old_sInd (integer):
Index of the previous target star
sInds (integer array):
Indices of available targets
t_dets (astropy Quantity array):
Integration times for detection in units of day
Returns:
sInd (integer):
Index of next target star
"""
Comp = self.Completeness
TL = self.TargetList
TK = self.TimeKeeping
OS = self.OpticalSystem
Obs = self.Observatory
allModes = OS.observingModes
# reshape sInds
sInds = np.array(sInds, ndmin=1)
# 1/ Choose next telescope target
comps = Comp.completeness_update(
TL, sInds, self.starVisits[sInds], TK.currentTimeNorm.copy()
)
# add weight for star revisits
ind_rev = []
if self.starRevisit.size != 0:
dt_rev = self.starRevisit[:, 1] * u.day - TK.currentTimeNorm.copy()
ind_rev = [
int(x) for x in self.starRevisit[dt_rev < 0 * u.d, 0] if x in sInds
]
f2_uv = np.where(
(self.starVisits[sInds] > 0) & (self.starVisits[sInds] < self.nVisitsMax),
self.starVisits[sInds],
0,
) * (1 - (np.isin(sInds, ind_rev, invert=True)))
# L = TL.L[sInds]
l_extreme = max(
[
np.abs(np.log10(np.min(TL.L[sInds]))),
np.abs(np.log10(np.max(TL.L[sInds]))),
]
)
if l_extreme == 0.0:
l_weight = 1
else:
l_weight = 1 - np.abs(np.log10(TL.L[sInds]) / l_extreme) ** self.lum_exp
t_weight = t_dets / np.max(t_dets)
weights = (
(comps + self.revisit_weight * f2_uv / float(self.nVisitsMax)) / t_weight
) * l_weight
# weights = (comps + self.revisit_weight*f2_uv/float(self.nVisitsMax))*l_weight
sInd = np.random.choice(sInds[weights == max(weights)])
# Check if exoplanetObsTime would be exceeded
mode = list(filter(lambda mode: mode["detectionMode"], allModes))[0]
(
maxIntTimeOBendTime,
maxIntTimeExoplanetObsTime,
maxIntTimeMissionLife,
) = TK.get_ObsDetectionMaxIntTime(Obs, mode)
maxIntTime = min(
maxIntTimeOBendTime, maxIntTimeExoplanetObsTime, maxIntTimeMissionLife
) # Maximum intTime allowed
intTimes2 = self.calc_targ_intTime(
np.array([sInd]), TK.currentTimeAbs.copy(), mode
)
if (
intTimes2 > maxIntTime
): # check if max allowed integration time would be exceeded
self.vprint("max allowed integration time would be exceeded")
sInd = None
waitTime = 1.0 * u.d # noqa: F841
return sInd
[docs]
def calc_int_inflection(self, t_sInds, JEZ, startTime, WA, mode, ischar=False):
"""Calculate integration time based on inflection point of Completeness
as a function of int_time
Args:
t_sInds (integer array):
Indices of the target stars
JEZ (astropy Quantity):
Intensity of exo-zodiacal light in units of ph/s/m2/arcsec2
startTime (astropy Quantity array):
Surface brightness of local zodiacal light in units of 1/arcsec2
WA (astropy Quantity):
Working angle of the planet of interest in units of arcsec
mode (dict):
Selected observing mode
Returns:
int_times (astropy quantity array):
The suggested integration time
"""
Comp = self.Completeness
TL = self.TargetList
ZL = self.ZodiacalLight
Obs = self.Observatory
num_points = 500
intTimes = np.logspace(-5, 2, num_points) * u.d
sInds = np.arange(TL.nStars)
# don't use WA input because we don't know planet positions
# before characterization
WA = TL.int_WA
curve = np.zeros([1, sInds.size, intTimes.size])
Cpath = os.path.join(Comp.classpath, Comp.filename + ".fcomp")
# if no preexisting curves exist, either load from file or calculate
if self.curves is None:
if os.path.exists(Cpath):
self.vprint('Loading cached completeness file from "{}".'.format(Cpath))
with open(Cpath, "rb") as cfile:
curves = pickle.load(cfile)
self.vprint("Completeness curves loaded from cache.")
else:
# calculate completeness curves for all sInds
self.vprint('Cached completeness file not found at "{}".'.format(Cpath))
self.vprint("Beginning completeness curve calculations.")
curves = {}
for t_i, t in enumerate(intTimes):
fZ = ZL.fZ(Obs, TL, sInds, startTime, mode)
curve[0, :, t_i] = Comp.comp_per_intTime(
t, TL, sInds, fZ, JEZ, WA, mode
)
curves[mode["systName"]] = curve
with open(Cpath, "wb") as cfile:
pickle.dump(curves, cfile)
self.vprint("completeness curves stored in {}".format(Cpath))
self.curves = curves
# if no curves for current mode
if (
mode["systName"] not in self.curves.keys()
or TL.nStars != self.curves[mode["systName"]].shape[1]
):
for t_i, t in enumerate(intTimes):
fZ = ZL.fZ(Obs, TL, sInds, startTime, mode)
curve[0, :, t_i] = Comp.comp_per_intTime(
t, TL, sInds, fZ, JEZ, WA, mode
)
self.curves[mode["systName"]] = curve
with open(Cpath, "wb") as cfile:
pickle.dump(self.curves, cfile)
self.vprint("recalculated completeness curves stored in {}".format(Cpath))
int_times = np.zeros(len(t_sInds)) * u.d
for i, sInd in enumerate(t_sInds):
c_v_t = self.curves[mode["systName"]][0, sInd, :]
dcdt = np.diff(c_v_t) / np.diff(intTimes)
# find the inflection point of the completeness graph
if ischar is False:
target_point = max(dcdt).value + 10 * np.var(dcdt).value
idc = np.abs(dcdt - target_point / (1 * u.d)).argmin()
int_time = intTimes[idc]
int_time = int_time * self.starVisits[sInd]
# update star completeness
idx = (np.abs(intTimes - int_time)).argmin()
comp = c_v_t[idx]
TL.comp[sInd] = comp
else:
idt = np.abs(intTimes - max(intTimes)).argmin()
idx = np.abs(c_v_t - c_v_t[idt] * 0.9).argmin()
# idx = np.abs(comps - max(comps)*.9).argmin()
int_time = intTimes[idx]
comp = c_v_t[idx]
int_times[i] = int_time
int_times[int_times < 2.000e-5 * u.d] = 0.0 * u.d
return int_times
[docs]
def observation_characterization(self, sInd, mode):
"""Finds if characterizations are possible and relevant information
Args:
sInd (integer):
Integer index of the star of interest
mode (dict):
Selected observing mode for characterization
Returns:
characterized (integer list):
Characterization status for each planet orbiting the observed
target star including False Alarm if any, where 1 is full spectrum,
-1 partial spectrum, and 0 not characterized
fZ (astropy Quantity):
Surface brightness of local zodiacal light in units of 1/arcsec2
JEZ (astropy.units.Quantity(numpy.ndarray(float))):
Intensity of exo-zodiacal light in units of photons/s/m2/arcsec2
systemParams (dict):
Dictionary of time-dependant planet properties averaged over the
duration of the integration
SNR (float ndarray):
Characterization signal-to-noise ratio of the observable planets.
Defaults to None.
intTime (astropy Quantity):
Selected star characterization time in units of day. Defaults to None.
"""
OS = self.OpticalSystem
ZL = self.ZodiacalLight
TL = self.TargetList
SU = self.SimulatedUniverse
Obs = self.Observatory
TK = self.TimeKeeping
# selecting appropriate koMap
koMap = self.koMaps[mode["syst"]["name"]]
# find indices of planets around the target
pInds = np.where(SU.plan2star == sInd)[0]
pinds_earthlike = np.array([])
det = np.ones(pInds.size, dtype=bool)
JEZs = SU.scale_JEZ(sInd, mode)
dMags = SU.dMag[pInds]
# WAs = SU.WA[pInds].to("arcsec").value
if SU.lucky_planets:
# used in the "partial char" check below
WAs = np.arctan(SU.a[pInds] / TL.dist[sInd]).to("arcsec").value
else:
WAs = SU.WA[pInds].to("arcsec").value
FA = det.size == pInds.size + 1
if FA:
pIndsDet = np.append(pInds, -1)[det]
else:
pIndsDet = pInds[det]
# initialize outputs, and check if any planet to characterize
characterized = np.zeros(det.size, dtype=int)
fZ = 0.0 / u.arcsec**2
JEZ = np.zeros(len(det)) * u.ph / u.s / u.m**2 / u.arcsec**2
systemParams = SU.dump_system_params(
sInd
) # write current system params by default
SNR = np.zeros(len(det))
intTime = None
if len(det) == 0: # nothing to characterize
HIP_sInds = np.where(np.isin(TL.Name, self.occHIPs))[0]
if sInd in HIP_sInds:
startTime = TK.currentTimeAbs.copy()
startTimeNorm = TK.currentTimeNorm.copy()
intTime = self.calc_targ_intTime(np.array([sInd]), startTime, mode)[0]
extraTime = intTime * (
mode["timeMultiplier"] - 1.0
) # calculates extraTime
# add a predetermined margin to the integration times
intTime = intTime * (1 + self.charMargin)
# apply time multiplier
totTime = intTime * (mode["timeMultiplier"])
# end times
endTimes = startTime + totTime
endTimesNorm = startTimeNorm + totTime
# planets to characterize
tochar = (
(totTime > 0)
& (totTime <= OS.intCutoff)
& (endTimesNorm <= TK.OBendTimes[TK.OBnumber])
)
success = TK.allocate_time(
intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime,
True,
) # allocates time
if not (success) or not (tochar):
intTime = None
if sInd not in self.sInd_charcounts.keys():
self.sInd_charcounts[sInd] = characterized
return characterized, fZ, JEZ, systemParams, SNR, intTime
# look for last detected planets that have not been fully characterized
if not (FA): # only true planets, no FA
tochar = self.fullSpectra[pIndsDet] != -2
else: # mix of planets and a FA
truePlans = pIndsDet[:-1]
tochar = np.append((self.fullSpectra[truePlans] == 0), True)
# 1/ find spacecraft orbital START position and check keepout angle
if np.any(tochar):
# start times
startTime = TK.currentTimeAbs.copy()
startTimeNorm = TK.currentTimeNorm.copy()
# planets to characterize
koTimeInd = np.where(np.round(startTime.value) - self.koTimes.value == 0)[
0
][
0
] # find indice where koTime is startTime[0]
# wherever koMap is 1, the target is observable
tochar[tochar] = koMap[sInd][koTimeInd]
# 2/ if any planet to characterize, find the characterization times
if np.any(tochar):
# propagate the whole system to match up with current time
# calculate characterization times at the detected JEZ, dMag, and WA
pinds_earthlike = np.logical_and(
np.array([(p in self.known_earths) for p in pIndsDet]), tochar
)
fZ = ZL.fZ(Obs, TL, sInd, startTime, mode)
JEZ = JEZs[tochar]
WAp = TL.int_WA[sInd] * np.ones(len(tochar))
dMag = TL.int_dMag[sInd] * np.ones(len(tochar))
# if lucky_planets, use lucky planet params for dMag and WA
if SU.lucky_planets:
phi = (1 / np.pi) * np.ones(len(SU.d))
e_dMag = deltaMag(SU.p, SU.Rp, SU.d, phi) # delta magnitude
e_WA = np.arctan(SU.a / TL.dist[SU.plan2star]).to(
"arcsec"
) # working angle
else:
e_dMag = SU.dMag
e_WA = SU.WA
WAp[pinds_earthlike[tochar]] = e_WA[pIndsDet[pinds_earthlike]]
dMag[pinds_earthlike[tochar]] = e_dMag[pIndsDet[pinds_earthlike]]
intTimes = np.zeros(len(tochar)) * u.day
if self.int_inflection:
for i, j in enumerate(WAp):
if tochar[i]:
intTimes[i] = self.calc_int_inflection(
[sInd], JEZ[i], startTime, j, mode, ischar=True
)[0]
else:
intTimes[tochar] = OS.calc_intTime(TL, sInd, fZ, JEZ, dMag, WAp, mode)
intTimes[~np.isfinite(intTimes)] = 0 * u.d
# add a predetermined margin to the integration times
intTimes = intTimes * (1 + self.charMargin)
# apply time multiplier
totTimes = intTimes * (mode["timeMultiplier"])
# end times
endTimes = startTime + totTimes
endTimesNorm = startTimeNorm + totTimes
# planets to characterize
tochar = (
(totTimes > 0)
& (totTimes <= OS.intCutoff)
& (endTimesNorm <= TK.OBendTimes[TK.OBnumber])
)
# 3/ is target still observable at the end of any char time?
if np.any(tochar) and Obs.checkKeepoutEnd:
koTimeInds = np.zeros(len(endTimes.value[tochar]), dtype=int)
# find index in koMap where each endTime is closest to koTimes
for t, endTime in enumerate(endTimes.value[tochar]):
if endTime > self.koTimes.value[-1]:
# case where endTime exceeds largest koTimes element
endTimeInBounds = np.where(
np.floor(endTime) - self.koTimes.value == 0
)[0]
koTimeInds[t] = (
endTimeInBounds[0] if endTimeInBounds.size != 0 else -1
)
else:
koTimeInds[t] = np.where(
np.round(endTime) - self.koTimes.value == 0
)[0][
0
] # find indice where koTime is endTimes[0]
tochar[tochar] = [koMap[sInd][koT] if koT >= 0 else 0 for koT in koTimeInds]
# 4/ if yes, perform the characterization for the maximum char time
if np.any(tochar):
# Save Current Time before attempting time allocation
currentTimeNorm = TK.currentTimeNorm.copy()
currentTimeAbs = TK.currentTimeAbs.copy()
if np.any(np.logical_and(pinds_earthlike, tochar)):
intTime = np.max(intTimes[np.logical_and(pinds_earthlike, tochar)])
else:
intTime = np.max(intTimes[tochar])
extraTime = intTime * (mode["timeMultiplier"] - 1.0) # calculates extraTime
success = TK.allocate_time(
intTime + extraTime + mode["syst"]["ohTime"] + Obs.settlingTime, True
) # allocates time
if not (success): # Time was not successfully allocated
# Identical to when "if char_mode['SNR'] not in [0, np.inf]:"
# in run_sim()
char_intTime = None
lenChar = len(pInds) + 1 if FA else len(pInds)
characterized = np.zeros(lenChar, dtype=float)
char_SNR = np.zeros(lenChar, dtype=float)
char_fZ = 0.0 / u.arcsec**2
char_JEZ = 0.0 * u.ph / u.s / u.m**2 / u.arcsec**2
char_systemParams = SU.dump_system_params(sInd)
return (
characterized,
char_fZ,
char_JEZ,
char_systemParams,
char_SNR,
char_intTime,
)
pIndsChar = pIndsDet[tochar]
log_char = " - Charact. planet(s) %s (%s/%s detected)" % (
pIndsChar,
len(pIndsChar),
len(pIndsDet),
)
self.logger.info(log_char)
self.vprint(log_char)
SNR, fZ, systemParams = self.find_char_SNR(
sInd, pIndsChar, currentTimeNorm, intTime, mode
)
# now, store characterization status: 1 for full spectrum,
# -1 for partial spectrum, 0 for not characterized
char = SNR >= mode["SNR"]
# initialize with full spectra
characterized = char.astype(int)
# WAchar = WAs[char] * u.arcsec
# find the current WAs of characterized planets
WA = WAs * u.arcsec
if FA:
WAs = np.append(WAs, WAs[-1] * u.arcsec)
all_full = np.copy(characterized)
all_full[char] = 1
if sInd not in self.sInd_charcounts.keys():
self.sInd_charcounts[sInd] = all_full
else:
self.sInd_charcounts[sInd] = self.sInd_charcounts[sInd] + all_full
# encode results in spectra lists (only for planets, not FA)
charplans = characterized[:-1] if FA else characterized
self.fullSpectra[pInds[charplans == 1]] += 1
self.partialSpectra[pInds[charplans == -1]] += 1
# in both cases (detection or false alarm), schedule a revisit
smin = np.min(SU.s[pInds[det]])
Ms = TL.MsTrue[sInd]
# if target in promoted_stars list, schedule revisit based off of
# semi-major axis
if sInd in self.promoted_stars:
sp = np.min(SU.a[pInds[det]]).to("AU")
if np.any(det):
pInd_smin = pInds[det][np.argmin(SU.a[pInds[det]])]
Mp = SU.Mp[pInd_smin]
else:
Mp = SU.Mp.mean()
mu = const.G * (Mp + Ms)
T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
t_rev = TK.currentTimeNorm.copy() + T / 3.0
# otherwise schedule revisit based off of seperation
elif smin is not None:
sp = smin
if np.any(det):
pInd_smin = pInds[det][np.argmin(SU.s[pInds[det]])]
Mp = SU.Mp[pInd_smin]
else:
Mp = SU.Mp.mean()
mu = const.G * (Mp + Ms)
T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
t_rev = TK.currentTimeNorm.copy() + T / 2.0
# otherwise, revisit based on average of population semi-major axis and mass
else:
sp = SU.s.mean()
Mp = SU.Mp.mean()
mu = const.G * (Mp + Ms)
T = 2.0 * np.pi * np.sqrt(sp**3 / mu)
t_rev = TK.currentTimeNorm.copy() + 0.75 * T
# finally, populate the revisit list (NOTE: sInd becomes a float)
revisit = np.array([sInd, t_rev.to("day").value])
if self.occ_starRevisit.size == 0:
self.occ_starRevisit = np.array([revisit])
else:
revInd = np.where(self.occ_starRevisit[:, 0] == sInd)[0]
if revInd.size == 0:
self.occ_starRevisit = np.vstack((self.occ_starRevisit, revisit))
else:
self.occ_starRevisit[revInd, 1] = revisit[1]
# add stars to filter list
if np.any(characterized.astype(int) == 1):
top_HIPs = self.occHIPs[: self.topstars] # noqa :F841
# if a top star has had max_successful_chars remove from list
if np.any(self.sInd_charcounts[sInd] >= self.max_successful_chars):
self.ignore_stars.append(sInd)
# #if a promoted star has an earthlike char, then ignore
# if sInd in self.promoted_stars:
# c_plans = pInds[charplans == 1]
# if np.any(
# np.logical_and(
# (SU.a[c_plans] > 0.95 * u.AU), (SU.a[c_plans] < 1.67 * u.AU)
# )
# ):
# if np.any(
# (0.8 * (SU.a[c_plans] ** -0.5).value < SU.Rp[c_plans].value)
# & (SU.Rp[c_plans].value < 1.4)
# ):
# self.ignore_stars.append(sInd)
return characterized.astype(int), fZ, JEZ, systemParams, SNR, intTime
[docs]
def revisit_inds(self, sInds, tmpCurrentTimeNorm):
dt_rev = self.starRevisit[:, 1] * u.day - tmpCurrentTimeNorm
ind_rev = [
int(x) for x in self.starRevisit[dt_rev < 0 * u.d, 0] if (x in sInds)
]
return ind_rev