Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
.classpath
.project
.settings
.checkstyle

# IntelliJ IDEA #
#################
Expand All @@ -29,3 +30,7 @@ target
Icon?
ehthumbs.db
Thumbs.db

# Testsuite generated *_sctp.xml files #
########################################
testsuite/tests/*_sctp.xml
28 changes: 15 additions & 13 deletions .project
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>diameter-parent</name>
<comment/>
<projects/>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments/>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
<name>diameter-parent</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,39 @@ public interface IRouter {
*/
void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable table) throws InternalException, RouteException;


/**
* Indicates whether this router implementation is able to resubmit requests to an alternative peer,
* for which a Busy or Unable to Deliver Answer has already been received from one peer.<br /><br />
*
* <strong>Note: </strong> Returning <code>true</code> from this method when the router implementation has not been designed to
* handle resubmitting such requests can result in a request being resubmitted perpetually.
*
* @return <code>false</code> by default. <code>true</code> when and only when the router implementation has specific logic to handle
* submitting requests which have received a Busy or Unable to Deliver Answer from one peer, to an alternative peer, and to avoid
* perpetual re-submission of such requests.
*/
boolean canProcessBusyOrUnableToDeliverAnswer();


/**
* Called when a 3002 or 3004 is received for a request. This method attempts to resubmit the request to an alternative peer.
*
* @param request
* @param table
*/
void processBusyOrUnableToDeliverAnswer(IRequest request, IPeerTable table) throws InternalException, RouteException;


/**
* Based on Redirect entries or any other factors, this method changes route information.
* @param message
* @return
* @throws RouteException
* @throws AvpDataException
*/


boolean updateRoute(IRequest message) throws RouteException, AvpDataException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,17 @@ private boolean isRedirectAnswer(Avp avpResCode, IMessage answer) {
}
}

private boolean isBusyOrUnableToDeliverAnswer(Avp avpResCode, IMessage answer) {
try {
// E-bit set indicating a protocol error, and Result Code one of 3002 or 3004
return (answer.getFlags() & 0x20) != 0 && avpResCode != null
&& (avpResCode.getInteger32() == ResultCode.TOO_BUSY || avpResCode.getInteger32() == ResultCode.UNABLE_TO_DELIVER);
}
catch (AvpDataException e) {
return false;
}
}

@Override
public IStatistic getStatistic() {
return statistic;
Expand Down Expand Up @@ -480,6 +491,24 @@ private IMessage processRedirectAnswer(IMessage request, IMessage answer) {
return answer;
}

private IMessage processBusyOrUnableToDeliverAnswer(IMessage request, IMessage answer) {
if (router.canProcessBusyOrUnableToDeliverAnswer()) {
try {
logger.debug("Message with [sessionId={}] received a Busy or Unable to Deliver Answer and will be resubmitted.", request.getSessionId());
router.processBusyOrUnableToDeliverAnswer(request, table);
return null;
}
catch (Throwable exc) {
// Any error when attempting a resubmit to an alternative peer simply results in the original
// Busy or Unable to Deliver Answer being returned
if (logger.isErrorEnabled()) {
logger.error("Failed to reprocess busy or unable to deliver response - all peers exhausted?", exc);
}
}
}
return answer;
}

@Override
public void connect() throws InternalException, IOException, IllegalDiameterStateException {
if (getState(PeerState.class) != PeerState.DOWN) {
Expand Down Expand Up @@ -1031,7 +1060,12 @@ public boolean receiveMessage(IMessage message) {
if (isRedirectAnswer(avpResCode, message)) {
message.setListener(request.getEventListener());
message = processRedirectAnswer(request, message);
//if return value is not null, there was some error, lets try to invoke listener if it exists...
// if return value is not null, there was some error, lets try to invoke listener if it exists...
isProcessed = message == null;
}
if (isBusyOrUnableToDeliverAnswer(avpResCode, message)) {
message = processBusyOrUnableToDeliverAnswer(request, message);
// if return value is not null, there was some error, lets try to invoke listener if it exists...
isProcessed = message == null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public IPeer getPeer(IMessage message, IPeerTable manager) throws RouteException
}

// Balancing
IPeer peer = selectPeer(availablePeers);
IPeer peer = selectPeer(message, availablePeers);
if (peer == null) {
throw new RouteException("Unable to find valid connection to peer[" + destHost + "] in realm[" + destRealm + "]");
}
Expand Down Expand Up @@ -639,6 +639,33 @@ public void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable t
}
}

/**
* This method should always return false unless specifically designed to handle
* submitting requests which have received a Busy or Unable to Deliver Answer from one peer, to an alternative peer, and to avoid
* perpetual re-submission of such requests.
*
* @return <code>false</code>
*/
@Override
public boolean canProcessBusyOrUnableToDeliverAnswer() {
return false;
}

public void processBusyOrUnableToDeliverAnswer(IRequest request, IPeerTable table) throws InternalException, RouteException {
try {
table.sendMessage((IMessage) request);
}
catch (AvpDataException exc) {
throw new InternalException(exc);
}
catch (IllegalDiameterStateException e) {
throw new InternalException(e);
}
catch (IOException e) {
throw new InternalException(e);
}
}

/**
*
*/
Expand Down Expand Up @@ -797,6 +824,10 @@ protected IPeer selectPeer(List<IPeer> availablePeers) {
return p;
}

protected IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
return selectPeer(availablePeers);
}

// protected void redirectProcessing(IMessage message, final String destRealm, final String destHost) throws AvpDataException {
// String userName = null;
// // get Session id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@

package org.jdiameter.client.impl.router;

import java.util.Arrays;
import java.util.List;

import org.jdiameter.api.Configuration;
import org.jdiameter.api.MetaData;
import org.jdiameter.api.PeerState;
import org.jdiameter.client.api.IContainer;
import org.jdiameter.client.api.IMessage;
import org.jdiameter.client.api.controller.IPeer;
import org.jdiameter.client.api.controller.IRealmTable;
import org.jdiameter.common.api.concurrent.IConcurrentFactory;
import org.jdiameter.common.api.statistic.IStatistic;
import org.jdiameter.common.api.statistic.IStatisticRecord;
import org.jdiameter.server.api.IRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import org.jdiameter.server.api.IRouter;

/**
* Weighted Least-Connections router implementation<br/><br/>
*
Expand Down Expand Up @@ -102,13 +103,13 @@ public WeightedLeastConnectionsRouter(IContainer container, IConcurrentFactory c
* <pre>
* {@code
* for (m = 0; m < n; m++) {
* if (W(Sm) > 0) {
* for (i = m+1; i < n; i++) {
* if (C(Sm)*W(Si) > C(Si)*W(Sm))
* m = i;
* if (W(Sm) > 0) {
* for (i = m+1; i < n; i++) {
* if (C(Sm)*W(Si) > C(Si)*W(Sm))
* m = i;
* }
* return Sm;
* }
* return Sm;
* }
* }
* return NULL;
* }
Expand All @@ -125,6 +126,19 @@ public WeightedLeastConnectionsRouter(IContainer container, IConcurrentFactory c
*/
@Override
public IPeer selectPeer(List<IPeer> availablePeers) {
return selectPeer(null, availablePeers);
}

/**
* Return peer with least connections
*
* @param message the message to be sent
* @param availablePeers list of peers that are in {@link PeerState#OKAY OKAY} state
* @return the selected peer according to algorithm
*
*/
@Override
public IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
int peerSize = availablePeers != null ? availablePeers.size() : 0;

// Return none if empty, or first if only one member found
Expand Down
Loading