solve bug in custom pool multiprocessing
This commit is contained in:
+7
-4
@@ -1,3 +1,4 @@
|
||||
from mypool import MyPool
|
||||
from postprocessor import *
|
||||
|
||||
|
||||
@@ -34,10 +35,12 @@ class Aggregator:
|
||||
)
|
||||
|
||||
if self.pp_params.process.num_process > 1:
|
||||
pool = Pool(processes=self.pp_params.process.num_process)
|
||||
done = pool.map(map_fn, run_num)
|
||||
pool = MyPool(processes=self.pp_params.process.num_process)
|
||||
result = pool.map(map_fn, run_num)
|
||||
pool.close()
|
||||
pool.join()
|
||||
else:
|
||||
done = map(map_fn, run_num)
|
||||
self.just_done.extend([item for li in done for item in li])
|
||||
result = map(map_fn, run_num)
|
||||
|
||||
if np.any([res is not None for res in result]):
|
||||
self.just_done.append(dep)
|
||||
|
||||
+28
-34
@@ -18,15 +18,20 @@ from pymses.sources.hop.file_formats import *
|
||||
from pymses.analysis import Camera, raytracing, slicing, splatting
|
||||
from pymses.filters import CellsToPoints
|
||||
from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator
|
||||
from astrophysix.simdm import SimulationStudy
|
||||
from astrophysix.simdm.experiment import (
|
||||
Simulation,
|
||||
AppliedAlgorithm,
|
||||
ParameterSetting,
|
||||
ParameterVisibility,
|
||||
)
|
||||
from astrophysix.simdm.results import GenericResult, Snapshot
|
||||
from ramses_astrophysix import ramses
|
||||
|
||||
import subprocess
|
||||
|
||||
# import module_extract as me
|
||||
|
||||
from mypool import MyPool as Pool
|
||||
from functools import partial
|
||||
from abc import ABCMeta, abstractmethod
|
||||
|
||||
|
||||
from run_selector import *
|
||||
from units import *
|
||||
|
||||
@@ -108,45 +113,30 @@ class BaseProcessor:
|
||||
print(self.log_id + string)
|
||||
|
||||
def process(
|
||||
self,
|
||||
to_process_list,
|
||||
args=[None],
|
||||
overwrite=False,
|
||||
overwrite_dep=False,
|
||||
**kwargs
|
||||
self, to_process, arg=None, overwrite=False, overwrite_dep=False, **kwargs
|
||||
):
|
||||
"""
|
||||
Render the data in to_process_list and save them
|
||||
Process the rule `to_process`
|
||||
"""
|
||||
|
||||
if type(to_process_list) == str:
|
||||
to_process_list = [to_process_list]
|
||||
|
||||
if type(args) == str or args is None:
|
||||
args = [args]
|
||||
|
||||
self.overwrite_dep = overwrite_dep
|
||||
self.just_done = [] # Computations done within this call
|
||||
self.just_done = []
|
||||
|
||||
for name in to_process_list:
|
||||
if name in self.rules:
|
||||
rule = self.rules[name]
|
||||
for arg in args:
|
||||
self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs)
|
||||
else:
|
||||
self._log(
|
||||
"{} is unknown, allowed rules are {}".format(
|
||||
name, self.rules.keys()
|
||||
),
|
||||
"ERROR",
|
||||
)
|
||||
|
||||
return self.just_done
|
||||
if to_process in self.rules:
|
||||
rule = self.rules[to_process]
|
||||
return self._solve_and_process_rule(
|
||||
to_process, rule, arg, overwrite, **kwargs
|
||||
)
|
||||
else:
|
||||
self._log(
|
||||
"{} is unknown, allowed rules are {}".format(name, self.rules.keys()),
|
||||
"ERROR",
|
||||
)
|
||||
|
||||
def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs):
|
||||
updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs)
|
||||
overwrite_rule = overwrite or updated
|
||||
self._process_rule(name, rule, arg, overwrite_rule, **kwargs)
|
||||
return self._process_rule(name, rule, arg, overwrite_rule, **kwargs)
|
||||
|
||||
def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs):
|
||||
|
||||
@@ -154,6 +144,7 @@ class BaseProcessor:
|
||||
|
||||
# Solve dependencies
|
||||
for dep in rule.dependencies:
|
||||
# get arguments
|
||||
try:
|
||||
dep_arg = rule.dependencies[dep]
|
||||
except:
|
||||
@@ -162,6 +153,8 @@ class BaseProcessor:
|
||||
if dep_arg == "__parent__":
|
||||
dep_arg = arg
|
||||
|
||||
# Whether the processor solves its own dependencies or it gives
|
||||
# it to a child processor
|
||||
if self.solve_self_dep and dep in self.rules:
|
||||
rule_dep = self.rules[dep]
|
||||
self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep)
|
||||
@@ -191,6 +184,7 @@ class BaseProcessor:
|
||||
self._save_data(name_full, data, rule.description, rule.unit)
|
||||
self._log("Data for {} computed".format(name_full), "SUCCESS")
|
||||
self.just_done.append(name_full)
|
||||
return data
|
||||
else:
|
||||
self._log(
|
||||
"Data for {} is already computed, skipping...".format(name_full)
|
||||
|
||||
@@ -12,6 +12,9 @@ from random import randint
|
||||
|
||||
|
||||
class NoDaemonProcess(multiprocessing.Process):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(NoDaemonProcess, self).__init__(**kwargs)
|
||||
|
||||
# make 'daemon' attribute always return False
|
||||
def _get_daemon(self):
|
||||
return False
|
||||
@@ -26,38 +29,3 @@ class NoDaemonProcess(multiprocessing.Process):
|
||||
# because the latter is only a wrapper function, not a proper class.
|
||||
class MyPool(multiprocessing.pool.Pool):
|
||||
Process = NoDaemonProcess
|
||||
|
||||
|
||||
def sleepawhile(t):
|
||||
print("Sleeping %i seconds..." % t)
|
||||
time.sleep(t)
|
||||
return t
|
||||
|
||||
|
||||
def work(num_procs):
|
||||
print("Creating %i (daemon) workers and jobs in child." % num_procs)
|
||||
pool = multiprocessing.Pool(num_procs)
|
||||
|
||||
result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)])
|
||||
|
||||
# The following is not really needed, since the (daemon) workers of the
|
||||
# child's pool are killed when the child is terminated, but it's good
|
||||
# practice to cleanup after ourselves anyway.
|
||||
pool.close()
|
||||
pool.join()
|
||||
return result
|
||||
|
||||
|
||||
def test():
|
||||
print("Creating 5 (non-daemon) workers and jobs in main process.")
|
||||
pool = MyPool(5)
|
||||
|
||||
result = pool.map(work, [randint(1, 5) for x in range(5)])
|
||||
|
||||
pool.close()
|
||||
pool.join()
|
||||
print(result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test()
|
||||
|
||||
Reference in New Issue
Block a user