From 721f1f9ce8132710991aeee4501dd46d41b64da0 Mon Sep 17 00:00:00 2001 From: Andrew Lee Date: Tue, 22 Aug 2023 10:33:03 -0400 Subject: [PATCH] MS Contactor improvements (#1243) * Fixing issue with closures * Fixing pylint issues --- idaes/models/unit_models/mscontactor.py | 569 +++++++++++++----------- 1 file changed, 318 insertions(+), 251 deletions(-) diff --git a/idaes/models/unit_models/mscontactor.py b/idaes/models/unit_models/mscontactor.py index 611928b912..4251b0a019 100644 --- a/idaes/models/unit_models/mscontactor.py +++ b/idaes/models/unit_models/mscontactor.py @@ -13,6 +13,8 @@ """ IDAES model for a generic multiple-stream contactor unit. """ +from functools import partial + # Import Pyomo libraries from pyomo.environ import Block, Constraint, RangeSet, Reals, Set, units, Var from pyomo.common.config import ConfigDict, ConfigValue, Bool, In @@ -592,7 +594,6 @@ def _build_material_balance_constraints(self, flow_basis, uom): for stream, sconfig in self.config.streams.items(): state_block = getattr(self, stream) component_list = state_block.component_list - phase_list = state_block.phase_list pc_set = state_block.phase_component_set # Get reaction block if present @@ -637,25 +638,18 @@ def _build_material_balance_constraints(self, flow_basis, uom): rate_reaction_generation, ) - def rate_reaction_rule(b, t, s, p, j): - if (p, j) in pc_set: - return rate_reaction_generation[t, s, p, j] == ( - sum( - reaction_block[t, s].params.rate_reaction_stoichiometry[ - r, p, j - ] - * rate_reaction_extent[t, s, r] - for r in sconfig.reaction_package.rate_reaction_idx - ) - ) - return Constraint.Skip - rate_reaction_constraint = Constraint( self.flowsheet().time, self.elements, pc_set, doc=f"Rate-based reaction stoichiometry for stream {stream}", - rule=rate_reaction_rule, + rule=partial( + _rate_reaction_rule, + stream=stream, + rblock=reaction_block, + generation=rate_reaction_generation, + extent=rate_reaction_extent, + ), ) self.add_component( stream + "_rate_reaction_constraint", @@ -699,25 +693,18 @@ def rate_reaction_rule(b, t, s, p, j): equilibrium_reaction_generation, ) - def equilibrium_reaction_rule(b, t, s, p, j): - if (p, j) in pc_set: - return equilibrium_reaction_generation[t, s, p, j] == ( - sum( - reaction_block[ - t, s - ].params.equilibrium_reaction_stoichiometry[r, p, j] - * equilibrium_reaction_extent[t, s, r] - for r in sconfig.reaction_package.equilibrium_reaction_idx - ) - ) - return Constraint.Skip - equilibrium_reaction_constraint = Constraint( self.flowsheet().time, self.elements, pc_set, doc=f"Equilibrium reaction stoichiometry for stream {stream}", - rule=equilibrium_reaction_rule, + rule=partial( + _equilibrium_reaction_rule, + stream=stream, + rblock=reaction_block, + generation=equilibrium_reaction_generation, + extent=equilibrium_reaction_extent, + ), ) self.add_component( stream + "_equilibrium_reaction_constraint", @@ -755,25 +742,17 @@ def equilibrium_reaction_rule(b, t, s, p, j): inherent_reaction_generation, ) - def inherent_reaction_rule(b, t, s, p, j): - if (p, j) in pc_set: - return inherent_reaction_generation[t, s, p, j] == ( - sum( - state_block[ - t, s - ].params.inherent_reaction_stoichiometry[r, p, j] - * inherent_reaction_extent[t, s, r] - for r in sconfig.property_package.inherent_reaction_idx - ) - ) - return Constraint.Skip - inherent_reaction_constraint = Constraint( self.flowsheet().time, self.elements, pc_set, doc=f"Inherent reaction stoichiometry for stream {stream}", - rule=inherent_reaction_rule, + rule=partial( + _inherent_reaction_rule, + stream=stream, + generation=inherent_reaction_generation, + extent=inherent_reaction_extent, + ), ) self.add_component( stream + "_inherent_reaction_constraint", @@ -796,29 +775,16 @@ def inherent_reaction_rule(b, t, s, p, j): heterogeneous_reactions_generation, ) - def heterogeneous_reaction_rule(b, t, s, p, j): - if (p, j) in pc_set: - return heterogeneous_reactions_generation[t, s, p, j] == ( - sum( - self.heterogeneous_reactions[ - t, s - ].params.reaction_stoichiometry[r, p, j] - * b.heterogeneous_reaction_extent[t, s, r] - for r in self.config.heterogeneous_reactions.reaction_idx - if (r, p, j) - in self.heterogeneous_reactions[ - t, s - ].params.reaction_stoichiometry - ) - ) - return Constraint.Skip - heterogeneous_reaction_constraint = Constraint( self.flowsheet().time, self.elements, pc_set, doc="Heterogeneous reaction stoichiometry constraint", - rule=heterogeneous_reaction_rule, + rule=partial( + _heterogeneous_reaction_rule, + pc_set=pc_set, + generation=heterogeneous_reactions_generation, + ), ) self.add_component( stream + "_heterogeneous_reaction_constraint", @@ -826,103 +792,23 @@ def heterogeneous_reaction_rule(b, t, s, p, j): ) # Material balance for stream - def material_balance_rule(b, t, s, j): - in_state, out_state, side_state = _get_state_blocks(b, t, s, stream) - - if in_state is not None: - rhs = sum( - in_state.get_material_flow_terms(p, j) - for p in phase_list - if (p, j) in pc_set - ) - sum( - out_state.get_material_flow_terms(p, j) - for p in phase_list - if (p, j) in pc_set - ) - else: - rhs = -sum( - out_state.get_material_flow_terms(p, j) - for p in phase_list - if (p, j) in pc_set - ) - - # Add side stream energy flow if required - if side_state is not None: - rhs += sum( - side_state.get_material_flow_terms(p, j) - for p in phase_list - if (p, j) in pc_set - ) - - # As overall units come from the first StateBlock constructed, we - # cannot guarantee that any units are consistent, so convert all flow terms - if mb_units is not None: - rhs = units.convert(rhs, mb_units) - - # Add mass transfer terms - for k in b.stream_component_interactions: - if k[0] == stream and k[2] == j: - # Positive mass transfer - rhs += b.material_transfer_term[t, s, k] - elif k[1] == stream and k[2] == j: - # Negative mass transfer - rhs += -b.material_transfer_term[t, s, k] - - # Add rate reactions (if required) - if sconfig.has_rate_reactions: - rhs += sum(rate_reaction_generation[t, s, p, j] for p in phase_list) - - # Add equilibrium reactions (if required) - if sconfig.has_equilibrium_reactions: - rhs += sum( - equilibrium_reaction_generation[t, s, p, j] for p in phase_list - ) - - # Add inherent reactions (if required) - if state_block.include_inherent_reactions: - rhs += sum( - inherent_reaction_generation[t, s, p, j] for p in phase_list - ) - - # Add heterogeneous reactions (if required) - if self.config.heterogeneous_reactions is not None: - rhs += sum( - heterogeneous_reactions_generation[t, s, p, j] - for p in phase_list - ) - - return 0 == rhs - mbal = Constraint( self.flowsheet().time, self.elements, component_list, - rule=material_balance_rule, + rule=partial( + _material_balance_rule, + stream=stream, + mb_units=mb_units, + ), ) self.add_component(stream + "_material_balance", mbal) def _build_energy_balance_constraints(self, uom): # Energy Balances - # Assume that if energy balances are enabled that energy transfer - # occurs between all interacting phases. - # # For now, we will not distinguish different types of energy transfer. - # Convention is that a positive material flow term indicates flow into - # the first stream from the second stream. - self.energy_transfer_term = Var( - self.flowsheet().time, - self.elements, - self.stream_interactions, - initialize=0, - units=uom.POWER, - doc="Inter-stream energy transfer term", - ) - for stream, pconfig in self.config.streams.items(): if pconfig.has_energy_balance: - state_block = getattr(self, stream) - phase_list = state_block.phase_list - if pconfig.has_heat_transfer: heat = Var( self.flowsheet().time, @@ -933,80 +819,14 @@ def _build_energy_balance_constraints(self, uom): ) self.add_component(stream + "_heat", heat) - def energy_balance_rule(b, t, s): - in_state, out_state, side_state = _get_state_blocks(b, t, s, stream) - - if in_state is not None: - rhs = sum( - in_state.get_enthalpy_flow_terms(p) for p in phase_list - ) - sum( - out_state.get_enthalpy_flow_terms(p) for p in phase_list - ) - else: - rhs = -sum( - out_state.get_enthalpy_flow_terms(p) for p in phase_list - ) - - # Add side stream energy flow if required - if side_state is not None: - rhs += sum( - side_state.get_enthalpy_flow_terms(p) for p in phase_list - ) - - # As overall units come from the first StateBlock constructed, we - # cannot guarantee that any units are consistent, so convert all flow terms - rhs = units.convert(rhs, uom.POWER) - - # Add interstream transfer terms - for k in b.stream_interactions: - if k[0] == stream: - # Positive mass transfer - rhs += b.energy_transfer_term[t, s, k] - elif k[1] == stream: - # Negative mass transfer - rhs += -b.energy_transfer_term[t, s, k] - - # Add external heat term if required - if pconfig.has_heat_transfer: - rhs += heat[t, s] - - # Add heat of reaction terms if required - if pconfig.has_heat_of_reaction: - if not ( - hasattr(b, stream + "_rate_reaction_extent") - or hasattr(b, stream + "_equilibrium_reaction_extent") - ): - raise ConfigurationError( - f"Stream {stream} was set to include heats of reaction, " - "but no extent of reactions terms could be found. " - "Please ensure that you defined a reaction package for this " - "stream and that the material balances were set to include " - "reactions." - ) - reactions = getattr(b, stream + "_reactions") - - if hasattr(b, stream + "_rate_reaction_extent"): - rate_extent = getattr(b, stream + "_rate_reaction_extent") - rhs += -sum( - rate_extent[t, s, r] * reactions[t, s].dh_rxn[r] - for r in pconfig.reaction_package.rate_reaction_idx - ) - - if hasattr(b, stream + "_equilibrium_reaction_extent"): - equil_extent = getattr( - b, stream + "_equilibrium_reaction_extent" - ) - rhs += -sum( - equil_extent[t, s, e] * reactions[t, s].dh_rxn[e] - for e in pconfig.reaction_package.equilibrium_reaction_idx - ) - - return 0 == rhs - ebal = Constraint( self.flowsheet().time, self.elements, - rule=energy_balance_rule, + rule=partial( + _energy_balance_rule, + stream=stream, + uom=uom, + ), ) self.add_component(stream + "_energy_balance", ebal) @@ -1025,29 +845,14 @@ def _build_pressure_balance_constraints(self, uom): ) self.add_component(stream + "_deltaP", deltaP) - def pressure_balance_rule(b, t, s): - in_state, out_state, _ = _get_state_blocks(b, t, s, stream) - - if in_state is None: - # If there is no feed, then there is no need for a pressure balance - return Constraint.Skip - - rhs = in_state.pressure - out_state.pressure - - # As overall units come from the first StateBlock constructed, we - # cannot guarantee that any units are consistent, so convert all flow terms - rhs = units.convert(rhs, uom.PRESSURE) - - # Add deltaP term if required - if pconfig.has_pressure_change: - rhs += deltaP[t, s] - - return 0 == rhs - pbal = Constraint( self.flowsheet().time, self.elements, - rule=pressure_balance_rule, + rule=partial( + _pressure_balance_rule, + stream=stream, + uom=uom, + ), ) self.add_component(stream + "_pressure_balance", pbal) @@ -1106,29 +911,29 @@ def _get_performance_contents(self, time_point=0): raise NotImplementedError() -def _get_state_blocks(b, t, s, stream): +def _get_state_blocks(blk, t, s, stream): """ Utility method for collecting states representing flows into and out of a stage for a given stream. """ - state_block = getattr(b, stream) + state_block = getattr(blk, stream) - if b.config.streams[stream].flow_direction == FlowDirection.forward: - if s == b.elements.first(): - if not b.config.streams[stream].has_feed: + if blk.config.streams[stream].flow_direction == FlowDirection.forward: + if s == blk.elements.first(): + if not blk.config.streams[stream].has_feed: in_state = None else: - in_state = getattr(b, stream + "_inlet_state")[t] + in_state = getattr(blk, stream + "_inlet_state")[t] else: - in_state = state_block[t, b.elements.prev(s)] - elif b.config.streams[stream].flow_direction == FlowDirection.backward: - if s == b.elements.last(): - if not b.config.streams[stream].has_feed: + in_state = state_block[t, blk.elements.prev(s)] + elif blk.config.streams[stream].flow_direction == FlowDirection.backward: + if s == blk.elements.last(): + if not blk.config.streams[stream].has_feed: in_state = None else: - in_state = getattr(b, stream + "_inlet_state")[t] + in_state = getattr(blk, stream + "_inlet_state")[t] else: - in_state = state_block[t, b.elements.next(s)] + in_state = state_block[t, blk.elements.next(s)] else: raise BurntToast("If/else overrun when constructing balances") @@ -1136,10 +941,272 @@ def _get_state_blocks(b, t, s, stream): # Look for side state side_state = None - if hasattr(b, stream + "_side_stream_state"): + if hasattr(blk, stream + "_side_stream_state"): try: - side_state = getattr(b, stream + "_side_stream_state")[t, s] + side_state = getattr(blk, stream + "_side_stream_state")[t, s] except KeyError: pass return in_state, out_state, side_state + + +def _rate_reaction_rule(blk, t, s, p, j, stream, rblock, generation, extent): + sconfig = blk.config.streams[stream] + sblock = getattr(blk, stream) + pc_set = sblock.phase_component_set + + if (p, j) in pc_set: + return generation[t, s, p, j] == ( + sum( + rblock[t, s].params.rate_reaction_stoichiometry[r, p, j] + * extent[t, s, r] + for r in sconfig.reaction_package.rate_reaction_idx + ) + ) + return Constraint.Skip + + +def _equilibrium_reaction_rule(blk, t, s, p, j, stream, rblock, generation, extent): + sconfig = blk.config.streams[stream] + sblock = getattr(blk, stream) + pc_set = sblock.phase_component_set + + if (p, j) in pc_set: + return generation[t, s, p, j] == ( + sum( + rblock[t, s].params.equilibrium_reaction_stoichiometry[r, p, j] + * extent[t, s, r] + for r in sconfig.reaction_package.equilibrium_reaction_idx + ) + ) + return Constraint.Skip + + +def _inherent_reaction_rule(blk, t, s, p, j, stream, generation, extent): + sconfig = blk.config.streams[stream] + sblock = getattr(blk, stream) + pc_set = sblock.phase_component_set + + if (p, j) in pc_set: + return generation[t, s, p, j] == ( + sum( + sblock[t, s].params.inherent_reaction_stoichiometry[r, p, j] + * extent[t, s, r] + for r in sconfig.property_package.inherent_reaction_idx + ) + ) + return Constraint.Skip + + +def _heterogeneous_reaction_rule(blk, t, s, p, j, pc_set, generation): + if (p, j) in pc_set: + return generation[t, s, p, j] == ( + sum( + blk.heterogeneous_reactions[t, s].params.reaction_stoichiometry[r, p, j] + * blk.heterogeneous_reaction_extent[t, s, r] + for r in blk.config.heterogeneous_reactions.reaction_idx + if (r, p, j) + in blk.heterogeneous_reactions[t, s].params.reaction_stoichiometry + ) + ) + return Constraint.Skip + + +def _material_balance_rule(blk, t, s, j, stream, mb_units): + sconfig = blk.config.streams[stream] + state_block = getattr(blk, stream) + phase_list = state_block.phase_list + pc_set = state_block.phase_component_set + + in_state, out_state, side_state = _get_state_blocks(blk, t, s, stream) + + if in_state is not None: + rhs = sum( + in_state.get_material_flow_terms(p, j) + for p in phase_list + if (p, j) in pc_set + ) - sum( + out_state.get_material_flow_terms(p, j) + for p in phase_list + if (p, j) in pc_set + ) + else: + rhs = -sum( + out_state.get_material_flow_terms(p, j) + for p in phase_list + if (p, j) in pc_set + ) + + # Add side stream energy flow if required + if side_state is not None: + rhs += sum( + side_state.get_material_flow_terms(p, j) + for p in phase_list + if (p, j) in pc_set + ) + + # As overall units come from the first StateBlock constructed, we + # cannot guarantee that any units are consistent, so convert all flow terms + if mb_units is not None: + rhs = units.convert(rhs, mb_units) + + # Add mass transfer terms + for k in blk.stream_component_interactions: + if k[0] == stream and k[2] == j: + # Positive mass transfer + rhs += blk.material_transfer_term[t, s, k] + elif k[1] == stream and k[2] == j: + # Negative mass transfer + rhs += -blk.material_transfer_term[t, s, k] + + # Add rate reactions (if required) + if sconfig.has_rate_reactions: + rhs += sum( + getattr( + blk, + stream + "_rate_reaction_generation", + )[t, s, p, j] + for p in phase_list + ) + + # Add equilibrium reactions (if required) + if sconfig.has_equilibrium_reactions: + rhs += sum( + getattr( + blk, + stream + "_equilibrium_reaction_generation", + )[t, s, p, j] + for p in phase_list + ) + + # Add inherent reactions (if required) + if state_block.include_inherent_reactions: + rhs += sum( + getattr( + blk, + stream + "_inherent_reaction_generation", + )[t, s, p, j] + for p in phase_list + ) + + # Add heterogeneous reactions (if required) + if blk.config.heterogeneous_reactions is not None: + rhs += sum( + getattr( + blk, + stream + "_heterogeneous_reactions_generation", + )[t, s, p, j] + for p in phase_list + ) + + return 0 == rhs + + +def _get_energy_transfer_term(blk, uom): + try: + return blk.energy_transfer_term + except AttributeError: + # Assume that if energy balances are enabled that energy transfer + # occurs between all interacting phases. + # For now, we will not distinguish different types of energy transfer. + # Convention is that a positive material flow term indicates flow into + # the first stream from the second stream. + blk.energy_transfer_term = Var( + blk.flowsheet().time, + blk.elements, + blk.stream_interactions, + initialize=0, + units=uom.POWER, + doc="Inter-stream energy transfer term", + ) + + return blk.energy_transfer_term + + +def _energy_balance_rule(blk, t, s, stream, uom): + pconfig = blk.config.streams[stream] + state_block = getattr(blk, stream) + phase_list = state_block.phase_list + + in_state, out_state, side_state = _get_state_blocks(blk, t, s, stream) + + if in_state is not None: + rhs = sum(in_state.get_enthalpy_flow_terms(p) for p in phase_list) - sum( + out_state.get_enthalpy_flow_terms(p) for p in phase_list + ) + else: + rhs = -sum(out_state.get_enthalpy_flow_terms(p) for p in phase_list) + + # Add side stream energy flow if required + if side_state is not None: + rhs += sum(side_state.get_enthalpy_flow_terms(p) for p in phase_list) + + # As overall units come from the first StateBlock constructed, we + # cannot guarantee that any units are consistent, so convert all flow terms + rhs = units.convert(rhs, uom.POWER) + + # Add interstream transfer terms + for k in blk.stream_interactions: + ett = _get_energy_transfer_term(blk, uom) + if k[0] == stream: + # Positive mass transfer + rhs += ett[t, s, k] + elif k[1] == stream: + # Negative mass transfer + rhs += -ett[t, s, k] + + # Add external heat term if required + if pconfig.has_heat_transfer: + rhs += getattr(blk, stream + "_heat")[t, s] + + # Add heat of reaction terms if required + if pconfig.has_heat_of_reaction: + if not ( + hasattr(blk, stream + "_rate_reaction_extent") + or hasattr(blk, stream + "_equilibrium_reaction_extent") + ): + raise ConfigurationError( + f"Stream {stream} was set to include heats of reaction, " + "but no extent of reactions terms could be found. " + "Please ensure that you defined a reaction package for this " + "stream and that the material balances were set to include " + "reactions." + ) + reactions = getattr(blk, stream + "_reactions") + + if hasattr(blk, stream + "_rate_reaction_extent"): + rate_extent = getattr(blk, stream + "_rate_reaction_extent") + rhs += -sum( + rate_extent[t, s, r] * reactions[t, s].dh_rxn[r] + for r in pconfig.reaction_package.rate_reaction_idx + ) + + if hasattr(blk, stream + "_equilibrium_reaction_extent"): + equil_extent = getattr(blk, stream + "_equilibrium_reaction_extent") + rhs += -sum( + equil_extent[t, s, e] * reactions[t, s].dh_rxn[e] + for e in pconfig.reaction_package.equilibrium_reaction_idx + ) + + return 0 == rhs + + +def _pressure_balance_rule(blk, t, s, stream, uom): + pconfig = blk.config.streams[stream] + in_state, out_state, _ = _get_state_blocks(blk, t, s, stream) + + if in_state is None: + # If there is no feed, then there is no need for a pressure balance + return Constraint.Skip + + rhs = in_state.pressure - out_state.pressure + + # As overall units come from the first StateBlock constructed, we + # cannot guarantee that any units are consistent, so convert all flow terms + rhs = units.convert(rhs, uom.PRESSURE) + + # Add deltaP term if required + if pconfig.has_pressure_change: + rhs += getattr(blk, stream + "_deltaP")[t, s] + + return 0 == rhs