47 lines
1.4 KiB
Python
47 lines
1.4 KiB
Python
from mypool import MyPool
|
|
from postprocessor import *
|
|
|
|
|
|
def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num):
|
|
try:
|
|
pp = PostProcessor(
|
|
path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params
|
|
)
|
|
except Exception as e:
|
|
print(e)
|
|
raise
|
|
return pp.process(rule, arg, overwrite, overwrite)
|
|
|
|
|
|
class Aggregator:
|
|
def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs):
|
|
if "select" in kwargs:
|
|
select = kwargs["select"]
|
|
runs, nums = self.selector.select(**select)
|
|
elif "runs" in kwargs:
|
|
runs = kwargs["runs"]
|
|
if isinstance(runs, RunSelector):
|
|
nums = runs.nums
|
|
runs = runs.runs
|
|
else:
|
|
nums = self.nums
|
|
else:
|
|
runs = self.runs
|
|
nums = self.nums
|
|
|
|
run_num = [(run, num) for run in runs for num in nums[run]]
|
|
map_fn = partial(
|
|
_map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params
|
|
)
|
|
|
|
if self.pp_params.process.num_process > 1:
|
|
pool = MyPool(processes=self.pp_params.process.num_process)
|
|
result = pool.map(map_fn, run_num)
|
|
pool.close()
|
|
pool.join()
|
|
else:
|
|
result = map(map_fn, run_num)
|
|
|
|
if np.any([res is not None for res in result]):
|
|
self.just_done.append(dep)
|