# coding: utf-8 import os from distutils.file_util import copy_file import glob import numpy as np from functools import partial from scipy.stats import linregress from baseprocessor import Rule, HDF5Container from aggregator import Aggregator from snapshotprocessor import SnapshotProcessor from run_selector import RunSelector from params import default_params from units import U class StudyProcessor(Aggregator, HDF5Container): """ This object is linked to several ramses simulation of a same study """ def __init__( self, path, runs, nums, path_out=None, params=default_params(), selector=None, tag=None, unit_time=U.year, **kwargs, ): """ Creates the basic structures needed for the outputs """ super(StudyProcessor, self).__init__(path, path_out, params, tag) # Open outfile if not self.params.out.tag == "": tag_name = "_" + self.params.out.tag else: tag_name = "" if self.params.out.ext_subfolder: self.filename = f"{self.path_out}/h5/study{tag_name}.h5" else: self.filename = f"{self.path_out}/study{tag_name}.h5" os.makedirs(os.path.dirname(self.filename), exist_ok=True) # Select runs if selector is None: selector = RunSelector( path, runs, nums, self.params.input.nml_filename, unit_time=unit_time, **kwargs ) # Save infos self.path = path self.runs = selector.runs self.nums = selector.nums # Get postprocesor objets for each run and infos on them self.snaps = {} self.info = {} for run in self.runs: path_run = path + "/" + run path_out_run = self.path_out + "/" + run self.snaps[run] = {} for num in self.nums[run]: self.snaps[run][num] = SnapshotProcessor( path_run, num, selector=selector, path_out=path_out_run, params=self.params, unit_time=unit_time, ) run0 = self.runs[0] self.info = selector.info[run0][self.nums[run0][0]] self.namelist = selector.namelist # Save namelist and logs if self.params.out.copy_info: for run in self.runs: nml_src = f"{self.path}/{run}/{self.params.input.nml_filename}" nml_dest = f"{self.path_out}/{run}/{self.params.input.nml_filename}" copy_file(nml_src, nml_dest, update=1) logs = self.get_logs(run) os.makedirs(f"{self.path_out}/{run}/logs", exist_ok=True) for log in logs: dest = f"{self.path_out}/{run}/logs/{os.path.basename(log)}" copy_file(log, dest, update=1) # log info self.log_id = "[study {}] ".format(self.params.out.tag) # Define rules self.def_rules() def _needs_computation(self, overwrite, name_full): """ Returns True if a new computation of the rule is needed """ if overwrite or not (name_full in self.save): return True elif "nums" not in self.save.get_node(name_full)._v_attrs: return True else: saved_nums = self.save.get_node(name_full)._v_attrs.nums missing_runs = len([run for run in self.nums if run not in saved_nums]) > 0 missing_nums = missing_runs or all( [ len([num for num in self.nums[run] if num not in saved_nums[run]]) > 0 for run in self.nums if run in saved_nums ] ) return missing_nums def _save_data(self, name_full, data, description, unit): super(StudyProcessor, self)._save_data(name_full, data, description, unit) self.save.get_node(name_full)._v_attrs.nums = self.nums def _time_series(self, getter, arg=None): series = {} for run in self.runs: series[run] = [] for i, num in enumerate(self.nums[run]): series[run].append(getter(run, num, arg=arg)) series[run] = np.array(series[run], dtype=float) return series def _compare(self, getter, use_num=True): prop = [] for i, run in enumerate(self.runs): if use_num: num = self.nums[run][0] prop.append(getter(run, num)) else: prop.append(getter(run)) return np.array(prop) def _time_avg(self, name, start=None, end=None, span=None, group="/series"): serie0 = self.save.get_node(group + "/" + name + "/" + self.runs[0]).read() if len(serie0.shape) > 1: shape = [len(self.runs)] + list(serie0.shape[1:]) else: shape = len(self.runs) mean = np.zeros(shape) median = np.zeros(shape) std = np.zeros(shape) v_min = np.zeros(shape) v_max = np.zeros(shape) q975 = np.zeros(shape) q025 = np.zeros(shape) q16 = np.zeros(shape) q84 = np.zeros(shape) for i, run in enumerate(self.runs): serie = self.save.get_node(group + "/" + name + "/" + run).read() time = self.save.get_node(group + "/time/" + run).read() if len(serie.shape) <= 1: mask = abs(serie) != np.inf if not ((start, end, span) == (None, None, None)): start_r, end_r = start, end # Be sure that start_r and end_r are defined if end_r is None: if span is None or start_r is None: end_r = time[-1] else: end_r = start_r + span if start_r is None: if span is None: start_r = time[0] else: start_r = end_r - span mask = ( mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie) ) serie = serie[mask] mean[i] = np.mean(serie, axis=0) std[i] = np.std(serie, axis=0) ( v_min[i], q025[i], q16[i], median[i], q84[i], q975[i], v_max[i], ) = np.percentile(serie, [0, 2.5, 16, 50, 84, 97.5, 100], axis=0) return { "runs": self.runs, "mean": mean, "std": std, "median": median, "min": v_min, "max": v_max, "q025": q025, "q975": q975, "q16": q16, "q84": q84, } def get_attr(self, attr_name, run, num, node_name="/", arg=None): snap = self.snaps[run][num] if arg is not None: node_name = node_name + "_" + str(arg) return snap.get_attribute(node_name, attr_name) def get_snap_value(self, name, run, num, arg=None): snap = self.snaps[run][num] if arg is not None: name = name + "_" + str(arg) return snap.get_value(name) def get_global(self, node_name, run, num, arg=None, unload_cells=False): if arg is not None: node_name = node_name + "_" + str(arg) snap = self.snaps[run][num] if unload_cells: snap.unload_cells() value = snap.get_value(node_name) return value def get_nml(self, nml_key, run): return self.namelist[run][nml_key] def get_pdf_slope(self, name, run, num): snap = self.snaps[run][num] snap.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) slope = snap.get_attribute("/hist/pdf_" + name + "_z", "slope") return slope def _extract_sinks_from_log(self, series, log_filename, run): cmd_grep = f"grep 'Number of sink' {log_filename} -A 2" content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): try: nb_sink = np.int(content[i].split("=")[1]) mass_sink = np.float(content[i + 1].split("=")[1]) time = np.float(content[i + 2].split("=")[1]) timekey = content[i + 2].split("=")[0] series["nb_sink"][run].append(nb_sink) series["mass_sink"][run].append(mass_sink) if "[yr]" not in timekey: time *= self.info["unit_time"].express(U.year) series["time"][run].append(time) except (ValueError, IndexError): self._log( "Error encountered in parsing {} (grepped block {})".format( log_filename, i ), "WARNING", ) return series def _extract_stellar_from_log(self, stellar_objects, log_filename, run): cmd_grep = f"grep stellar {log_filename} -n" content = os.popen(cmd_grep).readlines() nb_stellar = list(map(lambda s: int(s.split()[1]), content)) line_numbers = list(map(lambda s: int(s.split(":")[0]), content)) current_line = 0 logfile = open(log_filename) for i in range(0, len(line_numbers)): try: while current_line < line_numbers[i] + 3: logfile.readline() current_line += 1 for j in range(nb_stellar[i]): line_stellar = logfile.readline().split() current_line += 1 while line_stellar[0] == "random": # random number outputs are ... random line_stellar = logfile.readline().split() current_line += 1 mass = float(line_stellar[3]) time = float(line_stellar[4]) lifetime = float(line_stellar[5]) id = int(line_stellar[6]) stellar_objects["mass"][run].append(mass) stellar_objects["time"][run].append(time) stellar_objects["lifetime"][run].append(lifetime) stellar_objects["id"][run].append(id) except (ValueError, IndexError): self._log( "[stellar] Error encountered in parsing {} (grepped block {})".format( log_filename, i ), "WARNING", ) logfile.close() return stellar_objects def _extract_sfr_from_log(self, series, log_filename, run): cmd_grep = "grep '\[SFR' {} ".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content)): time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) series["time"][run].append(time) series["sfr"][run].append(sfr) return series def _extract_fine_step_from_log(self, series, log_filename, run): cmd_grep = "grep 'Fine step' {} ".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content)): data = content[i].replace("=", " ").split() fine_step = np.int(data[2]) time = np.float(data[4]) dt = np.float(data[6]) a = np.float(data[8]) mempc1 = np.float(data[10][:-1]) mempc2 = np.float(data[11][:-1]) series["time"][run].append(time) series["fine_step"][run].append(fine_step) series["dt"][run].append(dt) series["a"][run].append(a) series["mem_cells"][run].append(mempc1) series["mem_parts"][run].append(mempc2) return series def _extract_coarse_step_from_log(self, series, log_filename, run): rism = self.params.input.ramses_ism nlines = 2 + int(rism) # Number of useful lines cmd_grep = "grep 'Main step\\|coarse step' {} -A {}".format( log_filename, nlines - 1 ) content = os.popen(cmd_grep).readlines() for j in range(0, len(content), 2 * (nlines + 1)): i = j + nlines + 1 # Index for the "Main step" grep if i + nlines - 1 < len(content): series["time"][run].append( np.float(content[i + nlines - 1].split("=")[2].split()[0]) ) series["step"][run].append(np.int(content[i].split("=")[1].split()[0])) series["mcons"][run].append( np.float(content[i].split("=")[2].split()[0]) ) series["econs"][run].append( np.float(content[i].split("=")[3].split()[0]) ) series["epot"][run].append( np.float(content[i].split("=")[4].split()[0]) ) series["ekin"][run].append( np.float(content[i].split("=")[5].split()[0]) ) if rism: eint = np.float(content[i].split("=")[6].split()[0]) emag = np.float(content[i + 1].split("=")[1].split()[0]) else: eint = 0.0 emag = 0.0 series["eint"][run].append(eint) series["emag"][run].append(emag) series["elapsed"][run].append( np.float(content[j].split(":")[1].split()[0]) ) series["memory"][run].append(content[j + 1].split(":")[1]) return series def _extract_rms_from_log(self, series, log_filename, run): cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) series["dt"][run].append(np.float(content[i].split("=")[3].split()[0])) series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) try: turb_energy = np.float(content[i + 2].split(":")[1]) threshold = self.params.rules.turb_energy_threshold assert threshold < 0 or abs(turb_energy) < threshold series["turb_energy"][run].append(abs(turb_energy)) except (AssertionError, ValueError, IndexError): series["turb_energy"][run].append(np.nan) return series def get_logs(self, run): glob_str = f"{self.path}/{run}/{self.params.input.log_prefix}*" logs = glob.glob(glob_str) if len(logs) == 0: glob_str = f"{self.path}/{run}/logs/{self.params.input.log_prefix}*" logs = glob.glob(glob_str) return logs def _from_log(self, keys, extractor, unique=False): # Initialize series series = {} for key in keys: series[key] = {} for run in self.runs: # Initialize list for key in keys: series[key][run] = [] # Get list of log files log_files = self.get_logs(run) # Parse files for log_filename in log_files: size = len(series["time"][run]) series = extractor(series, log_filename, run) if unique: # Always prefer data from last log, assuming they come in the right order time = series["time"][run] time_new = time[size] ind_overlap = np.searchsorted(time[:size], time_new, side='right') for key in series: del series[key][run][ind_overlap:size] # Numpify the lists for key in series: series[key][run] = np.array(series[key][run]) # Sort the arrays ind_sort = series["time"][run].argsort() for key in series: series[key][run] = series[key][run][ind_sort] return series def get_coldens0(self, run): mp = 1.4 * 1.66 * 10**(-24) * U.g z0 = self.get_nml("galbox_params/height0", run) * U.pc n0 = self.get_nml("galbox_params/dens0", run) * U.cm**(-3) return (np.sqrt(2 * np.pi) * mp * z0 * n0).express(U.coldens) def total_mass(self): """ Sum of gas plus sink mass """ time_gas = self.get_value("/series/coarse_step_from_log/time") mass_gas = self.get_value("/series/coarse_step_from_log/mcons") mass_sink = self.get_value("/series/sinks_from_log/mass_sink") total_mass = dict() for run in self.runs: # A bit specific ... needs to be generalized (TODO) info = self.snaps[run][self.nums[run][0]].info surface = (info["unit_length"].express(U.pc)) ** 2 m0 = self.get_coldens0(run) * surface # Initial mass in Msun offset = time_gas[run].size - mass_sink[run].size mass_gas[run] = m0 + m0*mass_gas[run] # convert in Msun total_mass[run] = mass_gas[run].copy() total_mass[run][offset:] = mass_gas[run][offset:] + mass_sink[run] # re add sink_mass return time_gas, total_mass, mass_gas def _ssfr_from_mass_sink(self, avg_window=None): """ avg_window in year """ time_unit = self.get_attribute("/series/sinks_from_log/time", "unit") mass_unit = self.get_attribute("/series/sinks_from_log/mass_sink", "unit") ssfr = {} for run in self.runs: # Surface of the box in pc^2 info = self.snaps[run][self.nums[run][0]].info surface = (info["unit_length"].express(U.pc)) ** 2 # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) time = self.get_value(f"/series/sinks_from_log/time/{run}") time = time * time_unit.express(U.year) mass_sink = self.get_value(f"/series/sinks_from_log/mass_sink/{run}") mass_sink = mass_sink * mass_unit.express(U.Msun) if avg_window is None: shift = 1 else: # We assume that the timestep do not vary a lot ... shift = np.searchsorted(time, time[-1] - avg_window, side="right") shift = len(time) - shift sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( time[shift:] - time[:-shift] ) sfr_beg = (mass_sink[:shift] - mass_sink[0]) / (time[:shift] - time[0]) ssfr[run] = np.zeros(len(mass_sink)) ssfr[run][shift:] = sfr / surface ssfr[run][:shift] = sfr_beg / surface return ssfr, {"avg_window": avg_window} def _surfacic_sink_mass(self): mass_unit = self.get_attribute("/series/sinks_from_log/mass_sink", "unit") ssm = {} for run in self.runs: # Surface of the box in pc^2 info = self.snaps[run][self.nums[run][0]].info surface = (info["unit_length"].express(U.pc)) ** 2 mass_sink = self.get_value(f"/series/sinks_from_log/mass_sink/{run}") mass_sink = mass_sink * mass_unit.express(U.Msun) ssm[run] = mass_sink / surface return ssm def _turb_power(self): turb_power = {} for run in self.runs: dt = self.get_value(f"/series/rms_from_log/dt/{run}") # Energy injected at each timestep energy = self.get_value(f"/series/rms_from_log/turb_energy/{run}") # Power of the turbulence at this step in Watts turb_power[run] = energy / dt return turb_power def _sbeta_onavg(self): """ [ismfeed] Compute the slope of the Sigma pdf as a function of the value of beta """ col_pdf = self.get_value("/comp/avg_time_coldens_pdf_z") beta = self.get_value("/comp/nml_cloud_params/beta_cool") slope = np.zeros(len(col_pdf["runs"])) origin = np.zeros(len(col_pdf["runs"])) stderr = np.zeros(len(col_pdf["runs"])) for i, run in enumerate(col_pdf["runs"]): values, centers = col_pdf["mean"][i] mask_fit = ( (centers > self.params.pdf.xmin_fit) & (centers < self.params.pdf.xmax_fit) & (values > np.max(values) * self.params.pdf.fit_cut) ) (slope[i], origin[i], correlation, _, stderr[i]) = linregress( centers[mask_fit], np.log10(values[mask_fit]) ) return {"beta": beta, "slope": slope, "origin": origin, "stderr": stderr} def _gen_rule_time_global( self, glob_name, name=None, glob_group="/globals", subarray_name=None, unload_cells=True, unit=U.none, description="", ): if name is None: name = "time_" + glob_name self.rules[name] = Rule( self, partial( self._time_series, partial( self.get_global, glob_group + "/" + glob_name, unload_cells=True ), ), group="/series", unit=unit, dependencies={"time": None, glob_name: "__parent__"}, ) def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None): rule_src = self.rules[rule_src_name] if subarray_name is None: src_name = rule_src_name group_src = rule_src.group unit = rule_src.unit descr = rule_src.description else: src_name = subarray_name group_src = rule_src.group + "/" + rule_src_name unit = rule_src.unit[subarray_name] descr = rule_src.description[subarray_name] description = { "runs": "List of runs", "mean": "Temporal average of {}".format(descr), "std": "Standard deviation of {}".format(descr), "median": "Median of {}".format(descr), "max": "Maximum of {}".format(descr), "min": "Minimum of {}".format(descr), "q025": "2.5 percentile of {}".format(descr), "q975": "97.5 percentile of {}".format(descr), "q16": "16 percentile of {}".format(descr), "q84": "84 percentile of {}".format(descr), } units = unit if name is None: name = "avg_" + src_name def fn(arg=None, **kwargs): if arg is None: return self._time_avg(src_name, group=group_src, **kwargs) else: return self._time_avg( src_name + "_" + str(arg), group=group_src, **kwargs ) self.rules[name] = Rule( self, fn, group="/comp", unit=units, description=description, dependencies=[rule_src_name], ) def def_rules(self): self.rules = { # Read from log "sinks_from_log": Rule( self, partial( self._from_log, ["time", "mass_sink", "nb_sink"], self._extract_sinks_from_log, ), group="/series", unit={"time": U.year, "mass_sink": U.Msun, "nb_sink": U.none}, description={ "time": "Time", "mass_sink": "Total mass of stars", "nb_sink": "Number of stars", }, ), "issfr": Rule( self, self._ssfr_from_mass_sink, group="/series/sinks_from_log", unit=U.ssfr, description="Instantaneous surfacic star formation rate", dependencies=["sinks_from_log"], ), "ssm": Rule( self, self._surfacic_sink_mass, group="/series/sinks_from_log", unit=U.Msun / U.pc ** 2, description="Surfacic sink mass", dependencies=["sinks_from_log"], ), "sfr_from_log": Rule( self, partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), group="/series", unit={"time": U.year, "sfr": U.ssfr}, description={ "time": "Time", "sfr": "Averaged surfacic star formation rate", }, ), "rms_from_log": Rule( self, partial( self._from_log, ["time", "dt", "turb_rms", "turb_energy"], self._extract_rms_from_log, ), group="/series", unit={ "time": "unit_time", "dt": "unit_time", "turb_rms": U.none, "turb_energy": { "unit_length": 3, "unit_velocity": 2, "unit_density": 1, }, }, description={ "time": "Time", "dt": "Timestep", "turb_rms": "Computed turbulent RMS", "turb_energy": "Injected turbulent energy", }, ), "coarse_step_from_log": Rule( self, partial( self._from_log, [ "time", "step", "mcons", "econs", "epot", "ekin", "eint", "emag", "elapsed", "memory", ], self._extract_coarse_step_from_log, ), group="/series", unit={ "time": "unit_time", "step": U.none, "mcons": U.none, "econs": U.none, "epot": U.none, # TODO find unit "ekin": U.none, "eint": U.none, "emag": U.none, "elapsed": U.s, "memory": U.none, }, ), "fine_step_from_log": Rule( self, partial( self._from_log, ["time", "fine_step", "dt", "a", "mem_cells", "mem_parts"], self._extract_fine_step_from_log, ), group="/series", unit={ "time": "unit_time", "fine_step": U.none, "dt": "unit_time", "a": U.none, "mem_cells": U.none, "mem_parts": U.none, }, ), "stellar_from_log": Rule( self, partial( self._from_log, ["time", "mass", "lifetime", "id"], self._extract_stellar_from_log, ), group="/dataset", unit={ "time": "unit_time", "mass": "unit_mass", "lifetime": "unit_time", "id": U.none, }, ), "turb_power": Rule( self, self._turb_power, group="/series/rms_from_log", unit={ "unit_length": 3, "unit_velocity": 2, "unit_density": 1, "unit_time": -1, }, description="Injected turbulent power", dependencies=["rms_from_log"], ), # Read from outputs "time": Rule( self, partial( self._time_series, partial(self.get_global, "/globals/time_num") ), group="/series", unit="unit_time", dependencies=["time_num"], ), "time_rho_prof": Rule( self, partial( self._time_series, partial(self.get_snap_value, "/profile/rho_prof") ), group="/series", dependencies={"time": None, "rho_prof": "__parent__"}, ), "time_coldens_pdf": Rule( self, partial( self._time_series, partial(self.get_snap_value, "/hist/pdf_coldens") ), group="/series", dependencies={"time": None, "pdf_coldens": "__parent__"}, ), "time_pdf_slope_coldens": Rule( self, partial( self._time_series, partial( self.get_attr, "slope", node_name="/hist/pdf_coldens_z", ), ), group="/series", dependencies={"time": None, "fit_pdf_coldens": "z"}, ), "sbeta_onavg": Rule( self, partial(self._sbeta_onavg), group="/comp", dependencies={ "avg_time_coldens_pdf": "z", "nml": "cloud_params/beta_cool", }, ), # namelist "nml": Rule( self, lambda nml_key: self._compare( partial(self.get_nml, nml_key), use_num=False ), group="/comp", ), } self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity") self._gen_rule_time_global("max_fluct_coldens") self._gen_rule_time_global( "mass", unit=self.info["unit_density"] * self.info["unit_length"] ** 3 ) self._gen_rule_time_global("mwa_B_int", unit="unit_mag") for name in [ "issfr", "time_sigma", "time_pdf_slope_coldens", "turb_power", "time_rho_prof", "time_coldens_pdf", ]: self._gen_rule_avg(name) self._gen_rule_avg("sinks_from_log", "mass_sink") self._gen_rule_avg("sinks_from_log", "nb_sink") self._gen_rule_avg("sfr_from_log", "sfr") self._gen_rule_avg("rms_from_log", "turb_rms") self._gen_rule_avg("rms_from_log", "turb_energy") super(StudyProcessor, self).def_rules()