Source code for EXOSIMS.SurveyEnsemble.IPClusterEnsemble

from ipyparallel import Client
from EXOSIMS.Prototypes.SurveyEnsemble import SurveyEnsemble
import time
from IPython.display import clear_output
import sys
import os
import numpy as np
import os.path
import subprocess


[docs] class IPClusterEnsemble(SurveyEnsemble): """Parallelized suvey ensemble based on IPython parallel (ipcluster)""" def __init__(self, **specs): SurveyEnsemble.__init__(self, **specs) self.verb = specs.get("verbose", True) # access the cluster self.rc = Client() self.dview = self.rc[:] self.dview.block = True with self.dview.sync_imports(): import EXOSIMS, EXOSIMS.util.get_module, os, os.path, time, random, pickle, traceback, numpy # noqa: E401, F401, E501 if "logger" in specs: specs.pop("logger") if "seed" in specs: specs.pop("seed") self.dview.push(dict(specs=specs)) self.vprint("Building SurveySimulation object on all workers.") _ = self.dview.execute( "SS = EXOSIMS.util.get_module.get_module(specs['modules'] \ ['SurveySimulation'], 'SurveySimulation')(**specs)" ) _ = self.dview.execute("SS.reset_sim()") self.vprint( "Created SurveySimulation objects on %d engines." % len(self.rc.ids) ) # for row in res.stdout: # self.vprint(row) self.lview = self.rc.load_balanced_view() self.maxNumEngines = len(self.rc.ids)
[docs] def run_ensemble( self, sim, nb_run_sim, run_one=None, genNewPlanets=True, rewindPlanets=True, kwargs={}, ): """ Execute simulation ensemble Args: sim (:py:class:`EXOSIMS.MissionSim`): MissionSim object nb_run_sim (int): number of simulations to run run_one (callable): method to call for each simulation genNewPlanets (bool): Generate new planets each for simulation. Defaults True. rewindPlanets (bool): Reset planets to initial mean anomaly for each simulation. Defaults True kwargs (dict): Keyword arguments to pass onwards (not used in prototype) Returns: list(dict): List of dictionaries of mission results """ hangingRunsOccured = False # keeps track of whether hanging runs have occured t1 = time.time() async_res = [] for j in range(nb_run_sim): ar = self.lview.apply_async( run_one, genNewPlanets=genNewPlanets, rewindPlanets=rewindPlanets, **kwargs, ) async_res.append(ar) print("Submitted %d tasks." % len(async_res)) engine_pids = self.rc[:].apply(os.getpid).get_dict() # ar2 = self.lview.apply_async(os.getpid) # pids = ar2.get_dict() print("engine_pids") print(engine_pids) runStartTime = time.time() # create job starting time avg_time_per_run = 0.0 tmplenoutstandingset = nb_run_sim tLastRunFinished = time.time() ar = self.rc._asyncresult_from_jobs(async_res) while not ar.ready(): ar.wait(10.0) clear_output(wait=True) if ar.progress > 0: timeleft = ar.elapsed / ar.progress * (nb_run_sim - ar.progress) if timeleft > 3600.0: timeleftstr = "%2.2f hours" % (timeleft / 3600.0) elif timeleft > 60.0: timeleftstr = "%2.2f minutes" % (timeleft / 60.0) else: timeleftstr = "%2.2f seconds" % timeleft else: timeleftstr = "who knows" # Terminate hanging runs # a set of msg_ids that have been submitted but resunts have not # been received outstandingset = self.rc.outstanding # there is at least 1 run still going and we have not just started if len(outstandingset) > 0 and len(outstandingset) < nb_run_sim: # compute average amount of time per run avg_time_per_run = (time.time() - runStartTime) / float( nb_run_sim - len(outstandingset) ) # The scheduler has finished a run if len(outstandingset) < tmplenoutstandingset: # update this. should decrease by ~1 or number of cores... tmplenoutstandingset = len(outstandingset) # update tLastRunFinished to the last time a simulation finished # (right now) tLastRunFinished = time.time() if ( time.time() - tLastRunFinished > avg_time_per_run * (1.0 + self.maxNumEngines * 2.0) * 4.0 ): # nb_run_sim = len(self.rc.outstanding) # restartRuns = True self.vprint( "Aborting " + str(len(self.rc.outstanding)) + "qty outstandingset jobs" ) # runningPIDS = os.listdir('/proc') # get all running pids self.vprint("queue_status") self.vprint(str(self.rc.queue_status())) self.rc.abort() ar.wait(20) # runningPIDS = [ # int(tpid) for tpid in os.listdir("/proc") if tpid.isdigit() # ] # [self.rc.queue_status()[eind] for # eind in np.arange(self.maxNumEngines) if # self.rc.queue_status()[eind]['tasks']>0] for engineInd in [ eind for eind in np.arange(self.maxNumEngines) if self.rc.queue_status()[eind]["tasks"] > 0 ]: os.kill(engine_pids[engineInd], 15) time.sleep(20) # for pid in [engine_pids[eind] for eind in # np.arange(len(engine_pids))]: # if pid in runningPIDS: # os.kill(pid,9) # send kill command to stop this worker stopIPClusterCommand = subprocess.Popen(["ipcluster", "stop"]) stopIPClusterCommand.wait() time.sleep( 60 ) # doing this instead of waiting for ipcluster to terminate stopIPClusterCommand = subprocess.Popen(["ipcluster", "stop"]) stopIPClusterCommand.wait() time.sleep( 60 ) # doing this instead of waiting for ipcluster to terminate hangingRunsOccured = ( True # keeps track of whether hanging runs have occured ) break # stopIPClusterCommand.wait() # waits for process to terminate # call(["ipcluster","stop"]) # send command to stop ipcluster # self.rc.abort(jobs=self.rc.outstanding.copy().pop()) # self.rc.abort() # by default should abort all outstanding jobs... # it is possible that this will not stop the jobs running # ar.wait(100) # self.rc.purge_everything() # purge all results if outstanding *because rc.abort() # didn't seem to do the job right # update tLastRunFinished to the last time a simulation was # restarted (right now) tLastRunFinished = time.time() print( "%4i/%i tasks finished after %4i s. About %s to go." % (ar.progress, nb_run_sim, ar.elapsed, timeleftstr), end="", ) sys.stdout.flush() # numRunStarts += 1 # increment number of run restarts t2 = time.time() print("\nCompleted in %d sec" % (t2 - t1)) if hangingRunsOccured: # hanging runs have occured res = [1] else: res = [ar.get() for ar in async_res] return res