Skip to content

fluxopt.model_data

FlowsData dataclass

FlowsData(
    bound_type: DataArray,
    rel_lb: DataArray,
    rel_ub: DataArray,
    fixed_profile: DataArray,
    size: DataArray,
    effect_coeff: DataArray,
    sizing_min: DataArray | None = None,
    sizing_max: DataArray | None = None,
    sizing_mandatory: DataArray | None = None,
    sizing_effects_per_size: DataArray | None = None,
    sizing_effects_fixed: DataArray | None = None,
    status_min_uptime: DataArray | None = None,
    status_max_uptime: DataArray | None = None,
    status_min_downtime: DataArray | None = None,
    status_max_downtime: DataArray | None = None,
    status_initial: DataArray | None = None,
    status_effects_running: DataArray | None = None,
    status_effects_startup: DataArray | None = None,
    status_previous_uptime: DataArray | None = None,
    status_previous_downtime: DataArray | None = None,
    invest_min: DataArray | None = None,
    invest_max: DataArray | None = None,
    invest_mandatory: DataArray | None = None,
    invest_lifetime: DataArray | None = None,
    invest_prior_size: DataArray | None = None,
    invest_effects_per_size: DataArray | None = None,
    invest_effects_fixed: DataArray | None = None,
    invest_effects_per_size_periodic: DataArray | None = None,
    invest_effects_fixed_periodic: DataArray | None = None,
)

__post_init__

__post_init__() -> None

Validate relative bounds: non-negative and lb <= ub.

Source code in src/fluxopt/model_data.py
def __post_init__(self) -> None:
    """Check relative bounds after construction: lower bounds are non-negative and lb <= ub."""
    lb, ub = self.rel_lb, self.rel_ub
    flow_coord = lb.coords['flow']
    # A flow is flagged when *any* timestep violates the condition; a tiny
    # tolerance absorbs floating-point noise around zero.
    negative = (lb < -1e-12).any('time')
    if negative.any():
        raise ValueError(f'Negative lower bounds on flows: {list(flow_coord[negative].values)}')
    inverted = (lb > ub + 1e-12).any('time')
    if inverted.any():
        raise ValueError(
            f'Lower bound > upper bound on flows: {list(flow_coord[inverted].values)}'
        )

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this container into an xr.Dataset via the shared module helper."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with matching variable names.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with matching variable names.
    """
    kwargs: dict[str, Any] = {f.name: ds.get(f.name) for f in fields(cls)}
    return cls(**kwargs)

build classmethod

build(
    flows: list[Flow],
    time: TimeIndex,
    effects: list[Effect],
    dt: float = 1.0,
    period: Index | None = None,
) -> Self

Build FlowsData from element objects.

Parameters:

Name Type Description Default
flows list[Flow]

All collected flows with qualified ids.

required
time TimeIndex

Time index.

required
effects list[Effect]

Effect definitions for cost coefficients.

required
dt float

Scalar timestep duration in hours for prior duration computation.

1.0
period Index | None

Period index for multi-period models. When provided, effect_coeff gains a period dimension so that effects_per_flow_hour values can vary across periods.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    flows: list[Flow],
    time: TimeIndex,
    effects: list[Effect],
    dt: float = 1.0,
    period: pd.Index | None = None,
) -> Self:
    """Build FlowsData from element objects.

    Args:
        flows: All collected flows with qualified ids.
        time: Time index.
        effects: Effect definitions for cost coefficients.
        dt: Scalar timestep duration in hours for prior duration computation.
        period: Period index for multi-period models. When provided,
            ``effect_coeff`` gains a ``period`` dimension so that
            ``effects_per_flow_hour`` values can vary across periods.
    """
    # NOTE(review): function-scope import — presumably avoids an import cycle
    # with fluxopt.elements; confirm before hoisting to module level.
    from fluxopt.elements import Investment, Sizing

    flow_ids = [f.id for f in flows]
    effect_ids = [e.id for e in effects]
    effect_set = set(effect_ids)  # O(1) membership checks in validation below
    n_time = len(time)
    n_effects = len(effect_ids)

    bound_type: list[str] = []
    rel_lbs: list[xr.DataArray] = []
    rel_ubs: list[xr.DataArray] = []
    profiles: list[xr.DataArray] = []
    # NaN marks flows whose size is not a fixed scalar (Sizing/Investment/None).
    size_vals = np.full(len(flows), np.nan)
    effect_coeffs: list[xr.DataArray] = []
    sizing_items: list[tuple[str, Sizing]] = []
    invest_items: list[tuple[str, Investment]] = []
    status_items: list[tuple[str, Status]] = []
    prior_rates_map: dict[str, list[float]] = {}

    # Shared all-NaN placeholder for flows without a fixed profile.
    nan_time = xr.DataArray(np.full(n_time, np.nan), dims=['time'], coords={'time': time})

    for i, f in enumerate(flows):
        rel_lbs.append(as_dataarray(f.relative_minimum, {'time': time}))
        rel_ubs.append(as_dataarray(f.relative_maximum, {'time': time}))

        # Size is either a sizing/investment decision (collected for the
        # sub-builders below) or a plain fixed scalar.
        if isinstance(f.size, Sizing):
            sizing_items.append((f.id, f.size))
        elif isinstance(f.size, Investment):
            invest_items.append((f.id, f.size))
        elif f.size is not None:
            size_vals[i] = float(f.size)

        # Classify how the flow rate is constrained: fixed profile, no size
        # at all ('unsized'), or size-scaled relative bounds ('bounded').
        if f.fixed_relative_profile is not None:
            profiles.append(as_dataarray(f.fixed_relative_profile, {'time': time}))
            bound_type.append('profile')
        elif f.size is None:
            profiles.append(nan_time)
            bound_type.append('unsized')
        else:
            profiles.append(nan_time)
            bound_type.append('bounded')

        # Effect coefficients for this flow
        ec_coords: dict[str, Any] = {'effect': effect_ids, 'time': time}
        ec_shape = [n_effects, n_time]
        ec_dims = ['effect', 'time']
        if period is not None:
            # Extra trailing period dimension lets coefficients vary per period.
            ec_coords['period'] = period
            ec_shape.append(len(period))
            ec_dims.append('period')
        ec = xr.DataArray(
            np.zeros(ec_shape),
            dims=ec_dims,
            coords=ec_coords,
        )
        as_da_coords: dict[str, Any] = {'time': time}
        if period is not None:
            as_da_coords['period'] = period
        for effect_label, factor in f.effects_per_flow_hour.items():
            if effect_label not in effect_set:
                raise ValueError(f'Unknown effect {effect_label!r} in Flow.effects_per_flow_hour on {f.id!r}')
            ec.loc[effect_label] = as_dataarray(factor, as_da_coords)
        effect_coeffs.append(ec)

        if f.status is not None:
            status_items.append((f.id, f.status))

        if f.prior_rates is not None:
            prior_rates_map[f.id] = f.prior_rates

    flow_idx = pd.Index(flow_ids, name='flow')
    # Sub-builders turn the collected per-flow feature items into dense arrays
    # along their own dims (sizing_flow / invest_flow / status_flow).
    sz = _SizingArrays.build(sizing_items, effect_ids, dim='sizing_flow', period=period)
    inv = _InvestmentArrays.build(invest_items, effect_ids, dim='invest_flow', period=period)
    st = _StatusArrays.build(
        status_items, effect_ids, time, dim='status_flow', prior_rates_map=prior_rates_map, dt=dt, period=period
    )

    return cls(
        bound_type=xr.DataArray(bound_type, dims=['flow'], coords={'flow': flow_ids}),
        rel_lb=fast_concat(rel_lbs, flow_idx),
        rel_ub=fast_concat(rel_ubs, flow_idx),
        fixed_profile=fast_concat(profiles, flow_idx),
        size=xr.DataArray(size_vals, dims=['flow'], coords={'flow': flow_ids}),
        effect_coeff=fast_concat(effect_coeffs, flow_idx),
        sizing_min=sz.min,
        sizing_max=sz.max,
        sizing_mandatory=sz.mandatory,
        sizing_effects_per_size=sz.effects_per_size,
        sizing_effects_fixed=sz.effects_fixed,
        status_min_uptime=st.min_uptime,
        status_max_uptime=st.max_uptime,
        status_min_downtime=st.min_downtime,
        status_max_downtime=st.max_downtime,
        status_initial=st.initial,
        status_effects_running=st.effects_running,
        status_effects_startup=st.effects_startup,
        status_previous_uptime=st.previous_uptime,
        status_previous_downtime=st.previous_downtime,
        invest_min=inv.min,
        invest_max=inv.max,
        invest_mandatory=inv.mandatory,
        invest_lifetime=inv.lifetime,
        invest_prior_size=inv.prior_size,
        invest_effects_per_size=inv.effects_per_size,
        invest_effects_fixed=inv.effects_fixed,
        invest_effects_per_size_periodic=inv.effects_per_size_periodic,
        invest_effects_fixed_periodic=inv.effects_fixed_periodic,
    )

CarriersData dataclass

CarriersData(
    flow_coeff: DataArray,
    unit: DataArray,
    color: DataArray,
    description: DataArray,
)

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this CarriersData container to an xr.Dataset."""
    # Delegates to the shared module-level serializer.
    return _to_dataset(self)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with flow_coeff, unit, color, description.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with ``flow_coeff``, ``unit``, ``color``, ``description``.
    """
    return cls(
        flow_coeff=ds['flow_coeff'],
        unit=ds['unit'],
        color=ds['color'],
        description=ds['description'],
    )

build classmethod

build(
    carriers: list[Carrier], flows: list[Flow], carrier_coeff: dict[str, float]
) -> Self

Build CarriersData from explicit carrier declarations.

Parameters:

Name Type Description Default
carriers list[Carrier]

Declared carriers.

required
flows list[Flow]

All collected flows.

required
carrier_coeff dict[str, float]

Mapping of flow id to +1 (produces) or -1 (consumes).

required
Source code in src/fluxopt/model_data.py
@classmethod
def build(cls, carriers: list[Carrier], flows: list[Flow], carrier_coeff: dict[str, float]) -> Self:
    """Build CarriersData from explicit carrier declarations.

    Args:
        carriers: Declared carriers.
        flows: All collected flows.
        carrier_coeff: Mapping of flow id to +1 (produces) or -1 (consumes).

    Raises:
        ValueError: If a flow references a carrier/node that was not declared.
    """
    from fluxopt.elements import node_id

    flow_ids = [f.id for f in flows]
    # Build carrier dim ids from explicit declarations: one entry per node,
    # or the bare carrier id when no nodes are declared.
    carrier_ids: list[str] = []
    for c in carriers:
        if c.nodes:
            carrier_ids.extend(node_id(c.id, node) for node in c.nodes)
        else:
            carrier_ids.append(c.id)

    # Positional lookup maps give O(1) per flow instead of list.index's O(n),
    # which made the loop below quadratic in the number of flows/carriers.
    carrier_pos = {cid: i for i, cid in enumerate(carrier_ids)}
    flow_pos = {fid: i for i, fid in enumerate(flow_ids)}

    coeff = np.full((len(carrier_ids), len(flow_ids)), np.nan)
    for f in flows:
        dim_id = _carrier_dim_id(f)
        if dim_id not in carrier_pos:
            # Same exception type as the previous list.index call, but with an
            # actionable message naming the offending flow.
            raise ValueError(f'Flow {f.id!r} references undeclared carrier {dim_id!r}')
        coeff[carrier_pos[dim_id], flow_pos[f.id]] = carrier_coeff[f.id]

    # Expand carrier metadata to match carrier dim (one entry per node)
    units: list[str] = []
    colors: list[str] = []
    descriptions: list[str] = []
    for c in carriers:
        n = max(len(c.nodes), 1)
        units.extend([c.unit] * n)
        colors.extend([c.color or ''] * n)
        descriptions.extend([c.description] * n)

    return cls(
        flow_coeff=xr.DataArray(coeff, dims=['carrier', 'flow'], coords={'carrier': carrier_ids, 'flow': flow_ids}),
        unit=xr.DataArray(units, dims=['carrier'], coords={'carrier': carrier_ids}),
        color=xr.DataArray(colors, dims=['carrier'], coords={'carrier': carrier_ids}),
        description=xr.DataArray(descriptions, dims=['carrier'], coords={'carrier': carrier_ids}),
    )

ConvertersData dataclass

ConvertersData(
    pair_coeff: DataArray,
    pair_converter: DataArray,
    pair_flow: DataArray,
    eq_mask: DataArray,
)

flow_coeff property

flow_coeff: DataArray

Dense (converter, eq_idx, flow, time) view for inspection.

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this ConvertersData container to an xr.Dataset."""
    # Delegates to the shared module-level serializer.
    return _to_dataset(self)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with pair-based converter coefficient variables.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with pair-based converter coefficient variables.
    """
    return cls(
        pair_coeff=ds['pair_coeff'],
        pair_converter=ds['pair_converter'],
        pair_flow=ds['pair_flow'],
        eq_mask=ds['eq_mask'],
    )

build classmethod

build(converters: list[Converter], time: TimeIndex) -> Self | None

Build ConvertersData with sparse pair-based conversion coefficients.

Parameters:

Name Type Description Default
converters list[Converter]

Converter definitions.

required
time TimeIndex

Time index.

required
Source code in src/fluxopt/model_data.py
@classmethod
def build(cls, converters: list[Converter], time: TimeIndex) -> Self | None:
    """Build ConvertersData with sparse pair-based conversion coefficients.

    Args:
        converters: Converter definitions.
        time: Time index.

    Returns:
        The built data container, or None when there are no converters.
    """
    if not converters:
        return None

    conv_ids = [c.id for c in converters]
    # Equation rows are padded to the widest converter; eq_mask records which
    # rows are real equations for each converter.
    max_eq = max(len(c.conversion_factors) for c in converters)
    n_time = len(time)
    eq_idx_list = list(range(max_eq))

    eq_mask_rows: list[np.ndarray] = []
    pairs_conv: list[str] = []
    pairs_flow: list[str] = []
    coeff_arrays: list[np.ndarray] = []

    for conv in converters:
        mask_row = np.zeros(max_eq, dtype=bool)
        for eq_i in range(len(conv.conversion_factors)):
            mask_row[eq_i] = True
        eq_mask_rows.append(mask_row)

        # conversion_factors are keyed by short names; invert the converter's
        # short->qualified-id mapping to resolve each flow back to its key.
        qid_to_short = {v: k for k, v in conv._short_to_id.items()}
        for flow in (*conv.inputs, *conv.outputs):
            short = qid_to_short[flow.id]
            eq_coeffs = np.zeros((max_eq, n_time))  # zero where flow is absent from an equation
            for eq_i, equation in enumerate(conv.conversion_factors):
                if short in equation:
                    eq_coeffs[eq_i] = as_dataarray(equation[short], {'time': time}).values
            # One (converter, flow) pair per entry along the sparse 'pair' dim.
            pairs_conv.append(conv.id)
            pairs_flow.append(flow.id)
            coeff_arrays.append(eq_coeffs)

    return cls(
        pair_coeff=xr.DataArray(
            np.array(coeff_arrays),
            dims=['pair', 'eq_idx', 'time'],
            coords={'eq_idx': eq_idx_list, 'time': time},
        ),
        pair_converter=xr.DataArray(pairs_conv, dims=['pair']),
        pair_flow=xr.DataArray(pairs_flow, dims=['pair']),
        eq_mask=xr.DataArray(
            np.array(eq_mask_rows),
            dims=['converter', 'eq_idx'],
            coords={'converter': conv_ids, 'eq_idx': eq_idx_list},
        ),
    )

EffectsData dataclass

EffectsData(
    min_total: DataArray,
    max_total: DataArray,
    min_per_hour: DataArray,
    max_per_hour: DataArray,
    is_objective: DataArray,
    objective_effect: str,
    cf_periodic: DataArray | None = None,
    cf_temporal: DataArray | None = None,
    period_weights_periodic: DataArray | None = None,
    period_weights_once: DataArray | None = None,
)

__post_init__

__post_init__() -> None

Validate exactly one objective effect exists.

Source code in src/fluxopt/model_data.py
def __post_init__(self) -> None:
    """Ensure exactly one effect is marked as the optimization objective."""
    obj_mask = self.is_objective
    count = int(obj_mask.sum())
    if count == 0:
        raise ValueError('No objective effect found. Include an Effect with is_objective=True.')
    if count > 1:
        raise ValueError(
            f'Multiple objective effects: {list(obj_mask.coords["effect"][obj_mask].values)}. Only one is allowed.'
        )

objective_weights

objective_weights(
    global_period_weights: DataArray | None,
) -> tuple[xr.DataArray | int, xr.DataArray | int]

Resolve period weights for the objective effect's two domains.

Parameters:

Name Type Description Default
global_period_weights DataArray | None

Default period weights from Dims (or None).

required

Returns:

Type Description
DataArray | int

(w_periodic, w_once) — weights for recurring and one-time domains.

DataArray | int

Falls back to global_period_weights / 1 when no per-effect override.

Source code in src/fluxopt/model_data.py
def objective_weights(
    self,
    global_period_weights: xr.DataArray | None,
) -> tuple[xr.DataArray | int, xr.DataArray | int]:
    """Resolve period weights for the objective effect's two domains.

    Args:
        global_period_weights: Default period weights from Dims (or None).

    Returns:
        (w_periodic, w_once) — weights for recurring and one-time domains.
        Falls back to global_period_weights / 1 when no per-effect override.
    """
    k = self.objective_effect

    if self.period_weights_periodic is not None and not self.period_weights_periodic.sel(effect=k).isnull().all():
        w_periodic: xr.DataArray | int = self.period_weights_periodic.sel(effect=k)
    elif global_period_weights is not None:
        w_periodic = global_period_weights
    else:
        w_periodic = 1

    if self.period_weights_once is not None and not self.period_weights_once.sel(effect=k).isnull().all():
        w_once: xr.DataArray | int = self.period_weights_once.sel(effect=k)
    else:
        w_once = 1

    return w_periodic, w_once

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this EffectsData container to an xr.Dataset."""
    # Delegates to the shared module-level serializer.
    return _to_dataset(self)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with effect variables and attrs.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with effect variables and attrs.
    """
    kwargs: dict[str, object] = {}
    for f in fields(cls):
        if f.name in ds.data_vars:
            kwargs[f.name] = ds[f.name]
        elif f.name in ds.attrs:
            kwargs[f.name] = ds.attrs[f.name]
        # else: rely on dataclass default (e.g. None for optional fields)
    return cls(**kwargs)  # type: ignore[arg-type]

build classmethod

build(
    effects: list[Effect], time: TimeIndex, period: Index | None = None
) -> Self

Build EffectsData from element objects.

Parameters:

Name Type Description Default
effects list[Effect]

Effect definitions.

required
time TimeIndex

Time index.

required
period Index | None

Period index (multi-period only).

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    effects: list[Effect],
    time: TimeIndex,
    period: pd.Index | None = None,
) -> Self:
    """Build EffectsData from element objects.

    Args:
        effects: Effect definitions.
        time: Time index.
        period: Period index (multi-period only).

    Raises:
        ValueError: If no objective effect exists, a contribution references an
            unknown effect or itself, contributions form a cycle, or per-effect
            period weights are malformed.
    """
    effect_ids = [e.id for e in effects]
    effect_set = set(effect_ids)
    n = len(effects)
    n_time = len(time)
    # Fail fast on a missing objective before any array construction
    # (__post_init__ would also catch it, but later and less directly).
    objective_effect = next(
        (e.id for e in effects if e.is_objective),
        None,
    )
    if objective_effect is None:
        raise ValueError('No objective effect found. Include an Effect with is_objective=True.')

    # NaN marks "no bound set" for the optional totals.
    min_total = np.full(n, np.nan)
    max_total = np.full(n, np.nan)
    min_per_hours: list[xr.DataArray] = []
    max_per_hours: list[xr.DataArray] = []
    is_objective = np.zeros(n, dtype=bool)

    # Shared all-NaN placeholder for effects without per-hour bounds.
    nan_time = xr.DataArray(np.full(n_time, np.nan), dims=['time'], coords={'time': time})

    has_contributions = False
    for i, e in enumerate(effects):
        if e.minimum_total is not None:
            min_total[i] = e.minimum_total
        if e.maximum_total is not None:
            max_total[i] = e.maximum_total
        min_per_hours.append(
            as_dataarray(e.minimum_per_hour, {'time': time}) if e.minimum_per_hour is not None else nan_time
        )
        max_per_hours.append(
            as_dataarray(e.maximum_per_hour, {'time': time}) if e.maximum_per_hour is not None else nan_time
        )
        is_objective[i] = e.is_objective
        if e.contribution_from or e.contribution_from_per_hour:
            has_contributions = True

    # Build cross-effect contribution arrays
    cf_periodic: xr.DataArray | None = None
    cf_temporal: xr.DataArray | None = None
    if has_contributions:
        # Self-reference check
        for e in effects:
            for src_id in (*e.contribution_from, *e.contribution_from_per_hour):
                if src_id == e.id:
                    raise ValueError(f'Effect {e.id!r} cannot reference itself in contribution_from')

        # Cycle check
        adjacency: dict[str, list[str]] = {eid: [] for eid in effect_ids}
        for e in effects:
            for src_id in {*e.contribution_from, *e.contribution_from_per_hour}:
                if src_id not in effect_set:
                    raise ValueError(f'Unknown effect {src_id!r} in contribution_from on {e.id!r}')
                adjacency[e.id].append(src_id)
        cycle = _detect_contribution_cycle(adjacency)
        if cycle is not None:
            raise ValueError(f'Circular contribution_from dependency: {" -> ".join(cycle)}')

        # Dense (effect, source_effect[, time][, period]) coefficient matrices,
        # zero by default, filled only where a contribution is declared.
        tmpl_p = _effect_template({'effect': effect_ids, 'source_effect': effect_ids}, period)
        tmpl_t = _effect_template({'effect': effect_ids, 'source_effect': effect_ids, 'time': time}, period)

        periodic_mat = tmpl_p.zeros()
        temporal_mat = tmpl_t.zeros()
        for e in effects:
            # Scalar contributions apply to both the periodic and temporal matrices.
            for src_id, factor in e.contribution_from.items():
                if src_id not in effect_set:
                    raise ValueError(f'Unknown effect {src_id!r} in Effect.contribution_from on {e.id!r}')
                periodic_mat.loc[e.id, src_id] = as_dataarray(factor, tmpl_p.as_da_coords)
                temporal_mat.loc[e.id, src_id] = as_dataarray(factor, tmpl_t.as_da_coords)
            # Per-hour contributions override/set the temporal matrix only.
            for src_id, factor_ts in e.contribution_from_per_hour.items():
                if src_id not in effect_set:
                    raise ValueError(f'Unknown effect {src_id!r} in Effect.contribution_from_per_hour on {e.id!r}')
                temporal_mat.loc[e.id, src_id] = as_dataarray(factor_ts, tmpl_t.as_da_coords)
        cf_periodic = periodic_mat
        cf_temporal = temporal_mat

    effect_idx = pd.Index(effect_ids, name='effect')

    # Per-effect period weights
    pw_periodic: xr.DataArray | None = None
    pw_once: xr.DataArray | None = None
    if period is not None:
        has_pw_periodic = any(e.period_weights_periodic is not None for e in effects)
        has_pw_once = any(e.period_weights_once is not None for e in effects)
        n_periods = len(period)
        if has_pw_periodic:
            # NaN rows mean "no per-effect override" for that effect.
            mat = np.full((n, n_periods), np.nan)
            for i, e in enumerate(effects):
                if e.period_weights_periodic is not None:
                    if len(e.period_weights_periodic) != n_periods:
                        msg = f'Effect {e.id!r}: period_weights_periodic has {len(e.period_weights_periodic)} entries, expected {n_periods}'
                        raise ValueError(msg)
                    vals = np.asarray(e.period_weights_periodic, dtype=float)
                    if not np.all(np.isfinite(vals)) or not np.all(vals > 0):
                        msg = f'Effect {e.id!r}: period_weights_periodic must be positive and finite, got {vals}'
                        raise ValueError(msg)
                    mat[i] = vals
            pw_periodic = xr.DataArray(
                mat, dims=['effect', 'period'], coords={'effect': effect_ids, 'period': period}
            )
        if has_pw_once:
            mat = np.full((n, n_periods), np.nan)
            for i, e in enumerate(effects):
                if e.period_weights_once is not None:
                    if len(e.period_weights_once) != n_periods:
                        msg = f'Effect {e.id!r}: period_weights_once has {len(e.period_weights_once)} entries, expected {n_periods}'
                        raise ValueError(msg)
                    vals = np.asarray(e.period_weights_once, dtype=float)
                    if not np.all(np.isfinite(vals)) or not np.all(vals > 0):
                        msg = f'Effect {e.id!r}: period_weights_once must be positive and finite, got {vals}'
                        raise ValueError(msg)
                    mat[i] = vals
            pw_once = xr.DataArray(mat, dims=['effect', 'period'], coords={'effect': effect_ids, 'period': period})

    return cls(
        min_total=xr.DataArray(min_total, dims=['effect'], coords={'effect': effect_ids}),
        max_total=xr.DataArray(max_total, dims=['effect'], coords={'effect': effect_ids}),
        min_per_hour=fast_concat(min_per_hours, effect_idx),
        max_per_hour=fast_concat(max_per_hours, effect_idx),
        is_objective=xr.DataArray(is_objective, dims=['effect'], coords={'effect': effect_ids}),
        objective_effect=objective_effect,
        cf_periodic=cf_periodic,
        cf_temporal=cf_temporal,
        period_weights_periodic=pw_periodic,
        period_weights_once=pw_once,
    )

StoragesData dataclass

StoragesData(
    capacity: DataArray,
    eta_c: DataArray,
    eta_d: DataArray,
    loss: DataArray,
    rel_level_lb: DataArray,
    rel_level_ub: DataArray,
    prior_level: DataArray,
    cyclic: DataArray,
    charge_flow: DataArray,
    discharge_flow: DataArray,
    sizing_min: DataArray | None = None,
    sizing_max: DataArray | None = None,
    sizing_mandatory: DataArray | None = None,
    sizing_effects_per_size: DataArray | None = None,
    sizing_effects_fixed: DataArray | None = None,
    invest_min: DataArray | None = None,
    invest_max: DataArray | None = None,
    invest_mandatory: DataArray | None = None,
    invest_lifetime: DataArray | None = None,
    invest_prior_size: DataArray | None = None,
    invest_effects_per_size: DataArray | None = None,
    invest_effects_fixed: DataArray | None = None,
    invest_effects_per_size_periodic: DataArray | None = None,
    invest_effects_fixed_periodic: DataArray | None = None,
)

__post_init__

__post_init__() -> None

Validate capacity, efficiencies, and loss rates.

Source code in src/fluxopt/model_data.py
def __post_init__(self) -> None:
    """Reject invalid storage parameters: capacity >= 0, efficiencies in (0, 1], losses in [0, 1]."""
    storages = self.capacity.coords['storage']
    cap = self.capacity
    # NaN capacity is allowed (capacity left to sizing/investment in build()).
    bad = ~np.isnan(cap) & (cap < 0)
    if bad.any():
        raise ValueError(f'Negative capacity on storages: {list(storages[bad].values)}')
    out_of_range = ((self.eta_c <= 0) | (self.eta_c > 1)).any('time')
    if out_of_range.any():
        raise ValueError(f'eta_charge must be in (0, 1] on storages: {list(storages[out_of_range].values)}')
    out_of_range = ((self.eta_d <= 0) | (self.eta_d > 1)).any('time')
    if out_of_range.any():
        raise ValueError(f'eta_discharge must be in (0, 1] on storages: {list(storages[out_of_range].values)}')
    out_of_range = ((self.loss < 0) | (self.loss > 1)).any('time')
    if out_of_range.any():
        raise ValueError(f'relative_loss_per_hour must be in [0, 1] on storages: {list(storages[out_of_range].values)}')

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this StoragesData container to an xr.Dataset."""
    # Delegates to the shared module-level serializer.
    return _to_dataset(self)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with matching variable names.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with matching variable names.
    """
    kwargs: dict[str, Any] = {f.name: ds.get(f.name) for f in fields(cls)}
    return cls(**kwargs)

build classmethod

build(
    storages: list[Storage],
    time: TimeIndex,
    dt: DataArray,
    effects: list[Effect] | None = None,
    period: Index | None = None,
) -> Self | None

Build StoragesData from element objects.

Parameters:

Name Type Description Default
storages list[Storage]

Storage definitions.

required
time TimeIndex

Time index.

required
dt DataArray

Timestep durations.

required
effects list[Effect] | None

Effect definitions for sizing cost validation.

None
period Index | None

Period index for period-varying effects.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    storages: list[Storage],
    time: TimeIndex,
    dt: xr.DataArray,
    effects: list[Effect] | None = None,
    period: pd.Index | None = None,
) -> Self | None:
    """Build StoragesData from element objects.

    Args:
        storages: Storage definitions.
        time: Time index.
        dt: Timestep durations.
        effects: Effect definitions for sizing cost validation.
        period: Period index for period-varying effects.

    Returns:
        The built data container, or None when there are no storages.
    """
    # NOTE(review): function-scope import — presumably avoids an import cycle
    # with fluxopt.elements; confirm before hoisting to module level.
    from fluxopt.elements import Investment, Sizing

    if not storages:
        return None

    effect_ids = [e.id for e in effects] if effects else []
    stor_ids = [s.id for s in storages]
    n = len(storages)

    # NaN capacity marks storages sized by Sizing/Investment (or unset).
    capacity_vals = np.full(n, np.nan)
    eta_cs: list[xr.DataArray] = []
    eta_ds: list[xr.DataArray] = []
    losses: list[xr.DataArray] = []
    level_lbs: list[xr.DataArray] = []
    level_ubs: list[xr.DataArray] = []
    prior_level_vals = np.full(n, np.nan)
    cyclic_vals = np.zeros(n, dtype=bool)
    charge_flow: list[str] = []
    discharge_flow: list[str] = []
    sizing_items: list[tuple[str, Sizing]] = []
    invest_items: list[tuple[str, Investment]] = []

    for i, s in enumerate(storages):
        # Capacity is either a sizing/investment decision or a fixed scalar.
        if isinstance(s.capacity, Sizing):
            sizing_items.append((s.id, s.capacity))
        elif isinstance(s.capacity, Investment):
            invest_items.append((s.id, s.capacity))
        elif s.capacity is not None:
            capacity_vals[i] = s.capacity

        eta_cs.append(as_dataarray(s.eta_charge, {'time': time}))
        eta_ds.append(as_dataarray(s.eta_discharge, {'time': time}))
        losses.append(as_dataarray(s.relative_loss_per_hour, {'time': time}))

        level_lbs.append(as_dataarray(s.relative_minimum_level, {'time': time}))
        level_ubs.append(as_dataarray(s.relative_maximum_level, {'time': time}))

        cyclic_vals[i] = s.cyclic
        if s.prior_level is not None:
            prior_level_vals[i] = s.prior_level

        # Qualified flow ids linking each storage to its charge/discharge flows.
        charge_flow.append(s.charging.id)
        discharge_flow.append(s.discharging.id)

    stor_idx = pd.Index(stor_ids, name='storage')
    # Sub-builders produce dense arrays (or None) for the optional features.
    sz = _SizingArrays.build(sizing_items, effect_ids, dim='sizing_storage', period=period)
    inv = _InvestmentArrays.build(invest_items, effect_ids, dim='invest_storage', period=period)

    # NOTE(review): uses xr.concat here while FlowsData.build uses fast_concat
    # for the same pattern — confirm whether this is intentional.
    return cls(
        capacity=xr.DataArray(capacity_vals, dims=['storage'], coords={'storage': stor_ids}),
        eta_c=xr.concat(eta_cs, dim=stor_idx),
        eta_d=xr.concat(eta_ds, dim=stor_idx),
        loss=xr.concat(losses, dim=stor_idx),
        rel_level_lb=xr.concat(level_lbs, dim=stor_idx),
        rel_level_ub=xr.concat(level_ubs, dim=stor_idx),
        prior_level=xr.DataArray(prior_level_vals, dims=['storage'], coords={'storage': stor_ids}),
        cyclic=xr.DataArray(cyclic_vals, dims=['storage'], coords={'storage': stor_ids}),
        charge_flow=xr.DataArray(charge_flow, dims=['storage'], coords={'storage': stor_ids}),
        discharge_flow=xr.DataArray(discharge_flow, dims=['storage'], coords={'storage': stor_ids}),
        sizing_min=sz.min,
        sizing_max=sz.max,
        sizing_mandatory=sz.mandatory,
        sizing_effects_per_size=sz.effects_per_size,
        sizing_effects_fixed=sz.effects_fixed,
        invest_min=inv.min,
        invest_max=inv.max,
        invest_mandatory=inv.mandatory,
        invest_lifetime=inv.lifetime,
        invest_prior_size=inv.prior_size,
        invest_effects_per_size=inv.effects_per_size,
        invest_effects_fixed=inv.effects_fixed,
        invest_effects_per_size_periodic=inv.effects_per_size_periodic,
        invest_effects_fixed_periodic=inv.effects_fixed_periodic,
    )

Dims dataclass

Dims(
    time: DataArray,
    dt: DataArray,
    weights: DataArray,
    period: DataArray | None = None,
    period_weights: DataArray | None = None,
)

Shared model coordinates and temporal metadata.

Owns the time and period dimensions, timestep durations, and weights.

coords

coords(*, time: bool = False, period: bool = False) -> dict[str, xr.DataArray]

Return shared coordinates for variable/DataArray creation.

Parameters:

Name Type Description Default
time bool

Include the time coordinate.

False
period bool

Include the period coordinate (no-op in single-period mode).

False
Source code in src/fluxopt/model_data.py
def coords(self, *, time: bool = False, period: bool = False) -> dict[str, xr.DataArray]:
    """Assemble the shared coordinate mapping used when creating variables/DataArrays.

    Args:
        time: When True, include the 'time' coordinate.
        period: When True, include the 'period' coordinate (skipped when the
            model has no period dimension, i.e. single-period mode).
    """
    # Collect candidates, then drop the ones that were not requested / absent.
    candidates = {
        'time': self.time if time else None,
        'period': self.period if period else None,
    }
    return {name: da for name, da in candidates.items() if da is not None}

to_dataset

to_dataset() -> xr.Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize the temporal metadata into a single xr.Dataset."""
    variables: dict[str, xr.DataArray] = {'dt': self.dt, 'weights': self.weights}
    # The period fields only exist in multi-period mode; add them when present.
    optional = {'period': self.period, 'period_weights': self.period_weights}
    variables.update({name: da for name, da in optional.items() if da is not None})
    return xr.Dataset(variables)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default
ds Dataset

Dataset with dt, weights, and optional period fields.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Reconstruct Dims from a previously serialized xr.Dataset.

    Args:
        ds: Dataset containing 'dt', 'weights', and optional period fields.
    """
    durations = ds['dt']
    # The time coordinate rides along on 'dt'; recover it from there.
    return cls(
        time=durations.coords['time'],
        dt=durations,
        weights=ds['weights'],
        period=ds.get('period'),
        period_weights=ds.get('period_weights'),
    )

build classmethod

build(
    time: TimeIndex,
    dt: DataArray,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build Dims from a time index and optional periods.

Parameters:

Name Type Description Default
time TimeIndex

Normalized time index.

required
dt DataArray

Timestep durations.

required
periods list[int] | Index | None

Integer period labels for multi-period optimization.

None
period_weights list[float] | None

Explicit weights per period. Inferred from gaps if None.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    time: TimeIndex,
    dt: xr.DataArray,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Construct Dims from a normalized time index plus optional period labels.

    Args:
        time: Normalized time index.
        dt: Timestep durations.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """
    # Single-period mode: no period coordinate at all.
    if periods is None:
        period_da: xr.DataArray | None = None
        period_weights_da: xr.DataArray | None = None
    else:
        period_idx, period_weights_da = _compute_period_weights(periods, period_weights)
        period_da = xr.DataArray(period_idx.values, dims=['period'], coords={'period': period_idx})

    return cls(
        time=xr.DataArray(time, dims=['time'], coords={'time': time}),
        dt=dt,
        # Every timestep starts with unit weight.
        weights=xr.DataArray(np.ones(len(time)), dims=['time'], coords={'time': time}, name='weight'),
        period=period_da,
        period_weights=period_weights_da,
    )

ModelData dataclass

ModelData(
    flows: FlowsData,
    carriers: CarriersData,
    converters: ConvertersData | None,
    effects: EffectsData,
    storages: StoragesData | None,
    dims: Dims,
)

to_netcdf

to_netcdf(path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None

Write model data as NetCDF groups under /model/.

Parameters:

Name Type Description Default
path str | Path

Output file path.

required
mode Literal['w', 'a']

Write mode ('w' to overwrite, 'a' to append).

'a'
Source code in src/fluxopt/model_data.py
def to_netcdf(self, path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None:
    """Persist all model data as NetCDF groups under ``/model/``.

    Args:
        path: Output file path.
        mode: Write mode ('w' to overwrite, 'a' to append).
    """
    target = Path(path)
    sections: dict[str, FlowsData | CarriersData | ConvertersData | EffectsData | StoragesData | None] = {
        'flows': self.flows,
        'carriers': self.carriers,
        'converters': self.converters,
        'effects': self.effects,
        'storages': self.storages,
    }
    write_mode = mode
    for key, section in sections.items():
        if section is None:
            continue
        section.to_dataset().to_netcdf(target, mode=write_mode, group=_NC_GROUPS[key], engine='netcdf4')
        # Only the very first write may truncate; every later group appends.
        write_mode = 'a'
    self.dims.to_dataset().to_netcdf(target, mode=write_mode, group='model/meta', engine='netcdf4')

from_netcdf classmethod

from_netcdf(path: str | Path) -> ModelData

Read model data from NetCDF groups.

Parameters:

Name Type Description Default
path str | Path

Input file path.

required

Raises:

Type Description
OSError

If no model data groups found in the file.

Source code in src/fluxopt/model_data.py
@classmethod
def from_netcdf(cls, path: str | Path) -> ModelData:
    """Load model data back from the NetCDF groups written by ``to_netcdf``.

    Args:
        path: Input file path.

    Raises:
        OSError: If no model data groups found in the file.
    """
    src = Path(path)
    # The meta group is mandatory; a missing file/group propagates as OSError.
    meta = xr.load_dataset(src, group='model/meta', engine='netcdf4')

    loaded: dict[str, xr.Dataset] = {}
    for key, group in _NC_GROUPS.items():
        try:
            loaded[key] = xr.load_dataset(src, group=group, engine='netcdf4')
        except OSError:
            # Optional group absent from the file: fall back to an empty dataset.
            loaded[key] = xr.Dataset()

    has_converters = bool(loaded['converters'].data_vars)
    has_storages = bool(loaded['storages'].data_vars)
    return cls(
        flows=FlowsData.from_dataset(loaded['flows']),
        carriers=CarriersData.from_dataset(loaded['carriers']),
        converters=ConvertersData.from_dataset(loaded['converters']) if has_converters else None,
        effects=EffectsData.from_dataset(loaded['effects']),
        storages=StoragesData.from_dataset(loaded['storages']) if has_storages else None,
        dims=Dims.from_dataset(meta),
    )

build classmethod

build(
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build ModelData from element objects.

Parameters:

Name Type Description Default
timesteps Timesteps

Time index for the optimization horizon.

required
carriers list[Carrier]

Carrier declarations.

required
effects list[Effect]

Effects to track.

required
ports list[Port]

System boundary ports.

required
converters list[Converter] | None

Linear converters.

None
storages list[Storage] | None

Energy storages.

None
dt float | list[float] | None

Timestep duration in hours. Auto-derived if None.

None
periods list[int] | Index | None

Integer period labels for multi-period optimization.

None
period_weights list[float] | None

Explicit weights per period. Inferred from gaps if None.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Assemble a ModelData instance from user-facing element objects.

    Args:
        timesteps: Time index for the optimization horizon.
        carriers: Carrier declarations.
        effects: Effects to track.
        ports: System boundary ports.
        converters: Linear converters.
        storages: Energy storages.
        dt: Timestep duration in hours. Auto-derived if None.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """
    from fluxopt.elements import PENALTY_EFFECT_ID, Effect
    from fluxopt.types import compute_dt as _compute_dt

    conv_list = converters or []
    stor_list = storages or []
    time = normalize_timesteps(timesteps)
    dt_da = _compute_dt(time, dt)

    # Every model tracks the penalty effect; append it if the caller omitted it.
    if all(e.id != PENALTY_EFFECT_ID for e in effects):
        effects = [*effects, Effect(PENALTY_EFFECT_ID)]

    flows, carrier_coeff = _collect_flows(ports, conv_list, stor_list)
    _validate_system(effects, ports, conv_list, stor_list, flows, carriers)

    dims = Dims.build(time, dt_da, periods=periods, period_weights=period_weights)

    # Prior-duration computations need a scalar dt; take the first timestep's.
    dt_scalar = float(dims.dt.values[0])
    period_idx = None if dims.period is None else pd.Index(dims.period.values)

    return cls(
        flows=FlowsData.build(flows, time, effects, dt=dt_scalar, period=period_idx),
        carriers=CarriersData.build(carriers, flows, carrier_coeff),
        converters=ConvertersData.build(conv_list, time),
        effects=EffectsData.build(effects, time, period=period_idx),
        storages=StoragesData.build(stor_list, time, dims.dt, effects, period=period_idx),
        dims=dims,
    )