Files
pipeline/turbox/turbox.py
T
2022-11-28 18:01:18 +01:00

184 lines
5.5 KiB
Python

"""
Toolbox for the TURBOX project (simulation part).
By Noé Brucy and Corentin Le Yuehlic, 2022
Compute and plot eta_d, from the slope of the logdensity power spectrum
"""
import numpy as np
from plotter import U
from baseprocessor import Rule
from matplotlib import pyplot as plt
import pandas as pd
import tables
from scipy.stats import linregress
def get_pspec(pp, field: str, dim: int = 3):
"""Read power spectruù
Parameters
----------
pp : SnapshotProcessor
field : str
field to get
dim : int, optional
dimension (2 or 3), by default 3
Returns
-------
tupple (np.array, np.array)
wave number and corresponding powers
"""
h5file = tables.File(pp.pspec_filename)
path = f"/out_{pp.num:05}/d{dim}/{field}"
node = h5file.get_node(path)
kbins = node.kbins.read()
pspec = node.pspec.read()
h5file.close()
k = (kbins[:-1] + kbins[1:]) / 2
return k, pspec
span_resolution = {256: (0.8, 1.1), 512: (0.5, 1.7), 1024: (0.4, 1.7)}
def get_pspec_slope(pp, field: str, resol: int, plotdebug: bool = False):
"""Get the slope of the Power specturm using linear regression in the selected range
Parameters
----------
pp : Snapshotprocessor
field : str
field name
resol : int
resolution (number of cells on 1 side of the simulation cube)
Returns
-------
tuple (float, float)
Slope, square value of the correlation coefficient
"""
# Trustworthy span od the power spectrum in log10(k) as a function of the resolution
logkmin, logkmax = span_resolution[resol]
k, power = get_pspec(pp, field)
logk, logpower = np.log10(k), np.log10(power)
mask = (logk >= logkmin) & (logk < logkmax)
results = linregress(logk[mask], logpower[mask])
if plotdebug:
plt.figure()
plt.plot(logk, logpower)
plt.plot(
logk[mask],
results.slope * logk[mask] + results.intercept,
lw=3,
ls=":",
color="k",
)
pp.logger.info(
f"Fit results in get_slope({field}, {resol}): slope:{results.slope:.2f}"
+ f", b:{results.intercept:.2f}, R2:{results.rvalue**2:.2f}"
)
if results.rvalue ** 2 < 0.8:
pp.logger.warning(
f"Bad fit in get_slope({field}, {resol}) with {logkmin} <= logk < {logkmax}"
)
pp.logger.warning(f"log(k) is \n {logk[mask]}")
pp.logger.warning(f"log(power) is \n {logpower[mask]}")
return results.slope, results.intercept, results.rvalue ** 2
def build_suite(pl, redo=False, cs0=0.28834810480560674):
"""Compute an array of parameter for each run in the Plotter pl
Parameters
----------
pl : Plotter
redo : bool, optional
redo the comptutation of the velocity dispersion, by default False
cs0 : float, optional
Sound speed in the simulations, by default 0.28834810480560674
Returns
-------
pd.Dataframe
dataframe with the properties of the simulation
"""
df = {}
df["snapshots"] = pl.nums.values()
df["n0"] = pl.study.get_nml("galbox_params/dens0").values()
df["turbinit"] = pl.study.get_nml("galbox_params/turb").values()
df["solver"] = pl.study.get_nml("hydro_params/riemann").values()
df["slope"] = pl.study.get_nml("hydro_params/slope_type").values()
df["res"] = pl.study.get_nml("amr_params/levelmin").values()
df["frms"] = pl.study.get_nml("turb_params/turb_rms").values()
df["seed"] = pl.study.get_nml("turb_params/turb_seed").values()
df["comp"] = pl.study.get_nml("turb_params/comp_frac").values()
df["L"] = pl.study.get_nml("amr_params/boxlen").values()
df["T_turb"] = np.array(
list(pl.study.get_nml("turb_params/turb_T").values())
) * pl.study.info["unit_time"].express(U.Myr)
df = pd.DataFrame(df, index=pl.runs)
if redo:
pl.study.avg_time_sigma("x", overwrite_dep=False)
pl.study.avg_time_sigma("y", overwrite_dep=False)
pl.study.avg_time_sigma("z", overwrite_dep=False)
pl.study.time(overwrite=True)
for ax in ["x", "y", "z"]:
df[f"sigma_{ax}"] = np.array(
list(
map(
lambda x: x.T[0],
[
pl.study.get_value(f"/series/time_sigma_{ax}", unit=U.km_s)[run]
for run in pl.runs
],
)
)
)
df["sigma_all"] = df["sigma_x"] ** 2 + df["sigma_y"] ** 2 + df["sigma_z"] ** 2
df["sigma_all"] = list(map(np.sqrt, df["sigma_all"].values))
df["Mach_all"] = list(map(lambda v: v / cs0, df["sigma_all"].values))
df["time"] = list(
map(
lambda x: x.T[0],
[pl.study.get_value("/series/time", unit=U.Myr)[run] for run in pl.runs],
)
)
df["sigma"] = list(map(lambda l: np.mean(l), df["sigma_all"].values))
df["Mach"] = df["sigma"] / cs0
df["turnover"] = (df["L"] * U.pc.express(U.km) / (2 * df["sigma"])) * U.s.express(
U.Myr
)
return df
def rho_pdf(pp):
pp.load_cells()
rho = pp.cells["rho"] * pp.info["unit_density"].express(U.H_cc)
rho_0 = np.mean(rho)
print(rho_0)
s = np.log(rho / rho_0)
values, edges = np.histogram(s, bins=300, range=(-15, 11), density=True)
pp.unload_cells()
centers = 0.5 * (edges[1:] + edges[:-1])
return (np.stack([values, centers]), {"logbins": True})
rule_pdf = Rule(rho_pdf, "Density PDF", name="rho_pdf", group="/hist")
def apply_rule_pdf(pp):
return pp.process(rule_pdf, pp, overwrite=True)