[aggregator] factorize self.map, enable multiprocessing
Without multiprocessing : keep same pp With multiprocessing : recreate pp
This commit is contained in:
+35
-14
@@ -7,7 +7,7 @@ from mypool import MyPool
|
|||||||
from postprocessor import PostProcessor
|
from postprocessor import PostProcessor
|
||||||
|
|
||||||
|
|
||||||
def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num):
|
def _map_aux(fun, path, path_out, pp_params, run_num, **kwargs):
|
||||||
try:
|
try:
|
||||||
pp = PostProcessor(
|
pp = PostProcessor(
|
||||||
path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params
|
path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params
|
||||||
@@ -15,35 +15,56 @@ def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
raise
|
raise
|
||||||
return pp.process(rule, arg, overwrite, overwrite)
|
return fun(pp, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def _map_rule(pp, rule, arg, overwrite, overwrite_dep):
|
||||||
|
return pp.process(rule, arg, overwrite, overwrite_dep)
|
||||||
|
|
||||||
|
|
||||||
class Aggregator:
|
class Aggregator:
|
||||||
def get_pp_list(self):
|
def get_pp_list(self, select=None):
|
||||||
return [self.pp[run][num] for run in self.runs for num in self.nums[run]]
|
|
||||||
|
|
||||||
def map(self, func):
|
|
||||||
return [func(pp) for pp in self.get_pp_list()]
|
|
||||||
|
|
||||||
def _not_self_dep(self, name, dep, dep_arg, overwrite, select):
|
|
||||||
if select is not None:
|
if select is not None:
|
||||||
runs, nums = self.selector.select(**select)
|
runs, nums = self.selector.select(**select)
|
||||||
else:
|
else:
|
||||||
runs = self.runs
|
runs = self.runs
|
||||||
nums = self.nums
|
nums = self.nums
|
||||||
|
return [self.pp[run][num] for run in runs for num in nums[run]]
|
||||||
|
|
||||||
run_num = [(run, num) for run in runs for num in nums[run]]
|
def map(self, func, select=None, num_process=None, **kwargs):
|
||||||
|
|
||||||
|
pp_list = self.get_pp_list(select)
|
||||||
|
|
||||||
|
if num_process is None:
|
||||||
|
num_process = self.pp_params.process.num_process
|
||||||
|
|
||||||
|
if num_process == 1:
|
||||||
|
result = [func(pp, **kwargs) for pp in pp_list]
|
||||||
|
else:
|
||||||
|
run_num = [(pp.run, pp.num) for pp in pp_list]
|
||||||
map_fn = partial(
|
map_fn = partial(
|
||||||
_map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params
|
_map_aux, func, self.path, self.path_out, self.pp_params, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.pp_params.process.num_process > 1:
|
pool = MyPool(processes=num_process)
|
||||||
pool = MyPool(processes=self.pp_params.process.num_process)
|
|
||||||
result = pool.map(map_fn, run_num)
|
result = pool.map(map_fn, run_num)
|
||||||
pool.close()
|
pool.close()
|
||||||
pool.join()
|
pool.join()
|
||||||
else:
|
|
||||||
result = map(map_fn, run_num)
|
return result
|
||||||
|
|
||||||
|
def _not_self_dep(self, name, dep, dep_arg, overwrite, select):
|
||||||
|
|
||||||
|
result = self.map(
|
||||||
|
_map_rule,
|
||||||
|
select,
|
||||||
|
None,
|
||||||
|
rule=dep,
|
||||||
|
arg=dep_arg,
|
||||||
|
overwrite=overwrite,
|
||||||
|
overwrite_dep=overwrite,
|
||||||
|
)
|
||||||
|
|
||||||
if np.any([res is not None for res in result]):
|
if np.any([res is not None for res in result]):
|
||||||
self.just_done.append(dep)
|
self.just_done.append(dep)
|
||||||
|
|||||||
Reference in New Issue
Block a user