# coding: utf-8 import sys import os import glob as glob import tables import pymses import numpy as np from numpy.polynomial.polynomial import polyfit from scipy.stats import linregress from pymses.sources.ramses import output from pymses.sources.hop.file_formats import * from pymses.analysis import Camera, raytracing, slicing, splatting from pymses.filters import CellsToPoints from pymses.analysis import ScalarOperator, FractionOperator, MaxLevelOperator import subprocess import module_extract as me from mypool import MyPool as Pool from functools import partial from abc import ABCMeta, abstractmethod import bunch from run_selector import * from units import * class Rule: def __init__( self, postproc, process, description="", group="", dependencies=[], is_valid=lambda arg: True, kind="classic", unit=cst.none, ): self.postproc = postproc self.process_fn = process self.dependencies = dependencies self.is_valid_add = is_valid self.group = group self.description = description self.unit = unit self.kind = kind def process(self, arg, **kwargs): if not arg is None: return self.process_fn(arg, **kwargs) else: return self.process_fn(**kwargs) def is_valid(self, arg): # save = self.postproc.save # valid = True # for dep in self.dependencies: # if dep in self.postproc.rules: # rule_dep = self.postproc.rules[dep] # if not arg is None: # valid = valid and rule_dep.group + '/' + dep + '_' + str(arg) in save # else: # valid = valid and rule_dep.group + '/' + dep in save # return valid and self.is_valid_add(arg) return self.is_valid_add(arg) class BaseProcessor: """ Base class for processors, should not be instanciated """ __metaclass__ = ABCMeta log_id = "" rules = {} solve_self_dep = True def __init__(self, path, path_out=None, pp_params=None, tag=None): if pp_params is None: self.pp_params = default_params() elif type(pp_params) == str: self.pp_params = load_params(pp_params) else: self.pp_params = pp_params if tag is not None: self.pp_params.out.tag = tag # Determining output directory if path_out is None: self.path_out = path else: self.path_out = path_out def _log(self, string, status=""): if self.pp_params.process.verbose: if len(status) > 0: print(status + ": " + self.log_id + string) else: print(self.log_id + string) def process( self, to_process_list, args=[None], overwrite=False, overwrite_dep=False, **kwargs ): """ Render the data in to_process_list and save them """ if type(to_process_list) == str: to_process_list = [to_process_list] if type(args) == str or args is None: args = [args] self.overwrite_dep = overwrite_dep self.just_done = [] # Computations done within this call for name in to_process_list: if name in self.rules: rule = self.rules[name] for arg in args: self._solve_and_process_rule(name, rule, arg, overwrite, **kwargs) else: self._log( "{} is unknown, allowed rules are {}".format( name, self.rules.keys() ), "ERROR", ) return self.just_done def _solve_and_process_rule(self, name, rule, arg, overwrite=False, **kwargs): updated = self._solve_dependencies(name, rule, arg, overwrite, **kwargs) overwrite_rule = overwrite or updated self._process_rule(name, rule, arg, overwrite_rule, **kwargs) def _solve_dependencies(self, name, rule, arg, overwrite=False, **kwargs): self.done_before_dep = len(self.just_done) # Solve dependencies for dep in rule.dependencies: try: dep_arg = rule.dependencies[dep] except: dep_arg = arg if dep_arg == "__parent__": dep_arg = arg if self.solve_self_dep and dep in self.rules: rule_dep = self.rules[dep] self._solve_and_process_rule(dep, rule_dep, dep_arg, self.overwrite_dep) else: self._not_self_dep(name, dep, dep_arg, self.overwrite_dep, **kwargs) # Whether dependencies where updated return len(self.just_done) > self.done_before_dep def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): self._log("Dependency {} for {} is unknown".format(dep, name), "ERROR") def _needs_computation(self, overwrite, name_full): return overwrite def _process_rule(self, name, rule, arg, overwrite=False, **kwargs): if not arg is None: name_full = rule.group + "/" + name + "_" + str(arg) else: name_full = rule.group + "/" + name if rule.is_valid(arg): if not name_full in self.just_done: if self._needs_computation(overwrite, name_full): self._log("Processing {}".format(name_full)) data = rule.process(arg, **kwargs) self._save_data(name_full, data, rule.description, rule.unit) self._log("Data for {} computed".format(name_full), "SUCCESS") self.just_done.append(name_full) else: self._log( "Data for {} is already computed, skipping...".format(name_full) ) else: self._log("{} is not valid in this context".format(name_full), "ERROR") def def_rules(self): for rule in self.rules: setattr(self, rule, partial(self.process, rule)) class HDF5Container(BaseProcessor): filename = "" save = None opened = False def open(self): if not self.opened: self.save = tables.open_file(self.filename, mode="a") self.opened = True def close(self): if self.opened: self.save.close() self.opened = False def _needs_computation(self, overwrite, name_full): return overwrite or not (name_full in self.save) def _process_rule(self, name, rule, arg, overwrite, **kwargs): self.open() try: super(HDF5Container, self)._process_rule( name, rule, arg, overwrite, **kwargs ) finally: self.close() def get_value(self, node_name): self.open() try: node = self.save.get_node(node_name) if node._v_attrs.CLASS == "GROUP": value = {} for child_name in node._v_children: value[child_name] = self.get_value(node_name + "/" + child_name) else: value = node.read() finally: self.close() return value def _save_data(self, name_full, data, description, unit): """ Save data in the HDF5 structure, overwrite if necessary """ if name_full in self.save: self.save.remove_node(name_full, recursive=True) if type(data) == dict: if type(description) == str: self.save.create_group( os.path.dirname(name_full), os.path.basename(name_full), description, createparents=True, ) else: self.save.create_group( os.path.dirname(name_full), os.path.basename(name_full), "", createparents=True, ) if not type(unit) == dict: self.save.get_node(name_full)._v_attrs.unit = unit for key in data: if type(description) == dict: if type(unit) == dict: self._save_data( name_full + "/" + key, data[key], description[key], unit[key], ) else: self._save_data( name_full + "/" + key, data[key], description[key], unit ) else: if type(unit) == dict: self._save_data(name_full + "/" + key, data[key], "", unit[key]) else: self._save_data(name_full + "/" + key, data[key], "", unit) else: self.save.create_array( os.path.dirname(name_full), os.path.basename(name_full), data, description, createparents=True, ) self.save.get_node(name_full).attrs.unit = unit def set_value(self, node_name, data, description, unit): self.open() try: self._save_data(node_name, data, description, unit) finally: self.close() def get_attribute(self, node_name, attr_name): self.open() try: node = self.save.get_node(node_name) attr = node._v_attrs[attr_name] finally: self.close() return attr def _map_rule(rule, arg, overwrite, path, path_out, pp_params, run_num): try: pp = PostProcessor( path + "/" + run_num[0], run_num[1], path_out + "/" + run_num[0], pp_params ) except pymses.RamsesIOError as e: print(e) return pp.process(rule, arg, overwrite) class Aggregator: def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs): if "runs" in kwargs: dep_runs = [run for run in self.runs if run in kwargs["runs"]] else: dep_runs = self.runs pps = [[self.pp_runs[run][num] for num in self.nums[run]] for run in dep_runs] run_num = [(run, num) for run in dep_runs for num in self.nums[run]] map_fn = partial( _map_rule, dep, dep_arg, overwrite, self.path, self.path_out, self.pp_params ) if self.pp_params.process.num_process > 1: pool = Pool(processes=self.pp_params.process.num_process) done = pool.map(map_fn, run_num) pool.close() pool.join() else: done = map(map_fn, run_num) self.just_done.extend([item for li in done for item in li]) def simple_getter(name, dset): return dset[name] mass_func = lambda dset: dset["rho"] * dset.get_sizes() ** 3 # Mass function class PostProcessor(HDF5Container): """ This class enable to compute and save derived quantities from the raw output """ # Axes information _ax_nb = {"x": 0, "y": 1, "z": 2} # Number of each axes _axes_h = {"x": "y", "y": "x", "z": "x"} # Associated horizontal axe _axes_v = {"x": "z", "y": "z", "z": "y"} # Associated vertical axe G = 1.0 # Gravitational constant cells_loaded = False def __init__(self, path=None, num=None, path_out=None, pp_params=None, tag=None): """ Creates the basic structures needed for the outputs """ super(PostProcessor, self).__init__(path, path_out, pp_params, tag) # Open outfile if not self.pp_params.out.tag == "": tag_name = self.pp_params.out.tag + "_" else: tag_name = "" self.filename = path_out + "/postproc_" + tag_name + format(num, "05") + ".h5" if not os.path.exists(path_out): os.makedirs(path_out) self.open() # Ramses Output self.path = path self.run = os.path.basename(path) self.num = num self._ro = pymses.RamsesOutput( path, num, order=pp_params.pymses.order, verbose=pp_params.pymses.verbose ) self._amr = self._ro.amr_source(self.pp_params.pymses.variables) self.info = self._ro.info.copy() # Density operator self._rho_op = ScalarOperator( lambda dset: dset["rho"], self._ro.info["unit_density"] ) # Density ray tracer if pp_params.pymses.fft: self._rt = splatting.SplatterProcessor( self._amr, self._ro.info, self._rho_op ) else: self._rt = raytracing.RayTracer(self._amr, self._ro.info, self._rho_op) # Set the extend of the image self._radius = 0.5 / pp_params.pymses.zoom self._lbox = self.info["boxlen"] center = pp_params.pymses.center im_extent = [ (-self._radius + center[0]) * self._lbox, (self._radius + center[0]) * self._lbox, (-self._radius + center[1]) * self._lbox, (self._radius + center[1]) * self._lbox, ] # Get time time = self._ro.info["time"] # time in codeunits # Set post processing attributes self.save.root._v_attrs.dir = os.path.dirname(path) self.save.root._v_attrs.run = os.path.basename(path) self.save.root._v_attrs.num = num self.save.root._v_attrs.lbox = self._lbox self.save.root._v_attrs.time = time if not "/maps" in self.save: self.save.create_group("/", "maps", "2D maps") self.save.root.maps._v_attrs.center = center self.save.root.maps._v_attrs.radius = self._radius self.save.root.maps._v_attrs.im_extent = im_extent # Initialize cameras self._cam = {} for ax_los in self._ax_nb: # los = line of sight ax_h = self._axes_h[ax_los] ax_v = self._axes_v[ax_los] self._cam[ax_los] = Camera( center=pp_params.pymses.center, line_of_sight_axis=ax_los, region_size=[2.0 * self._radius, 2.0 * self._radius], distance=self._radius, far_cut_depth=self._radius, up_vector=ax_v, map_max_size=pp_params.pymses.map_size, ) self.close() self.log_id = "[{}, {}] ".format(self.run, self.num) self.def_rules() def load_cells(self): """ Load all cells from the source file in the memory. Cells will be accessible trough self.cells (/!\ Long and memory heavy) """ if not self.cells_loaded: cell_source = CellsToPoints(self._amr) self.cells = cell_source.flatten() self.cells_loaded = True def unload_cells(self): """ Free space in the memory by telling the garbage collectors that self.cells is not needed """ if self.cells_loaded: del self.cells self.cells_loaded = False def _slice(self, getter, ax_los="z", z=0, unit=cst.none): """ Slice process function. Return a slice of the source box. Parameters ---------- getter : callable A callable that extract the wanted data from a pymses dataset ax_los : string The axis perpendicular to the slice plane z : float Coordinate of the slice on the ax_los axis unit : cst.Unit Unit of the resulting dataset Returns ------- A numpy array containing the slice """ op = ScalarOperator(getter, unit) datamap = slicing.SliceMap(self._amr, self._cam[ax_los], op, z=z) return datamap.map.T def _ax_avg( self, getter, ax_los, unit=cst.none, mass_weighted=True, surf_qty=False ): """ Map of the average of a quantity (given by getter) along an axis (ax_los) """ if mass_weighted: def num(cells): value = getter(cells) mass = mass_func(cells) # Transpose (.T) is for vectorial values return (mass * value.T).T op = FractionOperator(num, mass_function, unit) else: op = ScalarOperator(getter, unit) if pp_params.pymses.fft: rt = splatting.SplatterProcessor(self._amr, self._ro.info, op) else: rt = raytracing.RayTracer(self._amr, self._ro.info, op) datamap = rt.process(self._cam[ax_los], surf_qty=surf_qty) return datamap.map.T def _vol_avg(self, getter, mass_weighted=True): self.load_cells() value = getter(self.cells) if mass_weighted: mass = mass_func(self.cells) # Transpose (.T) is for vectorial values return np.sum((mass * value.T).T, axis=0) / np.sum(mass) else: return np.sum(value, axis=0) def _mwa_sigma(self): mw_speed = self.save.get_node("/globals/mwa_speed").read() def getter(dset): return np.sum((dset["vel"] - mw_speed) ** 2, axis=1) return np.sqrt(self._vol_avg(getter, mass_weighted=True)) def _coldens(self, ax_los): datamap = self._rt.process(self._cam[ax_los], surf_qty=True) return datamap.map.T def _rho(self, ax_los, z=0.0): datamap_rho = slicing.SliceMap(self._amr, self._cam[ax_los], self._rho_op, z=z) return (datamap_rho.map).T def _speed_h(self, ax_los, z=0.0): vh_op = ScalarOperator( lambda dset: dset["vel"][:, self._ax_nb[self._axes_h[ax_los]]], self._ro.info["unit_velocity"], ) dmap_vh = slicing.SliceMap(self._amr, self._cam[ax_los], vh_op, z=z).map.T return dmap_vh def _speed_v(self, ax_los, z=0.0): vv_op = ScalarOperator( lambda dset: dset["vel"][:, self._ax_nb[self._axes_v[ax_los]]], self._ro.info["unit_velocity"], ) dmap_vv = slicing.SliceMap(self._amr, self._cam[ax_los], vv_op, z=z).map.T return dmap_vv def _temperature(self, ax_los, z=0.0): P_op = ScalarOperator(lambda dset: dset["P"], self._ro.info["unit_pressure"]) dmap_P = (slicing.SliceMap(self._amr, self._cam[ax_los], P_op, z=z)).map.T dmap_rho = self.save.get_node("/maps/rho_{}".format(ax_los)).read() return dmap_P / dmap_rho def _levels(self, ax_los): self._amr.set_read_levelmax(self.pp_params.pymses.levelmax) level_op = MaxLevelOperator() rt_level = raytracing.RayTracer(self._amr, self._ro.info, level_op) datamap = rt_level.process(self._cam[ax_los], surf_qty=True) return datamap.map.T def _jeans(self, ax_los): dmap_T = self.save.get_node("/maps/T_" + ax_los).read() dmap_rho = self.save.get_node("/maps/rho_" + ax_los).read() dmap_jeans = np.sqrt(np.pi * dmap_T / dmap_rho) return dmap_jeans def _jeans_ratio(self, ax_los): dmap_jeans = self.save.get_node("/maps/jeans_" + ax_los).read() dmap_levels = self.save.get_node("/maps/levels_" + ax_los).read() dmap_jeans_ratio = dmap_jeans * 2 ** (dmap_levels) return dmap_jeans_ratio def _toomreQ_disk(self, ax_los): """ Compute the Toomre Q parameter in a Keplerian disk """ # Operator to compute the angular speed times rho def omega_rho_func(dset): pos = dset.get_cell_centers() pos = pos - (self.pp_params.disk.pos_star / self._lbox) xx = pos[:, :, 0] yy = pos[:, :, 1] rc = np.sqrt(xx ** 2 + yy ** 2) # cylindrical radius vx = dset["vel"][:, :, 0] vy = dset["vel"][:, :, 1] omega_rho = 1.0 / rc ** 2 omega_rho = omega_rho * dset["rho"] vyx = vy * xx vxy = vx * yy omega_rho = omega_rho * (vyx - vxy) return omega_rho # Operator to compute the angular speed omega_op = FractionOperator( omega_rho_func, lambda dset: dset["rho"], 1.0 / self._ro.info["unit_time"] ) # Operator to compute the sound speed cs_op = FractionOperator( lambda dset: dset["P"], lambda dset: dset["rho"], self._ro.info["unit_velocity"], ) # Ray tracer for the angular speed rt_omega = raytracing.RayTracer(self._amr, self._ro.info, omega_op) # Ray tracer for the sound speed if self.pp_params.pymses.fft: rt_cs = splatting.SplatterProcessor( self._amr, self._ro.info, cs_op, surf_qty=False ) else: rt_cs = raytracing.RayTracer(self._amr, self._ro.info, cs_op) dmap_omega = rt_omega.process(self._cam[ax_los]) dmap_cs = rt_cs.process(self._cam[ax_los]) dmap_col = self.save.root.maps.coldens_z.read() map_Q = ( (self._lbox * dmap_cs.map.T) * dmap_omega.map.T / (np.pi * self.G * dmap_col) ) return map_Q def _radial_bins(self, _): """ Computes radial bins (for disk) """ pos_star = self.pp_params.disk.pos_star im_extent = self.save.root.maps._v_attrs.im_extent # radius of the corner of the box plus a margin rad_of_box = ( np.sqrt( (im_extent[1] - pos_star[0]) ** 2 + (im_extent[3] - pos_star[1]) ** 2 ) + 0.1 ) bin_in = self.pp_params.disk.bin_in bin_out = self.pp_params.disk.bin_out nb_bin = self.pp_params.disk.nb_bin # radial bins if self.pp_params.disk.binning == "log": lrad_in = np.log10(bin_in) lrad_ext = np.log10(bin_out) rad_bins = np.logspace(lrad_in, lrad_ext, num=nb_bin) elif self.pp_params.disk.binning == "lin": rad_bins = np.linspace(bin_in, bin_out, num=nb_bin) # Add boundaries rad_bins = np.concatenate(([0.0], rad_bins, [rad_of_box])) return rad_bins def _rr(self, _): """ Computes the radius from the center """ im_extent = self.save.root.maps._v_attrs.im_extent map_size = self.pp_params.pymses.map_size pos_star = self.pp_params.disk.pos_star x = np.linspace(im_extent[0], im_extent[1], map_size) y = np.linspace(im_extent[2], im_extent[3], map_size) xx, yy = np.meshgrid(x, y) rr = np.sqrt((xx - pos_star[0]) ** 2 + (yy - pos_star[1]) ** 2) return rr def _bins_on_map(self, ax_los): rad_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read() rr = self.save.get_node("/maps/rr_" + ax_los).read() # Find appropriate bin for each coordinate set bins = np.zeros(rr.shape, dtype=int) for r in rad_bins[1:]: bins = bins + (rr >= r).astype(int) return bins def _rad_avg(self, name, ax_los): radial_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read() bins_on_map = self.save.get_node("/maps/bins_on_map_" + ax_los).read() dmap = self.save.get_node("/maps/" + name + "_" + ax_los).read() # mean of all the cells in the bin mean_bin = np.zeros(len(radial_bins) - 1) for j in range(len(radial_bins) - 1): mean_bin[j] = np.mean(dmap[bins_on_map == j]) return mean_bin def _rad_avg_map(self, name, ax_los): radial_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read() bins_on_map = self.save.get_node("/maps/bins_on_map_" + ax_los).read() rr = self.save.get_node("/maps/rr_" + ax_los).read() mean_bin = self.save.get_node("/radial/rad_avg_" + name + "_" + ax_los).read() # Add value for border mean_bin = np.concatenate(([mean_bin[0]], mean_bin)) rr_flat = rr.flatten() bins_on_map_flat = bins_on_map.flatten() # Compute the map azimuthally averaged # use linear interpolation to improve accuracy avg_flat = (radial_bins[bins_on_map_flat + 1] - rr_flat) * mean_bin[ bins_on_map_flat ] avg_flat = ( avg_flat + (rr_flat - radial_bins[bins_on_map_flat]) * mean_bin[bins_on_map_flat + 1] ) avg_flat = avg_flat / ( radial_bins[bins_on_map_flat + 1] - radial_bins[bins_on_map_flat] ) avg_map = np.reshape(avg_flat, rr.shape) return avg_map def _fluct_map(self, name, ax_los): dmap = self.save.get_node("/maps/" + name + "_" + ax_los).read() avg_map = self.save.get_node("/maps/avg_map_" + name + "_" + ax_los).read() return dmap / avg_map def _pdf(self, name, ax_los): fluct_map = self.save.get_node("/maps/fluct_" + name + "_" + ax_los).read() rr = self.save.get_node("/maps/rr_" + ax_los).read() mask_pdf = (rr > self.pp_params.disk.rmin_pdf) & ( rr < self.pp_params.disk.rmax_pdf ) nb_cells = np.sum(mask_pdf.flatten()) values, edges = np.histogram( np.log10(fluct_map[mask_pdf].flatten()), self.pp_params.pdf.nb_bin, weights=np.ones(nb_cells) / nb_cells, ) centers = 0.5 * (edges[1:] + edges[:-1]) return np.stack([values, centers]) def _fit_pdf(self, name, ax_los): pdf = self.save.get_node("/hist/pdf_" + name + "_" + ax_los) values, centers = pdf.read() mask_fit = ( (centers > self.pp_params.pdf.xmin_fit) & (centers < self.pp_params.pdf.xmax_fit) & (values > 0) ) (slope, origin, correlation, _, stderr) = linregress( centers[mask_fit], np.log10(values[mask_fit]) ) pdf.attrs.slope = slope pdf.attrs.origin = origin pdf.attrs.correlation = correlation pdf.attrs.stderr = stderr pdf.attrs.var = np.var return True def _clumps(self): name = self.path_out + "/" + self.tag + "_" + str(self.num).zfill(5) hop_save = name + "_hop" + "_prop_struct.save" me.make_clump_hop( self.path, self.num, name + "_hop", self.pp_params.hop.rho_thres, self.pp_params.hop.lvl_thres, [0.5, 0.5, 0.5], 1, path_out=path_out + "/", path_hop="./", force=True, gcomp=False, ) hop_save = me.clump_properties( name + "_hop", path, num, path_out=path_out + "/", gcomp=False ) f = open(path_out + "/" + hop_save) hop_data = pickle.load(f) f.close() return hop_data def _sinks(self): csv_name = ( self.path + "/output_" + str(self.num).zfill(5) + "/sink_" + str(self.num).zfill(5) + ".csv" ) sinks = np.loadtxt(csv_name, delimiter=",") header = [ "Id", "M", "dmf", "x", "y", "z", "vx", "vy", "vz", "rot_period", "lx", "ly", "lz", "acc_rate", "acc_lum", "age", "int_lum", "Teff", ] if len(sinks) == 0: sinks = np.zeros(len(header)) sinks_dict = {} for key, a in zip(header, sinks): sinks_dict[key] = a return sinks_dict def _transform(self, name, transform_fn, group="/maps", **kwargs): src = self.save.get_node(group + "/" + name).read() return transform_fn(src, **kwargs) def _gen_rule_transform( self, rule_src_name, transform_fn, transform_name, subarray_name=None, group=None, ): rule_src = self.rules[rule_src_name] if subarray_name is None: src_name = rule_src_name group_src = rule_src.group unit = rule_src.unit description = rule_src.description else: src_name = subarray_name group_src = rule_src.group + "/" + rule_src_name unit = rule_src.unit[subarray_name] description = rule_src.description[subarray_name] def fn(arg=None, **kwargs): if arg is None: return self._transform( src_name, transform_fn, group=group_src, **kwargs ) else: return self._transform( src_name + "_" + str(arg), transform_fn, group=group_src, **kwargs ) if group is None: group = group_src name = transform_name + "_" + rule_src_name self.rules[name] = Rule( self, fn, group=group, unit=unit, description=description, dependencies=[rule_src_name], ) def def_rules(self): self.rules = { # Base rules "coldens": Rule( self, self._coldens, "Column density", "/maps", unit=self.info["unit_density"] * self.info["unit_length"], ), "rho": Rule( self, self._rho, "Density slice", "/maps", unit=self.info["unit_density"], ), "speed_h": Rule( self, self._speed_h, "Horizontal speed slice wrt the line of sight", "/maps", unit=self.info["unit_velocity"], ), "speed_v": Rule( self, self._speed_v, "Vertical speed slice wrt the line of sight", "/maps", unit=self.info["unit_velocity"], ), "T": Rule( self, self._temperature, "Temperature slice", "/maps", dependencies=["rho"], unit=self.info["unit_temperature"], ), "levels": Rule( self, self._levels, "Max level within line of sight", "/maps" ), "jeans": Rule( self, self._jeans, "Jeans lenght slice", "/maps", dependencies=["rho", "T"], ), "jeans_ratio": Rule( self, self._jeans_ratio, "Jeans' lenght divided by the max resolution", "/maps", dependencies=["jeans", "levels"], ), "Q": Rule( self, self._toomreQ_disk, "Toomre Q parameter for a Keplerian disk", "/maps", dependencies=["coldens"], is_valid=lambda _: self.pp_params.disk.on, ), "sinks": Rule( self, self._sinks, group="/datasets", unit={ "Id": cst.none, "M": cst.Msun, "dmf": cst.Msun, "x": "", "y": "", "z": "", "vx": "", "vy": "", "vz": "", "rot_period": "[y]", "lx": "|l|", "ly": "|l|", "lz": "|l|", "acc_rate": "[Msol/y]", "acc_lum": "[Lsol]", "age": cst.year, "int_lum": "[Lsol]", "Teff": cst.K, }, ), "clumps": Rule(self, self._clumps, group="/datasets"), # Helpers "radial_bins": Rule(self, self._radial_bins, "Radial bins", "/radial"), "rr": Rule(self, self._rr, "Coordinate map", "/maps"), "bins_on_map": Rule( self, self._bins_on_map, "Convert map coordinates to bins", "/maps", dependencies=["radial_bins", "rr"], ), # globals "time_num": Rule( self, lambda: self.info["time"], "Time", "/globals", unit=self.info["unit_time"], ), "mwa_speed": Rule( self, partial(self._vol_avg, partial(simple_getter, "vel")), "Mass weighted speed average", "/globals", unit=self.info["unit_velocity"], ), "mwa_sigma": Rule( self, self._mwa_sigma, "Mass weighted speed average", "/globals", dependencies=["mwa_speed"], unit=self.info["unit_velocity"], ), } # Average and other averageables = ["coldens", "rho", "T", "Q"] for name in averageables: self.rules["rad_avg_" + name] = Rule( self, partial(self._rad_avg, name), "Azimuthal average of {}".format(name), "/radial", dependencies=["radial_bins", "bins_on_map", name], ) self.rules["avg_map_" + name] = Rule( self, partial(self._rad_avg_map, name), "Interpolated map of azimuthal average of {}".format(name), "/maps", dependencies=["radial_bins", "bins_on_map", "rr", "rad_avg_" + name], ) self.rules["fluct_" + name] = Rule( self, partial(self._fluct_map, name), "Fluctuation wrt to average of {}".format(name), "/maps", dependencies=[name, "avg_map_" + name], ) self.rules["pdf_" + name] = Rule( self, partial(self._pdf, name), "Probability density function of {} fluctuations".format(name), "/hist", dependencies=["rr", "fluct_" + name], ) self.rules["fit_pdf_" + name] = Rule( self, partial(self._fit_pdf, name), "Fit the PDF of {} fluctuations".format(name), "/hist", dependencies=["pdf_" + name], ) self._gen_rule_transform("fluct_coldens", np.max, "max", group="/globals") super(PostProcessor, self).def_rules() class Comparator(Aggregator, HDF5Container): """ Do comparaison between outputs and runs """ def __init__( self, path, in_runs, in_nums, path_out=None, pp_params=default_params(), selector=None, tag=None, **kwargs ): """ Creates the basic structures needed for the outputs """ super(Comparator, self).__init__(path, path_out, pp_params, tag) # Open outfile if not self.pp_params.out.tag == "": tag_name = "_" + self.pp_params.out.tag else: tag_name = "" self.filename = path_out + "/comp" + tag_name + ".h5" # Select runs if selector is None: selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs) # Save infos self.path = path self.runs = selector.runs self.nums = selector.nums # Get postprocesor objets for each run self.pp_runs = {} for run in self.runs: path_run = path + "/" + run path_out_run = path_out + "/" + run self.pp_runs[run] = {} for num in self.nums[run]: self.pp_runs[run][num] = PostProcessor( path_run, num, path_out=path_out_run, pp_params=self.pp_params ) self.namelist = selector.namelist # Get info from one output. TODO Avoid using pymses for that self.info = self.pp_runs[self.runs[0]][self.nums[self.runs[0]][0]].info # log info self.log_id = "[comp {}] ".format(self.pp_params.out.tag) # Define rules self.def_rules() def _needs_computation(self, overwrite, name_full): """ Returns True if a new computation of the rule is needed """ if overwrite or not (name_full in self.save): return True elif not "nums" in self.save.get_node(name_full)._v_attrs: return True else: saved_nums = self.save.get_node(name_full)._v_attrs.nums missing_runs = len([run for run in self.nums if not run in saved_nums]) > 0 missing_nums = missing_runs or all( [ len([num for num in self.nums[run] if not num in saved_nums[run]]) > 0 for run in self.nums if run in saved_nums ] ) return missing_nums def _save_data(self, name_full, data, description, unit): super(Comparator, self)._save_data(name_full, data, description, unit) self.save.get_node(name_full)._v_attrs.nums = self.nums def _time_series(self, getter, arg=None): series = {} for run in self.runs: series[run] = np.zeros(len(self.nums[run])) for i, num in enumerate(self.nums[run]): series[run][i] = getter(run, num, arg=arg) return series def _comp(self, getter, use_num=True): prop = np.zeros(len(self.runs)) for i, run in enumerate(self.runs): if use_num: num = self.nums[run][0] prop[i] = getter(run, num) else: prop[i] = getter(run) return prop def _time_avg(self, name, start=None, end=None, span=None, group="/series"): mean = np.zeros(len(self.runs)) median = np.zeros(len(self.runs)) std = np.zeros(len(self.runs)) for i, run in enumerate(self.runs): serie = self.save.get_node(group + "/" + name + "/" + run).read() time = self.save.get_node(group + "/time/" + run).read() mask = abs(serie) != np.inf if not ((start, end, span) == (None, None, None)): start_r, end_r = start, end # Be sure that start_r and end_r are defined if start_r is None: if end_r is None: end_r = time[-1] start_r = end_r - span elif end_r is None: end_r = start_r + span mask = mask & (time >= start_r) & (time <= end_r) mean[i] = np.nanmean(serie[mask]) median[i] = np.nanmedian(serie[mask]) std[i] = np.nanstd(serie[mask]) return {"runs": self.runs, "mean": mean, "std": std, "median": median} def get_attr(self, attr_name, run, num, node_name="/", arg=None): pp = self.pp_runs[run][num] if not arg is None: node_name = node_name + "_" + str(arg) return pp.get_attribute(node_name, attr_name) def get_global(self, node_name, run, num, arg=None, unload_cells=False): if not arg is None: node_name = node_name + "_" + str(arg) pp = self.pp_runs[run][num] if unload_cells: pp.unload_cells() value = pp.get_value(node_name) return value def get_nml(self, nml_key, run): res = self.namelist[run] for key in nml_key.split("/"): res = res[key] return res def get_pdf_slope(self, name, run, num): pp = self.pp_runs[run][num] pp.process(["fit_pdf_" + name], ["z"], overwrite=self.overwrite_dep) slope = pp.get_attribute("/hist/pdf_" + name + "_z", "slope") return slope def _extract_sinks_from_log(self, series, log_filename, run): cmd_grep = "sed '/cpu.*/d' {} | grep 'Number of sink' -A 2".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["nb_sink"][run].append(np.int(content[i].split("=")[1])) series["mass_sink"][run].append(np.float(content[i + 1].split("=")[1])) series["time"][run].append(np.float(content[i + 2].split("=")[1])) return series def _extract_sfr_from_log(self, series, log_filename, run): cmd_grep = "grep '\[SFR' {} ".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content)): time = np.float(content[i].split("]")[0].split("=")[1].split()[0]) sfr = np.float(content[i].split("]")[1].split("=")[1].split()[0]) series["time"][run].append(time) series["sfr"][run].append(sfr) return series def _extract_rms_from_log(self, series, log_filename, run): cmd_grep = "grep 'turbulent rms' {} -C 1".format(log_filename) content = os.popen(cmd_grep).readlines() for i in range(0, len(content), 4): series["time"][run].append(np.float(content[i].split("=")[2].split()[0])) series["turb_rms"][run].append(np.float(content[i + 1].split(":")[1])) try: turb_energy = np.float(content[i + 2].split(":")[1]) threshold = self.pp_params.rules.turb_energy_threshold assert threshold < 0 or abs(turb_energy) < threshold series["turb_energy"][run].append(turb_energy) except (ValueError, IndexError, AssertionError): series["turb_energy"][run].append(np.nan) return series def _from_log(self, keys, extractor): nums = self.nums # Initialize series series = {} for key in keys: series[key] = {} for run in self.runs: # Initialize list for key in keys: series[key][run] = [] # get one preprocessor path_run = self.path + "/" + run # Get list of run files log_files = path_run + "/" + self.pp_params.input.log_prefix + "*" # Parse files for log_filename in glob.glob(log_files): series = extractor(series, log_filename, run) # Numpify the lists for key in series: series[key][run] = np.array(series[key][run]) # Sort the arrays ind_sort = series["time"][run].argsort() for key in series: series[key][run] = series[key][run][ind_sort] return series def _ssfr_from_mass_sink(self, avg_window=None): """ avg_window in year """ time_unit = self.save.get_node("/series/sinks_from_log/time")._v_attrs.unit mass_unit = self.save.get_node("/series/sinks_from_log/mass_sink")._v_attrs.unit # Surface of the box in pc^2 surface = (self.info["unit_length"].express(cst.pc)) ** 2 # WARNING : We do not multiply by boxlen since already done in 'unit_length' (pymses) ssfr = {} for run in self.runs: time = self.save.get_node("/series/sinks_from_log/time/" + run).read() time = time * time_unit.express(cst.year) mass_sink = self.save.get_node( "/series/sinks_from_log/mass_sink/" + run ).read() mass_sink = mass_sink * mass_unit.express(cst.Msun) if avg_window is None: shift = 1 else: # We assume that the timestep do not vary a lot ... shift = np.searchsorted(time, avg_window, side="left") sfr = (mass_sink[shift:] - mass_sink[:-shift]) / ( time[shift:] - time[:-shift] ) ssfr[run] = np.zeros(len(mass_sink)) ssfr[run][shift:] = sfr / surface return ssfr def _gen_rule_time_global( self, glob_name, name=None, glob_group="/globals", subarray_name=None, unload_cells=True, unit=cst.none, description="", ): if name is None: name = "time_" + glob_name self.rules[name] = Rule( self, partial( self._time_series, partial( self.get_global, glob_group + "/" + glob_name, unload_cells=True ), ), group="/series", unit=unit, dependencies={"time": None, glob_name: "__parent__"}, ) def _gen_rule_avg(self, rule_src_name, subarray_name=None): rule_src = self.rules[rule_src_name] if subarray_name is None: src_name = rule_src_name group_src = rule_src.group unit = rule_src.unit descr = rule_src.description else: src_name = subarray_name group_src = rule_src.group + "/" + rule_src_name unit = rule_src.unit[subarray_name] descr = rule_src.description[subarray_name] description = { "runs": "List of runs", "mean": "Temporal average of {}".format(descr), "std": "Standard deviation of {}".format(descr), "median": "Median of {}".format(descr), } name = "avg_" + src_name def fn(arg=None, **kwargs): if arg is None: return self._time_avg(src_name, group=group_src, **kwargs) else: return self._time_avg( src_name + "_" + str(arg), group=group_src, **kwargs ) self.rules[name] = Rule( self, fn, group="/comp", unit=unit, description=description, dependencies=[rule_src_name], ) def def_rules(self): averageables = ["coldens", "rho", "T", "Q"] self.rules = { # Read from log "sinks_from_log": Rule( self, partial( self._from_log, ["time", "mass_sink", "nb_sink"], self._extract_sinks_from_log, ), group="/series", unit={ "time": self.info["unit_time"], "mass_sink": cst.Msun, "nb_sink": cst.none, }, description={ "time": "Time", "mass_sink": "Total mass of stars", "nb_sink": "Number of stars", }, ), "issfr": Rule( self, self._ssfr_from_mass_sink, group="/series/sinks_from_log", unit=cst.ssfr, description="Instantaneous surfacic star formation rate", dependencies=["sinks_from_log"], ), "sfr_from_log": Rule( self, partial(self._from_log, ["time", "sfr"], self._extract_sfr_from_log), group="/series", unit={"time": cst.year, "sfr": cst.ssfr}, description={ "time": "Time", "sfr": "Averaged surfacic star formation rate", }, ), "rms_from_log": Rule( self, partial( self._from_log, ["time", "turb_rms", "turb_energy"], self._extract_rms_from_log, ), group="/series", unit={ "time": self.info["unit_time"], "turb_rms": cst.none, "turb_energy": cst.none, }, description={ "time": "Time", "turb_rms": "Computed turbulent RMS", "turb_energy": "Injected turbulent energy", }, ), # Read from outputs "time": Rule( self, partial( self._time_series, partial(self.get_global, "/globals/time_num") ), group="/series", unit=self.info["unit_time"], dependencies=["time_num"], ), "time_pdf_slope_coldens": Rule( self, partial( self._time_series, partial( self.get_attr, "slope", node_name="/hist/pdf_coldens_z", ), ), group="/series", dependencies={"time": None, "fit_pdf_coldens": "z"}, ), # namelist "nml": Rule( self, lambda nml_key: self._comp( partial(self.get_nml, nml_key), use_num=False ), group="/comp", ), } self._gen_rule_time_global( "mwa_sigma", "time_sigma", unit=self.info["unit_velocity"] ) self._gen_rule_time_global("max_fluct_coldens") for name in ["issfr", "time_sigma", "time_pdf_slope_coldens"]: self._gen_rule_avg(name) self._gen_rule_avg("sinks_from_log", "mass_sink") self._gen_rule_avg("sinks_from_log", "nb_sink") self._gen_rule_avg("sfr_from_log", "sfr") self._gen_rule_avg("rms_from_log", "turb_rms") self._gen_rule_avg("rms_from_log", "turb_energy") super(Comparator, self).def_rules() def get_time(path, num): """ Return the time of the output (code units) Parameters ---------- num output number path_out path of the pipeline output Returns ------- time the time of the output (code units) """ try: f = open( path + "/output_" + str(num).zfill(5) + "/info_" + str(num).zfill(5) + ".txt" ) for line in f: ls = line.split() if len(ls) > 1 and ls[0] == "time": time = float(ls[2]) break f.close() return time except IOError as e: print(e) return np.nan