Files
pipeline/baseprocessor.py
T

474 lines
15 KiB
Python

# coding: utf-8
import sys
import os
import glob as glob
import copy
import time
import tables
from tables import HDF5ExtError
import pymses
import numpy as np
from numpy.polynomial.polynomial import polyfit
from scipy.stats import linregress
from pymses.sources.ramses import output
from pymses.sources.hop.file_formats import *
from pymses.analysis import Camera, raytracing, slicing, splatting
from pymses.filters import CellsToPoints
from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator
import subprocess
# import module_extract as me
from mypool import MyPool as Pool
from functools import partial
from abc import ABCMeta, abstractmethod
from run_selector import *
from units import *
class Rule:
def __init__(
self,
postproc,
process,
description="",
group="",
dependencies=[],
is_valid=lambda arg: True,
kind="classic",
unit=cst.none,
):
self.postproc = postproc
self.process_fn = process
self.dependencies = dependencies
self.is_valid_add = is_valid
self.group = group
self.description = description
self.unit = unit
self.kind = kind
def process(self, arg, **kwargs):
if not arg is None:
return self.process_fn(arg, **kwargs)
else:
return self.process_fn(**kwargs)
def is_valid(self, arg):
# save = self.postproc.save
# valid = True
# for dep in self.dependencies:
# if dep in self.postproc.rules:
# rule_dep = self.postproc.rules[dep]
# if not arg is None:
# valid = valid and rule_dep.group + '/' + dep + '_' + str(arg) in save
# else:
# valid = valid and rule_dep.group + '/' + dep in save
# return valid and self.is_valid_add(arg)
return self.is_valid_add(arg)
class BaseProcessor:
"""
Base class for processors, should not be instanciated
"""
__metaclass__ = ABCMeta
log_id = ""
rules = {}
solve_self_dep = True
def __init__(self, path, path_out=None, pp_params=None, tag=None):
if pp_params is None:
self.pp_params = default_params()
elif type(pp_params) == str:
self.pp_params = load_params(pp_params)
else:
self.pp_params = copy.deepcopy(pp_params)
if tag is not None:
self.pp_params.out.tag = tag
# Determining output directory
if path_out is None:
self.path_out = path
else:
self.path_out = path_out
def _log(self, string, status=""):
if self.pp_params.process.verbose:
if len(status) > 0:
print(status + ": " + self.log_id + string)
else:
print(self.log_id + string)
def process(
self,
to_process_list,
args=[None],
overwrite=False,
overwrite_dep=False,
**kwargs
):
"""
Render the data in to_process_list and save them
"""
if type(to_process_list) == str:
to_process_list = [to_process_list]
if type(args) == str or args is None:
args = [args]
self.overwrite_dep = overwrite_dep
self.just_done = [] # Computations done within this call
for name in to_process_list:
if name in self.rules:
rule = self.rules[name]
for arg in args:
self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs)
else:
self._log(
"{} is unknown, allowed rules are {}".format(
name, self.rules.keys()
),
"ERROR",
)
return self.just_done
def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs):
updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs)
overwrite_rule = overwrite or updated
self._process_rule(name, rule, arg, overwrite_rule, **kwargs)
def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs):
self.done_before_dep = len(self.just_done)
# Solve dependencies
for dep in rule.dependencies:
try:
dep_arg = rule.dependencies[dep]
except:
dep_arg = arg
if dep_arg == "__parent__":
dep_arg = arg
if self.solve_self_dep and dep in self.rules:
rule_dep = self.rules[dep]
self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep)
else:
self._not_self_dep(name, dep, dep_arg, self.overwrite_dep, **kwargs)
# Whether dependencies where updated
return len(self.just_done) > self.done_before_dep
def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs):
self._log("Dependency {} for {} is unknown".format(dep, name), "ERROR")
def _needs_computation(self, overwrite, name_full):
return overwrite
def _process_rule(self, name, rule, arg, overwrite=False, **kwargs):
if not arg is None:
name_full = rule.group + "/" + name + "_" + str(arg)
else:
name_full = rule.group + "/" + name
if rule.is_valid(arg):
if not name_full in self.just_done:
if self._needs_computation(overwrite, name_full):
self._log("Processing {}".format(name_full))
data = rule.process(arg, **kwargs)
self._save_data(name_full, data, rule.description, rule.unit)
self._log("Data for {} computed".format(name_full), "SUCCESS")
self.just_done.append(name_full)
else:
self._log(
"Data for {} is already computed, skipping...".format(name_full)
)
else:
self._log("{} is not valid in this context".format(name_full), "ERROR")
def def_rules(self):
for rule in self.rules:
func = partial(self.process, rule)
func.__doc__ = self.rules[rule].description
setattr(self, rule, func)
class HDF5Container(BaseProcessor):
filename = ""
save = None
opened = False
def open(self):
if not self.opened:
try:
self.save = tables.open_file(self.filename, mode="a")
except HDF5ExtError:
# Wait a bit if the lock was not still released
time.sleep(3)
self.save = tables.open_file(self.filename, mode="a")
self.opened = True
def close(self):
if self.opened:
self.save.close()
self.opened = False
def _needs_computation(self, overwrite, name_full):
return overwrite or not (name_full in self.save)
def _process_rule(self, name, rule, arg, overwrite, **kwargs):
self.open()
try:
super(HDF5Container, self)._process_rule(
name, rule, arg, overwrite, **kwargs
)
finally:
self.close()
def get_value(self, node_name, unit=None, unit_old=None):
self.open()
try:
node = self.save.get_node(node_name)
if "unit" in node._v_attrs:
unit_old = node._v_attrs.unit
if node._v_attrs.CLASS == "GROUP":
value = {}
for child_name in node._v_children:
value[child_name] = self.get_value(
node_name + "/" + child_name, unit, unit_old
)
else:
value = node.read()
if not (unit is None or unit_old is None or unit_old == cst.none):
value = value * unit_old.express(unit)
finally:
self.close()
return value
def _get_units(self, unit, data=None):
"""
Get real units from info files
unit is either:
1. An instance of cst.Unit (pymses unit class)
2. A string beginning by "unit_", referring to a code unit,
available in self.info
3. A dict {unit1 : exp1, unit2: exp2, ...} with unitX as 2.
and expX a float, referring to the compound unit
unit1**exp1 * unit2**exp2
4. A dict {key: unit, ...} where key is a field name (eg. 'time', or 'mass')
and unit the corresponding unit (on one on the above format)
Returns:
1-3. : a cst.Unit instance
4. : a dict {key: unit, ...} with same key as input and unit being cst.Unit instances
"""
if isinstance(unit, cst.Unit):
return unit
if isinstance(unit, str) and unit[:5] == "unit_":
res = self.info[unit]
if unit == "unit_length":
res = res / self.info["boxlen"]
return res
if list(unit)[0][:5] == "unit_":
new_unit = cst.none
for base_unit_str in unit:
expo = unit[base_unit_str]
base_unit = self._get_units(base_unit_str)
new_unit = new_unit * base_unit ** expo
return new_unit
if (not data is None) and isinstance(data, dict) and list(unit)[0] in data:
for key in unit:
unit[key] = self._get_units(unit[key])
return unit
else:
raise ValueError("Invalid unit")
def _save_data(self, name_full, data, description, unit):
"""
Save data in the HDF5 structure, overwrite if necessary
"""
unit = self._get_units(unit, data=data)
if name_full in self.save:
self.save.remove_node(name_full, recursive=True)
attrs = None
if isinstance(data, tuple):
attrs = data[1]
data = data[0]
if isinstance(data, dict):
if type(description) == str:
self.save.create_group(
os.path.dirname(name_full),
os.path.basename(name_full),
description,
createparents=True,
)
else:
self.save.create_group(
os.path.dirname(name_full),
os.path.basename(name_full),
"",
createparents=True,
)
if not isinstance(unit, dict):
self.save.get_node(name_full)._v_attrs.unit = unit
for key in data:
key = str(key)
if isinstance(description, dict):
if isinstance(unit, dict):
self._save_data(
name_full + "/" + key,
data[key],
description[key],
unit[key],
)
else:
self._save_data(
name_full + "/" + key, data[key], description[key], unit
)
else:
if isinstance(unit, dict):
self._save_data(name_full + "/" + key, data[key], "", unit[key])
else:
self._save_data(name_full + "/" + key, data[key], "", unit)
else:
self.save.create_array(
os.path.dirname(name_full),
os.path.basename(name_full),
data,
description,
createparents=True,
)
self.save.get_node(name_full).attrs.unit = unit
if not attrs is None:
for key in attrs:
key = str(key)
self.save.get_node(name_full)._v_attrs[key] = attrs[key]
def set_value(self, node_name, data, description, unit):
self.open()
try:
self._save_data(node_name, data, description, unit)
finally:
self.close()
def get_attribute(self, node_name, attr_name):
self.open()
try:
node = self.save.get_node(node_name)
attr = node._v_attrs[attr_name]
finally:
self.close()
return attr
def set_attribute(self, node_name, attr_name, attr_value):
self.open()
try:
node = self.save.get_node(node_name)
node._v_attrs[attr_name] = attr_value
finally:
self.close()
def _transform(self, name, transform_fn, group="/maps", **kwargs):
src = self.save.get_node(group + "/" + name).read()
return transform_fn(src, **kwargs)
def _gen_rule_transform(
self,
rule_src_name,
transform_fn,
transform_name,
subarray_name=None,
group=None,
):
rule_src = self.rules[rule_src_name]
if subarray_name is None:
src_name = rule_src_name
group_src = rule_src.group
unit = rule_src.unit
description = rule_src.description
else:
src_name = subarray_name
group_src = rule_src.group + "/" + rule_src_name
unit = rule_src.unit[subarray_name]
description = rule_src.description[subarray_name]
def fn(arg=None, **kwargs):
if arg is None:
return self._transform(
src_name, transform_fn, group=group_src, **kwargs
)
else:
return self._transform(
src_name + "_" + str(arg), transform_fn, group=group_src, **kwargs
)
if group is None:
group = group_src
name = transform_name + "_" + rule_src_name
def apply(
self,
fn,
name_array_in,
name_array_out,
unit_out=cst.none,
description="",
recursive=True,
):
array_in = self.get_value(name_array_in)
if recursive and isinstance(array_in, dict):
for key in array_in:
self.apply(
fn,
name_array_in + "/" + key,
name_array_out + "/" + key,
unit_out,
description,
)
self.set_attribute(name_array_out, "unit", unit_out)
else:
try:
array_out = fn(array_in)
except TypeError:
array_out = array_in
self.set_value(name_array_out, array_out, description, unit_out)
def simple_getter(name, dset):
return dset[name]
def vect_getter(name, i, dset):
return dset[name][:, i]
def norm_getter(name, dset):
return np.sqrt(np.sum(dset[name] ** 2, axis=1))