Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 78 additions & 9 deletions src/power_grid_model_ds/_core/model/graphs/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections import Counter
from collections.abc import Generator
from contextlib import contextmanager
from itertools import combinations
from typing import TYPE_CHECKING

from numpy._typing import NDArray
Expand All @@ -31,6 +32,12 @@ class BaseGraphModel(ABC):
def __init__(self, active_only=False) -> None:
self.active_only = active_only

# Three winding transformers are represented as a cycle in our graph ((1,2), (2,3) and (1,3))
# This makes certain graph algorithms invalid.
# With self._three_winding_nodes we keep track of the three winding transformers in the graph.
# This is used to correct the graph state before we perform these graph algorithms.
self._three_winding_nodes: set[tuple[int, int, int]] = set()

def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(nodes={self.nr_nodes}, "
Expand Down Expand Up @@ -189,6 +196,7 @@ def add_branch3_array(self, branch3_array: Branch3Array) -> None:
"""Add all branch3s in the branch3 array to the graph."""
for branch3 in branch3_array:
self.add_branch_array(branch3.as_branches())
self._three_winding_nodes.add(self._get_branch3_nodes(branch3))

def delete_branch_array(self, branch_array: BranchArray, raise_on_fail: bool = True) -> None:
"""Delete all branches in branch_array from the graph."""
Expand All @@ -200,6 +208,7 @@ def delete_branch3_array(self, branch3_array: Branch3Array, raise_on_fail: bool
"""Delete all branch3s in the branch3 array from the graph."""
for branch3 in branch3_array:
self.delete_branch_array(branch3.as_branches(), raise_on_fail=raise_on_fail)
self._three_winding_nodes.discard(self._get_branch3_nodes(branch3))

@contextmanager
def tmp_remove_nodes(self, nodes: list[int]) -> Generator:
Expand Down Expand Up @@ -245,6 +254,27 @@ def tmp_remove_branches(self, branches: list[tuple[int, int]]) -> Generator:
for from_node, to_node in removed_branches:
self.add_branch(from_node, to_node)

@contextmanager
def _without_three_winding_cycles(self) -> Generator[bool, None, None]:
"""Context manager that temporarily removes cycles introduced by three winding transformers in the graph.

Three winding transformers are represented as a cycle. To make graph algorithms valid,
we temporarily remove one branch per three winding transformer to break the cycle.
The graph still has the same components after removing these branches.
We just force the path through the three winding transformers.

NOTE: we only remove branches for a three winding transformer if all three branches are active.

Yields True if branches were removed (and correction for three-winding transformers is needed).
"""
branches_to_remove = [
(group[1], group[2])
for group in self._three_winding_nodes
if all(self.has_branch(from_node, to_node) for from_node, to_node in combinations(group, 2))
]
with self.tmp_remove_branches(branches_to_remove):
yield bool(branches_to_remove)

def get_shortest_path(self, ext_start_node_id: int, ext_end_node_id: int) -> tuple[list[int], int]:
"""Calculate the shortest path between two nodes

Expand All @@ -268,7 +298,7 @@ def get_shortest_path(self, ext_start_node_id: int, ext_end_node_id: int) -> tup
internal_path, distance = self._get_shortest_path(
source=self.external_to_internal(ext_start_node_id), target=self.external_to_internal(ext_end_node_id)
)
return self._internals_to_externals(internal_path), distance
return self._to_external_path(internal_path), distance
except NoPathBetweenNodes as e:
raise NoPathBetweenNodes(f"No path between nodes {ext_start_node_id} and {ext_end_node_id}") from e

Expand All @@ -279,12 +309,16 @@ def get_all_paths(self, ext_start_node_id: int, ext_end_node_id: int) -> list[li
if ext_start_node_id == ext_end_node_id:
return []

internal_paths = self._get_all_paths(
source=self.external_to_internal(ext_start_node_id),
target=self.external_to_internal(ext_end_node_id),
)
with self._without_three_winding_cycles() as correct_for_three_winding:
internal_paths = self._get_all_paths(
source=self.external_to_internal(ext_start_node_id),
target=self.external_to_internal(ext_end_node_id),
)

return [self._internals_to_externals(path) for path in internal_paths]
return [
self._to_external_path(internal_path=path, correct_for_three_winding=correct_for_three_winding)
for path in internal_paths
]

def get_components(self) -> list[list[int]]:
"""Returns all separate components of the graph as lists
Expand Down Expand Up @@ -369,8 +403,13 @@ def find_fundamental_cycles(self) -> list[list[int]]:
Returns:
list[list[int]]: list of cycles, each cycle is a list of (external) node ids
"""
internal_cycles = self._find_fundamental_cycles()
return [self._internals_to_externals(nodes) for nodes in internal_cycles]
with self._without_three_winding_cycles() as correct_for_three_winding:
internal_cycles = self._find_fundamental_cycles()

return [
self._to_external_path(internal_path=cycle, correct_for_three_winding=correct_for_three_winding)
for cycle in internal_cycles
]

@classmethod
def from_arrays(cls, arrays: "Grid", active_only=False) -> "BaseGraphModel":
Expand All @@ -392,8 +431,32 @@ def from_grid(cls, grid: "Grid", active_only=False) -> "BaseGraphModel":
new_graph.add_branch3_array(grid.three_winding_transformer)
return new_graph

def _to_external_path(self, internal_path: list[int], correct_for_three_winding: bool = False) -> list[int]:
"""Convert a path of internal node ids to external node ids.

If correct_for_three_winding is True, also removes detours through three winding transformers.
This should be used together with _without_three_winding_cycles.

If a path contains all nodes of a three winding transformer after each other, we can remove the middle node.
Since if we hadn't removed the third branch of the three winding transformer, we would have gone directly.

NOTE: if a node has an inactive status, we can never have a situation where
we go through 3 nodes of the transformer directly after each other.
So this also works for inactive three winding transformers.
"""
path = self._internals_to_externals(internal_path)
if not correct_for_three_winding:
return path

replacements = {frozenset(group) for group in self._three_winding_nodes}
return [
node
for index, node in enumerate(path)
if (index in (0, len(path) - 1) or frozenset([path[index - 1], node, path[index + 1]]) not in replacements)
]

def _internals_to_externals(self, internal_nodes: list[int]) -> list[int]:
"""Convert a list of internal nodes to external nodes"""
"""Convert a list of internal node ids to external node ids"""
return [self.internal_to_external(node_id) for node_id in internal_nodes]

def _externals_to_internals(self, external_nodes: list[int] | NDArray) -> list[int]:
Expand All @@ -406,6 +469,12 @@ def _branch_is_relevant(self, branch: BranchArray) -> bool:
return branch.is_active.item()
return True

def _get_branch3_nodes(self, branch3_array: Branch3Array) -> tuple[int, int, int]:
"""Get the node ids of the branch3 array as a tuple"""
if len(branch3_array) != 1:
raise ValueError("branch3_array must be of length one element")
return (branch3_array.node_1.item(), branch3_array.node_2.item(), branch3_array.node_3.item())

@abstractmethod
def _in_branches(self, int_node_id: int) -> Generator[tuple[int, int], None, None]: ...

Expand Down
Loading