# coding: utf-8 from aggregator import * class Comparator(Aggregator, HDF5Container): """ Do comparaison between outputs and runs """ def __init__( self, path, in_runs, in_nums, path_out=None, pp_params=default_params(), selector=None, tag=None, **kwargs ): """ Creates the basic structures needed for the outputs """ super(Comparator, self).__init__(path, path_out, pp_params, tag) # Open outfile if not self.pp_params.out.tag == "": tag_name = "_" + self.pp_params.out.tag else: tag_name = "" self.filename = path_out + "/comp" + tag_name + ".h5" # Select runs if selector is None: selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs) # Save infos self.path = path self.runs = selector.runs self.nums = selector.nums # Get postprocesor objets for each run and infos on them self.pp = {} self.info = {} for run in self.runs: path_run = path + "/" + run path_out_run = path_out + "/" + run self.pp[run] = {} for num in self.nums[run]: self.pp[run][num] = PostProcessor( path_run, num, path_out=path_out_run, pp_params=self.pp_params ) run0 = self.runs[0] self.info = self.pp[run0][self.nums[run0][0]].info self.namelist = selector.namelist # log info self.log_id = "[comp {}] ".format(self.pp_params.out.tag) # Define rules self.def_rules() def _needs_computation(self, overwrite, name_full): """ Returns True if a new computation of the rule is needed """ if overwrite or not (name_full in self.save): return True elif not "nums" in self.save.get_node(name_full)._v_attrs: return True else: saved_nums = self.save.get_node(name_full)._v_attrs.nums missing_runs = len([run for run in self.nums if not run in saved_nums]) > 0 missing_nums = missing_runs or all( [ len([num for num in self.nums[run] if not num in saved_nums[run]]) > 0 for run in self.nums if run in saved_nums ] ) return missing_nums def _save_data(self, name_full, data, description, unit): super(Comparator, self)._save_data(name_full, data, description, unit) self.save.get_node(name_full)._v_attrs.nums = self.nums def _time_series(self, getter, arg=None): series = {} for run in self.runs: series[run] = [] for i, num in enumerate(self.nums[run]): series[run].append(getter(run, num, arg=arg)) series[run] = np.array(series[run]) return series def _comp(self, getter, use_num=True): prop = [] for i, run in enumerate(self.runs): if use_num: num = self.nums[run][0] prop.append(getter(run, num)) else: prop.append(getter(run)) return np.array(prop) def _time_avg(self, name, start=None, end=None, span=None, group="/series"): serie0 = self.save.get_node(group + "/" + name + "/" + self.runs[0]).read() if len(serie0.shape) > 1: shape = [len(self.runs)] + list(serie0.shape[1:]) else: shape = len(self.runs) mean = np.zeros(shape) median = np.zeros(shape) std = np.zeros(shape) v_min = np.zeros(shape) v_max = np.zeros(shape) q975 = np.zeros(shape) q025 = np.zeros(shape) q16 = np.zeros(shape) q84 = np.zeros(shape) for i, run in enumerate(self.runs): serie = self.save.get_node(group + "/" + name + "/" + run).read() time = self.save.get_node(group + "/time/" + run).read() if len(serie.shape) <= 1: mask = abs(serie) != np.inf if not ((start, end, span) == (None, None, None)): start_r, end_r = start, end # Be sure that start_r and end_r are defined if end_r is None: if span is None or start_r is None: end_r = time[-1] else: end_r = start_r + span if start_r is None: if span is None: start_r = time[0] else: start_r = end_r - span mask = ( mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie) ) serie = serie[mask] mean[i] = np.mean(serie, axis=0) std[i] = np.std(serie, axis=0) ( v_min[i], q025[i], q16[i], median[i], q84[i], q975[i], v_max[i], ) = np.percentile(serie, [0, 2.5, 16, 50, 84, 97.5, 100], axis=0) return { "runs": self.runs, "mean": mean, "std": std, "median": median, "min": v_min, "max": v_max, "q025": q025, "q975": q975, "q16": q16, "q84": q84, } def get_attr(self, attr_name, run, num, node_name="/", arg=None): pp = self.pp[run][num] if not arg is None: node_name = node_name + "_" + str(arg) return pp.get_attribute(node_name, attr_name) def get_pp_value(self, name, run, num, arg=None): pp = self.pp[run][num] if not arg is None: name = name + "_" + str(arg) return pp.get_value(name) def get_global(self, node_name, run, num, arg=None, unload_cells=False): if not arg is None: node_name = node_name + "_" + str(arg) pp = self.pp[run][num] if unload_cells: pp.unload_cells() value = pp.get_value(node_name) return value def get_nml(self, nml_key, run): return self.namelist[run][nml_key] def get_pdf_slope(self, name, run, num): pp = self.pp[run][num] pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope") return slope def _extract_sinks_from_log(self, series, log_filename, run): cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): try: nb_sink = np.int(content[i].split("=")[1]) mass_sink = np.float(content[i + 1].split("=")[1]) time = np.float(content[i + 2].split("=")[1]) series["nb_sink"][run].append(nb_sink) series["mass_sink"][run].append(mass_sink) series["time"][run].append(time) except (ValueError, IndexError): self._log( "Error encountered in parsing {} (grepped block {})".format( log_filename, i ), "WARNING", ) return series def _extract_sfr_from_log(self, series, log_filename, run): cmd_grep = "grep '\[SFR' {} ".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content)): time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) series["time"][run].append(time) series["sfr"][run].append(sfr) return series def _extract_cons_from_log(self, series, log_filename, run): cmd_grep = "grep 'Main step' {} -A 2".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["time"][run].append( np.float(content[i + 2].split("=")[2].split()[0]) ) series["step"][run].append(np.int(content[i].split("=")[1].split()[0])) series["mcons"][run].append(np.float(content[i].split("=")[2].split()[0])) series["econs"][run].append(np.float(content[i].split("=")[3].split()[0])) series["epot"][run].append(np.float(content[i].split("=")[4].split()[0])) series["ekin"][run].append(np.float(content[i].split("=")[5].split()[0])) series["eint"][run].append(np.float(content[i].split("=")[6].split()[0])) series["emag"][run].append( np.float(content[i + 1].split("=")[1].split()[0]) ) return series def _extract_rms_from_log(self, series, log_filename, run): cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) series["dt"][run].append(np.float(content[i].split("=")[3].split()[0])) series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) try: turb_energy = np.float(content[i + 2].split(":")[1]) threshold = self.pp_params.rules.turb_energy_threshold assert threshold < 0 or abs(turb_energy) < threshold series["turb_energy"][run].append(abs(turb_energy)) except (AssertionError, ValueError, IndexError): series["turb_energy"][run].append(np.nan) return series def _from_log(self, keys, extractor): nums = self.nums # Initialize series series = {} for key in keys: series[key] = {} for run in self.runs: # Initialize list for key in keys: series[key][run] = [] # get one preprocessor path_run = self.path + "/" + run # Get list of run files log_files = path_run + "/" + self.pp_params.input.log_prefix + "*" # Parse files for log_filename in glob.glob(log_files): series = extractor(series, log_filename, run) # Numpify the lists for key in series: series[key][run] = np.array(series[key][run]) # Sort the arrays ind_sort = series["time"][run].argsort() for key in series: series[key][run] = series[key][run][ind_sort] return series def _ssfr_from_mass_sink(self, avg_window=None): """ avg_window in year """ time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit ssfr = {} for run in self.runs: # Surface of the box in pc^2 info = self.pp[run][self.nums[run][0]].info surface = (info["unit_length"].express(cst.pc)) ** 2 # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) time = self.save.get_node("/series/sinks_from_log/time/" + run).read() time = time * time_unit.express(cst.year) mass_sink = self.save.get_node( "/series/sinks_from_log/mass_sink/" + run ).read() mass_sink = mass_sink * mass_unit.express(cst.Msun) if avg_window is None: shift = 1 else: # We assume that the timestep do not vary a lot ... shift = np.searchsorted(time, time[-1] - avg_window, side="right") shift = len(time) - shift sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( time[shift:] - time[:-shift] ) sfr_beg = (mass_sink[:shift] - mass_sink[0]) / (time[:shift] - time[0]) ssfr[run] = np.zeros(len(mass_sink)) ssfr[run][shift:] = sfr / surface ssfr[run][:shift] = sfr_beg / surface return ssfr, {"avg_window": avg_window} def _surfacic_sink_mass(self): mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit ssm = {} for run in self.runs: # Surface of the box in pc^2 info = self.pp[run][self.nums[run][0]].info surface = (info["unit_length"].express(cst.pc)) ** 2 mass_sink = self.save.get_node( "/series/sinks_from_log/mass_sink/" + run ).read() mass_sink = mass_sink * mass_unit.express(cst.Msun) ssm[run] = mass_sink / surface return ssm def _turb_power(self): turb_power = {} for run in self.runs: dt = self.save.get_node("/series/rms_from_log/dt/" + run).read() # Energy injected at each timestep energy = self.save.get_node( "/series/rms_from_log/turb_energy/" + run ).read() # Power of the turbulence at this step in Watts turb_power[run] = energy / dt return turb_power def _gen_rule_time_global( self, glob_name, name=None, glob_group="/globals", subarray_name=None, unload_cells=True, unit=cst.none, description="", ): if name is None: name = "time_" + glob_name self.rules[name] = Rule( self, partial( self._time_series, partial( self.get_global, glob_group + "/" + glob_name, unload_cells=True ), ), group="/series", unit=unit, dependencies={"time": None, glob_name: "__parent__"}, ) def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None): rule_src = self.rules[rule_src_name] if subarray_name is None: src_name = rule_src_name group_src = rule_src.group unit = rule_src.unit descr = rule_src.description else: src_name = subarray_name group_src = rule_src.group + "/" + rule_src_name unit = rule_src.unit[subarray_name] descr = rule_src.description[subarray_name] description = { "runs": "List of runs", "mean": "Temporal average of {}".format(descr), "std": "Standard deviation of {}".format(descr), "median": "Median of {}".format(descr), "max": "Maximum of {}".format(descr), "min": "Minimum of {}".format(descr), "q025": "2.5 percentile of {}".format(descr), "q975": "97.5 percentile of {}".format(descr), "q16": "16 percentile of {}".format(descr), "q84": "84 percentile of {}".format(descr), } units = unit if name is None: name = "avg_" + src_name def fn(arg=None, **kwargs): if arg is None: return self._time_avg(src_name, group=group_src, **kwargs) else: return self._time_avg( src_name + "_" + str(arg), group=group_src, **kwargs ) self.rules[name] = Rule( self, fn, group="/comp", unit=units, description=description, dependencies=[rule_src_name], ) def def_rules(self): averageables = ["coldens", "rho", "T", "Q"] self.rules = { # Read from log "sinks_from_log": Rule( self, partial( self._from_log, ["time", "mass_sink", "nb_sink"], self._extract_sinks_from_log, ), group="/series", unit={"time": "unit_time", "mass_sink": cst.Msun, "nb_sink": cst.none}, description={ "time": "Time", "mass_sink": "Total mass of stars", "nb_sink": "Number of stars", }, ), "issfr": Rule( self, self._ssfr_from_mass_sink, group="/series/sinks_from_log", unit=cst.ssfr, description="Instantaneous surfacic star formation rate", dependencies=["sinks_from_log"], ), "ssm": Rule( self, self._surfacic_sink_mass, group="/series/sinks_from_log", unit=cst.Msun / cst.pc ** 2, description="Surfacic sink mass", dependencies=["sinks_from_log"], ), "sfr_from_log": Rule( self, partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), group="/series", unit={"time": cst.year, "sfr": cst.ssfr}, description={ "time": "Time", "sfr": "Averaged surfacic star formation rate", }, ), "rms_from_log": Rule( self, partial( self._from_log, ["time", "dt", "turb_rms", "turb_energy"], self._extract_rms_from_log, ), group="/series", unit={ "time": "unit_time", "dt": "unit_time", "turb_rms": cst.none, "turb_energy": { "unit_length": 3, "unit_velocity": 2, "unit_density": 1, }, }, description={ "time": "Time", "dt": "Timestep", "turb_rms": "Computed turbulent RMS", "turb_energy": "Injected turbulent energy", }, ), "cons_from_log": Rule( self, partial( self._from_log, ["time", "step", "mcons", "econs", "epot", "ekin", "eint", "emag"], self._extract_cons_from_log, ), group="/series", unit={ "time": "unit_time", "step": cst.none, "mcons": cst.none, "econs": cst.none, "epot": cst.none, # TODO find unit "ekin": cst.none, "eint": cst.none, "emag": cst.none, }, ), "turb_power": Rule( self, self._turb_power, group="/series/rms_from_log", unit={ "unit_length": 3, "unit_velocity": 2, "unit_density": 1, "unit_time": -1, }, description="Injected turbulent power", dependencies=["rms_from_log"], ), # Read from outputs "time": Rule( self, partial( self._time_series, partial(self.get_global, "/globals/time_num") ), group="/series", unit="unit_time", dependencies=["time_num"], ), "time_rho_prof": Rule( self, partial( self._time_series, partial(self.get_pp_value, "/profile/rho_prof") ), group="/series", dependencies={"time": None, "rho_prof": "__parent__"}, ), "time_coldens_pdf": Rule( self, partial( self._time_series, partial(self.get_pp_value, "/hist/pdf_coldens") ), group="/series", dependencies={"time": None, "pdf_coldens": "__parent__"}, ), "time_pdf_slope_coldens": Rule( self, partial( self._time_series, partial( self.get_attr, "slope", node_name="/hist/pdf_coldens_z", ), ), group="/series", dependencies={"time": None, "fit_pdf_coldens": "z"}, ), # namelist "nml": Rule( self, lambda nml_key: self._comp( partial(self.get_nml, nml_key), use_num=False ), group="/comp", ), } self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity") self._gen_rule_time_global("max_fluct_coldens") self._gen_rule_time_global( "mass", unit=self.info["unit_density"] * self.info["unit_length"] ** 3 ) self._gen_rule_time_global("mwa_B_int", unit="unit_mag") for name in [ "issfr", "time_sigma", "time_pdf_slope_coldens", "turb_power", "time_rho_prof", "time_coldens_pdf", ]: self._gen_rule_avg(name) self._gen_rule_avg("sinks_from_log", "mass_sink") self._gen_rule_avg("sinks_from_log", "nb_sink") self._gen_rule_avg("sfr_from_log", "sfr") self._gen_rule_avg("rms_from_log", "turb_rms") self._gen_rule_avg("rms_from_log", "turb_energy") super(Comparator, self).def_rules()