From 096965dd50bc850ca390de8e23b58415a5813028 Mon Sep 17 00:00:00 2001 From: Marco Biasion Date: Fri, 13 Feb 2026 17:26:10 +0100 Subject: [PATCH 1/5] implemented builder and started testing implementation changes --- sxpat/converting/legacy.py | 93 ++++-- sxpat/converting/porters.py | 16 +- .../distances/AbsoluteDifferenceOfInteger.py | 29 +- .../definitions/distances/HammingDistance.py | 48 ++- sxpat/graph/builder.py | 296 ++++++++++++++++++ sxpat/graph/graph.py | 7 +- 6 files changed, 448 insertions(+), 41 deletions(-) create mode 100644 sxpat/graph/builder.py diff --git a/sxpat/converting/legacy.py b/sxpat/converting/legacy.py index 22f059d9d..7c888025b 100644 --- a/sxpat/converting/legacy.py +++ b/sxpat/converting/legacy.py @@ -1,45 +1,90 @@ +from typing import Type, Union + +import functools +import networkx as nx + from sxpat.annotatedGraph import AnnotatedGraph from sxpat.graph import IOGraph, SGraph -from sxpat.graph.node import BoolVariable, BoolConstant, And, Not, Identity +from sxpat.graph.graph import GraphBuilder +from sxpat.graph.node import BoolVariable, BoolConstant, And, Extras, Node, Not, Identity + from sxpat.utils.functions import str_to_bool __all__ = ['iograph_from_legacy', 'sgraph_from_legacy'] -def _nodes_from_inner_legacy(inner_graph): - nodes = list() - for (name, value) in inner_graph.nodes(True): +# def _nodes_from_inner_legacy(inner_graph: nx.digraph.DiGraph) -> List[Union[BoolVariable, BoolConstant, And, Not, Identity]]: +# nodes = list() +# for (name, value) in inner_graph.nodes(True): +# # get features +# label = value.get('label') +# weight = value.get('weight', None) +# in_subgraph = bool(value.get('subgraph', False)) +# operands = inner_graph.predecessors(name) + +# # create node +# if label.startswith('in'): # input +# nodes.append(BoolVariable(name, weight, in_subgraph)) +# elif label.startswith('out'): # output +# nodes.append(Identity(name, operands, weight, in_subgraph)) +# elif label == 'and': # and +# nodes.append(And(name, operands, weight, in_subgraph)) +# elif label == 'not': # not +# nodes.append(Not(name, operands, weight, in_subgraph)) +# elif label in ('FALSE', 'TRUE'): # constant +# nodes.append(BoolConstant(name, str_to_bool(label), weight, in_subgraph)) +# else: +# raise RuntimeError(f'Unable to parse node {name} from AnnotatedGraph ({value})') + +# return nodes + + +def _builder_from_legacy(legacy_graph: AnnotatedGraph) -> GraphBuilder: + def get_type(label: str) -> Type[Union[Node, Extras]]: + if label.startswith('in'): return BoolVariable + elif label.startswith('out'): return Identity + elif label == 'and': return And + elif label == 'not': return Not + elif label in ('FALSE', 'TRUE'): return functools.partial(BoolConstant, value=str_to_bool(label)) + else: raise RuntimeError(f'Unable to parse node {name} from AnnotatedGraph ({value})') + + # prepare + builder = GraphBuilder() + inner_digraph: nx.digraph.DiGraph = legacy_graph.graph + + # add all nodes (and edges) + for (name, value) in inner_digraph.nodes(True): # get features label = value.get('label') weight = value.get('weight', None) in_subgraph = bool(value.get('subgraph', False)) - operands = inner_graph.predecessors(name) + operands = inner_digraph.predecessors(name) - # create node - if label.startswith('in'): # input - nodes.append(BoolVariable(name, weight, in_subgraph)) - elif label.startswith('out'): # output - nodes.append(Identity(name, operands, weight, in_subgraph)) - elif label in ('and', 'not'): # and/not - cls = {'not': Not, 'and': And}[label] - nodes.append(cls(name, operands, weight, in_subgraph)) - elif label in ('FALSE', 'TRUE'): # constant - nodes.append(BoolConstant(name, str_to_bool(label), weight, in_subgraph)) - else: - raise RuntimeError(f'Unable to parse node {name} from AnnotatedGraph ({value})') + # add node + builder.add_node(name, get_type(label), weight=weight, in_subgraph=in_subgraph) + if inner_digraph.in_degree(name) > 0: builder.add_operands(operands) - return nodes + operands = inner_digraph.predecessors(name) + + # mark inputs/outputs + builder.mark_inputs(legacy_graph.input_dict.values()) + builder.mark_outputs(legacy_graph.output_dict.values()) + + return builder def iograph_from_legacy(l_graph: AnnotatedGraph) -> IOGraph: - return IOGraph(_nodes_from_inner_legacy(l_graph.graph), - l_graph.input_dict.values(), - l_graph.output_dict.values()) + # return = IOGraph(_nodes_from_inner_legacy(l_graph.graph), + # l_graph.input_dict.values(), + # l_graph.output_dict.values()) + + return _builder_from_legacy(l_graph).build(IOGraph) def sgraph_from_legacy(l_graph: AnnotatedGraph) -> SGraph: - return SGraph(_nodes_from_inner_legacy(l_graph.subgraph), - l_graph.input_dict.values(), - l_graph.output_dict.values()) + # return SGraph(_nodes_from_inner_legacy(l_graph.subgraph), + # l_graph.input_dict.values(), + # l_graph.output_dict.values()) + return _builder_from_legacy(l_graph).build(SGraph) diff --git a/sxpat/converting/porters.py b/sxpat/converting/porters.py index 37bfebdc6..4b6ceb4d1 100644 --- a/sxpat/converting/porters.py +++ b/sxpat/converting/porters.py @@ -1,14 +1,15 @@ -from abc import abstractmethod +from __future__ import annotations +from abc import ABC, abstractmethod from typing import Type, Callable, Mapping, Optional, Union, Generic import dataclasses as dc from bidict import bidict - import itertools as it import re import json -from sxpat.graph import * +from sxpat.graph import Graph, IOGraph, SGraph, PGraph, CGraph, T_Graph +from sxpat.graph.graph import GraphBuilder from sxpat.graph.node import * from sxpat.utils.inheritance import get_all_subclasses, get_all_leaves_subclasses from sxpat.utils.functions import str_to_bool @@ -29,10 +30,11 @@ @make_utility_class -class GraphImporter(Generic[T_Graph]): +class GraphImporter(Generic[T_Graph], ABC): """Abstract class for importing a Graph from a string/file.""" @classmethod + @abstractmethod def from_string(cls, string: str) -> T_Graph: raise NotImplementedError(f'{cls.__name__}.from_string(...) is abstract.') @@ -44,7 +46,7 @@ def from_file(cls, filename: str) -> T_Graph: @make_utility_class -class GraphExporter(Generic[T_Graph]): +class GraphExporter(Generic[T_Graph], ABC): """Abstract class for exporting a Graph to a string/file.""" @classmethod @@ -384,8 +386,8 @@ class VerilogExporter(GraphExporter[IOGraph]): Not: lambda n: f'(~{n.operand})', And: lambda n: f'({" & ".join(n.operands)})', Or: lambda n: f'({" | ".join(n.operands)})', - Xor: lambda n : f'({" ^ ".join(n.operands)})', - Xnor: lambda n : f'(~({n.left} ^ {n.right}))', + Xor: lambda n: f'({" ^ ".join(n.operands)})', + Xnor: lambda n: f'(~({n.left} ^ {n.right}))', Implies: lambda n: f'(~{n.left} | {n.right})', # int-int operations Sum: lambda n: f'({" + ".join(n.operands)})', diff --git a/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py b/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py index b1b84ac7c..f55564833 100644 --- a/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py +++ b/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py @@ -1,6 +1,8 @@ from typing import Sequence, Tuple from typing_extensions import override +from sxpat.graph.graph import GraphBuilder + from .DistanceSpecification import DistanceSpecification from sxpat.graph import CGraph @@ -41,9 +43,30 @@ def _define(cls, _0, _1, return (dist_func, distance.name) + @classmethod + def _define_2(cls, _0, _1, + wanted_a: Sequence[str], wanted_b: Sequence[str], + ) -> Tuple[CGraph, str]: + # prepare builder + builder = GraphBuilder() + + # add placeholders + builder.add_placeholders(wanted_a).add_placeholders(wanted_b) + + # define outputs of a and of b as integers + builder.push_recording() + builder.add_node('dist_int_a_adoi', ToInt, operands=wanted_a) + builder.add_node('dist_int_b_adoi', ToInt, operands=wanted_b) + + # distance + distance_name = 'dist_distance' + builder.add_node(distance_name, AbsDiff, operands=builder.pop_recording()) + + return (builder.build(CGraph), distance_name) + @override @classmethod def _minimum_distance(cls, _0, - wanted_a: Sequence[str] - ) -> int: - return 1 \ No newline at end of file + wanted_a: Sequence[str] + ) -> int: + return 1 diff --git a/sxpat/definitions/distances/HammingDistance.py b/sxpat/definitions/distances/HammingDistance.py index c381a56e2..2b4968e06 100644 --- a/sxpat/definitions/distances/HammingDistance.py +++ b/sxpat/definitions/distances/HammingDistance.py @@ -4,6 +4,7 @@ from .DistanceSpecification import DistanceSpecification from sxpat.graph import CGraph, IOGraph +from sxpat.graph.graph import GraphBuilder from sxpat.graph.node import If, IntConstant, PlaceHolder, Sum, Xor from sxpat.utils.collections import formatted_int_range @@ -23,7 +24,6 @@ class HammingDistance(DistanceSpecification): def _define(cls, _0, _1, wanted_a: Sequence[str], wanted_b: Sequence[str], ) -> Tuple[CGraph, str]: - # guard if len(wanted_a) != len(wanted_b): raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') @@ -66,9 +66,51 @@ def _define(cls, _0, _1, return (dist_func, distance.name) + @classmethod + def _define_2(cls, _0, _1, + wanted_a: Sequence[str], wanted_b: Sequence[str], + ) -> Tuple[CGraph, str]: + # guards + if len(wanted_a) != len(wanted_b): + raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') + if len(_0.outputs_names) != len(_1.outputs_names): + raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') + + # prepare builder + builder = GraphBuilder() + + # add placeholders + builder.add_placeholders(wanted_a).add_placeholders(wanted_b) + + # bit flips to int + builder.push_recording() + for (i, out_a, out_b) in zip( + formatted_int_range(len(wanted_a)), + wanted_a, + wanted_b, + ): + builder.push_recording() + + # create constants + builder \ + .add_node(f'dist_a{i}_const_0', IntConstant, value=0) \ + .add_node(f'dist_a{i}_const_1', IntConstant, value=1) + + # create node reflecting if a bit is flipped + builder.add_node(f'dist_is_different_{i}', Xor, operands=[out_a, out_b]) + + # create node that reflects 1 if the bit is flipped, or 0 + builder.add_node(f'dist_value_{i}', If, operands=builder.pop_recording()[::-1]) + + # distance + distance_name = 'dist_distance' + builder.add_node(distance_name, Sum, operands=builder.pop_recording()) + + return (builder.build(CGraph), distance_name) + @override @classmethod def minimum_distance(cls, _0, - wanted_a: Sequence[str] - ) -> int: + wanted_a: Sequence[str] + ) -> int: return 1 diff --git a/sxpat/graph/builder.py b/sxpat/graph/builder.py new file mode 100644 index 000000000..c3663779a --- /dev/null +++ b/sxpat/graph/builder.py @@ -0,0 +1,296 @@ +from __future__ import annotations +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type, Union, final, overload +from typing_extensions import Self + +import functools + +from .graph import T_Graph +from .node import AnyNode, Node, PlaceHolder + + +@final +class GraphBuilder: + # error classes + class AlreadyBuiltError(Exception): pass + class DuplicateNodeError(Exception): pass + class ReassigningOperandsError(Exception): pass + class InvalidNodeError(Exception): pass + class NoNamesBufferError(Exception): pass + + def __init__(self): + # utility state + self.__is_built: bool = False + self.__last_name: str = None + self.__names_record: List[List[str]] = list() + self.__nodes_with_operands: Set[str] = set() + # graph data + self.__partial_nodes: Dict[str, functools.partial] = dict() + self.__extras: Dict[str, Any] = dict() + + def push_recording(self) -> None: + """ + Push a new recording of names to the stack, which will record future node additions. + + The buffer can be retrieved using `.pop_recording()`. + """ + self.__names_record.append(list()) + + def pop_recording(self) -> List[str]: + """ + Pop the most recent buffer of names (created using `.push_recording()`) from the stack and return it. + If no buffer is present, raises a `NoNamesBufferError`. + + Future node additions will be recorded on the new topmost buffer, or discarded if none is present. + + :return: The list of names recorded from the most recent call to `.start_recording()`. + """ + # guard + if len(self.__names_record) == 0: raise self.NoNamesBufferError() + + return self.__names_record.pop() + + def build( + self, + graph_type: Type[T_Graph], + ) -> T_Graph: + """ + Build the graph using the given type. + + :param graph_type: The class to use to build the graph, the nodes and all accessories will be passed to it. + :return graph: The built graph. + """ + # prevent rebuilding + if self.__is_built: raise self.AlreadyBuiltError() + self.__is_built = True + + # finalize the nodes + nodes = (partial_node() for partial_node in self.__partial_nodes.values()) + # construct the graph + return graph_type(nodes, **self.__extras) + + def add_node( + self, + name: str, + constructor: Union[Type[AnyNode], Callable[..., Node]], + **kwargs, + ) -> Self: + """ + Add a node to the graph. + + :param name: The name of the node. + :param constructor: The type of the node or a function that will return the node. + :param **kwargs: Keyword arguments to pass to the node constructor. + :return self: The builder object. + """ + # guards + if self.__is_built: raise self.AlreadyBuiltError() + if name in self.__partial_nodes: raise self.DuplicateNodeError() + + # call inner + self._add_node(name, constructor, **kwargs) + + # return the builder for chaining + return self + + def _add_node( + self, + name: str, + constructor: Union[Type[AnyNode], Callable[..., Node]], + **kwargs, + ): + """ + :param name: The name of the node. + :param constructor: The type of the node or a function that will return the node. + :param **kwargs: Keyword arguments to pass to the node constructor. + """ + + # prepare partial node + self.__partial_nodes[name] = functools.partial(constructor, name, **kwargs) + # update utility state + self.__last_name = name + if 'operands' in kwargs: self.__nodes_with_operands.add(name) + # store the node name if recording + if len(self.__names_record) != 0: self.__names_record[-1].append(name) + + @property + def last_name(self) -> str: + """The name of the last added node.""" + return self.__last_name + + @property + def name(self) -> str: + """Alias of `.last_name`.""" + return self.__last_name + + @overload + def add_operands( + self, + operation: str, + operands: Iterable[str], + ) -> Self: + """ + Add the operands to the node. + + :param operation: The name of the node to add the operands to. + :param operands: The iterable of names of the nodes to be added as operands, the order is preserved. + :return self: The builder object. + """ + @overload + def add_operands( + self, + operands: Iterable[str], + ) -> Self: + """ + Add the operands to the last added node. + + :param operands: Iterable of names of the nodes to be added as operands, the order is preserved. + :return self: The builder object. + """ + + def add_operands( + self, + operation_or_operands: Union[str, Iterable[str]], + operands: Optional[Iterable[str]] = None, + ) -> Self: + # organize args + if isinstance(operation_or_operands, str): + operation = operation_or_operands + operands = tuple(operands) + else: + operation = self.__last_name + operands = tuple(operation_or_operands) + + # guards + if self.__is_built: raise self.AlreadyBuiltError() + if operation not in self.__partial_nodes: raise self.InvalidNodeError() + if any((n not in self.__partial_nodes) for n in operands): raise self.InvalidNodeError() + if operation in self.__nodes_with_operands: raise self.ReassigningOperandsError() + + # call inner + self._add_operands(operation, operands) + + # return the builder for chaining + return self + + def _add_operands( + self, + operation: str, + operands: Iterable[str] + ): + """ + :param operation: The name of the node to add the operands to. + :param operands: The iterable of names of the nodes to be added as operands, the order is preserved. + """ + # add operands + self.__partial_nodes[operation] = functools.partial(self.__partial_nodes[operation], operands=operands) + # update utility state + self.__nodes_with_operands.add(operation) + + def add_placeholders( + self, + nodes_name: Iterable[str], + ) -> Self: + """ + Add all the given names as placeholders. + + :param nodes_name: The names of the placeholders to be. + :return self: The builder object. + """ + # guards + if self.__is_built: raise self.AlreadyBuiltError() + if any(name in self.__partial_nodes for name in nodes_name): raise self.DuplicateNodeError() + + # call inner + for name in nodes_name: self._add_node(name, PlaceHolder) + + # return the builder for chaining + return self + + def mark_inputs( + self, + inputs_names: Iterable[str], + ) -> Self: + """ + Marks the wanted nodes as inputs. + + :param inputs_names: The names of the nodes to be marked as inputs. + :return self: The builder object. + """ + # freeze + inputs_names = tuple(inputs_names) + + # guards + if any((n not in self.__partial_nodes) for n in inputs_names): raise self.InvalidNodeError() + + # elaborate + self.__extras['inputs_names'] = tuple(inputs_names) + + # return the builder for chaining + return self + + def mark_outputs( + self, + outputs_names: Iterable[str], + ) -> Self: + """ + Marks the wanted nodes as outputs. + + :param outputs_names: The names of the nodes to be marked as outputs. + :return self: The builder object. + """ + # freeze + outputs_names = tuple(outputs_names) + + # guards + if any((n not in self.__partial_nodes) for n in outputs_names): raise self.InvalidNodeError() + + # elaborate + self.__extras['outputs_names'] = tuple(outputs_names) + + # return the builder for chaining + return self + + def mark_parameters( + self, + parameters_names: Iterable[str], + ) -> Self: + """ + Marks the wanted nodes as parameters. + + :param parameters_names: The names of the nodes to be marked as parameters. + :return self: The builder object. + """ + # freeze + parameters_names = tuple(parameters_names) + + # guards + if any((n not in self.__partial_nodes) for n in parameters_names): raise self.InvalidNodeError() + + # elaborate + self.__extras['parameters_names'] = tuple(parameters_names) + + # return the builder for chaining + return self + + def mark_subgraph( + self, + nodes_names: Iterable[str], + ) -> Self: + """ + Marks the wanted nodes as being in the subgraph. + + :param nodes_names: The names of the nodes to be marked as being in the subgraph. + :return self: The builder object. + """ + # freeze + nodes_names = tuple(nodes_names) + + # guards + if any((n not in self.__partial_nodes) for n in nodes_names): raise self.InvalidNodeError() + + # elaborate + for n in nodes_names: + self.__partial_nodes[n] = functools.partial(self.__partial_nodes[n], in_subgraph=True) + + # return the builder for chaining + return self diff --git a/sxpat/graph/graph.py b/sxpat/graph/graph.py index 3d929a128..c3f3ed147 100644 --- a/sxpat/graph/graph.py +++ b/sxpat/graph/graph.py @@ -9,13 +9,12 @@ import itertools as it from .node import ( - AnyVariable, BoolConstant, Expression, Extras, Node, Operation, Constant, GlobalTask, + BoolConstant, BoolVariable, Constraint, PlaceHolder, Target, # - BoolVariable, PlaceHolder, - Target, Constraint, + Expression, Extras, Node, Operation, Constant, GlobalTask, Variable, # AnyNode, AnyConstant, AnyOperation, AnyExpression, AnyGlobalObjective, - AnyNonEndPoint, AnyNonEntryPoint, Variable, + AnyNonEndPoint, AnyNonEntryPoint, AnyVariable, ) from .error import UndefinedNodeError From 88520ee08534f3267403b16ca269571938c21fd7 Mon Sep 17 00:00:00 2001 From: Marco Biasion Date: Wed, 18 Feb 2026 17:51:04 +0100 Subject: [PATCH 2/5] improved error messages and minor feature --- sxpat/graph/__init__.py | 1 + sxpat/graph/builder.py | 48 ++++++++++++++++++++++++++++++----------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/sxpat/graph/__init__.py b/sxpat/graph/__init__.py index 1a4f37d7e..300b0f0ab 100644 --- a/sxpat/graph/__init__.py +++ b/sxpat/graph/__init__.py @@ -8,3 +8,4 @@ ) from . import node from . import error +from . import builder \ No newline at end of file diff --git a/sxpat/graph/builder.py b/sxpat/graph/builder.py index c3663779a..d26fdbabd 100644 --- a/sxpat/graph/builder.py +++ b/sxpat/graph/builder.py @@ -1,6 +1,6 @@ from __future__ import annotations from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type, Union, final, overload -from typing_extensions import Self +from typing_extensions import Self, Protocol import functools @@ -15,7 +15,7 @@ class AlreadyBuiltError(Exception): pass class DuplicateNodeError(Exception): pass class ReassigningOperandsError(Exception): pass class InvalidNodeError(Exception): pass - class NoNamesBufferError(Exception): pass + class NoRecordingError(Exception): pass def __init__(self): # utility state @@ -38,14 +38,14 @@ def push_recording(self) -> None: def pop_recording(self) -> List[str]: """ Pop the most recent buffer of names (created using `.push_recording()`) from the stack and return it. - If no buffer is present, raises a `NoNamesBufferError`. + If no buffer is present, raises a `NoRecordingError`. Future node additions will be recorded on the new topmost buffer, or discarded if none is present. :return: The list of names recorded from the most recent call to `.start_recording()`. """ # guard - if len(self.__names_record) == 0: raise self.NoNamesBufferError() + if len(self.__names_record) == 0: raise self.NoRecordingError() return self.__names_record.pop() @@ -84,7 +84,7 @@ def add_node( """ # guards if self.__is_built: raise self.AlreadyBuiltError() - if name in self.__partial_nodes: raise self.DuplicateNodeError() + if name in self.__partial_nodes: raise self.DuplicateNodeError(name) # call inner self._add_node(name, constructor, **kwargs) @@ -162,9 +162,10 @@ def add_operands( # guards if self.__is_built: raise self.AlreadyBuiltError() - if operation not in self.__partial_nodes: raise self.InvalidNodeError() - if any((n not in self.__partial_nodes) for n in operands): raise self.InvalidNodeError() - if operation in self.__nodes_with_operands: raise self.ReassigningOperandsError() + if operation not in self.__partial_nodes: raise self.InvalidNodeError(operation) + if any((n not in self.__partial_nodes) for n in operands): + raise self.InvalidNodeError(list(filter(lambda n: n not in self.__partial_nodes, operands))) + if operation in self.__nodes_with_operands: raise self.ReassigningOperandsError(operation) # call inner self._add_operands(operation, operands) @@ -198,7 +199,8 @@ def add_placeholders( """ # guards if self.__is_built: raise self.AlreadyBuiltError() - if any(name in self.__partial_nodes for name in nodes_name): raise self.DuplicateNodeError() + if any(name in self.__partial_nodes for name in nodes_name): + raise self.DuplicateNodeError(list(filter(lambda n: n in self.__partial_nodes, nodes_name))) # call inner for name in nodes_name: self._add_node(name, PlaceHolder) @@ -220,7 +222,8 @@ def mark_inputs( inputs_names = tuple(inputs_names) # guards - if any((n not in self.__partial_nodes) for n in inputs_names): raise self.InvalidNodeError() + if any((n not in self.__partial_nodes) for n in inputs_names): + raise self.InvalidNodeError(list(filter(lambda n: n not in self.__partial_nodes, inputs_names))) # elaborate self.__extras['inputs_names'] = tuple(inputs_names) @@ -242,7 +245,8 @@ def mark_outputs( outputs_names = tuple(outputs_names) # guards - if any((n not in self.__partial_nodes) for n in outputs_names): raise self.InvalidNodeError() + if any((n not in self.__partial_nodes) for n in outputs_names): + raise self.InvalidNodeError(list(filter(lambda n: n not in self.__partial_nodes, outputs_names))) # elaborate self.__extras['outputs_names'] = tuple(outputs_names) @@ -264,7 +268,8 @@ def mark_parameters( parameters_names = tuple(parameters_names) # guards - if any((n not in self.__partial_nodes) for n in parameters_names): raise self.InvalidNodeError() + if any((n not in self.__partial_nodes) for n in parameters_names): + raise self.InvalidNodeError(list(filter(lambda n: n not in self.__partial_nodes, parameters_names))) # elaborate self.__extras['parameters_names'] = tuple(parameters_names) @@ -286,7 +291,8 @@ def mark_subgraph( nodes_names = tuple(nodes_names) # guards - if any((n not in self.__partial_nodes) for n in nodes_names): raise self.InvalidNodeError() + if any((n not in self.__partial_nodes) for n in nodes_names): + raise self.InvalidNodeError(list(filter(lambda n: n not in self.__partial_nodes, nodes_names))) # elaborate for n in nodes_names: @@ -294,3 +300,19 @@ def mark_subgraph( # return the builder for chaining return self + + class PipableCallable(Protocol): + def __call__(self, builder: Self, *args: Any, **kwargs: Any) -> None: ... + + def update_with( + self, + func: PipableCallable, *args: Any, **kwargs: Any, + ) -> Self: + """ + Updates the builder by passing it to a function. + + :param func: Function to apply to the builder. `args`, and `kwargs` are passed into `func`. + :return self: The builder object. + """ + func(self, *args, **kwargs) + return self From 7639c9eef6e66595d09995871536086c471aab5b Mon Sep 17 00:00:00 2001 From: Marco Biasion Date: Wed, 18 Feb 2026 17:52:27 +0100 Subject: [PATCH 3/5] #186: refactored some distances with the builder, as examples --- .../distances/AbsoluteDifferenceOfInteger.py | 25 +------ .../distances/DistanceSpecification.py | 70 +++++++++++-------- .../definitions/distances/HammingDistance.py | 63 +++-------------- 3 files changed, 48 insertions(+), 110 deletions(-) diff --git a/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py b/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py index f55564833..036d8ca41 100644 --- a/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py +++ b/sxpat/definitions/distances/AbsoluteDifferenceOfInteger.py @@ -1,7 +1,7 @@ from typing import Sequence, Tuple from typing_extensions import override -from sxpat.graph.graph import GraphBuilder +from sxpat.graph.builder import GraphBuilder from .DistanceSpecification import DistanceSpecification @@ -22,29 +22,6 @@ class AbsoluteDifferenceOfInteger(DistanceSpecification): @override @classmethod def _define(cls, _0, _1, - wanted_a: Sequence[str], wanted_b: Sequence[str], - ) -> Tuple[CGraph, str]: - - # define outputs of a and of b as integers - int_a = ToInt('dist_int_a_adoi', operands=wanted_a) - int_b = ToInt('dist_int_b_adoi', operands=wanted_b) - - # distance - distance = AbsDiff('dist_distance', operands=[int_a, int_b]) - - # construct CGraph - dist_func = CGraph(( - *(PlaceHolder(name) for name in wanted_a), - int_a, - *(PlaceHolder(name) for name in wanted_b), - int_b, - distance, - )) - - return (dist_func, distance.name) - - @classmethod - def _define_2(cls, _0, _1, wanted_a: Sequence[str], wanted_b: Sequence[str], ) -> Tuple[CGraph, str]: # prepare builder diff --git a/sxpat/definitions/distances/DistanceSpecification.py b/sxpat/definitions/distances/DistanceSpecification.py index 6c051d859..bb0938147 100644 --- a/sxpat/definitions/distances/DistanceSpecification.py +++ b/sxpat/definitions/distances/DistanceSpecification.py @@ -16,9 +16,11 @@ class DistanceSpecification(metaclass=ABCMeta): @classmethod @overload - def define(cls, graph_a: IOGraph, graph_b: IOGraph, - wanted_a: Sequence[str], wanted_b: Optional[Sequence[str]] = None - ) -> Tuple[CGraph, str]: + def define( + cls, + graph_a: IOGraph, graph_b: IOGraph, + wanted_a: Sequence[str], wanted_b: Optional[Sequence[str]] = None + ) -> Tuple[CGraph, str]: """ Defines a distance between two circuits (given as graph), given a specific sequence (or one per circuit) of nodes to use. @@ -27,7 +29,10 @@ def define(cls, graph_a: IOGraph, graph_b: IOGraph, @classmethod @overload - def define(cls, graph_a: IOGraph, graph_b: IOGraph) -> Tuple[CGraph, str]: + def define( + cls, + graph_a: IOGraph, graph_b: IOGraph + ) -> Tuple[CGraph, str]: """ Defines a distance between the outputs of two circuits (given as graphs). @@ -36,9 +41,11 @@ def define(cls, graph_a: IOGraph, graph_b: IOGraph) -> Tuple[CGraph, str]: @classmethod @final - def define(cls, graph_a: IOGraph, graph_b: IOGraph, - wanted_a: Optional[Sequence[str]] = None, wanted_b: Optional[Sequence[str]] = None, - ) -> Tuple[CGraph, str]: + def define( + cls, + graph_a: IOGraph, graph_b: IOGraph, + wanted_a: Optional[Sequence[str]] = None, wanted_b: Optional[Sequence[str]] = None, + ) -> Tuple[CGraph, str]: # > generate distance function graph @@ -47,12 +54,7 @@ def define(cls, graph_a: IOGraph, graph_b: IOGraph, wanted_a = graph_a.outputs_names wanted_b = graph_b.outputs_names - # delegate computation - dist_func, root_name = cls._define( - graph_a, graph_b, - graph_a.outputs_names, graph_b.outputs_names, - ) - + # some wanted names given elif wanted_a is not None: # default if wanted_b is None: wanted_b = wanted_a @@ -63,13 +65,14 @@ def define(cls, graph_a: IOGraph, graph_b: IOGraph, if (missing := first(lambda n: n not in graph_b, wanted_b, None)) is not None: raise g_error.MissingNodeError(f'Node {missing} is not in graph_b ({graph_b}).') - # delegate computation - dist_func, root_name = cls._define( - graph_a, graph_b, - wanted_a, wanted_b, - ) + else: + raise ValueError('Illegal call with `wanted_b` without `wanted_a`.') - else: raise ValueError(f'Illegal call with `wanted_b` without `wanted_a`.') + # delegate computation + dist_func, root_name = cls._define( + graph_a, graph_b, + wanted_a, wanted_b, + ) # > assign rolling prefix prefix = get_rolling_code() + '_' @@ -80,22 +83,27 @@ def define(cls, graph_a: IOGraph, graph_b: IOGraph, @classmethod @abstractmethod - def _define(cls, graph_a: IOGraph, graph_b: IOGraph, - wanted_a: Sequence[str], wanted_b: Sequence[str] - ) -> Tuple[CGraph, str]: ... + def _define( + cls, + graph_a: IOGraph, graph_b: IOGraph, + wanted_a: Sequence[str], wanted_b: Sequence[str] + ) -> Tuple[CGraph, str]: ... @classmethod - def minimum_distance(cls, graph_a: SGraph, - wanted_a: Optional[Sequence[str]] = None - ) -> int: - + def minimum_distance( + cls, + graph_a: SGraph, + wanted_a: Optional[Sequence[str]] = None + ) -> int: if wanted_a is None: wanted_a = (n.name for n in graph_a.subgraph_outputs) - + return cls._minimum_distance(graph_a, wanted_a) - + @classmethod @abstractmethod - def _minimum_distance(cls, graph_a: SGraph, - wanted_a: Sequence[str] - ) -> int: ... \ No newline at end of file + def _minimum_distance( + cls, + graph_a: SGraph, + wanted_a: Sequence[str] + ) -> int: ... diff --git a/sxpat/definitions/distances/HammingDistance.py b/sxpat/definitions/distances/HammingDistance.py index 2b4968e06..026bd1195 100644 --- a/sxpat/definitions/distances/HammingDistance.py +++ b/sxpat/definitions/distances/HammingDistance.py @@ -3,9 +3,9 @@ from .DistanceSpecification import DistanceSpecification -from sxpat.graph import CGraph, IOGraph -from sxpat.graph.graph import GraphBuilder -from sxpat.graph.node import If, IntConstant, PlaceHolder, Sum, Xor +from sxpat.graph import CGraph +from sxpat.graph.builder import GraphBuilder +from sxpat.graph.node import If, IntConstant, Sum, Xor from sxpat.utils.collections import formatted_int_range @@ -24,52 +24,6 @@ class HammingDistance(DistanceSpecification): def _define(cls, _0, _1, wanted_a: Sequence[str], wanted_b: Sequence[str], ) -> Tuple[CGraph, str]: - # guard - if len(wanted_a) != len(wanted_b): - raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') - if len(_0.outputs_names) != len(_1.outputs_names): - raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') - - # bit flips to int - consts = [] - flipped_bits = [] - int_bits = [] - for (i, out_a, out_b) in zip( - formatted_int_range(len(wanted_a)), - wanted_a, - wanted_b, - ): - # create constants - consts.extend([ - const_0 := IntConstant(f'dist_a{i}_const_0', 0), - const_1 := IntConstant(f'dist_a{i}_const_1', 1), - ]) - - # create node reflecting if a bit is flipped - flipped_bits.append(bit := Xor(f'dist_is_different_{i}', operands=[out_a, out_b])) - - # create node that reflects 1 if the bit is flipped, or 0 - int_bits.append(If(f'dist_value_{i}', operands=[bit, const_1, const_0])) - - # distance - distance = Sum('dist_distance', operands=int_bits) - - # construct CGraph - dist_func = CGraph(( - *(PlaceHolder(name) for name in wanted_a), - *(PlaceHolder(name) for name in wanted_b), - *consts, - *flipped_bits, - *int_bits, - distance, - )) - - return (dist_func, distance.name) - - @classmethod - def _define_2(cls, _0, _1, - wanted_a: Sequence[str], wanted_b: Sequence[str], - ) -> Tuple[CGraph, str]: # guards if len(wanted_a) != len(wanted_b): raise ValueError('The sequences of wanted nodes have different lengths (or the graphs have different number of outputs).') @@ -91,16 +45,15 @@ def _define_2(cls, _0, _1, ): builder.push_recording() - # create constants - builder \ - .add_node(f'dist_a{i}_const_0', IntConstant, value=0) \ - .add_node(f'dist_a{i}_const_1', IntConstant, value=1) - # create node reflecting if a bit is flipped builder.add_node(f'dist_is_different_{i}', Xor, operands=[out_a, out_b]) + # create constants + builder \ + .add_node(f'dist_a{i}_const_1', IntConstant, value=1) \ + .add_node(f'dist_a{i}_const_0', IntConstant, value=0) # create node that reflects 1 if the bit is flipped, or 0 - builder.add_node(f'dist_value_{i}', If, operands=builder.pop_recording()[::-1]) + builder.add_node(f'dist_value_{i}', If, operands=builder.pop_recording()) # distance distance_name = 'dist_distance' From 531493db5e39d86c8d9eaa1f4e24ccfab8070cb8 Mon Sep 17 00:00:00 2001 From: Marco Biasion Date: Wed, 18 Feb 2026 17:52:55 +0100 Subject: [PATCH 4/5] #186: refactored legacy graph converters with builder --- sxpat/converting/legacy.py | 77 +++++++++++++------------------------- 1 file changed, 25 insertions(+), 52 deletions(-) diff --git a/sxpat/converting/legacy.py b/sxpat/converting/legacy.py index 7c888025b..f685a3e88 100644 --- a/sxpat/converting/legacy.py +++ b/sxpat/converting/legacy.py @@ -6,7 +6,7 @@ from sxpat.annotatedGraph import AnnotatedGraph from sxpat.graph import IOGraph, SGraph -from sxpat.graph.graph import GraphBuilder +from sxpat.graph.builder import GraphBuilder from sxpat.graph.node import BoolVariable, BoolConstant, And, Extras, Node, Not, Identity from sxpat.utils.functions import str_to_bool @@ -15,33 +15,7 @@ __all__ = ['iograph_from_legacy', 'sgraph_from_legacy'] -# def _nodes_from_inner_legacy(inner_graph: nx.digraph.DiGraph) -> List[Union[BoolVariable, BoolConstant, And, Not, Identity]]: -# nodes = list() -# for (name, value) in inner_graph.nodes(True): -# # get features -# label = value.get('label') -# weight = value.get('weight', None) -# in_subgraph = bool(value.get('subgraph', False)) -# operands = inner_graph.predecessors(name) - -# # create node -# if label.startswith('in'): # input -# nodes.append(BoolVariable(name, weight, in_subgraph)) -# elif label.startswith('out'): # output -# nodes.append(Identity(name, operands, weight, in_subgraph)) -# elif label == 'and': # and -# nodes.append(And(name, operands, weight, in_subgraph)) -# elif label == 'not': # not -# nodes.append(Not(name, operands, weight, in_subgraph)) -# elif label in ('FALSE', 'TRUE'): # constant -# nodes.append(BoolConstant(name, str_to_bool(label), weight, in_subgraph)) -# else: -# raise RuntimeError(f'Unable to parse node {name} from AnnotatedGraph ({value})') - -# return nodes - - -def _builder_from_legacy(legacy_graph: AnnotatedGraph) -> GraphBuilder: +def _add_nodes_from_legacy(builder: GraphBuilder, digraph: nx.digraph.DiGraph) -> GraphBuilder: def get_type(label: str) -> Type[Union[Node, Extras]]: if label.startswith('in'): return BoolVariable elif label.startswith('out'): return Identity @@ -50,41 +24,40 @@ def get_type(label: str) -> Type[Union[Node, Extras]]: elif label in ('FALSE', 'TRUE'): return functools.partial(BoolConstant, value=str_to_bool(label)) else: raise RuntimeError(f'Unable to parse node {name} from AnnotatedGraph ({value})') - # prepare - builder = GraphBuilder() - inner_digraph: nx.digraph.DiGraph = legacy_graph.graph - # add all nodes (and edges) - for (name, value) in inner_digraph.nodes(True): + for (name, value) in digraph.nodes(True): # get features label = value.get('label') weight = value.get('weight', None) in_subgraph = bool(value.get('subgraph', False)) - operands = inner_digraph.predecessors(name) + operands = digraph.predecessors(name) # add node builder.add_node(name, get_type(label), weight=weight, in_subgraph=in_subgraph) - if inner_digraph.in_degree(name) > 0: builder.add_operands(operands) - - operands = inner_digraph.predecessors(name) - - # mark inputs/outputs - builder.mark_inputs(legacy_graph.input_dict.values()) - builder.mark_outputs(legacy_graph.output_dict.values()) - - return builder + if digraph.in_degree(name) > 0: builder.add_operands(operands) def iograph_from_legacy(l_graph: AnnotatedGraph) -> IOGraph: - # return = IOGraph(_nodes_from_inner_legacy(l_graph.graph), - # l_graph.input_dict.values(), - # l_graph.output_dict.values()) - - return _builder_from_legacy(l_graph).build(IOGraph) + return ( + GraphBuilder() + # add nodes + .update_with(_add_nodes_from_legacy, l_graph.graph) + # mark inputs/outputs + .mark_inputs(l_graph.input_dict.values()) + .mark_outputs(l_graph.output_dict.values()) + # + .build(IOGraph) + ) def sgraph_from_legacy(l_graph: AnnotatedGraph) -> SGraph: - # return SGraph(_nodes_from_inner_legacy(l_graph.subgraph), - # l_graph.input_dict.values(), - # l_graph.output_dict.values()) - return _builder_from_legacy(l_graph).build(SGraph) + return ( + GraphBuilder() + # add nodes + .update_with(_add_nodes_from_legacy, l_graph.subgraph) + # mark inputs/outputs + .mark_inputs(l_graph.input_dict.values()) + .mark_outputs(l_graph.output_dict.values()) + # + .build(SGraph) + ) From fe2eb320c72954cddc1c6e87d7e4f5335134eb37 Mon Sep 17 00:00:00 2001 From: Marco Biasion Date: Wed, 18 Feb 2026 17:53:24 +0100 Subject: [PATCH 5/5] #186: minor cleanup --- sxpat/converting/porters.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sxpat/converting/porters.py b/sxpat/converting/porters.py index 4b6ceb4d1..f79b30127 100644 --- a/sxpat/converting/porters.py +++ b/sxpat/converting/porters.py @@ -9,7 +9,6 @@ import json from sxpat.graph import Graph, IOGraph, SGraph, PGraph, CGraph, T_Graph -from sxpat.graph.graph import GraphBuilder from sxpat.graph.node import * from sxpat.utils.inheritance import get_all_subclasses, get_all_leaves_subclasses from sxpat.utils.functions import str_to_bool