Author Archives: justme

Transaction management with Spring in Activiti

You can make sure that Activiti uses the transaction manager defined in Spring by setting the “transactionManager” property on the SpringProcessEngineConfiguration class.

This works, but it behaves differently from Hibernate/JPA.

When you explicitly create your transaction boundaries (using TransactionTemplate or @Transactional annotations), these are simply used as expected.

However, if you do not define the transaction boundaries, Hibernate/JPA will throw an exception indicating that a transaction needs to be active, while Activiti will simply create a transaction itself, more or less working in autocommit mode.
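The difference comes down to two propagation modes: REQUIRED starts a transaction when none is active, while MANDATORY refuses to run without one. The following is a minimal plain-Java sketch of those semantics only. It does not use Spring or Activiti, and the class and method names (PropagationSketch, required, mandatory) are made up for this illustration.

```java
import java.util.function.Supplier;

/** Toy model of two propagation modes; not Spring's or Activiti's implementation. */
class PropagationSketch {

    // Tracks whether the current thread is inside a (simulated) transaction.
    private static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

    /** REQUIRED: join the active transaction, or start one if none is active. */
    static <T> T required(Supplier<T> work) {
        if (ACTIVE.get()) {
            return work.get(); // join the surrounding transaction
        }
        ACTIVE.set(true); // nobody started one, so start it here
        try {
            return work.get();
        } finally {
            ACTIVE.set(false); // "commit" and leave the transaction
        }
    }

    /** MANDATORY: join the active transaction, or fail if none is active. */
    static <T> T mandatory(Supplier<T> work) {
        if (!ACTIVE.get()) {
            throw new IllegalStateException("No active transaction");
        }
        return work.get();
    }

    public static void main(String[] args) {
        // Works: required() silently creates the transaction.
        System.out.println(required(() -> "done"));
        // Works: mandatory() finds the transaction opened by required().
        System.out.println(required(() -> mandatory(() -> "done")));
        // Fails: no surrounding transaction.
        try {
            mandatory(() -> "done");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, Activiti's default corresponds to required() and the Hibernate/JPA behaviour to mandatory().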

You can ensure that Activiti behaves like Hibernate/JPA by using a custom SpringProcessEngineConfiguration class. Unfortunately, this does not work in all cases: it fails when you use the job executor (so jobExecutorActive needs to be false). You also need to make sure that the conversion does not occur during auto-deployment of resources.

public class MySpringProcessEngineConfiguration extends SpringProcessEngineConfiguration {
 
    private MySpringTransactionInterceptor txInterceptor;
 
    @Override
    protected CommandInterceptor createTransactionInterceptor() {
        if (transactionManager == null) {
            throw new ActivitiException("transactionManager is required property for SpringProcessEngineConfiguration, use "
                    + StandaloneProcessEngineConfiguration.class.getName() + " otherwise");
        }
 
        txInterceptor = new MySpringTransactionInterceptor(transactionManager);
        return txInterceptor;
    }
 
    @Override
    public ProcessEngine buildProcessEngine() {
        if (null != txInterceptor) {
            txInterceptor.setConvertRequiredToMandatory(false);
        }
        ProcessEngine processEngine = super.buildProcessEngine();
        ProcessEngines.setInitialized(true);
        autoDeployResources(processEngine);
        if (null != txInterceptor) {
            txInterceptor.setConvertRequiredToMandatory(true);
        }
        return processEngine;
    }
 
}

The actual work is done in the MySpringTransactionInterceptor which converts REQUIRED transaction propagation to MANDATORY.

@Slf4j
class MySpringTransactionInterceptor extends SpringTransactionInterceptor {
 
    @Setter
    private boolean convertRequiredToMandatory;
 
    /**
     * Constructor.
     *
     * @param transactionManager transaction manager
     */
    MySpringTransactionInterceptor(PlatformTransactionManager transactionManager) {
        super(transactionManager);
    }
 
    @Override
    public <T> T execute(final CommandConfig config, final Command<T> command) {
        log.debug("Running command with propagation {}", config.getTransactionPropagation());
 
        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.setPropagationBehavior(getPropagation(config));
 
        return transactionTemplate.execute((status) -> next.execute(config, command));
    }
 
    private int getPropagation(CommandConfig config) {
        switch (config.getTransactionPropagation()) {
            case NOT_SUPPORTED:
                return TransactionTemplate.PROPAGATION_NOT_SUPPORTED;
            case REQUIRED:
                if (convertRequiredToMandatory) {
                    return TransactionTemplate.PROPAGATION_MANDATORY; // converted to mandatory, tx handling must be done explicitly
                } else {
                    return TransactionTemplate.PROPAGATION_REQUIRED;
                }
            case REQUIRES_NEW:
                return TransactionTemplate.PROPAGATION_REQUIRES_NEW;
            default:
                throw new ActivitiIllegalArgumentException("Unsupported transaction propagation: " + config.getTransactionPropagation());
        }
    }
 
}

Transactional annotation on a Spring service method not picked up

I spent several hours trying to hunt down why a @Transactional annotation on a method was not being picked up. It was picked up on various other methods, but not on this particular service, which only contains one method.

@Component
public class SomeHelper {
 
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    protected ReturnType doSomething(String parameter) {
        // handling code
    }
 
}

There are also some autowired components in there which were working.

Several hours later, I found it.

protected

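Apparently, with Spring's default proxy-based transaction handling, the annotation is only applied to public methods. On any other method it is silently ignored.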
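To make the rule concrete, here is a small plain-Java sketch of a "public methods only" check done with reflection. It is only an illustration of the kind of filter involved, not Spring's actual code: the @Tx annotation is a hypothetical stand-in for @Transactional (so that the example runs without Spring on the classpath), and isAdvised is a made-up helper.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class VisibilityCheckSketch {

    /** Hypothetical stand-in for @Transactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Tx {
    }

    static class SomeHelper {
        @Tx
        protected String doSomething(String parameter) {
            return parameter;
        }

        @Tx
        public String doSomethingPublic(String parameter) {
            return parameter;
        }
    }

    /** Returns true only if the named method is annotated AND public. */
    static boolean isAdvised(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return method.isAnnotationPresent(Tx.class)
                        && Modifier.isPublic(method.getModifiers());
            }
        }
        return false; // no such method declared on this class
    }

    public static void main(String[] args) {
        System.out.println("protected method advised: "
                + isAdvised(SomeHelper.class, "doSomething"));
        System.out.println("public method advised: "
                + isAdvised(SomeHelper.class, "doSomethingPublic"));
    }
}
```

Both methods carry the annotation, but only the public one passes the check. In the real service, changing the method from protected to public was the fix.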

JMS with proper transactional properties

The main reason I am interested in JMS (inside the application) is for the transactional properties. JMS allows you to start an asynchronous task and make sure that the task is only scheduled if the transaction in which the JMS message was sent succeeds.

Similarly, JMS is smart enough to retry delivering the message until it has been processed successfully (potentially with a backout scenario to ensure that retries don’t hog the system).
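These two guarantees can be pictured with a toy in-memory queue. The sketch below is not the JMS API and leaves out everything a real broker does (persistence, concurrency, separate producer and consumer sessions). The class TransactedQueueSketch and its methods are invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of transacted messaging; it is not the JMS API. */
class TransactedQueueSketch {

    private final Deque<String> queue = new ArrayDeque<>();      // visible to consumers
    private final List<String> pendingSends = new ArrayList<>(); // sent, not yet committed
    private String inFlight;                                     // received, not yet committed

    /** Buffers the message; it only becomes visible on commit(). */
    void send(String message) {
        pendingSends.add(message);
    }

    /** Takes the next visible message, or returns null if there is none. */
    String receive() {
        inFlight = queue.poll();
        return inFlight;
    }

    /** Publishes buffered sends and acknowledges the received message. */
    void commit() {
        queue.addAll(pendingSends);
        pendingSends.clear();
        inFlight = null;
    }

    /** Discards buffered sends and puts the received message back for redelivery. */
    void rollback() {
        pendingSends.clear();
        if (inFlight != null) {
            queue.addFirst(inFlight);
            inFlight = null;
        }
    }

    public static void main(String[] args) {
        TransactedQueueSketch q = new TransactedQueueSketch();
        q.send("task-1");
        q.rollback();                    // sending transaction failed
        System.out.println(q.receive()); // nothing was scheduled

        q.send("task-2");
        q.commit();                      // sending transaction succeeded
        System.out.println(q.receive()); // consumer gets the message
        q.rollback();                    // processing failed
        System.out.println(q.receive()); // same message is redelivered
    }
}
```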

This sounds great, but configuring JMS to work like this is hard.

The generic solution is to use XA transactions. In that case the transactions of your data sources (like your database) and your JMS provider are synchronized.

If you are only using one database, then using that database for persistence of your JMS messages avoids the need for XA transactions. There is only one data source, so no transactions to synchronize.

Using Spring and ActiveMQ, this can be done with a configuration like the following. This setup only uses JMS inside the application; connecting from the outside is not possible. The example persists messages in a PostgreSQL database.

<!--  Embedded ActiveMQ Broker -->
<amq:broker id="broker" useJmx="false" persistent="true">
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:0" />
    </amq:transportConnectors>
    <amq:persistenceAdapter>
        <amq:jdbcPersistenceAdapter changeAutoCommitAllowed="false" createTablesOnStartup="false" useDatabaseLock="false">
            <amq:adapter><amq:postgresql-jdbc-adapter/></amq:adapter>
            <amq:dataSource><ref bean="dataSource" /></amq:dataSource>
        </amq:jdbcPersistenceAdapter>
    </amq:persistenceAdapter>
</amq:broker>
 
<!--  ActiveMQ Destinations  -->
<amq:queue id="zzzz" physicalName="your.queues.physical.name.ZzzzQueue" />
 
<!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />
 
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>
 
<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
      depends-on="broker"
      p:targetConnectionFactory-ref="jmsFactory" />
 
<jms:annotation-driven/>
 
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="concurrency" value="3-10" />
</bean>
 
<bean id="jmsProducerTemplate" class="org.springframework.jms.core.JmsTemplate"
      p:connectionFactory-ref="jmsConnectionFactory"
      p:sessionTransacted="true" />

Your messages are now only delivered when the transaction in which they were sent succeeds.
When the transaction in which a message is received rolls back, the message will be redelivered.

If needed, you can limit how many times processing of a message is attempted. This can be done using code like the following:

private static final int MAX_DELIVERY_ATTEMPTS = 3;
 
@Transactional
@JmsListener(destination = "your.queues.physical.name.ZzzzQueue")
public void onMessage(Message message) {
    try {
        int count = message.getIntProperty("JMSXDeliveryCount");
        if (count > MAX_DELIVERY_ATTEMPTS) {
            LOG.warn("Processing JMS message {} failed {} times. It will not be retried.", message, MAX_DELIVERY_ATTEMPTS);
            return;
        }
 
        // ..... normal handling of message
    } catch (JMSException e) {
        // ..... handle exception
    }
}