Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,27 +363,29 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void *
* @param incoming True iff this connection is associated with a listener, False if a connector
* @param role The configured role of this connection
* @param cost If the role is inter_router, this is the configured cost for the connection.
* @param remote_cost If the role is inter_router, this is the cost sent by the peer router in the open frame
* @param management_id - A unique identifier that is used in management and logging operations.
* @param label Optional label provided in the connection's configuration.
* @param strip_annotations_in True if configured to remove annotations on inbound messages.
* @param strip_annotations_out True if configured to remove annotations on outbound messages.
* @param link_capacity The capacity, in deliveries, for links in this connection.
* @return Pointer to a connection object that can be used to refer to this connection over its lifetime.
*/
qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
qdr_protocol_adaptor_t *protocol_adaptor,
bool incoming,
qdr_connection_role_t role,
int cost,
uint64_t management_id,
const char *label,
bool strip_annotations_in,
bool strip_annotations_out,
int link_capacity,
const qd_policy_spec_t *policy_spec,
qdr_connection_info_t *connection_info,
qdr_connection_bind_context_t context_binder,
void *bind_token);
qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
qdr_protocol_adaptor_t *protocol_adaptor,
bool incoming,
qdr_connection_role_t role,
int cost,
int remote_cost,
uint64_t management_id,
const char *label,
bool strip_annotations_in,
bool strip_annotations_out,
int link_capacity,
const qd_policy_spec_t *policy_spec,
qdr_connection_info_t *connection_info,
qdr_connection_bind_context_t context_binder,
void *bind_token);

/**
* qdr_connection_notify_closed
Expand Down
24 changes: 19 additions & 5 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ void qdr_core_topology_changed(qdr_core_t *core, int router_maskbit);
typedef void (*qdr_set_mobile_seq_t) (void *context, int router_maskbit, uint64_t mobile_seq);
typedef void (*qdr_set_my_mobile_seq_t) (void *context, uint64_t mobile_seq);
typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
typedef void (*qdr_peer_cost_update_t)(void *context, const char *container_id, int new_cost);

void qdr_core_route_table_handlers(qdr_core_t *core,
void *context,
qdr_set_mobile_seq_t set_mobile_seq,
qdr_set_my_mobile_seq_t set_my_mobile_seq,
qdr_link_lost_t link_lost);
void qdr_core_route_table_handlers(qdr_core_t *core,
void *context,
qdr_set_mobile_seq_t set_mobile_seq,
qdr_set_my_mobile_seq_t set_my_mobile_seq,
qdr_link_lost_t link_lost,
qdr_peer_cost_update_t peer_cost_update);

/**
******************************************************************************
Expand Down Expand Up @@ -162,6 +164,18 @@ qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core,

void qdr_core_unsubscribe(qdr_subscription_t *sub);

/**
* qdr_core_update_connection_cost
*
* Update the cost of an inter-router connection identified by conn_id or maskbit
*
* @param core Pointer to the core module
* @param conn_id The connection identity or maskbit
* @param new_cost The new cost value (must be >= 1)
* @param use_maskbit True if conn_id holds the maskbit of the connection, False if it holds the connection identity
*/
void qdr_core_update_connection_cost(qdr_core_t *core, uint64_t conn_id, int new_cost, bool use_maskbit);

/**
* qdr_send_to
*
Expand Down
3 changes: 2 additions & 1 deletion python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@
"connector": {
"description": "Establishes an outgoing connection from the router.",
"extends": "configurationEntity",
"operations": ["CREATE", "DELETE"],
"operations": ["CREATE", "UPDATE", "DELETE"],
"attributes": {
"host": {
"description":"IP address: ipv4 or ipv6 literal or a host name",
Expand Down Expand Up @@ -1032,6 +1032,7 @@
"default": 1,
"required": false,
"create": true,
"update": true,
"description": "For the 'inter-router' role only. This value assigns a cost metric to the inter-router connection. The default (and minimum) value is one. Higher values represent higher costs. The cost is used to influence the routing algorithm as it attempts to use the path with the lowest total cost from ingress to egress."
},
"sslProfile": {
Expand Down
1 change: 1 addition & 0 deletions python/skupper_router_internal/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(self) -> None:
self._prototype(self.qd_dispatch_delete_tcp_connector, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_dispatch_configure_connector, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_connection_manager_delete_connector, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_dispatch_update_connector, None, [self.qd_dispatch_p, c_void_p, c_long])

#sslProfile and display name service
self._prototype(self.qd_tls_configure_ssl_profile, c_void_p, [self.qd_dispatch_p, py_object])
Expand Down
6 changes: 6 additions & 0 deletions python/skupper_router_internal/management/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,12 @@ def _identifier(self):
def __str__(self):
return super(ConnectorEntity, self).__str__().replace("Entity(", "ConnectorEntity(")

def _update(self):
new_cost = self.attributes.get('cost')
if new_cost is None or new_cost < 1:
raise ValidationError("Connector configuration update failed: cost value must be >= 1")
self._qd.qd_dispatch_update_connector(self._dispatch, self._implementations[0].key, new_cost)


class AddressEntity(EntityAdapter):
def create(self):
Expand Down
6 changes: 6 additions & 0 deletions python/skupper_router_internal/router/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ def del_all_peers(self):
self.peers = {}
self.ls_seq = 0

def update_peer_cost(self, _id, _cost):
if _id in self.peers and self.peers[_id] != _cost:
self.peers[_id] = _cost
return True
return False

def has_peers(self):
return len(self.peers) > 0

Expand Down
11 changes: 6 additions & 5 deletions python/skupper_router_internal/router/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ def linkLost(self, link_id):
"""
self.node_tracker.link_lost(link_id)

def peerCostUpdate(self, node_id, new_cost):
"""
Invoked from C core when the cost to a neighbor router changes dynamically.
"""
self.node_tracker.peer_cost_update(node_id, new_cost)

def handleTimerTick(self):
"""
"""
Expand Down Expand Up @@ -230,8 +236,3 @@ def send(self, dest, msg):
"""
app_props = {'opcode' : msg.get_opcode()}
self.io_adapter[0].send(Message(address=dest, properties=app_props, body=msg.to_dict()), True, True)

def node_updated(self, addr, reachable, neighbor):
"""
"""
self.router_adapter(addr, reachable, neighbor)
22 changes: 22 additions & 0 deletions python/skupper_router_internal/router/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ def link_lost(self, link_id):
if self.link_state.del_peer(node_id):
self.link_state_changed = True

def peer_cost_update(self, node_id, new_cost):
"""
Invoked when neighbor cost got changed dynamically
"""
if self.link_state.is_peer(node_id):
old_cost = self.link_state.peers[node_id]
if self.link_state.update_peer_cost(node_id, new_cost):
self.link_state_changed = True
self.container.log_ls(LOG_DEBUG, "Updated cost to neighbor %s in local link state: %d -> %d" % (node_id, old_cost, new_cost))

def set_mobile_seq(self, router_maskbit, mobile_seq):
"""
"""
Expand Down Expand Up @@ -364,6 +374,18 @@ def link_state_received(self, node_id, version, link_state, instance, now):
node.link_state.last_seen = now
self.recompute_topology = True

##
# If this is a neighbor router and the cost has changed in the received
# link state, call the C core to update the connection cost locally. The
# C core then calls back to python to update the local link state.
##
if node.is_neighbor() and self.my_id in link_state.peers:
new_cost = link_state.peers[self.my_id]
old_cost = self.link_state.peers[node_id]
if old_cost != new_cost:
self.container.log(LOG_DEBUG, "Neighbor LSU received from node %s - cost change %d -> %d" % (node_id, old_cost, new_cost))
self.container.router_adapter.update_connection_cost(node.peer_link_id, new_cost)

##
# Look through the new link state for references to nodes that we don't
# know about. Schedule link state requests for those nodes to be sent
Expand Down
4 changes: 3 additions & 1 deletion src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
{
qdr_connection_role_t role = 0;
int cost = 1;
int remote_cost = 1;
int link_capacity = 1;
const char *name = 0;
bool streaming_links = false;
Expand Down Expand Up @@ -1464,7 +1465,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
if (!pn_data_next(props)) break;
if (is_router) {
if (pn_data_type(props) == PN_INT) {
const int remote_cost = (int) pn_data_get_int(props);
remote_cost = (int) pn_data_get_int(props);
if (remote_cost > cost)
cost = remote_cost;
}
Expand Down Expand Up @@ -1612,6 +1613,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
inbound,
role,
cost,
remote_cost,
connection_id,
name,
conn->strip_annotations_in,
Expand Down
44 changes: 44 additions & 0 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,50 @@ QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *i
qd_connector_config_delete(ctor_config);
}

QD_EXPORT void qd_dispatch_update_connector(qd_dispatch_t *qd, void *impl, int new_cost)
{
qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl;
if (!ctor_config) {
return;
}

int old_cost = ctor_config->config.inter_router_cost;
if (old_cost == new_cost) {
qd_log(LOG_CONN_MGR, QD_LOG_DEBUG,
"Connector '%s' cost unchanged (already %d)",
ctor_config->config.name, new_cost);
return;
}

// Update the connector's configuration
ctor_config->config.inter_router_cost = new_cost;

// Update all active connectors using this configuration
qd_connector_t *ctor = DEQ_HEAD(ctor_config->connectors);
while (ctor) {
sys_mutex_lock(&ctor->lock);

// If there's an active connection, update its cost
qd_connection_t *conn = ctor->qd_conn;
if (conn && conn->pn_conn) {
// Get the connection's identity for the core
uint64_t conn_id = qd_connection_connection_id(conn);

// Update the cost in the router core
// This will trigger the link state update
const bool use_maskbit = false;
qdr_core_update_connection_cost(qd->router->router_core, conn_id, new_cost, use_maskbit);
}

sys_mutex_unlock(&ctor->lock);
ctor = DEQ_NEXT(ctor);
}

qd_log(LOG_CONN_MGR, QD_LOG_INFO,
"Connector '%s' cost updated: %d -> %d",
ctor_config->config.name, old_cost, new_cost);
}


const char *qd_connector_name(qd_connector_t *ct)
{
Expand Down
1 change: 1 addition & 0 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming
incoming, // incoming
QDR_ROLE_NORMAL, // role
1, // cost
1, // remote cost
conn_id, // management_id
0, // label
false, // strip_annotations_in
Expand Down
Loading
Loading