"""
===========
MagmaFrames
===========
Module with the generic MagmaFrame class.
"""
import re
from typing import Dict, List
import elementMass as e
import numpy as np
import pandas as pd
from typing_extensions import Self
from MagmaPandas.Elements import element_weights, oxide_compositions
from MagmaPandas.enums import Datatype, Unit
from MagmaPandas.parse_io.validate import _check_argument, _check_attribute
[docs]
class MagmaFrame(pd.DataFrame):
"""
Generic MagmaPandas DataFrame class for geochemical data.
Parameters
----------
data : ndarray (structured or homogeneous), Iterable, dict, or DataFrame
geochemical data with elements or oxides in columns
units : None, str
data units, either "mol fraction", "wt. %" or "ppm"
datatype : None, str
datatype either "cation" or "oxide"
weights : None, pandas Series
atomic weights of elements or oxides in the MagmaFrame
"""
# New attributes
_metadata = ["_weights", "_units", "_datatype"]
@_check_argument("units", [None, "mol fraction", "wt. %", "ppm"])
@_check_argument("datatype", [None, "cation", "oxide"])
def __init__(
self,
data=None,
*args,
units: None | str = None,
datatype: None | str = None,
weights: pd.Series = None,
**kwargs,
) -> None:
self._units: Unit = Unit(units)
self._datatype: Datatype = Datatype(datatype)
super().__init__(data, **kwargs)
if not self._total:
total_regex = re.search(
"total", "".join(map(str, self.columns.to_list())), re.IGNORECASE
)
if total_regex:
self.rename(columns={total_regex[0]: "total"}, inplace=True)
if weights is not None:
self._weights = weights.copy()
elif not hasattr(self, "_weights"):
self._weights = element_weights.weights_as_series(self.columns)
@property
def _constructor(self):
"""This is the key to letting Pandas know how to keep
derivatives of `MagmaBase` the same type as yours. It should
be enough to return the name of the Class. However, in
some cases, `__finalize__` is not called and `new attributes` are
not carried over. We can fix that by constructing a callable
that makes sure to call `__finalize__` every time."""
def _c(*args, **kwargs):
if (weights := getattr(self, "_weights", None)) is not None:
weights = weights.copy(deep=True)
current_class = type(self)
return current_class(*args, weights=weights, **kwargs).__finalize__(self)
return _c
@property
def _constructor_sliced(self):
from MagmaPandas.MagmaSeries import MagmaSeries
def _c(*args, **kwargs):
if (weights := getattr(self, "_weights", None)) is not None:
weights = weights.copy(deep=True)
return MagmaSeries(*args, weights=weights, **kwargs).__finalize__(self)
return _c
@property
def _no_data(self) -> List:
"""
Names of all columns without chemical data
"""
no_data = list(self.columns.difference(self.elements))
if "total" in no_data:
no_data.remove("total")
return no_data
@property
def _total(self) -> bool:
"""
Dataframe contains column with totals
"""
return "total" in self.columns
@property
def units(self) -> str:
"""
Datatype and units.
"""
return f"{self._datatype.value} {self._units.value}"
@units.setter
def units(self, value):
print("units are read only")
@property
def weights(self) -> pd.Series:
"""
Atomic weights of all elements in the MagmaFrame.
"""
return self._weights.copy()
@property
def elements(self) -> List[str]:
"""
Names of all elements in the MagmaFrame.
"""
return list(self._weights.index).copy()
# @_check_attribute("_units", ["wt. %", "ppm"])
[docs]
def moles(self, normalise=True) -> Self:
"""
Data converted to mol fraction.
"""
if self._units == Unit.MOL_FRACTIONS:
return self.copy()
if self._units == Unit.WT_PERCENT:
return self._convert_moles_wtPercent(normalise=normalise)
elif self._units == Unit.PPM:
return self.convert_ppm_wtPercent()._convert_moles_wtPercent(
normalise=normalise
)
return self.copy()
[docs]
def wt_pc(self, normalise=True) -> Self:
"""
Data converted to wt. %.
"""
if self._units == Unit.WT_PERCENT:
return self.copy()
if self._units == Unit.MOL_FRACTIONS:
return self._convert_moles_wtPercent(normalise=normalise)
elif self._units == Unit.PPM:
return self.convert_ppm_wtPercent()
return self.copy()
@property
def ppm(self) -> Self:
"""
Data converted to ppm.
"""
if self._units == Unit.PPM:
return self.copy()
if self._units == Unit.WT_PERCENT:
return self.convert_ppm_wtPercent()
elif self._units == Unit.MOL_FRACTIONS:
return self._convert_moles_wtPercent().convert_ppm_wtPercent()
return self.copy()
[docs]
def cations(self, normalise=True, norm_to=1, mol_fractions=True) -> Self:
"""
Data converted to cation mol fraction
"""
# Calculate oxide moles
if (self._datatype == Datatype.CATION) & (
mol_fractions & (self._units == Unit.MOL_FRACTIONS)
):
return self.copy()
moles = self[self.elements].moles(normalise=False)
# Calculate cation moles
cations_per_oxide = oxide_compositions.cation_amount(moles.elements)
cations = moles[moles.elements].mul(cations_per_oxide)
# Rename columns to cations
cations.columns = oxide_compositions.cation_names(moles.elements)
cations._datatype = Datatype.CATION
cations = cations.recalculate()
if not mol_fractions:
cations = cations[cations.elements].mul(cations.weights)
cations._units = Unit.WT_PERCENT
norm_to = 100
if not normalise:
cations["total"] = cations.sum(axis=1)
return cations
# Normalise to 1
total = cations.sum(axis=1)
cations = cations.div(total, axis=0) * norm_to
cations["total"] = norm_to
# Set the right datatype and elements
cations._datatype = Datatype.CATION
cations.recalculate(inplace=True)
return cations
[docs]
def oxides(self, normalise=True, oxidation_state: Dict[str, int] = {}) -> Self:
"""
Data converted to oxides
"""
if (self._datatype == Datatype.OXIDE) & (not bool(oxidation_state)):
return self.copy()
units = self._units
cations = self[self.elements].cations(
normalise=False
) # convert to cation mol fractions
cation_names = cations.elements
cation_element_names = [
re.sub(r"\d+", "", e) for e in cation_names
] # strip numbers/charges from the names
cation_names_new = [
(
i
if oxidation_state.get(j, None) is None
else f"{j}{int(oxidation_state[j])}"
)
for i, j in zip(cation_names, cation_element_names)
] # new names include oxidation state for elements with non-default values
oxide_names = e.get_oxide_names(cation_names_new)
cations_per_oxide = e.cation_numbers(oxide_names)
oxides = cations.rename(
columns={cation: oxide for cation, oxide in zip(cation_names, oxide_names)}
).recalculate() # rename to oxides
oxides = oxides.div(cations_per_oxide, axis=1) # recalculate to oxides
oxides["total"] = oxides[oxides.elements].sum(axis=1)
oxides._datatype = Datatype.OXIDE
if units == Unit.MOL_FRACTIONS:
if not normalise:
return oxides
return oxides.normalise()
oxides_wt_pc = oxides.wt_pc(normalise=False)
# oxides_wt_pc["total"] = oxides_wt_pc[oxides_wt_pc.elements].sum(axis=1)
if not normalise:
return oxides_wt_pc
return oxides_wt_pc.normalise()
@property
def oxygen(self) -> pd.Series:
"""
oxygen per 1 mole of cations
"""
# Calculate oxide moles
if self._datatype != Datatype.CATION:
cations = self.cations()
cations = cations[cations.elements]
else:
cations = self[self.elements].copy()
oxygen_per_mole = oxide_compositions.oxygen_amount(
cations.elements, type="cation"
)
cation_per_mole = oxide_compositions.cation_amount(
cations.elements, type="cation"
)
oxygen_per_cation = oxygen_per_mole / cation_per_mole
oxygen = cations.mul(oxygen_per_cation).sum(axis=1)
return oxygen
[docs]
@_check_attribute("_units", ["wt. %", "ppm"])
def convert_ppm_wtPercent(self) -> Self:
"""
ppm converted to wt. % and vice versa
"""
convert_dict = {
Unit.WT_PERCENT: [1e4, Unit.PPM],
Unit.PPM: [1e-4, Unit.WT_PERCENT],
}
converted = self.mul(convert_dict[self._units][0])
converted._units = convert_dict[self._units][1]
return converted
@_check_attribute("_units", ["wt. %", "mol fraction"])
def _convert_moles_wtPercent(self, normalise=True) -> Self:
"""
moles converted to wt. % and vice versa
"""
converted = self[self.elements].copy()
if self._units == Unit.WT_PERCENT:
converted = converted.div(converted.weights)
units = Unit.MOL_FRACTIONS
elif self._units == Unit.MOL_FRACTIONS:
converted = converted.mul(converted.weights)
units = Unit.WT_PERCENT
if not normalise:
converted["total"] = converted[converted.elements].sum(axis=1)
converted._units = units
return converted.recalculate()
# Normalise
total = converted[converted.elements].sum(axis=1)
converted = converted.div(total, axis=0)
converted["total"] = converted[converted.elements].sum(axis=1)
# Set the right units
if self._units == Unit.MOL_FRACTIONS:
converted = converted.mul(100)
converted._units = units
return converted
[docs]
def recalculate(self, inplace=False) -> Self:
"""
Recalculate element masses and total weight.
"""
df = self if inplace else self.copy()
df._weights = element_weights.weights_as_series(self.columns)
if df._total:
totals = df.loc[:, df.elements].sum(axis=1)
df.loc[:, "total"] = totals.astype(df["total"].dtype).values
if not inplace:
return df
[docs]
def normalise(self, to=None) -> Self:
"""
Normalise compositions.
Parameters
----------
to : float, int
normalisation value
Returns
-------
normalised data : MagmaFrame
"""
if to is not None:
norm = float(to)
elif self._units == Unit.WT_PERCENT:
norm = 100.0
else:
norm = 1.0
# self = self.recalculate()
normalised = self[self.elements].copy()
total = normalised.sum(axis=1)
normalised = normalised.div(total, axis=0)
normalised = normalised.mul(norm, axis=0)
normalised.loc[:, "total"] = normalised.sum(axis=1)
return normalised
[docs]
def random_sample(self, errors) -> Self:
"""
Randomly resample compositions within errors.
Sampling distribution is assumed normal with measured values as means and errors as standard deviations.
Parameters
----------
errors : float, array-like
standard deviation of the normal distributions. Use int for a fixed value for all elements or an array for specific values for all elements in :py:attr:`~MagmaPandas.MagmaFrames.magmaFrame.MagmaFrame.elements`
Returns
-------
resampled data : MagmaFrame
Randomly resampled compositions
"""
random_sample = np.random.normal(self[self.elements], errors)
random_sample[random_sample < 0] = 0.0
df = self.copy()
df[df.elements] = random_sample
return df