Asynchronous handling using JMS in a Spring application

We needed asynchronous handling in our application and decided to use JMS to achieve it. We had one requirement: the asynchronous operation should run after the “current” transaction, but only if that transaction succeeded. That is why we chose JMS instead of just dispatching to a different thread or using something like the LMAX Disruptor.

The application already used the Spring framework, with Hibernate for database access and a Hibernate transaction manager.

To introduce JMS, we need Spring's JMS support and a JMS provider. In our case, the provider is ActiveMQ. These are the Maven dependencies:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
 
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.10.0</version>
</dependency>

Now we can create a message queue with the bean id “myEventQueue” and the physical name “my.pkg.myEventQueue”. This is done using the following configuration.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
    <!--  Embedded ActiveMQ Broker -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>
 
    <!--  ActiveMQ Destinations  -->
    <amq:queue id="myEventQueue" physicalName="my.pkg.myEventQueue" ></amq:queue>
 
    <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />
 
    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsFactory" />
    </bean>
 
    <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
          depends-on="broker"
          p:targetConnectionFactory-ref="jmsFactory" />
 
    <jms:annotation-driven/>
 
    <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="concurrency" value="3-10"/>
    </bean>
 
    <bean id="jmsProducerTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="jmsConnectionFactory"
          p:sessionTransacted="true" />
 
</beans>

This ensures that an embedded JMS server (the “broker”) is set up, and it creates a JMS transaction manager. This setup makes the sending of messages transactional: when the transaction in which the event is sent is rolled back, the JMS message is not actually sent. Note that receipt of the message is not transactional in this setup, so if the transaction that processes the message fails, the message is lost. This can be solved, but we haven’t done that yet.
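To make the "sent only on commit" behaviour concrete, here is a toy model in plain Java. It is a sketch of the idea only: `ToyQueue` and `ToyTransactedSession` are made-up names for illustration, not part of the JMS, Spring, or ActiveMQ APIs, and a real broker does considerably more than buffer a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transacted send: messages stay pending until commit. */
public class TransactedSendSketch {

    /** Hypothetical stand-in for a broker queue. */
    static class ToyQueue {
        final List<String> delivered = new ArrayList<>();
    }

    /** Hypothetical stand-in for a transacted session. */
    static class ToyTransactedSession {
        private final ToyQueue queue;
        private final List<String> pending = new ArrayList<>();

        ToyTransactedSession(ToyQueue queue) {
            this.queue = queue;
        }

        /** Buffers the message; nothing reaches the queue yet. */
        void send(String message) {
            pending.add(message);
        }

        /** Hands all buffered messages to the queue. */
        void commit() {
            queue.delivered.addAll(pending);
            pending.clear();
        }

        /** Discards all buffered messages. */
        void rollback() {
            pending.clear();
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("first");
        session.rollback();                   // transaction failed
        System.out.println(queue.delivered);  // prints []

        session.send("second");
        session.commit();                     // transaction succeeded
        System.out.println(queue.delivered);  // prints [second]
    }
}
```

The consumer never sees "first", which is the property we wanted from JMS: the asynchronous work is only triggered when the sending transaction succeeds.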

Sending a message to the queue can be done using code like the following:

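import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {

    // The "jmsProducerTemplate" bean from the XML configuration (sessionTransacted = true)
    @Autowired
    private JmsTemplate template;

    public void sendEvent(String eventParameter) {
        try {
            // The destination is addressed by the queue's physical name
            template.send("my.pkg.myEventQueue", session -> {
                    // The text body is only a label; the payload travels as a message property
                    TextMessage message = session.createTextMessage("myEventQueue");
                    message.setStringProperty("parameter", eventParameter);
                    return message;
                });
        } catch (JmsException je) {
            // ... handle exception
        }
    }

}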

To listen to events, you can create a listener component like this:

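import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class EventListener implements MessageListener {

    @Autowired
    private WhateverHelper whateverHelper;

    @Override
    @Transactional // the processing runs in its own database transaction
    @JmsListener(destination = "my.pkg.myEventQueue")
    public void onMessage(Message message) {
        try {
            // The payload was sent as a string property by the producer
            String parameter = message.getStringProperty("parameter");

            // Only text messages are handled; other message types are ignored
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                String msg = tm.getText(); // can be used for logging the event

                whateverHelper.doSomething(parameter);
            }
        } catch (JMSException e) {
            // ... handle exception
        }
    }

}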
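As noted earlier, receipt of messages is not transactional in this setup. One possible way to close that gap is to let the listener container receive inside a local JMS transaction. The sketch below is a Java-config equivalent of the XML `jmsListenerContainerFactory` bean with `sessionTransacted` switched on. It is untested against this application; the class name `TransactedListenerConfig` is made up, and it would replace the XML bean definition rather than sit next to it.

```java
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

// Hypothetical replacement for the XML "jmsListenerContainerFactory" bean.
@Configuration
public class TransactedListenerConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("3-10"); // same value as in the XML configuration
        // Receive inside a local JMS transaction: the session is committed after the
        // listener returns and rolled back if the listener throws.
        factory.setSessionTransacted(true);
        return factory;
    }
}
```

With this in place, an exception that propagates out of `onMessage` (for example a failed database commit) should roll back the JMS session, so the broker can redeliver the message according to its redelivery policy instead of dropping it. Exceptions that are caught and swallowed inside the listener do not trigger a rollback. Because the database transaction and the JMS session are still committed separately, a message can occasionally be delivered twice, so the processing should tolerate duplicates.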
