from EXOSIMS.util.vprint import vprint
from EXOSIMS.util.get_dirs import get_cache_dir
import numpy as np
import astropy.units as u
from astropy.time import Time
import os
import csv
[docs]
class TimeKeeping(object):
""":ref:`TimeKeeping` Prototype
This class keeps track of the current mission elapsed time
for exoplanet mission simulation. It is initialized with a
mission duration, and throughout the simulation, it allocates
temporal intervals for observations. Eventually, all available
time has been allocated, and the mission is over.
Time is allocated in contiguous windows of size "duration". If a
requested interval does not fit in the current window, we move to
the next one.
Args:
missionStart (float):
Mission start date in MJD. Defaults to 60634 (11-20-2024)
missionLife (float):
Mission duration (in years). Defaults to 0.1
missionPortion (float):
Fraction of mission devoted to exoplanet imaging science.
Must be between 0 and 1. Defaults to 1
OBduration (float):
Observing block length (in days). If infinite, do not
define observing blocks. Defaults to np.inf
missionSchedule (str, optional):
Full path to mission schedule file stored on disk.
Defaults None.
cachedir (str, optional):
Full path to cachedir.
If None (default) use default (see :ref:`EXOSIMSCACHE`)
**specs:
:ref:`sec:inputspec`
Attributes:
_outspec (dict):
:ref:`sec:outspec`
cachedir (str):
Path to the EXOSIMS cache directory (see :ref:`EXOSIMSCACHE`)
currentTimeAbs (astropy.time.core.Time):
Current absolute mission time in MJD
currentTimeNorm (astropy.units.quantity.Quantity):
Current mission time minus mission start time.
exoplanetObsTime (astropy.units.quantity.Quantity):
How much time has been used so far on exoplanet science.
missionFinishAbs (astropy.time.core.Time):
Mission end time in MJD
missionLife (astropy.units.quantity.Quantity):
Total mission duration
missionPortion (float):
Fraction of mission devoted to exoplanet science
missionStart (astropy.time.core.Time):
Start time of mission in MJD
OBduration (astropy.units.quantity.Quantity):
Observing block length
OBendTimes (astropy.units.quantity.Quantity):
Array containing the normalized end times of each observing block
throughout the mission
OBnumber (int):
Index of the current observing block (OB). Each
observing block has a duration, a start time, an end time, and can
host one or multiple observations
OBstartTimes (astropy.units.quantity.Quantity):
Array containing the normalized start times of each observing block
throughout the mission
"""
_modtype = "TimeKeeping"
def __init__(
self,
missionStart=60634,
missionLife=0.1,
missionPortion=1,
OBduration=np.inf,
missionSchedule=None,
cachedir=None,
**specs,
):
# start the outspec
self._outspec = {}
# get cache directory
self.cachedir = get_cache_dir(cachedir)
self._outspec["cachedir"] = self.cachedir
specs["cachedir"] = self.cachedir
# load the vprint function (same line in all prototype module constructors)
self.vprint = vprint(specs.get("verbose", True))
# illegal value checks
assert missionLife >= 0, "Need missionLife >= 0, got %f" % missionLife
# arithmetic on missionPortion fails if it is outside the legal range
assert missionPortion > 0 and missionPortion <= 1, (
"Require missionPortion in the interval [0,1], got %f" % missionPortion
)
# OBduration must be positive nonzero
assert OBduration * u.d > 0 * u.d, (
"Required OBduration positive nonzero, got %f" % OBduration
)
# set up state variables
# tai scale specified because the default, utc, requires accounting for leap
# seconds, causing warnings from astropy.time when time-deltas are added
# Absolute mission start time:
self.missionStart = Time(float(missionStart), format="mjd", scale="tai")
# Fraction of missionLife allowed for exoplanet science
self.missionPortion = float(missionPortion)
# Total mission duration
self.missionLife = float(missionLife) * u.year
self.missionLife_d = self.missionLife.to_value(u.d)
self.allocated_time_d = self.missionLife_d * self.missionPortion
# populate outspec
for att in self.__dict__:
if att not in ["vprint", "_outspec", "missionLife_d", "allocated_time_d"]:
dat = self.__dict__[att]
self._outspec[att] = (
dat.value if isinstance(dat, (u.Quantity, Time)) else dat
)
# Absolute mission end time
self.missionFinishAbs = self.missionStart + self.missionLife.to("day")
# initialize values updated by various class methods
# the current mission elapsed time (0 at mission start)
self.currentTimeNorm = 0.0 * u.day
# current absolute mission time (equals missionStart at mission start)
self.currentTimeAbs = self.missionStart
# initialize observing block times arrays.
# An Observing Block is a segment of time over which observations may take place
self.init_OB(str(missionSchedule), OBduration * u.d)
self._outspec["missionSchedule"] = missionSchedule
self._outspec["OBduration"] = OBduration
# initialize time spend using instrument
self.exoplanetObsTime = 0 * u.day
def __str__(self):
r"""String representation of the TimeKeeping object.
When the command 'print' is used on the TimeKeeping object, this
method prints the values contained in the object."""
for att in self.__dict__:
print("%s: %r" % (att, getattr(self, att)))
return (
"TimeKeeping instance at %.6f days" % self.currentTimeNorm.to("day").value
)
[docs]
def init_OB(self, missionSchedule, OBduration):
"""
Initializes mission Observing Blocks from file or missionDuration, missionLife,
and missionPortion. Updates attributes OBstartTimes, OBendTimes, and OBnumber
Args:
missionSchedule (str):
A string containing the missionSchedule file (or "None").
OBduration (~astropy.units.Quantity):
Observing block length
Returns:
None
"""
if not missionSchedule == "None": # If the missionSchedule is specified
tmpOBtimes = list()
schedulefname = str(
os.path.dirname(__file__) + "/../Scripts/" + missionSchedule
) # .csv file in EXOSIMS/Scripts folder
if not os.path.isfile(schedulefname):
# Check if scriptNames in ScriptsPath
ScriptsPath = str(os.path.dirname(__file__) + "/../../../Scripts/")
makeSimilar_TemplateFolder = ""
dirsFolderDown = [
x[0].split("/")[-1] for x in os.walk(ScriptsPath)
] # Get all directories in ScriptsPath
for tmpFolder in dirsFolderDown:
if (
os.path.isfile(ScriptsPath + tmpFolder + "/" + missionSchedule)
and not tmpFolder == ""
): # We found the Scripts folder containing scriptfile
makeSimilar_TemplateFolder = (
tmpFolder + "/"
) # We found the file!!!
break
schedulefname = str(
ScriptsPath + makeSimilar_TemplateFolder + missionSchedule
) # .csv file in EXOSIMS/Scripts folder
if os.path.isfile(
schedulefname
): # Check if a mission schedule is manually specified
self.vprint("Loading Manual Schedule from %s" % missionSchedule)
with open(schedulefname, "r") as f: # load csv file
lines = csv.reader(f, delimiter=",")
self.vprint("The manual Schedule is:")
for line in lines:
tmpOBtimes.append(line)
self.vprint(line)
self.OBstartTimes = (
np.asarray([float(item[0]) for item in tmpOBtimes]) * u.d
)
self.OBendTimes = (
np.asarray([float(item[1]) for item in tmpOBtimes]) * u.d
)
# Automatically construct OB from OBduration, missionLife, and missionPortion
else:
if OBduration == np.inf * u.d: # There is 1 OB spanning the mission
self.OBstartTimes = np.asarray([0]) * u.d
self.OBendTimes = np.asarray([self.missionLife.to("day").value]) * u.d
else: # OB
startToStart = OBduration / self.missionPortion
numBlocks = np.ceil(
self.missionLife.to("day") / startToStart
) # This is the number of Observing Blocks
self.OBstartTimes = np.arange(numBlocks) * startToStart
self.OBendTimes = self.OBstartTimes + OBduration
if self.OBendTimes[-1] > self.missionLife.to(
"day"
): # If the end of the last observing block exceeds the end of mission
self.OBendTimes[-1] = self.missionLife.to(
"day"
).copy() # Set end of last OB to end of mission
self.OBduration = OBduration
self.OBnumber = 0
self.vprint("OBendTimes is: " + str(self.OBendTimes))
[docs]
def mission_is_over(self, OS, Obs, mode):
r"""Are the mission time, or any other mission resources, exhausted?
Args:
OS (:ref:`OpticalSystem`):
Optical System object
Obs (:ref:`Observatory`):
Observatory object
mode (dict):
Selected observing mode for detection (uses only overhead time)
Returns:
bool:
True if the mission time or fuel are used up, else False.
"""
# let's be optimistic and assume we still have time
is_over = False
# if we've exceeded total mission time (or overhead on the next observation
# will make us exceed total time, we're done)
if self.currentTimeNorm + Obs.settlingTime + mode["syst"][
"ohTime"
] >= self.missionLife.to("day"):
self.vprint(
"missionLife would be exceeded at %s"
% self.currentTimeNorm.to("day").round(2)
)
is_over = True
# if we've used up our time allocation (or overhead on the next observation
# will make us exceed it, we're done)
if (
self.exoplanetObsTime.to("day") + Obs.settlingTime + mode["syst"]["ohTime"]
>= self.missionLife.to("day") * self.missionPortion
):
self.vprint(
(
"exoplanetObstime ({:.2f}) would exceed "
"(missionPortion*missionLife) = {:.2f}) at currentTimeNorm = {}"
).format(
self.exoplanetObsTime,
self.missionPortion * self.missionLife.to("day"),
self.currentTimeNorm.to("day").round(2),
)
)
is_over = True
# if overheads will put us past the end of the final observing block, we're done
if (
self.currentTimeNorm + Obs.settlingTime + mode["syst"]["ohTime"]
>= self.OBendTimes[-1]
):
self.vprint(
(
"Last Observing Block (# {}, end time: {:.2f}) would be "
"exceeded at currentTimeNorm {}"
).format(
self.OBnumber,
self.OBendTimes[-1],
self.currentTimeNorm.to("day").round(2),
)
)
is_over = True
# and now, all the fuel stuff
if OS.haveOcculter:
# handle case of separate fuel tanks for slew and sk:
if Obs.twotanks:
if Obs.skMass <= 0 * u.kg:
self.vprint(
"Stationkeeping fuel exhausted at currentTimeNorm %s"
% (self.currentTimeNorm.to("day").round(2))
)
# see if we can refuel
if not (Obs.refuel_tank(self, tank="sk")):
is_over = True
if Obs.slewMass <= 0 * u.kg:
self.vprint(
"Slew fuel exhausted at currentTimeNorm %s"
% (self.currentTimeNorm.to("day").round(2))
)
# see if we can refuel
if not (Obs.refuel_tank(self, tank="slew")):
is_over = True
# now consider case of only one tank
else:
if Obs.scMass <= Obs.dryMass:
self.vprint(
"Fuel exhausted at currentTimeNorm %s"
% (self.currentTimeNorm.to("day").round(2))
)
# see if we can refuel
if not (Obs.refuel_tank(self)):
is_over = True
return is_over
[docs]
def allocate_time(self, dt, addExoplanetObsTime=True):
r"""Allocate a temporal block of width dt
Advance the mission time by dt units. Updates attributes currentTimeNorm and
currentTimeAbs
Args:
dt (~astropy.units.Quantity):
Temporal block allocated in units of days
addExoplanetObsTime (bool):
Indicates the allocated time is for the primary instrument (True)
or some other instrument (False)
By default this function assumes all allocated time is attributed to
the primary instrument (is True)
Returns:
bool:
a flag indicating the time allocation was successful or not successful
"""
# Check dt validity
if dt.value <= 0 or dt.value == np.inf:
self.vprint("dt must be positive and nonzero (got {})".format(dt))
return False # The temporal block to allocate is not positive nonzero
# Check dt exceeds mission life
if self.currentTimeNorm + dt > self.missionLife.to("day"):
self.vprint(
(
"Allocating dt = {} at curremtTimeNorm = {} would exceed "
"missionLife = {}"
).format(
dt,
self.currentTimeNorm,
self.missionLife.to("day"),
)
)
return False # The time to allocate would exceed the missionLife
# Check dt exceeds current OB
if self.currentTimeNorm + dt > self.OBendTimes[self.OBnumber]:
self.vprint(
(
"Allocating dt = {} at currentTimeNorm = {} would exceed the end "
"of observing block #{} with end time {}"
).format(
dt,
self.currentTimeNorm,
self.OBnumber,
self.OBendTimes[self.OBnumber],
)
)
return False
# Check exceeds allowed instrument Time
if addExoplanetObsTime:
if (
self.exoplanetObsTime + dt
> self.missionLife.to("day") * self.missionPortion
):
self.vprint(
(
"Allocating dt = {} with current exoplanetObsTime = {} would "
"exceed (missionPortion*missionLife) = {:.2f}"
).format(
dt,
self.exoplanetObsTime,
self.missionLife.to("day") * self.missionPortion,
)
)
return False
self.currentTimeAbs += dt
self.currentTimeNorm += dt
self.exoplanetObsTime += dt
return True
else: # Time will not be counted against exoplanetObstime
self.currentTimeAbs += dt
self.currentTimeNorm += dt
return True
[docs]
def advancetToStartOfNextOB(self):
"""Advances to Start of Next Observation Block
This method is called in the allocate_time() method of the TimeKeeping
class object, when the allocated time requires moving outside of the current OB.
If no OB duration was specified, a new Observing Block is created for
each observation in the SurveySimulation module. Updates attributes OBnumber,
currentTimeNorm and currentTimeAbs.
"""
self.OBnumber += 1 # increase the observation block number
self.currentTimeNorm = self.OBstartTimes[
self.OBnumber
] # update currentTimeNorm
self.currentTimeAbs = (
self.OBstartTimes[self.OBnumber] + self.missionStart
) # update currentTimeAbs
# begin Survey, and loop until mission is finished
log_begin = "OB%s:" % (
self.OBnumber
) # prints because this is the beginning of the nesxt observation block
self.vprint(log_begin)
self.vprint(
"Advanced currentTimeNorm to beginning of next OB %.2fd"
% (self.currentTimeNorm.to("day").value)
)
[docs]
def advanceToAbsTime(self, tAbs, addExoplanetObsTime=True):
"""Advances the current mission time to tAbs.
Updates attributes currentTimeNorma dn currentTimeAbs
Args:
tAbs (~astropy.time.Time):
The absolute mission time to advance currentTimeAbs to.
MUST HAVE scale='tai'
addExoplanetObsTime (bool):
A flag indicating whether to add advanced time to exoplanetObsTime or
not
Returns:
bool:
A bool indicating whether the operation was successful or not
"""
# Checks on tAbs validity
if tAbs <= self.currentTimeAbs:
self.vprint(
"The time to advance to "
+ str(tAbs)
+ " is not after "
+ str(self.currentTimeAbs)
)
return False
# Use 2 and Use 4
if tAbs >= self.missionFinishAbs: #
tmpcurrentTimeNorm = self.currentTimeNorm.copy()
t_added = (tAbs - self.currentTimeAbs).value * u.d
self.currentTimeNorm = (tAbs - self.missionStart).value * u.d
self.currentTimeAbs = tAbs
if addExoplanetObsTime: # Count time towards exoplanetObs Time
if (self.exoplanetObsTime + t_added) > (
self.missionLife.to("day") * self.missionPortion
):
self.vprint(
(
"Adding {} to current exoplanetObsTime ({:0.2f}) would "
"exceed (missionLife*missionPortion) = {:0.2f}"
).format(
t_added.to("day"),
self.exoplanetObsTime.to("day"),
self.missionLife.to("day") * self.missionPortion,
)
)
self.exoplanetObsTime = (
self.missionLife.to("day") * self.missionPortion
)
return False
self.exoplanetObsTime += (
self.missionLife.to("day") - tmpcurrentTimeNorm
) # Advances exoplanet time to end of mission time
else:
self.exoplanetObsTime += 0 * u.d
return True
# Use 1 and Use 3
if (
tAbs <= self.OBendTimes[self.OBnumber] + self.missionStart
): # The time to advance to does not leave the current OB
t_added = (tAbs - self.currentTimeAbs).value * u.d
self.currentTimeNorm = (tAbs - self.missionStart).to("day")
self.currentTimeAbs = tAbs
if addExoplanetObsTime: # count time towards exoplanet Obs Time
if (self.exoplanetObsTime + t_added) > (
self.missionLife.to("day") * self.missionPortion
):
self.vprint(
(
"Adding {} to current exoplanetObsTime ({:0.2f}) would "
"exceed (missionLife*missionPortion) = {:0.2f}"
).format(
t_added.to("day"),
self.exoplanetObsTime.to("day"),
self.missionLife.to("day") * self.missionPortion,
)
)
self.exoplanetObsTime = (
self.missionLife.to("day") * self.missionPortion
)
return False
else:
self.exoplanetObsTime += t_added
return True
else: # addExoplanetObsTime is False
self.exoplanetObsTime += 0 * u.d
return True
# Use 5 and 7 #extended to accomodate any current and future time between OBs
tNorm = (tAbs - self.missionStart).value * u.d
if np.any(
(tNorm <= self.OBstartTimes[1:]) & (tNorm >= self.OBendTimes[0:-1])
): # The tAbs is between end End of an OB and start of the Next OB
# Return OBnumber of End Index
endIndex = np.where(
(tNorm <= self.OBstartTimes[1:]) & (tNorm >= self.OBendTimes[0:-1])
)[0][0]
# self.OBendTimes[endIndex+1] - self.currentTimeNorm
# Time to be added to exoplanetObsTime from current OB
t_added = self.OBendTimes[self.OBnumber] - self.currentTimeNorm
for ind in np.arange(
self.OBnumber, endIndex
): # ,len(self.OBendTimes)): # Add time for all additional OB
t_added += self.OBendTimes[ind] - self.OBstartTimes[ind]
while self.OBnumber < endIndex + 1:
self.advancetToStartOfNextOB()
# self.OBnumber = endIndex + 1 # set OBnumber to correct Observing Block
# self.currentTimeNorm = self.OBstartTimes[self.OBnumber]
# Advance Time to start of next OB
# self.currentTimeAbs = self.OBstartTimes[self.OBnumber] + self.missionStart
# Advance Time to start of next OB
if addExoplanetObsTime: # Count time towards exoplanetObs Time
if (
self.exoplanetObsTime + t_added
> self.missionLife.to("day") * self.missionPortion
): # We can CANNOT allocate that time to exoplanetObsTime
self.vprint(
(
"Adding {} to current exoplanetObsTime ({:0.2f}) would "
"exceed (missionLife*missionPortion) = {:0.2f}"
).format(
t_added.to("day"),
self.exoplanetObsTime.to("day"),
self.missionLife.to("day") * self.missionPortion,
)
)
# This kind of failure is by design.
# It just means the mission has come to an end
self.vprint("Advancing to tAbs failed under Use Case 7")
self.exoplanetObsTime = (
self.missionLife.to("day") * self.missionPortion
)
return False
self.exoplanetObsTime += t_added
else:
self.exoplanetObsTime += 0 * u.d
return True
# Use 6 and 8 #extended to accomodate any current and future time inside OBs
# The tAbs is between start of a future OB and end of that OB
if np.any(
# fmt: off
(tNorm >= self.OBstartTimes[self.OBnumber:])
& (tNorm <= self.OBendTimes[self.OBnumber:])
# fmt: on
):
# fmt: off
endIndex = np.where(
(tNorm >= self.OBstartTimes[self.OBnumber:])
& (tNorm <= self.OBendTimes[self.OBnumber:])
)[0][
0
] # Return index of OB that tAbs will be inside of
# fmt: on
endIndex += self.OBnumber
t_added = 0 * u.d
if addExoplanetObsTime: # Count time towards exoplanetObs Time
t_added += (tAbs - self.currentTimeAbs).to("day")
for i in np.arange(self.OBnumber, endIndex):
# accumulate time to subtract (time not counted against Exoplanet
# Obs Time)
# Subtract the time between these OB from the t_added to
# exoplanetObsTime
index = self.OBnumber
t_added -= self.OBstartTimes[index + 1] - self.OBendTimes[index]
# Check if exoplanetObsTime would be exceeded
if (
self.exoplanetObsTime + t_added
> self.missionLife.to("day") * self.missionPortion
):
self.vprint(
(
"Adding {} to current exoplanetObsTime ({:0.2f}) would "
"exceed (missionLife*missionPortion) = {:0.2f}"
).format(
t_added.to("day"),
self.exoplanetObsTime.to("day"),
self.missionLife.to("day") * self.missionPortion,
)
)
# This kind of failure is by design.
# It just means the mission has come to an end
self.vprint("Advancing to tAbs failed under Use Case 8")
self.exoplanetObsTime = (
self.missionLife.to("day") * self.missionPortion
)
self.OBnumber = endIndex # set OBnumber to correct Observing Block
self.currentTimeNorm = (tAbs - self.missionStart).to(
"day"
) # Advance Time to start of next OB
self.currentTimeAbs = tAbs # Advance Time to start of next OB
return False
else:
self.exoplanetObsTime += t_added
else: # addExoplanetObsTime is False
self.exoplanetObsTime += 0 * u.d
self.OBnumber = endIndex # set OBnumber to correct Observing Block
self.currentTimeNorm = (tAbs - self.missionStart).to(
"day"
) # Advance Time to start of next OB
self.currentTimeAbs = tAbs # Advance Time to start of next OB
return True
# Generic error if there exists some use case that I have not encountered yet.
assert False, "No Use Case Found in AdvanceToAbsTime"
[docs]
def get_ObsDetectionMaxIntTime(
self, Obs, mode, currentTimeNorm=None, OBnumber=None
):
"""Tells you the maximum Detection Observation Integration Time you can pass
into observation_detection(X,intTime,X)
Args:
Obs (:ref:`Observatory`):
Observatory object
mode (dict):
Selected observing mode for detection
currentTimeNorm (astropy.unit.Quantity, optional):
Time since mission start
OBnumber (float, optional):
Current observing block
Returns:
tuple:
maxIntTimeOBendTime (astropy.units.Quantity):
The maximum integration time bounded by Observation Block end Time
maxIntTimeExoplanetObsTime (astropy.units.Quantity):
The maximum integration time bounded by exoplanetObsTime
maxIntTimeMissionLife (astropy.units.Quantity):
The maximum integration time bounded by MissionLife
"""
if OBnumber is None:
OBnumber = self.OBnumber
if currentTimeNorm is None:
currentTimeNorm = self.currentTimeNorm.copy()
OBendTime_d = self.OBendTimes[OBnumber].to_value(u.d)
currentTimeNorm_d = currentTimeNorm.to_value(u.d)
settling_oh_time_d = Obs.settlingTime.to_value(u.d) + mode["syst"][
"ohTime"
].to_value(u.d)
maxTimeOBendTime_d = OBendTime_d - currentTimeNorm_d
maxIntTimeOBendTime_d = (maxTimeOBendTime_d - settling_oh_time_d) / (
1.0 + mode["timeMultiplier"] - 1.0
)
maxTimeExoplanetObsTime_d = (
self.allocated_time_d - self.exoplanetObsTime.to_value(u.d)
)
maxIntTimeExoplanetObsTime_d = (
maxTimeExoplanetObsTime_d - settling_oh_time_d
) / (1.0 + mode["timeMultiplier"] - 1.0)
maxTimeMissionLife_d = self.missionLife_d - currentTimeNorm_d
maxIntTimeMissionLife_d = (maxTimeMissionLife_d - settling_oh_time_d) / (
1.0 + mode["timeMultiplier"] - 1.0
)
# Ensure all are positive or zero
if maxIntTimeOBendTime_d < 0.0:
maxIntTimeOBendTime_d = 0.0
if maxIntTimeExoplanetObsTime_d < 0.0:
maxIntTimeExoplanetObsTime_d = 0.0
if maxIntTimeMissionLife_d < 0.0:
maxIntTimeMissionLife_d = 0.0
return (
maxIntTimeOBendTime_d << u.d,
maxIntTimeExoplanetObsTime_d << u.d,
maxIntTimeMissionLife_d << u.d,
)