Files
galaxy_ninja/galsec.py
Noe Brucy 304a8b9e84 fix
2023-05-08 18:19:13 +02:00

510 lines
17 KiB
Python

# coding: utf-8
"""
Galaxy kiloparsec extractor
"""
import numpy as np
from astropy.table import QTable, hstack
from astropy import units as u
from astropy.units.quantity import Quantity
from scipy.interpolate import griddata
from scipy.fft import fftn
def vect_r(position: np.array, vector: np.array) -> np.array:
    """Radial component of a vector.

    Parameters
    ----------
    position : np.array (N, 3)
        (only the x and y coordinates are read)
    vector : np.array (N, 3)
        (only the x and y components are read)

    Returns
    -------
    np.array (N)
        Radial component of each vector
    """
    xy = position[:, :2]
    norm = np.sqrt(np.sum(xy**2, axis=1))
    # unit vector pointing away from the z axis, for each row
    radial_unit = xy / norm[:, np.newaxis]
    # row-by-row dot product with the in-plane part of the vectors
    return np.sum(vector[:, :2] * radial_unit, axis=1)
def vect_phi(position: np.array, vector: np.array):
    """Azimuthal component of a vector.

    Parameters
    ----------
    position : np.array (N, 3)
        (only the x and y coordinates are read)
    vector : np.array (N, 3)
        (only the x and y components are read)

    Returns
    -------
    np.array (N)
        Azimuthal component of each vector
    """
    x, y = position[:, 0], position[:, 1]
    norm = np.sqrt(x**2 + y**2)
    # The azimuthal unit vector is (-y, x)/r, hence
    # v . u_phi = (x * vy - y * vx) / r
    return (x * vector[:, 1] - y * vector[:, 0]) / norm
def get_sfr(
    stars: QTable, time: Quantity[u.Myr], average_on: Quantity[u.Myr] = 30 * u.Myr
) -> Quantity[u.Msun / u.year]:
    """Star formation rate at a given time.

    Sums the mass of the stars born during the last `average_on` before
    `time` and divides by the length of that window.

    Parameters
    ----------
    stars : QTable
        star dataset; the "birth_time" and "mass" columns are read
    time : Quantity[u.Myr]
        time at which the SFR should be computed
    average_on : Quantity, optional
        compute the sfr on the last `average_on` years, by default 30*u.Myr

    Returns
    -------
    Quantity[u.Msun / u.year]
        SFR; zero when the window has zero length or when the
        "birth_time" column is missing
    """
    try:
        mask = (stars["birth_time"] > max(time - average_on, 0 * u.Myr)) & (
            stars["birth_time"] < time
        )
        # The averaging window cannot start before the simulation (t = 0)
        dtime = min(average_on, time).to(u.year)
        # Compare the bare value: comparing a year-unit Quantity with a
        # plain 0 is not unit-safe
        if dtime.value == 0:
            sfr = 0.0 * u.Msun / u.year
        else:
            sfr = np.sum(stars["mass"][mask]) / dtime
    except KeyError:
        # no "birth_time" column in the table
        sfr = 0.0 * u.Msun / u.year
    return sfr
def aggregate(
    grouped_data: QTable,
    weight_field: str = "mass",
    extensive_fields: list = None,
    weighted_fields: list = None,
) -> QTable:
    """Aggregate wisely from grouped data.

    Extensive fields are summed over each group; weighted fields get a
    weighted mean (weight = `weight_field`) plus a weighted standard
    deviation stored as ``sigma_<field>``.

    Warning: the `weighted_fields` columns of `grouped_data` are modified
    in place (after the call they hold weighted squared deviations).

    Parameters
    ----------
    grouped_data : QTable
        should already have group information
    weight_field : str, optional
        the field used for the weighting of the non extensive quantities, by default "mass"
    extensive_fields : list, optional
        these will be summed. Should include weight_field, by default ["mass", "ek"]
    weighted_fields : list, optional
        the fields that will be weighted, by default ["velr", "velphi", "velx", "vely", "velz"]

    Returns
    -------
    QTable
        a table with the aggregated value for each group
    """
    # None sentinels instead of mutable default arguments
    if extensive_fields is None:
        extensive_fields = ["mass", "ek"]
    if weighted_fields is None:
        weighted_fields = ["velr", "velphi", "velx", "vely", "velz"]
    assert weight_field in extensive_fields
    for field in weighted_fields:
        grouped_data[field] *= grouped_data[weight_field]
    binned_data = grouped_data[extensive_fields + weighted_fields].groups.aggregate(
        np.add
    )
    for field in weighted_fields:
        binned_data[field] /= binned_data[weight_field]  # weighted mean
        # Velocity dispersion: restore the field, subtract each group's
        # mean, then take the weighted root-mean-square deviation
        grouped_data[field] /= grouped_data[weight_field]
        for i in range(len(grouped_data.groups)):
            bounds = grouped_data.groups.indices[i], grouped_data.groups.indices[i + 1]
            grouped_data[bounds[0] : bounds[1]][field] -= binned_data[i][field]
        grouped_data[field] = grouped_data[weight_field] * grouped_data[field] ** 2
        binned_data[f"sigma_{field}"] = grouped_data[field,].groups.aggregate(
            np.add
        )[field]
        binned_data[f"sigma_{field}"] = np.sqrt(
            binned_data[f"sigma_{field}"] / binned_data[weight_field]
        )
    return binned_data
def get_bouncing_box_mask(
    data: QTable, r: Quantity[u.kpc], phi: Quantity[u.rad], size: Quantity[u.kpc]
):
    """Mask of elements inside a cube of side `size` centred on (r, phi, z=0).

    Parameters
    ----------
    data : QTable
        must have a "position" column of shape (N, 3)
    r : Quantity[u.kpc]
        galactocentric radius of the cube centre
    phi : Quantity[u.rad]
        galactic longitude of the cube centre
    size : Quantity[u.kpc]
        edge length of the cube

    Returns
    -------
    bool array (N)
        True for elements inside the cube
    """
    center_x = r * np.cos(phi)
    center_y = r * np.sin(phi)
    half = size / 2
    pos = data["position"]
    # |dx| < h and |dy| < h is the same test as max(|dx|, |dy|) < h
    inside_xy = (np.abs(pos[:, 0] - center_x) < half) & (
        np.abs(pos[:, 1] - center_y) < half
    )
    return inside_xy & (np.abs(pos[:, 2]) < half)
def regrid(
    position: Quantity,
    value: Quantity,
    resolution: Quantity[u.pc],
):
    """Resample scattered point values onto a regular cubic grid.

    Parameters
    ----------
    position : Quantity (N, 3)
        coordinates of the points
    value : Quantity (N)
        value carried by each point
    resolution : Quantity[u.pc]
        target grid spacing

    Returns
    -------
    tuple
        ``(grid, (gx, gy, gz))``: the nearest-neighbour interpolated
        values and the grid coordinate arrays (bare floats, in the unit
        of `position`)
    """
    # Bounding box of the data
    min_x, max_x = position[:, 0].min(), position[:, 0].max()
    min_y, max_y = position[:, 1].min(), position[:, 1].max()
    min_z, max_z = position[:, 2].min(), position[:, 2].max()
    size = max([max_x - min_x, max_y - min_y, max_z - min_z])
    # Same number of points along each axis, set by the largest extent
    nb_points = int(np.ceil(((size / resolution).to(u.dimensionless_unscaled).value)))
    gx, gy, gz = np.mgrid[
        min_x.value : max_x.value : nb_points * 1j,
        min_y.value : max_y.value : nb_points * 1j,
        min_z.value : max_z.value : nb_points * 1j,
    ]
    grid = griddata(
        position.value,
        value.value,
        (gx, gy, gz),
        method="nearest",
    )
    # NOTE: removed leftover debug code (matplotlib import + imshow +
    # blocking plt.show()) that did not belong in a library function.
    return (
        grid,
        (gx, gy, gz),
    )
def fft(
    position: Quantity,
    value: Quantity,
    resolution: Quantity[u.pc],
):
    """Regrid scattered values and return their 3D discrete Fourier transform.

    Parameters
    ----------
    position : Quantity (N, 3)
        coordinates of the points
    value : Quantity (N)
        value carried by each point
    resolution : Quantity[u.pc]
        target grid spacing passed to `regrid`

    Returns
    -------
    np.ndarray (complex)
        the FFT of the regridded values
    """
    grid, _ = regrid(position, value, resolution)
    # fftn RETURNS the transform; overwrite_x only lets scipy reuse the
    # input as scratch space. The original discarded the result and
    # returned None.
    return fftn(grid, overwrite_x=True)
class Galsec:
    """
    Galactic sector extractor

    Bins galaxy simulation data (gas, stars, dark matter) into radial,
    sector or cartesian bins and aggregates physical quantities in them.
    """

    # Fluids processed by default; overridden by data["header"]["fluids"]
    # in __init__ when that key is present.
    fluids = ["gas", "stars", "dm"]
    # Units attached to each raw input column when the per-fluid QTables
    # are built in __init__.
    # NOTE(review): "volume" is declared in kpc^3 here while __init__'s
    # docstring announces pc^3 — verify which one the input actually uses.
    units = {
        "gas": {
            "position": u.kpc,
            "volume": u.kpc**3,
            "velocity": u.km / u.s,
            "mass": u.Msun,
        },
        "stars": {
            "position": u.kpc,
            "velocity": u.km / u.s,
            "mass": u.Msun,
            "birth_time": u.Myr,
        },
        "dm": {"position": u.kpc, "velocity": u.km / u.s, "mass": u.Msun},
    }
def __init__(self, data: dict, copy=True) -> None:
"""Initiazise a Galaxasec object
Parameters
----------
cell_data : dict
A dataset of cells in the following format:
header:
time float [Myr] time since the start of the simulation
fluids str list list of fluids included
gas:
position float array (Ngas, 3) [kpc], centered
volume float array (Ngas) [pc^3]
velocity float array (Ngas, 3) [km/s]
mass float array (Ngas) [Msun]
stars:
position float array (Nstar, 3) [kpc], centered
velocity float array (Nstar, 3) [km/s]
mass float array (Nstar) [Msun]
birth_time float array (Nstar) [Myr]
dm:
position float array (Ngas, 3) [kpc], centered
velocity float array (Ngas, 3) [km/s]
mass float array (Ngas) [Msun]
maps:
extent float list [xmin, xmax, ymin, ymax] Coordinates of the edges of the map, centered
gas_coldens float array (Nx, Ny) [Msun/pc^2] Map of column density
copy : bool, default=True
Wheter the data should be copied.
"""
self.data = {}
if "fluids" in data["header"]:
self.fluids = data["header"]["fluids"]
for fluid in self.fluids:
self.data[fluid] = QTable(data[fluid], units=self.units[fluid], copy=copy)
self.time = data["header"]["time"] * u.Myr
self.compute_derived_values()
def compute_derived_values(self) -> None:
"""
Helper function to computed values derivated from input data
"""
for fluid in self.fluids:
dset = self.data[fluid]
dset["r"] = np.sqrt(
np.sum(dset["position"][:, :2] ** 2, axis=1)
) # galactic radius
dset["phi"] = np.angle(
dset["position"][:, 0] + dset["position"][:, 1] * 1j
) # galactic longitude
dset["phi"][dset["phi"] < 0] += (
2 * np.pi * u.rad
) # rescaling to get only positive values
dset["velphi"] = vect_phi(
dset["position"], dset["velocity"]
) # azimuthal velocity
dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity
dset["velx"] = dset["velocity"][:, 0] # x velocity (TODO this is stupid)
dset["vely"] = dset["velocity"][:, 1] # y velocity
dset["velz"] = dset["velocity"][:, 2] # vertical velocity
dset["ek"] = (dset["mass"] * np.sum(dset["velocity"] ** 2, axis=1)).to(
u.erg
) # kinetic energy
def ring_binning(self, delta_r: Quantity[u.kpc]):
"""Add radial bin informations to the dataset
Parameters
----------
delta_r : Quantity[u.kpc]
spacing between two radial bins
"""
for fluid in self.fluids:
r_bin = np.trunc(self.data[fluid]["r"] / delta_r)
r_bin = (r_bin + 0.5) * delta_r # Store the middle value
self.data[fluid]["r_bin"] = r_bin
def sector_binning(self, delta_r: Quantity[u.kpc], delta_l: Quantity[u.kpc]):
"""Add sector bin information to the dataset
Parameters
----------
delta_r : Quantity[u.kpc]
spacing between two radial bins
delta_l : Quantity[u.kpc]
spacing (in spatial units) between two azimuthal bins
"""
self.ring_binning(delta_r)
for fluid in self.fluids:
delta_phi = (delta_l / self.data[fluid]["r_bin"]) * u.rad
phi_bin = (np.trunc(self.data[fluid]["phi"] / delta_phi) + 0.5) * delta_phi
self.data[fluid]["phi_bin"] = phi_bin
def cartesian_binning(self, delta_x: Quantity[u.kpc],
delta_y: Quantity[u.kpc],
delta_z: Quantity[u.kpc]):
"""Add cartesian bin information to the dataset
Parameters
----------
delta_x : Quantity[u.kpc]
spacing between two x bins
delta_y : Quantity[u.kpc]
spacing between two y bins
delta_z : Quantity[u.kpc]
spacing between two y bins
"""
for fluid in self.fluids:
for i, (delta, name) in enumerate(zip([delta_x, delta_y, delta_z], ["x", "y", "z"])):
pos = self.data[fluid]["position"][:, i]
minval = np.floor(np.min(pos) / delta)
bin = np.floor((pos - minval) / delta)
# Store the middle value
self.data[fluid][f"{name}_bin"] = (bin + 0.5) * delta + minval
def sector_analysis(
self,
delta_r: Quantity[u.kpc] = u.kpc,
delta_l: Quantity[u.kpc] = u.kpc,
rmin: Quantity[u.kpc] = 1 * u.kpc,
rmax: Quantity[u.kpc] = 12 * u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
"""Compute the aggration of quantities in sectors bins
Parameters
----------
delta_r : Quantity[u.kpc], optional
spacing between two radial bins, by default u.kpc
delta_l : Quantity[u.kpc], optional
spacing (in spatial units) between two azimuthal bins, by default u.kpc
rmin : Quantity[u.kpc], optional
filter out bin below that radius, by default 1*u.kpc
rmax : Quantity[u.kpc], optional
filter out bin beyond that radius, by default 12*u.kpc
"""
self.sector_binning(delta_r, delta_l)
self.grouped_data = {}
self.sectors = {}
for fluid in self.fluids:
if fluid == "gas":
extensive_fields = ["mass", "ek", "volume"]
else:
extensive_fields = ["mass", "ek"]
filtered_data = self.data[fluid][
(self.data[fluid]["r"] > rmin)
& (self.data[fluid]["r"] < rmax)
& (self.data[fluid]["position"][:, 2] > zmin)
& (self.data[fluid]["position"][:, 2] < zmax)
]
self.grouped_data[fluid] = filtered_data.group_by(["r_bin", "phi_bin"])
self.sectors[fluid] = hstack(
[
self.grouped_data[fluid]["r_bin", "phi_bin"].groups.aggregate(
np.fmin
),
aggregate(
self.grouped_data[fluid], extensive_fields=extensive_fields
),
]
)
self.sectors[fluid].rename_column("r_bin", "r")
self.sectors[fluid].rename_column("phi_bin", "phi")
self.sectors["stars"]["sfr"] = (
np.zeros(len(self.sectors["stars"]["mass"])) * u.Msun / u.year
)
for i, group in enumerate(self.grouped_data["stars"].groups):
self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time)
self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time)
def cartesian_analysis(
self,
delta_x: Quantity[u.kpc] = u.kpc,
delta_y: Quantity[u.kpc] = u.kpc,
delta_z: Quantity[u.kpc] = u.kpc,
zmin: Quantity[u.kpc] = -0.5 * u.kpc,
zmax: Quantity[u.kpc] = 0.5 * u.kpc,
):
"""Compute the aggration of quantities in cartesian bins
Parameters
----------
delta_x : Quantity[u.kpc]
spacing between two x bins
delta_y : Quantity[u.kpc]
spacing between two y bins
"""
self.cartesian_binning(delta_x, delta_y, delta_z)
self.grouped_data = {}
self.grid = {}
for fluid in self.fluids:
if fluid == "gas":
extensive_fields = ["mass", "ek", "volume"]
else:
extensive_fields = ["mass", "ek"]
filtered_data = self.data[fluid][
(self.data[fluid]["position"][:, 2] > zmin)
& (self.data[fluid]["position"][:, 2] < zmax)
]
self.grouped_data[fluid] = filtered_data.group_by(["x_bin", "y_bin", "z_bin"])
self.grid[fluid] = hstack(
[
self.grouped_data[fluid]["x_bin", "y_bin", "z_bin"].groups.aggregate(
np.fmin
),
aggregate(
self.grouped_data[fluid], extensive_fields=extensive_fields
),
]
)
self.grid[fluid].rename_column("x_bin", "x")
self.grid[fluid].rename_column("y_bin", "y")
self.grid[fluid].rename_column("z_bin", "z")
self.grid["stars"]["sfr"] = (
np.zeros(len(self.grid["stars"]["mass"])) * u.Msun / u.year
)
for i, group in enumerate(self.grouped_data["stars"].groups):
self.grid["stars"]["sfr"][i] = get_sfr(group, self.time)
self.grid["stars"]["sfr"][i] = get_sfr(group, self.time)
    def ring_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 12 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
    ):
        """Compute the aggregation of quantities in radial bins

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        rmin : Quantity[u.kpc], optional
            filter out bin below that radius, by default 1*u.kpc
        rmax : Quantity[u.kpc], optional
            filter out bin beyond that radius, by default 12*u.kpc
        zmin : Quantity[u.kpc], optional
            keep only elements above that altitude, by default -0.5*u.kpc
        zmax : Quantity[u.kpc], optional
            keep only elements below that altitude, by default 0.5*u.kpc
        """
        self.ring_binning(delta_r)
        grouped_data = {}
        self.rings = {}
        for fluid in self.fluids:
            # gas cells also carry a volume column to sum
            if fluid == "gas":
                extensive_fields = ["mass", "ek", "volume"]
            else:
                extensive_fields = ["mass", "ek"]
            # radial range and altitude slab filter
            filtered_data = self.data[fluid][
                (self.data[fluid]["r"] > rmin)
                & (self.data[fluid]["r"] < rmax)
                & (self.data[fluid]["position"][:, 2] > zmin)
                & (self.data[fluid]["position"][:, 2] < zmax)
            ]
            grouped_data[fluid] = filtered_data.group_by(["r_bin"])
            # bin label (constant within a group, fmin just picks one)
            # alongside the aggregated physical fields
            self.rings[fluid] = hstack(
                [
                    grouped_data[fluid][
                        "r_bin",
                    ].groups.aggregate(np.fmin),
                    aggregate(grouped_data[fluid], extensive_fields=extensive_fields),
                ]
            )
            self.rings[fluid].rename_column("r_bin", "r")
        self.rings["stars"]["sfr"] = (
            np.zeros(len(self.rings["stars"]["mass"])) * u.Msun / u.year
        )
        # star formation rate per ring at the current simulation time
        for i, group in enumerate(grouped_data["stars"].groups):
            self.rings["stars"]["sfr"][i] = get_sfr(group, self.time)