# coding: utf-8 """ Galaxy kiloparsec extractor """ import numpy as np from astropy.table import QTable, hstack from astropy import units as u from astropy.units.quantity import Quantity from scipy.interpolate import griddata from scipy.fft import fftn def vect_r(position: np.array, vector: np.array) -> np.array: """Radial component of a vector Parameters ---------- position : np.array (N, 3) (only reads x and y) vector : np.array (N, 3) (only reads x and y components) Returns ------- np.array (N) Radial component """ r = position[:, :2] ur = np.transpose((np.transpose(r) / np.sqrt(np.sum(r**2, axis=1)))) return np.einsum("ij, ij -> i", vector[:, :2], ur) def vect_phi(position: np.array, vector: np.array): """Azimuthal component of a vecto Parameters ---------- position : np.array (N, 3) (only reads x and y) vector : np.array (N, 3) (only reads x and y components) Returns ------- np.array (N) Azimuthal component """ r = position[:, :2] r_norm = np.sqrt(np.sum(r**2, axis=1)) rot = np.array([[0, -1], [1, 0]]) uphi = np.transpose(np.einsum("ij, kj -> ik", rot, r) / r_norm) return np.einsum("ij,ij -> i", vector[:, :2], uphi) def get_sfr( stars: QTable, time: Quantity[u.Myr], average_on: Quantity[u.Myr] = 30 * u.Myr ) -> Quantity[u.Msun / u.year]: """_summary_ Parameters ---------- stars : QTable _description_ time : Quantity[u.Myr], time at wich the SFR should be computed average_on : Quantity, optional compute the sfr on the last `average_on` years, by default 30*u.Myr Returns ------- float SFR in Msun/yr """ try: mask = (stars["birth_time"] > max(time - average_on, 0 * u.Myr)) & ( stars["birth_time"] < time ) dtime = min(average_on, time).to(u.year) if dtime == 0: sfr = 0.0 * u.Msun / u.year else: sfr = np.sum(stars["mass"][mask]) / dtime except KeyError: sfr = 0.0 * u.Msun / u.year return sfr def aggregate( grouped_data: QTable, weight_field: str = "mass", extensive_fields: str = ["mass", "ek"], weighted_fields: list = ["velr", "velphi", "velx", "vely", "velz"], ) -> QTable: """Aggregate wisely from grouped data Parameters ---------- grouped_data : QTable should already have group information weight_field : str, optional the field used for the weighting of the non extensive quantities, by default "mass" extensive_fields : str, optional these will be summed. Should include weight_field, by default ["mass", "ek"] weighted_fields : list, optional the field that will be weighted, by default ["velr", "velphi", "velz"] Returns ------- QTable a tabble with the aggregated value for each group """ assert weight_field in extensive_fields for field in weighted_fields: grouped_data[field] *= grouped_data[weight_field] binned_data = grouped_data[extensive_fields + weighted_fields].groups.aggregate( np.add ) for field in weighted_fields: binned_data[field] /= binned_data[weight_field] # weighted mean # Veocity dispersion grouped_data[field] /= grouped_data[weight_field] for i in range(len(grouped_data.groups)): slice = grouped_data.groups.indices[i], grouped_data.groups.indices[i + 1] grouped_data[slice[0] : slice[1]][field] -= binned_data[i][field] grouped_data[field] = grouped_data[weight_field] * grouped_data[field] ** 2 binned_data[f"sigma_{field}"] = grouped_data[field,].groups.aggregate( np.add )[field] binned_data[f"sigma_{field}"] = np.sqrt( binned_data[f"sigma_{field}"] / binned_data[weight_field] ) return binned_data def get_bouncing_box_mask( data: QTable, r: Quantity[u.kpc], phi: Quantity[u.rad], size: Quantity[u.kpc] ): x = r * np.cos(phi) y = r * np.sin(phi) norm_inf = np.maximum( np.abs(data["position"][:, 0] - x), np.abs(data["position"][:, 1] - y) ) mask = (norm_inf < size / 2) & (np.abs(data["position"][:, 2]) < size / 2) return mask def regrid( position: Quantity, value: Quantity, resolution: Quantity[u.pc], ): min_x, max_x = position[:, 0].min(), position[:, 0].max() min_y, max_y = position[:, 1].min(), position[:, 1].max() min_z, max_z = position[:, 2].min(), position[:, 2].max() size = max([max_x - min_x, max_y - min_y, max_z - min_z]) nb_points = int(np.ceil(((size / resolution).to(u.dimensionless_unscaled).value))) gx, gy, gz = np.mgrid[ min_x.value : max_x.value : nb_points * 1j, min_y.value : max_y.value : nb_points * 1j, min_z.value : max_z.value : nb_points * 1j, ] grid = griddata( position.value, value.value, (gx, gy, gz), method="nearest", ) import matplotlib.pyplot as plt plt.imshow( grid[:, :, len(grid) // 2].T, origin="lower", extent=[min_x.value, max_x.value, min_y.value, max_y.value], ) plt.show() return ( grid, (gx, gy, gz), ) def fft( position: Quantity, value: Quantity, resolution: Quantity[u.pc], ): grid, (gx, gy, gz) = regrid(position, value, resolution) fftn(grid, overwrite_x=True) class Galsec: """ Galactic sector extractor """ fluids = ["gas", "stars", "dm"] units = { "gas": { "position": u.kpc, "volume": u.kpc**3, "velocity": u.km / u.s, "mass": u.Msun, }, "stars": { "position": u.kpc, "velocity": u.km / u.s, "mass": u.Msun, "birth_time": u.Myr, }, "dm": {"position": u.kpc, "velocity": u.km / u.s, "mass": u.Msun}, } def __init__(self, data: dict, copy=True) -> None: """Initiazise a Galaxasec object Parameters ---------- cell_data : dict A dataset of cells in the following format: header: time float [Myr] time since the start of the simulation box_size float [kpc] size of the box fluids str list list of fluids included gas: position float array (Ngas, 3) [kpc], centered volume float array (Ngas) [pc^3] velocity float array (Ngas, 3) [km/s] mass float array (Ngas) [Msun] stars: position float array (Nstar, 3) [kpc], centered velocity float array (Nstar, 3) [km/s] mass float array (Nstar) [Msun] birth_time float array (Nstar) [Myr] dm: position float array (Ngas, 3) [kpc], centered velocity float array (Ngas, 3) [km/s] mass float array (Ngas) [Msun] maps: extent float list [xmin, xmax, ymin, ymax] Coordinates of the edges of the map, centered gas_coldens float array (Nx, Ny) [Msun/pc^2] Map of column density copy : bool, default=True Wheter the data should be copied. """ self.data = {} if "fluids" in data["header"]: self.fluids = data["header"]["fluids"] for fluid in self.fluids: self.data[fluid] = QTable(data[fluid], units=self.units[fluid], copy=copy) self.time = data["header"]["time"] * u.Myr self.box_size = data["header"]["box_size"] * u.kpc self.compute_derived_values() def compute_derived_values(self) -> None: """ Helper function to computed values derivated from input data """ for fluid in self.fluids: dset = self.data[fluid] dset["r"] = np.sqrt( np.sum(dset["position"][:, :2] ** 2, axis=1) ) # galactic radius dset["phi"] = np.angle( dset["position"][:, 0] + dset["position"][:, 1] * 1j ) # galactic longitude dset["phi"][dset["phi"] < 0] += ( 2 * np.pi * u.rad ) # rescaling to get only positive values dset["velphi"] = vect_phi( dset["position"], dset["velocity"] ) # azimuthal velocity dset["velr"] = vect_r(dset["position"], dset["velocity"]) # radial velocity dset["velx"] = dset["velocity"][:, 0] # x velocity (TODO this is stupid) dset["vely"] = dset["velocity"][:, 1] # y velocity dset["velz"] = dset["velocity"][:, 2] # vertical velocity dset["ek"] = (dset["mass"] * np.sum(dset["velocity"] ** 2, axis=1)).to( u.erg ) # kinetic energy def ring_binning(self, delta_r: Quantity[u.kpc]): """Add radial bin informations to the dataset Parameters ---------- delta_r : Quantity[u.kpc] spacing between two radial bins """ for fluid in self.fluids: r_bin = np.trunc(self.data[fluid]["r"] / delta_r) r_bin = (r_bin + 0.5) * delta_r # Store the middle value self.data[fluid]["r_bin"] = r_bin def sector_binning(self, delta_r: Quantity[u.kpc], delta_l: Quantity[u.kpc]): """Add sector bin information to the dataset Parameters ---------- delta_r : Quantity[u.kpc] spacing between two radial bins delta_l : Quantity[u.kpc] spacing (in spatial units) between two azimuthal bins """ self.ring_binning(delta_r) for fluid in self.fluids: delta_phi = (delta_l / self.data[fluid]["r_bin"]) * u.rad phi_bin = (np.trunc(self.data[fluid]["phi"] / delta_phi) + 0.5) * delta_phi self.data[fluid]["phi_bin"] = phi_bin def cartesian_binning(self, delta_x: Quantity[u.kpc], delta_y: Quantity[u.kpc], delta_z: Quantity[u.kpc]): """Add cartesian bin information to the dataset Parameters ---------- delta_x : Quantity[u.kpc] spacing between two x bins delta_y : Quantity[u.kpc] spacing between two y bins delta_z : Quantity[u.kpc] spacing between two y bins """ boxsize = self.box_size for fluid in self.fluids: for i, (delta, name) in enumerate(zip([delta_x, delta_y, delta_z], ["x", "y", "z"])): pos = self.data[fluid]["position"][:, i] + 0.5 * boxsize # stay positive bin = np.floor(pos / delta) # Store the middle value self.data[fluid][f"{name}_bin"] = (bin + 0.5) * delta - 0.5 * boxsize def sector_analysis( self, delta_r: Quantity[u.kpc] = u.kpc, delta_l: Quantity[u.kpc] = u.kpc, rmin: Quantity[u.kpc] = 1 * u.kpc, rmax: Quantity[u.kpc] = 12 * u.kpc, zmin: Quantity[u.kpc] = -0.5 * u.kpc, zmax: Quantity[u.kpc] = 0.5 * u.kpc, ): """Compute the aggration of quantities in sectors bins Parameters ---------- delta_r : Quantity[u.kpc], optional spacing between two radial bins, by default u.kpc delta_l : Quantity[u.kpc], optional spacing (in spatial units) between two azimuthal bins, by default u.kpc rmin : Quantity[u.kpc], optional filter out bin below that radius, by default 1*u.kpc rmax : Quantity[u.kpc], optional filter out bin beyond that radius, by default 12*u.kpc """ self.sector_binning(delta_r, delta_l) self.grouped_data = {} self.sectors = {} for fluid in self.fluids: if fluid == "gas": extensive_fields = ["mass", "ek", "volume"] else: extensive_fields = ["mass", "ek"] filtered_data = self.data[fluid][ (self.data[fluid]["r"] > rmin) & (self.data[fluid]["r"] < rmax) & (self.data[fluid]["position"][:, 2] > zmin) & (self.data[fluid]["position"][:, 2] < zmax) ] self.grouped_data[fluid] = filtered_data.group_by(["r_bin", "phi_bin"]) self.sectors[fluid] = hstack( [ self.grouped_data[fluid]["r_bin", "phi_bin"].groups.aggregate( np.fmin ), aggregate( self.grouped_data[fluid], extensive_fields=extensive_fields ), ] ) self.sectors[fluid].rename_column("r_bin", "r") self.sectors[fluid].rename_column("phi_bin", "phi") self.sectors["stars"]["sfr"] = ( np.zeros(len(self.sectors["stars"]["mass"])) * u.Msun / u.year ) for i, group in enumerate(self.grouped_data["stars"].groups): self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time) self.sectors["stars"]["sfr"][i] = get_sfr(group, self.time) def cartesian_analysis( self, delta_x: Quantity[u.kpc] = u.kpc, delta_y: Quantity[u.kpc] = u.kpc, delta_z: Quantity[u.kpc] = u.kpc, zmin: Quantity[u.kpc] = -0.5 * u.kpc, zmax: Quantity[u.kpc] = 0.5 * u.kpc, ): """Compute the aggration of quantities in cartesian bins Parameters ---------- delta_x : Quantity[u.kpc] spacing between two x bins delta_y : Quantity[u.kpc] spacing between two y bins """ self.cartesian_binning(delta_x, delta_y, delta_z) self.grouped_data = {} self.grid = {} for fluid in self.fluids: if fluid == "gas": extensive_fields = ["mass", "ek", "volume"] else: extensive_fields = ["mass", "ek"] filtered_data = self.data[fluid][ (self.data[fluid]["position"][:, 2] > zmin) & (self.data[fluid]["position"][:, 2] < zmax) ] self.grouped_data[fluid] = filtered_data.group_by(["x_bin", "y_bin", "z_bin"]) self.grid[fluid] = hstack( [ self.grouped_data[fluid]["x_bin", "y_bin", "z_bin"].groups.aggregate( np.fmin ), aggregate( self.grouped_data[fluid], extensive_fields=extensive_fields ), ] ) self.grid[fluid].rename_column("x_bin", "x") self.grid[fluid].rename_column("y_bin", "y") self.grid[fluid].rename_column("z_bin", "z") self.grid["stars"]["sfr"] = ( np.zeros(len(self.grid["stars"]["mass"])) * u.Msun / u.year ) for i, group in enumerate(self.grouped_data["stars"].groups): self.grid["stars"]["sfr"][i] = get_sfr(group, self.time) self.grid["stars"]["sfr"][i] = get_sfr(group, self.time) def ring_analysis( self, delta_r: Quantity[u.kpc] = u.kpc, rmin: Quantity[u.kpc] = 1 * u.kpc, rmax: Quantity[u.kpc] = 12 * u.kpc, zmin: Quantity[u.kpc] = -0.5 * u.kpc, zmax: Quantity[u.kpc] = 0.5 * u.kpc, ): """Compute the aggration of quantities in radial bins Parameters ---------- delta_r : Quantity[u.kpc], optional spacing between two radial bins, by default u.kpc rmin : Quantity[u.kpc], optional filter out bin below that radius, by default 1*u.kpc rmax : Quantity[u.kpc], optional filter out bin beyond that radius, by default 12*u.kpc """ self.ring_binning(delta_r) grouped_data = {} self.rings = {} for fluid in self.fluids: if fluid == "gas": extensive_fields = ["mass", "ek", "volume"] else: extensive_fields = ["mass", "ek"] filtered_data = self.data[fluid][ (self.data[fluid]["r"] > rmin) & (self.data[fluid]["r"] < rmax) & (self.data[fluid]["position"][:, 2] > zmin) & (self.data[fluid]["position"][:, 2] < zmax) ] grouped_data[fluid] = filtered_data.group_by(["r_bin"]) self.rings[fluid] = hstack( [ grouped_data[fluid][ "r_bin", ].groups.aggregate(np.fmin), aggregate(grouped_data[fluid], extensive_fields=extensive_fields), ] ) self.rings[fluid].rename_column("r_bin", "r") self.rings["stars"]["sfr"] = ( np.zeros(len(self.rings["stars"]["mass"])) * u.Msun / u.year ) for i, group in enumerate(grouped_data["stars"].groups): self.rings["stars"]["sfr"][i] = get_sfr(group, self.time)