Files
pipeline/snapshotprocessor.py
2024-03-22 15:06:07 +01:00

2328 lines
78 KiB
Python

# coding: utf-8
import numpy as np
import tables
import pickle
import astropy.units as u
import pandas as pd
from struct import pack
from skimage.morphology import medial_axis
import os
from distutils.file_util import copy_file
from functools import partial
from scipy.stats import linregress
from astrophysix.simdm.results import Snapshot
import pymses
import pymses.utils.regions as reg
from pymses.analysis import (
Camera,
FractionOperator,
MaxLevelOperator,
ScalarOperator,
raytracing,
slicing,
splatting,
cube3d,
)
from pymses.filters import CellsToPoints, RegionFilter
from pymses.sources.hop.hop import HOP
try:
from fil_finder import FilFinder2D
except ModuleNotFoundError:
print("WARNING: No filFinder support")
from scipy import fft
from scripts import pspec as pspec
from utils.units import U
from baseprocessor import (
HDF5Container,
Rule,
norm_getter,
oct_norm_getter,
simple_getter,
vect_getter,
oct_vect_getter,
)
from utils.runselector import RunSelector
# Getters
def mass_func(dset):
dx = dset["dx"]
return dset["rho"] * dx**3 # Mass function
def vol_func(dset):
return dset["dx"] ** 3 # Volume function
def getter_T(dset):
return dset["P"] / dset["rho"] # Temperature
def getter_P(dset):
return dset["P"]
def getter_abs_cos_vB(dset):
B_norm = np.sqrt(np.sum(dset["Br"] ** 2, axis=1))
v_norm = np.sqrt(np.sum(dset["vel"] ** 2, axis=1))
# Compute the dot product in each cell
dot_prod = np.einsum("ij,ij->i", dset["vel"], dset["Br"])
return np.abs(dot_prod) / (v_norm * B_norm)
def getter_B_int(dset):
B_norm = np.sqrt(np.sum(dset["Br"] ** 2, axis=1))
return B_norm
def getter_rho(dset):
return dset["rho"]
def getter_v_norm(dset):
v_norm = np.sqrt(np.sum(dset["vel"] ** 2, axis=1))
return v_norm
# Helpers
def mean_by_bins(
x,
y,
bins=100,
logbins=False,
weights=None,
range=None,
):
"""
Compute the mean of y in bins of x
Parameters
----------
x, y : np.array of same dimensions
bins : int, number of bins
logbins : bool, if true, the bins will be logaritmically distributed
weights : np.array, same size as y, default None.
range : tuple of float, default None.
"""
mask = np.isfinite(x) & np.isfinite(y)
x = x[mask].flatten()
y = y[mask].flatten()
if logbins:
if range is None:
range = (np.min(x[x > 0]), np.max(x))
x_bins = np.logspace(np.log10(range[0]), np.log10(range[1]), bins, base=10)
else:
if range is None:
range = (np.min(x), np.max(x))
x_bins = np.linspace(range[0], range[1], bins)
# For each cell, bin_number contains the number of the bins it belongs to
bin_number = np.zeros(len(y))
# Go through the min value of x of each bin
for x_min in x_bins[1:-1]:
bin_number = bin_number + (x > x_min).astype(int)
# Compute the mean in each bin
y_mean = np.zeros(len(x_bins) - 1)
for i in np.arange(len(y_mean)):
mask_bin = bin_number == i
if weights is None:
y_mean[i] = np.mean(y[mask_bin])
else:
y_mean[i] = np.sum(y[mask_bin] * weights[mask_bin]) / np.sum(
weights[mask_bin]
)
# Get the center of each bin
if logbins:
centers = 10 ** (0.5 * (np.log10(x_bins[1:]) + np.log10(x_bins[:-1])))
else:
centers = 0.5 * (x_bins[1:] + x_bins[:-1])
return centers, y_mean
# Filament helpers
def find_center(distance, skeleton, i_center, j_center, i, j):
"""
Given a distance array, find the cells at a center of a filament at a given postion
"""
if skeleton[i, j]:
i_center[i, j], j_center[i, j] = i, j
return i, j
elif i_center[i, j] or j_center[i, j]:
return i_center[i, j], j_center[i, j]
else:
i_neigh = np.array([i - 1, i, i + 1])
i_neigh = i_neigh[(i_neigh > 0) & (i_neigh < distance.shape[0])]
j_neigh = np.array([j - 1, j, j + 1])
j_neigh = j_neigh[(j_neigh > 0) & (j_neigh < distance.shape[1])]
ii_neigh, jj_neigh = np.meshgrid(i_neigh, j_neigh)
d_neigh = distance[ii_neigh, jj_neigh]
ind_max = np.unravel_index(np.argmax(d_neigh), d_neigh.shape)
i_max, j_max = ii_neigh[ind_max], jj_neigh[ind_max]
if i_max == i and j_max == j:
i_center[i, j], j_center[i, j] = i, j
else:
i_center[i, j], j_center[i, j] = find_center(
distance, skeleton, i_center, j_center, i_max, j_max
)
return i_center[i, j], j_center[i, j]
# Power spectrum helpers
def pspec2d(map2D):
"""
Computes the 2D powerspectrum of a 2D map
Parameters
----------
map2D : square map to compute the fft from
"""
# Resolution of the map
n = map2D.shape[0]
assert map2D.shape[1] == n
# Create bin array for the wavenumber k
# (Take into account the symmetry)
kbins = np.linspace(0, n // 2, n // 4)
k_alias = np.arange(n, dtype=np.float64)
k = np.where(k_alias >= n // 2, k_alias - n, k_alias)
kx, ky = np.meshgrid(k, k, indexing="ij")
# Compute map of k
kmap = np.sqrt(kx**2 + ky**2)
# Compute fft
fmap = fft.fft2(map2D)
# Compute the power map from the fft
pmap = pspec.pcube(fmap)
# Use the power map and the fft to compute the powerspectrum
# This is typically an histogram of k weighted by the fourier transform value
power, kbins, power2, fbins = pspec.pspectrum(pmap, kmap, kbins, 1, 0)
# Return bin center and power spectrum
return 0.5 * (kbins[1:] + kbins[:-1]), power
def degrade_map(dmap, nnew, integrate=False):
"""Degrade a data dmap to a coarser resolution
Parameters:
-----------
cube
(numpy.ndarray, ndim=2) input data cube, lengths in each direction must
integrate
(bool, default False) if True, values are added instead of averaged
Return value:
-------------
degraded_cube
(numpy.ndarray, ndim=3) output data cube, 2**lvl values in each
direction
"""
assert dmap.ndim == 2
nold = dmap.shape[0]
assert nnew <= nold
nsum, rem = divmod(nold, nnew)
assert rem == 0
# For each direction, we split the corresponding axis (length nold) into
# 2 axes, the first one being the axis for the output dmap (length nnew),
# and the second one the axis to average or integrate over (fine cells).
# reshape creates a view, no copy is involved
v = dmap.reshape((nnew, nsum, nnew, nsum), order="C")
# Even indexes are kept, odd ones are to be summed
dmap_new = np.einsum("iajb->ij", v)
if not integrate:
dmap_new /= nsum
return dmap_new
class SnapshotProcessor(HDF5Container):
"""
This class enable to compute and save derived quantities from the raw output
"""
# Axes information
_ax_nb = {"x": 0, "y": 1, "z": 2} # Number of each axes
_axes_h = {"x": "y", "y": "x", "z": "x"} # Associated horizontal axe
_axes_v = {"x": "z", "y": "z", "z": "y"} # Associated vertical axe
# Pymses unit key of amr fiels
unit_key = {
"rho": "unit_density",
"vel": "unit_velocity",
"Br": "unit_mag",
"Bl": "unit_mag",
"P": "unit_pressure",
"g": {"unit_gravpot": 1, "unit_length": -1},
"phi": "unit_gravpot",
"mass": "unit_mass",
"epoch": "unit_time",
"size": "unit_length",
"dx": "unit_length",
"pos": "unit_length",
"id": U.none,
"level": U.none,
}
# Units sinks
unit_sinks = {
"id": U.none,
"msink": U.Msun,
"dmfsink": U.Msun,
"x": "unit_density",
"y": "unit_density",
"z": "unit_density",
"vx": "unit_velocity",
"vy": "unit_velocity",
"vz": "unit_velocity",
"rot_period": "unit_time",
"lx": {"unit_mass": 1, "unit_length": 2, "unit_time": -1},
"ly": {"unit_mass": 1, "unit_length": 2, "unit_time": -1},
"lz": {"unit_mass": 1, "unit_length": 2, "unit_time": -1},
"acc_rate": {"unit_mass": 1, "unit_time": -1},
"acc_lum": U.none,
"age": U.year,
"int_lum": U.none,
"Teff": U.K,
"level": U.none,
"mbh": "unit_mass",
"tform": "unit_time",
"del_mass": U.Msun,
"rho_gas": "unit_density",
"cs**2": {"unit_velocity": 2},
"etherm": {"unit_mass": 1, "unit_velocity": 2},
"vx_gas": "unit_velocity",
"vy_gas": "unit_velocity",
"vz_gas": "unit_velocity",
}
G = 1.0 # Gravitational constant
cells_loaded = False
parts_loaded = False
def __init__(
self,
path=".",
num=None,
path_out=".",
params=None,
tag=None,
selector=None,
unit_time=U.year,
):
"""
Creates the basic structures needed for the outputs
Parameters
----------
path : str, path of where the outputs are located
path_out : str, where to store postprocessings results
params : str or params instance
tag : str, distinguish this postprocessing set
selector : RunSelector instance
unit_time : Unit instance, used for astrophysix
"""
self.path = path
self.run = os.path.basename(path)
self.num = num
self.log_id = "snap({}, {})".format(self.run, self.num)
super(SnapshotProcessor, self).__init__(path, path_out, params, tag)
# Open outfile
if not self.params.out.tag == "":
tag_name = self.params.out.tag + "_"
else:
tag_name = ""
if self.params.out.ext_subfolder:
subfolder = "/h5"
else:
subfolder = ""
self.filename = f"{self.path_out}{subfolder}/postproc_{tag_name}{num:05}.h5"
self.cells_filename = (
f"{self.path_out}{subfolder}/cells_{tag_name}{num:05}.h5" # legacy only
)
self.parts_filename = (
f"{self.path_out}{subfolder}/parts_{tag_name}{num:05}.h5" # legacy only
)
self.snap_filename = (
f"{self.path_out}{subfolder}/snap_{num:05}.h5" # new hdf5 snap format
)
self.pspec_filename = f"{self.path_out}{subfolder}/pspec_{tag_name}{num:05}.h5"
self.filaments_filename = (
f"{self.path_out}/{subfolder}filaments_{tag_name}{num:05}.h5"
)
if not os.path.exists(f"{self.path_out}{subfolder}"):
os.makedirs(f"{self.path_out}{subfolder}")
# Create selector object
if selector is None:
dirpath = os.path.dirname(path)
if dirpath == "":
dirpath = "."
selector = RunSelector(
dirpath,
self.run,
self.num,
self.params.input.nml_filename,
)
self.info = selector.info[self.run][self.num]
self.namelist = selector.namelist[self.run]
# Save important info files
if self.params.out.copy_info:
info_src = f"{self.path}/output_{self.num:05}/info_{self.num:05}.txt"
info_dest = f"{self.path_out}/info/info_{self.num:05}.txt"
if os.path.exists(info_src):
os.makedirs(os.path.dirname(info_dest), exist_ok=True)
copy_file(info_src, info_dest, update=1)
# Get box length
self.lbox = self.info["boxlen"]
# Get time
self.time = self.info["time"]
# Set post processing attributes
self.open()
self.save.root._v_attrs.dir = os.path.dirname(path)
self.save.root._v_attrs.run = os.path.basename(path)
self.save.root._v_attrs.num = self.num
self.save.root._v_attrs.lbox = self.lbox
self.save.root._v_attrs.unit_length = self.info["unit_length"]
self.save.root._v_attrs.time = self.time
self.close()
if os.path.exists(self.filaments_filename):
with open(self.filaments_filename, "rb") as f:
self.fil = pickle.load(f)
else:
self.fil = None
# Convert time unit
if isinstance(unit_time, str):
factor = self.get_nml(unit_time)
unit_time = U.Unit(
name=os.path.basename(unit_time),
base_unit=self.info["unit_time"],
coeff=factor,
)
time_in_right_unit = self.time * self.info["unit_time"].express(unit_time)
if self.params.astrophysix.generate:
self.snapshot = Snapshot(
name=str(self.num),
description="",
time=(time_in_right_unit, unit_time),
directory_path=self.path,
data_reference="OUTPUT_{}".format(self.num),
)
try:
self.init_pymses()
except IOError:
self.logger.error("Pymses not initialized", exc_info=1)
self.def_rules()
def check_variables(self, path):
# Check if variables are in output
name_conv = {
"rho": "density",
"vel": "velocity",
"P": "pressure",
"Br": "B",
"Bl": "B",
"mass": "mass",
"id": "identity",
"level": "levelp",
"epoch": "epoch",
}
hydro_filename = f"{path}/output_{self.num:05}/hydro_file_descriptor.txt"
if os.path.exists(hydro_filename):
hydro_file = open(hydro_filename)
hlines = hydro_file.readlines()
hydro_var = np.unique(
list(map(lambda s: s.split(",")[1][1:].split("_")[0], hlines[2:]))
)
else:
hydro_var = []
part_filename = f"{path}/output_{self.num:05}/part_file_descriptor.txt"
if os.path.exists(part_filename):
hydro_file = open(part_filename)
part_file = open(f"{path}/output_{self.num:05}/part_file_descriptor.txt")
plines = part_file.readlines()
part_var = np.unique(
list(map(lambda s: s.split(",")[1][1:].split("_")[0], plines[2:]))
)
else:
part_var = []
has_grav = os.path.exists(
f"{path}/output_{self.num:05}/grav_{self.num:05}.out00001"
)
def is_available(available_vars, pymsesrc, var):
if var in ["g", "phi"]:
if not has_grav:
self.logger.debug(f"Variable {var} not in output")
if var not in pymsesrc:
self.logger.debug(f"Variable {var} not in pymsesrc")
return has_grav and var in pymsesrc
else:
if var in name_conv:
if name_conv[var] not in available_vars:
self.logger.debug(f"Variable {var} not in output")
if var not in pymsesrc:
self.logger.debug(f"Variable {var} not in pymsesrc")
return name_conv[var] in available_vars and var in pymsesrc
else:
self.logger.warning(f"Variable {var} is unknown")
return False
self.params.pymses.variables = list(
filter(
partial(
is_available,
hydro_var,
pymses.rcConfig.Ramses.amr_fields.field_name_list,
),
self.params.pymses.variables,
)
)
self.params.pymses.part_variables = list(
filter(
partial(
is_available, part_var, ["vel", "mass", "id", "level", "epoch"]
),
self.params.pymses.part_variables,
)
)
if (
"Br" in pymses.rcConfig.Ramses.amr_fields.field_name_list
and "B" not in hydro_var
):
pymses.rcConfig.Ramses.amr_fields.remove_field("P")
pymses.rcConfig.Ramses.amr_fields.remove_field("Br")
pymses.rcConfig.Ramses.amr_fields.remove_field("Bl")
pymses.rcConfig.Ramses.amr_fields.add_scalar(
name="P", ivar=4, file_prefix="hydro"
)
if not has_grav and "g" in pymses.rcConfig.Ramses.amr_fields.field_name_list:
pymses.rcConfig.Ramses.amr_fields.remove_field("g")
pymses.rcConfig.Ramses.amr_fields.remove_field("phi")
def init_pymses(self):
# If ratarmount was used
if os.path.exists(f"{self.path}/output_{self.num:05}/output_{self.num:05}"):
path = f"{self.path}/output_{self.num:05}"
else:
path = self.path
self._ro = pymses.RamsesOutput(
path,
self.num,
order=self.params.pymses.order,
verbose=self.params.pymses.verbose,
check_endianness=False,
)
if self.params.pymses.check_variables:
self.check_variables(path)
self._amr = self._ro.amr_source(self.params.pymses.variables)
self._part = self._ro.particle_source(self.params.pymses.part_variables)
if self.params.pymses.filter:
self.min_coords = np.array(self.params.pymses.min_coords)
self.max_coords = np.array(self.params.pymses.max_coords)
box = reg.Box((self.min_coords, self.max_coords))
amr_filt = RegionFilter(box, self._amr)
self._amr = amr_filt
part_filt = RegionFilter(box, self._part)
self._part = part_filt
# Density operator
self._rho_op = ScalarOperator(
lambda dset: dset["rho"], self._ro.info["unit_density"]
)
# Density ray tracer
if self.params.pymses.fft:
self._rt = splatting.SplatterProcessor(
self._amr, self._ro.info, self._rho_op
)
else:
self._rt = raytracing.RayTracer(self._amr, self._ro.info, self._rho_op)
if not self.params.pymses.multiprocessing:
self._rt.disable_multiprocessing()
# Set the extent of the image
self._radius = 0.5 / self.params.pymses.zoom
if self.params.pymses.filter:
center = (self.max_coords + self.min_coords) / 2.0
im_extent = np.array(
[
self.min_coords[0],
self.max_coords[0],
self.min_coords[1],
self.max_coords[1],
]
)
distance = (self.max_coords[2] - self.min_coords[2]) / 2.0
else:
center = self.params.pymses.center
im_extent = np.array(
[
(-self._radius + center[0]),
(self._radius + center[0]),
(-self._radius + center[1]),
(self._radius + center[1]),
]
)
distance = self._radius
# Initialize cameras
self._cam = {}
for ax_los in self._ax_nb: # los = line of sight
ax_v = self._axes_v[ax_los]
real_ax = ax_los
if real_ax == "y":
real_ax = [0, -1, 0] # we look from -y to keep z upwards
self._cam[ax_los] = Camera(
center=self.params.pymses.center,
line_of_sight_axis=real_ax,
region_size=[im_extent[1] - im_extent[0], im_extent[3] - im_extent[2]],
distance=distance,
far_cut_depth=distance,
up_vector=ax_v,
map_max_size=self.params.pymses.map_size,
)
# Initialize HDF5 group
try:
self.open()
if "/maps" not in self.save:
self.save.create_group("/", "maps", "2D maps")
self.save.root.maps._v_attrs.center = center
self.save.root.maps._v_attrs.radius = self._radius
self.save.root.maps._v_attrs.im_extent = im_extent
except Exception() as e:
self.logger.error("Error in HDF5", exc_info=1)
raise e
finally:
self.close()
def load_data(self, points_src, filename, save, keys=None, group="data"):
"""
Load data from the source file in the memory.
(Long and memory heavy)
"""
loaded = False
if os.path.exists(filename):
self.logger.debug(f"Found hdf5, loading {filename}.")
hdf5 = tables.open_file(filename, mode="r")
if group in hdf5.root:
node = hdf5.get_node(f"/{group}")
loaded = True
elif "data" in hdf5.root:
self.logger.warning(
f"{filename} has no {group} group, but I found the group data."
)
node = hdf5.get_node("/data")
loaded = True
else:
self.logger.warning(f"{filename} has no {group} group")
if loaded:
data = {}
if keys is None:
keys = node._v_children
for key in keys:
if key in node._v_children:
data[key] = node[key].read()
else:
self.logger.warning(
f"Key {key} is missing, I will try a full reload"
)
loaded = False
break
hdf5.close()
if not loaded:
self.logger.debug("No hdf5, loading from ramses data.")
data_pymses = points_src.flatten()
data = {}
for key in data_pymses.fields:
data[key] = data_pymses[key]
try:
data["dx"] = data_pymses.get_sizes()
except AttributeError:
pass
data["pos"] = data_pymses.points
self.logger.info("Ramses data loaded.")
if save:
self.save_data(data, filename, group)
return data
def save_data(self, data, filename, group, overwrite=False, units=None):
self.logger.debug(f"Writing {filename}")
hdf5 = tables.open_file(filename, mode="a")
units = units or self.unit_key
try:
nb_written = 0
for key in data:
if len(data[key] > 0):
if f"/{group}/{key}" in hdf5 and overwrite:
hdf5.remove_node(f"/{group}/{key}")
elif f"/{group}/{key}" not in hdf5:
hdf5.create_array(
f"/{group}", key, data[key], "", createparents=True
)
unit = self._get_units(units[key])
hdf5.get_node(f"/{group}/{key}").unit = unit
nb_written += 1
else:
self.logger.warning("Empty key")
if "namelist" not in hdf5.root._v_attrs:
hdf5.root._v_attrs.namelist = self.namelist.data.todict()
if "info " not in hdf5.root._v_attrs:
hdf5.root._v_attrs.info = self.info
finally:
hdf5.close()
self.logger.info(
f"{filename} successfully written with {nb_written}/{len(data.keys())} updated fields"
)
def load_parts(self, keys=None, force_new_filename=False, save=True):
if not self.parts_loaded:
self.logger.debug("Loading particles")
if os.path.exists(self.parts_filename) and not force_new_filename:
filename = self.parts_filename
else:
filename = self.snap_filename
self.parts = self.load_data(
self._part,
filename,
self.params.process.save_parts and save,
keys=keys,
group="parts",
)
self.parts_loaded = True
self.logger.info("Particles loaded")
def unload_parts(self):
"""
Free space in the memory by telling the garbage collectors that
self.parts is not needed
"""
if self.parts_loaded:
del self.parts
self.parts_loaded = False
self.logger.info("Particles unloaded")
def load_cells(self, keys=None, force_new_filename=False, save=True):
"""
Load all cells from the source file in the memory.
Cells will be accessible trough self.cells
(Long and memory heavy)
"""
if not self.cells_loaded:
self.logger.debug("Loading cells")
if os.path.exists(self.cells_filename) and not force_new_filename:
filename = self.cells_filename
else:
filename = self.snap_filename
cells_src = CellsToPoints(self._amr)
self.cells = self.load_data(
cells_src,
filename,
save,
keys=keys,
group="cells",
)
self.cells_loaded = True
self.logger.info("Cells loaded")
def unload_cells(self):
"""
Free space in the memory by telling the garbage collector that
self.cells is not needed
"""
if self.cells_loaded:
del self.cells
self.cells_loaded = False
self.logger.info("Cells unloaded")
def load_destructured(self, save=True):
self.load_cells(save=save)
self.load_parts(save=save)
def unload_destructured(self):
self.unload_cells()
self.unload_parts()
def convert_hdf5(self, filename=None):
self.load_destructured(save=False)
self.load_sinks()
sinks = {key: self.sinks[key].values for key in self.sinks}
if filename is None:
filename = self.snap_filename
self.save_data(self.cells, filename, "cells")
self.save_data(self.parts, filename, "parts")
self.save_data(sinks, filename, "sinks", units=self.unit_sinks)
self.unload_destructured()
def get_nml(self, nml_key):
if self.namelist is not None:
value = self.namelist[nml_key]
else:
raise AttributeError("No namelist associated with this snapshot")
return value
def getter_pos_disk(self, dset):
"""
Returns the position in normalized and centered units
"""
try:
pos = dset.points
except AttributeError:
pos = dset["pos"]
pos = pos - np.array(self.params.disk.center)
return pos
def getter_vect_r(self, dset, name_vect):
"""Radial component of a vector"""
r = self.getter_pos_disk(dset)[:, :2]
ur = np.transpose((np.transpose(r) / np.sqrt(np.sum(r**2, axis=1))))
return np.einsum("ij, ij -> i", dset[name_vect][:, :2], ur)
def getter_vect_phi(self, dset, name_vect):
"""Azimuthal component of a vector"""
r = self.getter_pos_disk(dset)[:, :2]
r_norm = np.sqrt(np.sum(r**2, axis=1))
rot = np.array([[0, -1], [1, 0]])
uphi = np.transpose(np.einsum("ij, kj -> ik", rot, r) / r_norm)
vect = dset[name_vect][:, :2]
return np.einsum("ij,ij -> i", vect, uphi)
def oct_getter_pos_disk(self, dset):
"""
Returns the position in normalized units centered on the position of the star
"""
pos = dset.get_cell_centers()
pos = pos - np.array(self.params.disk.center)
return pos
def oct_getter_vect_r(self, dset, name_vect):
"""Radial component of a vector"""
r = self.oct_getter_pos_disk(dset)[:, :, :2]
ur = np.transpose(
(np.transpose(r, (2, 0, 1)) / np.sqrt(np.sum(r**2, axis=2))), (1, 2, 0)
)
return np.einsum("ikj, ikj -> ik", dset[name_vect][:, :, :2], ur)
def oct_getter_vect_phi(self, dset, name_vect):
"""Azimuthal component of a vector"""
r = self.oct_getter_pos_disk(dset)[:, :, :2]
r_norm = np.sqrt(np.sum(r**2, axis=2))
rot = np.array([[0, -1], [1, 0]])
uphi = np.transpose(np.einsum("ij, klj -> ikl", rot, r) / r_norm, (1, 2, 0))
vect = dset[name_vect][:, :, :2]
return np.einsum("ikj,ikj -> ik", vect, uphi)
def oct_getter_vr(self, dset):
return self.oct_getter_vect_r(dset, "vel")
def oct_getter_vphi(self, dset):
"""Azimuthal velocity"""
return self.oct_getter_vect_phi(dset, "vel")
def datacube(self, getter, level=None, unit=U.none):
"""
Return a datacube of the source box.
Parameters
----------
getter : callable
A callable that extract the wanted data from a pymses dataset
unit : U.Unit
Unit of the resulting dataset
Returns
-------
A numpy array containing the slice
"""
unit = self._get_units(unit)
operator = ScalarOperator(getter, unit)
extractor = cube3d.CubeExtractor(self._amr, operator)
if level is None:
level = self.get_nml("amr_params/levelmin")
size = 1.0
cam = Camera(
center=self.params.pymses.center,
line_of_sight_axis="z",
region_size=[size, size],
distance=size / 2.0,
far_cut_depth=size / 2.0,
up_vector="y",
map_max_size=2**level,
)
cube = extractor.process(cam, cube_size=1.0, resolution=2**level).data
return cube
def slice(self, getter, ax_los="z", z=0.0, unit=U.none):
"""
Slice process function.
Return a slice of the source box.
Parameters
----------
getter : callable
A callable that extract the wanted data from a pymses dataset
ax_los : string
The axis perpendicular to the slice plane
z : float
Coordinate of the slice on the ax_los axis
unit : U.Unit
Unit of the resulting dataset
Returns
-------
A numpy array containing the slice
"""
unit = self._get_units(unit)
op = ScalarOperator(getter, unit)
datamap = slicing.SliceMap(self._amr, self._cam[ax_los], op, z=z)
return datamap.map.T
def ax_avg(
self, oct_getter, ax_los, unit=U.none, mass_weighted=True, surf_qty=False
):
"""
Map of the average of a quantity (given by getter) along an axis (ax_los)
Returns 2D array if getter returns a scalar quantity
If surf_qty is set (projection mode), mass_weighted is ignored
"""
unit = self._get_units(unit)
if surf_qty:
op = ScalarOperator(oct_getter, unit)
else:
if mass_weighted:
def num(dset):
value = oct_getter(dset)
rho = getter_rho(dset)
return rho * value
def denum(dset):
return getter_rho(dset)
else: # Volume weighted
def num(dset):
value = oct_getter(dset)
return value
def denum(dset):
return 1.0
op = FractionOperator(num, denum, unit)
if self.params.pymses.fft:
rt = splatting.SplatterProcessor(self._amr, self._ro.info, op)
else:
rt = raytracing.RayTracer(self._amr, self._ro.info, op)
if not self.params.pymses.multiprocessing:
rt.disable_multiprocessing()
datamap = rt.process(self._cam[ax_los], surf_qty=surf_qty)
return datamap.map.T
def _get_axis(self, axis):
if isinstance(axis, str):
axis = self._ax_nb[axis]
self.load_cells()
return np.sort(np.unique(self.cells["pos"][:, axis]))
def _plane_avg_uniform(self, getter, axis, unit=U.none, mass_weighted=False):
"""
Profile of the average of a quantity (given by getter) perpendicular to an axis
WARNING : This version only works on an uniform grid, need of a box version for AMR
Returns 1D array if getter returns a scalar quantity
"""
unit = self._get_units(unit)
self.load_cells()
if isinstance(axis, str):
axis = self._ax_nb[axis]
axis_data = self.cells["pos"][:, axis]
value = getter(self.cells)
df = pd.DataFrame({"axis": axis_data})
if mass_weighted:
mass = mass_func(self.cells)
tot_mass = np.sum(mass)
df["value"] = value * mass / tot_mass
else:
df["value"] = value
if self.params.process.unload_cells:
self.unload_cells()
df.sort_values("axis", inplace=True)
return df.groupby("axis").mean().values[:, 0]
def sum(self, getter, mass_weighted=True):
"""
Global sum of the quantity returned by getter (variable must be extensive)
Returns a scalar (or a vector if the quantity returned by getter is a getter, eg. speed)
"""
self.load_cells()
value = getter(self.cells)
if mass_weighted:
mass = mass_func(self.cells)
# Transpose (.T) is for vectorial values
data = np.sum((mass * value.T).T, axis=0) / np.sum(mass)
else:
data = np.sum(value, axis=0)
if self.params.process.unload_cells:
self.unload_cells()
return data
def vol_avg(self, getter, mass_weighted=True):
"""
Global volumic (or mass_weighted) average of the quantity returned by getter
Returns a scalar (or a vector if the quantity returned by getter is a getter, eg. speed)
"""
self.load_cells()
value = getter(self.cells)
if mass_weighted:
weight = mass_func(self.cells)
else:
weight = vol_func(self.cells)
# Transpose (.T) is for vectorial values
data = np.sum((weight * value.T).T, axis=0) / np.sum(weight)
if self.params.process.unload_cells:
self.unload_cells()
return data
def vol_pdf(
self,
getter,
bins=100,
old_unit=None,
unit=None,
logbins=False,
weight_func=vol_func,
**kwargs,
):
self.load_cells()
data = getter(self.cells)
if old_unit is not None and unit is not None:
data *= old_unit.express(unit)
if logbins:
data = np.log10(data)
weights = weight_func(self.cells)
if self.params.process.unload_cells:
self.unload_cells()
values, edges = np.histogram(
data, bins, weights=weights, density=True, **kwargs
)
centers = 0.5 * (edges[1:] + edges[:-1])
return (np.stack([values, centers]), {"logbins": logbins})
def _Brho(self, bins=100, logbins=True):
"""
Mean of B in rho bins
"""
self.load_cells()
B = getter_B_int(self.cells)
rho = getter_rho(self.cells)
centers, B_mean = mean_by_bins(rho, B, bins, logbins)
if self.params.process.unload_cells:
self.unload_cells()
return ({"rho": centers, "B": B_mean}, {"logbins": logbins})
def mean_by_bins(
self, name_x, name_y, ax_los, group="/maps/", bins=100, logbins=True
):
"""
Compute the mean of y in bins of x, where x, y are to array of the hdf5 file
Parameters
----------
x_name, y_name : str, path of x and y in the hdf5 file
bins : int, number of bins
logbins : bool, if true, the bins will be logaritmically distributed
"""
x = self.get_value(group + name_x + "_" + ax_los)
y = self.get_value(group + name_y + "_" + ax_los)
centers, values = mean_by_bins(x, y, bins, logbins)
# return ({os.path.basename(name_x) : centers, os.path.basename(name_y) : y_mean},
# {"logbins" : logbins})
return (np.stack([values, centers]), {"logbins": logbins})
def _Ek_Eb_rho(self, bins=100, logbins=True):
"""
Mean of Ek/Eb in rho bins
"""
self.load_cells()
mean_speed = self.get_value("/globals/mwa_speed", unit=U.km_s)
vel_fluct = (self.cells)["vel"] * self.info["unit_velocity"].express(
U.km_s
) - mean_speed
B_norm = getter_B_int(self.cells)
B_norm = B_norm * self.info["unit_mag"].express(U.T)
v_norm = np.sqrt(
np.sum((vel_fluct * 10 ** (3)) ** 2, axis=1)
) # v_norm [m/s] et vel_fluct [km/s]
rho = getter_rho(self.cells)
rho_kg_m3 = rho * self.info["unit_density"].express(U.kg_m3)
eb = 0.5 * (B_norm) ** 2 / (4 * np.pi * 10 ** (-7)) # mettre le bon mu
ek = 0.5 * v_norm**2 * rho_kg_m3
rapport = ek / eb
if logbins:
rho_bins = np.logspace(
np.log10(np.min(rho)), np.log10(np.max(rho)), bins, base=10
)
else:
rho_bins = np.linspace(np.min(rho), np.max(rho), bins)
# For each cell, bin_number contains the number of the bins it belongs to
bin_number = np.zeros(len(B_norm))
# Go through the min value of rho of each bin
for rho_min in rho_bins[:-1]:
bin_number = bin_number + (rho > rho_min).astype(int)
# Compute the mean in each bin
ek_eb = np.zeros(len(rho_bins) - 1)
for i in range(len(ek_eb)):
ek_eb[i] = np.mean(rapport[bin_number == i])
# Get the center of each bin
if logbins:
centers = 10 ** (0.5 * (np.log10(rho_bins[1:]) + np.log10(rho_bins[:-1])))
else:
centers = 0.5 * (rho_bins[1:] + rho_bins[:-1])
if self.params.process.unload_cells:
self.unload_cells()
return ({"rho": centers, "Ek_Eb_rho": ek_eb}, {"logbins": logbins})
def cos_vfluct_B(self):
"return the cos of the angle between the magnetic field and the velocity fluctuation field"
mean_speed = self.get_value("/globals/mwa_speed")
def getter_cos_vfluct_B(dset):
vel_fluct = dset["vel"] - mean_speed
B_norm = np.sqrt(np.sum(dset["Br"] ** 2, axis=1))
v_norm = np.sqrt(np.sum(vel_fluct**2, axis=1))
# Compute the dot product in each cell
dot_prod = np.einsum("ij,ij->i", vel_fluct, dset["Br"])
return np.abs(dot_prod) / (v_norm * B_norm)
return self.vol_pdf(getter_cos_vfluct_B)
def _mwa_sigma(self, axes=["x", "y", "z"]):
mw_speed = self.get_value("/globals/mwa_speed")
if axes == ["x", "y", "z"]:
def getter(dset):
return np.sum((dset["vel"] - mw_speed) ** 2, axis=1)
else:
def getter(dset):
sigma_squared = 0.0
for ax in axes:
ax_nb = self._ax_nb[ax]
sigma_sq_ax = (dset["vel"][:, ax_nb] - mw_speed[ax_nb]) ** 2
sigma_squared = sigma_squared + sigma_sq_ax
return sigma_squared
return np.sqrt(self.vol_avg(getter, mass_weighted=True))
def _coldens(self, ax_los):
datamap = self._rt.process(self._cam[ax_los], surf_qty=True)
return datamap.map.T
def _B_int(self, ax_los, z=0.0):
"""
Slice ont the intensity of the magnetic field
"""
B_op = ScalarOperator(
lambda dset: np.sqrt(np.sum(dset["Br"] ** 2, axis=1)),
self._ro.info["unit_mag"],
)
dmap_B = (slicing.SliceMap(self._amr, self._cam[ax_los], B_op, z=z)).map.T
return dmap_B
def _temperature(self, ax_los, z=0.0):
P_op = ScalarOperator(lambda dset: dset["P"], self._ro.info["unit_pressure"])
dmap_P = (slicing.SliceMap(self._amr, self._cam[ax_los], P_op, z=z)).map.T
dmap_rho = self.get_value("/maps/slice_rho_{}".format(ax_los))
return dmap_P / dmap_rho
def _levels(self, ax_los):
self._amr.set_read_levelmax(self.params.pymses.levelmax)
level_op = MaxLevelOperator()
rt_level = raytracing.RayTracer(self._amr, self._ro.info, level_op)
if not self.params.pymses.multiprocessing:
rt_level.disable_multiprocessing()
datamap = rt_level.process(self._cam[ax_los], surf_qty=True)
return datamap.map.T
def _jeans(self, ax_los):
dmap_T = self.get_value("/maps/T_" + ax_los)
dmap_rho = self.get_value("/maps/slice_rho_" + ax_los)
dmap_jeans = np.sqrt(np.pi * dmap_T / dmap_rho)
return dmap_jeans
def _jeans_ratio(self, ax_los):
dmap_jeans = self.get_value("/maps/jeans_" + ax_los)
dmap_levels = self.get_value("/maps/levels_" + ax_los)
dmap_jeans_ratio = dmap_jeans * 2 ** (dmap_levels)
return dmap_jeans_ratio
def _omega_average(self, ax_los):
# Operator to compute the angular speed times rho
lbox = self.lbox
def omega_rho_func(dset):
pos = self.oct_getter_pos_disk(dset)
xx = pos[:, :, 0] * lbox
yy = pos[:, :, 1] * lbox
rc2 = xx**2 + yy**2 # square of cylindrical radius
vx = dset["vel"][:, :, 0]
vy = dset["vel"][:, :, 1]
omega_rho = dset["rho"]
vyx = vy * xx
vxy = vx * yy
omega_rho = omega_rho * (vyx - vxy) / rc2
return omega_rho
# Operator to compute the angular speed
omega_unit = self._ro.info["unit_velocity"] / (
self._ro.info["unit_length"] / lbox
)
omega_op = FractionOperator(
omega_rho_func,
lambda dset: dset["rho"],
omega_unit,
)
# Ray tracer for the angular speed
rt_omega = raytracing.RayTracer(self._amr, self._ro.info, omega_op)
if not self.params.pymses.multiprocessing:
rt_omega.disable_multiprocessing()
dmap_omega = rt_omega.process(self._cam[ax_los]).map.T
return dmap_omega
def _toomreQ_disk(
self, ax_los, omega_approx=False, G1_units=False, coarsen_factor=1
):
"""
Compute the Toomre Q parameter
"""
# Get maps
if G1_units:
G = 1.0
cs2 = self.get_value("/maps/T_mwavg_z")
coldens = self.get_value("/maps/coldens_z")
omega = self.get_value("/maps/omega_mwavg_z")
space_coeff = self.lbox
else:
G = U.G.express(U.kg**-1 * U.m**3 * U.s**-2)
cs2 = self.get_value("/maps/T_mwavg_z", unit=U.m**2 * U.s**-2)
coldens = self.get_value("/maps/coldens_z", unit=U.kg * U.m**-2)
omega = self.get_value("/maps/omega_mwavg_z", unit=U.s**-1)
space_coeff = self.info["unit_length"].express(U.m)
map_size = self.params.pymses.map_size
if coarsen_factor > 1:
map_size = map_size // coarsen_factor
omega = degrade_map(omega, map_size)
coldens = degrade_map(coldens, map_size)
cs2 = degrade_map(cs2, map_size)
# Compute Q
if omega_approx:
# Use angular frequency as epiciclic frequency (true if the disk is Keplerian)
kappa = omega
else:
# Get coordinates
im_extent = np.array(self.save.root.maps._v_attrs.im_extent) * space_coeff
center = np.array(self.params.disk.center) * space_coeff
# Physical size of cells
dx = (im_extent[1] - im_extent[0]) / map_size
dy = (im_extent[3] - im_extent[2]) / map_size
# Physical coordinates of the center of the cells
x = np.linspace(im_extent[0], im_extent[1], map_size) + 0.5 * dx - center[0]
y = np.linspace(im_extent[2], im_extent[3], map_size) + 0.5 * dy - center[1]
xx, yy = np.meshgrid(x, y)
rr = np.sqrt(xx**2 + yy**2)
# Compute kappa**2 = (2omega/R)*d(R**2*omega)/dR
Gy, Gx = np.gradient(rr**2 * omega, dy, dx, edge_order=2)
Gr = (xx * Gx + yy * Gy) / rr
kappa_square = (2 * omega / rr) * Gr
kappa = np.sqrt(kappa_square)
map_Q = (np.sqrt(cs2) * kappa) / (np.pi * G * coldens)
return map_Q
def _radial_bins(self, ax_los="z"):
"""
Computes radial bins (for disk)
"""
center = np.array(self.params.disk.center) * self.lbox
im_extent = np.array(self.save.root.maps._v_attrs.im_extent) * self.lbox
# radius of the corner of the box plus a margin
rad_of_box = (
np.sqrt((im_extent[1] - center[0]) ** 2 + (im_extent[3] - center[1]) ** 2)
+ 0.1
)
bin_in = self.params.disk.bin_in
bin_out = self.params.disk.bin_out
nb_bin = self.params.disk.nb_bin
# radial bins
if self.params.disk.binning in ["log", "logarithmic"]:
lrad_in = np.log10(bin_in)
lrad_ext = np.log10(bin_out)
rad_bins = np.logspace(lrad_in, lrad_ext, num=nb_bin)
elif self.params.disk.binning in ["lin", "linear"]:
rad_bins = np.linspace(bin_in, bin_out, num=nb_bin)
else:
raise RuntimeWarning(
f"Invalid parameter {self.params.disk.binning} for disk binning method, using linear binning"
)
rad_bins = np.linspace(bin_in, bin_out, num=nb_bin)
# Add boundaries
rad_bins = np.concatenate(([0.0], rad_bins, [rad_of_box]))
return rad_bins
def _radial_centers(self, ax_los="z"):
radial_bins = self.get_value("/radial/radial_bins_" + ax_los)
bin_centers = 0.5 * (radial_bins[1:] + radial_bins[:-1])
return bin_centers
def _rr(self, ax_los="z"):
"""
Computes the radius from the center
"""
im_extent = np.array(self.save.root.maps._v_attrs.im_extent) * self.lbox
map_size = self.params.pymses.map_size
center = np.array(self.params.disk.center) * self.lbox
# Physical size of cells
dx = (im_extent[1] - im_extent[0]) / map_size
dy = (im_extent[3] - im_extent[2]) / map_size
# Physical coordinates of the center of the cells
x = np.linspace(im_extent[0], im_extent[1], map_size) + 0.5 * dx
y = np.linspace(im_extent[2], im_extent[3], map_size) + 0.5 * dy
xx, yy = np.meshgrid(x, y)
# Physical radius
rr = np.sqrt((xx - center[0]) ** 2 + (yy - center[1]) ** 2)
return rr
def _bins_on_map(self, ax_los="z"):
rad_bins = self.get_value("/radial/radial_bins_" + ax_los)
rr = self.get_value("/maps/rr_" + ax_los)
# Find appropriate bin for each coordinate set
bins = np.zeros(rr.shape, dtype=int)
for r in rad_bins[1:]:
bins = bins + (rr >= r).astype(int)
return bins
def _rad_avg(self, name, ax_los="z", mass_weighted=False):
radial_bins = self.get_value("/radial/radial_bins_" + ax_los)
bins_on_map = self.get_value("/maps/bins_on_map_" + ax_los)
dmap = self.get_value("/maps/" + name + "_" + ax_los)
if mass_weighted:
coldens = self.get_value("/maps/coldens_" + ax_los)
# mean of all the cells in the bin
mean_bin = np.zeros(len(radial_bins) - 1)
for j in range(len(radial_bins) - 1):
mask_bin = bins_on_map == j
if mass_weighted:
weight = coldens[mask_bin]
mean_bin[j] = np.nanmean(dmap[mask_bin] * weight)
mean_bin[j] = mean_bin[j] / np.mean(weight)
else:
mean_bin[j] = np.nanmean(dmap[mask_bin])
return mean_bin
def _rad_avg_map(self, name, ax_los="z", mass_weighted=False):
radial_bins = self.get_value("/radial/radial_bins_" + ax_los)
bins_on_map = self.get_value("/maps/bins_on_map_" + ax_los)
rr = self.get_value("/maps/rr_" + ax_los)
if mass_weighted:
mean_bin = self.get_value("/radial/rad_mwavg_" + name + "_" + ax_los)
else:
mean_bin = self.get_value("/radial/rad_avg_" + name + "_" + ax_los)
# Add value for border
mean_bin = np.concatenate((mean_bin, [mean_bin[-1]]))
rr_flat = rr.flatten()
bins_on_map_flat = bins_on_map.flatten()
# Compute the map azimuthally averaged
# use linear interpolation to improve accuracy
avg_flat = (radial_bins[bins_on_map_flat + 1] - rr_flat) * mean_bin[
bins_on_map_flat
]
avg_flat = (
avg_flat
+ (rr_flat - radial_bins[bins_on_map_flat]) * mean_bin[bins_on_map_flat + 1]
)
avg_flat = avg_flat / (
radial_bins[bins_on_map_flat + 1] - radial_bins[bins_on_map_flat]
)
avg_map = np.reshape(avg_flat, rr.shape)
return avg_map
def _fluct_map(self, name, ax_los="z", mass_weighted=False):
dmap = self.get_value("/maps/" + name + "_" + ax_los)
if mass_weighted:
avg_map = self.get_value("/maps/rad_mwavg_map_" + name + "_" + ax_los)
else:
avg_map = self.get_value("/maps/rad_avg_map_" + name + "_" + ax_los)
return dmap / avg_map
def _dispersion_rad(self, name, ax_los="z"):
"""
Computes the dispersion in radial bins of the quantity `name`
Parameters
----------
name : name of 2D map of a scalar quantity
ax_los : axis of the line of sight
"""
# 1. Get full storage names for the map and its azimuthal avererage
map_name = f"/maps/{name}_{ax_los}"
avg_map_name = f"/maps/rad_avg_map_{name}_{ax_los}"
# 2. Get maps from the storage
dmap = self.get_value(map_name)
avgmap = self.get_value(avg_map_name)
# 3. Get radial bins and centers
bins_on_map = self.get_value(f"/maps/bins_on_map_{ax_los}")
centers = self.get_value(f"/radial/radial_centers_{ax_los}")
# 4. Initialize dispersion array
sigma = np.zeros(len(centers))
# 5. Compute RMS in each bin
for j in range(len(centers)):
mask_bin = bins_on_map == j
sigma[j] = np.sqrt(
np.sum((dmap[mask_bin] - avgmap[mask_bin]) ** 2) / np.sum(mask_bin)
)
# 6. Returns computed RMS
return sigma
def _rad_fluct_pdf(self, name, ax_los="z", mass_weighted=False):
if mass_weighted:
fluct_map = self.get_value("/maps/mwfluct_" + name + "_" + ax_los)
else:
fluct_map = self.get_value("/maps/fluct_" + name + "_" + ax_los)
rr = self.get_value("/maps/rr_" + ax_los)
mask_pdf = (
(rr > self.params.disk.rmin_pdf)
& (rr < self.params.disk.rmax_pdf)
& (fluct_map > 0)
)
values, edges = np.histogram(
np.log10(fluct_map[mask_pdf].flatten()),
self.params.pdf.nb_bin,
range=self.params.pdf.range,
density=True,
)
centers = 0.5 * (edges[1:] + edges[:-1])
return np.stack([values, centers])
def _fit_pdf(self, name, ax_los="z"):
pdf = self.get_value("/hist/fluct_pdf_" + name + "_" + ax_los)
values, centers = pdf
mask_fit = (
(centers > self.params.pdf.xmin_fit)
& (centers < self.params.pdf.xmax_fit)
& (values > self.params.pdf.fit_cut * np.max(values))
)
(slope, origin, correlation, _, stderr) = linregress(
centers[mask_fit], np.log10(values[mask_fit])
)
pdf.attrs.slope = slope
pdf.attrs.origin = origin
pdf.attrs.correlation = correlation
pdf.attrs.stderr = stderr
pdf.attrs.var = np.var
return True
def _alpha_disk(self, ax_los="z"):
"Map of the Rayleigh contribution to the Shakura&Sunaev alpha parameter for disks"
assert ax_los == "z"
# Mean part
T_avg = self.get_value("/maps/rad_avg_map_T_mwavg_z")
radial_bins = self.get_value("/radial/radial_bins_" + ax_los)
mean_bin_vr = self.get_value("/radial/rad_avg_" + "slice_velr" + "_" + ax_los)
mean_bin_vphi = self.get_value(
"/radial/rad_avg_" + "slice_velphi" + "_" + ax_los
)
mean_bin_vr = np.concatenate((mean_bin_vr, [mean_bin_vr[-1]]))
mean_bin_vphi = np.concatenate((mean_bin_vphi, [mean_bin_vr[-1]]))
# Fluct part
def getter_alpha_num(dset):
r = np.sqrt(
np.sum((self.lbox * self.oct_getter_pos_disk(dset)) ** 2, axis=2)
)
bins = np.zeros(r.shape, dtype=int)
for r0 in radial_bins[1:-1]:
bins = bins + (r >= r0).astype(int)
vr_mean = mean_bin_vr[bins]
vphi_mean = mean_bin_vphi[bins]
# use linear interpolation
# v = ((r[i+1] - r)v[i] + (r - r[i])v[i + 1]) / (r[i+1] - r[i])
vr_mean = (radial_bins[bins + 1] - r) * vr_mean
vr_mean = vr_mean + (r - radial_bins[bins]) * mean_bin_vr[bins + 1]
vr_mean = vr_mean / (radial_bins[bins + 1] - radial_bins[bins])
vphi_mean = (radial_bins[bins + 1] - r) * vphi_mean
vphi_mean = vphi_mean + (r - radial_bins[bins]) * mean_bin_vphi[bins + 1]
vphi_mean = vphi_mean / (radial_bins[bins + 1] - radial_bins[bins])
vr = self.oct_getter_vr(dset)
vphi = self.oct_getter_vphi(dset)
alpha = (vphi - vphi_mean) * (vr - vr_mean)
return alpha
alpha_f = (
self.ax_avg(getter_alpha_num, "z", unit=U.none, mass_weighted=True) / T_avg
)
# alpha
alpha = (2.0 / 3) * alpha_f
return alpha
def _alpha_grav(self, ax_los="z"):
"Map of the gravitational contribution to the Shakura&Sunaev alpha parameter for disks"
assert ax_los == "z"
T_avg = self.get_value("/maps/rad_avg_map_T_mwavg_z")
coldens = self.get_value("/maps/rad_avg_map_coldens_z")
def getter_alpha_grav(dset):
r2 = np.sum((self.lbox * self.oct_getter_pos_disk(dset)) ** 2, axis=2)
e2 = (1.0 / 256.0) ** 2
gstar = -self.G * self.params.disk.mass_star / (e2 + r2)
gr = self.oct_getter_vect_r(dset, "g") - gstar
gphi = self.oct_getter_vect_phi(dset, "g")
return gr * gphi / (4 * np.pi * self.G)
alpha_g = self.ax_avg(getter_alpha_grav, "z", unit=U.none, surf_qty=True) / (
coldens * T_avg
)
# alpha
alpha_g = (2.0 / 3) * alpha_g
return alpha_g
def load_sinks(self):
csv_name = f"{self.path}/output_{self.num:05}/sink_{self.num:05}.csv"
if not os.path.exists(csv_name): # If ratarmount was used
csv_name = f"{self.path}/output_{self.num:05}/output_{self.num:05}/sink_{self.num:05}.csv"
if not os.path.exists(csv_name):
self.sinks = {}
return {}
f = open(csv_name)
first_line = f.readline()
second_line = f.readline()
f.close()
if len(first_line) > 0 and first_line[1] == "#":
header = first_line[3:-2].split(",")
units = second_line[3:-2].split(",")
df = pd.read_csv(csv_name, header=None, names=header, skiprows=2)
for key in header:
if units[header.index(key)] == "m":
df[key] = df[key] * self.info["unit_mass"].express(U.Msun)
if "M" in df:
df["msink"] = df["M"]
else:
header = [
"id",
"msink",
"dmfsink",
"x",
"y",
"z",
"vx",
"vy",
"vz",
"rot_period",
"lx",
"ly",
"lz",
"acc_rate",
"acc_lum",
"age",
"int_lum",
"Teff",
]
df = pd.read_csv(csv_name, header=None, names=header)
self.sinks = df
return {key: df[key].values for key in df}
def pspec(self, overwrite_file=False, **kwargs):
outfile = self.pspec_filename
if overwrite_file or not os.path.exists(self.pspec_filename):
pspec.pspec(repo=self.path, iouts=[self.num], outfile=outfile, **kwargs)
def _write_particles(self):
"""Ensure particles are written in the hdf5 file"""
if not os.path.exists(self.parts_filename) and not self.parts_loaded:
self.load_parts()
self.unload_parts()
return np.array([self.parts_filename])
def _write_cells(self):
"""Ensure cells are written in the hdf5 file"""
if not os.path.exists(self.cells_filename) and not self.cells_loaded:
self.load_cells()
self.unload_cells()
return np.array([self.cells_filename])
def _filaments(self):
datamap_name = self.params.filaments.datamap
verbose = self.params.filaments.verbose
rmin_frac = self.params.filaments.rmin
rmax_frac = self.params.filaments.rmax
size_thresh = self.params.filaments.size_thresh
skel_thresh = self.params.filaments.skel_thresh
branch_thresh = self.params.filaments.branch_thresh
glob_thresh = self.params.filaments.glob_thresh
datamap = self.get_value("/maps/" + datamap_name + "_z")
shape = datamap.shape
x = np.arange(shape[0]) - shape[0] / 2
y = np.arange(shape[1]) - shape[1] / 2
xx, yy = np.meshgrid(x, y)
rr = np.sqrt(xx**2 + yy**2)
rmin = int(rmin_frac * shape[0])
rmax = int(rmax_frac * shape[0])
mask = (rr >= rmin) & (rr <= rmax)
datamap[np.logical_not(mask)] = np.nan
self.fil = FilFinder2D(datamap, distance=1 * u.cm, beamwidth=1 * u.pix)
self.fil.preprocess_image(flatten_percent=95)
self.fil.create_mask(
verbose=verbose,
smooth_size=1 * u.pix,
adapt_thresh=4 * u.pix,
size_thresh=size_thresh * u.pix**2,
glob_thresh=glob_thresh,
fill_hole_size=0.1 * u.pix**2,
)
self.fil.medskel(verbose=verbose)
self.fil.analyze_skeletons(
skel_thresh=skel_thresh * u.pix,
branch_thresh=branch_thresh * u.pix,
relintens_thresh=0.1,
)
self.fil.exec_rht()
self.fil.find_widths()
with open(self.filaments_filename, "wb") as f:
pickle.dump(self.fil, f, pickle.HIGHEST_PROTOCOL)
return True
def _filaments_center(self):
"""
Fill an array with center postion for each cell in a filament
"""
fil = self.fil
mask = fil.mask.copy()
_, distance = medial_axis(mask, return_distance=True)
skel = fil.skeleton
i_center = np.zeros(distance.shape, dtype=int)
j_center = np.zeros(distance.shape, dtype=int)
x_mask, y_mask = np.where(mask)
for k in range(len(x_mask)):
find_center(distance, skel, i_center, j_center, x_mask[k], y_mask[k])
return np.stack([i_center, j_center])
def _filaments_forces(self):
"""
Compute forces within a filament (for disks), within the slice at z=0
"""
GM = self.G * self.params.disk.mass_star # Mass parameter
# Find center of filaments
i_center, j_center = self._filaments_center()
# Get slices and projections at z = 0
vphi = self.get_value("/maps/slice_velphi_z")
gr = self.get_value("/maps/slice_gr_z")
Pz = self.get_value("/maps/slice_P_z")
rho = self.get_value("/maps/slice_rho_z")
vr = self.get_value("/maps/slice_velr_z")
# Get coordinates
im_extent = np.array(self.save.root.maps._v_attrs.im_extent) * self.lbox
map_size = self.params.pymses.map_size
center = np.array(self.params.disk.center) * self.lbox
# Physical size of cells
dx = (im_extent[1] - im_extent[0]) / map_size
dy = (im_extent[3] - im_extent[2]) / map_size
# Physical coordinates of the center of the cells
x = np.linspace(im_extent[0], im_extent[1], map_size) + 0.5 * dx - center[0]
y = np.linspace(im_extent[2], im_extent[3], map_size) + 0.5 * dy - center[1]
xx, yy = np.meshgrid(x, y)
rr = np.sqrt(xx**2 + yy**2)
# Rotational support
R = vphi**2 / rr
# Equilibrium
Gvry, Gvrx = np.gradient(vr, dy, dx, edge_order=2)
gradvr = (xx * Gvrx + yy * Gvry) / rr
dvr = vr * gradvr # Complete derivative
# Thermal support
GPy, GPx = np.gradient(Pz, dy, dx, edge_order=2)
gradPr = (xx * GPx + yy * GPy) / rr
fP = -gradPr / rho
# Gravitational field
e2 = (1.0 / 512) ** 2
gstar = -GM / (rr**2 + e2)
# Substract gravitational field from the star
Rdisk = R + gstar
gdisk = gr - gstar
# Forces at the center of filaments
Rdisk_center = Rdisk[i_center, j_center]
gr_center = gdisk[i_center, j_center]
dvr_center = dvr[i_center, j_center]
# Forces for the filaments equilibrium
Rfil = Rdisk - Rdisk_center
gfil = gdisk - gr_center
fPfil = fP
dvr_fil = dvr - dvr_center
return {"gfil": gfil, "Rfil": Rfil, "fPfil": fPfil, "dvr": dvr_fil}
def make_clump_hop(self, threshold_density=1e2):
"""
Apply HOP algorithm
Args:
threshold_density (float): select only cells over threshold
"""
# Selection of cells
mask = (
self.cells["rho"] * self.info["unit_density"].express(U.H_cc)
> threshold_density
)
ncells = np.sum(mask)
# fill the matrice with ID, x,y,z and masses of particles
cells_group = np.zeros((ncells, 6))
cells_group[:, 0] = np.arange(ncells) # index
position = self.cells["pos"][mask] * self.info["unit_length"].express(U.pc)
cells_group[:, 1:4] = position # position
density = self.cells["rho"][mask] * self.info["unit_density"].express(
U.Msun / U.pc**3
)
size = self.cells["dx"][mask] * self.info["unit_length"].express(U.pc)
cells_group[:, 4] = density * size**3 # mass
# save file.txt
head = str(ncells)
np.savetxt(
self.filename[:-3] + "_hop.txt",
cells_group[:, :-1],
fmt="%10d %.10e %.10e %.10e %.10e",
header=head,
delimiter=" ",
comments=" ",
)
# save file.den
f = open(self.filename[:-3] + "_hop.den", "wb")
f.write(pack("I", ncells))
self.cells["rho"][mask].astype("f").tofile(f)
f.close()
# exec HOP algo
h = HOP(self.filename[:-3] + "_hop.txt", os.path.dirname(self.filename))
h.process_hop()
# get the igroup array
group_ids = h.get_group_ids()
# sort it and apply the sorting to the coordinates
# this means that the particules of group 1 are written first then of group 2 etc...
ind_sort = np.argsort(group_ids)
cells_group = cells_group[ind_sort]
cells_group[6] = group_ids[ind_sort]
# Make it a pandas' DataFrame
cells_group = pd.DataFrame(
cells_group, header=["id", "x", "y", "z", "mass", "group"]
)
self.clumps = cells_group
return cells_group
def clump_properties(self, nograv=False, log_min=None, log_max=None):
cells_group = self.make_clump_hop()
cells_group.groupby("group")
def def_rules(self):
self.rules = {
# Base rules
"coldens": Rule(
self._coldens,
"Column density",
"/maps",
unit=self.info["unit_density"] * self.info["unit_length"],
),
"T_mwavg": Rule(
partial(
self.ax_avg,
getter_T,
mass_weighted=True,
unit=self.info["unit_pressure"] / self.info["unit_density"],
),
"Ax mass-weighted averaged azimuthal temperature",
"/maps",
unit=self.info["unit_pressure"] / self.info["unit_density"],
),
"omega_mwavg": Rule(
partial(self._omega_average),
"Ax mass-weighted averaged azimuthal of angular frequency",
"/maps",
unit=self.info["unit_velocity"]
/ (self.info["unit_length"] / self.lbox),
),
"alpha_disk": Rule(
self._alpha_disk,
"Map of the Shakura&Sunaev alpha parameter for disks",
"/maps",
unit=U.none,
dependencies=[
"avg_map_slice_rho_avg",
"avg_map_T_mwavg",
"avg_map_slice_velr",
"avg_map_slice_velphi",
],
),
"alpha_grav": Rule(
self._alpha_grav,
"Map of the graviational contrib to\
Shakura&Sunaev alpha parameter for disks",
"/maps",
unit=U.none,
dependencies=["avg_map_coldens", "avg_map_T_mwavg"],
),
"T": Rule(
self._temperature,
"Temperature slice",
"/maps",
dependencies=["slice_rho"],
unit=self.info["unit_pressure"] / self.info["unit_density"],
),
"levels": Rule(self._levels, "Max level within line of sight", "/maps"),
"jeans": Rule(
self._jeans,
"Jeans length slice",
"/maps",
dependencies=["slice_rho", "T"],
),
"jeans_ratio": Rule(
self._jeans_ratio,
"Jeans' length divided by the max resolution",
"/maps",
dependencies=["jeans", "levels"],
),
"Q": Rule(
self._toomreQ_disk,
"Toomre Q parameter for a Keplerian disk",
"/maps",
dependencies=["coldens", "T_mwavg", "omega_mwavg"],
),
"load_sinks_rule": Rule(
self.load_sinks,
group="/datasets",
unit=self.unit_sinks,
),
"pspec": Rule(self.pspec, "Power spectrum", "/hdf5"),
"write_particles": Rule(self._write_particles, "Particles file", "/hdf5"),
"write_cells": Rule(self._write_cells, "Cells file", "/hdf5"),
"filaments": Rule(
self._filaments,
"Filaments",
"/datasets",
dependencies={self.params.filaments.datamap: "z"},
),
"filaments_forces": Rule(
self._filaments_forces,
"Filaments",
"/datasets",
dependencies={
"filaments": None,
"slice_velphi": "z",
"slice_gr": "z",
"slice_P": "z",
"slice_rho": "z",
"slice_velr": "z",
},
),
# Helpers
"radial_bins": Rule(
self._radial_bins,
"Radial bins",
"/radial",
unit=self.info["unit_length"] / self.lbox,
),
"radial_centers": Rule(
self._radial_centers,
"Centers of radial bins",
"/radial",
dependencies=["radial_bins"],
unit=self.info["unit_length"] / self.lbox,
),
"rr": Rule(
self._rr,
"Coordinate map",
"/maps",
unit=self.info["unit_length"] / self.lbox,
),
"bins_on_map": Rule(
self._bins_on_map,
"Convert map coordinates to bins",
"/maps",
dependencies=["radial_bins", "rr"],
),
"B_int": Rule(
self._B_int,
"Magnetic intensity slice",
"/maps",
dependencies=["slice_rho"],
unit=self.info["unit_mag"],
),
# PDF
"rho_pdf": Rule(
partial(self.vol_pdf, partial(simple_getter, "rho"), logbins=True),
"Global rho-PDF",
"/hist",
unit=self.info["unit_density"],
),
"rho_pdf_mw": Rule(
partial(
self.vol_pdf,
partial(simple_getter, "rho"),
weight_func=mass_func,
logbins=True,
),
"Global rho-PDF",
"/hist",
unit=self.info["unit_density"],
),
"T_pdf": Rule(
partial(self.vol_pdf, getter_T, logbins=True),
"Global T-PDF",
"/hist",
unit=self.info["unit_pressure"] / self.info["unit_density"],
),
"cos_pdf": Rule(
partial(self.cos_vfluct_B),
"Global cos fluctuation-PDF",
"/hist",
dependencies=["mwa_speed"],
unit=U.none,
),
"Brho": Rule(
self._Brho,
"Average of B as a function of rho",
"/datasets",
unit={"rho": self.info["unit_density"], "B": self.info["unit_mag"]},
),
"Ek_Eb_rho": Rule(
self._Ek_Eb_rho,
"Average of Ek/Eb as a function of rho",
"/datasets",
dependencies=["mwa_speed"],
unit={"rho": self.info["unit_density"], "Ek_Eb_rho": U.none},
),
# Profiles
"axis": Rule(
partial(self._get_axis),
"Axis",
"/profile",
unit=self.info["unit_length"],
),
"rho_prof": Rule(
partial(self._plane_avg_uniform, partial(simple_getter, "rho")),
"Rho profile",
"/profile",
unit=self.info["unit_density"],
dependencies=["axis"],
),
# globals
"time_num": Rule(
lambda: self.info["time"],
"Time",
"/globals",
unit=self.info["unit_time"],
),
"mass": Rule(
partial(self.sum, mass_func, mass_weighted=False),
"Total mass",
"/globals",
unit=self.info["unit_density"] * self.info["unit_length"] ** 3,
),
"mwa_speed": Rule(
partial(self.vol_avg, partial(simple_getter, "vel")),
"Mass weighted speed average",
"/globals",
unit=self.info["unit_velocity"],
),
"mwa_sigma": Rule(
self._mwa_sigma,
"Mass weighted speed average",
"/globals",
dependencies={"mwa_speed": None},
unit=self.info["unit_velocity"],
),
"mwa_B_int": Rule(
partial(self.vol_avg, getter_B_int),
"Mass weighted Magnetic intensity average",
"/globals",
unit=self.info["unit_mag"],
),
}
averageable_maps = [
"coldens",
"Q",
"T",
"T_mwavg",
"alpha_disk",
"alpha_grav",
]
# Generic rules directly from Ramses fields
for field in self.params.pymses.variables:
def generic_rule(name, getter, unit, oct_getter=None):
if oct_getter is None:
oct_getter = getter
self.rules["slice_" + name] = Rule(
partial(self.slice, getter, z=0.0, unit=unit),
"{} slice".format(name),
"/maps",
unit=unit,
)
self.rules[name + "_mwavg"] = Rule(
partial(self.ax_avg, oct_getter, mass_weighted=True, unit=unit),
"Ax mass-weighted averaged {}".format(name),
"/maps",
unit=unit,
)
self.rules[name + "_avg"] = Rule(
partial(self.ax_avg, oct_getter, mass_weighted=False, unit=unit),
"Ax averaged {}".format(name),
"/maps",
unit=unit,
)
averageable_maps.append("slice_" + name)
averageable_maps.append(name + "_mwavg")
averageable_maps.append(name + "_avg")
# special for vectors
if field in ["g", "vel", "Bl", "Br"]:
# Components
for i, dir in enumerate(["x", "y", "z"]):
generic_rule(
field + dir,
partial(vect_getter, field, i),
self.unit_key[field],
oct_getter=partial(oct_vect_getter, field, i),
)
# Radial
generic_rule(
field + "r",
partial(self.getter_vect_r, name_vect=field),
self.unit_key[field],
oct_getter=partial(self.oct_getter_vect_r, name_vect=field),
)
# Othoradial
generic_rule(
field + "phi",
partial(self.getter_vect_phi, name_vect=field),
self.unit_key[field],
oct_getter=partial(self.oct_getter_vect_phi, name_vect=field),
)
# Norm
generic_rule(
field + "_norm",
partial(norm_getter, field),
self.unit_key[field],
oct_getter=partial(oct_norm_getter, field),
)
else:
generic_rule(field, partial(simple_getter, field), self.unit_key[field])
# radial average and other
for name in averageable_maps:
unit = self.rules[name].unit
self.rules["rad_avg_" + name] = Rule(
partial(self._rad_avg, name),
"Azimuthal average of {}".format(name),
"/radial",
dependencies=["radial_centers", "bins_on_map", name],
unit=unit,
)
self.rules["rad_mwavg_" + name] = Rule(
partial(self._rad_avg, name, mass_weighted=True),
"Mass weighted azimuthal average of {}".format(name),
"/radial",
dependencies=["coldens", "radial_centers", "bins_on_map", name],
unit=unit,
)
self.rules["rad_avg_map_" + name] = Rule(
partial(self._rad_avg_map, name),
"Interpolated map of azimuthal average of {}".format(name),
"/maps",
dependencies=["radial_centers", "bins_on_map", "rad_avg_" + name],
unit=unit,
)
self.rules["rad_mwavg_map_" + name] = Rule(
partial(self._rad_avg_map, name, mass_weighted=True),
"Interpolated map of azimuthal average of {}".format(name),
"/maps",
dependencies=["radial_centers", "bins_on_map", "rad_mwavg_" + name],
unit=unit,
)
self.rules["fluct_" + name] = Rule(
partial(self._fluct_map, name),
"Fluctuation wrt to average of {}".format(name),
"/maps",
dependencies=[name, "rad_avg_map_" + name],
)
self.rules["dispersion_rad_" + name] = Rule(
partial(self._dispersion_rad, name),
"radial RMS of {}".format(name),
"/radial",
dependencies=[name, "rad_avg_map_" + name],
unit=unit,
)
self.rules["mwfluct_" + name] = Rule(
partial(self._fluct_map, name, mass_weigthed=True),
"Fluctuation wrt to mass weighted average of {}".format(name),
"/maps",
dependencies=[name, "rad_mwavg_map_" + name],
)
self.rules["fluct_pdf_" + name] = Rule(
partial(self._rad_fluct_pdf, name),
"Probability density function of {} fluctuations".format(name),
"/hist",
dependencies=["rr", "fluct_" + name],
)
self.rules["fluct_mwpdf_" + name] = Rule(
partial(self._rad_fluct_pdf, name, mass_weigthed=True),
"Probability density function of {} mass weighted fluctuations".format(
name
),
"/hist",
dependencies=["rr", "mwfluct_" + name],
)
self.rules["fit_fluct_pdf_" + name] = Rule(
partial(self._fit_pdf, name),
"Fit the PDF of {} fluctuations".format(name),
"/hist",
dependencies=["fluct_pdf_" + name],
)
for name_bin in averageable_maps:
unit_bin = self.rules[name_bin].unit
if name_bin is not name:
self.rules["mbb_" + name + "_" + name_bin] = Rule(
partial(self.mean_by_bins, name_bin, name),
"Mean of {} by bins of {}".format(name, name_bin),
"/hist",
dependencies=[name, name_bin],
unit=[unit, unit_bin],
)
self._gen_rule_transform("fluct_coldens", np.nanmax, "max", group="/globals")
super(SnapshotProcessor, self).def_rules()
def get_time(path, num):
"""
Return the time of the output (code units)
Parameters
----------
num output number
path_out path of the pipeline output
Returns
-------
time the time of the output (code units)
"""
try:
f = open(
path
+ "/output_"
+ str(num).zfill(5)
+ "/info_"
+ str(num).zfill(5)
+ ".txt"
)
for line in f:
ls = line.split()
if len(ls) > 1 and ls[0] == "time":
time = float(ls[2])
break
f.close()
return time
except IOError as e:
print(e)
return np.nan