Fast producer failover with ActiveMQ

When you’re sending messages to a cluster of ActiveMQ brokers using the failover protocol and the current broker dies, the transport tries to connect and send messages to the next one. If that one is unavailable, it tries the next, and so on. So what happens when all brokers in the cluster are down? The send call appears to hang, at least until one of the brokers comes up again.

For some applications this is the desired behavior, but others don’t want to (or can’t afford to) block on sending messages under any circumstances. The easy solution is to use the timeout option on the failover transport. Take a look at the following queue producer.

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueProducer {
  public static void main(String[] args) throws Exception {
        // timeout=1000: give up after one second without a usable connection
        ActiveMQConnectionFactory factory =
          new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=1000");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue queue = session.createQueue("TEST");
        MessageProducer producer = session.createProducer(queue);
        // send one message per second
        for (int i = 0; i < 1000; i++) {
                producer.send(session.createTextMessage(i + " message"));
                System.out.println("Sent message " + i);
                Thread.sleep(1000);
        }
        // release JMS resources
        producer.close();
        session.close();
        connection.close();
  }
}

What we have here is a producer that tries to send 1000 messages to the broker listening on port 61616. If that broker is down, it waits one second (timeout=1000) before it fails. Note that in the trace below the timeout already fires in createSession(), before the first send. So when the broker is down, you can expect something like this:

2010-08-16 15:52:12,336 [main           ] INFO  FailoverTransport           - Failover timed out after 1008ms
Exception in thread "main" javax.jms.JMSException: Failover timeout of 1000 ms reached.
	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
	at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1298)
	at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1382)
	at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:309)
	at icap.QueueProducer.main(QueueProducer.java:15)
Caused by: java.io.IOException: Failover timeout of 1000 ms reached.
	at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:529)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
	at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
	at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
	at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
	... 3 more

So how does it work under the hood? The failover transport tries to reconnect to the broker (with an interval of 100 ms between retries), and if it doesn't succeed within the specified amount of time it throws an exception. What does this imply? The obvious consequence is that you cannot specify "small" timeout periods (less than 100 ms), as you'll lose your failover functionality. Again, for most applications this is more than enough, but even for those that require "fast producer failure" there is a solution. Actually, two of them.
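To make the 100 ms limit concrete, here is a minimal sketch of such a wait-and-retry loop running on a simulated clock. It is a simplified model and not ActiveMQ's actual code: the class FailoverTimeoutSketch, its send method, and the brokerUpAtMs parameter are all made up for illustration.

```java
/** Simplified model of a send that retries a connection until a timeout passes. */
final class FailoverTimeoutSketch {

    enum Outcome { SENT, TIMED_OUT }

    /**
     * @param timeoutMs       how long the send is allowed to wait
     * @param retryIntervalMs pause between two reconnect attempts
     * @param brokerUpAtMs    simulated time at which the broker becomes reachable (hypothetical)
     */
    static Outcome send(long timeoutMs, long retryIntervalMs, long brokerUpAtMs) {
        if (retryIntervalMs <= 0) {
            throw new IllegalArgumentException("retryIntervalMs must be positive");
        }
        long now = 0; // simulated clock, no real sleeping
        while (true) {
            if (now >= brokerUpAtMs) {
                return Outcome.SENT; // this attempt found a live broker
            }
            if (now + retryIntervalMs > timeoutMs) {
                return Outcome.TIMED_OUT; // no time left for another retry
            }
            now += retryIntervalMs; // wait before the next attempt
        }
    }

    public static void main(String[] args) {
        System.out.println(send(1000, 100, 300)); // broker back after 300 ms
        System.out.println(send(50, 100, 30));    // timeout shorter than the retry interval
    }
}
```

In this model, a 1000 ms timeout survives a 300 ms outage, while a 50 ms timeout gives up even though the broker is back after 30 ms, because the deadline passes before the first retry happens.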

Solution number one is to use a TransportListener to be notified of your connection status. When you know you have a valid connection, you send your messages as usual, and you execute your backup logic when all brokers are down. Consider the following:

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

public class FastFailProducer implements TransportListener {
  // written by the transport thread, read by the sending thread
  volatile boolean connected = false;
  public void run() throws Exception {
        ActiveMQConnectionFactory factory =
          new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=1000");
        // register for connection status callbacks
        factory.setTransportListener(this);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue queue = session.createQueue("TEST");
        MessageProducer producer = session.createProducer(queue);
        // send messages
        for (int i = 0; i < 1000; i++) {
             if (connected) {
                // send a message
                producer.send(session.createTextMessage(i + " message"));
                System.out.println("Sent message " + i);
             } else {
                // execute your backup logic
                System.out.println("Message " + i + " not sent");
             }
             Thread.sleep(1000);
        }
        producer.close();
        session.close();
        connection.close();
  }
  public static void main(String[] args) throws Exception {
     FastFailProducer producer = new FastFailProducer();
     producer.run();
  }
  public void transportResumed() {
     connected = true;
  }
  // "Interupted" (single r) is the spelling used by the TransportListener interface
  public void transportInterupted() {
     connected = false;
  }
  public void onException(IOException error) {
    connected = false;
  }
  public void onCommand(Object command) {}
}

Here we modified the previous example so that it now implements the transport listener. When the link goes up, transportResumed() gets called. When the link goes down, transportInterupted() gets called. When there's an unrecoverable transport exception (like a timeout, or the maximum number of retries being reached), onException() gets called.

So now we can track the state of our connection and decide whether to send a message or fall back before we even try. Running the example against a broker that goes up and down, you can expect the following result:

2010-08-16 17:09:23,799 [ActiveMQ Task  ] INFO  FailoverTransport   -
  Successfully connected to tcp://localhost:61616
Sent message 0
Sent message 1
Sent message 2
Sent message 3
2010-08-16 17:09:27,843 [127.0.0.1:61616] WARN  FailoverTransport      -
Transport (localhost/127.0.0.1:61616) failed to tcp://localhost:61616 ,
  attempting to automatically reconnect due to: java.io.EOFException
Message 4 not sent
Message 5 not sent
Message 6 not sent
Message 7 not sent
Message 8 not sent
Message 9 not sent
2010-08-16 17:09:33,164 [ActiveMQ Task  ] INFO  FailoverTransport  -
  Successfully reconnected to tcp://localhost:61616
Sent message 10
Sent message 11
Sent message 12
Sent message 13
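The "backup logic" branch in the example only prints a line. One possible backup, sketched here under my own assumptions, is to hold messages in memory while the link is down and flush them on the next send after it comes back. BufferingSender and its methods are hypothetical names, and the Consumer<String> is just a stand-in for whatever wraps producer.send(...) in your application; the backlog is unbounded, so a real application would want a limit.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Holds messages while disconnected and flushes them on the next connected send. */
final class BufferingSender {
    private final Consumer<String> delegate;            // stand-in for the real send call
    private final Queue<String> backlog = new ArrayDeque<>();
    private volatile boolean connected;                  // flipped by the listener callbacks

    BufferingSender(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Call with true from transportResumed(), with false from the other callbacks. */
    void setConnected(boolean connected) {
        this.connected = connected;
    }

    /** Intended to be called from the single sending thread only. */
    void send(String text) {
        if (!connected) {
            backlog.add(text); // backup logic: keep the message for later
            return;
        }
        String pending;
        while ((pending = backlog.poll()) != null) {
            delegate.accept(pending); // flush older messages first to keep the order
        }
        delegate.accept(text);
    }

    int backlogSize() {
        return backlog.size();
    }
}
```

The listener callbacks only flip the flag, and all sending stays on the producer's own thread, which mirrors how the connected field is used in the example above.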

Now this is all well and good, but in most cases when your connection is down you simply want to queue messages somewhere else until the remote broker comes up. Why not just use an embedded broker for that? With this second approach, you always send messages to the broker embedded in your producer application. That broker is then connected to the remote broker using the failover protocol. In this way your producers can work without caring about the state of the connection. So let's change the original example to make it work with the embedded broker:

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class EmbeddedProducer {
  public static void main(String[] args) throws Exception {
        // vm: starts an in-memory embedded broker that is networked to the remote one
        ActiveMQConnectionFactory factory =
          new ActiveMQConnectionFactory("vm:(broker:(network:static:tcp://localhost:61616)?persistent=false&useJmx=false)");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue queue = session.createQueue("TEST");
        MessageProducer producer = session.createProducer(queue);
        // send one message per second
        for (int i = 0; i < 1000; i++) {
                producer.send(session.createTextMessage(i + " message"));
                System.out.println("Sent message " + i);
                Thread.sleep(1000);
        }
        // release JMS resources
        producer.close();
        session.close();
        connection.close();
  }
}

Basically, we only changed the connection URL. So now we send messages to an in-memory-only embedded broker, which is connected to the remote broker. Of course, this is just an example; you can configure your embedded broker any way you like. One important point: with the configuration in this example, there must be a consumer on the remote broker for messages to be forwarded there. This is configurable, and you can have all your messages forwarded to the remote broker using staticallyIncludedDestinations.

To run a consumer, we can use the ActiveMQ consumer tool and run something like:

/activemq/example$ ant -Dsubject=TEST -Durl="failover:(tcp://localhost:61616)" consumer

Now you can run the producer, stop and start the broker and see that messages are flowing through the system and that producers are not affected by the state of the remote broker.

So there you go. I hope I managed to explain how the failover protocol affects producers and how you can find an ideal setup for your application. By the way, if you're in the Boston area, be sure to pop by the FUSE meetup on August 17th.

8 comments

  1. Hi Dejan,

    very nice article, just one minor fix: shouldn’t the “boolean connected” variable be declared as volatile, because it is accessed by your own thread and the notifying one? Or does everything happen in a single thread?

  2. Hi Sergio,

    thanks for the comment. You’re absolutely right the variable is updated from the failover transport thread. I didn’t think it through, just wanted to create a simple example. It’s fixed now.

    Cheers,
    Dejan

  3. We configure activemq transport in mule. Also use reconnection strategy wit and also setting timeout on failover transport. The reconnection strategy never works because of the failover timeout exception. Any suggestions? This works on windows though.

  4. Thanks for this, it works really well. But this is using default embedded brokers, is there a way to setup username and password for the embedded broker? I’m not using spring. Thanks!

  5. Or what is the equivalent BrokerService to “vm:(broker:(network:static:tcp://localhost:61616)?persistent=false&useJmx=false)”
