Files
pipeline/postprocessor.py
T
2020-12-14 16:35:19 +01:00

1373 lines
45 KiB
Python

# coding: utf-8
import sys
import os
import glob as glob
import tables
import pymses
import numpy as np
from numpy.polynomial.polynomial import polyfit
from scipy.stats import linregress
from pymses.sources.ramses import output
from pymses.sources.hop.file_formats import *
from pymses.analysis import Camera, raytracing, slicing, splatting
from pymses.filters import CellsToPoints
from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator
from mypool import MyPool as Pool
from functools import partial
from abc import ABCMeta, abstractmethod
import contextlib
import bunch
from run_selector import *
from units import *
class Rule:
def __init__(
self,
postproc,
process,
description="",
group="",
dependencies=[],
is_valid=lambda arg: True,
kind="classic",
unit=cst.none,
):
self.postproc = postproc
self.process_fn = process
self.dependencies = dependencies
self.is_valid_add = is_valid
self.group = group
self.description = description
self.unit = unit
self.kind = kind
def process(self, arg, **kwargs):
if not arg is None:
return self.process_fn(arg, **kwargs)
else:
return self.process_fn(**kwargs)
def is_valid(self, arg):
save = self.postproc.save
valid = True
for dep in self.dependencies:
if dep in self.postproc.rules:
rule_dep = self.postproc.rules[dep]
if not arg is None:
valid = (
valid and rule_dep.group + "/" + dep + "_" + str(arg) in save
)
else:
valid = valid and rule_dep.group + "/" + dep in save
return valid and self.is_valid_add(arg)
class BaseProcessor:
"""
Base class for processors, should not be instanciated
"""
__metaclass__ = ABCMeta
log_id = ""
rules = {}
solve_self_dep = True
def __init__(self, path, path_out=None, pp_params=None, tag=None):
if pp_params is None:
self.pp_params = default_params()
elif type(pp_params) == str:
self.pp_params = load_params(pp_params)
else:
self.pp_params = pp_params
if tag is not None:
self.pp_params.out.tag = tag
# Determining output directory
if path_out is None:
self.path_out = path
else:
self.path_out = path_out
def _log(self, string, status=""):
if self.pp_params.process.verbose:
if len(status) > 0:
print(status + ": " + self.log_id + string)
else:
print(self.log_id + string)
def process(
self,
to_process_list,
args=[None],
overwrite=False,
overwrite_dep=False,
**kwargs
):
"""
Render the data in to_process_list and save them
"""
if type(to_process_list) == str:
to_process_list = [to_process_list]
if type(args) == str or args is None:
args = [args]
self.overwrite_dep = overwrite_dep
self.just_done = [] # Computations done within this call
for name in to_process_list:
if name in self.rules:
rule = self.rules[name]
for arg in args:
self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs)
else:
self._log(
"{} is unknown, allowed rules are {}".format(
name, self.rules.keys()
),
"ERROR",
)
return self.just_done
def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs):
updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs)
overwrite_rule = overwrite or updated
self._process_rule(name, rule, arg, overwrite, **kwargs)
def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs):
self.done_before_dep = len(self.just_done)
if type(rule.dependencies) == dict:
dep_arg = rule.dependencies[dep]
else:
dep_arg = arg
# Solve dependencies
for dep in rule.dependencies:
if self.solve_self_dep and dep in self.rules:
rule_dep = self.rules[dep]
self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep)
else:
self._not_self_dep(name, dep, dep_arg, self.overwrite_dep, **kwargs)
# Whether dependencies where updated
return len(self.just_done) > self.done_before_dep
def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs):
self._log("Dependency {} for {} is unknown".format(dep, name), "ERROR")
def _needs_computation(self, overwrite, name_full):
return overwrite or not (name_full in self.save)
def _process_rule(self, name, rule, arg, overwrite=False, **kwargs):
if not arg is None:
name_full = rule.group + "/" + name + "_" + str(arg)
else:
name_full = rule.group + "/" + name
if rule.is_valid(arg):
if not name_full in self.just_done:
if self._needs_computation(overwrite, name_full):
self._log("Processing {}".format(name_full))
data = rule.process(arg, **kwargs)
self._save_data(name_full, data, rule.description, rule.unit)
self._log("Data for {} computed".format(name_full), "SUCCESS")
self.just_done.append(name_full)
else:
self._log(
"Data for {} is already computed, skipping...".format(name_full)
)
else:
self._log("{} is not valid in this context".format(name_full), "ERROR")
def _save_data(self, name_full, data, description, unit):
"""
Save data in the HDF5 structure, overwrite if necessary
"""
if name_full in self.save:
self.save.remove_node(name_full, recursive=True)
if type(data) == dict:
if type(description) == str:
self.save.create_group(
os.path.dirname(name_full),
os.path.basename(name_full),
description,
createparents=True,
)
else:
self.save.create_group(
os.path.dirname(name_full),
os.path.basename(name_full),
"",
createparents=True,
)
if not type(unit) == dict:
self.save.get_node(name_full)._v_attrs.unit = unit
for key in data:
if type(description) == dict:
if type(unit) == dict:
self._save_data(
name_full + "/" + key,
data[key],
description[key],
unit[key],
)
else:
self._save_data(
name_full + "/" + key, data[key], description[key], unit
)
else:
if type(unit) == dict:
self._save_data(name_full + "/" + key, data[key], "", unit[key])
else:
self._save_data(name_full + "/" + key, data[key], "", unit)
else:
self.save.create_array(
os.path.dirname(name_full),
os.path.basename(name_full),
data,
description,
createparents=True,
)
self.save.get_node(name_full).attrs.unit = unit
@abstractmethod
def def_rules(self):
pass
class HDF5Container(BaseProcessor):
filename = ""
save = None
opened = False
def open(self):
if not self.opened:
self.save = tables.open_file(self.filename, mode="a")
self.opened = True
def close(self):
if self.opened:
self.save.close()
self.opened = False
def _process_rule(self, name, rule, arg, overwrite, **kwargs):
self.open()
try:
super(HDF5Container, self)._process_rule(
name, rule, arg, overwrite, **kwargs
)
finally:
self.close()
def get_value(self, node_name):
self.open()
try:
node = self.save.get_node(node_name)
if node._v_attrs.CLASS == "GROUP":
value = {}
for child_name in node._v_children:
value[child_name] = self.get_value(node_name + "/" + child_name)
else:
value = node.read()
finally:
self.close()
return value
def get_attribute(self, node_name, attr_name):
self.open()
try:
node = self.save.get_node(node_name)
attr = node._v_attrs[attr_name]
finally:
self.close()
return attr
def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num):
pp = PostProcessor(
path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params
)
return pp.process(rule, arg, overwrite)
class Aggregator:
def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs):
if "runs" in kwargs:
dep_runs = [run for run in self.runs if run in kwargs["runs"]]
else:
dep_runs = self.runs
pps = [[self.pp_runs[run][num] for num in self.nums[run]] for run in dep_runs]
run_num = [(run, num) for run in dep_runs for num in self.nums[run]]
map_fn = partial(
_map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params
)
if self.pp_params.process.num_process > 1:
pool = Pool(processes=self.pp_params.process.num_process)
done = pool.map(map_fn, run_num)
pool.close()
pool.join()
else:
done = map(map_fn, run_num)
self.just_done.extend([item for li in done for item in li])
def simple_getter(name, dset):
return dset[name]
mass_func = lambda dset: dset["rho"] * dset.get_sizes() ** 3 # Mass function
class PostProcessor(HDF5Container):
"""
This class enable to compute and save derived quantities from the raw output
"""
# Axes information
_ax_nb = {"x": 0, "y": 1, "z": 2} # Number of each axes
_axes_h = {"x": "y", "y": "x", "z": "x"} # Associated horizontal axe
_axes_v = {"x": "z", "y": "z", "z": "y"} # Associated vertical axe
G = 1.0 # Gravitational constant
cells_loaded = False
def __init__(
self,
path=None,
num=None,
path_out=None,
pp_params=None,
tag=None,
variables=["rho", "vel", "Br", "Bl", "P", "g", "phi"],
):
"""
Creates the basic structures needed for the outputs
"""
super(PostProcessor, self).__init__(path, path_out, pp_params, tag)
# Open outfile
if not self.pp_params.out.tag == "":
tag_name = self.pp_params.out.tag + "_"
else:
tag_name = ""
self.filename = path_out + "/postproc_" + tag_name + format(num, "05") + ".h5"
if not os.path.exists(path_out):
os.makedirs(path_out)
self.open()
# Ramses Output
self.path = path
self.run = os.path.basename(path)
self.num = num
self._ro = pymses.RamsesOutput(path, num, order=pp_params.pymses.order)
self.variables = variables
self._amr = self._ro.amr_source(self.variables)
self.info = self._ro.info.copy()
# Density operator
self._rho_op = ScalarOperator(
lambda dset: dset["rho"], self._ro.info["unit_density"]
)
# Density ray tracer
if pp_params.pymses.fft:
self._rt = splatting.SplatterProcessor(
self._amr, self._ro.info, self._rho_op
)
else:
self._rt = raytracing.RayTracer(self._amr, self._ro.info, self._rho_op)
# Set the extend of the image
self._radius = 0.5 / pp_params.out.zoom
self._lbox = self.info["boxlen"]
center = pp_params.out.center
im_extent = [
(-self._radius + center[0]) * self._lbox,
(self._radius + center[0]) * self._lbox,
(-self._radius + center[1]) * self._lbox,
(self._radius + center[1]) * self._lbox,
]
# Get time
time = self._ro.info["time"] # time in codeunits
# Set post processing attributes
self.save.root._v_attrs.dir = os.path.dirname(path)
self.save.root._v_attrs.run = os.path.basename(path)
self.save.root._v_attrs.num = num
self.save.root._v_attrs.lbox = self._lbox
self.save.root._v_attrs.time = time
if not "/maps" in self.save:
self.save.create_group("/", "maps", "2D maps")
self.save.root.maps._v_attrs.center = center
self.save.root.maps._v_attrs.radius = self._radius
self.save.root.maps._v_attrs.im_extent = im_extent
# Initialize cameras
self._cam = {}
for ax_los in self._ax_nb: # los = line of sight
ax_h = self._axes_h[ax_los]
ax_v = self._axes_v[ax_los]
self._cam[ax_los] = Camera(
center=pp_params.out.center,
line_of_sight_axis=ax_los,
region_size=[2.0 * self._radius, 2.0 * self._radius],
distance=self._radius,
far_cut_depth=self._radius,
up_vector=ax_v,
map_max_size=pp_params.out.map_size,
)
self._add_metadata()
self.close()
self.log_id = "[{}, {}] ".format(self.run, self.num)
self.def_rules()
def _add_metadata(self):
"""
Add additional metadata to the file
"""
# Label of the run in the label.txt file
label_filename = self.path + "/" + self.pp_params.input.label_filename
if os.path.exists(label_filename):
label_file = open(label_filename, "r")
self.label = label_file.readline()
label_file.close()
else:
self.label = self.run
self.save.root._v_attrs.label = self.label
# def open_pymlog(self):
# if self.pp_params.pymses.verbose:
# return sys.stdout
# else:
# return open(os.devnull, "w")
def load_cells(self):
if not self.cells_loaded:
# with self.open_pymlog() as f, contextlib.redirect_stdout(f):
cell_source = CellsToPoints(self._amr)
self.cells = cell_source.flatten()
self.cells_loaded = True
def unload_cells(self):
if self.cells_loaded:
del self.cells
self.cells_loaded = False
def _slice(self, getter, ax_los="z", z=0, unit=cst.none):
op = ScalarOperator(getter, unit)
datamap = slicing.SliceMap(self._amr, self._cam[ax_los], op, z=z)
return datamap.map.T
def _ax_avg(
self, getter, ax_los, unit=cst.none, mass_weighted=True, surf_qty=False
):
if mass_weighted:
def num(cells):
value = getter(cells)
mass = mass_func(cells)
# Transpose (.T) is for vectorial values
return (mass * value.T).T
op = FractionOperator(num, mass_function, unit)
else:
op = ScalarOperator(getter, unit)
if pp_params.pymses.fft:
rt = splatting.SplatterProcessor(self._amr, self._ro.info, op)
else:
rt = raytracing.RayTracer(self._amr, self._ro.info, op)
datamap = rt.process(self._cam[ax_los], surf_qty=surf_qty)
return datamap.map.T
def _vol_avg(self, getter, mass_weighted=True):
self.load_cells()
value = getter(self.cells)
if mass_weighted:
mass = mass_func(self.cells)
# Transpose (.T) is for vectorial values
return np.sum((mass * value.T).T, axis=0) / np.sum(mass)
else:
return np.sum(value, axis=0)
def _mwa_sigma(self):
mw_speed = self.save.get_node("/globals/mwa_speed").read()
def getter(dset):
return np.sum((dset["vel"] - mw_speed) ** 2, axis=1)
return np.sqrt(self._vol_avg(getter, mass_weighted=True))
def _coldens(self, ax_los):
datamap = self._rt.process(self._cam[ax_los], surf_qty=True)
return datamap.map.T
def _rho(self, ax_los, z=0.0):
datamap_rho = slicing.SliceMap(self._amr, self._cam[ax_los], self._rho_op, z=z)
return (datamap_rho.map).T
def _speed_h(self, ax_los, z=0.0):
vh_op = ScalarOperator(
lambda dset: dset["vel"][:, self._ax_nb[self._axes_h[ax_los]]],
self._ro.info["unit_velocity"],
)
dmap_vh = slicing.SliceMap(self._amr, self._cam[ax_los], vh_op, z=z).map.T
return dmap_vh
def _speed_v(self, ax_los, z=0.0):
vv_op = ScalarOperator(
lambda dset: dset["vel"][:, self._ax_nb[self._axes_v[ax_los]]],
self._ro.info["unit_velocity"],
)
dmap_vv = slicing.SliceMap(self._amr, self._cam[ax_los], vv_op, z=z).map.T
return dmap_vv
def _temperature(self, ax_los, z=0.0):
P_op = ScalarOperator(lambda dset: dset["P"], self._ro.info["unit_pressure"])
dmap_P = (slicing.SliceMap(self._amr, self._cam[ax_los], P_op, z=z)).map.T
dmap_rho = self.save.get_node("/maps/rho_{}".format(ax_los)).read()
return dmap_P / dmap_rho
def _levels(self, ax_los):
self._amr.set_read_levelmax(20)
level_op = MaxLevelOperator()
rt_level = raytracing.RayTracer(self._amr, self._ro.info, level_op)
datamap = rt_level.process(self._cam[ax_los], surf_qty=True)
return datamap.map.T
def _jeans(self, ax_los):
dmap_T = self.save.get_node("/maps/T_" + ax_los).read()
dmap_rho = self.save.get_node("/maps/rho_" + ax_los).read()
dmap_jeans = np.sqrt(np.pi * dmap_T / dmap_rho)
return dmap_jeans
def _jeans_ratio(self, ax_los):
dmap_jeans = self.save.get_node("/maps/jeans_" + ax_los).read()
dmap_levels = self.save.get_node("/maps/levels_" + ax_los).read()
dmap_jeans_ratio = dmap_jeans * 2 ** (dmap_levels)
return dmap_jeans_ratio
def _toomreQ_disk(self, ax_los):
"""
Compute the Toomre Q parameter in a Keplerian disk
"""
# Operator to compute the angular speed times rho
def omega_rho_func(dset):
pos = dset.get_cell_centers()
pos = pos - (self.pp_params.disk.pos_star / self._lbox)
xx = pos[:, :, 0]
yy = pos[:, :, 1]
rc = np.sqrt(xx ** 2 + yy ** 2) # cylindrical radius
vx = dset["vel"][:, :, 0]
vy = dset["vel"][:, :, 1]
omega_rho = 1.0 / rc ** 2
omega_rho = omega_rho * dset["rho"]
vyx = vy * xx
vxy = vx * yy
omega_rho = omega_rho * (vyx - vxy)
return omega_rho
# Operator to compute the angular speed
omega_op = FractionOperator(
omega_rho_func, lambda dset: dset["rho"], 1.0 / self._ro.info["unit_time"]
)
# Operator to compute the sound speed
cs_op = FractionOperator(
lambda dset: dset["P"],
lambda dset: dset["rho"],
self._ro.info["unit_velocity"],
)
# Ray tracer for the angular speed
rt_omega = raytracing.RayTracer(self._amr, self._ro.info, omega_op)
# Ray tracer for the sound speed
if self.pp_params.pymses.fft:
rt_cs = splatting.SplatterProcessor(
self._amr, self._ro.info, cs_op, surf_qty=False
)
else:
rt_cs = raytracing.RayTracer(self._amr, self._ro.info, cs_op)
dmap_omega = rt_omega.process(self._cam[ax_los])
dmap_cs = rt_cs.process(self._cam[ax_los])
dmap_col = self.save.root.maps.coldens_z.read()
map_Q = (
(self._lbox * dmap_cs.map.T)
* dmap_omega.map.T
/ (np.pi * self.G * dmap_col)
)
return map_Q
def _radial_bins(self, _):
pos_star = self.pp_params.disk.pos_star
im_extent = self.save.root.maps._v_attrs.im_extent
# radius of the corner of the box plus a margin
rad_of_box = (
np.sqrt(
(im_extent[1] - pos_star[0]) ** 2 + (im_extent[3] - pos_star[1]) ** 2
)
+ 0.1
)
bin_in = self.pp_params.disk.bin_in
bin_out = self.pp_params.disk.bin_out
nb_bin = self.pp_params.disk.nb_bin
# radial bins
if self.pp_params.disk.binning == "log":
lrad_in = np.log10(bin_in)
lrad_ext = np.log10(bin_out)
rad_bins = np.logspace(lrad_in, lrad_ext, num=nb_bin)
elif self.pp_params.disk.binning == "lin":
rad_bins = np.linspace(bin_in, bin_out, num=nb_bin)
# Add boundaries
rad_bins = np.concatenate(([0.0], rad_bins, [rad_of_box]))
return rad_bins
def _rr(self, _):
im_extent = self.save.root.maps._v_attrs.im_extent
map_size = self.pp_params.out.map_size
pos_star = self.pp_params.disk.pos_star
x = np.linspace(im_extent[0], im_extent[1], map_size)
y = np.linspace(im_extent[2], im_extent[3], map_size)
xx, yy = np.meshgrid(x, y)
rr = np.sqrt((xx - pos_star[0]) ** 2 + (yy - pos_star[1]) ** 2)
return rr
def _bins_on_map(self, ax_los):
rad_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read()
rr = self.save.get_node("/maps/rr_" + ax_los).read()
# Find appropriate bin for each coordinate set
bins = np.zeros(rr.shape, dtype=int)
for r in rad_bins[1:]:
bins = bins + (rr >= r).astype(int)
return bins
def _rad_avg(self, name, ax_los):
radial_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read()
bins_on_map = self.save.get_node("/maps/bins_on_map_" + ax_los).read()
dmap = self.save.get_node("/maps/" + name + "_" + ax_los).read()
# mean of all the cells in the bin
mean_bin = np.zeros(len(radial_bins) - 1)
for j in range(len(radial_bins) - 1):
mean_bin[j] = np.mean(dmap[bins_on_map == j])
return mean_bin
def _rad_avg_map(self, name, ax_los):
radial_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read()
bins_on_map = self.save.get_node("/maps/bins_on_map_" + ax_los).read()
rr = self.save.get_node("/maps/rr_" + ax_los).read()
mean_bin = self.save.get_node("/radial/rad_avg_" + name + "_" + ax_los).read()
# Add value for border
mean_bin = np.concatenate(([mean_bin[0]], mean_bin))
rr_flat = rr.flatten()
bins_on_map_flat = bins_on_map.flatten()
# Compute the map azimuthally averaged
# use linear interpolation to improve accuracy
avg_flat = (radial_bins[bins_on_map_flat + 1] - rr_flat) * mean_bin[
bins_on_map_flat
]
avg_flat = (
avg_flat
+ (rr_flat - radial_bins[bins_on_map_flat]) * mean_bin[bins_on_map_flat + 1]
)
avg_flat = avg_flat / (
radial_bins[bins_on_map_flat + 1] - radial_bins[bins_on_map_flat]
)
avg_map = np.reshape(avg_flat, rr.shape)
return avg_map
def _fluct_map(self, name, ax_los):
dmap = self.save.get_node("/maps/" + name + "_" + ax_los).read()
avg_map = self.save.get_node("/maps/avg_map_" + name + "_" + ax_los).read()
return dmap / avg_map
def _pdf(self, name, ax_los):
fluct_map = self.save.get_node("/maps/fluct_" + name + "_" + ax_los).read()
rr = self.save.get_node("/maps/rr_" + ax_los).read()
mask_pdf = (rr > self.pp_params.disk.rmin_pdf) & (
rr < self.pp_params.disk.rmax_pdf
)
nb_cells = np.sum(mask_pdf.flatten())
values, edges = np.histogram(
np.log10(fluct_map[mask_pdf].flatten()),
self.pp_params.pdf.nb_bin,
weights=np.ones(nb_cells) / nb_cells,
)
centers = 0.5 * (edges[1:] + edges[:-1])
return np.stack([values, centers])
def _fit_pdf(self, name, ax_los):
pdf = self.save.get_node("/hist/pdf_" + name + "_" + ax_los)
values, centers = pdf.read()
mask_fit = (
(centers > self.pp_params.pdf.xmin_fit)
& (centers < self.pp_params.pdf.xmax_fit)
& (values > 0)
)
(slope, origin, correlation, _, stderr) = linregress(
centers[mask_fit], np.log10(values[mask_fit])
)
pdf.attrs.slope = slope
pdf.attrs.origin = origin
pdf.attrs.correlation = correlation
pdf.attrs.stderr = stderr
pdf.attrs.var = np.var
return True
def _sinks(self):
csv_name = (
self.path
+ "/output_"
+ str(self.num).zfill(5)
+ "/sink_"
+ str(self.num).zfill(5)
+ ".csv"
)
sinks = np.loadtxt(csv_name, delimiter=",")
header = [
"Id",
"M",
"dmf",
"x",
"y",
"z",
"vx",
"vy",
"vz",
"rot_period",
"lx",
"ly",
"lz",
"acc_rate",
"acc_lum",
"age",
"int_lum",
"Teff",
]
if len(sinks) == 0:
sinks = np.zeros(len(header))
sinks_dict = {}
for key, a in zip(header, sinks):
sinks_dict[key] = a
return sinks_dict
def def_rules(self):
self.rules = {
# Base rules
"coldens": Rule(
self,
self._coldens,
"Column density",
"/maps",
unit=self.info["unit_density"] * self.info["unit_length"],
),
"rho": Rule(
self,
self._rho,
"Density slice",
"/maps",
unit=self.info["unit_density"],
),
"speed_h": Rule(
self,
self._speed_h,
"Horizontal speed slice wrt the line of sight",
"/maps",
unit=self.info["unit_velocity"],
),
"speed_v": Rule(
self,
self._speed_v,
"Vertical speed slice wrt the line of sight",
"/maps",
unit=self.info["unit_velocity"],
),
"T": Rule(
self,
self._temperature,
"Temperature slice",
"/maps",
dependencies=["rho"],
unit=self.info["unit_temperature"],
),
"levels": Rule(
self, self._levels, "Max level within line of sight", "/maps"
),
"jeans": Rule(
self,
self._jeans,
"Jeans lenght slice",
"/maps",
dependencies=["rho", "T"],
),
"jeans_ratio": Rule(
self,
self._jeans_ratio,
"Jeans' lenght divided by the max resolution",
"/maps",
dependencies=["jeans", "levels"],
),
"Q": Rule(
self,
self._toomreQ_disk,
"Toomre Q parameter for a Keplerian disk",
"/maps",
dependencies=["coldens"],
is_valid=lambda _: self.pp_params.disk.on,
),
"sinks": Rule(
self,
self._sinks,
group="/datasets",
unit={
"Id": cst.none,
"M": cst.Msun,
"dmf": cst.Msun,
"x": "",
"y": "",
"z": "",
"vx": "",
"vy": "",
"vz": "",
"rot_period": "[y]",
"lx": "|l|",
"ly": "|l|",
"lz": "|l|",
"acc_rate": "[Msol/y]",
"acc_lum": "[Lsol]",
"age": cst.year,
"int_lum": "[Lsol]",
"Teff": cst.K,
},
),
# Helpers
"radial_bins": Rule(self, self._radial_bins, "Radial bins", "/radial"),
"rr": Rule(self, self._rr, "Coordinate map", "/maps"),
"bins_on_map": Rule(
self,
self._bins_on_map,
"Convert map coordinates to bins",
"/maps",
dependencies=["radial_bins", "rr"],
),
# globals
"time_num": Rule(
self,
lambda: self.info["time"],
"Time",
"/globals",
unit=self.info["unit_time"],
),
"mwa_speed": Rule(
self,
partial(self._vol_avg, partial(simple_getter, "vel")),
"Mass weighted speed average",
"/globals",
unit=self.info["unit_velocity"],
),
"mwa_sigma": Rule(
self,
self._mwa_sigma,
"Mass weighted speed average",
"/globals",
dependencies=["mwa_speed"],
unit=self.info["unit_velocity"],
),
}
# Average and other
averageables = ["coldens", "rho", "T", "Q"]
for name in averageables:
self.rules["rad_avg_" + name] = Rule(
self,
partial(self._rad_avg, name),
"Azimuthal average of {}".format(name),
"/radial",
dependencies=["radial_bins", "bins_on_map", name],
)
self.rules["avg_map_" + name] = Rule(
self,
partial(self._rad_avg_map, name),
"Interpolated map of azimuthal average of {}".format(name),
"/maps",
dependencies=["radial_bins", "bins_on_map", "rr", "rad_avg_" + name],
)
self.rules["fluct_" + name] = Rule(
self,
partial(self._fluct_map, name),
"Fluctuation wrt to average of {}".format(name),
"/maps",
dependencies=[name, "avg_map_" + name],
)
self.rules["pdf_" + name] = Rule(
self,
partial(self._pdf, name),
"Probability density function of {} fluctuations".format(name),
"/hist",
dependencies=["rr", "fluct_" + name],
)
self.rules["fit_pdf_" + name] = Rule(
self,
partial(self._fit_pdf, name),
"Fit the PDF of {} fluctuations".format(name),
"/hist",
dependencies=["pdf_" + name],
)
class Comparator(Aggregator, HDF5Container):
"""
Do comparaison between outputs and runs
"""
def __init__(
self,
path,
in_runs,
in_nums,
path_out=None,
pp_params=default_params(),
selector=None,
tag=None,
**kwargs
):
"""
Creates the basic structures needed for the outputs
"""
super(Comparator, self).__init__(path, path_out, pp_params, tag)
# Open outfile
if not self.pp_params.out.tag == "":
tag_name = "_" + self.pp_params.out.tag
else:
tag_name = ""
self.filename = path_out + "/comp" + tag_name + ".h5"
# Select runs
if selector is None:
selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs)
# Save infos
self.path = path
self.runs = selector.runs
self.nums = selector.nums
# Get postprocesor objets for each run
self.pp_runs = {}
attrs = {}
for run in self.runs:
path_run = path + "/" + run
path_out_run = path_out + "/" + run
self.pp_runs[run] = {}
for num in self.nums[run]:
self.pp_runs[run][num] = PostProcessor(
path_run, num, path_out=path_out_run, pp_params=self.pp_params
)
self.namelist = selector.namelist
self.info = self.pp_runs[self.runs[0]][self.nums[self.runs[0]][0]].info
# log info
self.log_id = "[comp {}] ".format(self.pp_params.out.tag)
# Define rules
self.def_rules()
def _needs_computation(self, overwrite, name_full):
"""
Returns True if a new computation of the rule is needed
"""
if overwrite or not (name_full in self.save):
return True
elif not "nums" in self.save.get_node(name_full)._v_attrs:
return True
else:
saved_nums = self.save.get_node(name_full)._v_attrs.nums
missing_runs = len([run for run in self.nums if not run in saved_nums]) > 0
missing_nums = missing_runs and all(
[
len([num for num in self.nums[run] if not num in saved_nums[run]])
> 0
for run in self.nums
if run in saved_nums
]
)
return missing_nums
def _save_data(self, name_full, data, description, unit):
super(Comparator, self)._save_data(name_full, data, description, unit)
self.save.get_node(name_full)._v_attrs.nums = self.nums
def _time_series(self, getter):
series = {}
for run in self.runs:
series[run] = np.zeros(len(self.nums[run]))
for i, num in enumerate(self.nums[run]):
series[run][i] = getter(run, num)
return series
def _comp(self, getter):
prop = np.zeros(len(self.runs))
for i, run in enumerate(self.runs):
num = self.nums[run][0]
prop[i] = getter(run, num)
return prop
def _time_avg(self, name, start=None, end=None, span=None, group="/series"):
mean = np.zeros(len(self.runs))
median = np.zeros(len(self.runs))
std = np.zeros(len(self.runs))
for i, run in enumerate(self.runs):
serie = self.save.get_node(group + "/" + name + "/" + run).read()
time = self.save.get_node(group + "/time/" + run).read()
mask = abs(serie) != np.inf
if not ((start, end, span) == (None, None, None)):
start_r, end_r = start, end
# Be sure that start_r and end_r are defined
if start_r is None:
if end_r is None:
end_r = time[-1]
start_r = end_r - span
elif end_r is None:
end_r = start_r + span
mask = mask & (time >= start_r) & (time <= end_r)
mean[i] = np.nanmean(serie[mask])
median[i] = np.nanmedian(serie[mask])
std[i] = np.nanstd(serie[mask])
return {"runs": self.runs, "mean": mean, "std": std, "median": median}
def get_attr(self, attr_name, run, num, node_name="/"):
pp = self.pp_runs[run][num]
return pp.get_attribute(node_name, attr_name)
def get_global(self, node_name, run, num, args=[None], unload_cells=False):
pp = self.pp_runs[run][num]
if unload_cells:
pp.unload_cells()
value = pp.get_value(node_name)
return value
def get_nml(self, nml_key, run, num=0):
"""
num parameter is ignored
"""
res = self.namelist[run]
for key in nml_key.split("/"):
res = res[key]
return res
def get_pdf_slope(self, name, run, num):
pp = self.pp_runs[run][num]
pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep)
slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope")
return slope
def _extract_sinks_from_log(self, series, log_filename, run):
cmd_grep = "grep 'Number of sink' {} -A 2".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 4):
series["nb_sink"][run].append(np.int(content[i].split("=")[1]))
series["mass_sink"][run].append(np.float(content[i + 1].split("=")[1]))
series["time"][run].append(np.float(content[i + 2].split("=")[1]))
return series
def _extract_sfr_from_log(self, series, log_filename, run):
cmd_grep = "grep '\[SFR' {} ".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content)):
time = np.float(content[i].split("]")[0].split("=")[1].split()[0])
sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0])
series["time"][run].append(time)
series["sfr"][run].append(sfr)
return series
def _extract_rms_from_log(self, series, log_filename, run):
cmd_grep = "grep 'turbulent rms' {} -B 1".format(log_filename)
content = os.popen(cmd_grep).readlines()
for i in range(0, len(content), 3):
series["time"][run].append(np.float(content[i].split("=")[2].split()[0]))
series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1]))
return series
def _from_log(self, keys, extractor):
nums = self.nums
# Initialize series
series = {}
for key in keys:
series[key] = {}
for run in self.runs:
# Initialize list
for key in keys:
series[key][run] = []
# get one preprocessor
path_run = self.path + "/" + run
# Get list of run files
log_files = path_run + "/" + self.pp_params.input.log_prefix + "*"
# Parse files
for log_filename in glob.glob(log_files):
series = extractor(series, log_filename, run)
# Numpify the lists
for key in series:
series[key][run] = np.array(series[key][run])
# Sort the arrays
ind_sort = series["time"][run].argsort()
for key in series:
series[key][run] = series[key][run][ind_sort]
return series
def _ssfr_from_mass_sink(self, avg_window=None):
"""
avg_window in year
"""
time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit
mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit
# Surface of the box in pc^2
surface = (self.info["unit_length"].express(cst.pc)) ** 2
# WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses)
ssfr = {}
for run in self.runs:
time = self.save.get_node("/series/sinks_from_log/time/" + run).read()
time = time * time_unit.express(cst.year)
mass_sink = self.save.get_node(
"/series/sinks_from_log/mass_sink/" + run
).read()
mass_sink = mass_sink * mass_unit.express(cst.Msun)
if avg_window is None:
shift = 1
else:
# We assume that the timestep do not vary a lot ...
shift = np.searchsorted(time, avg_window, side="left")
sfr = (mass_sink[shift:] - mass_sink[:-shift]) / (
time[shift:] - time[:-shift]
)
ssfr[run] = np.zeros(len(mass_sink))
ssfr[run][shift:] = sfr / surface
return ssfr
def _gen_rule_avg(self, rule_src_name, subarray_name=None):
rule_src = self.rules[rule_src_name]
if subarray_name is None:
src_name = rule_src_name
group_src = rule_src.group
unit = rule_src.unit
descr = rule_src.description
else:
src_name = subarray_name
group_src = rule_src.group + "/" + rule_src_name
unit = rule_src.unit[subarray_name]
descr = rule_src.description[subarray_name]
description = {
"runs": "List of runs",
"mean": "Temporal average of {}".format(descr),
"std": "Standard deviation of {}".format(descr),
"median": "Median of {}".format(descr),
}
name = "avg_" + src_name
def fn(arg=None, **kwargs):
if arg is None:
return self._time_avg(src_name, group=group_src, **kwargs)
else:
return self._time_avg(
src_name + "_" + str(arg), group=group_src, **kwargs
)
self.rules[name] = Rule(
self,
fn,
group="/comp",
unit=unit,
description=description,
dependencies=[rule_src_name],
)
def def_rules(self):
averageables = ["coldens", "rho", "T", "Q"]
self.rules = {
# Read from log
"sinks_from_log": Rule(
self,
partial(
self._from_log,
["time", "mass_sink", "nb_sink"],
self._extract_sinks_from_log,
),
group="/series",
unit={
"time": self.info["unit_time"],
"mass_sink": cst.Msun,
"nb_sink": cst.none,
},
description={
"time": "Time",
"mass_sink": "Total mass of stars",
"nb_sink": "Number of stars",
},
),
"issfr": Rule(
self,
self._ssfr_from_mass_sink,
group="/series/sinks_from_log",
unit=cst.ssfr,
description="Instantaneous surfacic star formation rate",
dependencies=["sinks_from_log"],
),
"sfr_from_log": Rule(
self,
partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log),
group="/series",
unit={"time": cst.year, "sfr": cst.ssfr},
description={
"time": "Time",
"sfr": "Averaged surfacic star formation rate",
},
),
"rms_from_log": Rule(
self,
partial(
self._from_log, ["time", "turb_rms"], self._extract_rms_from_log
),
group="/series",
unit={"time": self.info["unit_time"], "turb_rms": cst.none},
description={"time": "Time", "turb_rms": "Computed turbulent RMS"},
),
# Read from outputs
"time": Rule(
self,
partial(
self._time_series, partial(self.get_global, "/globals/time_num")
),
group="/series",
unit=self.info["unit_time"],
dependencies=["time_num"],
),
"time_sigma": Rule(
self,
partial(
self._time_series,
partial(self.get_global, "/globals/mwa_sigma", unload_cells=True),
),
group="/series",
unit=self.info["unit_velocity"],
dependencies=["time", "mwa_sigma"],
),
# namelist
"nml": Rule(
self,
lambda nml_key: self._comp(partial(self.get_nml, nml_key)),
group="/comp",
),
}
for name in ["issfr", "time_sigma"]:
self._gen_rule_avg(name)
self._gen_rule_avg("sinks_from_log", "mass_sink")
self._gen_rule_avg("sinks_from_log", "nb_sink")
self._gen_rule_avg("sfr_from_log", "sfr")
def get_time(path, num):
"""
Return the time of the output (code units)
Parameters
----------
num output number
path_out path of the pipeline output
Returns
-------
time the time of the output (code units)
"""
try:
f = open(
path
+ "/output_"
+ str(num).zfill(5)
+ "/info_"
+ str(num).zfill(5)
+ ".txt"
)
for line in f:
ls = line.split()
if len(ls) > 1 and ls[0] == "time":
time = float(ls[2])
break
f.close()
return time
except IOError as e:
print(e)
return np.nan