# coding: utf-8 import os import glob import numpy as np from functools import partial from scipy.stats import linregress from baseprocessor import Rule, HDF5Container from aggregator import Aggregator from postprocessor import PostProcessor from run_selector import RunSelector from pp_params import default_params from units import U class Comparator(Aggregator, HDF5Container): """ Do comparaison between outputs and runs """ def __init__( self, path, in_runs, in_nums, path_out=None, pp_params=default_params(), selector=None, tag=None, unit_time=U.year, **kwargs ): """ Creates the basic structures needed for the outputs """ super(Comparator, self).__init__(path, path_out, pp_params, tag) # Open outfile if not self.pp_params.out.tag == "": tag_name = "_" + self.pp_params.out.tag else: tag_name = "" self.filename = path_out + "/comp" + tag_name + ".h5" # Select runs if selector is None: selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs) # Save infos self.path = path self.runs = selector.runs self.nums = selector.nums # Get postprocesor objets for each run and infos on them self.pp = {} self.info = {} for run in self.runs: path_run = path + "/" + run path_out_run = path_out + "/" + run self.pp[run] = {} for num in self.nums[run]: self.pp[run][num] = PostProcessor( path_run, num, path_out=path_out_run, pp_params=self.pp_params, unit_time=unit_time, ) run0 = self.runs[0] self.info = self.pp[run0][self.nums[run0][0]].info self.namelist = selector.namelist # log info self.log_id = "[comp {}] ".format(self.pp_params.out.tag) # Define rules self.def_rules() def _needs_computation(self, overwrite, name_full): """ Returns True if a new computation of the rule is needed """ if overwrite or not (name_full in self.save): return True elif "nums" not in self.save.get_node(name_full)._v_attrs: return True else: saved_nums = self.save.get_node(name_full)._v_attrs.nums missing_runs = len([run for run in self.nums if run not in saved_nums]) > 0 missing_nums = missing_runs or all( [ len([num for num in self.nums[run] if num not in saved_nums[run]]) > 0 for run in self.nums if run in saved_nums ] ) return missing_nums def _save_data(self, name_full, data, description, unit): super(Comparator, self)._save_data(name_full, data, description, unit) self.save.get_node(name_full)._v_attrs.nums = self.nums def _time_series(self, getter, arg=None): series = {} for run in self.runs: series[run] = [] for i, num in enumerate(self.nums[run]): series[run].append(getter(run, num, arg=arg)) series[run] = np.array(series[run]) return series def _comp(self, getter, use_num=True): prop = [] for i, run in enumerate(self.runs): if use_num: num = self.nums[run][0] prop.append(getter(run, num)) else: prop.append(getter(run)) return np.array(prop) def _time_avg(self, name, start=None, end=None, span=None, group="/series"): serie0 = self.save.get_node(group + "/" + name + "/" + self.runs[0]).read() if len(serie0.shape) > 1: shape = [len(self.runs)] + list(serie0.shape[1:]) else: shape = len(self.runs) mean = np.zeros(shape) median = np.zeros(shape) std = np.zeros(shape) v_min = np.zeros(shape) v_max = np.zeros(shape) q975 = np.zeros(shape) q025 = np.zeros(shape) q16 = np.zeros(shape) q84 = np.zeros(shape) for i, run in enumerate(self.runs): serie = self.save.get_node(group + "/" + name + "/" + run).read() time = self.save.get_node(group + "/time/" + run).read() if len(serie.shape) <= 1: mask = abs(serie) != np.inf if not ((start, end, span) == (None, None, None)): start_r, end_r = start, end # Be sure that start_r and end_r are defined if end_r is None: if span is None or start_r is None: end_r = time[-1] else: end_r = start_r + span if start_r is None: if span is None: start_r = time[0] else: start_r = end_r - span mask = ( mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie) ) serie = serie[mask] mean[i] = np.mean(serie, axis=0) std[i] = np.std(serie, axis=0) ( v_min[i], q025[i], q16[i], median[i], q84[i], q975[i], v_max[i], ) = np.percentile(serie, [0, 2.5, 16, 50, 84, 97.5, 100], axis=0) return { "runs": self.runs, "mean": mean, "std": std, "median": median, "min": v_min, "max": v_max, "q025": q025, "q975": q975, "q16": q16, "q84": q84, } def get_attr(self, attr_name, run, num, node_name="/", arg=None): pp = self.pp[run][num] if arg is not None: node_name = node_name + "_" + str(arg) return pp.get_attribute(node_name, attr_name) def get_pp_value(self, name, run, num, arg=None): pp = self.pp[run][num] if arg is not None: name = name + "_" + str(arg) return pp.get_value(name) def get_global(self, node_name, run, num, arg=None, unload_cells=False): if arg is not None: node_name = node_name + "_" + str(arg) pp = self.pp[run][num] if unload_cells: pp.unload_cells() value = pp.get_value(node_name) return value def get_nml(self, nml_key, run): return self.namelist[run][nml_key] def get_pdf_slope(self, name, run, num): pp = self.pp[run][num] pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope") return slope def _extract_sinks_from_log(self, series, log_filename, run): cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): try: nb_sink = np.int(content[i].split("=")[1]) mass_sink = np.float(content[i + 1].split("=")[1]) time = np.float(content[i + 2].split("=")[1]) series["nb_sink"][run].append(nb_sink) series["mass_sink"][run].append(mass_sink) series["time"][run].append(time) except (ValueError, IndexError): self._log( "Error encountered in parsing {} (grepped block {})".format( log_filename, i ), "WARNING", ) return series def _extract_sfr_from_log(self, series, log_filename, run): cmd_grep = "grep '\[SFR' {} ".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content)): time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) series["time"][run].append(time) series["sfr"][run].append(sfr) return series def _extract_cons_from_log(self, series, log_filename, run): rism = self.pp_params.input.ramses_ism nlines = 2 + int(rism) # Number of useful lines cmd_grep = "grep 'Main step' {} -A {}".format(log_filename, nlines - 1) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), nlines + 1): series["time"][run].append( np.float(content[i + nlines - 1].split("=")[2].split()[0]) ) series["step"][run].append(np.int(content[i].split("=")[1].split()[0])) series["mcons"][run].append(np.float(content[i].split("=")[2].split()[0])) series["econs"][run].append(np.float(content[i].split("=")[3].split()[0])) series["epot"][run].append(np.float(content[i].split("=")[4].split()[0])) series["ekin"][run].append(np.float(content[i].split("=")[5].split()[0])) if rism: eint = np.float(content[i].split("=")[6].split()[0]) emag = np.float(content[i + 1].split("=")[1].split()[0]) else: eint = 0.0 emag = 0.0 series["eint"][run].append(eint) series["emag"][run].append(emag) return series def _extract_rms_from_log(self, series, log_filename, run): cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) series["dt"][run].append(np.float(content[i].split("=")[3].split()[0])) series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) try: turb_energy = np.float(content[i + 2].split(":")[1]) threshold = self.pp_params.rules.turb_energy_threshold assert threshold < 0 or abs(turb_energy) < threshold series["turb_energy"][run].append(abs(turb_energy)) except (AssertionError, ValueError, IndexError): series["turb_energy"][run].append(np.nan) return series def _from_log(self, keys, extractor): # Initialize series series = {} for key in keys: series[key] = {} for run in self.runs: # Initialize list for key in keys: series[key][run] = [] # get one preprocessor path_run = self.path + "/" + run # Get list of run files log_files = path_run + "/" + self.pp_params.input.log_prefix + "*" # Parse files for log_filename in glob.glob(log_files): series = extractor(series, log_filename, run) # Numpify the lists for key in series: series[key][run] = np.array(series[key][run]) # Sort the arrays ind_sort = series["time"][run].argsort() for key in series: series[key][run] = series[key][run][ind_sort] return series def _ssfr_from_mass_sink(self, avg_window=None): """ avg_window in year """ time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit ssfr = {} for run in self.runs: # Surface of the box in pc^2 info = self.pp[run][self.nums[run][0]].info surface = (info["unit_length"].express(U.pc)) ** 2 # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) time = self.save.get_node("/series/sinks_from_log/time/" + run).read() time = time * time_unit.express(U.year) mass_sink = self.save.get_node( "/series/sinks_from_log/mass_sink/" + run ).read() mass_sink = mass_sink * mass_unit.express(U.Msun) if avg_window is None: shift = 1 else: # We assume that the timestep do not vary a lot ... shift = np.searchsorted(time, time[-1] - avg_window, side="right") shift = len(time) - shift sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( time[shift:] - time[:-shift] ) sfr_beg = (mass_sink[:shift] - mass_sink[0]) / (time[:shift] - time[0]) ssfr[run] = np.zeros(len(mass_sink)) ssfr[run][shift:] = sfr / surface ssfr[run][:shift] = sfr_beg / surface return ssfr, {"avg_window": avg_window} def _surfacic_sink_mass(self): mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit ssm = {} for run in self.runs: # Surface of the box in pc^2 info = self.pp[run][self.nums[run][0]].info surface = (info["unit_length"].express(U.pc)) ** 2 mass_sink = self.save.get_node( "/series/sinks_from_log/mass_sink/" + run ).read() mass_sink = mass_sink * mass_unit.express(U.Msun) ssm[run] = mass_sink / surface return ssm def _turb_power(self): turb_power = {} for run in self.runs: dt = self.save.get_node("/series/rms_from_log/dt/" + run).read() # Energy injected at each timestep energy = self.save.get_node( "/series/rms_from_log/turb_energy/" + run ).read() # Power of the turbulence at this step in Watts turb_power[run] = energy / dt return turb_power def _sbeta_onavg(self): """ [ismfeed] Compute the slope of the Sigma pdf as a function of the value of beta """ col_pdf = self.get_value("/comp/avg_time_coldens_pdf_z") beta = self.get_value("/comp/nml_cloud_params/beta_cool") slope = np.zeros(len(col_pdf["runs"])) origin = np.zeros(len(col_pdf["runs"])) stderr = np.zeros(len(col_pdf["runs"])) for i, run in enumerate(col_pdf["runs"]): values, centers = col_pdf["mean"][i] mask_fit = ( (centers > self.pp_params.pdf.xmin_fit) & (centers < self.pp_params.pdf.xmax_fit) & (values > np.max(values) * self.pp_params.pdf.fit_cut) ) (slope[i], origin[i], correlation, _, stderr[i]) = linregress( centers[mask_fit], np.log10(values[mask_fit]) ) return {"beta": beta, "slope": slope, "origin": origin, "stderr": stderr} def _gen_rule_time_global( self, glob_name, name=None, glob_group="/globals", subarray_name=None, unload_cells=True, unit=U.none, description="", ): if name is None: name = "time_" + glob_name self.rules[name] = Rule( self, partial( self._time_series, partial( self.get_global, glob_group + "/" + glob_name, unload_cells=True ), ), group="/series", unit=unit, dependencies={"time": None, glob_name: "__parent__"}, ) def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None): rule_src = self.rules[rule_src_name] if subarray_name is None: src_name = rule_src_name group_src = rule_src.group unit = rule_src.unit descr = rule_src.description else: src_name = subarray_name group_src = rule_src.group + "/" + rule_src_name unit = rule_src.unit[subarray_name] descr = rule_src.description[subarray_name] description = { "runs": "List of runs", "mean": "Temporal average of {}".format(descr), "std": "Standard deviation of {}".format(descr), "median": "Median of {}".format(descr), "max": "Maximum of {}".format(descr), "min": "Minimum of {}".format(descr), "q025": "2.5 percentile of {}".format(descr), "q975": "97.5 percentile of {}".format(descr), "q16": "16 percentile of {}".format(descr), "q84": "84 percentile of {}".format(descr), } units = unit if name is None: name = "avg_" + src_name def fn(arg=None, **kwargs): if arg is None: return self._time_avg(src_name, group=group_src, **kwargs) else: return self._time_avg( src_name + "_" + str(arg), group=group_src, **kwargs ) self.rules[name] = Rule( self, fn, group="/comp", unit=units, description=description, dependencies=[rule_src_name], ) def def_rules(self): self.rules = { # Read from log "sinks_from_log": Rule( self, partial( self._from_log, ["time", "mass_sink", "nb_sink"], self._extract_sinks_from_log, ), group="/series", unit={"time": "unit_time", "mass_sink": U.Msun, "nb_sink": U.none}, description={ "time": "Time", "mass_sink": "Total mass of stars", "nb_sink": "Number of stars", }, ), "issfr": Rule( self, self._ssfr_from_mass_sink, group="/series/sinks_from_log", unit=U.ssfr, description="Instantaneous surfacic star formation rate", dependencies=["sinks_from_log"], ), "ssm": Rule( self, self._surfacic_sink_mass, group="/series/sinks_from_log", unit=U.Msun / U.pc ** 2, description="Surfacic sink mass", dependencies=["sinks_from_log"], ), "sfr_from_log": Rule( self, partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), group="/series", unit={"time": U.year, "sfr": U.ssfr}, description={ "time": "Time", "sfr": "Averaged surfacic star formation rate", }, ), "rms_from_log": Rule( self, partial( self._from_log, ["time", "dt", "turb_rms", "turb_energy"], self._extract_rms_from_log, ), group="/series", unit={ "time": "unit_time", "dt": "unit_time", "turb_rms": U.none, "turb_energy": { "unit_length": 3, "unit_velocity": 2, "unit_density": 1, }, }, description={ "time": "Time", "dt": "Timestep", "turb_rms": "Computed turbulent RMS", "turb_energy": "Injected turbulent energy", }, ), "cons_from_log": Rule( self, partial( self._from_log, ["time", "step", "mcons", "econs", "epot", "ekin", "eint", "emag"], self._extract_cons_from_log, ), group="/series", unit={ "time": "unit_time", "step": U.none, "mcons": U.none, "econs": U.none, "epot": U.none, # TODO find unit "ekin": U.none, "eint": U.none, "emag": U.none, }, ), "turb_power": Rule( self, self._turb_power, group="/series/rms_from_log", unit={ "unit_length": 3, "unit_velocity": 2, "unit_density": 1, "unit_time": -1, }, description="Injected turbulent power", dependencies=["rms_from_log"], ), # Read from outputs "time": Rule( self, partial( self._time_series, partial(self.get_global, "/globals/time_num") ), group="/series", unit="unit_time", dependencies=["time_num"], ), "time_rho_prof": Rule( self, partial( self._time_series, partial(self.get_pp_value, "/profile/rho_prof") ), group="/series", dependencies={"time": None, "rho_prof": "__parent__"}, ), "time_coldens_pdf": Rule( self, partial( self._time_series, partial(self.get_pp_value, "/hist/pdf_coldens") ), group="/series", dependencies={"time": None, "pdf_coldens": "__parent__"}, ), "time_pdf_slope_coldens": Rule( self, partial( self._time_series, partial( self.get_attr, "slope", node_name="/hist/pdf_coldens_z", ), ), group="/series", dependencies={"time": None, "fit_pdf_coldens": "z"}, ), "sbeta_onavg": Rule( self, partial(self._sbeta_onavg), group="/comp", dependencies={ "avg_time_coldens_pdf": "z", "nml": "cloud_params/beta_cool", }, ), # namelist "nml": Rule( self, lambda nml_key: self._comp( partial(self.get_nml, nml_key), use_num=False ), group="/comp", ), } self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity") self._gen_rule_time_global("max_fluct_coldens") self._gen_rule_time_global( "mass", unit=self.info["unit_density"] * self.info["unit_length"] ** 3 ) self._gen_rule_time_global("mwa_B_int", unit="unit_mag") for name in [ "issfr", "time_sigma", "time_pdf_slope_coldens", "turb_power", "time_rho_prof", "time_coldens_pdf", ]: self._gen_rule_avg(name) self._gen_rule_avg("sinks_from_log", "mass_sink") self._gen_rule_avg("sinks_from_log", "nb_sink") self._gen_rule_avg("sfr_from_log", "sfr") self._gen_rule_avg("rms_from_log", "turb_rms") self._gen_rule_avg("rms_from_log", "turb_energy") super(Comparator, self).def_rules()