diff --git a/aggregator.py b/aggregator.py new file mode 100644 index 0000000..352de4b --- /dev/null +++ b/aggregator.py @@ -0,0 +1,33 @@ +from postprocessor import * + + +def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num): + try: + pp = PostProcessor( + path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params + ) + except Exception as e: + print(e) + return pp.process(rule, arg, overwrite) + + +class Aggregator: + def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): + if "runs" in kwargs: + dep_runs = [run for run in self.runs if run in kwargs["runs"]] + else: + dep_runs = self.runs + + run_num = [(run, num) for run in dep_runs for num in self.nums[run]] + map_fn = partial( + _map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params + ) + + if self.pp_params.process.num_process > 1: + pool = Pool(processes=self.pp_params.process.num_process) + done = pool.map(map_fn, run_num) + pool.close() + pool.join() + else: + done = map(map_fn, run_num) + self.just_done.extend([item for li in done for item in li]) diff --git a/baseprocessor.py b/baseprocessor.py new file mode 100644 index 0000000..4800ca5 --- /dev/null +++ b/baseprocessor.py @@ -0,0 +1,376 @@ +f # coding: utf-8 + +import sys +import os +import glob as glob + + +import tables +import pymses +import numpy as np +from numpy.polynomial.polynomial import polyfit +from scipy.stats import linregress + +from pymses.sources.ramses import output +from pymses.sources.hop.file_formats import * +from pymses.analysis import Camera, raytracing, slicing, splatting +from pymses.filters import CellsToPoints +from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator +import subprocess + +import module_extract as me + +from mypool import MyPool as Pool +from functools import partial +from abc import ABCMeta, abstractmethod +import bunch + +from run_selector import * +from units import * + + +class Rule: + def __init__( + self, + postproc, + process, + description="", + group="", + dependencies=[], + is_valid=lambda arg: True, + kind="classic", + unit=cst.none, + ): + self.postproc = postproc + self.process_fn = process + self.dependencies = dependencies + self.is_valid_add = is_valid + self.group = group + self.description = description + self.unit = unit + self.kind = kind + + def process(self, arg, **kwargs): + if not arg is None: + return self.process_fn(arg, **kwargs) + else: + return self.process_fn(**kwargs) + + def is_valid(self, arg): + # save = self.postproc.save + # valid = True + # for dep in self.dependencies: + # if dep in self.postproc.rules: + # rule_dep = self.postproc.rules[dep] + # if not arg is None: + # valid = valid and rule_dep.group + '/' + dep + '_' + str(arg) in save + # else: + # valid = valid and rule_dep.group + '/' + dep in save + # return valid and self.is_valid_add(arg) + return self.is_valid_add(arg) + + +class BaseProcessor: + """ + Base class for processors, should not be instanciated + """ + + __metaclass__ = ABCMeta + + log_id = "" + rules = {} + solve_self_dep = True + + def __init__(self, path, path_out=None, pp_params=None, tag=None): + if pp_params is None: + self.pp_params = default_params() + elif type(pp_params) == str: + self.pp_params = load_params(pp_params) + else: + self.pp_params = pp_params + + if tag is not None: + self.pp_params.out.tag = tag + + # Determining output directory + if path_out is None: + self.path_out = path + else: + self.path_out = path_out + + def _log(self, string, status=""): + if self.pp_params.process.verbose: + if len(status) > 0: + print(status + ": " + self.log_id + string) + else: + print(self.log_id + string) + + def process( + self, + to_process_list, + args=[None], + overwrite=False, + overwrite_dep=False, + **kwargs + ): + """ + Render the data in to_process_list and save them + """ + + if type(to_process_list) == str: + to_process_list = [to_process_list] + + if type(args) == str or args is None: + args = [args] + + self.overwrite_dep = overwrite_dep + self.just_done = [] # Computations done within this call + + for name in to_process_list: + if name in self.rules: + rule = self.rules[name] + for arg in args: + self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs) + else: + self._log( + "{} is unknown, allowed rules are {}".format( + name, self.rules.keys() + ), + "ERROR", + ) + + return self.just_done + + def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs): + updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs) + overwrite_rule = overwrite or updated + self._process_rule(name, rule, arg, overwrite_rule, **kwargs) + + def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs): + + self.done_before_dep = len(self.just_done) + + # Solve dependencies + for dep in rule.dependencies: + try: + dep_arg = rule.dependencies[dep] + except: + dep_arg = arg + + if dep_arg == "__parent__": + dep_arg = arg + + if self.solve_self_dep and dep in self.rules: + rule_dep = self.rules[dep] + self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep) + else: + self._not_self_dep(name, dep, dep_arg, self.overwrite_dep, **kwargs) + + # Whether dependencies where updated + return len(self.just_done) > self.done_before_dep + + def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): + self._log("Dependency {} for {} is unknown".format(dep, name), "ERROR") + + def _needs_computation(self, overwrite, name_full): + return overwrite + + def _process_rule(self, name, rule, arg, overwrite=False, **kwargs): + if not arg is None: + name_full = rule.group + "/" + name + "_" + str(arg) + else: + name_full = rule.group + "/" + name + + if rule.is_valid(arg): + if not name_full in self.just_done: + if self._needs_computation(overwrite, name_full): + self._log("Processing {}".format(name_full)) + data = rule.process(arg, **kwargs) + self._save_data(name_full, data, rule.description, rule.unit) + self._log("Data for {} computed".format(name_full), "SUCCESS") + self.just_done.append(name_full) + else: + self._log( + "Data for {} is already computed, skipping...".format(name_full) + ) + else: + self._log("{} is not valid in this context".format(name_full), "ERROR") + + def def_rules(self): + for rule in self.rules: + setattr(self, rule, partial(self.process, rule)) + + +class HDF5Container(BaseProcessor): + + filename = "" + save = None + opened = False + + def open(self): + if not self.opened: + self.save = tables.open_file(self.filename, mode="a") + self.opened = True + + def close(self): + if self.opened: + self.save.close() + self.opened = False + + def _needs_computation(self, overwrite, name_full): + return overwrite or not (name_full in self.save) + + def _process_rule(self, name, rule, arg, overwrite, **kwargs): + self.open() + try: + super(HDF5Container, self)._process_rule( + name, rule, arg, overwrite, **kwargs + ) + finally: + self.close() + + def get_value(self, node_name): + self.open() + try: + node = self.save.get_node(node_name) + if node._v_attrs.CLASS == "GROUP": + value = {} + for child_name in node._v_children: + value[child_name] = self.get_value(node_name + "/" + child_name) + else: + value = node.read() + finally: + self.close() + return value + + def _save_data(self, name_full, data, description, unit): + """ + Save data in the HDF5 structure, overwrite if necessary + """ + if name_full in self.save: + self.save.remove_node(name_full, recursive=True) + + attrs = None + if isinstance(data, tuple): + attrs = data[1] + data = data[0] + + if isinstance(data, dict): + if type(description) == str: + self.save.create_group( + os.path.dirname(name_full), + os.path.basename(name_full), + description, + createparents=True, + ) + else: + self.save.create_group( + os.path.dirname(name_full), + os.path.basename(name_full), + "", + createparents=True, + ) + + if not isinstance(unit, dict): + self.save.get_node(name_full)._v_attrs.unit = unit + + for key in data: + if isinstance(description, dict): + if isinstance(unit, dict): + self._save_data( + name_full + "/" + key, + data[key], + description[key], + unit[key], + ) + else: + self._save_data( + name_full + "/" + key, data[key], description[key], unit + ) + else: + if isinstance(unit, dict): + self._save_data(name_full + "/" + key, data[key], "", unit[key]) + else: + self._save_data(name_full + "/" + key, data[key], "", unit) + else: + self.save.create_array( + os.path.dirname(name_full), + os.path.basename(name_full), + data, + description, + createparents=True, + ) + self.save.get_node(name_full).attrs.unit = unit + + if not attrs is None: + self.save.get_node(name_full).attrs.update(attrs) + + def set_value(self, node_name, data, description, unit): + self.open() + try: + self._save_data(node_name, data, description, unit) + finally: + self.close() + + def get_attribute(self, node_name, attr_name): + self.open() + try: + node = self.save.get_node(node_name) + attr = node._v_attrs[attr_name] + finally: + self.close() + return attr + + def _transform(self, name, transform_fn, group="/maps", **kwargs): + src = self.save.get_node(group + "/" + name).read() + return transform_fn(src, **kwargs) + + def _gen_rule_transform( + self, + rule_src_name, + transform_fn, + transform_name, + subarray_name=None, + group=None, + ): + + rule_src = self.rules[rule_src_name] + + if subarray_name is None: + src_name = rule_src_name + group_src = rule_src.group + unit = rule_src.unit + description = rule_src.description + else: + src_name = subarray_name + group_src = rule_src.group + "/" + rule_src_name + unit = rule_src.unit[subarray_name] + description = rule_src.description[subarray_name] + + def fn(arg=None, **kwargs): + if arg is None: + return self._transform( + src_name, transform_fn, group=group_src, **kwargs + ) + else: + return self._transform( + src_name + "_" + str(arg), transform_fn, group=group_src, **kwargs + ) + + if group is None: + group = group_src + + name = transform_name + "_" + rule_src_name + + self.rules[name] = Rule( + self, + fn, + group=group, + unit=unit, + description=description, + dependencies=[rule_src_name], + ) + + +def simple_getter(name, dset): + return dset[name] diff --git a/comparator.py b/comparator.py new file mode 100644 index 0000000..8f401c5 --- /dev/null +++ b/comparator.py @@ -0,0 +1,520 @@ +# coding: utf-8 + +from aggregator import * + + +class Comparator(Aggregator, HDF5Container): + """ + Do comparaison between outputs and runs + """ + + def __init__( + self, + path, + in_runs, + in_nums, + path_out=None, + pp_params=default_params(), + selector=None, + tag=None, + **kwargs + ): + """ + Creates the basic structures needed for the outputs + """ + + super(Comparator, self).__init__(path, path_out, pp_params, tag) + + # Open outfile + if not self.pp_params.out.tag == "": + tag_name = "_" + self.pp_params.out.tag + else: + tag_name = "" + + self.filename = path_out + "/comp" + tag_name + ".h5" + + # Select runs + if selector is None: + selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs) + + # Save infos + self.path = path + self.runs = selector.runs + self.nums = selector.nums + + # Get postprocesor objets for each run and infos on them + self.pp = {} + self.info = {} + + for run in self.runs: + path_run = path + "/" + run + path_out_run = path_out + "/" + run + self.pp[run] = {} + + for num in self.nums[run]: + self.pp[run][num] = PostProcessor( + path_run, num, path_out=path_out_run, pp_params=self.pp_params + ) + + run0 = self.runs[0] + self.info = self.pp[run0][self.nums[run0][0]].info + + self.namelist = selector.namelist + + # log info + self.log_id = "[comp {}] ".format(self.pp_params.out.tag) + + # Define rules + self.def_rules() + + def _needs_computation(self, overwrite, name_full): + """ + Returns True if a new computation of the rule is needed + """ + if overwrite or not (name_full in self.save): + return True + elif not "nums" in self.save.get_node(name_full)._v_attrs: + return True + else: + saved_nums = self.save.get_node(name_full)._v_attrs.nums + missing_runs = len([run for run in self.nums if not run in saved_nums]) > 0 + missing_nums = missing_runs or all( + [ + len([num for num in self.nums[run] if not num in saved_nums[run]]) + > 0 + for run in self.nums + if run in saved_nums + ] + ) + return missing_nums + + def _get_units(self, unit, data=None): + "Get real units from info files" + if isinstance(unit, cst.Unit): + return unit + elif isinstance(unit, str): + # assert(not run is None) + return self.info[unit] # [run][unit] + # elif unit.keys()[0] in self.runs: + # for run in unit: + # unit[run] = self._get_units(unit[run], run=run) + # return unit + elif unit.keys()[0] in self.info: + new_unit = cst.none + for base_unit_str in unit: + expo = unit[base_unit_str] + base_unit = self._get_units(base_unit_str) + new_unit = new_unit * base_unit ** expo + return new_unit + elif (not data is None) and isinstance(data, dict) and unit.keys()[0] in data: + for key in unit: + unit[key] = self._get_units(unit[key]) + return unit + + else: + raise ValueError("Invalid unit") + + def _save_data(self, name_full, data, description, unit): + unit = self._get_units(unit, data=data) + super(Comparator, self)._save_data(name_full, data, description, unit) + self.save.get_node(name_full)._v_attrs.nums = self.nums + + def _time_series(self, getter, arg=None): + series = {} + for run in self.runs: + series[run] = np.zeros(len(self.nums[run])) + for i, num in enumerate(self.nums[run]): + series[run][i] = getter(run, num, arg=arg) + return series + + def _comp(self, getter, use_num=True): + prop = np.zeros(len(self.runs)) + for i, run in enumerate(self.runs): + if use_num: + num = self.nums[run][0] + prop[i] = getter(run, num) + else: + prop[i] = getter(run) + return prop + + def _time_avg( + self, name, start=None, end=None, span=None, ergodic=False, group="/series" + ): + mean = np.zeros(len(self.runs)) + median = np.zeros(len(self.runs)) + std = np.zeros(len(self.runs)) + v_min = np.zeros(len(self.runs)) + v_max = np.zeros(len(self.runs)) + q975 = np.zeros(len(self.runs)) + q025 = np.zeros(len(self.runs)) + + for i, run in enumerate(self.runs): + serie = self.save.get_node(group + "/" + name + "/" + run).read() + time = self.save.get_node(group + "/time/" + run).read() + mask = abs(serie) != np.inf + + if not ((start, end, span) == (None, None, None)): + start_r, end_r = start, end + # Be sure that start_r and end_r are defined + if end_r is None: + if span is None or start_r is None: + end_r = time[-1] + else: + end_r = start_r + span + if start_r is None: + if span is None: + start_r = time[0] + else: + start_r = end_r - span + + mask = mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie) + + mean[i] = np.mean(serie[mask]) + std[i] = np.std(serie[mask]) + v_min[i], q025[i], median[i], q975[i], v_max[i] = np.percentile( + serie[mask], [0, 2.5, 50, 97.5, 100] + ) + if ergodic: # If the process is ergodic ... + std[i] = std[i] / np.sqrt(len(serie[mask])) + else: + std[i] = std[i] + return { + "runs": self.runs, + "mean": mean, + "std": std, + "median": median, + "min": v_min, + "max": v_max, + "q025": q025, + "q975": q975, + } + + def get_attr(self, attr_name, run, num, node_name="/", arg=None): + pp = self.pp[run][num] + if not arg is None: + node_name = node_name + "_" + str(arg) + return pp.get_attribute(node_name, attr_name) + + def get_global(self, node_name, run, num, arg=None, unload_cells=False): + if not arg is None: + node_name = node_name + "_" + str(arg) + pp = self.pp[run][num] + if unload_cells: + pp.unload_cells() + value = pp.get_value(node_name) + return value + + def get_nml(self, nml_key, run): + return self.namelist[run][nml_key] + + def get_pdf_slope(self, name, run, num): + pp = self.pp[run][num] + pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) + slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope") + return slope + + def _extract_sinks_from_log(self, series, log_filename, run): + cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename) + content = os.popen(cmd_grep).readlines() + for i in range(0, len(content), 4): + series["nb_sink"][run].append(np.int(content[i].split("=")[1])) + series["mass_sink"][run].append(np.float(content[i + 1].split("=")[1])) + series["time"][run].append(np.float(content[i + 2].split("=")[1])) + return series + + def _extract_sfr_from_log(self, series, log_filename, run): + cmd_grep = "grep '\[SFR' {} ".format(log_filename) + content = os.popen(cmd_grep).readlines() + for i in range(0, len(content)): + time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) + sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) + series["time"][run].append(time) + series["sfr"][run].append(sfr) + return series + + def _extract_rms_from_log(self, series, log_filename, run): + cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) + content = os.popen(cmd_grep).readlines() + for i in range(0, len(content), 4): + series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) + series["dt"][run].append(np.float(content[i].split("=")[3].split()[0])) + series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) + try: + turb_energy = np.float(content[i + 2].split(":")[1]) + threshold = self.pp_params.rules.turb_energy_threshold + assert threshold < 0 or abs(turb_energy) < threshold + series["turb_energy"][run].append(abs(turb_energy)) + except (AssertionError, ValueError, IndexError): + series["turb_energy"][run].append(np.nan) + return series + + def _from_log(self, keys, extractor): + nums = self.nums + + # Initialize series + series = {} + for key in keys: + series[key] = {} + + for run in self.runs: + # Initialize list + for key in keys: + series[key][run] = [] + + # get one preprocessor + path_run = self.path + "/" + run + + # Get list of run files + log_files = path_run + "/" + self.pp_params.input.log_prefix + "*" + + # Parse files + for log_filename in glob.glob(log_files): + series = extractor(series, log_filename, run) + + # Numpify the lists + for key in series: + series[key][run] = np.array(series[key][run]) + + # Sort the arrays + ind_sort = series["time"][run].argsort() + for key in series: + series[key][run] = series[key][run][ind_sort] + return series + + def _ssfr_from_mass_sink(self, avg_window=None): + """ + avg_window in year + """ + time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit + mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit + ssfr = {} + for run in self.runs: + # Surface of the box in pc^2 + surface = (self.info["unit_length"].express(cst.pc)) ** 2 + # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) + + time = self.save.get_node("/series/sinks_from_log/time/" + run).read() + time = time * time_unit.express(cst.year) + mass_sink = self.save.get_node( + "/series/sinks_from_log/mass_sink/" + run + ).read() + mass_sink = mass_sink * mass_unit.express(cst.Msun) + if avg_window is None: + shift = 1 + else: + # We assume that the timestep do not vary a lot ... + shift = np.searchsorted(time, time[0] + avg_window, side="left") + sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( + time[shift:] - time[:-shift] + ) + ssfr[run] = np.zeros(len(mass_sink)) + ssfr[run][shift:] = sfr / surface + return ssfr, {"avg_window": avg_window} + + def _turb_power(self): + turb_power = {} + for run in self.runs: + dt = self.save.get_node("/series/rms_from_log/dt/" + run).read() + # Energy injected at each timestep + energy = self.save.get_node( + "/series/rms_from_log/turb_energy/" + run + ).read() + # Power of the turbulence at this step in Watts + turb_power[run] = energy / dt + return turb_power + + def _gen_rule_time_global( + self, + glob_name, + name=None, + glob_group="/globals", + subarray_name=None, + unload_cells=True, + unit=cst.none, + description="", + ): + + if name is None: + name = "time_" + glob_name + + self.rules[name] = Rule( + self, + partial( + self._time_series, + partial( + self.get_global, glob_group + "/" + glob_name, unload_cells=True + ), + ), + group="/series", + unit=unit, + dependencies={"time": None, glob_name: "__parent__"}, + ) + + def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None): + + rule_src = self.rules[rule_src_name] + + if subarray_name is None: + src_name = rule_src_name + group_src = rule_src.group + unit = rule_src.unit + descr = rule_src.description + else: + src_name = subarray_name + group_src = rule_src.group + "/" + rule_src_name + unit = rule_src.unit[subarray_name] + descr = rule_src.description[subarray_name] + + description = { + "runs": "List of runs", + "mean": "Temporal average of {}".format(descr), + "std": "Standard deviation of {}".format(descr), + "median": "Median of {}".format(descr), + "max": "Maximum of {}".format(descr), + "min": "Minimum of {}".format(descr), + "q025": "2.5 percentile of {}".format(descr), + "q975": "97.5 percentile of {}".format(descr), + } + + if name is None: + name = "avg_" + src_name + + def fn(arg=None, **kwargs): + if arg is None: + return self._time_avg(src_name, group=group_src, **kwargs) + else: + return self._time_avg( + src_name + "_" + str(arg), group=group_src, **kwargs + ) + + self.rules[name] = Rule( + self, + fn, + group="/comp", + unit=unit, + description=description, + dependencies=[rule_src_name], + ) + + def def_rules(self): + averageables = ["coldens", "rho", "T", "Q"] + self.rules = { + # Read from log + "sinks_from_log": Rule( + self, + partial( + self._from_log, + ["time", "mass_sink", "nb_sink"], + self._extract_sinks_from_log, + ), + group="/series", + unit={"time": "unit_time", "mass_sink": cst.Msun, "nb_sink": cst.none}, + description={ + "time": "Time", + "mass_sink": "Total mass of stars", + "nb_sink": "Number of stars", + }, + ), + "issfr": Rule( + self, + self._ssfr_from_mass_sink, + group="/series/sinks_from_log", + unit=cst.ssfr, + description="Instantaneous surfacic star formation rate", + dependencies=["sinks_from_log"], + ), + "sfr_from_log": Rule( + self, + partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), + group="/series", + unit={"time": cst.year, "sfr": cst.ssfr}, + description={ + "time": "Time", + "sfr": "Averaged surfacic star formation rate", + }, + ), + "rms_from_log": Rule( + self, + partial( + self._from_log, + ["time", "dt", "turb_rms", "turb_energy"], + self._extract_rms_from_log, + ), + group="/series", + unit={ + "time": "unit_time", + "dt": "unit_time", + "turb_rms": cst.none, + "turb_energy": { + "unit_length": 3, + "unit_velocity": 2, + "unit_density": 1, + }, + }, + description={ + "time": "Time", + "dt": "Timestep", + "turb_rms": "Computed turbulent RMS", + "turb_energy": "Injected turbulent energy", + }, + ), + "turb_power": Rule( + self, + self._turb_power, + group="/series/rms_from_log", + unit={ + "unit_length": 3, + "unit_velocity": 2, + "unit_density": 1, + "unit_time": -1, + }, + description="Injected turbulent power", + dependencies=["rms_from_log"], + ), + # Read from outputs + "time": Rule( + self, + partial( + self._time_series, partial(self.get_global, "/globals/time_num") + ), + group="/series", + unit="unit_time", + dependencies=["time_num"], + ), + "time_pdf_slope_coldens": Rule( + self, + partial( + self._time_series, + partial( + self.get_attr, + "slope", + node_name="/hist/pdf_coldens_z", + ), + ), + group="/series", + dependencies={"time": None, "fit_pdf_coldens": "z"}, + ), + # namelist + "nml": Rule( + self, + lambda nml_key: self._comp( + partial(self.get_nml, nml_key), use_num=False + ), + group="/comp", + ), + } + + self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity") + self._gen_rule_time_global("max_fluct_coldens") + + for name in ["issfr", "time_sigma", "time_pdf_slope_coldens", "turb_power"]: + self._gen_rule_avg(name) + + self._gen_rule_avg("sinks_from_log", "mass_sink") + self._gen_rule_avg("sinks_from_log", "nb_sink") + self._gen_rule_avg("sfr_from_log", "sfr") + + self._gen_rule_avg("rms_from_log", "turb_rms") + self._gen_rule_avg("rms_from_log", "turb_energy") + + super(Comparator, self).def_rules() diff --git a/plotter.py b/plotter.py index 4227b23..89204ad 100644 --- a/plotter.py +++ b/plotter.py @@ -17,8 +17,9 @@ from matplotlib.collections import PatchCollection from functools import partial from numpy.polynomial.polynomial import polyfit from scipy.ndimage.filters import gaussian_filter1d +from scipy import optimize -from postprocessor import * +from comparator import * P.rcParams["image.cmap"] = "plasma" @@ -95,7 +96,7 @@ class Plotter(Aggregator, BaseProcessor): ) # Get postprocesor objets for each run - self.pp_runs = self.comp.pp_runs + self.pp = self.comp.pp # Define log prefix self.log_id = "[plot {}] ".format(self.pp_params.out.tag) @@ -128,7 +129,7 @@ class Plotter(Aggregator, BaseProcessor): for run in self.runs: for i, num in enumerate(self.nums[run]): plot_filename = self._find_filename(name_full, run, num) - save = tables.open_file(self.pp_runs[run][num].filename) + save = tables.open_file(self.pp[run][num].filename) try: self._plot_rule( rule, @@ -182,38 +183,36 @@ class Plotter(Aggregator, BaseProcessor): else: self._log("Plot {} is already done, skipping...".format(plot_filename)) - def _find_filename(self, name_full, run=None, num=None): - if not self.pp_params.out.tag == "": - tag_name = "_" + self.pp_params.out.tag - else: - tag_name = "" + def _find_filename(self, name_full, run=None, num=None, fmt=None): - if not run is None and not num is None: - return ( - self.path_out - + "/" - + run - + "/" - + name_full - + tag_name - + "_" - + format(num, "05") - + self.pp_params.plot.out_ext - ) - elif not run is None: - return ( - self.path_out - + "/" - + run - + "/" - + name_full - + tag_name - + self.pp_params.plot.out_ext - ) - else: - return ( - self.path_out + "/" + name_full + tag_name + self.pp_params.plot.out_ext - ) + tag_name = self.pp_params.out.tag + + if fmt is None and self.pp_params.out.fmt == "": + if not self.pp_params.out.tag == "": + tag_name = "_" + tag_name + + if not run is None and not num is None: + fmt = "{out}/{run}/{name}{tag}_{run}_{num:05}_{ext}" + elif not run is None: + fmt = "{out}/{run}/{name}{tag}_{run}{ext}" + else: + fmt = "{out}/{name}{tag}{ext}" + elif fmt is None: + fmt = self.pp_params.out.fmt + + nml = None + if not run is None: + nml = self.comp.namelist[run] + + return fmt.format( + run=run, + name=name_full, + tag=tag_name, + num=num, + nml=nml, + out=self.path_out, + ext=self.pp_params.out.ext, + ) def _label_run(self, run, node, label, nml_key): def get_label_nml(nml_key): @@ -226,7 +225,7 @@ class Plotter(Aggregator, BaseProcessor): if prop_name in self.value_convert: prop_value_str = self.value_convert[prop_name](prop_value) elif type(prop_value) in [int, float]: - prop_value_str = "${:.6g}$".format(prop_value) + prop_value_str = convert_exp(prop_value, digits=5) else: prop_value_str = str(prop_value) return r"{} = {}".format(prop_label, prop_value_str) @@ -443,7 +442,7 @@ class Plotter(Aggregator, BaseProcessor): **kwargs ): - node = self.save.get_node("/hist/" + name + "_" + ax_los) + node = self.save.get_node("/hist/" + name) label, unit_old, unit = self._ax_label_unit(node, label, unit, unit_coeff) values, centers = node.read() * unit_old.express(unit) @@ -493,10 +492,15 @@ class Plotter(Aggregator, BaseProcessor): yunit=None, xunit_coeff=1.0, yunit_coeff=1.0, - linearfit=False, + fit=None, + fitlabel=None, smooth=0, + sigma_err=2.0, nml_key=None, runs=None, + yerr_kind="std", + colors=None, + nml_color=None, **kwargs ): @@ -514,60 +518,146 @@ class Plotter(Aggregator, BaseProcessor): P.ylabel(ylabel) P.grid() + yerr = None if node_y._v_attrs.CLASS == "ARRAY": x = node_x.read() * xunit_old.express(xunit) y = node_y.read() * yunit_old.express(yunit) - mask = np.isfinite(y) + mask = np.isfinite(x) & np.isfinite(y) x, y = x[mask], y[mask] if smooth > 0: y = gaussian_filter1d(y, sigma=smooth) - yerr = None - P.plot(x, y, "*", **kwargs) + (base_line,) = P.plot(x, y, "*", **kwargs) elif "mean" in node_y: x = node_x.read() * xunit_old.express(xunit) y = node_y.mean.read() * yunit_old.express(yunit) - mask = np.isfinite(y) - x, y = x[mask], y[mask] - if smooth > 0: - y = gaussian_filter1d(y, sigma=smooth) - yerr = node_y.std.read() * yunit_old.express(yunit) - P.errorbar(x, y, yerr=yerr, fmt="*", **kwargs) - else: - yerr = None - if runs is None: - runs = self.runs - for run in runs: - x_run, y_run = node_x[run], node_y[run] - x = x_run.read() * xunit_old.express(xunit) - y = y_run.read() * yunit_old.express(yunit) + if yerr_kind == "std": + yerr = node_y.std.read() * yunit_old.express(yunit) * sigma_err + mask = np.isfinite(x) & np.isfinite(y) & np.isfinite(yerr) + x, y, yerr = x[mask], y[mask], yerr[mask] + if smooth > 0: + y = gaussian_filter1d(y, sigma=smooth) + base_line, _, _ = P.errorbar(x, y, yerr=yerr, label=label, **kwargs) + elif yerr_kind in ["min_max", "95per"]: + if yerr_kind == "min_max": + yerr_min = node_y.min.read() * yunit_old.express(yunit) + yerr_max = node_y.max.read() * yunit_old.express(yunit) + elif yerr_kind == "95per": + yerr_min = node_y.q025.read() * yunit_old.express(yunit) + yerr_max = node_y.q975.read() * yunit_old.express(yunit) + yerr = yerr_max - yerr_min + mask = ( + np.isfinite(x) + & np.isfinite(y) + & np.isfinite(yerr_min) + & np.isfinite(yerr_max) + ) + x, y, yerr, yerr_min, yerr_max = ( + x[mask], + y[mask], + yerr[mask], + yerr_min[mask], + yerr_max[mask], + ) + base_line, _, _ = P.errorbar( + x, y, yerr=[y - yerr_min, yerr_max - y], label=label, **kwargs + ) + else: mask = np.isfinite(y) x, y = x[mask], y[mask] + if smooth > 0: + y = gaussian_filter1d(y, sigma=smooth) + (base_line,) = P.plot(x, y, "*", **kwargs) + else: + if runs is None: + runs = self.runs + for i, run in enumerate(runs): + x_run, y_run = node_x[run], node_y[run] + x = x_run.read() * xunit_old.express(xunit) + y = y_run.read() * yunit_old.express(yunit) + mask = np.isfinite(x) & np.isfinite(y) + x, y = x[mask], y[mask] if smooth > 0: y = gaussian_filter1d(y, sigma=smooth) label_run = self._label_run(run, y_run, label, nml_key) - P.plot(x, y, label=label_run, **kwargs) + if colors is None: + (base_line,) = P.plot(x, y, label=label_run, **kwargs) + else: + if nml_color is None: + color = colors[i % len(colors)] + (base_line,) = P.plot(x, y, label=label_run, **kwargs) + else: + nml = self.comp.get_nml(nml_color, run) + color = colors[nml] + (base_line,) = P.plot(x, y, label=label_run, color=color, **kwargs) + P.legend() - if linearfit: - self._overlay_linearfit(x, y, yerr) + if not fit is None: + self._overlay_fit( + x, + y, + yerr, + kind=fit, + ls="--", + lw=1.5, + color=base_line.get_color(), + label=fitlabel, + ) - def _overlay_linearfit(self, x, y, yerr=None, fit_order=1): - if yerr is None: - (a, b, rho, _map_rule, stderr) = linregress(x, y) - self._log( - "Linear fit y = {} x + {} with R^2 = {} and error is {}".format( - a, b, rho, stderr + def _overlay_fit(self, x, y, yerr=None, kind="linear", label=None, **kwargs): + if kind == "linear": + if yerr is None: + (a, b, rho, _map_rule, stderr) = linregress(x, y) + self._log( + "Linear fit y = {} x + {} with R^2 = {} and error is {}".format( + a, b, rho, stderr + ) ) - ) - else: - fit = polyfit(x, y, 1, w=[1.0 / ty for ty in yerr], full=True) - c = fit[0] - residual = fit[1][0][0] - b, a = c[0], c[1] - self._log( - "Linear fit y = {} x + {} with residual {}".format(a, b, residual) - ) - P.plot(x, a * x + b, "--", linewidth=1.5) + if label is None: + label = r"Linear fit with slope ${:.3g}$ and $R^2 = {:.3f}$".format( + a, rho + ) + else: + fit = polyfit(x, y, 1, w=[1.0 / ty for ty in yerr], full=True) + c = fit[0] + residual = fit[1][0][0] + b, a = c[0], c[1] + self._log( + "Linear fit y = {} x + {} with residual {}".format(a, b, residual) + ) + if label is None: + label = r"Linear fit with slope ${:.3g}$".format(a) + P.plot(x, a * x + b, label=label, **kwargs) + elif kind == "power_law": + if yerr is None: + (a, b, rho, _map_rule, stderr) = linregress(np.log10(x), np.log10(y)) + self._log( + "Power law fit y = x^({}) * 10^({}) with R^2 = {} and error is {}".format( + a, b, rho, stderr + ) + ) + else: + fitfunc = lambda p, x: p[0] + p[1] * x + errfunc = lambda p, x, y, err: (y - fitfunc(p, x)) / err + pinit = [1.0, -1.0] + out = optimize.leastsq( + errfunc, + pinit, + args=(np.log10(x), np.log10(y), yerr / y), + full_output=1, + ) + + c = out[0] + b, a = c[0], c[1] + residual = errfunc(c, np.log10(x), np.log10(y), yerr / y) + self._log( + "Power law fit y = x^({}) * 10^({}) with residual {}".format( + a, b, residual + ) + ) + if label is None: + label = r"Power-law fit with index {:.1f}".format(a) + P.plot(x, (10 ** b) * x ** a, label=label, **kwargs) def overlay_kennicutt(self, n0, step): P.grid(False) @@ -650,6 +740,12 @@ class Plotter(Aggregator, BaseProcessor): "Toomre Q parameter for a Keplerian disk", dependencies=["Q"], ), + "rho_pdf": PlotRule( + self, + partial(self._plot_hist, "rho_pdf"), + "$\rho$-PDF", + dependencies=["rho_pdf"], + ), } averageables = ["coldens", "rho", "T", "Q"] @@ -767,6 +863,17 @@ class Plotter(Aggregator, BaseProcessor): kind="series", dependencies=["rms_from_log"], ), + "turb_power": PlotRule( + self, + partial( + self._plot, + "/series/rms_from_log/time", + "/series/rms_from_log/turb_power", + xunit=cst.Myr, + ), + kind="series", + dependencies=["turb_power"], + ), "sigma": PlotRule( self, partial( diff --git a/postprocessor.py b/postprocessor.py index 20105f0..22f3442 100644 --- a/postprocessor.py +++ b/postprocessor.py @@ -1,357 +1,9 @@ # coding: utf-8 -import sys -import os -import glob as glob - - -import tables -import pymses -import numpy as np -from numpy.polynomial.polynomial import polyfit -from scipy.stats import linregress - -from pymses.sources.ramses import output -from pymses.sources.hop.file_formats import * -from pymses.analysis import Camera, raytracing, slicing, splatting -from pymses.filters import CellsToPoints -from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator -import subprocess - -import module_extract as me - -from mypool import MyPool as Pool -from functools import partial -from abc import ABCMeta, abstractmethod -import bunch - -from run_selector import * -from units import * - - -class Rule: - def __init__( - self, - postproc, - process, - description="", - group="", - dependencies=[], - is_valid=lambda arg: True, - kind="classic", - unit=cst.none, - ): - self.postproc = postproc - self.process_fn = process - self.dependencies = dependencies - self.is_valid_add = is_valid - self.group = group - self.description = description - self.unit = unit - self.kind = kind - - def process(self, arg, **kwargs): - if not arg is None: - return self.process_fn(arg, **kwargs) - else: - return self.process_fn(**kwargs) - - def is_valid(self, arg): - # save = self.postproc.save - # valid = True - # for dep in self.dependencies: - # if dep in self.postproc.rules: - # rule_dep = self.postproc.rules[dep] - # if not arg is None: - # valid = valid and rule_dep.group + '/' + dep + '_' + str(arg) in save - # else: - # valid = valid and rule_dep.group + '/' + dep in save - # return valid and self.is_valid_add(arg) - return self.is_valid_add(arg) - - -class BaseProcessor: - """ - Base class for processors, should not be instanciated - """ - - __metaclass__ = ABCMeta - - log_id = "" - rules = {} - solve_self_dep = True - - def __init__(self, path, path_out=None, pp_params=None, tag=None): - if pp_params is None: - self.pp_params = default_params() - elif type(pp_params) == str: - self.pp_params = load_params(pp_params) - else: - self.pp_params = pp_params - - if tag is not None: - self.pp_params.out.tag = tag - - # Determining output directory - if path_out is None: - self.path_out = path - else: - self.path_out = path_out - - def _log(self, string, status=""): - if self.pp_params.process.verbose: - if len(status) > 0: - print(status + ": " + self.log_id + string) - else: - print(self.log_id + string) - - def process( - self, - to_process_list, - args=[None], - overwrite=False, - overwrite_dep=False, - **kwargs - ): - """ - Render the data in to_process_list and save them - """ - - if type(to_process_list) == str: - to_process_list = [to_process_list] - - if type(args) == str or args is None: - args = [args] - - self.overwrite_dep = overwrite_dep - self.just_done = [] # Computations done within this call - - for name in to_process_list: - if name in self.rules: - rule = self.rules[name] - for arg in args: - self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs) - else: - self._log( - "{} is unknown, allowed rules are {}".format( - name, self.rules.keys() - ), - "ERROR", - ) - - return self.just_done - - def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs): - updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs) - overwrite_rule = overwrite or updated - self._process_rule(name, rule, arg, overwrite_rule, **kwargs) - - def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs): - - self.done_before_dep = len(self.just_done) - - # Solve dependencies - for dep in rule.dependencies: - try: - dep_arg = rule.dependencies[dep] - except: - dep_arg = arg - - if dep_arg == "__parent__": - dep_arg = arg - - if self.solve_self_dep and dep in self.rules: - rule_dep = self.rules[dep] - self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep) - else: - self._not_self_dep(name, dep, dep_arg, self.overwrite_dep, **kwargs) - - # Whether dependencies where updated - return len(self.just_done) > self.done_before_dep - - def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): - self._log("Dependency {} for {} is unknown".format(dep, name), "ERROR") - - def _needs_computation(self, overwrite, name_full): - return overwrite - - def _process_rule(self, name, rule, arg, overwrite=False, **kwargs): - if not arg is None: - name_full = rule.group + "/" + name + "_" + str(arg) - else: - name_full = rule.group + "/" + name - - if rule.is_valid(arg): - if not name_full in self.just_done: - if self._needs_computation(overwrite, name_full): - self._log("Processing {}".format(name_full)) - data = rule.process(arg, **kwargs) - self._save_data(name_full, data, rule.description, rule.unit) - self._log("Data for {} computed".format(name_full), "SUCCESS") - self.just_done.append(name_full) - else: - self._log( - "Data for {} is already computed, skipping...".format(name_full) - ) - else: - self._log("{} is not valid in this context".format(name_full), "ERROR") - - def def_rules(self): - for rule in self.rules: - setattr(self, rule, partial(self.process, rule)) - - -class HDF5Container(BaseProcessor): - - filename = "" - save = None - opened = False - - def open(self): - if not self.opened: - self.save = tables.open_file(self.filename, mode="a") - self.opened = True - - def close(self): - if self.opened: - self.save.close() - self.opened = False - - def _needs_computation(self, overwrite, name_full): - return overwrite or not (name_full in self.save) - - def _process_rule(self, name, rule, arg, overwrite, **kwargs): - self.open() - try: - super(HDF5Container, self)._process_rule( - name, rule, arg, overwrite, **kwargs - ) - finally: - self.close() - - def get_value(self, node_name): - self.open() - try: - node = self.save.get_node(node_name) - if node._v_attrs.CLASS == "GROUP": - value = {} - for child_name in node._v_children: - value[child_name] = self.get_value(node_name + "/" + child_name) - else: - value = node.read() - finally: - self.close() - return value - - def _save_data(self, name_full, data, description, unit): - """ - Save data in the HDF5 structure, overwrite if necessary - """ - if name_full in self.save: - self.save.remove_node(name_full, recursive=True) - - if type(data) == dict: - if type(description) == str: - self.save.create_group( - os.path.dirname(name_full), - os.path.basename(name_full), - description, - createparents=True, - ) - else: - self.save.create_group( - os.path.dirname(name_full), - os.path.basename(name_full), - "", - createparents=True, - ) - - if not type(unit) == dict: - self.save.get_node(name_full)._v_attrs.unit = unit - - for key in data: - if type(description) == dict: - if type(unit) == dict: - self._save_data( - name_full + "/" + key, - data[key], - description[key], - unit[key], - ) - else: - self._save_data( - name_full + "/" + key, data[key], description[key], unit - ) - else: - if type(unit) == dict: - self._save_data(name_full + "/" + key, data[key], "", unit[key]) - else: - self._save_data(name_full + "/" + key, data[key], "", unit) - else: - self.save.create_array( - os.path.dirname(name_full), - os.path.basename(name_full), - data, - description, - createparents=True, - ) - self.save.get_node(name_full).attrs.unit = unit - - def set_value(self, node_name, data, description, unit): - self.open() - try: - self._save_data(node_name, data, description, unit) - finally: - self.close() - - def get_attribute(self, node_name, attr_name): - self.open() - try: - node = self.save.get_node(node_name) - attr = node._v_attrs[attr_name] - finally: - self.close() - return attr - - -def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num): - try: - pp = PostProcessor( - path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params - ) - except pymses.RamsesIOError as e: - print(e) - return pp.process(rule, arg, overwrite) - - -class Aggregator: - def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): - if "runs" in kwargs: - dep_runs = [run for run in self.runs if run in kwargs["runs"]] - else: - dep_runs = self.runs - - pps = [[self.pp_runs[run][num] for num in self.nums[run]] for run in dep_runs] - run_num = [(run, num) for run in dep_runs for num in self.nums[run]] - map_fn = partial( - _map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params - ) - - if self.pp_params.process.num_process > 1: - pool = Pool(processes=self.pp_params.process.num_process) - done = pool.map(map_fn, run_num) - pool.close() - pool.join() - else: - done = map(map_fn, run_num) - self.just_done.extend([item for li in done for item in li]) - - -def simple_getter(name, dset): - return dset[name] - +from baseprocessor import * mass_func = lambda dset: dset["rho"] * dset.get_sizes() ** 3 # Mass function +vol_func = lambda dset: dset.get_sizes() ** 3 # Volume function class PostProcessor(HDF5Container): @@ -543,11 +195,34 @@ class PostProcessor(HDF5Container): else: return np.sum(value, axis=0) - def _mwa_sigma(self): + def _vol_pdf(self, getter, log=False, weight_func=vol_func): + self.load_cells() + data = getter(self.cells) + if logbins: + data = np.log10(data) + weights = weight_func(self.cells) + + values, edges = np.histogram(data, weights=weights) + centers = 0.5 * (edges[1:] + edges[:-1]) + return np.stack([values, centers]) + + def _mwa_sigma(self, axes=["x", "y", "z"]): mw_speed = self.save.get_node("/globals/mwa_speed").read() - def getter(dset): - return np.sum((dset["vel"] - mw_speed) ** 2, axis=1) + if axes == ["x", "y", "z"]: + + def getter(dset): + return np.sum((dset["vel"] - mw_speed) ** 2, axis=1) + + else: + + def getter(dset): + sigma_squared = 0.0 + for ax in axes: + ax_nb = self._ax_nb[ax] + sigma_sq_ax = (dset["vel"][:, ax_nb] - mw_speed[ax_nb]) ** 2 + sigma_squared = sigma_squared + sigma_sq_ax + return sigma_squared return np.sqrt(self._vol_avg(getter, mass_weighted=True)) @@ -756,7 +431,7 @@ class PostProcessor(HDF5Container): return dmap / avg_map - def _pdf(self, name, ax_los): + def _rad_fluct_pdf(self, name, ax_los): fluct_map = self.save.get_node("/maps/fluct_" + name + "_" + ax_los).read() rr = self.save.get_node("/maps/rr_" + ax_los).read() @@ -856,56 +531,6 @@ class PostProcessor(HDF5Container): return sinks_dict - def _transform(self, name, transform_fn, group="/maps", **kwargs): - src = self.save.get_node(group + "/" + name).read() - return transform_fn(src, **kwargs) - - def _gen_rule_transform( - self, - rule_src_name, - transform_fn, - transform_name, - subarray_name=None, - group=None, - ): - - rule_src = self.rules[rule_src_name] - - if subarray_name is None: - src_name = rule_src_name - group_src = rule_src.group - unit = rule_src.unit - description = rule_src.description - else: - src_name = subarray_name - group_src = rule_src.group + "/" + rule_src_name - unit = rule_src.unit[subarray_name] - description = rule_src.description[subarray_name] - - def fn(arg=None, **kwargs): - if arg is None: - return self._transform( - src_name, transform_fn, group=group_src, **kwargs - ) - else: - return self._transform( - src_name + "_" + str(arg), transform_fn, group=group_src, **kwargs - ) - - if group is None: - group = group_src - - name = transform_name + "_" + rule_src_name - - self.rules[name] = Rule( - self, - fn, - group=group, - unit=unit, - description=description, - dependencies=[rule_src_name], - ) - def def_rules(self): self.rules = { @@ -1007,6 +632,13 @@ class PostProcessor(HDF5Container): "/maps", dependencies=["radial_bins", "rr"], ), + # PDF + "rho_pdf": Rule( + self, + partial(self._vol_pdf, partial(simple_getter, "rho")), + "Global rho-PDF", + "/hist", + ), # globals "time_num": Rule( self, @@ -1027,7 +659,7 @@ class PostProcessor(HDF5Container): self._mwa_sigma, "Mass weighted speed average", "/globals", - dependencies=["mwa_speed"], + dependencies={"mwa_speed": None}, unit=self.info["unit_velocity"], ), } @@ -1059,7 +691,7 @@ class PostProcessor(HDF5Container): ) self.rules["pdf_" + name] = Rule( self, - partial(self._pdf, name), + partial(self._rad_fluct_pdf, name), "Probability density function of {} fluctuations".format(name), "/hist", dependencies=["rr", "fluct_" + name], @@ -1078,438 +710,6 @@ class PostProcessor(HDF5Container): super(PostProcessor, self).def_rules() -class Comparator(Aggregator, HDF5Container): - """ - Do comparaison between outputs and runs - """ - - def __init__( - self, - path, - in_runs, - in_nums, - path_out=None, - pp_params=default_params(), - selector=None, - tag=None, - **kwargs - ): - """ - Creates the basic structures needed for the outputs - """ - - super(Comparator, self).__init__(path, path_out, pp_params, tag) - - # Open outfile - if not self.pp_params.out.tag == "": - tag_name = "_" + self.pp_params.out.tag - else: - tag_name = "" - - self.filename = path_out + "/comp" + tag_name + ".h5" - - # Select runs - if selector is None: - selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs) - - # Save infos - self.path = path - self.runs = selector.runs - self.nums = selector.nums - - # Get postprocesor objets for each run - self.pp_runs = {} - - for run in self.runs: - path_run = path + "/" + run - path_out_run = path_out + "/" + run - self.pp_runs[run] = {} - for num in self.nums[run]: - self.pp_runs[run][num] = PostProcessor( - path_run, num, path_out=path_out_run, pp_params=self.pp_params - ) - - self.namelist = selector.namelist - # Get info from one output. TODO Avoid using pymses for that - self.info = self.pp_runs[self.runs[0]][self.nums[self.runs[0]][0]].info - - # log info - self.log_id = "[comp {}] ".format(self.pp_params.out.tag) - - # Define rules - self.def_rules() - - def _needs_computation(self, overwrite, name_full): - """ - Returns True if a new computation of the rule is needed - """ - if overwrite or not (name_full in self.save): - return True - elif not "nums" in self.save.get_node(name_full)._v_attrs: - return True - else: - saved_nums = self.save.get_node(name_full)._v_attrs.nums - missing_runs = len([run for run in self.nums if not run in saved_nums]) > 0 - missing_nums = missing_runs or all( - [ - len([num for num in self.nums[run] if not num in saved_nums[run]]) - > 0 - for run in self.nums - if run in saved_nums - ] - ) - return missing_nums - - def _save_data(self, name_full, data, description, unit): - super(Comparator, self)._save_data(name_full, data, description, unit) - self.save.get_node(name_full)._v_attrs.nums = self.nums - - def _time_series(self, getter, arg=None): - series = {} - for run in self.runs: - series[run] = np.zeros(len(self.nums[run])) - for i, num in enumerate(self.nums[run]): - series[run][i] = getter(run, num, arg=arg) - return series - - def _comp(self, getter, use_num=True): - prop = np.zeros(len(self.runs)) - for i, run in enumerate(self.runs): - if use_num: - num = self.nums[run][0] - prop[i] = getter(run, num) - else: - prop[i] = getter(run) - return prop - - def _time_avg(self, name, start=None, end=None, span=None, group="/series"): - mean = np.zeros(len(self.runs)) - median = np.zeros(len(self.runs)) - std = np.zeros(len(self.runs)) - - for i, run in enumerate(self.runs): - serie = self.save.get_node(group + "/" + name + "/" + run).read() - time = self.save.get_node(group + "/time/" + run).read() - mask = abs(serie) != np.inf - - if not ((start, end, span) == (None, None, None)): - start_r, end_r = start, end - # Be sure that start_r and end_r are defined - if start_r is None: - if end_r is None: - end_r = time[-1] - start_r = end_r - span - elif end_r is None: - end_r = start_r + span - - mask = mask & (time >= start_r) & (time <= end_r) - - mean[i] = np.nanmean(serie[mask]) - median[i] = np.nanmedian(serie[mask]) - std[i] = np.nanstd(serie[mask]) - return {"runs": self.runs, "mean": mean, "std": std, "median": median} - - def get_attr(self, attr_name, run, num, node_name="/", arg=None): - pp = self.pp_runs[run][num] - if not arg is None: - node_name = node_name + "_" + str(arg) - return pp.get_attribute(node_name, attr_name) - - def get_global(self, node_name, run, num, arg=None, unload_cells=False): - if not arg is None: - node_name = node_name + "_" + str(arg) - pp = self.pp_runs[run][num] - if unload_cells: - pp.unload_cells() - value = pp.get_value(node_name) - return value - - def get_nml(self, nml_key, run): - res = self.namelist[run] - for key in nml_key.split("/"): - res = res[key] - return res - - def get_pdf_slope(self, name, run, num): - pp = self.pp_runs[run][num] - pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) - slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope") - return slope - - def _extract_sinks_from_log(self, series, log_filename, run): - cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename) - content = os.popen(cmd_grep).readlines() - for i in range(0, len(content), 4): - series["nb_sink"][run].append(np.int(content[i].split("=")[1])) - series["mass_sink"][run].append(np.float(content[i + 1].split("=")[1])) - series["time"][run].append(np.float(content[i + 2].split("=")[1])) - return series - - def _extract_sfr_from_log(self, series, log_filename, run): - cmd_grep = "grep '\[SFR' {} ".format(log_filename) - content = os.popen(cmd_grep).readlines() - for i in range(0, len(content)): - time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) - sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) - series["time"][run].append(time) - series["sfr"][run].append(sfr) - return series - - def _extract_rms_from_log(self, series, log_filename, run): - cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) - content = os.popen(cmd_grep).readlines() - for i in range(0, len(content), 4): - series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) - series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) - try: - turb_energy = np.float(content[i + 2].split(":")[1]) - threshold = self.pp_params.rules.turb_energy_threshold - assert threshold < 0 or abs(turb_energy) < threshold - series["turb_energy"][run].append(turb_energy) - except (ValueError, IndexError, AssertionError): - series["turb_energy"][run].append(np.nan) - return series - - def _from_log(self, keys, extractor): - nums = self.nums - - # Initialize series - series = {} - for key in keys: - series[key] = {} - - for run in self.runs: - # Initialize list - for key in keys: - series[key][run] = [] - - # get one preprocessor - path_run = self.path + "/" + run - - # Get list of run files - log_files = path_run + "/" + self.pp_params.input.log_prefix + "*" - - # Parse files - for log_filename in glob.glob(log_files): - series = extractor(series, log_filename, run) - - # Numpify the lists - for key in series: - series[key][run] = np.array(series[key][run]) - - # Sort the arrays - ind_sort = series["time"][run].argsort() - for key in series: - series[key][run] = series[key][run][ind_sort] - return series - - def _ssfr_from_mass_sink(self, avg_window=None): - """ - avg_window in year - """ - time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit - mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit - # Surface of the box in pc^2 - surface = (self.info["unit_length"].express(cst.pc)) ** 2 - # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) - ssfr = {} - for run in self.runs: - time = self.save.get_node("/series/sinks_from_log/time/" + run).read() - time = time * time_unit.express(cst.year) - mass_sink = self.save.get_node( - "/series/sinks_from_log/mass_sink/" + run - ).read() - mass_sink = mass_sink * mass_unit.express(cst.Msun) - if avg_window is None: - shift = 1 - else: - # We assume that the timestep do not vary a lot ... - shift = np.searchsorted(time, avg_window, side="left") - sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( - time[shift:] - time[:-shift] - ) - ssfr[run] = np.zeros(len(mass_sink)) - ssfr[run][shift:] = sfr / surface - return ssfr - - def _gen_rule_time_global( - self, - glob_name, - name=None, - glob_group="/globals", - subarray_name=None, - unload_cells=True, - unit=cst.none, - description="", - ): - - if name is None: - name = "time_" + glob_name - - self.rules[name] = Rule( - self, - partial( - self._time_series, - partial( - self.get_global, glob_group + "/" + glob_name, unload_cells=True - ), - ), - group="/series", - unit=unit, - dependencies={"time": None, glob_name: "__parent__"}, - ) - - def _gen_rule_avg(self, rule_src_name, subarray_name=None): - - rule_src = self.rules[rule_src_name] - - if subarray_name is None: - src_name = rule_src_name - group_src = rule_src.group - unit = rule_src.unit - descr = rule_src.description - else: - src_name = subarray_name - group_src = rule_src.group + "/" + rule_src_name - unit = rule_src.unit[subarray_name] - descr = rule_src.description[subarray_name] - - description = { - "runs": "List of runs", - "mean": "Temporal average of {}".format(descr), - "std": "Standard deviation of {}".format(descr), - "median": "Median of {}".format(descr), - } - name = "avg_" + src_name - - def fn(arg=None, **kwargs): - if arg is None: - return self._time_avg(src_name, group=group_src, **kwargs) - else: - return self._time_avg( - src_name + "_" + str(arg), group=group_src, **kwargs - ) - - self.rules[name] = Rule( - self, - fn, - group="/comp", - unit=unit, - description=description, - dependencies=[rule_src_name], - ) - - def def_rules(self): - averageables = ["coldens", "rho", "T", "Q"] - self.rules = { - # Read from log - "sinks_from_log": Rule( - self, - partial( - self._from_log, - ["time", "mass_sink", "nb_sink"], - self._extract_sinks_from_log, - ), - group="/series", - unit={ - "time": self.info["unit_time"], - "mass_sink": cst.Msun, - "nb_sink": cst.none, - }, - description={ - "time": "Time", - "mass_sink": "Total mass of stars", - "nb_sink": "Number of stars", - }, - ), - "issfr": Rule( - self, - self._ssfr_from_mass_sink, - group="/series/sinks_from_log", - unit=cst.ssfr, - description="Instantaneous surfacic star formation rate", - dependencies=["sinks_from_log"], - ), - "sfr_from_log": Rule( - self, - partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), - group="/series", - unit={"time": cst.year, "sfr": cst.ssfr}, - description={ - "time": "Time", - "sfr": "Averaged surfacic star formation rate", - }, - ), - "rms_from_log": Rule( - self, - partial( - self._from_log, - ["time", "turb_rms", "turb_energy"], - self._extract_rms_from_log, - ), - group="/series", - unit={ - "time": self.info["unit_time"], - "turb_rms": cst.none, - "turb_energy": cst.none, - }, - description={ - "time": "Time", - "turb_rms": "Computed turbulent RMS", - "turb_energy": "Injected turbulent energy", - }, - ), - # Read from outputs - "time": Rule( - self, - partial( - self._time_series, partial(self.get_global, "/globals/time_num") - ), - group="/series", - unit=self.info["unit_time"], - dependencies=["time_num"], - ), - "time_pdf_slope_coldens": Rule( - self, - partial( - self._time_series, - partial( - self.get_attr, - "slope", - node_name="/hist/pdf_coldens_z", - ), - ), - group="/series", - dependencies={"time": None, "fit_pdf_coldens": "z"}, - ), - # namelist - "nml": Rule( - self, - lambda nml_key: self._comp( - partial(self.get_nml, nml_key), use_num=False - ), - group="/comp", - ), - } - - self._gen_rule_time_global( - "mwa_sigma", "time_sigma", unit=self.info["unit_velocity"] - ) - self._gen_rule_time_global("max_fluct_coldens") - - for name in ["issfr", "time_sigma", "time_pdf_slope_coldens"]: - self._gen_rule_avg(name) - - self._gen_rule_avg("sinks_from_log", "mass_sink") - self._gen_rule_avg("sinks_from_log", "nb_sink") - self._gen_rule_avg("sfr_from_log", "sfr") - - self._gen_rule_avg("rms_from_log", "turb_rms") - self._gen_rule_avg("rms_from_log", "turb_energy") - - super(Comparator, self).def_rules() - - def get_time(path, num): """ Return the time of the output (code units) diff --git a/pp_params.yml b/pp_params.yml index ab1b085..f66d2b5 100644 --- a/pp_params.yml +++ b/pp_params.yml @@ -1,5 +1,4 @@ plot : # Plot parameters - out_ext : '.jpeg' # extension for plots put_title : False # Add a title to plot # Maps @@ -49,10 +48,21 @@ input: # Parameters on how to look for input files (= output from Ramses) out: # Parameters for post processing tag : "" # Tag for the image interactive : False # Interactive mode (do not save the plots on the disk) + ext : '.jpeg' # extension for plots + fmt : "" # Format of the output filename for plots + # The following keys are accepted + # {out} : The output directory (where hdf5 files are also stored) + # {run} : Name of the relevant run + # {num} : Name of the input file (from Ramses) + # {ext} : Extension defined above + # {name} : Name of the rule + # {tag} : Tag defined above + # {nml[nml_key]} : The value of nml_key in the namelist (ex: amr_params/levelmin) + process: # General setting of the post-processor module verbose : True # Give more infos on what is going on num_process : 1 # Number of forks rules: # Specific rules parameters - turb_energy_threshold : 1e13 # Remove invalid data (<0 = no threshold) + turb_energy_threshold : -1 # Remove invalid data (<0 = no threshold) diff --git a/run_selector.py b/run_selector.py index 7dd27e9..ab79e89 100644 --- a/run_selector.py +++ b/run_selector.py @@ -7,6 +7,31 @@ from pp_params import * import f90nml +class NamelistRecursive: + def __init__(self, namelist): + self.data = namelist + + def get_nml_value(self, nml_key): + res = self.data + for key in nml_key.split("/"): + if key in res: + res = res[key] + elif key == nml_key.split("/")[-1]: + res = None + else: + raise KeyError(key) + return res + + def __getitem__(self, key): + return self.get_nml_value(key) + + def __repr__(self): + return self.data.__repr__() + + def __str__(self): + return self.data.__str__() + + class RunSelector: def __init__( self, @@ -26,6 +51,9 @@ class RunSelector: self.namelist = {} self.runs = self.get_runs(in_runs, name_run, namelist_cond, sort_run_by) + if len(self.runs) == 0: + raise ValueError("No runs found") + self.info = {} for run in self.runs: self.info[run] = {} @@ -37,24 +65,36 @@ class RunSelector: for run in self.runs: in_nums[run] = nums_temp - for run in self.runs: + for i, run in enumerate(self.runs): self.nums[run] = self.get_nums(run, in_nums[run], time_min, time_max) def load_namelist(self, run): path_run = self.path_in + "/" + run path_nml = path_run + "/" + self.pp_params.input.nml_filename - return f90nml.read(path_nml) + return NamelistRecursive(f90nml.read(path_nml)) def get_nml_value(self, nml_key, run): - res = self.namelist[run] - for key in nml_key.split("/"): - if key in res: - res = res[key] - elif key == nml_key.split("/")[-1]: - res = None - else: - raise KeyError(key) - return res + return self.namelist[run][nml_key] + + def nml_select(self, runs, namelist_cond): + if type(namelist_cond) == tuple: + namelist_cond = [namelist_cond] + + for (nml_key, operator, operand) in namelist_cond: + value = {} + for run in runs: + value[run] = self.get_nml_value(nml_key, run) + if operator == "=": + runs = filter(lambda r: value[r] == operand, runs) + if operator == "!=": + runs = filter(lambda r: not value[r] == operand, runs) + elif operator == ">": + runs = filter(lambda r: value[r] > operand, runs) + elif operator == "<": + runs = filter(lambda r: value[r] < operand, runs) + elif operator == "in": + runs = filter(lambda r: value[r] in operand, runs) + return runs def get_runs(self, in_runs=None, name_run="*", namelist_cond={}, sort_run_by=None): def try_load_nml(run): @@ -73,23 +113,8 @@ class RunSelector: runs = filter(lambda n: n in runs, in_runs) runs = filter(try_load_nml, runs) - if type(namelist_cond) == tuple: - namelist_cond = [namelist_cond] - - for (nml_key, operator, operand) in namelist_cond: - value = {} - for run in runs: - value[run] = self.get_nml_value(nml_key, run) - if operator == "=": - runs = filter(lambda r: value[r] == operand, runs) - if operator == "!=": - runs = filter(lambda r: not value[r] == operand, runs) - elif operator == ">": - runs = filter(lambda r: value[r] > operand, runs) - elif operator == "<": - runs = filter(lambda r: value[r] < operand, runs) - elif operator == "in": - runs = filter(lambda r: value[r] in operand, runs) + # Select runs that match namelist conditions + runs = self.nml_select(runs, namelist_cond) # Sort by the value in the namelist of sort_run_by if not sort_run_by is None: @@ -145,14 +170,20 @@ class RunSelector: if in_nums == "first": i = 0 - while i < len(nums) - 1 and not try_load_info(nums[i]): + while i < len(nums) and not try_load_info(nums[i]): i = i + 1 - nums = [nums[i]] + if i < len(nums): + nums = [nums[i]] + else: + nums = [] elif in_nums == "last": i = len(nums) - 1 - while i > 0 and not try_load_info(nums[i]): + while i >= 0 and not try_load_info(nums[i]): i = i - 1 - nums = [nums[i]] + if i >= 0: + nums = [nums[i]] + else: + nums = [] else: nums = filter(try_load_info, nums) @@ -160,5 +191,4 @@ class RunSelector: nums = filter(lambda n: self.info[run][n]["time"] >= time_min, nums) if not time_max is None: nums = filter(lambda n: self.info[run][n]["time"] <= time_max, nums) - return nums diff --git a/units.py b/units.py index d185b55..196914a 100644 --- a/units.py +++ b/units.py @@ -12,18 +12,20 @@ def parse_exp_unit(u): return name_u + exp -def convert_exp(number): - splitted = "{:.4g}".format(number).split("e") +def convert_exp(number, digits=4): + # Split string as [coeff, exponent] + splitted = "{num:.{digits}g}".format(num=number, digits=digits).split("e") + # If no need of scientific notation (low number of digits) if len(splitted) == 1: return "${}$".format(splitted[0]) else: - coeff = float(splitted[0]) - exp = int(splitted[1]) - exp_str = "10^{" + str(exp) + "}" - if coeff == 1.0: + coeff = splitted[0] + exp = splitted[1] + exp_str = "10^{" + str(int(exp)) + "}" + if float(coeff) == 1.0: return "$" + exp_str + "$" else: - return "$" + str(coeff) + "\\times" + exp_str + "$" + return "${}\\times {}$".format(coeff, exp_str) def unit_str(unit, base=None, prefix=""): @@ -52,20 +54,17 @@ def unit_str(unit, base=None, prefix=""): return r" [{}{} {}]".format(prefix, unit.coeff, base_str) -cst.coldens = cst.create_unit( - "Msun.pc^-2", base_unit=cst.Msun / cst.pc ** 2, descr="Column density" +cst.Msun_pc3 = cst.create_unit( + "Msun.pc^-3", base_unit=cst.Msun / cst.pc ** 3, descr="Density" ) -cst.km_s = cst.create_unit("km.s^-1", base_unit=cst.km / cst.s, descr="Speed") cst.Msun_pc3 = cst.create_unit( "Msun.pc^-3", base_unit=cst.Msun / cst.pc ** 3, descr="Density" ) -cst.kg_m3 = cst.create_unit("kg.m^-3", base_unit=cst.kg / cst.m ** 3, descr="Density") - cst.ssfr = cst.create_unit( "Msun.yr^-1.pc^-2", base_unit=cst.Msun / cst.year / cst.pc ** 2, descr="Surfacic SFR", - latex="M$_{\odot}$.yr$^{-1}$.p$c^{-2}$", + latex="M$_{\odot}$.yr$^{-1}$.pc$^{-2}$", )