Files
pipeline/studyprocessor.py
T
2021-11-02 18:02:00 +01:00

769 lines
27 KiB
Python

# coding: utf-8
import os
from distutils.file_util import copy_file
import glob
import numpy as np
from functools import partial
from scipy.stats import linregress
from baseprocessor import Rule, HDF5Container
from aggregator import Aggregator
from snapshotprocessor import SnapshotProcessor
from run_selector import RunSelector
from params import default_params
from units import U
class StudyProcessor(Aggregator, HDF5Container):
"""
This object is linked to several ramses simulation of a same study
"""
def __init__(
self,
path,
runs,
nums,
path_out=None,
params=default_params(),
selector=None,
tag=None,
unit_time=U.year,
**kwargs,
):
"""
Creates the basic structures needed for the outputs
"""
super(StudyProcessor, self).__init__(path, path_out, params, tag)
# Open outfile
if not self.params.out.tag == "":
tag_name = "_" + self.params.out.tag
else:
tag_name = ""
if self.params.out.ext_subfolder:
self.filename = f"{self.path_out}/h5/study{tag_name}.h5"
else:
self.filename = f"{self.path_out}/study{tag_name}.h5"
os.makedirs(os.path.dirname(self.filename), exist_ok=True)
# Select runs
if selector is None:
selector = RunSelector(
path, runs, nums, self.params.input.nml_filename, unit_time=unit_time, **kwargs
)
# Save infos
self.path = path
self.runs = selector.runs
self.nums = selector.nums
# Get postprocesor objets for each run and infos on them
self.snaps = {}
self.info = {}
for run in self.runs:
path_run = path + "/" + run
path_out_run = self.path_out + "/" + run
self.snaps[run] = {}
for num in self.nums[run]:
self.snaps[run][num] = SnapshotProcessor(
path_run,
num,
selector=selector,
path_out=path_out_run,
params=self.params,
unit_time=unit_time,
)
run0 = self.runs[0]
self.info = selector.info[run0][self.nums[run0][0]]
self.namelist = selector.namelist
# Save namelist and logs
if self.params.out.copy_info:
for run in self.runs:
nml_src = f"{self.path}/{run}/{self.params.input.nml_filename}"
nml_dest = f"{self.path_out}/{run}/{self.params.input.nml_filename}"
copy_file(nml_src, nml_dest, update=1)
logs = self.get_logs(run)
os.makedirs(f"{self.path_out}/{run}/logs", exist_ok=True)
for log in logs:
dest = f"{self.path_out}/{run}/logs/{os.path.basename(log)}"
copy_file(log, dest, update=1)
# log info
self.log_id = "[study {}] ".format(self.params.out.tag)
# Define rules
self.def_rules()
def _needs_computation(self, overwrite, name_full):
"""
Returns True if a new computation of the rule is needed
"""
if overwrite or not (name_full in self.save):
return True
elif "nums" not in self.save.get_node(name_full)._v_attrs:
return True
else:
saved_nums = self.save.get_node(name_full)._v_attrs.nums
missing_runs = len([run for run in self.nums if run not in saved_nums]) > 0
missing_nums = missing_runs or all(
[
len([num for num in self.nums[run] if num not in saved_nums[run]])
> 0
for run in self.nums
if run in saved_nums
]
)
return missing_nums
def _save_data(self, name_full, data, description, unit):
super(StudyProcessor, self)._save_data(name_full, data, description, unit)
self.save.get_node(name_full)._v_attrs.nums = self.nums
def _time_series(self, getter, arg=None):
series = {}
for run in self.runs:
series[run] = []
for i, num in enumerate(self.nums[run]):
series[run].append(getter(run, num, arg=arg))
series[run] = np.array(series[run], dtype=float)
return series
def _compare(self, getter, use_num=True):
prop = []
for i, run in enumerate(self.runs):
if use_num:
num = self.nums[run][0]
prop.append(getter(run, num))
else:
prop.append(getter(run))
return np.array(prop)
def _time_avg(self, name, start=None, end=None, span=None, group="/series"):
serie0 = self.save.get_node(group + "/" + name + "/" + self.runs[0]).read()
if len(serie0.shape) > 1:
shape = [len(self.runs)] + list(serie0.shape[1:])
else:
shape = len(self.runs)
mean = np.zeros(shape)
median = np.zeros(shape)
std = np.zeros(shape)
v_min = np.zeros(shape)
v_max = np.zeros(shape)
q975 = np.zeros(shape)
q025 = np.zeros(shape)
q16 = np.zeros(shape)
q84 = np.zeros(shape)
for i, run in enumerate(self.runs):
serie = self.save.get_node(group + "/" + name + "/" + run).read()
time = self.save.get_node(group + "/time/" + run).read()
if len(serie.shape) <= 1:
mask = abs(serie) != np.inf
if not ((start, end, span) == (None, None, None)):
start_r, end_r = start, end
# Be sure that start_r and end_r are defined
if end_r is None:
if span is None or start_r is None:
end_r = time[-1]
else:
end_r = start_r + span
if start_r is None:
if span is None:
start_r = time[0]
else:
start_r = end_r - span
mask = (
mask & (time >= start_r) & (time <= end_r) & np.isfinite(serie)
)
serie = serie[mask]
mean[i] = np.mean(serie, axis=0)
std[i] = np.std(serie, axis=0)
(
v_min[i],
q025[i],
q16[i],
median[i],
q84[i],
q975[i],
v_max[i],
) = np.percentile(serie, [0, 2.5, 16, 50, 84, 97.5, 100], axis=0)
return {
"runs": self.runs,
"mean": mean,
"std": std,
"median": median,
"min": v_min,
"max": v_max,
"q025": q025,
"q975": q975,
"q16": q16,
"q84": q84,
}
def get_attr(self, attr_name, run, num, node_name="/", arg=None):
snap = self.snaps[run][num]
if arg is not None:
node_name = node_name + "_" + str(arg)
return snap.get_attribute(node_name, attr_name)
def get_snap_value(self, name, run, num, arg=None):
snap = self.snaps[run][num]
if arg is not None:
name = name + "_" + str(arg)
return snap.get_value(name)
def get_global(self, node_name, run, num, arg=None, unload_cells=False):
if arg is not None:
node_name = node_name + "_" + str(arg)
snap = self.snaps[run][num]
if unload_cells:
snap.unload_cells()
value = snap.get_value(node_name)
return value
def get_nml(self, nml_key, run):
return self.namelist[run][nml_key]
def get_pdf_slope(self, name, run, num):
snap = self.snaps[run][num]
snap.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep)
slope = snap.get_attribute("/hist/pdf_" + name + "_z", "slope")
return slope
def _extract_sinks_from_log(self, series, log_filename, run):
cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 4):
try:
nb_sink = np.int(content[i].split("=")[1])
mass_sink = np.float(content[i + 1].split("=")[1])
time = np.float(content[i + 2].split("=")[1])
series["nb_sink"][run].append(nb_sink)
series["mass_sink"][run].append(mass_sink)
series["time"][run].append(time)
except (ValueError, IndexError):
self._log(
"Error encountered in parsing {} (grepped block {})".format(
log_filename, i
),
"WARNING",
)
return series
def _extract_sfr_from_log(self, series, log_filename, run):
cmd_grep = "grep '\[SFR' {} ".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content)):
time = np.float(content[i].split("]")[0].split("=")[1].split()[0])
sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0])
series["time"][run].append(time)
series["sfr"][run].append(sfr)
return series
def _extract_fine_step_from_log(self, series, log_filename, run):
cmd_grep = "grep 'Fine step' {} ".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content)):
data = content[i].replace("=", " ").split()
fine_step = np.int(data[2])
time = np.float(data[4])
dt = np.float(data[6])
a = np.float(data[8])
mempc1 = np.float(data[10][:-1])
mempc2 = np.float(data[11][:-1])
series["time"][run].append(time)
series["fine_step"][run].append(fine_step)
series["dt"][run].append(dt)
series["a"][run].append(a)
series["mem_cells"][run].append(mempc1)
series["mem_parts"][run].append(mempc2)
return series
def _extract_coarse_step_from_log(self, series, log_filename, run):
rism = self.params.input.ramses_ism
nlines = 2 + int(rism) # Number of useful lines
cmd_grep = "grep 'Main step\\|coarse step' {} -A {}".format(
log_filename, nlines - 1
)
content = os.popen(cmd_grep).readlines()
for j in range(0, len(content), 2 * (nlines + 1)):
i = j + nlines + 1 # Index for the "Main step" grep
if i + nlines - 1 < len(content):
series["time"][run].append(
np.float(content[i + nlines - 1].split("=")[2].split()[0])
)
series["step"][run].append(np.int(content[i].split("=")[1].split()[0]))
series["mcons"][run].append(
np.float(content[i].split("=")[2].split()[0])
)
series["econs"][run].append(
np.float(content[i].split("=")[3].split()[0])
)
series["epot"][run].append(
np.float(content[i].split("=")[4].split()[0])
)
series["ekin"][run].append(
np.float(content[i].split("=")[5].split()[0])
)
if rism:
eint = np.float(content[i].split("=")[6].split()[0])
emag = np.float(content[i + 1].split("=")[1].split()[0])
else:
eint = 0.0
emag = 0.0
series["eint"][run].append(eint)
series["emag"][run].append(emag)
series["elapsed"][run].append(
np.float(content[j].split(":")[1].split()[0])
)
series["memory"][run].append(content[j + 1].split(":")[1])
return series
def _extract_rms_from_log(self, series, log_filename, run):
cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 4):
series["time"][run].append(np.float(content[i].split("=")[2].split()[0]))
series["dt"][run].append(np.float(content[i].split("=")[3].split()[0]))
series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1]))
try:
turb_energy = np.float(content[i + 2].split(":")[1])
threshold = self.params.rules.turb_energy_threshold
assert threshold < 0 or abs(turb_energy) < threshold
series["turb_energy"][run].append(abs(turb_energy))
except (AssertionError, ValueError, IndexError):
series["turb_energy"][run].append(np.nan)
return series
def get_logs(self, run):
glob_str = f"{self.path}/{run}/{self.params.input.log_prefix}*"
logs = glob.glob(glob_str)
if len(logs) == 0:
glob_str = f"{self.path}/{run}/logs/{self.params.input.log_prefix}*"
logs = glob.glob(glob_str)
return logs
def _from_log(self, keys, extractor):
# Initialize series
series = {}
for key in keys:
series[key] = {}
for run in self.runs:
# Initialize list
for key in keys:
series[key][run] = []
# Get list of log files
log_files = self.get_logs(run)
# Parse files
for log_filename in log_files:
series = extractor(series, log_filename, run)
# Numpify the lists
for key in series:
series[key][run] = np.array(series[key][run])
# Sort the arrays
ind_sort = series["time"][run].argsort()
for key in series:
series[key][run] = series[key][run][ind_sort]
return series
def _ssfr_from_mass_sink(self, avg_window=None):
"""
avg_window in year
"""
time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit
mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit
ssfr = {}
for run in self.runs:
# Surface of the box in pc^2
info = self.snaps[run][self.nums[run][0]].info
surface = (info["unit_length"].express(U.pc)) ** 2
# WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses)
time = self.save.get_node("/series/sinks_from_log/time/" + run).read()
time = time * time_unit.express(U.year)
mass_sink = self.save.get_node(
"/series/sinks_from_log/mass_sink/" + run
).read()
mass_sink = mass_sink * mass_unit.express(U.Msun)
if avg_window is None:
shift = 1
else:
# We assume that the timestep do not vary a lot ...
shift = np.searchsorted(time, time[-1] - avg_window, side="right")
shift = len(time) - shift
sfr = (mass_sink[shift:] - mass_sink[:-shift]) / (
time[shift:] - time[:-shift]
)
sfr_beg = (mass_sink[:shift] - mass_sink[0]) / (time[:shift] - time[0])
ssfr[run] = np.zeros(len(mass_sink))
ssfr[run][shift:] = sfr / surface
ssfr[run][:shift] = sfr_beg / surface
return ssfr, {"avg_window": avg_window}
def _surfacic_sink_mass(self):
mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit
ssm = {}
for run in self.runs:
# Surface of the box in pc^2
info = self.snaps[run][self.nums[run][0]].info
surface = (info["unit_length"].express(U.pc)) ** 2
mass_sink = self.save.get_node(
"/series/sinks_from_log/mass_sink/" + run
).read()
mass_sink = mass_sink * mass_unit.express(U.Msun)
ssm[run] = mass_sink / surface
return ssm
def _turb_power(self):
turb_power = {}
for run in self.runs:
dt = self.save.get_node("/series/rms_from_log/dt/" + run).read()
# Energy injected at each timestep
energy = self.save.get_node(
"/series/rms_from_log/turb_energy/" + run
).read()
# Power of the turbulence at this step in Watts
turb_power[run] = energy / dt
return turb_power
def _sbeta_onavg(self):
"""
[ismfeed] Compute the slope of the Sigma pdf as a function of the value of beta
"""
col_pdf = self.get_value("/comp/avg_time_coldens_pdf_z")
beta = self.get_value("/comp/nml_cloud_params/beta_cool")
slope = np.zeros(len(col_pdf["runs"]))
origin = np.zeros(len(col_pdf["runs"]))
stderr = np.zeros(len(col_pdf["runs"]))
for i, run in enumerate(col_pdf["runs"]):
values, centers = col_pdf["mean"][i]
mask_fit = (
(centers > self.params.pdf.xmin_fit)
& (centers < self.params.pdf.xmax_fit)
& (values > np.max(values) * self.params.pdf.fit_cut)
)
(slope[i], origin[i], correlation, _, stderr[i]) = linregress(
centers[mask_fit], np.log10(values[mask_fit])
)
return {"beta": beta, "slope": slope, "origin": origin, "stderr": stderr}
def _gen_rule_time_global(
self,
glob_name,
name=None,
glob_group="/globals",
subarray_name=None,
unload_cells=True,
unit=U.none,
description="",
):
if name is None:
name = "time_" + glob_name
self.rules[name] = Rule(
self,
partial(
self._time_series,
partial(
self.get_global, glob_group + "/" + glob_name, unload_cells=True
),
),
group="/series",
unit=unit,
dependencies={"time": None, glob_name: "__parent__"},
)
def _gen_rule_avg(self, rule_src_name, subarray_name=None, name=None):
rule_src = self.rules[rule_src_name]
if subarray_name is None:
src_name = rule_src_name
group_src = rule_src.group
unit = rule_src.unit
descr = rule_src.description
else:
src_name = subarray_name
group_src = rule_src.group + "/" + rule_src_name
unit = rule_src.unit[subarray_name]
descr = rule_src.description[subarray_name]
description = {
"runs": "List of runs",
"mean": "Temporal average of {}".format(descr),
"std": "Standard deviation of {}".format(descr),
"median": "Median of {}".format(descr),
"max": "Maximum of {}".format(descr),
"min": "Minimum of {}".format(descr),
"q025": "2.5 percentile of {}".format(descr),
"q975": "97.5 percentile of {}".format(descr),
"q16": "16 percentile of {}".format(descr),
"q84": "84 percentile of {}".format(descr),
}
units = unit
if name is None:
name = "avg_" + src_name
def fn(arg=None, **kwargs):
if arg is None:
return self._time_avg(src_name, group=group_src, **kwargs)
else:
return self._time_avg(
src_name + "_" + str(arg), group=group_src, **kwargs
)
self.rules[name] = Rule(
self,
fn,
group="/comp",
unit=units,
description=description,
dependencies=[rule_src_name],
)
def def_rules(self):
self.rules = {
# Read from log
"sinks_from_log": Rule(
self,
partial(
self._from_log,
["time", "mass_sink", "nb_sink"],
self._extract_sinks_from_log,
),
group="/series",
unit={"time": "unit_time", "mass_sink": U.Msun, "nb_sink": U.none},
description={
"time": "Time",
"mass_sink": "Total mass of stars",
"nb_sink": "Number of stars",
},
),
"issfr": Rule(
self,
self._ssfr_from_mass_sink,
group="/series/sinks_from_log",
unit=U.ssfr,
description="Instantaneous surfacic star formation rate",
dependencies=["sinks_from_log"],
),
"ssm": Rule(
self,
self._surfacic_sink_mass,
group="/series/sinks_from_log",
unit=U.Msun / U.pc ** 2,
description="Surfacic sink mass",
dependencies=["sinks_from_log"],
),
"sfr_from_log": Rule(
self,
partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log),
group="/series",
unit={"time": U.year, "sfr": U.ssfr},
description={
"time": "Time",
"sfr": "Averaged surfacic star formation rate",
},
),
"rms_from_log": Rule(
self,
partial(
self._from_log,
["time", "dt", "turb_rms", "turb_energy"],
self._extract_rms_from_log,
),
group="/series",
unit={
"time": "unit_time",
"dt": "unit_time",
"turb_rms": U.none,
"turb_energy": {
"unit_length": 3,
"unit_velocity": 2,
"unit_density": 1,
},
},
description={
"time": "Time",
"dt": "Timestep",
"turb_rms": "Computed turbulent RMS",
"turb_energy": "Injected turbulent energy",
},
),
"coarse_step_from_log": Rule(
self,
partial(
self._from_log,
[
"time",
"step",
"mcons",
"econs",
"epot",
"ekin",
"eint",
"emag",
"elapsed",
"memory",
],
self._extract_coarse_step_from_log,
),
group="/series",
unit={
"time": "unit_time",
"step": U.none,
"mcons": U.none,
"econs": U.none,
"epot": U.none, # TODO find unit
"ekin": U.none,
"eint": U.none,
"emag": U.none,
"elapsed": U.s,
"memory": U.none,
},
),
"fine_step_from_log": Rule(
self,
partial(
self._from_log,
["time", "fine_step", "dt", "a", "mem_cells", "mem_parts"],
self._extract_fine_step_from_log,
),
group="/series",
unit={
"time": "unit_time",
"fine_step": U.none,
"dt": "unit_time",
"a": U.none,
"mem_cells": U.none,
"mem_parts": U.none,
},
),
"turb_power": Rule(
self,
self._turb_power,
group="/series/rms_from_log",
unit={
"unit_length": 3,
"unit_velocity": 2,
"unit_density": 1,
"unit_time": -1,
},
description="Injected turbulent power",
dependencies=["rms_from_log"],
),
# Read from outputs
"time": Rule(
self,
partial(
self._time_series, partial(self.get_global, "/globals/time_num")
),
group="/series",
unit="unit_time",
dependencies=["time_num"],
),
"time_rho_prof": Rule(
self,
partial(
self._time_series, partial(self.get_snap_value, "/profile/rho_prof")
),
group="/series",
dependencies={"time": None, "rho_prof": "__parent__"},
),
"time_coldens_pdf": Rule(
self,
partial(
self._time_series, partial(self.get_snap_value, "/hist/pdf_coldens")
),
group="/series",
dependencies={"time": None, "pdf_coldens": "__parent__"},
),
"time_pdf_slope_coldens": Rule(
self,
partial(
self._time_series,
partial(
self.get_attr,
"slope",
node_name="/hist/pdf_coldens_z",
),
),
group="/series",
dependencies={"time": None, "fit_pdf_coldens": "z"},
),
"sbeta_onavg": Rule(
self,
partial(self._sbeta_onavg),
group="/comp",
dependencies={
"avg_time_coldens_pdf": "z",
"nml": "cloud_params/beta_cool",
},
),
# namelist
"nml": Rule(
self,
lambda nml_key: self._compare(
partial(self.get_nml, nml_key), use_num=False
),
group="/comp",
),
}
self._gen_rule_time_global("mwa_sigma", "time_sigma", unit="unit_velocity")
self._gen_rule_time_global("max_fluct_coldens")
self._gen_rule_time_global(
"mass", unit=self.info["unit_density"] * self.info["unit_length"] ** 3
)
self._gen_rule_time_global("mwa_B_int", unit="unit_mag")
for name in [
"issfr",
"time_sigma",
"time_pdf_slope_coldens",
"turb_power",
"time_rho_prof",
"time_coldens_pdf",
]:
self._gen_rule_avg(name)
self._gen_rule_avg("sinks_from_log", "mass_sink")
self._gen_rule_avg("sinks_from_log", "nb_sink")
self._gen_rule_avg("sfr_from_log", "sfr")
self._gen_rule_avg("rms_from_log", "turb_rms")
self._gen_rule_avg("rms_from_log", "turb_energy")
super(StudyProcessor, self).def_rules()