Source code for dhnx.simulation

# -*- coding: utf-8

This module is designed to hold implementations of simulation models. The
implementation uses oemof/tespy.

This file is part of project dhnx (). It's copyrighted
by the contributors recorded in the version control history of the file,
available from its original location:

SPDX-License-Identifier: MIT
import re
import warnings
from functools import partial

import networkx as nx
import numpy as np
import pandas as pd

from .model import SimulationModel
from .helpers import Dict, sum_ignore_none
from .graph import write_edge_data_to_graph
from .input_output import save_results

idx = pd.IndexSlice

[docs]class SimulationModelNumpy(SimulationModel): r""" Implementation of a simulation model using numpy. """ def __init__( self, thermal_network, rho=971.78, c=4190, mu=0.00035, eta_pump=1, tolerance=1e-10 ): super().__init__(thermal_network) self.results = {} self.nx_graph = thermal_network.to_nx_graph() assert nx.algorithms.tree.is_tree(self.nx_graph),\ "Currently, only tree networks can be modeled. " \ "Looped networks are not implemented yet." self.inc_mat = nx.incidence_matrix(self.nx_graph, oriented=True).todense() self.input_data = Dict() self.rho = rho # kg/m3 self.c = c # J/(kg*K) = mu # kg/(m*s) self.temp_env = thermal_network.sequences.environment.temp_env.iloc[:, 0] self.eta_pump = eta_pump self.tolerance = tolerance if self._concat_scalars('height') is not None: warnings.warn( "Pressure differences due to height differences are not implemented yet." )
[docs] def prepare(self): self.prepare_hydraulic_eqn() self.prepare_thermal_eqn()
[docs] def solve(self): self.solve_hydraulic_eqn() self.solve_thermal_eqn()
[docs] def get_results(self): return self.results
[docs] def prepare_hydraulic_eqn(self): r""" Prepares the input data for the hydraulic problem. """ self.input_data.mass_flow = pd.DataFrame( 0, columns=self.nx_graph.nodes(), index=self.thermal_network.timeindex ) input_data = self._concat_sequences('mass_flow') self.input_data.mass_flow.loc[:, input_data.columns] = input_data self.input_data.mass_flow = self._set_producers_mass_flow(self.input_data.mass_flow)
[docs] def prepare_thermal_eqn(self): r""" Prepares the input data for the thermal problem. """ self.input_data.temp_inlet = pd.DataFrame( 0, columns=self.nx_graph.nodes(), index=self.thermal_network.timeindex ) input_data = self._concat_sequences('temp_inlet') self.input_data.temp_inlet.loc[:, input_data.columns] = input_data
[docs] def solve_hydraulic_eqn(self): r""" Solves the hydraulic problem. """ self.results['pipes-mass_flow'] = self._calculate_pipes_mass_flow() reynolds = self._calculate_reynolds() lamb = self._calculate_lambda(reynolds) pipes_dist_pressure_losses = self._calculate_pipes_distributed_pressure_losses(lamb) pipes_loc_pressure_losses = self._calculate_pipes_localized_pressure_losses() pipes_total_pressure_losses = sum_ignore_none( pipes_dist_pressure_losses, pipes_loc_pressure_losses ) global_pressure_losses = self._calculate_global_pressure_losses(pipes_total_pressure_losses) pump_power = self._calculate_pump_power(global_pressure_losses) self.results['pipes-dist_pressure_losses'] = pipes_dist_pressure_losses self.results['pipes_loc_pressure_losses'] = pipes_loc_pressure_losses self.results['global-pressure_losses'] = global_pressure_losses self.results['producers-pump_power'] = pump_power
[docs] def solve_thermal_eqn(self): r""" Solves the thermal problem. """ exponent_constant = self._calculate_exponent_constant() temp_inlet = self._calc_temps(exponent_constant, self.input_data.temp_inlet, direction=1) temp_return_known = self._set_temp_return_input(temp_inlet) temp_return = self._calc_temps(exponent_constant, temp_return_known, direction=-1) pipes_heat_losses = self._calculate_pipes_heat_losses(temp_inlet) \ + self._calculate_pipes_heat_losses(temp_return) global_heat_losses = pipes_heat_losses.sum(axis=1) = 'global_heat_losses' self.results['nodes-temp_inlet'] = temp_inlet self.results['nodes-temp_return'] = temp_return self.results['pipes-heat_losses'] = pipes_heat_losses self.results['global-heat_losses'] = global_heat_losses
[docs] def _concat_scalars(self, name): r""" Concatenates scalars of all components with a given variable name Parameters ---------- name : str Name of the variable Returns ------- concat_sequences : pd.DataFrame DataFrame containing the sequences """ select_scalars = [ scalar[name].copy().rename(index=lambda x, prefix=component: prefix + '-' + str(x)) for component, scalar in self.thermal_network.components.items() if name in scalar ] if select_scalars: select_scalars = pd.concat(select_scalars, 0) else: select_scalars = None return select_scalars
[docs] def _concat_sequences(self, name): r""" Concatenates sequences of all components with a given variable name Parameters ---------- name : str Name of the variable Returns ------- concat_sequences : pd.DataFrame DataFrame containing the sequences """ select_sequences = [ d[name].copy().rename(columns=lambda x, prefix=component: prefix + '-' + x) for component, d in self.thermal_network.sequences.items() if name in d ] concat_sequences = pd.concat(select_sequences, 1) return concat_sequences
[docs] @staticmethod def _set_producers_mass_flow(m): r""" Sets the mass flow of the producer. Parameters ---------- m : pd.DataFrame DataFrame with all know consumer mass flows. Returns ------- m : pd.DataFrame DataFrame with all know mass flow of consumers and producer. """ producers = [name for name in m.columns if'producers', name)] assert len(producers) == 1, "Currently, only one producer allowed." m.loc[:, producers] = - m.loc[:, ~m.columns.isin(producers)].sum(1) return m
[docs] def _calculate_pipes_mass_flow(self): r""" Determines the mass flow in all pipes using numpy's least squares function. Returns ------- pipes_mass_flow : pd.DataFrame Mass flow in the pipes [kg/s] """ pipes_mass_flow = {} for t in self.thermal_network.timeindex: # The order of columns in self.input_data.mass_flow fit with those of self.inc_mat # because the columns have been generated from the graph's nodes in # prepare_hydraulic_eqn() x, residuals, _, _ = np.linalg.lstsq( self.inc_mat, self.input_data.mass_flow.loc[t, :], rcond=None ) assert residuals < self.tolerance,\ f"Residuals {residuals} are larger than tolerance {self.tolerance}!" pipes_mass_flow.update({t: x}) pipes_mass_flow = pd.DataFrame.from_dict( pipes_mass_flow, orient='index', columns=self.nx_graph.edges() ) pipes_mass_flow.columns.names = ('from_node', 'to_node') return pipes_mass_flow
[docs] def _calculate_reynolds(self): r""" Calculates the Reynolds number. .. math:: Re = \frac{4\dot{m}}{\pi\mu D} Returns ------- re : pd.DataFrame Reynolds number for every time step and pipe [-] """ abs_pipes_mass_flow = self.results['pipes-mass_flow'].copy() abs_pipes_mass_flow = abs(abs_pipes_mass_flow) diameter = \ self.thermal_network.components.pipes[['from_node', 'to_node', 'diameter']] diameter = 1e-3 * diameter.set_index(['from_node', 'to_node'])['diameter'] reynolds = 4 * abs_pipes_mass_flow.divide(diameter, axis='columns') \ / (np.pi * return reynolds
[docs] def _calculate_lambda(self, reynolds): r""" Calculates the darcy friction factor. .. math:: \lambda = 0.07 \cdot Re ^ {-0.13} \cdot D ^ {-0.14} Parameters ---------- re : pd.DataFrame Reynolds number for every time step and pipe [-] Returns ------- lamb : pd.DataFrame Darcy friction factor for every time step and pipe [-] """ factor_diameter = self.thermal_network.components.pipes[ ['from_node', 'to_node', 'diameter'] ] factor_diameter = ( 1e-3 * factor_diameter.set_index(['from_node', 'to_node'])['diameter'] ) ** -0.14 lamb = 0.07 * reynolds ** -0.13 lamb = lamb.multiply(factor_diameter, axis='columns') return lamb
[docs] def _calculate_pipes_distributed_pressure_losses(self, lamb): r""" Calculates the pressure losses in the pipes. Equal-sized inlet and return pipes are assumed which leads to equal mass flows and pressure losses for both. This introduces the initial factor of 2 in the equation. .. math:: \delta p = 2 \cdot \lambda \frac{8L}{\rho \pi^2 D^5}\dot{m}^2. Parameters ---------- lamb : pd.DataFrame Darcy friction factor for every time step and pipe [-] Returns ------- pipes_pressure_losses : pd.DataFrame DataFrame with distributed pressure losses for inlet and return for every time step and pipe [Pa] """ pipes_mass_flow = self.results['pipes-mass_flow'].copy() pipes_mass_flow_2 = pipes_mass_flow ** 2 constant = 8 * lamb / (self.rho * np.pi**2) length = self.thermal_network.components.pipes[['from_node', 'to_node', 'length']] diameter = \ self.thermal_network.components.pipes[['from_node', 'to_node', 'diameter']] length = length.set_index(['from_node', 'to_node'])['length'] diameter = diameter.set_index(['from_node', 'to_node'])['diameter'] diameter_5 = (1e-3 * diameter) ** 5 pipes_pressure_losses = constant * pipes_mass_flow_2\ .multiply(length, axis='columns')\ .divide(diameter_5, axis='columns') # We multiply by the factor of two to represent the pressure losses along inlet # and return flow. pipes_pressure_losses *= 2 return pipes_pressure_losses
[docs] def _calculate_pipes_localized_pressure_losses(self): r""" Calculates localized pressure losses at the nodes. .. math:: \Delta p_{loc} = \frac{8\zeta\dot{m}^2}{\rho \pi^2 D^4} Returns ------- nodes_pressure_losses : pd.DataFrame Localized pressure losses at the nodes [Pa] """ def _assign_zeta_to_pipes(flow_type): def func(row, level): res = zeta.reindex(row.index.get_level_values(level).values) res.index = row.index return res zeta = self._concat_scalars('zeta_' + flow_type) if zeta is None: print(f"No values for zeta_{flow_type} found. Skipping.") return None flow_direction = np.sign(self.results['pipes-mass_flow']) zeta_pipes = pd.DataFrame( columns=flow_direction.columns, index=flow_direction.index ) zeta_pipes_forward = zeta_pipes.copy() zeta_pipes_backward = zeta_pipes.copy() func_forward = partial(func, level=0) zeta_pipes_forward = zeta_pipes_forward.apply(func_forward, axis=1) func_backward = partial(func, level=1) zeta_pipes_backward = zeta_pipes_backward.apply(func_backward, axis=1) if flow_type == 'inlet': zeta_pipes[flow_direction > 0] = zeta_pipes_forward zeta_pipes[flow_direction < 0] = zeta_pipes_backward elif flow_type == 'return': zeta_pipes[flow_direction > 0] = zeta_pipes_backward zeta_pipes[flow_direction < 0] = zeta_pipes_forward else: raise ValueError("Flow type has to be either inlet or return.") return zeta_pipes def _calc_loc_pressure_loss_for_flow_type(flow_type): zeta_pipes = _assign_zeta_to_pipes(flow_type) if zeta_pipes is None: return None pipes_localized_pressure_losses = {} for t in self.thermal_network.timeindex: pipes_mass_flow = self.results['pipes-mass_flow'].loc[t, :] mass_flow_2 = pipes_mass_flow ** 2 mass_flow_2_over_diameter_4 = mass_flow_2.divide(diameter_4) = 'mass_flow_2_over_diameter_4' x = constant * zeta_pipes.loc[t, :].multiply(mass_flow_2_over_diameter_4) pipes_localized_pressure_losses.update({t: x}) pipes_localized_pressure_losses = pd.DataFrame.from_dict( pipes_localized_pressure_losses, orient='index' ) pipes_localized_pressure_losses = pipes_localized_pressure_losses.fillna(0) return pipes_localized_pressure_losses constant = 8 / (self.rho * np.pi ** 2) diameter_4 = self.thermal_network.components.pipes[ ['from_node', 'to_node', 'diameter'] ] diameter_4 = diameter_4.set_index(['from_node', 'to_node'])['diameter'] diameter_4 = (1e-3 * diameter_4) ** 4 pipes_localized_pressure_losses_inlet = _calc_loc_pressure_loss_for_flow_type('inlet') pipes_localized_pressure_losses_return = _calc_loc_pressure_loss_for_flow_type('return') pipes_localized_pressure_losses = sum_ignore_none( pipes_localized_pressure_losses_inlet, pipes_localized_pressure_losses_return ) return pipes_localized_pressure_losses
[docs] def _calculate_global_pressure_losses(self, pipes_pressure_losses): r""" Calculates global pressure losses. Finds the path with the maximal pressure loss among from the set of paths from the producer to all consumers. Parameters ---------- pipes_pressure_losses : pd.DataFrame Total pressure losses for every time step and pipe [Pa] Returns ------- global_pressure_losses : pd.DataFrame Global pressure losses [Pa] """ def calculate_path_weights(source_nodes, sink_nodes, graph, weight): path_weights = {} for sink in sink_nodes: for source in source_nodes: path_weights[sink] = nx.dijkstra_path_length( graph, source=source, target=sink, weight=weight ) return path_weights def _change_graph_dir(graph, direction): r""" Takes a nx.DiGraph and returns a new graph with adapted edge directions according to the direction specified. Parameters ---------- graph : nx.DiGraph Directed graph direction : pd.Series Series with the edges as index and 1 or -1 as values, indicating the direction Returns ------- dir_graph : nx.DiGraph Graph with adapted directions """ def _get_tuple(e): return tuple(e[:2]) def _swap_tuple(e): return tuple((*e[:2][::-1], e[2])) dir_edges = [ e if direction.loc[_get_tuple(e)] > 0 else _swap_tuple(e) for e in graph.edges(data=True) ] dir_graph = nx.DiGraph() dir_graph.add_edges_from(dir_edges) return dir_graph def _calculate_paths_pressure_losses(): sink_nodes = [node for node, data in self.nx_graph.nodes(data=True) if data['component_type'] == 'Consumer'] source_nodes = [node for node, data in self.nx_graph.nodes(data=True) if data['component_type'] == 'Producer'] paths_pressure_losses = {} flow_direction = np.sign(self.results['pipes-mass_flow']) for t in self.thermal_network.timeindex: graph = write_edge_data_to_graph( pipes_pressure_losses.loc[t, :], self.nx_graph, var_name='pipes_pressure_losses' ) flow_dir_graph = _change_graph_dir(graph, flow_direction.loc[t, :]) path_weights = calculate_path_weights( source_nodes, sink_nodes, flow_dir_graph, 'pipes_pressure_losses' ) paths_pressure_losses[t] = path_weights paths_pressure_losses = \ pd.DataFrame.from_dict(paths_pressure_losses, orient='index') return paths_pressure_losses paths_pressure_losses = _calculate_paths_pressure_losses() # Here, we take the path with the maximum pressure losses and assume that the other # consumer's valves are adjusted so that in sum, the pressure losses along all paths are # equal. global_pressure_losses = paths_pressure_losses.max(axis=1) return global_pressure_losses
[docs] def _calculate_pump_power(self, global_pressure_losses): r""" Calculates the pump power. .. math:: P_{el. pump} = \frac{1}{\eta_{el}\eta_{hyd}}\frac{\Delta p }{\rho} \dot{m} Parameters ---------- global_pressure_losses : pd.DataFrame Global pressure losses [Pa] Returns ------- pump_power : pd.Series Pump power [W] """ producers = [ node for node, data in self.nx_graph.nodes(data=True) if data['component_type'] == 'Producer' ] mass_flow_producers = \ self.results['pipes-mass_flow'].loc[:, idx[producers, :]].sum(axis=1) pump_power = mass_flow_producers * global_pressure_losses / (self.eta_pump * self.rho) return pump_power
[docs] def _calculate_exponent_constant(self): r""" Calculates the constant part of the exponent that determines the cooling of the medium in the pipes. .. math:: exp_{const} = - \frac{U \pi D L }{c} Returns ------- exponent_constant : np.matrix Constant part of the exponent [kg/s] """ heat_transfer_coefficient = nx.adjacency_matrix( self.nx_graph, weight='heat_transfer_coefficient').todense() diameter = 1e-3 * nx.adjacency_matrix(self.nx_graph, weight='diameter').todense() length = nx.adjacency_matrix(self.nx_graph, weight='length').todense() exponent_constant = - np.pi \ * np.multiply(heat_transfer_coefficient, np.multiply(diameter, length)) \ / self.c return exponent_constant
[docs] def _calc_temps(self, exponent_constant, known_temp, direction): r""" Calculate temperatures .. math:: T_{out} = T_{env} + (T_{in} - T_{env}) \cdot exp\{exp_{const} \cdot exp_{var}\} = T_{out} = T_{env} + (T_{in} - T_{env}) \cdot exp\{-\frac{U \pi D L}{c \cdot \dot{m}}\} Parameters ---------- exponent_constant : np.array Constant part of the exponent [kg/s] known_temp : pd.DataFrame Known temperatures at producers or consumers [°C] direction : +1 or -1 For inlet and return flow [-] Returns ------- temp_df : pd.DataFrame DataFrame containing temperatures for all nodes [°C] """ # TODO: Rethink function layout and naming temps = {} for t in self.thermal_network.timeindex: # Divide exponent matrix by current pipes-mass_flows. data = self.results['pipes-mass_flow'].loc[t, :].copy() data = 1 / data graph_with_data = write_edge_data_to_graph( data, self.nx_graph, var_name='pipes-mass_flow' ) inverse_mass_flows = nx.adjacency_matrix( graph_with_data, weight='pipes-mass_flow' ).todense() exponent = np.multiply(exponent_constant, inverse_mass_flows) matrix = np.exp(exponent) # Clear out elements where matrix was zero before exponentiation. This could be # replaced by properly passing `where` to np.multiply in the line above. matrix = np.multiply(matrix, nx.adjacency_matrix(self.nx_graph).todense()) # Adapt matrix if direction == 1: matrix = matrix.T normalisation = np.array(nx.adjacency_matrix(self.nx_graph).sum(0)).flatten() normalisation = \ np.divide(np.array([1]), normalisation, where=normalisation != 0) elif direction == -1: normalisation = np.array(nx.adjacency_matrix(self.nx_graph).sum(1)).flatten() normalisation = \ np.divide(np.array([1]), normalisation, where=normalisation != 0) else: raise ValueError("Direction has to be either 1 or -1.") normalisation = np.diag(normalisation) matrix =, matrix) matrix = np.identity(len(matrix)) - matrix vector = np.array(known_temp.loc[t]) vector[vector != 0] -= self.temp_env.loc[t] x, _, _, _ = np.linalg.lstsq( matrix, vector, rcond=None ) temps.update({t: x + self.temp_env.loc[t]}) temp_df = pd.DataFrame.from_dict( temps, orient='index', columns=self.nx_graph.nodes() ) return temp_df
[docs] def _set_temp_return_input(self, temp_inlet): r""" Sets the temperature of the return pipes at the consumers. .. math:: T_{cons,r} = T_{cons,i} - T_{cons,drop} Parameters ---------- temp_inlet : pd.DataFrame Known inlet temperature [°C] Returns ------- temp_return : pd.DataFrame Return temperature with the consumers values set [°C] """ temp_return = pd.DataFrame( 0, columns=self.nx_graph.nodes(), index=self.thermal_network.timeindex ) temp_drop = self._concat_sequences('delta_temp_drop') temp_return.loc[:, temp_drop.columns] = temp_inlet.loc[:, temp_drop.columns] - temp_drop return temp_return
[docs] def _calculate_pipes_heat_losses(self, temp_node): r""" Calculates the pipes' heat losses given the temperatures. .. math:: \dot{Q}_{losses} = c \cdot \dot{m} \cdot \Delta T Parameters ---------- temp_node : pd.DataFrame Temperatures at the nodes [°C] Returns ------- pipes_heat_losses : pd.DataFrame Heat losses in the pipes [W] """ pipes_heat_losses = {} for i, row in temp_node.iterrows(): mass_flow = self.results['pipes-mass_flow'].loc[i, :].copy() temp_difference = np.abs(np.array(, self.inc_mat)).flatten()) pipes_heat_losses[i] = self.c * mass_flow.multiply(temp_difference, axis=0) pipes_heat_losses = pd.DataFrame.from_dict(pipes_heat_losses, orient='index') return pipes_heat_losses
[docs]def simulate(thermal_network, results_dir=None): r""" Takes a thermal network and returns the result of the simulation. Parameters ---------- thermal_network Returns ------- results : dict """ model = SimulationModelNumpy(thermal_network) model.prepare() model.solve() results = model.get_results() if results_dir is not None: save_results(results, results_dir) return results