Files
pipeline/plotter.py
T
2020-12-14 16:51:00 +01:00

1209 lines
40 KiB
Python

# coding: utf-8
"""
This software is a helper to use pysmes tools to read and analyse RAMSES Outputs.
It's a rule based interface.
This is the plotter module.
@author Noé Brucy 2019-2020
"""
import sys
import os
from functools import partial
import tables
import numpy as np
from scipy.stats import linregress
from numpy.polynomial.polynomial import polyfit
from scipy.ndimage.filters import gaussian_filter1d
from scipy import optimize
import matplotlib as mpl
if os.environ.get("DISPLAY", "") == "":
print("No display found. Using non-interactive Agg backend")
mpl.use("Agg")
import pylab as P
from comparator import *
import pspec_read
P.rcParams["image.cmap"] = "plasma"
P.rcParams["savefig.dpi"] = 400
tex_params = {"text.latex.preamble": [r"\usepackage{amsmath}"]}
P.rcParams.update(tex_params)
class PlotRule(Rule):
"""
The rule class, speficic to plot.
Add an extra method, plot, that take the reference to an open hdf5 file (from pytables)
"""
def plot(self, save, arg, **kwargs):
"""
Set the plotter's storage to 'save' and exetute the rule
Parameters
----------
save : opended pytables hdf5 file, where to find the data
arg : main argument of the plotting function
kargs : optional keyword arguments to the plotting function
"""
self.postproc.save = save
return self.process_fn(arg, **kwargs)
class Plotter(Aggregator, BaseProcessor):
"""
This class loads derived quantities and plot them
"""
solve_self_dep = False
# Axes information
_ax_nb = {"x": 0, "y": 1, "z": 2} # Number of each axes
_axes_h = {"x": "y", "y": "x", "z": "x"} # Associated horizontal axe
_axes_v = {"x": "z", "y": "z", "z": "y"} # Associated vertical axe
_ax_title = {"x": r"$x$", "y": r"$y$", "z": r"$z$"}
G = 1.0 # Gravitational constant
# Conversion table from namelist keys (from amses config file) into LaTex strings
label_convert = {
"turb_rms": "$f_{rms}$",
"beta": "$\\beta$",
"beta_cool": "$\\beta_{c}$",
"dens0": "$n_0$",
"coldens0": "$\Sigma_0$",
"sfr_avg_window": "window",
"comp_frac": "$\\zeta$",
}
# Conversion table from namelist values (from amses config file) into LaTex strings
value_convert = {
"sfr_avg_window": lambda x: "${:g}$ Myr".format(80 * x),
"comp_frac": lambda x: "${:g}$".format(1 - x),
}
def __init__(
self,
path,
in_runs=None,
in_nums=None,
path_out=None,
pp_params=None,
selector=None,
tag=None,
**kwargs
):
"""
Create a new Plotter instance. Will select run and outputs via a RunSelector object.
Parameters
----------
path : path to the main folder of the simulations (ex '~/simus/myproject')
in_runs : list of the runs to consider (ex ['run1', 'run2'])
in_nums : list or dict of the outputs numbers to consider (ex [3, 5] or {'run1' : [3, 5], 'run2' : [4, 6])
path_out : Path where the plot will be saved. By default set to `path`
pp_params : Parameters for postprocessing. See pp_params module.
selector : Existing instance of RunSelector, that selects runs and outputs. If set, in_runs and in_nums will be ignored
tag : string to add in the output and data files.
kwargs : Keyword arguments for RunSelector.
"""
super(Plotter, self).__init__(path, path_out, pp_params, tag)
# Select runs
if selector is None:
selector = RunSelector(path, in_runs, in_nums, self.pp_params, **kwargs)
# Save infos
self.path = path
self.runs = selector.runs
self.nums = selector.nums
# Get comparator object
self.comp = Comparator(
path, self.runs, self.nums, path_out, self.pp_params, selector=selector
)
# Get postprocesor objets for each run
self.pp = self.comp.pp
# Define log prefix
self.log_id = "[plot {}] ".format(self.pp_params.out.tag)
# Define rules
self.def_rules()
self.save = None
def _not_self_dep(self, name, dep, dep_arg, overwrite, **kwargs):
""""""
if dep in self.comp.rules:
done = self.comp.process(dep, dep_arg, overwrite, overwrite)
self.just_done.extend(done)
else:
super(Plotter, self)._not_self_dep(name, dep, dep_arg, overwrite, **kwargs)
def _needs_computation(self, overwrite, plot_filename):
return (
self.pp_params.out.interactive
or overwrite
or not os.path.exists(plot_filename)
)
def _process_rule(
self,
name,
rule,
arg,
overwrite=False,
ax=None,
movie=False,
from_cells=False,
**kwargs
):
if not arg is None:
name_full = name + "_" + str(arg)
else:
name_full = name
if rule.is_valid(arg):
if rule.kind == "classic" or rule.kind == "runs":
try:
runs = kwargs.pop("runs")
if isinstance(runs, RunSelector):
runs = runs.runs
except KeyError:
runs = self.runs
i = 0
for run in runs:
files = []
if rule.kind == "classic":
nums = self.nums[run]
else:
nums = [None]
for num in nums:
plot_filename = self._find_filename(name_full, run, num)
if from_cells or rule.kind == "cells":
if not os.exists(self.pp[run][num].cells_filename):
self.pp[run][num].load_cells()
self.pp[run][num].unload_cells()
save = tables.open_file(self.pp[run][num].cells_filename)
elif rule.kind == "classic":
save = tables.open_file(self.pp[run][num].filename)
else:
save = tables.open_file(self.comp.filename, "r")
try:
self._plot_rule(
rule,
save,
arg,
plot_filename,
overwrite,
ax=ax[i],
run=run,
**kwargs
)
except TypeError as e:
if str(e) in [
"'LocatableAxes' object does not support indexing",
"'AxesSubplot' object does not support indexing",
]:
self._plot_rule(
rule,
save,
arg,
plot_filename,
overwrite,
ax=ax,
run=run,
**kwargs
)
elif ax is None:
fig = P.figure()
self._plot_rule(
rule,
save,
arg,
plot_filename,
overwrite,
ax=P.gca(),
run=run,
**kwargs
)
else:
raise
finally:
save.close()
i = i + 1
files.append(plot_filename)
else:
if ax is None:
ax = P.gca()
if rule.kind == "series" and len(self.runs) == 1:
run = self.runs[0]
plot_filename = self._find_filename(name_full, run)
else:
plot_filename = self._find_filename(name_full)
save = tables.open_file(self.comp.filename, "r")
try:
self._plot_rule(
rule, save, arg, plot_filename, overwrite, ax, **kwargs
)
finally:
save.close()
else:
self._log("{} is not valid in this context".format(name_full), "ERROR")
def _plot_rule(self, rule, save, arg, plot_filename, overwrite, ax, **kwargs):
P.sca(ax)
if self._needs_computation(overwrite, plot_filename):
rule.plot(save, arg, **kwargs)
P.tight_layout(pad=1)
if not self.pp_params.out.interactive:
P.savefig(plot_filename)
P.close()
self._log("{} plotted".format(plot_filename), "SUCCESS")
else:
self._log(
"{} plotted".format(os.path.basename(plot_filename)), "SUCCESS"
)
else:
self._log("Plot {} is already done, skipping...".format(plot_filename))
def _find_filename(self, name_full, run=None, num=None, fmt=None):
tag_name = self.pp_params.out.tag
if fmt is None and self.pp_params.out.fmt == "":
if not self.pp_params.out.tag == "":
tag_name = "_" + tag_name
if not run is None and not num is None:
fmt = "{out}/{run}/{name}{tag}_{run}_{num:05}{ext}"
elif not run is None:
fmt = "{out}/{run}/{name}{tag}_{run}{ext}"
else:
fmt = "{out}/{name}{tag}{ext}"
elif fmt is None:
fmt = self.pp_params.out.fmt
nml = None
if not run is None:
nml = self.comp.namelist[run]
return fmt.format(
run=run,
name=name_full,
tag=tag_name,
num=num,
nml=nml,
out=self.path_out,
ext=self.pp_params.out.ext,
)
def _label_run(self, run, node, label, nml_key):
def get_label_nml(nml_key):
prop_name = os.path.basename(nml_key)
if prop_name in self.label_convert:
prop_label = self.label_convert[prop_name]
else:
prop_label = prop_name
prop_value = self.comp.get_nml(nml_key, run)
if prop_name in self.value_convert:
prop_value_str = self.value_convert[prop_name](prop_value)
elif type(prop_value) in [int, float]:
prop_value_str = convert_exp(prop_value, digits=5)
else:
prop_value_str = str(prop_value)
return r"{} = {}".format(prop_label, prop_value_str)
if nml_key is None and label is None:
if (
"attrs" in self.save.root._v_attrs
and run in self.save.root._v_attrs.attrs
):
label_run = r"{}".format(self.save.root._v_attrs.attrs[run].label)
else:
label_run = run
elif not nml_key is None:
if not type(nml_key) == list:
nml_key = [nml_key]
label_run = ", ".join(map(get_label_nml, nml_key))
if not label is None:
label_run = label + " (" + label_run + ")"
else:
label_run = label
return label_run
def _ax_label_unit(self, node, label, unit, unit_coeff):
if label is None:
if "label" in node._v_attrs:
label = node._v_attrs.label
elif node._v_name in self.label_convert:
label = self.label_convert[node._v_name]
elif not node._v_title == "":
label = node._v_title
else:
label = node._v_name
if "unit" in node._v_attrs:
unit_old = node._v_attrs.unit
else:
unit_old = cst.none
if unit is None:
unit = unit_old
if not unit_coeff == 1:
base = unit
unit = unit_coeff * unit
label = label + unit_str(unit, base=base)
else:
label = label + unit_str(unit)
return label, unit_old, unit
def _plot_map(
self,
name,
ax_los,
run,
label=None,
unit=None,
unit_coeff=1.0,
overlays=[],
overlays_kwargs=[],
title=None,
put_title=True,
nml_key=None,
put_time=True,
time_unit=cst.Myr,
unit_space=cst.pc,
cmap="plasma",
norm="log",
put_cbar=True,
autoscale=True,
**kwargs
):
ax_h = self._axes_h[ax_los]
ax_v = self._axes_v[ax_los]
im_extent = self.save.root.maps._v_attrs.im_extent
unit_length = self.save.root._v_attrs["unit_length"]
im_extent = np.array(im_extent) * unit_length.express(unit_space)
node = self.save.get_node("/maps/{}_{}".format(name, ax_los))
dmap = node.read()
label, unit_old, unit = self._ax_label_unit(node, label, unit, unit_coeff)
dmap = dmap * unit_old.express(unit)
if norm == "log":
norm = mpl.colors.LogNorm()
elif norm == "linear":
norm = mpl.colors.NoNorm()
if autoscale and not norm is None:
norm.autoscale(dmap)
im = P.imshow(
dmap, extent=im_extent, origin="lower", norm=norm, cmap=cmap, **kwargs
)
P.locator_params(axis=ax_h, nbins=self.pp_params.plot.ntick)
P.locator_params(axis=ax_v, nbins=self.pp_params.plot.ntick)
P.xlabel(self._ax_title[ax_h] + unit_str(unit_space))
P.ylabel(self._ax_title[ax_v] + unit_str(unit_space))
try:
cbar = P.colorbar(im, cax=P.gca().cax)
except AttributeError:
cbar = P.colorbar()
if not label is None:
cbar.set_label(label)
if put_title:
title = self._label_run(run, node, title, nml_key)
if put_time:
time = self.save.root._v_attrs.time * self.comp.info["unit_time"]
time_str = self.pp_params.plot.time_fmt.format(
time.express(time_unit), time_unit.latex
)
if len(title) > 0:
title = title + " | " + time_str
else:
title = time_str
P.title(title)
for i, plot_overlay in enumerate(overlays):
try:
plot_overlay(ax_los, **overlays_kwargs[i])
except:
plot_overlay(ax_los)
def _overlay_levels(self, ax_los):
map_level = self.save.get_node("/maps/{}_{}".format("levels", ax_los)).read()
# Computing linewidths
levels_ar = np.arange(np.min(map_level), np.max(map_level) + 1)
lw = np.ones(levels_ar.size) * 2
lvl_th = 8 # Level threeshold for reducing linewidths
lw[levels_ar >= lvl_th] = lw[levels_ar >= lvl_th] ** (
lvl_th - levels_ar[levels_ar >= lvl_th]
)
lw[levels_ar < lvl_th] = 1.0
cont = P.contour(
map_level,
extent=self.save.root.maps._v_attrs.im_extent,
origin="lower",
colors="grey",
linewidths=lw,
levels=levels_ar,
)
cont.levels = cont.levels + 1
P.clabel(cont, cont.levels[cont.levels < 11], inline=1, fontsize=8.0, fmt="%1d")
def _overlay_speed(self, ax_los, unit=cst.km_s, unit_coeff=1.0, key_v=None):
ax_h = self._axes_h[ax_los]
ax_v = self._axes_v[ax_los]
dmap_vh_node = self.save.get_node("/maps/speed_h_{}".format(ax_los))
dmap_vh = dmap_vh_node.read()
dmap_vv = self.save.get_node("/maps/speed_v_{}".format(ax_los)).read()
vel_red = self.pp_params.plot.vel_red
radius = self.save.root.maps._v_attrs.radius
center = self.save.root.maps._v_attrs.center
lbox = self.save.root._v_attrs.lbox
map_vh_red = dmap_vh[::vel_red, ::vel_red] # take only a subset of velocities
map_vv_red = dmap_vv[::vel_red, ::vel_red]
nh = map_vh_red.shape[0]
nv = map_vv_red.shape[1]
vec_h = (
np.arange(nh) * 2.0 / nh * radius - radius + center[0] + radius / nh
) * lbox
vec_v = (
np.arange(nv) * 2.0 / nv * radius - radius + center[1] + radius / nv
) * lbox
hh, vv = np.meshgrid(vec_h, vec_v)
norm_v = np.sqrt(map_vh_red ** 2 + map_vv_red ** 2)
max_v = np.max(norm_v)
min_v = np.min(norm_v)
Q = P.quiver(hh, vv, map_vh_red, map_vv_red, units="width", color="grey")
label, unit_old, unit = self._ax_label_unit(dmap_vh_node, "", unit, unit_coeff)
if key_v is None:
key_v = (max_v + min_v) / 2.0
P.quiverkey(
Q,
0.6,
0.98,
key_v,
r"${:g}$".format(key_v) + label,
labelpos="E",
coordinates="figure",
)
def _overlay_B(self, ax_los):
ax_h = self._axes_h[ax_los]
ax_v = self._axes_v[ax_los]
dmap_Bh_node = self.save.get_node("/maps/B_h_{}".format(ax_los))
dmap_Bh = dmap_Bh_node.read()
dmap_Bv = self.save.get_node("/maps/B_v_{}".format(ax_los)).read()
vel_red = self.pp_params.plot.vel_red
radius = self.save.root.maps._v_attrs.radius
center = self.save.root.maps._v_attrs.center
lbox = self.save.root._v_attrs.lbox
map_Bh_red = dmap_Bh[::vel_red, ::vel_red] # take only a subset of velocities
map_Bv_red = dmap_Bv[::vel_red, ::vel_red]
nh = map_Bh_red.shape[0]
nv = map_Bv_red.shape[1]
vec_h = (
np.arange(nh) * 2.0 / nh * radius - radius + center[0] + radius / nh
) * lbox
vec_v = (
np.arange(nv) * 2.0 / nv * radius - radius + center[1] + radius / nv
) * lbox
hh, vv = np.meshgrid(vec_h, vec_v)
P.streamplot(hh, vv, map_Bh_red, map_Bv_red)
def _plot_radial(self, name, ax_los, label=None, xlog=False, ylog=False):
radial_bins = self.save.get_node("/radial/radial_bins_" + ax_los).read()
bin_centers = 0.5 * (radial_bins[1:] + radial_bins[:-1])
mean_bin = self.save.get_node("/radial/{}_{}".format(name, ax_los)).read()
P.grid()
P.xlabel(r"$r$")
if xlog:
P.xscale("log")
if ylog:
P.yscale("log")
P.plot(bin_centers, mean_bin)
if not label is None:
P.ylabel(label)
def _plot_hist(
self,
name,
ax_los,
run,
group="/hist/",
label=None,
unit=None,
unit_coeff=1.0,
title=None,
nml_key=None,
put_time=True,
time_unit=cst.Myr,
xlog=None,
ylog=False,
kind="bar",
ylabel="$\mathcal{P}$",
color=None,
colors=None,
nml_color=None,
fit=None,
fitlabel=None,
**kwargs
):
if not ax_los is None:
name = name + "_" + ax_los
node = self.save.get_node(group + name)
if xlog is None:
try:
xlog = node._v_attrs_.logbins
except:
xlog = False
label, unit_old, unit = self._ax_label_unit(node, label, unit, unit_coeff)
if "mean" in node:
index = node["runs"].read().index(run)
values, centers = node["mean"].read()[index]
else:
values, centers = node.read()
if xlog:
centers = centers + np.log10(unit_old.express(unit))
else:
centers = centers * unit_old.express(unit)
title = self._label_run(run, node, title, nml_key)
if put_time:
time = self.save.root._v_attrs.time * self.comp.info["unit_time"]
time_str = self.pp_params.plot.time_fmt.format(
time.express(time_unit), time_unit.latex
)
if len(title) > 0:
title = title + " | " + time_str
else:
title = time_str
P.title(title)
if color is None and not colors is None:
if nml_color is None:
color = colors[run]
else:
nml = self.comp.get_nml(nml_color, run)
try:
color = colors[nml]
except:
color = colors(nml)
if kind == "bar":
width = centers[1] - centers[0]
P.bar(centers, values, width, log=ylog, color=color, label=title, **kwargs)
elif kind == "step":
if ylog:
P.yscale("log")
P.step(centers, values, where="mid", color=color, label=title, **kwargs)
else:
raise ValueError("kind must be 'bar' or 'step'")
P.grid()
if not label is None:
P.xlabel(label)
if not ylabel is None:
P.ylabel(ylabel)
if not ax_los is None and "/hist/fit_" + name + "_" + ax_los in self.save:
slope = node.attrs.slope
origin = node.attrs.origin
P.plot(
centers,
10 ** (slope * centers + origin),
"--",
linewidth=2,
color="orange",
)
P.ylim([None, 1.0])
if not fit is None:
self._overlay_fit(
centers, values, kind=fit, ls="--", lw=1.5, label=fitlabel
)
def _plot(
self,
name_x,
name_y,
node_arg=None,
xlabel=None,
ylabel=None,
label=None,
xunit=None,
yunit=None,
xunit_coeff=1.0,
yunit_coeff=1.0,
ylog=False,
fit=None,
fitlabel=None,
smooth=0,
nml_key=None,
run=None,
runs=None,
yerr_kind="std",
sigma_err=2.0,
grid=True,
put_time=False,
time_unit=cst.Myr,
colors=None,
nml_color=None,
legend=None,
subname_x=None,
subname_y=None,
**kwargs
):
if not node_arg is None:
name_x, name_y = name_x + "_" + node_arg, name_y + "_" + node_arg
node_x = self.save.get_node(name_x)
node_y = self.save.get_node(name_y)
if subname_x:
hdf5_x = tables.open_file(node_x.read())
node_x = hdf5_x.get_node(subname_x)
if subname_y:
hdf5_y = tables.open_file(node_y.read())
node_y = hdf5_y.get_node(subname_y)
xlabel, xunit_old, xunit = self._ax_label_unit(
node_x, xlabel, xunit, xunit_coeff
)
ylabel, yunit_old, yunit = self._ax_label_unit(
node_y, ylabel, yunit, yunit_coeff
)
P.xlabel(xlabel)
P.ylabel(ylabel)
if grid:
P.grid()
if ylog:
P.yscale("log")
if put_time:
time = self.save.root._v_attrs.time * self.comp.info["unit_time"]
time_str = self.pp_params.plot.time_fmt.format(
time.express(time_unit), time_unit.latex
)
if len(label) > 0:
label = label + " | " + time_str
else:
label = time_str
yerr = None
if node_y._v_attrs.CLASS == "ARRAY":
x = node_x.read() * xunit_old.express(xunit)
y = node_y.read() * yunit_old.express(yunit)
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
if smooth > 0:
y = gaussian_filter1d(y, sigma=smooth)
if not run is None:
label = self._label_run(run, node_y, label, nml_key)
if colors is None:
(base_line,) = P.plot(x, y, label=label, **kwargs)
else:
if nml_color is None:
color = colors[run]
elif nml_color == "time":
time = (
self.save.root._v_attrs.time * self.comp.info["unit_time"]
).express(time_unit)
color = colors(time)
else:
nml = self.comp.get_nml(nml_color, run)
try:
color = colors[nml]
except:
color = colors(nml)
(base_line,) = P.plot(x, y, label=label, color=color, **kwargs)
elif "mean" in node_y:
x = node_x.read() * xunit_old.express(xunit)
y = node_y.mean.read() * yunit_old.express(yunit)
if yerr_kind == "std":
std = node_y.std.read() * yunit_old.express(yunit)
yerr_min = y - sigma_err * std
yerr_max = y + sigma_err * std
elif yerr_kind == "min_max":
yerr_min = node_y.min.read() * yunit_old.express(yunit)
yerr_max = node_y.max.read() * yunit_old.express(yunit)
elif yerr_kind == "95per":
yerr_min = node_y.q025.read() * yunit_old.express(yunit)
yerr_max = node_y.q975.read() * yunit_old.express(yunit)
elif yerr_kind == "68per":
yerr_min = node_y.q16.read() * yunit_old.express(yunit)
yerr_max = node_y.q84.read() * yunit_old.express(yunit)
else:
yerr_min = y
yerr_max = y
yerr = yerr_max - yerr_min
mask = np.isfinite(x) & np.isfinite(y) & np.isfinite(yerr)
x, y, yerr, yerr_min, yerr_max = (
x[mask],
y[mask],
yerr[mask],
yerr_min[mask],
yerr_max[mask],
)
if not run is None:
label = self._label_run(run, node_y, label, nml_key)
base_line, _, _ = P.errorbar(
x, y, yerr=[y - yerr_min, yerr_max - y], label=label, **kwargs
)
else:
if runs is None:
runs = self.runs
for i, run in enumerate(runs):
x_run, y_run = node_x[run], node_y[run]
x = x_run.read() * xunit_old.express(xunit)
y = y_run.read() * yunit_old.express(yunit)
mask = np.isfinite(x) & np.isfinite(y)
x, y = x[mask], y[mask]
if smooth > 0:
y = gaussian_filter1d(y, sigma=smooth)
label_run = self._label_run(run, y_run, label, nml_key)
if colors is None:
(base_line,) = P.plot(x, y, label=label_run, **kwargs)
else:
if nml_color is None:
color = colors[i % len(colors)]
else:
nml = self.comp.get_nml(nml_color, run)
try:
color = colors[nml]
except:
color = colors(nml)
(base_line,) = P.plot(x, y, label=label_run, color=color, **kwargs)
if legend is None:
legend = True
if legend:
P.legend()
if not fit is None:
self._overlay_fit(
x,
y,
yerr,
kind=fit,
ls="--",
lw=1.5,
color=base_line.get_color(),
label=fitlabel,
)
if subname_x:
hdf5_x.close()
if subname_y:
hdf5_y.close()
def _pspec(self, name, **kwargs):
del kwargs["run"]
file_pspec = self.save.get_node("/hdf5/pspec").read()
num = self.save.root._v_attrs.num
getattr(pspec_read, "pspec_" + name)(file_pspec, ".", num, **kwargs)
def _overlay_fit(self, x, y, yerr=None, kind="linear", label=None, **kwargs):
if kind == "linear":
if yerr is None:
(a, b, rho, _map_rule, stderr) = linregress(x, y)
self._log(
"Linear fit y = {} x + {} with R^2 = {} and error is {}".format(
a, b, rho, stderr
)
)
if label is None:
label = r"Linear fit with slope ${:.3g}$ and $R^2 = {:.3f}$".format(
a, rho
)
else:
fit = polyfit(x, y, 1, w=[1.0 / ty for ty in yerr], full=True)
c = fit[0]
residual = fit[1][0][0]
b, a = c[0], c[1]
self._log(
"Linear fit y = {} x + {} with residual {}".format(a, b, residual)
)
if label is None:
label = r"Linear fit with slope ${:.3g}$".format(a)
P.plot(x, a * x + b, label=label, **kwargs)
elif kind == "power_law":
if yerr is None:
(a, b, rho, _map_rule, stderr) = linregress(np.log10(x), np.log10(y))
self._log(
"Power law fit y = x^({}) * {} with R^2 = {} and error is {}".format(
a, 10 ** b, rho, stderr
)
)
else:
fitfunc = lambda p, x: p[0] + p[1] * x
errfunc = lambda p, x, y, err: (y - fitfunc(p, x)) / err
pinit = [1.0, -1.0]
out = optimize.leastsq(
errfunc,
pinit,
args=(np.log10(x), np.log10(y), yerr / y),
full_output=1,
)
c = out[0]
b, a = c[0], c[1]
residual = errfunc(c, np.log10(x), np.log10(y), yerr / y)
self._log(
"Power law fit y = x^({}) * {} with residual {}".format(
a, 10 ** b, residual
)
)
if label is None:
label = r"Power-law fit with index {:.1f}".format(a)
P.plot(x, (10 ** b) * x ** a, label=label, **kwargs)
def overlay_kennicutt(self, n0, step):
P.grid(False)
ylim = P.ylim()
(tmin, tmax) = P.xlim()
tmax = tmax + 20
ymax = P.ylim()[1]
ssfr_sun = 2.5e-9
ssfr_ken = ssfr_sun * n0 ** 1.4
coeff = ssfr_ken * 1e6 * (self.comp.info["unit_length"].express(cst.pc)) ** 2
for i in np.arange(tmin, max(tmax, tmin + ymax / coeff), step):
t = np.linspace(0, tmax, 1000)
P.plot(t + i, t * coeff, ls="--", lw=0.9, color="grey")
P.plot(t + tmin, (t + i - tmin) * coeff, ls="--", lw=0.9, color="grey")
P.xlim(tmin, tmax)
P.ylim(ylim)
def def_rules(self):
self.rules = {
# Generic rules
"plot": PlotRule(
self, lambda arg, **kwargs: self._plot(*arg, **kwargs), kind="comp"
),
"coldens": PlotRule(
self,
partial(self._plot_map, "coldens", label=r"$\Sigma$", unit=cst.coldens),
"Column density map",
dependencies=["coldens"],
),
"rho": PlotRule(
self,
partial(self._plot_map, "rho", label=r"$\rho$", unit=cst.Msun_pc3),
"Density slice at s = 0, with s = x, y or z.",
dependencies=["rho"],
),
"coldens_l": PlotRule(
self,
partial(
self._plot_map,
"coldens",
label=r"$\Sigma$",
unit=cst.coldens,
overlays=[self._overlay_levels],
),
"Column density with level overlay",
dependencies=["coldens", "levels"],
),
"rho_v": PlotRule(
self,
partial(
self._plot_map,
"rho",
label=r"$\rho$",
unit=cst.Msun_pc3,
overlays=[self._overlay_speed],
),
"Density slice with speed overlay",
dependencies=["rho", "speed_h", "speed_v"],
),
"rho_B": PlotRule(
self,
partial(
self._plot_map,
"rho",
label=r"$\rho$",
unit=cst.Msun_pc3,
overlays=[self._overlay_B],
),
"Density slice with magnetic field overlay",
dependencies=["rho", "B_h", "B_v"],
),
"jeans_ratio": PlotRule(
self,
partial(
self._plot_map,
"jeans_ratio",
vmin=0.1,
vmax=100,
cmap="RdBu_r",
overlays=[self._overlay_levels],
),
"Jeans' lenght divided by the max resolution",
dependencies=["jeans_ratio", "levels"],
),
"Q": PlotRule(
self,
partial(
self._plot_map,
"rho",
label=r"$Q$",
vmin=0.01,
vmax=100,
cmap="RdBu_r",
),
"Toomre Q parameter for a Keplerian disk",
dependencies=["Q"],
),
"rho_pdf": PlotRule(
self,
partial(self._plot_hist, "rho_pdf"),
"$\rho$-PDF",
dependencies=["rho_pdf"],
),
"avg_coldens_pdf": PlotRule(
self,
partial(
self._plot_hist,
"avg_time_coldens_pdf_z",
group="/comp/",
xlog=True,
put_time=False,
),
"Column density PDF, averaged in time",
kind="runs",
dependencies={"avg_time_coldens_pdf": "z"},
),
"T_pdf": PlotRule(
self,
partial(self._plot_hist, "T_pdf"),
"T-PDF on a 2D slice",
dependencies=["T_pdf"],
),
"P_pdf": PlotRule(
self,
partial(self._plot_hist, "P_pdf"),
"P-PDF on a 2D slice ",
dependencies=["P_pdf"],
),
"rho_prof": PlotRule(
self,
partial(self._plot, "/profile/axis", "/profile/rho_prof"),
"Density profile",
dependencies=["axis", "rho_prof"],
),
"pspec": PlotRule(self, self._pspec, dependencies={"pspec": None}),
"kappa_beta": PlotRule(
self,
partial(
self._plot,
"/comp/nml_cloud_params/beta_cool",
"/comp/avg_time_pdf_slope_coldens",
),
"Slope of the Sigma-PDF against cooling beta factor",
kind="comp",
dependencies={
"nml": "cloud_params/beta_cool",
"avg_time_pdf_slope_coldens": None,
},
),
"sink_mass": PlotRule(
self,
partial(
self._plot,
"/series/sinks_from_log/time",
"/series/sinks_from_log/mass_sink",
xunit=cst.Myr,
yunit=cst.Msun,
),
"Mass of the sinks against time",
kind="series",
dependencies=["sinks_from_log"],
),
"assfr": PlotRule(
self,
partial(
self._plot,
"/series/sfr_from_log/time",
"/series/sfr_from_log/sfr",
ylabel="Averaged surfacic SFR",
xunit=cst.Myr,
yunit=cst.ssfr,
),
kind="series",
dependencies=["sfr_from_log"],
),
"issfr": PlotRule(
self,
partial(
self._plot,
"/series/sinks_from_log/time",
"/series/sinks_from_log/issfr",
ylabel="Surfacic SFR",
xunit=cst.Myr,
yunit=cst.ssfr,
),
kind="series",
dependencies=["issfr"],
),
"turb_rms": PlotRule(
self,
partial(
self._plot,
"/series/rms_from_log/time",
"/series/rms_from_log/turb_rms",
xunit=cst.Myr,
),
"Turbulent RMS",
kind="series",
dependencies=["rms_from_log"],
),
"turb_energy": PlotRule(
self,
partial(
self._plot,
"/series/rms_from_log/time",
"/series/rms_from_log/turb_energy",
xunit=cst.Myr,
),
"Turbulent energy",
kind="series",
dependencies=["rms_from_log"],
),
"turb_power": PlotRule(
self,
partial(
self._plot,
"/series/rms_from_log/time",
"/series/rms_from_log/turb_power",
xunit=cst.Myr,
),
"Turbulent power",
kind="series",
dependencies=["turb_power"],
),
"sigma": PlotRule(
self,
partial(
self._plot,
"/series/time",
"/series/time_sigma",
ylabel="$\\sigma$",
xunit=cst.Myr,
yunit=cst.km_s,
),
"Velocity dispersion",
kind="series",
dependencies=["time_sigma"],
),
"max_fluct_coldens": PlotRule(
self,
partial(
self._plot,
"/series/time",
"/series/time_max_fluct_coldens_z",
ylabel="$\\max(\Sigma/\overline{\Sigma})$",
xunit=cst.Myr,
),
"Maximal fluctuation of the column density against time",
kind="series",
dependencies={"time_max_fluct_coldens": "z"},
),
}
averageables = ["coldens", "rho", "T", "Q"]
for name in averageables:
self.rules["rad_" + name] = PlotRule(
self,
partial(
self._plot_radial,
"rad_avg_" + name,
label=name,
xlog=True,
ylog=True,
),
"Azimuthal average of {}".format(name),
dependencies=["radial_bins", "rad_avg_" + name],
)
self.rules["fluct_" + name] = PlotRule(
self,
partial(
self._plot_map,
"fluct_" + name,
vmin=0.01,
vmax=100,
cmap="RdBu_r",
label="{}/avg({})".format(name, name),
),
"Fluctuation of {}".format(name),
dependencies=["fluct_" + name],
)
self.rules["pdf_" + name] = PlotRule(
self,
partial(
self._plot_hist,
"pdf_" + name,
ylog=True,
label="{}/avg({})".format(name, name),
),
"Probability density function of {} fluctuations".format(name),
dependencies=["fit_pdf_" + name],
)
super(Plotter, self).def_rules()