From 653e64d78290ca10e6e3a56458f37f474280d167 Mon Sep 17 00:00:00 2001 From: Noe Brucy Date: Mon, 14 Dec 2020 09:51:29 +0100 Subject: [PATCH] solve bug in custom pool multiprocessing --- aggregator.py | 11 +++++---- baseprocessor.py | 62 ++++++++++++++++++++++-------------------------- mypool.py | 38 +++-------------------------- 3 files changed, 38 insertions(+), 73 deletions(-) diff --git a/aggregator.py b/aggregator.py index 30ff8c4..02d3fec 100644 --- a/aggregator.py +++ b/aggregator.py @@ -1,3 +1,4 @@ +from mypool import MyPool from postprocessor import * @@ -34,10 +35,12 @@ class Aggregator: ) if self.pp_params.process.num_process > 1: - pool = Pool(processes=self.pp_params.process.num_process) - done = pool.map(map_fn, run_num) + pool = MyPool(processes=self.pp_params.process.num_process) + result = pool.map(map_fn, run_num) pool.close() pool.join() else: - done = map(map_fn, run_num) - self.just_done.extend([item for li in done for item in li]) + result = map(map_fn, run_num) + + if np.any([res is not None for res in result]): + self.just_done.append(dep) diff --git a/baseprocessor.py b/baseprocessor.py index cd543f7..ff86b1b 100644 --- a/baseprocessor.py +++ b/baseprocessor.py @@ -18,15 +18,20 @@ from pymses.sources.hop.file_formats import * from pymses.analysis import Camera, raytracing, slicing, splatting from pymses.filters import CellsToPoints from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator +from astrophysix.simdm import SimulationStudy +from astrophysix.simdm.experiment import ( + Simulation, + AppliedAlgorithm, + ParameterSetting, + ParameterVisibility, +) +from astrophysix.simdm.results import GenericResult, Snapshot +from ramses_astrophysix import ramses + import subprocess - -# import module_extract as me - -from mypool import MyPool as Pool from functools import partial from abc import ABCMeta, abstractmethod - from run_selector import * from units import * @@ -108,45 +113,30 @@ class BaseProcessor: print(self.log_id + string) def process( - self, - to_process_list, - args=[None], - overwrite=False, - overwrite_dep=False, - **kwargs + self, to_process, arg=None, overwrite=False, overwrite_dep=False, **kwargs ): """ - Render the data in to_process_list and save them + Process the rule `to_process` """ - if type(to_process_list) == str: - to_process_list = [to_process_list] - - if type(args) == str or args is None: - args = [args] - self.overwrite_dep = overwrite_dep - self.just_done = [] # Computations done within this call + self.just_done = [] - for name in to_process_list: - if name in self.rules: - rule = self.rules[name] - for arg in args: - self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs) - else: - self._log( - "{} is unknown, allowed rules are {}".format( - name, self.rules.keys() - ), - "ERROR", - ) - - return self.just_done + if to_process in self.rules: + rule = self.rules[to_process] + return self._solve_and_process_rule( + to_process, rule, arg, overwrite, **kwargs + ) + else: + self._log( + "{} is unknown, allowed rules are {}".format(name, self.rules.keys()), + "ERROR", + ) def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs): updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs) overwrite_rule = overwrite or updated - self._process_rule(name, rule, arg, overwrite_rule, **kwargs) + return self._process_rule(name, rule, arg, overwrite_rule, **kwargs) def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs): @@ -154,6 +144,7 @@ class BaseProcessor: # Solve dependencies for dep in rule.dependencies: + # get arguments try: dep_arg = rule.dependencies[dep] except: @@ -162,6 +153,8 @@ class BaseProcessor: if dep_arg == "__parent__": dep_arg = arg + # Whether the processor solves its own dependencies or it gives + # it to a child processor if self.solve_self_dep and dep in self.rules: rule_dep = self.rules[dep] self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep) @@ -191,6 +184,7 @@ class BaseProcessor: self._save_data(name_full, data, rule.description, rule.unit) self._log("Data for {} computed".format(name_full), "SUCCESS") self.just_done.append(name_full) + return data else: self._log( "Data for {} is already computed, skipping...".format(name_full) diff --git a/mypool.py b/mypool.py index 331784a..7d48fce 100644 --- a/mypool.py +++ b/mypool.py @@ -12,6 +12,9 @@ from random import randint class NoDaemonProcess(multiprocessing.Process): + def __init__(self, *args, **kwargs): + super(NoDaemonProcess, self).__init__(**kwargs) + # make 'daemon' attribute always return False def _get_daemon(self): return False @@ -26,38 +29,3 @@ class NoDaemonProcess(multiprocessing.Process): # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess - - -def sleepawhile(t): - print("Sleeping %i seconds..." % t) - time.sleep(t) - return t - - -def work(num_procs): - print("Creating %i (daemon) workers and jobs in child." % num_procs) - pool = multiprocessing.Pool(num_procs) - - result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)]) - - # The following is not really needed, since the (daemon) workers of the - # child's pool are killed when the child is terminated, but it's good - # practice to cleanup after ourselves anyway. - pool.close() - pool.join() - return result - - -def test(): - print("Creating 5 (non-daemon) workers and jobs in main process.") - pool = MyPool(5) - - result = pool.map(work, [randint(1, 5) for x in range(5)]) - - pool.close() - pool.join() - print(result) - - -if __name__ == "__main__": - test()