diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 749d723d5..c867c8b25 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -363,6 +363,7 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void * * @param incoming True iff this connection is associated with a listener, False if a connector * @param role The configured role of this connection * @param cost If the role is inter_router, this is the configured cost for the connection. + * @param remote_cost If the role is inter_router, this is the cost sent by the peer router in the open frame * @param management_id - A unique identifier that is used in management and logging operations. * @param label Optional label provided in the connection's configuration. * @param strip_annotations_in True if configured to remove annotations on inbound messages. @@ -370,20 +371,21 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void * * @param link_capacity The capacity, in deliveries, for links in this connection. * @return Pointer to a connection object that can be used to refer to this connection over its lifetime. */ -qdr_connection_t *qdr_connection_opened(qdr_core_t *core, - qdr_protocol_adaptor_t *protocol_adaptor, - bool incoming, - qdr_connection_role_t role, - int cost, - uint64_t management_id, - const char *label, - bool strip_annotations_in, - bool strip_annotations_out, - int link_capacity, - const qd_policy_spec_t *policy_spec, - qdr_connection_info_t *connection_info, - qdr_connection_bind_context_t context_binder, - void *bind_token); +qdr_connection_t *qdr_connection_opened(qdr_core_t *core, + qdr_protocol_adaptor_t *protocol_adaptor, + bool incoming, + qdr_connection_role_t role, + int cost, + int remote_cost, + uint64_t management_id, + const char *label, + bool strip_annotations_in, + bool strip_annotations_out, + int link_capacity, + const qd_policy_spec_t *policy_spec, + qdr_connection_info_t *connection_info, + qdr_connection_bind_context_t context_binder, + void *bind_token); /** * qdr_connection_notify_closed diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 32563edc6..fc6e82804 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -108,12 +108,14 @@ void qdr_core_topology_changed(qdr_core_t *core, int router_maskbit); typedef void (*qdr_set_mobile_seq_t) (void *context, int router_maskbit, uint64_t mobile_seq); typedef void (*qdr_set_my_mobile_seq_t) (void *context, uint64_t mobile_seq); typedef void (*qdr_link_lost_t) (void *context, int link_maskbit); +typedef void (*qdr_peer_cost_update_t)(void *context, const char *container_id, int new_cost); -void qdr_core_route_table_handlers(qdr_core_t *core, - void *context, - qdr_set_mobile_seq_t set_mobile_seq, - qdr_set_my_mobile_seq_t set_my_mobile_seq, - qdr_link_lost_t link_lost); +void qdr_core_route_table_handlers(qdr_core_t *core, + void *context, + qdr_set_mobile_seq_t set_mobile_seq, + qdr_set_my_mobile_seq_t set_my_mobile_seq, + qdr_link_lost_t link_lost, + qdr_peer_cost_update_t peer_cost_update); /** ****************************************************************************** @@ -162,6 +164,18 @@ qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core, void qdr_core_unsubscribe(qdr_subscription_t *sub); +/** + * qdr_core_update_connection_cost + * + * Update the cost of an inter-router connection identified by conn_id or maskbit + * + * @param core Pointer to the core module + * @param conn_id The connection identity or maskbit + * @param new_cost The new cost value (must be >= 1) + * @param use_maskbit True if conn_id holds the maskbit of the connection, False if it holds the connection identity + */ +void qdr_core_update_connection_cost(qdr_core_t *core, uint64_t conn_id, int new_cost, bool use_maskbit); + /** * qdr_send_to * diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index 6c033c269..e40b5b8df 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -992,7 +992,7 @@ "connector": { "description": "Establishes an outgoing connection from the router.", "extends": "configurationEntity", - "operations": ["CREATE", "DELETE"], + "operations": ["CREATE", "UPDATE", "DELETE"], "attributes": { "host": { "description":"IP address: ipv4 or ipv6 literal or a host name", @@ -1032,6 +1032,7 @@ "default": 1, "required": false, "create": true, + "update": true, "description": "For the 'inter-router' role only. This value assigns a cost metric to the inter-router connection. The default (and minimum) value is one. Higher values represent higher costs. The cost is used to influence the routing algorithm as it attempts to use the path with the lowest total cost from ingress to egress." }, "sslProfile": { diff --git a/python/skupper_router_internal/dispatch.py b/python/skupper_router_internal/dispatch.py index 9f3ac443c..939568dfe 100644 --- a/python/skupper_router_internal/dispatch.py +++ b/python/skupper_router_internal/dispatch.py @@ -123,6 +123,7 @@ def __init__(self) -> None: self._prototype(self.qd_dispatch_delete_tcp_connector, None, [self.qd_dispatch_p, c_void_p]) self._prototype(self.qd_dispatch_configure_connector, c_void_p, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_connection_manager_delete_connector, None, [self.qd_dispatch_p, c_void_p]) + self._prototype(self.qd_dispatch_update_connector, None, [self.qd_dispatch_p, c_void_p, c_long]) #sslProfile and display name service self._prototype(self.qd_tls_configure_ssl_profile, c_void_p, [self.qd_dispatch_p, py_object]) diff --git a/python/skupper_router_internal/management/agent.py b/python/skupper_router_internal/management/agent.py index 8db6cb7f4..4c1c3778b 100644 --- a/python/skupper_router_internal/management/agent.py +++ b/python/skupper_router_internal/management/agent.py @@ -532,6 +532,12 @@ def _identifier(self): def __str__(self): return super(ConnectorEntity, self).__str__().replace("Entity(", "ConnectorEntity(") + def _update(self): + new_cost = self.attributes.get('cost') + if new_cost is None or new_cost < 1: + raise ValidationError("Connector configuration update failed: cost value must be >= 1") + self._qd.qd_dispatch_update_connector(self._dispatch, self._implementations[0].key, new_cost) + class AddressEntity(EntityAdapter): def create(self): diff --git a/python/skupper_router_internal/router/data.py b/python/skupper_router_internal/router/data.py index 2dd8035c6..60711be9a 100644 --- a/python/skupper_router_internal/router/data.py +++ b/python/skupper_router_internal/router/data.py @@ -118,6 +118,12 @@ def del_all_peers(self): self.peers = {} self.ls_seq = 0 + def update_peer_cost(self, _id, _cost): + if _id in self.peers and self.peers[_id] != _cost: + self.peers[_id] = _cost + return True + return False + def has_peers(self): return len(self.peers) > 0 diff --git a/python/skupper_router_internal/router/engine.py b/python/skupper_router_internal/router/engine.py index 9993e2f80..c8b8946f5 100644 --- a/python/skupper_router_internal/router/engine.py +++ b/python/skupper_router_internal/router/engine.py @@ -115,6 +115,12 @@ def linkLost(self, link_id): """ self.node_tracker.link_lost(link_id) + def peerCostUpdate(self, node_id, new_cost): + """ + Invoked from C core when the cost to a neighbor router changes dynamically. + """ + self.node_tracker.peer_cost_update(node_id, new_cost) + def handleTimerTick(self): """ """ @@ -230,8 +236,3 @@ def send(self, dest, msg): """ app_props = {'opcode' : msg.get_opcode()} self.io_adapter[0].send(Message(address=dest, properties=app_props, body=msg.to_dict()), True, True) - - def node_updated(self, addr, reachable, neighbor): - """ - """ - self.router_adapter(addr, reachable, neighbor) diff --git a/python/skupper_router_internal/router/node.py b/python/skupper_router_internal/router/node.py index 115e4d20a..9d3cdbac9 100644 --- a/python/skupper_router_internal/router/node.py +++ b/python/skupper_router_internal/router/node.py @@ -272,6 +272,16 @@ def link_lost(self, link_id): if self.link_state.del_peer(node_id): self.link_state_changed = True + def peer_cost_update(self, node_id, new_cost): + """ + Invoked when neighbor cost got changed dynamically + """ + if self.link_state.is_peer(node_id): + old_cost = self.link_state.peers[node_id] + if self.link_state.update_peer_cost(node_id, new_cost): + self.link_state_changed = True + self.container.log_ls(LOG_DEBUG, "Updated cost to neighbor %s in local link state: %d -> %d" % (node_id, old_cost, new_cost)) + def set_mobile_seq(self, router_maskbit, mobile_seq): """ """ @@ -364,6 +374,18 @@ def link_state_received(self, node_id, version, link_state, instance, now): node.link_state.last_seen = now self.recompute_topology = True + ## + # If this is a neighbor router and the cost has changed in the received + # link state, call the C core to update the connection cost locally. The + # C core then calls back to python to update the local link state. + ## + if node.is_neighbor() and self.my_id in link_state.peers: + new_cost = link_state.peers[self.my_id] + old_cost = self.link_state.peers[node_id] + if old_cost != new_cost: + self.container.log(LOG_DEBUG, "Neighbor LSU received from node %s - cost change %d -> %d" % (node_id, old_cost, new_cost)) + self.container.router_adapter.update_connection_cost(node.peer_link_id, new_cost) + ## # Look through the new link state for references to nodes that we don't # know about. Schedule link state requests for those nodes to be sent diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 1183767d2..efe6edbd7 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1373,6 +1373,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool { qdr_connection_role_t role = 0; int cost = 1; + int remote_cost = 1; int link_capacity = 1; const char *name = 0; bool streaming_links = false; @@ -1464,7 +1465,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool if (!pn_data_next(props)) break; if (is_router) { if (pn_data_type(props) == PN_INT) { - const int remote_cost = (int) pn_data_get_int(props); + remote_cost = (int) pn_data_get_int(props); if (remote_cost > cost) cost = remote_cost; } @@ -1612,6 +1613,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound, role, cost, + remote_cost, connection_id, name, conn->strip_annotations_in, diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 5afb1d3fe..ef04b2dc6 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -341,6 +341,50 @@ QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *i qd_connector_config_delete(ctor_config); } +QD_EXPORT void qd_dispatch_update_connector(qd_dispatch_t *qd, void *impl, int new_cost) +{ + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; + if (!ctor_config) { + return; + } + + int old_cost = ctor_config->config.inter_router_cost; + if (old_cost == new_cost) { + qd_log(LOG_CONN_MGR, QD_LOG_DEBUG, + "Connector '%s' cost unchanged (already %d)", + ctor_config->config.name, new_cost); + return; + } + + // Update the connector's configuration + ctor_config->config.inter_router_cost = new_cost; + + // Update all active connectors using this configuration + qd_connector_t *ctor = DEQ_HEAD(ctor_config->connectors); + while (ctor) { + sys_mutex_lock(&ctor->lock); + + // If there's an active connection, update its cost + qd_connection_t *conn = ctor->qd_conn; + if (conn && conn->pn_conn) { + // Get the connection's identity for the core + uint64_t conn_id = qd_connection_connection_id(conn); + + // Update the cost in the router core + // This will trigger the link state update + const bool use_maskbit = false; + qdr_core_update_connection_cost(qd->router->router_core, conn_id, new_cost, use_maskbit); + } + + sys_mutex_unlock(&ctor->lock); + ctor = DEQ_NEXT(ctor); + } + + qd_log(LOG_CONN_MGR, QD_LOG_INFO, + "Connector '%s' cost updated: %d -> %d", + ctor_config->config.name, old_cost, new_cost); +} + const char *qd_connector_name(qd_connector_t *ct) { diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 75469bb03..dee3d45f8 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -351,6 +351,7 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming incoming, // incoming QDR_ROLE_NORMAL, // role 1, // cost + 1, // remote cost conn_id, // management_id 0, // label false, // strip_annotations_in diff --git a/src/router_core/connections.c b/src/router_core/connections.c index af2801943..6bb6b687a 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -84,6 +84,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, bool incoming, qdr_connection_role_t role, int cost, + int remote_cost, uint64_t management_id, const char *label, bool strip_annotations_in, @@ -106,6 +107,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, conn->incoming = incoming; conn->role = role; conn->inter_router_cost = cost; + conn->remote_cost = remote_cost; conn->strip_annotations_in = strip_annotations_in; conn->strip_annotations_out = strip_annotations_out; conn->policy_spec = policy_spec; @@ -2810,3 +2812,129 @@ static void setup_inter_router_control_conn_CT(qdr_core_t *core, qdr_connection_ } } } + +/** Inter-router connection cost update function + * + * Dynamic connection cost update happens in two steps: + * (1) connector side cost is updated by mgmt call, LS seq bumped + * (2) listener side updates the connection cost from LSU reveived from peer + * + * This core function is called when either mgmt updates the local connector cost (step 1), + * or an LSU is recieved from a neighbor with our connection cost changed (which happens + * at step 2). + * + * The action->args.cost_update.use_maskbit flag is set if it is called from LSU processing. + * In that case, action->args.cost_update.conn_id holds the maskbit of the connection. If the + * flag is not set then the function is called from the managment thread (step 1) and + * action->args.cost_update.conn_id holds the coonnection identity. + * + * */ +static void qdr_core_update_connection_cost_CT(qdr_core_t *core, + qdr_action_t *action, + bool discard) +{ + if (discard) return; + + int new_cost = action->args.cost_update.cost; + qdr_connection_t *conn = 0; + const bool local_mgmt_update = !action->args.cost_update.use_maskbit; + + if (action->args.cost_update.use_maskbit) { + // Look up connection by mask_bit. We are called due to a neighbour LSU was received. + int mask_bit = (int)action->args.cost_update.conn_id; + + if (mask_bit >= 0 && mask_bit < qd_bitmask_width()) { + conn = core->rnode_conns_by_mask_bit[mask_bit]; + } + + if (!conn) { + qd_log(LOG_ROUTER_CORE, QD_LOG_WARNING, + "Cost update failed: no connection found for mask_bit %d", mask_bit); + return; + } + } else { + // Find the connection by id. We are called by local management. + uint64_t conn_id = action->args.cost_update.conn_id; + for (qdr_connection_t *c = DEQ_HEAD(core->open_connections); c; c = DEQ_NEXT(c)) { + if (c->identity == conn_id) { + conn = c; + break; + } + } + + if (!conn) { + qd_log(LOG_ROUTER_CORE, QD_LOG_WARNING, + "Cost update failed: connection C%"PRIu64" not found", conn_id); + return; + } + } + + if (conn->role != QDR_ROLE_INTER_ROUTER) { + if (conn->role == QDR_ROLE_INTER_ROUTER_DATA) + return; + qd_log(LOG_ROUTER_CORE, QD_LOG_WARNING, + "Cost update ignored: connection C%"PRIu64" is not inter-router (role: %d)", conn->identity, conn->role); + return; + } + + const int old_cost = conn->inter_router_cost; + if (old_cost == new_cost) { + if (local_mgmt_update) { + // Connector cost got changed by mgmt but connection cost does not change. + // This can happen if local connector cost had been smaller then peer + // listener cost but they got equal now. + return; + } + // A peer sent us an LSU with a cost value that is different from the cost of that peer in our local link + // state. However, the cost of this connection is the same as the peer cost from the LSU, i.e. it is + // different from the peer cost in the local link state. This can happen only if we are connected by multiple + // inter-router connections. This is unsupported configuration. + qd_log(LOG_ROUTER_CORE, QD_LOG_WARNING, "[C%"PRIu64"] Multiple inter-router connections to peer router %s", + conn->identity, conn->connection_info->container); + return; + } else if (!conn->incoming && !local_mgmt_update) { + // We can get here if the peer (listener side) sends us an LSU right after the connector cost + // has updated locally by mgmt but before the peer receives our updated LSU. This can be ignored. + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Ignoring cost update: peer listener side cost does not match connector side cost: %d != %d", + conn->identity, new_cost, old_cost); + return; + } + + if (conn->incoming) { + // Peer updated connector cost and notified us (listener side) via LSU + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, + "[C%"PRIu64"] Connection cost updated at listener side: %d -> %d", + conn->identity, conn->inter_router_cost , new_cost); + conn->inter_router_cost = new_cost; + } else { + // Local mgmt cost update. New connection cost is the max of the new connector cost and + // the immutable listener cost + const int max_cost = MAX(new_cost, conn->remote_cost); + if (max_cost == old_cost) + return; // no change for connection cost + + conn->inter_router_cost = max_cost; + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, + "[C%"PRIu64"] Connection cost updated at connector side: %d -> %d (new connector cost: %d listener cost: %d)", + conn->identity, old_cost, conn->inter_router_cost, new_cost, conn->remote_cost); + } + + // Notify Python routing engine to update local link state. Container ID + // indentifies the peer router node. + qdr_post_peer_cost_update_CT(core, conn->connection_info->container, conn->inter_router_cost); +} + + +void qdr_core_update_connection_cost(qdr_core_t *core, + uint64_t conn_id, + int new_cost, + bool use_maskbit) +{ + qdr_action_t *action = qdr_action(qdr_core_update_connection_cost_CT, + "connection_update_cost"); + action->args.cost_update.conn_id = conn_id; + action->args.cost_update.cost = new_cost; + action->args.cost_update.use_maskbit = use_maskbit; + qdr_action_enqueue(core, action); +} diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index cdc82b5b1..296e24cb2 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -134,12 +134,14 @@ void qdr_core_route_table_handlers(qdr_core_t *core, void *context, qdr_set_mobile_seq_t set_mobile_seq, qdr_set_my_mobile_seq_t set_my_mobile_seq, - qdr_link_lost_t link_lost) + qdr_link_lost_t link_lost, + qdr_peer_cost_update_t peer_cost_update) { core->rt_context = context; core->rt_set_mobile_seq = set_mobile_seq; core->rt_set_my_mobile_seq = set_my_mobile_seq; core->rt_link_lost = link_lost; + core->rt_peer_cost_update = peer_cost_update; } @@ -757,6 +759,12 @@ static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work, bool di core->rt_link_lost(core->rt_context, work->maskbit); } +static void qdr_do_peer_cost_update(qdr_core_t *core, qdr_general_work_t *work, bool discard) +{ + if (!discard) + core->rt_peer_cost_update(core->rt_context, work->container, work->inter_router_cost); + free(work->container); +} void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq) { @@ -783,3 +791,12 @@ void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit) } +void qdr_post_peer_cost_update_CT(qdr_core_t *core, const char *container, int inter_router_cost) +{ + qdr_general_work_t *work = qdr_general_work(qdr_do_peer_cost_update); + work->container = strdup(container); + work->inter_router_cost = inter_router_cost; + qdr_post_general_work_CT(core, work); +} + + diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 4aae29fb8..a3b36d37a 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -183,6 +183,15 @@ struct qdr_action_t { void *context; } stats_request; + // + // Arguments for connection cost update + // + struct { + uint64_t conn_id; // connection identity or mask_bit depending on use_maskbit flag + int cost; + bool use_maskbit; // true if conn_id field contains mask_bit, false if it contains connection identity + } cost_update; + // // Arguments for general use // @@ -235,6 +244,7 @@ struct qdr_general_work_t { qdr_general_work_handler_t handler; int maskbit; int inter_router_cost; + char *container; qd_message_t *msg; qdr_receive_t on_message; void *on_message_context; @@ -700,6 +710,7 @@ struct qdr_connection_t { bool enable_protocol_trace; // Has trace level logging been turned on for this connection. bool has_streaming_links; ///< one or more of this connection's links are for streaming messages int inter_router_cost; + int remote_cost; // Remote connection cost from the OPEN frame int link_capacity; int mask_bit; ///< set only if inter-router control connection int group_parent_mask_bit; ///< if inter-router data connection maskbit of group parent inter-router control conn @@ -861,6 +872,7 @@ struct qdr_core_t { qdr_set_mobile_seq_t rt_set_mobile_seq; qdr_set_my_mobile_seq_t rt_set_my_mobile_seq; qdr_link_lost_t rt_link_lost; + qdr_peer_cost_update_t rt_peer_cost_update; // // Events section @@ -981,6 +993,7 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq); void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq); void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); +void qdr_post_peer_cost_update_CT(qdr_core_t *core, const char *container, int new_cost); void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr); diff --git a/src/router_pynode.c b/src/router_pynode.c index 41d0a7702..a18b9f912 100644 --- a/src/router_pynode.c +++ b/src/router_pynode.c @@ -34,6 +34,7 @@ static PyObject *pyTick = 0; static PyObject *pySetMobileSeq = 0; static PyObject *pySetMyMobileSeq = 0; static PyObject *pyLinkLost = 0; +static PyObject *pyPeerCostUpdate = 0; typedef struct { PyObject_HEAD @@ -279,6 +280,35 @@ static PyObject* qd_mobile_seq_advanced(PyObject *self, PyObject *args) } +static PyObject* qd_update_connection_cost(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + qd_router_t *router = adapter->router; + int mask_bit; + int new_cost; + + if (!PyArg_ParseTuple(args, "ii", &mask_bit, &new_cost)) + return 0; + + if (mask_bit >= qd_bitmask_width() || mask_bit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (new_cost < 1) { + PyErr_SetString(PyExc_Exception, "Cost must be >= 1"); + return 0; + } + + // Update the connection cost via the core using the mask_bit + const bool use_maskbit = true; + qdr_core_update_connection_cost(router->router_core, mask_bit, new_cost, use_maskbit); + + Py_INCREF(Py_None); + return Py_None; +} + + static PyObject* qd_topology_changed(PyObject *self, PyObject *args) { RouterAdapter *adapter = (RouterAdapter*) self; @@ -306,19 +336,20 @@ static PyObject* qd_get_agent(PyObject *self, PyObject *args) { } static PyMethodDef RouterAdapter_methods[] = { - {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, - {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"}, - {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"}, - {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"}, - {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, - {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"}, - {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"}, - {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"}, - {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"}, - {"flush_destinations", qd_flush_destinations, METH_VARARGS, "Remove all mapped destinations from a router"}, - {"mobile_seq_advanced", qd_mobile_seq_advanced, METH_VARARGS, "Mobile sequence for a router moved ahead of the local value"}, - {"topology_changed", qd_topology_changed, METH_VARARGS, "The computed topology has changed. Passes in the timestamp"}, - {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"}, + {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, + {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"}, + {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"}, + {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"}, + {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, + {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"}, + {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"}, + {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"}, + {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"}, + {"flush_destinations", qd_flush_destinations, METH_VARARGS, "Remove all mapped destinations from a router"}, + {"mobile_seq_advanced", qd_mobile_seq_advanced, METH_VARARGS, "Mobile sequence for a router moved ahead of the local value"}, + {"update_connection_cost", qd_update_connection_cost, METH_VARARGS, "Update the cost of an inter-router connection"}, + {"topology_changed", qd_topology_changed, METH_VARARGS, "The computed topology has changed. Passes in the timestamp"}, + {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"}, {0, 0, 0, 0} }; @@ -390,6 +421,26 @@ static void qd_router_link_lost(void *context, int link_mask_bit) } +static void qd_router_peer_cost_update(void *context, const char *container_id, int new_cost) +{ + qd_router_t *router = (qd_router_t*) context; + PyObject *pArgs; + PyObject *pValue; + + if (pyPeerCostUpdate && router->router_mode == QD_ROUTER_MODE_INTERIOR) { + qd_python_lock_state_t lock_state = qd_python_lock(); + pArgs = PyTuple_New(2); + PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(container_id)); + PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) new_cost)); + pValue = PyObject_CallObject(pyPeerCostUpdate, pArgs); + qd_error_py(); + Py_DECREF(pArgs); + Py_XDECREF(pValue); + qd_python_unlock(lock_state); + } +} + + qd_error_t qd_router_python_setup(qd_router_t *router) { qd_error_clear(); @@ -398,7 +449,8 @@ qd_error_t qd_router_python_setup(qd_router_t *router) router, qd_router_set_mobile_seq, qd_router_set_my_mobile_seq, - qd_router_link_lost); + qd_router_link_lost, + qd_router_peer_cost_update); // // If we are not operating as an interior router, don't start the @@ -467,6 +519,7 @@ qd_error_t qd_router_python_setup(qd_router_t *router) pySetMobileSeq = PyObject_GetAttrString(pyRouter, "setMobileSeq"); QD_ERROR_PY_RET(); pySetMyMobileSeq = PyObject_GetAttrString(pyRouter, "setMyMobileSeq"); QD_ERROR_PY_RET(); pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); QD_ERROR_PY_RET(); + pyPeerCostUpdate = PyObject_GetAttrString(pyRouter, "peerCostUpdate"); QD_ERROR_PY_RET(); return qd_error_code(); } @@ -477,6 +530,7 @@ void qd_router_python_free(qd_router_t *router) { Py_CLEAR(pySetMobileSeq); Py_CLEAR(pySetMyMobileSeq); Py_CLEAR(pyLinkLost); + Py_CLEAR(pyPeerCostUpdate); PyGC_Collect(); qd_python_unlock(ls); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5f5b835b8..147ab710e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -142,6 +142,7 @@ foreach(py_test_module system_tests_topology system_tests_topology_disposition system_tests_topology_addition + system_tests_topology_dynamic_cost system_tests_delivery_counts system_tests_cmdline_parsing system_tests_bad_configuration diff --git a/tests/system_tests_topology_dynamic_cost.py b/tests/system_tests_topology_dynamic_cost.py new file mode 100644 index 000000000..dc0b93b4b --- /dev/null +++ b/tests/system_tests_topology_dynamic_cost.py @@ -0,0 +1,602 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time + +# must include interrouter_msg BEFORE any proton modules because it +# monkey-patches proton.Message so we can get the message trace +# annotation +import interrouter_msg # noqa: F401 # pylint: disable=unused-import + +from proton import Message +from proton.handlers import MessagingHandler +from proton.reactor import Container + +from system_test import TestCase, Qdrouterd, main_module +from system_test import TIMEOUT, AMQP_CONNECTOR_TYPE +from system_test import unittest + +# ------------------------------------------------ +# Helper classes for all tests. +# ------------------------------------------------ + + +class Timeout: + """ + Named timeout object can handle multiple simultaneous + timers, by telling the parent which one fired. + """ + + def __init__(self, parent, name): + self.parent = parent + self.name = name + + def on_timer_task(self, event): + self.parent.timeout(self.name) + + +class ManagementMessageHelper: + """ + Format management messages. + """ + + def __init__(self, reply_addr): + self.reply_addr = reply_addr + + def make_connector_query(self, connector_name): + props = {'operation': 'READ', 'type': AMQP_CONNECTOR_TYPE, 'name' : connector_name} + msg = Message(properties=props, reply_to=self.reply_addr) + return msg + + def make_connector_update_command(self, connector_name, new_cost): + props = {'operation': 'UPDATE', 'type': AMQP_CONNECTOR_TYPE, 'name' : connector_name} + msg_body = {'cost': new_cost} + msg = Message(body=msg_body, properties=props, reply_to=self.reply_addr) + return msg + + +# ------------------------------------------------ +# END Helper classes for all tests. +# ------------------------------------------------ + + +# ================================================================ +# Setup +# ================================================================ + +class TopologyTests (TestCase): + + @classmethod + def setUpClass(cls): + super(TopologyTests, cls).setUpClass() + + def router(name, more_config): + + config = [('router', {'mode': 'interior', 'id': name}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) + ] \ + + more_config + + config = Qdrouterd.Config(config) + + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + A_client_port = cls.tester.get_port() + B_client_port = cls.tester.get_port() + C_client_port = cls.tester.get_port() + D_client_port = cls.tester.get_port() + + A_inter_router_port = cls.tester.get_port() + B_inter_router_port = cls.tester.get_port() + C_inter_router_port = cls.tester.get_port() + + # + # + # Topology of the 4-mesh, with costs of connections marked. + # Tail of arrow indicates initiator of connection. + # + # 1 + # D ----------> A + # | \ > ^ + # | 20\ 50/ | + # | \ / | + # 1 | / \ | 100 + # | / \ | + # v / > | + # C ----------> B (listener cost: 5) + # 10 + # + # Test 1 TopologyCostUpdate Notes + # + # 1. Messages are always sent from A, and go to B. + # 2. First route should be ADCB (cost: 1 + 1 + 10) + # 3. Then change A_B_cost to 11 + # 4. Next route should be AB (cost: 11) + # 5. Then change B_D_cost to 1 + # 6. Next route should be ADB (cost: 1 + 5 due to larger listener cost) + # 7. Then change A_B_cost to 5 + # 8. Final route should be AB (cost: 5) + + # Initial connector costs + cls.A_B_cost = 100 + cls.A_C_cost = 50 + cls.A_D_cost = 1 + cls.B_C_cost = 10 + cls.B_D_cost = 20 + cls.C_D_cost = 1 + + # Listener costs + cls.B_listener_cost = 5 + + router('A', + [ + ('listener', + {'port': A_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ('listener', + {'role': 'inter-router', + 'port': A_inter_router_port, + 'stripAnnotations': 'no' + } + ) + ] + ) + + router('B', + [ + ('listener', + {'port': B_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ('listener', + {'role': 'inter-router', + 'port': B_inter_router_port, + 'stripAnnotations': 'no', + 'cost': cls.B_listener_cost + } + ), + ('connector', + {'name': 'AB_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'cost': cls.A_B_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + router('C', + [ + ('listener', + {'port': C_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ('listener', + {'role': 'inter-router', + 'port': C_inter_router_port, + 'stripAnnotations': 'no' + } + ), + ('connector', + {'name': 'AC_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'cost' : cls.A_C_cost, + 'stripAnnotations': 'no' + } + ), + ('connector', + {'name': 'BC_connector', + 'role': 'inter-router', + 'port': B_inter_router_port, + 'cost' : cls.B_C_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + router('D', + [ + ('listener', + {'port': D_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ('connector', + {'name': 'AD_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'cost' : cls.A_D_cost, + 'stripAnnotations': 'no' + } + ), + ('connector', + {'name': 'BD_connector', + 'role': 'inter-router', + 'port': B_inter_router_port, + 'cost' : cls.B_D_cost, + 'stripAnnotations': 'no' + } + ), + ('connector', + {'name': 'CD_connector', + 'role': 'inter-router', + 'port': C_inter_router_port, + 'cost' : cls.C_D_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + router_A = cls.routers[0] + router_B = cls.routers[1] + router_C = cls.routers[2] + router_D = cls.routers[3] + + # Make sure every router is connected to every other router + router_A.wait_router_connected('B') + router_A.wait_router_connected('C') + router_A.wait_router_connected('D') + + router_B.wait_router_connected('A') + router_B.wait_router_connected('C') + router_B.wait_router_connected('D') + + router_C.wait_router_connected('A') + router_C.wait_router_connected('B') + router_C.wait_router_connected('D') + + router_D.wait_router_connected('A') + router_D.wait_router_connected('B') + router_D.wait_router_connected('C') + + # Additional reinforcements. This wait_connectors() queries for connections. + # We can now be absolutely sure that the router network has been established. + router_A.wait_connectors() + router_B.wait_connectors() + router_C.wait_connectors() + router_D.wait_connectors() + + cls.client_addrs = (router_A.addresses[0], + router_B.addresses[0], + router_C.addresses[0], + router_D.addresses[0] + ) + + # 1 means skip that test. + cls.skip = {'test_01' : 0 + } + + def test_01_topology_cost_update(self): + name = 'test_01' + if self.skip[name]: + self.skipTest("Test skipped during development.") + test = TopologyCostUpdate(self.client_addrs, + "closest/01" + ) + test.run() + self.assertIsNone(test.error) + + +# ================================================================ +# Tests +# ================================================================ + +class TopologyCostUpdate (MessagingHandler): + """ + Test that the lowest-cost route is always chosen in a 4-mesh + network topology, as costs of connectors changes dynamically. + + """ + + def __init__(self, client_addrs, destination): + super(TopologyCostUpdate, self).__init__(prefetch=0) + self.client_addrs = client_addrs + self.dest = destination + self.error = None + self.sender = None + self.receiver = None + self.test_timer = None + self.send_timer = None + self.n_sent = 0 + self.n_received = 0 + self.n_accepted = 0 + self.n_released = 0 + self.reactor = None + self.state = None + self.send_conn = None + self.recv_conn = None + self.nap_time = 2 + self.debug = True + self.trace_count = 0 + + # Holds the management sender, receiver, and 'helper' + # associated with each router. + self.routers = { + 'A' : dict(), + 'B' : dict(), + 'C' : dict(), + 'D' : dict() + } + + # These are the expected routing traces, in the order we + # expect to receive them. + self.expected_traces = [ + ['0/A', '0/D', '0/C', '0/B'], + ['0/A', '0/B'], + ['0/A', '0/D', '0/B'], + ['0/A', '0/B'] + ] + + # This tells the system in what order to update connector costs. + self.update_list = ( + ('B', 'AB_connector', 11), + ('D', 'BD_connector', 1), + ('B', 'AB_connector', 5), + ) + + # Use this to keep track of which connectors we have found + # when the test is first getting started and we are checking + # the topology. + self.connectors_map = { + 'AB_connector' : 0, + 'AC_connector' : 0, + 'AD_connector' : 0, + 'BC_connector' : 0, + 'BD_connector' : 0, + 'CD_connector' : 0 + } + + # Number of inter-router connections per router at the start of the test + self.connections_map = { + 'A' : 3, + 'B' : 3, + 'C' : 3, + 'D' : 3 + } + + # The simple state machine transitions when certain events happen, + # if certain conditions are met. The conditions are checked for + # by the callbacks for the events. + # The normal sequence of states in the state machine is: + # 1. starting -- doesn't do anything + # 2. checking -- checks initial topology + # 3. examine_trace -- look at routing trace of first message + # 4. update_connector_cost -- updates the first connector cost (BA) + # 5. examine_trace -- checks routing trace of next message(s) + # 6. update_connector_cost -- updates the next connector cost (BD) + # 7. examine_trace -- checks routing trace of next message + # 8. update_connector_cost -- updates the next connector cost (BA) + # 9. examine_trace -- checks routing trace of final message + # 10. bailing -- bails out with success + + def state_transition(self, message, new_state) : + if self.state == new_state : + return + self.state = new_state + self.debug_print("state transition to : %s -- because %s" % (self.state, message)) + + def debug_print(self, text) : + if self.debug: + print("%s %s" % (time.time(), text)) + + # Shut down everything and exit. + def bail(self, text): + self.error = text + + self.send_conn.close() + self.recv_conn.close() + + self.routers['B']['mgmt_conn'].close() + self.routers['C']['mgmt_conn'].close() + self.routers['D']['mgmt_conn'].close() + + self.test_timer.cancel() + self.send_timer.cancel() + + # ------------------------------------------------------------------------ + # I want some behavior from this test that is a little too complex + # to be governed by the usual callback functions. The way I do this + # is by making a simple state machine that checks some conditions + # during some callback, and then either steps forward or terminates + # the test. + # The callbacks that activate the state machine are mostly on_message, + # or timeout. But there are two different timers: the one-second + # timer that mostly runs the test, and the 60-second timer that, if it + # fires, will terminate the test with a timeout error. + # ------------------------------------------------------------------------ + def timeout(self, name): + if name == 'test': + self.state_transition('Timeout Expired', 'bailing') + self.bail("Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % + (self.n_sent, self.n_received, self.n_accepted)) + elif name == 'sender': + if self.state == 'examine_trace' : + self.send_messages() + self.send_timer = self.reactor.schedule(1, Timeout(self, "sender")) + + def on_start(self, event): + self.state_transition('on_start', 'starting') + self.reactor = event.reactor + self.test_timer = event.reactor.schedule(TIMEOUT, Timeout(self, "test")) + self.send_timer = event.reactor.schedule(1, Timeout(self, "sender")) + self.send_conn = event.container.connect(self.client_addrs[0]) # A + self.recv_conn = event.container.connect(self.client_addrs[1]) # B + + self.sender = event.container.create_sender(self.send_conn, self.dest) + self.receiver = event.container.create_receiver(self.recv_conn, self.dest) + self.receiver.flow(100) + + # I will only send management messages to B, C, and D, because + # they are the owners of the connectirs that I will want to check and update. + self.routers['B']['mgmt_conn'] = event.container.connect(self.client_addrs[1]) + self.routers['C']['mgmt_conn'] = event.container.connect(self.client_addrs[2]) + self.routers['D']['mgmt_conn'] = event.container.connect(self.client_addrs[3]) + + self.routers['B']['mgmt_receiver'] = event.container.create_receiver(self.routers['B']['mgmt_conn'], dynamic=True) + self.routers['C']['mgmt_receiver'] = event.container.create_receiver(self.routers['C']['mgmt_conn'], dynamic=True) + self.routers['D']['mgmt_receiver'] = event.container.create_receiver(self.routers['D']['mgmt_conn'], dynamic=True) + + self.routers['B']['mgmt_sender'] = event.container.create_sender(self.routers['B']['mgmt_conn'], "$management") + self.routers['C']['mgmt_sender'] = event.container.create_sender(self.routers['C']['mgmt_conn'], "$management") + self.routers['D']['mgmt_sender'] = event.container.create_sender(self.routers['D']['mgmt_conn'], "$management") + + # ----------------------------------------------------------------- + # At start-time, as the links to the three managed routers + # open, check each one to make sure that it has all the expected + # connections. + # ----------------------------------------------------------------- + + def on_link_opened(self, event) : + self.state_transition('on_link_opened', 'checking') + # The B mgmt link has opened. Check its connections. -------------------------- + if event.receiver == self.routers['B']['mgmt_receiver'] : + event.receiver.flow(1000) + self.routers['B']['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address) + for connector in ['AB_connector'] : + self.connector_check('B', connector) + # The C mgmt link has opened. Check its connections. -------------------------- + elif event.receiver == self.routers['C']['mgmt_receiver'] : + event.receiver.flow(1000) + self.routers['C']['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address) + for connector in ['AC_connector', 'BC_connector'] : + self.connector_check('C', connector) + # The D mgmt link has opened. Check its connections. -------------------------- + elif event.receiver == self.routers['D']['mgmt_receiver']: + event.receiver.flow(1000) + self.routers['D']['mgmt_helper'] = ManagementMessageHelper(event.receiver.remote_source.address) + for connector in ['AD_connector', 'BD_connector', 'CD_connector'] : + self.connector_check('D', connector) + + def send_messages(self): + n_sent_this_time = 0 + if self.sender.credit <= 0: + self.receiver.flow(100) + self.debug_print("receiver sends flow of 100") + return + # Send messages one at a time. + if self.sender.credit > 0 : + msg = Message(body=self.n_sent) + self.sender.send(msg) + n_sent_this_time += 1 + self.n_sent += 1 + else: + self.debug_print("No credit yet, not sending") + self.debug_print("sent: %d" % self.n_sent) + + def on_message(self, event): + if event.receiver in ( + self.routers['B']['mgmt_receiver'], + self.routers['C']['mgmt_receiver'], + self.routers['D']['mgmt_receiver']): + + if event.receiver == self.routers['B']['mgmt_receiver']: + router = 'B' + elif event.receiver == self.routers['C']['mgmt_receiver']: + router = 'C' + elif event.receiver == self.routers['D']['mgmt_receiver']: + router = 'D' + else: + raise Exception("Unexpected event receiver") + + # ---------------------------------------------------------------- + # This is a management message. + # ---------------------------------------------------------------- + if self.state == 'checking' : + connection_name = event.message.body['name'] + + if connection_name in self.connectors_map : + self.connectors_map[connection_name] = 1 + else : + self.state_transition("bad connection name: %s" % connection_name, 'bailing') + self.bail("bad connection name: %s" % connection_name) + + n_connections = sum(self.connectors_map.values()) + if n_connections == 6 : + self.state_transition("all %d connections found" % n_connections, 'examine_trace') + elif self.state == 'update_connector_cost': + if event.message.properties['statusDescription'] == 'OK': + self.state_transition('connector cost update done', 'examine_trace') + else : + self.state_transition(f"Connector cost update request failed, reply msg: {event.message}", 'bailing') + self.bail("Connector cost update request failed") + else: + # ---------------------------------------------------------------- + # This is a payload message. + # ---------------------------------------------------------------- + self.n_received += 1 + if self.state == 'examine_trace' : + trace = event.message.router_annotations.trace + expected = self.expected_traces[self.trace_count] + if trace == expected : + if self.trace_count == len(self.expected_traces) - 1 : + self.state_transition('final expected trace %s observed' % expected, 'bailing') + self.bail(None) + return + self.state_transition("expected trace %d observed successfully %s" % (self.trace_count, expected), 'update_connector_cost') + self.update_connector_cost(self.update_list[self.trace_count]) + self.trace_count += 1 + else: + self.debug_print("expected trace %s but got %s -- trying again" % (expected, trace)) + + def on_accepted(self, event): + self.n_accepted += 1 + + def on_released(self, event) : + self.n_released += 1 + + def connector_check(self, router, connector) : + self.debug_print("checking connector for router %s" % router) + mgmt_helper = self.routers[router]['mgmt_helper'] + mgmt_sender = self.routers[router]['mgmt_sender'] + msg = mgmt_helper.make_connector_query(connector) + mgmt_sender.send(msg) + + def update_connector_cost(self, target) : + router = target[0] + connector = target[1] + new_cost = target[2] + + self.debug_print("Updating cost of connector %s to %d on router %s" % (connector, new_cost, router)) + mgmt_helper = self.routers[router]['mgmt_helper'] + mgmt_sender = self.routers[router]['mgmt_sender'] + msg = mgmt_helper.make_connector_update_command(connector, new_cost) + mgmt_sender.send(msg) + + def run(self): + Container(self).run() + + +if __name__ == '__main__': + unittest.main(main_module())