Files
pipeline/aggregator.py
T

44 lines
1.2 KiB
Python

# coding: utf-8
import numpy as np
from functools import partial
from mypool import MyPool
from postprocessor import PostProcessor
def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num):
    """Build a PostProcessor for one (run, num) pair and apply a rule to it.

    ``run_num`` is the last positional parameter so that the leading
    arguments can be pre-bound (see ``functools.partial`` in
    ``Aggregator._not_self_dep``) and the result mapped over a list of pairs.

    Args:
        rule: Rule forwarded as the first argument of ``PostProcessor.process``.
        arg: Rule argument forwarded as the second argument of ``process``.
        overwrite: Forwarded twice, as the third and fourth arguments of
            ``process`` (their individual meaning is defined by
            ``PostProcessor`` and is not visible here).
        path: Input base path; ``"/" + run_num[0]`` is appended to it.
        path_out: Output base path; ``"/" + run_num[0]`` is appended to it.
        pp_params: Passed through unchanged to the ``PostProcessor`` constructor.
        run_num: Indexable pair; element 0 is concatenated onto the paths and
            element 1 is passed as the second constructor argument.

    Returns:
        Whatever ``PostProcessor.process`` returns.

    Raises:
        Exception: Any error raised while building the ``PostProcessor`` is
            printed and then re-raised unchanged. Errors raised by
            ``process`` propagate without being printed.
    """
    try:
        source_dir = path + "/" + run_num[0]
        run_index = run_num[1]
        target_dir = path_out + "/" + run_num[0]
        processor = PostProcessor(source_dir, run_index, target_dir, pp_params)
    except Exception as err:
        # Report the construction failure, then let the caller see it too.
        print(err)
        raise
    else:
        return processor.process(rule, arg, overwrite, overwrite)
class Aggregator:
    """Fan a post-processing rule out over (run, num) pairs.

    The method below reads the instance attributes ``selector``, ``runs``,
    ``nums``, ``path``, ``path_out``, ``pp_params`` and ``just_done``; they
    are expected to be set on the instance elsewhere (not visible in this
    part of the file).
    """

    def _not_self_dep(self, name, dep, dep_arg, overwrite, select):
        """Apply rule ``dep`` to every selected (run, num) pair.

        Args:
            name: Not used by this method.
            dep: Rule forwarded to ``_map_rule``; appended to
                ``self.just_done`` when at least one call returns a
                non-``None`` value.
            dep_arg: Argument forwarded to ``_map_rule`` together with ``dep``.
            overwrite: Forwarded to ``_map_rule``.
            select: Keyword arguments for ``self.selector.select``, or
                ``None`` to use ``self.runs`` / ``self.nums`` as they are.

        Raises:
            Exception: Anything raised by ``_map_rule`` propagates to the
                caller; in the pooled case the pool is closed and joined
                before the exception leaves this method.
        """
        if select is None:
            runs, nums = self.runs, self.nums
        else:
            runs, nums = self.selector.select(**select)

        # One work item per (run, num); nums is indexed by run.
        run_num = [(run, num) for run in runs for num in nums[run]]
        map_fn = partial(
            _map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params
        )

        num_process = self.pp_params.process.num_process
        if num_process > 1:
            pool = MyPool(processes=num_process)
            try:
                results = pool.map(map_fn, run_num)
            finally:
                # Always shut the pool down, even when a worker raised, so
                # that no worker processes are left behind.
                pool.close()
                pool.join()
        else:
            # Materialise eagerly: every pair must be processed, so the check
            # below must not be able to short-circuit a lazy iterator.
            results = [map_fn(item) for item in run_num]

        # NOTE(review): presumably ``process`` returns None when it did no
        # work -- confirm against PostProcessor.process.
        if any(res is not None for res in results):
            self.just_done.append(dep)