# coding: utf-8
"""
Galaxy kiloparsec extractor
Noé Brucy 2023
"""
import numpy as np
from astropy.table import QTable, hstack, vstack
from astropy import units as u
from astropy.units.quantity import Quantity
from collections import defaultdict
from numba import jit, prange
from abc import ABC

# Atomic/molecular masses (in units of the hydrogen mass) used to convert
# chemical abundances into species masses.
_atomic_mass = {
    "H+": 1,
    "H2": 2,
    "CO": 28,
}


def vect_r(position: np.ndarray, vector: np.ndarray) -> np.ndarray:
    """Radial component of a vector.

    Parameters
    ----------
    position : np.ndarray
        (N, 3) positions (only x and y are read)
    vector : np.ndarray
        (N, 3) vectors (only x and y components are read)

    Returns
    -------
    np.ndarray
        (N,) radial component of each vector
    """
    r = position[:, :2]
    # Unit radial vector in the (x, y) plane.
    ur = np.transpose((np.transpose(r) / np.sqrt(np.sum(r**2, axis=1))))
    return np.einsum("ij, ij -> i", vector[:, :2], ur)


def vect_phi(position: np.ndarray, vector: np.ndarray) -> np.ndarray:
    """Azimuthal component of a vector.

    Parameters
    ----------
    position : np.ndarray
        (N, 3) positions (only x and y are read)
    vector : np.ndarray
        (N, 3) vectors (only x and y components are read)

    Returns
    -------
    np.ndarray
        (N,) azimuthal component of each vector
    """
    r = position[:, :2]
    r_norm = np.sqrt(np.sum(r**2, axis=1))
    # 90° rotation of the radial unit vector gives the azimuthal unit vector.
    rot = np.array([[0, -1], [1, 0]])
    uphi = np.transpose(np.einsum("ij, kj -> ik", rot, r) / r_norm)
    return np.einsum("ij,ij -> i", vector[:, :2], uphi)


def filter_data(table, bounds):
    """Return the rows of `table` whose fields lie strictly inside `bounds`.

    Parameters
    ----------
    table : QTable
        table to filter
    bounds : dict
        mapping ``field -> (field_min, field_max)``; either bound may be
        ``None`` to leave that side open

    Returns
    -------
    QTable
        filtered view of the table
    """
    mask = np.ones(len(table), dtype=bool)
    for field in bounds:
        field_min, field_max = bounds[field]
        if field_min is not None:
            mask &= table[field] > field_min
        if field_max is not None:
            mask &= table[field] < field_max
    return table[mask]


@jit(nopython=True, parallel=True)
def get_sfr(
    mass: np.ndarray,
    birth_time: np.ndarray,
    group_indices: np.ndarray,
    time: float,
    average_on: float = 30,
):
    """Compute SFR in each group.

    Parameters
    ----------
    mass : np.ndarray
        mass in Msun, dim Ncell, sorted by group
    birth_time : np.ndarray
        birth time in Myr, dim Ncell, sorted by group
    group_indices : np.ndarray
        array of the indices of each bin, dim Ngroup + 1
    time : float
        time in Myr
    average_on : float, optional
        time sfr is averaged on in Myr, by default 30

    Returns
    -------
    np.ndarray
        sfr for each group in Msun/year, dim Ngroup
    """
    sfr = np.zeros(len(group_indices) - 1)
    # Early in the simulation the averaging window is clipped to `time`.
    dtime = min(average_on, time)
    if dtime > 0:
        for i in prange(len(group_indices) - 1):
            slice_group = slice(group_indices[i], group_indices[i + 1])
            mask = (birth_time[slice_group] > max(time - average_on, 0)) & (
                birth_time[slice_group] < time
            )
            # 1e6 converts Myr to yr.
            sfr[i] = np.sum(mass[slice_group][mask]) / (1e6 * dtime)
    return sfr


@jit(nopython=True, parallel=True)
def mul_grouped(array: np.ndarray, group_indices: np.ndarray, to_mul: np.ndarray):
    """Multiply each group by a different value (in place).

    Parameters
    ----------
    array : np.ndarray
        array to be multiplied, dim Ncell, sorted by group; modified in place
    group_indices : np.ndarray
        dim Ngroup + 1
    to_mul : np.ndarray
        array to multiply with, dim Ngroup

    Returns
    -------
    np.ndarray
        the (mutated) input array
    """
    for i in prange(len(group_indices) - 1):
        slice_group = slice(group_indices[i], group_indices[i + 1])
        array[slice_group] *= to_mul[i]
    return array


def aggregate(
    grouped_data: QTable,
    weight_field: str = "mass",
    extensive_fields: list = None,
    weighted_fields: list = None,
    compute_dispersions=True,
    species: list = None,
    abundance_He=0.1,
) -> QTable:
    """Aggregate wisely from grouped data.

    NOTE: `grouped_data` is modified in place (weighted fields are
    multiplied by the weight, per-species columns are added).

    Parameters
    ----------
    grouped_data : QTable
        should already have group information
    weight_field : str, optional
        the field used for the weighting of the non extensive quantities,
        by default "mass"
    extensive_fields : list, optional
        these will be summed; `weight_field` is added automatically.
        Defaults to an empty list.
    weighted_fields : list, optional
        the fields that will be weighted, by default
        ["velr", "velphi", "velx", "vely", "velz"]
    compute_dispersions : bool, optional
        also compute the weighted dispersion around the weighted mean
    species : list, optional
        chemical species to produce per-species weighted quantities for
    abundance_He : float, optional
        helium abundance used in the mass conversion, by default 0.1

    Returns
    -------
    QTable
        a table with the aggregated value for each group
    """
    # Bug fix: the previous version used mutable default arguments and
    # appended to them, leaking state across calls (and into caller lists).
    extensive_fields = list(extensive_fields) if extensive_fields is not None else []
    if weighted_fields is None:
        weighted_fields = ["velr", "velphi", "velx", "vely", "velz"]
    if species is None:
        species = []
    if weight_field not in extensive_fields:
        extensive_fields.append(weight_field)
    weight_fields_species = []
    weighted_fields_species = []
    for i, spec in enumerate(species):
        abundance = grouped_data["chemical_abundances"][:, i]
        atomic_mass = _atomic_mass[spec]
        # Species mass from total mass, abundance and atomic mass,
        # corrected for helium.
        grouped_data[f"{weight_field}_{spec}"] = (
            grouped_data[weight_field]
            * abundance
            * atomic_mass
            / (1 + 4 * abundance_He)
        )
        weight_fields_species.append(f"{weight_field}_{spec}")
    for field in weighted_fields:
        # Multiply weighted field by the weight v*m
        for spec in species:
            grouped_data[f"{field}_{spec}"] = (
                grouped_data[field] * grouped_data[f"{weight_field}_{spec}"]
            )
            weighted_fields_species.append(f"{field}_{spec}")
        grouped_data[field] *= grouped_data[weight_field]
    to_sum = (
        extensive_fields
        + weighted_fields
        + weight_fields_species
        + weighted_fields_species
    )
    # Compute the sum of all fields
    binned_data = grouped_data[to_sum].groups.aggregate(np.add)
    for field in weighted_fields:
        for spec in ["all"] + species:
            if spec == "all":
                weight_field_here = weight_field
                field_here = field
            else:
                weight_field_here = f"{weight_field}_{spec}"
                field_here = f"{field}_{spec}"
            # For weighted field, divide by the total mass.
            # We obtain the weighted mean vmean = Σ (m*v) / Σ m
            # First we remove 0s to avoid dividing by 0.
            binned_data[weight_field_here][binned_data[weight_field_here] == 0] = (
                1.0 * binned_data[weight_field_here].unit
            )
            binned_data[field_here] /= binned_data[weight_field_here]
            if compute_dispersions:
                # We also compute the weighted dispersion around the
                # weighted mean. First we get m * vmean.
                weight_mean = grouped_data[weight_field_here].value.copy()
                mul_grouped(
                    weight_mean,
                    grouped_data.groups.indices,
                    binned_data[field_here].value,
                )
                # Then we compute (m*v - m*vmean)
                grouped_data[field_here] -= (
                    weight_mean * grouped_data[field_here].unit
                )
                # Compute m * (v - vmean)^2 = (m*v - m*vmean) * (m*v - m*vmean) / m
                # Since we don't want to divide by zero, we first clean them out
                grouped_data[weight_field_here][
                    grouped_data[weight_field_here] == 0
                ] = (1.0 * grouped_data[weight_field_here].unit)
                grouped_data[field_here] *= (
                    grouped_data[field_here] / grouped_data[weight_field_here]
                )
                if not np.all(np.isfinite(grouped_data[field_here])):
                    raise FloatingPointError
                # Compute Σ m * (v - vmean)^2
                binned_data[f"sigma_{field_here}"] = grouped_data[
                    field_here,
                ].groups.aggregate(np.add)[field_here]
                # Compute sigma = Σ (m * (v - vmean)^2) / Σ m
                binned_data[f"sigma_{field_here}"] = np.sqrt(
                    binned_data[f"sigma_{field_here}"]
                    / binned_data[weight_field_here]
                )
    return binned_data


def regroup(
    galex,
    dataset_key="sectors",
    bin_key="r",
    weighted_fields=None,
    compute_dispersions=True,
):
    """Re-aggregate an already-binned dataset of `galex` along `bin_key`.

    Parameters
    ----------
    galex : Galsec
        object holding the previously computed dataset (e.g. ``sectors``)
    dataset_key : str, optional
        attribute name of the dataset to regroup, by default "sectors"
    bin_key : str, optional
        column to group by, by default "r"
    weighted_fields : list, optional
        override of the per-fluid weighted fields; when None the fields
        registered on `galex` are used
    compute_dispersions : bool, optional
        forwarded to `aggregate`

    Returns
    -------
    dict
        per-fluid QTable of the re-aggregated quantities
    """
    dataset = galex.__dict__[dataset_key]
    result = {}
    for fluid in galex.fluids:
        if weighted_fields is None:
            weighted_fields_fluid = galex.weighted_fields[fluid]
        else:
            weighted_fields_fluid = weighted_fields
        grouped = dataset[fluid].group_by(bin_key)
        agg_spec = {}
        for spec in galex.species[fluid]:
            weighted_fields_spec = [
                f"{field}_{spec}" for field in weighted_fields_fluid
            ]
            agg_spec[spec] = aggregate(
                grouped,
                weight_field=f"mass_{spec}",
                extensive_fields=[],
                weighted_fields=weighted_fields_spec,
                compute_dispersions=compute_dispersions,
            )
        extensive_fields = []
        if "sfr" in grouped.keys():
            extensive_fields.append("sfr")
        # Trailing comma keeps a (single-column) table selection.
        bin_value = grouped[
            bin_key,
        ].groups.aggregate(np.fmin)
        all_values = aggregate(
            grouped,
            extensive_fields=extensive_fields,
            weighted_fields=weighted_fields_fluid,
            compute_dispersions=compute_dispersions,
        )
        result[fluid] = hstack([bin_value, all_values] + list(agg_spec.values()))
    return result


class GalsecAnalysis(ABC):
    """Abstract base providing the binning/analysis entry points.

    Subclasses must implement `analysis`; the convenience methods below
    only build bin definitions and filters and delegate to it.
    """

    def __init__(self):
        raise NotImplementedError("This method should be implemented in a subclass")

    def analysis(self, bin_deltas: dict, filter_bounds: dict):
        raise NotImplementedError("This method should be implemented in a subclass")

    def sector_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        delta_l: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 12 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
        **kwargs,
    ):
        """Compute the aggregation of quantities in sector bins.

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        delta_l : Quantity[u.kpc], optional
            spacing (in spatial units) between two azimuthal bins,
            by default u.kpc
        rmin : Quantity[u.kpc], optional
            filter out bins below that radius, by default 1*u.kpc
        rmax : Quantity[u.kpc], optional
            filter out bins beyond that radius, by default 12*u.kpc
        zmin, zmax : Quantity[u.kpc], optional
            vertical extent of the selection
        """
        bin_deltas = {"r": delta_r, "phi": delta_l}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        self.sectors = self.analysis(bin_deltas, filter_bounds, **kwargs)
        return self.sectors

    def cartesian_analysis(
        self,
        delta_x: Quantity[u.kpc] = u.kpc,
        delta_y: Quantity[u.kpc] = u.kpc,
        delta_z: Quantity[u.kpc] = u.kpc,
        xmin: Quantity[u.kpc] = -30 * u.kpc,
        xmax: Quantity[u.kpc] = 30 * u.kpc,
        ymin: Quantity[u.kpc] = -30 * u.kpc,
        ymax: Quantity[u.kpc] = 30 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
        **kwargs,
    ):
        """Compute the aggregation of quantities in cartesian bins.

        Parameters
        ----------
        delta_x : Quantity[u.kpc]
            spacing between two x bins
        delta_y : Quantity[u.kpc]
            spacing between two y bins
        delta_z : Quantity[u.kpc]
            spacing between two z bins
        xmin, xmax, ymin, ymax, zmin, zmax : Quantity[u.kpc]
            bounds of the selection
        """
        bin_deltas = {"x": delta_x, "y": delta_y, "z": delta_z}
        filter_bounds = {"x": [xmin, xmax], "y": [ymin, ymax], "z": [zmin, zmax]}
        # Bug fix: kwargs were accepted but silently dropped; they are now
        # forwarded like in the sibling methods.
        self.grid = self.analysis(bin_deltas, filter_bounds, **kwargs)
        return self.grid

    def ring_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 30 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
        **kwargs,
    ):
        """Compute the aggregation of quantities in radial bins.

        Parameters
        ----------
        delta_r : Quantity[u.kpc], optional
            spacing between two radial bins, by default u.kpc
        rmin : Quantity[u.kpc], optional
            filter out bins below that radius, by default 1*u.kpc
        rmax : Quantity[u.kpc], optional
            filter out bins beyond that radius, by default 30*u.kpc
        zmin, zmax : Quantity[u.kpc], optional
            vertical extent of the selection
        """
        bin_deltas = {"r": delta_r}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        self.rings = self.analysis(bin_deltas, filter_bounds, **kwargs)
        return self.rings

    def vertical_ring_analysis(
        self,
        delta_r: Quantity[u.kpc] = u.kpc,
        delta_z: Quantity[u.kpc] = u.kpc,
        rmin: Quantity[u.kpc] = 1 * u.kpc,
        rmax: Quantity[u.kpc] = 30 * u.kpc,
        zmin: Quantity[u.kpc] = -0.5 * u.kpc,
        zmax: Quantity[u.kpc] = 0.5 * u.kpc,
        **kwargs,
    ):
        """Compute the aggregation of quantities in (r, z) bins."""
        bin_deltas = {"r": delta_r, "z": delta_z}
        filter_bounds = {"r": [rmin, rmax], "z": [zmin, zmax]}
        return self.analysis(bin_deltas, filter_bounds, **kwargs)

    def scale_analysis(
        self,
        sub_analysis_method: str = "cartesian_analysis",
        scales: dict = None,
        **kwargs,
    ):
        """Run a sub-analysis at several bin sizes and stack the results.

        Parameters
        ----------
        sub_analysis_method : str, optional
            name of the analysis method to run, by default
            "cartesian_analysis"
        scales : dict, optional
            mapping of bin-size keyword -> array of Quantities, one entry
            per scale. By default 0.01/0.1/1/10/20 kpc in x, y and z.

        Returns
        -------
        dict
            per-fluid QTable stacking the results of every scale
        """
        if scales is None:
            # Bug fix: was a mutable default argument.
            scales = {
                "delta_x": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
                "delta_y": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
                "delta_z": np.array([0.01, 0.1, 1, 10, 20]) * u.kpc,
            }
        nb_scales = len(list(scales.values())[0])
        methods = {
            "cartesian_analysis": self.cartesian_analysis,
            "sector_analysis": self.sector_analysis,
            "ring_analysis": self.ring_analysis,
            "vertical_ring_analysis": self.vertical_ring_analysis,
        }
        for i in range(nb_scales):
            scale_i = {key: value[i] for key, value in scales.items()}
            print(f"Doing {sub_analysis_method} for scale {i}/{nb_scales}: {scale_i}")
            grid = methods[sub_analysis_method](**scale_i, **kwargs)
            if i == 0:
                all_grids = grid
            else:
                all_grids = {
                    fluid: vstack([all_grids[fluid], grid[fluid]])
                    for fluid in self.fluids
                }
        return all_grids


class Galsec(GalsecAnalysis):
    """
    Galactic sector extractor
    """

    # Class-level defaults; instances may override (e.g. from the header).
    fluids = ["gas", "stars", "dm"]
    extensive_fields = defaultdict(
        lambda: ["mass", "number"],
        gas=["mass", "volume", "number"],
    )
    weighted_fields = defaultdict(lambda: ["velr", "velphi", "velx", "vely", "velz"])
    units = {
        "position": u.kpc,
        "volume": u.kpc**3,
        "mass": u.Msun,
        "density": u.g / u.cm**3,
        "velocity": u.km / u.s,
        "birth_time": u.Myr,
        "internal_energy": u.km**2 / u.s**2,
    }
    species = {}
    abundance_He = 0.1

    def __init__(self, data: dict, copy=True, dense_output=True) -> None:
        """Initialize a Galsec object.

        Parameters
        ----------
        data : dict
            A dataset of cells in the following format:
                header:
                    time      float [Myr] time since the start of the simulation
                    box_size  float [kpc] size of the box
                    fluids    str list, list of fluids included
                gas:
                    position  float array (Ngas, 3) [kpc], centered
                    volume    float array (Ngas) [pc^3]
                    velocity  float array (Ngas, 3) [km/s]
                    mass      float array (Ngas) [Msun]
                stars:
                    position  float array (Nstar, 3) [kpc], centered
                    velocity  float array (Nstar, 3) [km/s]
                    mass      float array (Nstar) [Msun]
                    birth_time float array (Nstar) [Myr]
                dm:
                    position  float array (Ngas, 3) [kpc], centered
                    velocity  float array (Ngas, 3) [km/s]
                    mass      float array (Ngas) [Msun]
        copy : bool, default=True
            Whether the data should be copied.
        dense_output : bool, default=True
            Whether `analysis` should add void bins where no data is present.
        """
        self.data = {}
        # Bug fix: `species` was a shared class attribute mutated per
        # instance; make it an instance attribute.
        self.species = {}
        if "fluids" in data["header"]:
            self.fluids = data["header"]["fluids"]
        for fluid in self.fluids:
            units = {}
            for key in data[fluid]:
                if key in self.units:
                    units[key] = self.units[key]
            self.data[fluid] = QTable(data[fluid], units=units, copy=copy)
            self.species[fluid] = []
        self.time = data["header"]["time"] * u.Myr
        self.box_size = data["header"]["box_size"] * u.kpc
        if "species" in data["header"]:
            assert "chemical_abundances" in data["gas"]
            self.species["gas"] = data["header"]["species"]
            self.abundance_He = data["header"]["ABHE"]
        self.compute_derived_values()
        self.dense_output = dense_output

    def compute_derived_values(self) -> None:
        """
        Helper function to compute values derived from input data.
        """
        for fluid in self.fluids:
            dset = self.data[fluid]
            dset["r"] = np.sqrt(
                np.sum(dset["position"][:, :2] ** 2, axis=1)
            )  # galactic radius
            dset["phi"] = np.angle(
                dset["position"][:, 0] + dset["position"][:, 1] * 1j
            )  # galactic longitude
            dset["phi"][dset["phi"] < 0] += (
                2 * np.pi * u.rad
            )  # rescaling to get only positive values
            dset["velphi"] = vect_phi(
                dset["position"], dset["velocity"]
            )  # azimuthal velocity
            dset["velr"] = vect_r(dset["position"], dset["velocity"])  # radial velocity
            # Number of cells / resolution element per bin (1 when not binned!)
            dset["number"] = np.ones(len(dset["r"]))
            # Making aliases. No copy is done here.
            dset["x"] = dset["position"][:, 0]
            dset["y"] = dset["position"][:, 1]
            dset["z"] = dset["position"][:, 2]
            dset["velx"] = dset["velocity"][:, 0]
            dset["vely"] = dset["velocity"][:, 1]
            dset["velz"] = dset["velocity"][:, 2]

    def binning(self, bins, filter_bounds, binning_mode="delta", dataset=None):
        """Bin the data.

        Adds ``<name>_bin`` columns (holding the middle value of each bin)
        to every fluid table.

        Parameters
        ----------
        bins : dict
            per-variable bin definition: a spacing ("delta" mode), a number
            of bins ("number" mode) or an array of edges ("array" mode)
        filter_bounds : dict
            bounds used to derive the spacing in "number" mode
        binning_mode : str, optional
            "delta", "number" or "array", by default "delta"
        dataset : dict, optional
            per-fluid tables to bin; defaults to ``self.data``

        Returns
        -------
        list
            names of the variables that were binned
        """
        if dataset is None:
            dataset = self.data
        boxsize = self.box_size
        keys_binning = []
        for fluid in self.fluids:
            data = dataset[fluid]  # no copy
            for name in bins:
                delta = None
                if binning_mode == "delta":
                    delta = bins[name]
                elif binning_mode == "number":
                    bin_min = filter_bounds[name][0].value
                    bin_max = filter_bounds[name][1].value
                    delta = (bin_max - bin_min) / bins[name]
                if delta is not None:
                    if name in ["x", "y", "z"]:
                        # stay positive
                        pos = data[name] + 0.5 * boxsize
                        bin_index = np.floor(pos / delta)
                        # Store the middle value
                        data[f"{name}_bin"] = (bin_index + 0.5) * delta - 0.5 * boxsize
                    elif name == "r":
                        r_bin = np.trunc(data["r"] / delta)
                        r_bin = (r_bin + 0.5) * delta  # Store the middle value
                        data["r_bin"] = r_bin
                    elif name == "phi":
                        # The azimuthal spacing depends on the radius so that
                        # all sectors have roughly the same arc length.
                        delta_phi = (delta / data["r_bin"]) * u.rad
                        phi_bin = (np.trunc(data["phi"] / delta_phi) + 0.5) * delta_phi
                        phi_bin[phi_bin >= 2 * np.pi * u.rad] = (
                            0.5 * delta_phi[phi_bin >= 2 * np.pi * u.rad]
                        )
                        data["phi_bin"] = phi_bin
                    else:
                        print(f"Unsupported binning variable {name}")
                        break
                    if name not in keys_binning:
                        keys_binning.append(name)
                elif binning_mode == "array":
                    bin_array = bins[name]
                    bin_index = np.digitize(data[name], bin_array, right=False)
                    bin_array = np.append(bin_array, np.max(data[name]))
                    # Store the middle value
                    data[f"{name}_bin"] = (
                        bin_array[bin_index - 1] + bin_array[bin_index]
                    ) / 2
                    if name not in keys_binning:
                        keys_binning.append(name)
        return keys_binning

    def densify(self, processed_data: dict, bin_deltas: dict, filter_bounds: dict):
        """
        Densify the data by adding void bins where no data is present.
        """
        bins_array = {}
        for bin_name in bin_deltas:
            if bin_name == "phi":
                if "r" in filter_bounds:
                    rmax = filter_bounds["r"][1].value
                else:
                    rmax = np.max(
                        [
                            np.max(processed_data[fluid]["r"].value)
                            for fluid in processed_data
                        ]
                    )
                delta_phi = bin_deltas["phi"].value / rmax
                # The 0.9 factor ensures every existing bin gets a candidate.
                bins_array[bin_name] = np.arange(0, 2 * np.pi, delta_phi * 0.9)
            else:
                if bin_name in filter_bounds:
                    bin_min = (
                        filter_bounds[bin_name][0].value
                        + bin_deltas[bin_name].value / 2
                    )
                    bin_max = filter_bounds[bin_name][1].value
                else:
                    bin_min = np.min(
                        [
                            np.min(processed_data[fluid][bin_name].value)
                            for fluid in processed_data
                        ]
                    )
                    bin_max = np.max(
                        [
                            np.max(processed_data[fluid][bin_name].value)
                            for fluid in processed_data
                        ]
                    )
                bins_array[bin_name] = np.arange(
                    bin_min, bin_max, bin_deltas[bin_name].value * 0.9
                )
        grids = np.meshgrid(*list(bins_array.values()))
        bins_array = {
            key: grid.flatten() for key, grid in zip(bins_array.keys(), grids)
        }
        for fluid in processed_data:
            keys = processed_data[fluid].keys()
            units = {key: processed_data[fluid][key].unit for key in keys}
            # Zero-filled rows at every candidate bin position.
            void_array = {
                key: np.zeros_like(list(bins_array.values())[0]) for key in keys
            }
            for bin_name in bin_deltas:
                void_array[bin_name] = bins_array[bin_name]
            void_array = QTable(void_array, names=keys, units=units)
            dense_array = vstack([processed_data[fluid], void_array])
            processed_data[fluid] = dense_array
        # Re-bin so void rows merge with existing bins.
        self.binning(
            dataset=processed_data, bins=bin_deltas, filter_bounds=filter_bounds
        )
        for fluid in processed_data:
            bin_names_with_bin = [bin_name + "_bin" for bin_name in bin_deltas]
            processed_data[fluid] = (
                processed_data[fluid]
                .group_by(bin_names_with_bin)
                .groups.aggregate(np.add)
            )
            for bin_name in bin_deltas:
                processed_data[fluid].remove_column(bin_name)
                processed_data[fluid].rename_column(bin_name + "_bin", bin_name)
        return processed_data

    def analysis(self, bins: dict, filter_bounds: dict, binning_mode="delta"):
        """Bin, filter and aggregate each fluid.

        Parameters
        ----------
        bins : dict
            bin definition, see `binning`
        filter_bounds : dict
            selection bounds, see `filter_data`
        binning_mode : str, optional
            "delta", "number" or "array", by default "delta"

        Returns
        -------
        dict
            per-fluid QTable of the aggregated quantities
        """
        result = {}
        keys = self.binning(bins, filter_bounds, binning_mode=binning_mode)
        keys_bin = [key + "_bin" for key in keys]
        for fluid in self.fluids:
            grouped_data = filter_data(self.data[fluid], filter_bounds).group_by(
                keys_bin
            )
            result[fluid] = hstack(
                [
                    grouped_data[keys_bin].groups.aggregate(np.fmin),
                    aggregate(
                        grouped_data,
                        extensive_fields=self.extensive_fields[fluid],
                        weighted_fields=self.weighted_fields[fluid],
                        species=self.species[fluid],
                        abundance_He=self.abundance_He,
                    ),
                ]
            )
            for key in keys:
                result[fluid].rename_column(key + "_bin", key)
            if fluid == "stars" and "birth_time" in self.data["stars"].keys():
                sfr = get_sfr(
                    grouped_data["mass"].value,
                    grouped_data["birth_time"].value,
                    grouped_data.groups.indices,
                    self.time.value,
                )
                result["stars"]["sfr"] = sfr * u.Msun / u.year
        if self.dense_output and (binning_mode == "delta" or binning_mode == "number"):
            result = self.densify(result, bins, filter_bounds)
        return result


class GalsecTimeSeries(GalsecAnalysis):
    """Run a Galsec analysis over several snapshots and combine the results."""

    def __init__(self, galsecs: list, loader=None):
        """
        Parameters
        ----------
        galsecs : list
            either Galsec instances or, when `loader` is given, opaque
            handles that `loader` turns into Galsec instances
        loader : callable, optional
            lazy loader applied to each item of `galsecs`
        """
        self.galsecs = galsecs
        self.loader = loader
        self.loaded = loader is None

    def _analysis_single(self, galsec, bins, filter_bounds, binning_mode="delta"):
        """Analyse one snapshot and tag the result with its time."""
        if not self.loaded:
            galsec = self.loader(galsec)
        analysis_result = galsec.analysis(
            bins, filter_bounds, binning_mode=binning_mode
        )
        for fluid in analysis_result:
            analysis_result[fluid]["time"] = galsec.time
        return analysis_result

    def analysis(
        self,
        bins: dict,
        filter_bounds: dict,
        binning_mode="delta",
        aggregate=True,
        std=True,
        percentiles=None,
        num_processes=1,
    ):
        """Analyse all snapshots and (optionally) time-average them.

        Parameters
        ----------
        bins, filter_bounds, binning_mode
            forwarded to `Galsec.analysis`
        aggregate : bool, optional
            whether to average the snapshots per bin, by default True
        std : bool, optional
            also compute the per-bin standard deviation, by default True
        percentiles : list, optional
            per-bin percentiles to compute, by default none
        num_processes : int, optional
            number of worker processes, by default 1 (serial)

        Returns
        -------
        dict
            per-fluid tables, plus ``<fluid>_std`` / ``<fluid>_q<q>``
            companions when requested
        """
        # Bug fix: was a mutable default argument.
        if percentiles is None:
            percentiles = []
        timed_data = []
        if num_processes == 1:
            for galsec in self.galsecs:
                timed_data.append(
                    self._analysis_single(galsec, bins, filter_bounds, binning_mode)
                )
        else:
            from multiprocessing import Pool

            with Pool(num_processes) as p:
                timed_data = p.starmap(
                    self._analysis_single,
                    [
                        (galsec, bins, filter_bounds, binning_mode)
                        for galsec in self.galsecs
                    ],
                )
        averaged_data = {}
        for fluid in timed_data[0]:
            averaged_data[fluid] = vstack([result[fluid] for result in timed_data])
            if aggregate:
                grouped_data = averaged_data[fluid].group_by(list(bins.keys())).groups
                if std:
                    averaged_data[fluid + "_std"] = grouped_data.aggregate(
                        lambda x: np.std(x.value)
                    )
                for q in percentiles:
                    # q=q binds the current value (avoids the late-binding
                    # closure pitfall should the lambda ever be deferred).
                    averaged_data[fluid + f"_q{q}"] = grouped_data.aggregate(
                        lambda x, q=q: np.percentile(x.value, q)
                    )
                averaged_data[fluid] = grouped_data.aggregate(np.mean)
        return averaged_data