Huge refactoring

This commit is contained in:
Noe Brucy
2021-06-22 12:23:03 +02:00
parent a286c08b72
commit 0dc9e8fc7b
12 changed files with 343 additions and 361 deletions
+741
View File
@@ -0,0 +1,741 @@
# coding: utf-8
import os
import glob
import numpy as np
from functools import partial
from scipy.stats import linregress
from baseprocessor import Rule, HDF5Container
from aggregator import Aggregator
from snapshotprocessor import SnapshotProcessor
from run_selector import RunSelector
from params import default_params
from units import U
class StudyProcessor(Aggregator, HDF5Container):
"""
This object is linked to several ramses simulation of a same study
"""
def __init__(
self,
path,
in_runs,
in_nums,
path_out=None,
params=default_params(),
selector=None,
tag=None,
unit_time=U.year,
**kwargs
):
"""
Creates the basic structures needed for the outputs
"""
super(StudyProcessor, self).__init__(path, path_out, params, tag)
# Open outfile
if not self.params.out.tag == "":
tag_name = "_" + self.params.out.tag
else:
tag_name = ""
self.filename = path_out + "/comp" + tag_name + ".h5"
# Select runs
if selector is None:
selector = RunSelector(
path, in_runs, in_nums, self.params.input.nml_filename, **kwargs
)
# Save infos
self.path = path
self.runs = selector.runs
self.nums = selector.nums
# Get postprocesor objets for each run and infos on them
self.snaps = {}
self.info = {}
for run in self.runs:
path_run = path + "/" + run
path_out_run = path_out + "/" + run
self.snaps[run] = {}
for num in self.nums[run]:
self.snaps[run][num] = SnapshotProcessor(
path_run,
num,
path_out=path_out_run,
params=self.params,
unit_time=unit_time,
)
run0 = self.runs[0]
self.info = selector.info[run0][self.nums[run0][0]]
self.namelist = selector.namelist
# log info
self.log_id = "[comp {}] ".format(self.params.out.tag)
# Define rules
self.def_rules()
def _needs_computation(self, overwrite, name_full):
"""
Returns True if a new computation of the rule is needed
"""
if overwrite or not (name_full in self.save):
return True
elif "nums" not in self.save.get_node(name_full)._v_attrs:
return True
else:
saved_nums = self.save.get_node(name_full)._v_attrs.nums
missing_runs = len([run for run in self.nums if run not in saved_nums]) > 0
missing_nums = missing_runs or all(
[
len([num for num in self.nums[run] if num not in saved_nums[run]])
> 0
for run in self.nums
if run in saved_nums
]
)
return missing_nums
def _save_data(self, name_full, data, description, unit):
super(StudyProcessor, self)._save_data(name_full, data, description, unit)
self.save.get_node(name_full)._v_attrs.nums = self.nums
def _time_series(self, getter, arg=None):
series = {}
for run in self.runs:
series[run] = []
for i, num in enumerate(self.nums[run]):
series[run].append(getter(run, num, arg=arg))
series[run] = np.array(series[run])
return series
def _comp(self, getter, use_num=True):
prop = []
for i, run in enumerate(self.runs):
if use_num:
num = self.nums[run][0]
prop.append(getter(run, num))
else:
prop.append(getter(run))
return np.array(prop)
def _time_avg(self, name, start=None, end=None, span=None, group="/series"):
serie0 = self.save.get_node(group + "/" + name + "/" + self.runs[0]).read()
if len(serie0.shape) > 1:
shape = [len(self.runs)] + list(serie0.shape[1:])
else:
shape = len(self.runs)
mean = np.zeros(shape)
median = np.zeros(shape)
std = np.zeros(shape)
v_min = np.zeros(shape)
v_max = np.zeros(shape)
q975 = np.zeros(shape)
q025 = np.zeros(shape)
q16 = np.zeros(shape)
q84 = np.zeros(shape)
for i, run in enumerate(self.runs):
serie = self.save.get_node(group + "/" + name + "/" + run).read()
time = self.save.get_node(group + "/time/" + run).read()
if len(serie.shape) <= 1:
mask = abs(serie) != np.inf
if not ((start, end, span) == (None, None, None)):
start_r, end_r = start, end
# Be sure that start_r and end_r are defined
if end_r is None:
if span is None or start_r is None:
end_r = time[-1]
else:
end_r = start_r + span
if start_r is None:
if span is None:
start_r = time[0]
else:
start_r = end_r - span
mask = (
mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie)
)
serie = serie[mask]
mean[i] = np.mean(serie, axis=0)
std[i] = np.std(serie, axis=0)
(
v_min[i],
q025[i],
q16[i],
median[i],
q84[i],
q975[i],
v_max[i],
) = np.percentile(serie, [0, 2.5, 16, 50, 84, 97.5, 100], axis=0)
return {
"runs": self.runs,
"mean": mean,
"std": std,
"median": median,
"min": v_min,
"max": v_max,
"q025": q025,
"q975": q975,
"q16": q16,
"q84": q84,
}
def get_attr(self, attr_name, run, num, node_name="/", arg=None):
pp = self.snaps[run][num]
if arg is not None:
node_name = node_name + "_" + str(arg)
return pp.get_attribute(node_name, attr_name)
def get_snap_value(self, name, run, num, arg=None):
pp = self.snaps[run][num]
if arg is not None:
name = name + "_" + str(arg)
return pp.get_value(name)
def get_global(self, node_name, run, num, arg=None, unload_cells=False):
if arg is not None:
node_name = node_name + "_" + str(arg)
pp = self.snaps[run][num]
if unload_cells:
pp.unload_cells()
value = pp.get_value(node_name)
return value
def get_nml(self, nml_key, run):
return self.namelist[run][nml_key]
def get_pdf_slope(self, name, run, num):
pp = self.snaps[run][num]
pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep)
slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope")
return slope
def _extract_sinks_from_log(self, series, log_filename, run):
cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 4):
try:
nb_sink = np.int(content[i].split("=")[1])
mass_sink = np.float(content[i + 1].split("=")[1])
time = np.float(content[i + 2].split("=")[1])
series["nb_sink"][run].append(nb_sink)
series["mass_sink"][run].append(mass_sink)
series["time"][run].append(time)
except (ValueError, IndexError):
self._log(
"Error encountered in parsing {} (grepped block {})".format(
log_filename, i
),
"WARNING",
)
return series
def _extract_sfr_from_log(self, series, log_filename, run):
cmd_grep = "grep '\[SFR' {} ".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content)):
time = np.float(content[i].split("]")[0].split("=")[1].split()[0])
sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0])
series["time"][run].append(time)
series["sfr"][run].append(sfr)
return series
def _extract_fine_step_from_log(self, series, log_filename, run):
cmd_grep = "grep 'Fine step' {} ".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content)):
data = content[i].replace("=", " ").split()
fine_step = np.int(data[2])
time = np.float(data[4])
dt = np.float(data[6])
a = np.float(data[8])
mempc1 = np.float(data[10][:-1])
mempc2 = np.float(data[11][:-1])
series["time"][run].append(time)
series["fine_step"][run].append(fine_step)
series["dt"][run].append(dt)
series["a"][run].append(a)
series["mem_cells"][run].append(mempc1)
series["mem_parts"][run].append(mempc2)
return series
def _extract_coarse_step_from_log(self, series, log_filename, run):
rism = self.params.input.ramses_ism
nlines = 2 + int(rism) # Number of useful lines
cmd_grep = "grep 'Main step\\|coarse step' {} -A {}".format(
log_filename, nlines - 1
)
content = os.popen(cmd_grep).readlines()
for j in range(0, len(content), 2 * (nlines + 1)):
i = j + nlines + 1 # Index for the "Main step" grep
if i + nlines - 1 < len(content):
series["time"][run].append(
np.float(content[i + nlines - 1].split("=")[2].split()[0])
)
series["step"][run].append(np.int(content[i].split("=")[1].split()[0]))
series["mcons"][run].append(
np.float(content[i].split("=")[2].split()[0])
)
series["econs"][run].append(
np.float(content[i].split("=")[3].split()[0])
)
series["epot"][run].append(
np.float(content[i].split("=")[4].split()[0])
)
series["ekin"][run].append(
np.float(content[i].split("=")[5].split()[0])
)
if rism:
eint = np.float(content[i].split("=")[6].split()[0])
emag = np.float(content[i + 1].split("=")[1].split()[0])
else:
eint = 0.0
emag = 0.0
series["eint"][run].append(eint)
series["emag"][run].append(emag)
series["elapsed"][run].append(
np.float(content[j].split(":")[1].split()[0])
)
series["memory"][run].append(content[j + 1].split(":")[1])
return series
def _extract_rms_from_log(self, series, log_filename, run):
cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 4):
series["time"][run].append(np.float(content[i].split("=")[2].split()[0]))
series["dt"][run].append(np.float(content[i].split("=")[3].split()[0]))
series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1]))
try:
turb_energy = np.float(content[i + 2].split(":")[1])
threshold = self.params.rules.turb_energy_threshold
assert threshold < 0 or abs(turb_energy) < threshold
series["turb_energy"][run].append(abs(turb_energy))
except (AssertionError, ValueError, IndexError):
series["turb_energy"][run].append(np.nan)
return series
def _from_log(self, keys, extractor):
# Initialize series
series = {}
for key in keys:
series[key] = {}
for run in self.runs:
# Initialize list
for key in keys:
series[key][run] = []
# get one preprocessor
path_run = self.path + "/" + run
# Get list of run files
log_files = path_run + "/" + self.params.input.log_prefix + "*"
# Parse files
for log_filename in glob.glob(log_files):
series = extractor(series, log_filename, run)
# Numpify the lists
for key in series:
series[key][run] = np.array(series[key][run])
# Sort the arrays
ind_sort = series["time"][run].argsort()
for key in series:
series[key][run] = series[key][run][ind_sort]
return series
def _ssfr_from_mass_sink(self, avg_window=None):
"""
avg_window in year
"""
time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit
mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit
ssfr = {}
for run in self.runs:
# Surface of the box in pc^2
info = self.snaps[run][self.nums[run][0]].info
surface = (info["unit_length"].express(U.pc)) ** 2
# WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses)
time = self.save.get_node("/series/sinks_from_log/time/" + run).read()
time = time * time_unit.express(U.year)
mass_sink = self.save.get_node(
"/series/sinks_from_log/mass_sink/" + run
).read()
mass_sink = mass_sink * mass_unit.express(U.Msun)
if avg_window is None:
shift = 1
else:
# We assume that the timestep do not vary a lot ...
shift = np.searchsorted(time, time[-1] - avg_window, side="right")
shift = len(time) - shift
sfr = (mass_sink[shift:] - mass_sink[:-shift]) / (
time[shift:] - time[:-shift]
)
sfr_beg = (mass_sink[:shift] - mass_sink[0]) / (time[:shift] - time[0])
ssfr[run] = np.zeros(len(mass_sink))
ssfr[run][shift:] = sfr / surface
ssfr[run][:shift] = sfr_beg / surface
return ssfr, {"avg_window": avg_window}
def _surfacic_sink_mass(self):
mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit
ssm = {}
for run in self.runs:
# Surface of the box in pc^2
info = self.snaps[run][self.nums[run][0]].info
surface = (info["unit_length"].express(U.pc)) ** 2
mass_sink = self.save.get_node(
"/series/sinks_from_log/mass_sink/" + run
).read()
mass_sink = mass_sink * mass_unit.express(U.Msun)
ssm[run] = mass_sink / surface
return ssm
def _turb_power(self):
turb_power = {}
for run in self.runs:
dt = self.save.get_node("/series/rms_from_log/dt/" + run).read()
# Energy injected at each timestep
energy = self.save.get_node(
"/series/rms_from_log/turb_energy/" + run
).read()
# Power of the turbulence at this step in Watts
turb_power[run] = energy / dt
return turb_power
def _sbeta_onavg(self):
"""
[ismfeed] Compute the slope of the Sigma pdf as a function of the value of beta
"""
col_pdf = self.get_value("/comp/avg_time_coldens_pdf_z")
beta = self.get_value("/comp/nml_cloud_params/beta_cool")
slope = np.zeros(len(col_pdf["runs"]))
origin = np.zeros(len(col_pdf["runs"]))
stderr = np.zeros(len(col_pdf["runs"]))
for i, run in enumerate(col_pdf["runs"]):
values, centers = col_pdf["mean"][i]
mask_fit = (
(centers > self.params.pdf.xmin_fit)
& (centers < self.params.pdf.xmax_fit)
& (values > np.max(values) * self.params.pdf.fit_cut)
)
(slope[i], origin[i], correlation, _, stderr[i]) = linregress(
centers[mask_fit], np.log10(values[mask_fit])
)
return {"beta": beta, "slope": slope, "origin": origin, "stderr": stderr}
def _gen_rule_time_global(
self,
glob_name,
name=None,
glob_group="/globals",
subarray_name=None,
unload_cells=True,
unit=U.none,
description="",
):
if name is None:
name = "time_" + glob_name
self.rules[name] = Rule(
self,
partial(
self._time_series,
partial(
self.get_global, glob_group + "/" + glob_name, unload_cells=True
),
),
group="/series",
unit=unit,
dependencies={"time": None, glob_name: "__parent__"},
)
def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None):
rule_src = self.rules[rule_src_name]
if subarray_name is None:
src_name = rule_src_name
group_src = rule_src.group
unit = rule_src.unit
descr = rule_src.description
else:
src_name = subarray_name
group_src = rule_src.group + "/" + rule_src_name
unit = rule_src.unit[subarray_name]
descr = rule_src.description[subarray_name]
description = {
"runs": "List of runs",
"mean": "Temporal average of {}".format(descr),
"std": "Standard deviation of {}".format(descr),
"median": "Median of {}".format(descr),
"max": "Maximum of {}".format(descr),
"min": "Minimum of {}".format(descr),
"q025": "2.5 percentile of {}".format(descr),
"q975": "97.5 percentile of {}".format(descr),
"q16": "16 percentile of {}".format(descr),
"q84": "84 percentile of {}".format(descr),
}
units = unit
if name is None:
name = "avg_" + src_name
def fn(arg=None, **kwargs):
if arg is None:
return self._time_avg(src_name, group=group_src, **kwargs)
else:
return self._time_avg(
src_name + "_" + str(arg), group=group_src, **kwargs
)
self.rules[name] = Rule(
self,
fn,
group="/comp",
unit=units,
description=description,
dependencies=[rule_src_name],
)
def def_rules(self):
self.rules = {
# Read from log
"sinks_from_log": Rule(
self,
partial(
self._from_log,
["time", "mass_sink", "nb_sink"],
self._extract_sinks_from_log,
),
group="/series",
unit={"time": "unit_time", "mass_sink": U.Msun, "nb_sink": U.none},
description={
"time": "Time",
"mass_sink": "Total mass of stars",
"nb_sink": "Number of stars",
},
),
"issfr": Rule(
self,
self._ssfr_from_mass_sink,
group="/series/sinks_from_log",
unit=U.ssfr,
description="Instantaneous surfacic star formation rate",
dependencies=["sinks_from_log"],
),
"ssm": Rule(
self,
self._surfacic_sink_mass,
group="/series/sinks_from_log",
unit=U.Msun / U.pc ** 2,
description="Surfacic sink mass",
dependencies=["sinks_from_log"],
),
"sfr_from_log": Rule(
self,
partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log),
group="/series",
unit={"time": U.year, "sfr": U.ssfr},
description={
"time": "Time",
"sfr": "Averaged surfacic star formation rate",
},
),
"rms_from_log": Rule(
self,
partial(
self._from_log,
["time", "dt", "turb_rms", "turb_energy"],
self._extract_rms_from_log,
),
group="/series",
unit={
"time": "unit_time",
"dt": "unit_time",
"turb_rms": U.none,
"turb_energy": {
"unit_length": 3,
"unit_velocity": 2,
"unit_density": 1,
},
},
description={
"time": "Time",
"dt": "Timestep",
"turb_rms": "Computed turbulent RMS",
"turb_energy": "Injected turbulent energy",
},
),
"coarse_step_from_log": Rule(
self,
partial(
self._from_log,
[
"time",
"step",
"mcons",
"econs",
"epot",
"ekin",
"eint",
"emag",
"elapsed",
"memory",
],
self._extract_coarse_step_from_log,
),
group="/series",
unit={
"time": "unit_time",
"step": U.none,
"mcons": U.none,
"econs": U.none,
"epot": U.none, # TODO find unit
"ekin": U.none,
"eint": U.none,
"emag": U.none,
"elapsed": U.s,
"memory": U.none,
},
),
"fine_step_from_log": Rule(
self,
partial(
self._from_log,
["time", "fine_step", "dt", "a", "mem_cells", "mem_parts"],
self._extract_fine_step_from_log,
),
group="/series",
unit={
"time": "unit_time",
"fine_step": U.none,
"dt": "unit_time",
"a": U.none,
"mem_cells": U.none,
"mem_parts": U.none,
},
),
"turb_power": Rule(
self,
self._turb_power,
group="/series/rms_from_log",
unit={
"unit_length": 3,
"unit_velocity": 2,
"unit_density": 1,
"unit_time": -1,
},
description="Injected turbulent power",
dependencies=["rms_from_log"],
),
# Read from outputs
"time": Rule(
self,
partial(
self._time_series, partial(self.get_global, "/globals/time_num")
),
group="/series",
unit="unit_time",
dependencies=["time_num"],
),
"time_rho_prof": Rule(
self,
partial(
self._time_series, partial(self.get_pp_value, "/profile/rho_prof")
),
group="/series",
dependencies={"time": None, "rho_prof": "__parent__"},
),
"time_coldens_pdf": Rule(
self,
partial(
self._time_series, partial(self.get_pp_value, "/hist/pdf_coldens")
),
group="/series",
dependencies={"time": None, "pdf_coldens": "__parent__"},
),
"time_pdf_slope_coldens": Rule(
self,
partial(
self._time_series,
partial(
self.get_attr,
"slope",
node_name="/hist/pdf_coldens_z",
),
),
group="/series",
dependencies={"time": None, "fit_pdf_coldens": "z"},
),
"sbeta_onavg": Rule(
self,
partial(self._sbeta_onavg),
group="/comp",
dependencies={
"avg_time_coldens_pdf": "z",
"nml": "cloud_params/beta_cool",
},
),
# namelist
"nml": Rule(
self,
lambda nml_key: self._comp(
partial(self.get_nml, nml_key), use_num=False
),
group="/comp",
),
}
self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity")
self._gen_rule_time_global("max_fluct_coldens")
self._gen_rule_time_global(
"mass", unit=self.info["unit_density"] * self.info["unit_length"] ** 3
)
self._gen_rule_time_global("mwa_B_int", unit="unit_mag")
for name in [
"issfr",
"time_sigma",
"time_pdf_slope_coldens",
"turb_power",
"time_rho_prof",
"time_coldens_pdf",
]:
self._gen_rule_avg(name)
self._gen_rule_avg("sinks_from_log", "mass_sink")
self._gen_rule_avg("sinks_from_log", "nb_sink")
self._gen_rule_avg("sfr_from_log", "sfr")
self._gen_rule_avg("rms_from_log", "turb_rms")
self._gen_rule_avg("rms_from_log", "turb_energy")
super(StudyProcessor, self).def_rules()