Implementing retrial with an MDB or an MQ batch job? (WAS 7, MQ 6)

We need to listen for messages distributed via WebSphere MQ to be informed when an employee joins or leaves IBM. Because the resources used in the processing (a database, a web service) may be temporarily unavailable, we must be able to deal with such outages, which may last from minutes to hours, by repeatedly retrying the processing after some delay. We must also be able to deal with "poison messages", that is, messages whose processing always fails, either because their content is invalid or because their data isn't consistent with the database. The question is whether this is better implemented as a Message-Driven Bean (MDB) or as a batch job that regularly checks its queue, given that we have WebSphere Application Server 7 (and thus Java EE 5) and WebSphere MQ 6, both of which have some important changes compared to the previous versions. It turns out that it depends: both approaches have advantages and disadvantages, so the choice comes down to the likelihood of particular problems and to the business requirements and priorities.

Setting the scene

MDB vs. a batch job: the decision factors

Whether we should select the MDB or the batch job approach depends on a number of factors, including:
  • Requirements
    • Quantity: What quantity of messages do we need to handle?
    • Error likelihood: What's the probability that a resource will be temporarily unavailable or that a message will contain data that can't be processed correctly, and how soon does such a problem need to be resolved? I.e. can we wait until another day, or must we resume as soon as the resource is up again? This will tell us how often we need to retry and whether manual handling of an issue is sufficient.
  • Support for error handling/retrial logic
  • Ease of use/development/maintenance
  • Performance: We need to handle all the incoming messages and to have minimal negative impact on the performance of the target system
  • Speed of processing of the incoming messages (immediate vs. once or few times per day)
  • Integration with our operations monitoring (logging, our Operations Dashboard webapp)

The problems to deal with

There are three types of problems that can occur:
  1. A failure to communicate with MQ, for instance because a network connection got interrupted
  2. Inability to process a message due to a resource (a DB or a WS) being temporarily unavailable
  3. A poison (invalid) message (wrong data type, unexpected content) leading to an exception during its processing

The two approaches

Approach 1: A Message-Driven Bean (MDB)

An MDB is hosted by an application server that does a lot of work on behalf of the bean (such as transaction and concurrency management), thus simplifying its development and configuration. It may be as simple as writing

public class SimpleMessageBean implements javax.jms.MessageListener {
    public void onMessage(javax.jms.Message inMessage) {
        try {
            final String msgBody = ((javax.jms.TextMessage) inMessage).getText();
            // handle the msgBody ...
        } catch (javax.jms.JMSException e) { // getText() declares JMSException
            throw new RuntimeException("Failed to read the message body", e);
        }
    }
}

and configuring the Activation Specification for the MDB in JNDI via the application server's administration UI.

The question is, of course, how well it can handle poison messages and retrial when resources are temporarily unavailable.

MDB error handling and configuration in WebSphere 7

Let's have a look at how WebSphere deals with various kinds of errors related to MDBs and how we configure an MDB in this application server, especially with regard to error handling.

MDB error handling in WebSphere

What happens when an error occurs?

Normally an application server starts an MQ transaction before it invokes an MDB, and it either commits the transaction when the MDB finishes or rolls it back when the MDB throws an exception. If the transaction succeeds, the message is removed from the MDB's queue; otherwise it is returned there and processed again in the future. This is the default behavior, corresponding to container-managed transactions (tx) of the type 'required'. Notice that DB operations can also participate in this transaction and thus be committed/rolled back as well, which might be useful.
  1. In the case of an MQ communication/connection failure, WAS logs the exception and retries the connection later based on its configuration. It's also possible to set an ExceptionListener that would be called with the exception as a parameter in such a case.
  2. In the case of an exception during message processing (or due to a manual setRollbackOnly invocation) the current transaction is rolled back, the message is put back on the queue and the MDB is restarted. When the queue is re-checked, the message is found again and passed to another MDB instance. If the cause of the problem persists, this will fail again - and so on.
We have two ways to deal with a failed message:
  1. Remove the message from the queue either by discarding it or by moving it to the queue's "back-out queue". This is appropriate when the message itself is the problem (e.g. contains data inconsistent with the DB ...).
  2. Stop processing messages from the queue (pause the Activation Specification) and restart it later when the problem is resolved. This is appropriate when a necessary resource is temporarily unavailable.

Messaging provider and JMS resources configuration related to error handling

We will use a JCA 1.5 Activation Specification (and not Listener Ports, which are deprecated since WAS 7) with WebSphere MQ as the provider, which limits our configuration options to those described below.
  • Disabling (temporarily) the whole queue, more exactly shutting down the MDB
    • "Number of sequential delivery failures before suspending endpoint" - on an MQ Activation Specification
    • "Stop endpoint if message delivery fails" - if true, message delivery is suspended when the "Number of sequential delivery failures..." is exceeded
      • Prior to the Fix Pack this only applies when either the MDB throws an exception or an internal error happens, not when the transaction is marked for rollback (Issue IC60714).
      • When the endpoint is stopped, a JMX notification can be sent so that somebody is informed that the endpoint will need to be re-enabled. Perhaps we could automate this re-activation with a scheduled stateless EJB using Websphere JMX to perform the reactivation after a delay.
  • Removing problematic messages (this is done by Websphere MQ itself, not WAS)
    • "Backout threshold" (BOTHRESH; a property of a queue, configured via MQ) specifies the maximum number of times a message can be put onto a queue before it is moved to the specified backout requeue queue.  Default: 0 or unset?! => never re-delivered
    • "Backout requeue queue" (BOQNAME; a property of a queue, configured via MQ) - the queue where to put the failed messages; default: SYSTEM.DEAD.LETTER.QUEUE
    • WARNING: This seems to apply only to a queue, not to a topic. But underneath topics use (likely dynamic) queues anyway so it should be possible somehow.
  • Other related settings
    • There is no way to configure how often WAS should check for new messages, at least none I could find. There is a "rescan interval" that tells WAS how long to wait before checking another queue but seems only to apply when there are more queues than scanning threads. The default value is 5s and according to some posts it can't be increased (though this might not be true anymore in our version/fixpack level).
Interesting resources:

Design of the MDB approach

Error handling design

The main problem with the MDB approach is that it doesn't support retrying an operation after a delay (either for a single failing message or for the whole queue, if a resource is temporarily unavailable). There are some workarounds, but they aren't very nice.
  • For a single message I couldn't find a way to implement retrials after some, preferably increasing, delay; the only thing we can do is to retry it a few times with WebSphere's default delay, which seems to be 5s, and if it still doesn't succeed, move it into a special queue that would be processed manually, perhaps also sending an email notification.
  • If there is some global problem, such as an unavailable resource, indicated by several consecutive failures of one or more messages (depending on the content of the queue), we could let WAS stop the MDB and re-enable it later either automatically after a delay or manually when the problem gets fixed.
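
The automated re-enabling with an increasing delay could look roughly like the following sketch. It is only an illustration under assumptions: EndpointControl, Sleeper and EndpointReactivator are hypothetical names, not WebSphere APIs, and in WAS the EndpointControl would have to be backed by whatever administrative interface pauses and resumes the MDB's endpoint. A real scheduled EJB would also use timers rather than blocking waits.

```java
/** Hypothetical view of the MDB endpoint; NOT a WebSphere API. */
interface EndpointControl {
    boolean isActive();
    void resume();
}

/** Hypothetical wait abstraction so that the logic can be exercised without real delays. */
interface Sleeper {
    void sleepMinutes(long minutes);
}

/** Re-enables a stopped endpoint a limited number of times, doubling the delay each time. */
final class EndpointReactivator {
    private final EndpointControl endpoint;
    private final Sleeper sleeper;
    private final int maxAttempts;
    private final long firstDelayMinutes;
    private final long probeMinutes;

    EndpointReactivator(EndpointControl endpoint, Sleeper sleeper,
                        int maxAttempts, long firstDelayMinutes, long probeMinutes) {
        this.endpoint = endpoint;
        this.sleeper = sleeper;
        this.maxAttempts = maxAttempts;
        this.firstDelayMinutes = firstDelayMinutes;
        this.probeMinutes = probeMinutes;
    }

    /** @return true if the endpoint is active again, false if we gave up (time to notify an admin) */
    boolean reactivate() {
        long delay = firstDelayMinutes;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            sleeper.sleepMinutes(delay);        // give the resource time to recover
            endpoint.resume();
            sleeper.sleepMinutes(probeMinutes); // let the server redeliver and possibly stop the endpoint again
            if (endpoint.isActive()) {
                return true;
            }
            delay *= 2;                         // wait longer before the next attempt
        }
        return false;
    }
}
```

With a first delay of 20 minutes and a 1 minute probe, the attempts would happen after 20, 40, 80, ... minutes; the concrete numbers are arbitrary and would be part of the configuration of the reactivation EJB.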

MDB design

  • When resource unavailability is detected, let WAS stop the MDB automatically via the setting "Number of sequential delivery failures before suspending endpoint". It will need to be re-enabled either manually or automatically.
    • Manual re-activation of the MDB: We have to somehow detect that the MDB was disabled (the only option is perhaps by watching the log), find out why it failed, and re-enable it via the WebSphere Administration Console.
    • Automated re-activation: Implement a scheduled stateless EJB, which re-enables the MDB after a few tens of minutes - preferably do this a few times with an increasing delay; if it is still failing, give up and notify an admin.
      • To find out when reactivation is necessary, the re-activating EJB can either regularly check the status of the MDB (which is feasible) or listen for JMX notifications issued by the logging system and watch for a deactivation notification. (The log watching approach isn't very elegant: the EJB would receive many unrelated notifications, which is unnecessary work for the server. Unfortunately the endpoint doesn't produce any special JMX notification, just a log record.)
      • The re-activation itself is performed via JMX by invoking resume() on the appropriate J2CMessageEndpoint MBean (see the link above for how to get a handle to it).
      • In any case the application would need the permission to perform some WAS administration operations, namely to re-activate the MDB, and perhaps also to access JMX or the AdminServiceFactory/AdminService, which might be a problem if corporate security rules do not allow that.
  • When there is a poison message, move it to the backout queue and notify somebody to handle it (e.g. via email)
    • If the queue contains only one message, there is no generic way to find out whether the problem lies in the message or in some resource; the MDB would need to find this out and communicate it. If there are multiple messages and only one fails, we know it's a poison message and it could be automatically removed by means of the "Backout threshold". (Beware of the interaction between the message's redelivery count/backout threshold and the "Number of sequential delivery failures...": the latter is reset either when a message is processed successfully or when the MDB is stopped/restarted.)
  • (Perhaps we could use JMS selectors on JMS header properties (e.g. JMSRedelivered, JMSXDeliveryCount) but the possibilities are rather limited because both the selector query and the properties are static (e.g. something like MyRetryTime >= now() isn't possible). Note: MQ V7 has a major rewrite of the selectors support and their handling was moved from the Java client directly to the queue manager.)
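
To see why the backout threshold and the sequential failure limit interact, here is a small in-memory model of the behavior described above. It is a simplification under stated assumptions, not MQ or WAS code: a rolled-back message returns to the front of the queue with its backout count increased, it is moved to the backout queue once that count reaches the threshold, and the endpoint is stopped once the number of sequential failures exceeds its limit. All class and field names are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Toy model (an assumption-laden sketch, not product code) of redelivery and endpoint suspension. */
final class BackoutSimulation {
    static final class Msg {
        final String body;
        int backoutCount;
        Msg(String body) { this.body = body; }
    }

    final Deque<Msg> queue = new ArrayDeque<Msg>();
    final List<String> backoutQueue = new ArrayList<String>();
    final List<String> processed = new ArrayList<String>();
    boolean endpointStopped;

    private final int backoutThreshold;
    private final int maxSequentialFailures;
    private int sequentialFailures;

    BackoutSimulation(int backoutThreshold, int maxSequentialFailures, String... bodies) {
        this.backoutThreshold = backoutThreshold;
        this.maxSequentialFailures = maxSequentialFailures;
        for (String body : bodies) {
            queue.addLast(new Msg(body));
        }
    }

    /** Delivers messages until the queue is empty or the endpoint gets suspended. */
    void run(Predicate<String> handler) {
        while (!queue.isEmpty() && !endpointStopped) {
            Msg head = queue.peekFirst();
            if (head.backoutCount >= backoutThreshold) {
                backoutQueue.add(queue.pollFirst().body); // assumed: moved to the backout queue
                continue;
            }
            if (handler.test(head.body)) {
                processed.add(queue.pollFirst().body);    // commit
                sequentialFailures = 0;                   // a success resets the counter
            } else {
                head.backoutCount++;                      // rollback: the message stays at the front
                sequentialFailures++;
                if (sequentialFailures > maxSequentialFailures) {
                    endpointStopped = true;               // assumed: suspended once the limit is exceeded
                }
            }
        }
    }
}
```

In this model a poison message is removed before the endpoint gets suspended only if the backout threshold is not higher than the number of failures that suspends the endpoint. Whether the real products count in exactly this way should be verified on the actual version and fix pack level.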

MDB approach evaluation

Key MDB issues

  • Permissions to perform a WAS administration operation required.
  • Difficult to distinguish a poison message from a resource outage without additional logic when the queue contains only one element.
    • But see the batch job design below, which also requires distinguishing these two types of problems.
  • Inefficient determination of MDB's status for the delay reactivation logic: either polling its status regularly or watching the log with many unrelated messages that can't be filtered out.

Key MDB advantages and disadvantages

  • Advantages
    • The data is processed and reaches the destination system soon after it is published
    • Key characteristics are configurable via UI (Number of sequential delivery failures, Backout threshold, MQ connection data, MQ security/SSL, processing concurrency, DataSource configuration/injection, ...). Actually this is also a disadvantage due to needing an admin, see below
    • Logging configurable at the runtime (Java logging, levels can be set in WAS console)
  • Disadvantages
    • Any configuration requires a WAS administrator and thus a lot of time due to the IBM bureaucracy and ceremony
    • Difficult to collect and communicate statistics for our Operations Dashboard because (a) there are frequent fine-grained changes instead of 1/day batch changes and (b) there is no support for the Job logging framework of ours (a possible but laborious solution is to gather statistics in an instance variable and log them in regular intervals into the job tables using JDBC and some code extracted from the job framework)
    • Necessary to somehow configure the reactivation EJB (the reactivation delay, number of attempts before giving up)
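
The statistics gathering mentioned above could be sketched as follows. This is only an illustration: ProcessingStats is a hypothetical class, and it assumes that a single instance is shared by all instances of the MDB (e.g. via a static field), because the container may pool several bean instances. Writing the snapshot into the job tables via JDBC is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical thread-safe counters to be shared by all MDB instances. */
final class ProcessingStats {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    void recordSuccess() { succeeded.incrementAndGet(); }

    void recordFailure() { failed.incrementAndGet(); }

    /**
     * Returns {succeeded, failed} counted since the previous call and resets both counters.
     * Intended to be called at regular intervals by whatever writes the statistics out.
     */
    long[] snapshotAndReset() {
        return new long[] { succeeded.getAndSet(0), failed.getAndSet(0) };
    }
}
```

The two counters are reset independently, so a message recorded exactly during a snapshot may be attributed to the next interval; for dashboard statistics that should be acceptable.
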
MDB design questions:
  • Do we need automated reactivation of a disabled MDB? Perhaps not if: (1) a resource outage happens rarely and/or (2) the administration team detects this automatically and can handle it without any bureaucracy and without consuming our resources.

MDB resources

Essential docs
Other (not all docs are available for our versions, namely MQ v6 and WAS v7)

Approach 2: A batch job checking MQ regularly

A batch job is a command-line application that is run regularly by cron, for example once a day, and that actively scans its incoming queue/topic for new messages and processes them all at once. All the JMS communication, management and configuration must be implemented from scratch. (Though utilities such as Spring's JmsTemplate may simplify it.)

Job error handling and configuration

Error handling

The problems and the ways to deal with errors are the same as when running in an application server, only we have to do everything ourselves. That means we need to manually start a transaction, catch exceptions, commit or roll back, and configure the topic/queue for poison messages.

We would need to implement problem cause detection logic to distinguish whether there is a temporary resource outage or whether there is a problem with the message itself (either an incorrect data type or data inconsistent with the target DB). There is no other good way to deal with these distinct kinds of problems without really knowing which of them it is.
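
A minimal sketch of such detection logic, assuming that the processing code lets the original exceptions propagate (possibly wrapped). The FailureKind and FailureClassifier names are hypothetical and the rules are only examples: I/O errors, the transient/recoverable JDBC exception types and SQLSTATE class 08 (connection exception) are treated as an outage, everything else as a problem with the message.

```java
import java.io.IOException;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;

/** Hypothetical classification of a processing failure. */
enum FailureKind { RESOURCE_OUTAGE, POISON_MESSAGE }

final class FailureClassifier {
    private FailureClassifier() { }

    /** Walks the cause chain and looks for signs of a temporarily unavailable resource. */
    static FailureKind classify(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return FailureKind.RESOURCE_OUTAGE; // e.g. web service unreachable, timeout
            }
            if (t instanceof SQLTransientException || t instanceof SQLRecoverableException) {
                return FailureKind.RESOURCE_OUTAGE; // the JDBC driver says a retry may succeed
            }
            if (t instanceof SQLException) {
                String state = ((SQLException) t).getSQLState();
                if (state != null && state.startsWith("08")) {
                    return FailureKind.RESOURCE_OUTAGE; // SQLSTATE class 08: connection exception
                }
            }
        }
        // Assumption: anything we don't recognize is blamed on the message itself,
        // so that an unknown failure can't block the queue forever.
        return FailureKind.POISON_MESSAGE;
    }
}
```

The default is a design choice: treating unknown failures as poison messages keeps the queue moving at the cost of occasionally backing out a good message, while the opposite default would be safer for the data but could stall the processing.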

We would deal with the problems as follows:
  • For a resource outage, retry a few times with an increasing delay, then quit and postpone the processing till the next scheduled execution
  • For a poison message, move it to the backout queue and notify an administrator
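
The retrial with an increasing delay could be sketched like this. The names are hypothetical (RetryWithBackoff and its OutageException marker are not part of any library), and the sketch assumes that the processing code signals a temporary outage by throwing that marker exception; any other exception propagates immediately.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical helper: retries an action on outages, doubling the delay between attempts. */
final class RetryWithBackoff {
    private RetryWithBackoff() { }

    /** Hypothetical marker thrown by the processing code when a resource is temporarily unavailable. */
    static final class OutageException extends RuntimeException {
        OutageException(String message) { super(message); }
    }

    /**
     * @param sleeper receives the delay in milliseconds; pass RetryWithBackoff::sleep in production
     * @throws OutageException if the last allowed attempt fails too (leave the rest for the next run)
     */
    static <T> T call(Supplier<T> action, int maxAttempts, long firstDelayMillis, LongConsumer sleeper) {
        long delay = firstDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (OutageException e) {
                if (attempt >= maxAttempts) {
                    throw e;           // give up for this run
                }
                sleeper.accept(delay); // wait before the next attempt
                delay *= 2;
            }
        }
    }

    /** Default sleeper based on Thread.sleep; beware of connection timeouts during long waits. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Passing the sleeper in as a parameter is only there to make the delays observable without actually waiting; the job itself would simply pass RetryWithBackoff::sleep.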

JMS configuration in a batch job

We have two ways to configure the JMS resources (a Topic and a (Topic)ConnectionFactory) and their options related to error handling:
  1. Use MQ provider-specific APIs to create the objects and configure them. See this JMS + MQ API example.
  2. Configure the provider-specific resources in JNDI and use only the standardized JNDI and JMS APIs. This is very easy with Sun's filesystem-based JNDI provider (fscontext.jar and providerutil.jar) and vendor-supplied tools for generating the JNDI .bindings file for existing JMS resources. In the case of MQ you can do it in the MQ Explorer GUI or with the command-line utility JMSAdmin (a JMSAdmin example, another one).
You can create the JNDI configuration via the MQ Explorer wizards - after having added a new JNDI "context" using fscontext and a local directory - either by first creating the JMS resource and then letting the wizard generate an MQ resource for it and adjusting it as needed:

Or by creating the JMS resource from an existing MQ resource:

Provided that the FsContext configuration file .bindings produced by JMSAdmin/MQ Explorer is in the folder /etc/mqJndiconfig, we would connect to the MQ as follows:

final Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
env.put(Context.PROVIDER_URL, "file:/etc/mqJndiconfig");

final InitialContext context = new InitialContext(env);
ConnectionFactory qcf = (javax.jms.ConnectionFactory) context.lookup("MyConnectionFactory");
// Note: I set the channel property MCAUSER so it actually isn't necessary to supply credentials below:
final Connection connection = qcf.createConnection("anna", "password ignored");

// Client ID is necessary for a durable subscr.
// We could alternatively set it on the ConnectionFactory - the CLIENTID property
connection.setClientID("MyCoolApplication");

final Session session = connection.createSession(true, -1); // the param 2 is ignored for a transacted session

final Topic destination = (Topic) context.lookup(JNDI_PREFIX + TOPIC);
final MessageConsumer receiver = subscribe(session, destination);

try {
    connection.start();
} catch (javax.jms.JMSException e) {
    throw new RuntimeException("Failed to start the JMS connection", e);
}

We would then read the messages via:

while ((msg = receiver.receiveNoWait()) != null) { /* process msg, then commit or roll back the session */ }

The dependencies include jms.jar, fscontext.jar and providerutil.jar. You can find them either in the WMQ installation or on the web.

You may want to have a look at Writing a simple publish/subscribe application connecting through WebSphere MQ in WMQ help, which discusses some of the code above in more detail.

Job design

  • Set a reasonable backout threshold and a suitable backout queue on the queue used by the Topic so that problematic messages are removed automatically after a few failed attempts
    • Some monitoring of the backout queue would be necessary. If the MQ infrastructure doesn't provide it then we can implement another MQ/JMS reader that would send an email when there are some new messages in the queue.
    • Regarding the type of the topic queue:
      • Either we can use the shared JMS queue (SYSTEM.JMS.D.SUBSCRIBER.QUEUE) for the topic
      • Or we can use a non-shared queue unique for our application, which would be actually better and more aligned with IBM standards. It's achieved by setting a broker durable subscription queue pattern in the form "SYSTEM.JMS.D..*" (notice the trailing *) on the JMS ConnectionFactory when defining it in JNDI or in the Java code. Upon the first connection a permanent dynamic queue is generated for the client. We can then set the backout options on it (or the administrators may define a model queue for these dynamic queues with this setting already applied).
  • Read and process each message in a new MQ transaction so that if the processing fails it will be put back into the queue (and its delivery count will be increased, thus making it perhaps eligible for the backout queue)
    • ISSUE: The failed message would be obtained again on the next read (because it goes to the front, not to the end of the subscriber's queue) and thus we can't process any other message before dealing with it. Possible solutions:
      1. Fail immediately, try again during the next regular run. If this happens several times in a row for a particular message then it will be moved to the backout queue by MQ (w.r.t. the settings above).
      2. Wait for a while, such as 10-20 minutes, and try it again. If it still fails, continue as in #1.
    • Notice that DB transactions are not a part of the MQ transaction (unless we use some external transaction manager and XA resource managers, which would require more work) but that shouldn't be a problem for us. If the DB tx fails then we will manually roll back the MQ tx. If the DB tx succeeds but later we fail to communicate the success to MQ then the message will stay in the queue and be processed again, which isn't a big problem in our particular case. (Global transactions with DB operations being a part of the MQ tx are supported only either (1) for a "server application" running on the MQ machine or (2) with an external XA tx manager, such as in WAS.)
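
The per-message transaction handling with the "fail immediately" option can be sketched as follows. To keep the sketch compilable without the JMS jars it uses small hypothetical stand-in interfaces: MessageSource plays the role of the JMS MessageConsumer and TransactedSession that of a transacted JMS Session; only the method names receiveNoWait, commit and rollback are taken from JMS.

```java
/** Stand-in (assumption) for javax.jms.MessageConsumer; returns null when no message is available. */
interface MessageSource {
    String receiveNoWait();
}

/** Stand-in (assumption) for a transacted javax.jms.Session. */
interface TransactedSession {
    void commit();
    void rollback();
}

/** Hypothetical callback doing the actual work (DB update, web service call, ...). */
interface MessageHandler {
    void handle(String body) throws Exception;
}

final class QueueDrainer {
    private QueueDrainer() { }

    /**
     * Processes each message in its own transaction and stops at the first failure.
     *
     * @return the number of messages committed during this run
     */
    static int drain(MessageSource source, TransactedSession session, MessageHandler handler) {
        int committed = 0;
        String body;
        while ((body = source.receiveNoWait()) != null) {
            try {
                handler.handle(body);
                session.commit();   // removes the message from the queue
                committed++;
            } catch (Exception e) {
                session.rollback(); // the message returns to the queue, its backout count increases
                break;              // option 1: fail immediately, retry during the next scheduled run
            }
        }
        return committed;
    }
}
```

Stopping at the first failure avoids reading the same message again right away; the backout threshold then decides after how many runs a persistently failing message is moved aside.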

Batch job approach evaluation

  • Advantages
    • Simple implementation of flexible delayed retrials - upon a resource outage, end the job and try again during the next scheduled run or, perhaps, retry first after a programmed delay (Thread.sleep(); beware of connection timeouts).
    • Simple integration into our monitoring/logging framework incl. the Operations Dashboard.
  • Disadvantages
    • More coding to set up/clean the resources and handle errors, which is not trivial, and thus also more bug prone.
    • Concurrent processing of the messages would be considerably more difficult to implement correctly if possible at all (if MQ JMS does implement the necessary optional methods). We would need to use the advanced methods targeted at app. server vendors - there is a non-MQ example of a multithreaded (non-durable subscription) consumer. Hopefully it could be modified for a durable one using Connection.createDurableConnectionConsumer with the same simple implementation of ServerSessionPool.
      • Impl. details: The "pool" would always create a new instance of a custom ServerSession implementation, whose getSession() would simply create a new transacted TopicSession and set its MessageListener, and whose start() would run the session in a new Thread. Notice that (Topic)Session implements Runnable, whose run() invokes the provided listener sequentially for the available messages. The listener would process a message and call commit/rollback on its owning session (see this transactional listener example).
      • Important: Handling of failed messages would need to be considered carefully, as returning a message to the queue would lead to its immediate re-processing and likely rejection by one of the other threads, exceeding its backout threshold in a few seconds and thus subverting the delayed retrial processing. On the other hand, as mentioned above, we should anyway be able to distinguish a resource outage, in which case we would stop processing immediately, from a problematic message, which would end up in the backout queue anyway, so this is perhaps not a real issue.
      • Note: When using MessageListeners, it's important to set an ExceptionListener on the connection because some errors can only be communicated this way.

Summary and conclusion

The version and fixpack level of WMQ/WAS is very important.

Both approaches have pros and cons, and which one is more suitable depends on the requirements and their priorities.

MDB: It's more difficult to implement delayed retrial if it is required - it may be implemented via a scheduled EJB automatically re-enabling the MDB stopped by WAS after a number of failures (one issue is that we'd need WAS admin rights for the EJB to do that; another is performance due to the need to either monitor logs or check the MDB's status regularly). On the other hand, concurrent processing is available out of the box and also implementing a bean notifying about problematic messages in the backout queue is simpler thanks to the injection of the mail API resources. This solution may thus require some JMX and Java EE (scheduled EJB) magic and there may be unexpected problems with that.

JOB: It's more difficult to implement processing of the messages in parallel; there is even a slight chance that it's impossible. Also, more coding is required and thus there will be more bugs. On the other hand, we can implement delayed retrials as we want. Thus, if concurrent processing isn't critical while automatic delayed retrial may be, this would be the best approach.

Tags: java

Copyright © 2024 Jakub Holý