Atomikos Transaction Manager
Atomikos is a distributed transaction processor adhering to the Java Transaction API (JTA) specification. It is used to coordinate distributed transactions across multiple XA-aware resources. Typically it is used to coordinate transactions between JDBC/XA and/or JMS/XA resources.
In this article we’ll be using the open source community edition of Atomikos labeled ‘TransactionEssentials’ to build transactional JDBC and JMS connection pools in Apache Karaf.
Setting up the transaction service
The Apache Karaf feature for Atomikos is:
<feature name="atomikos" version="3.9.3" resolver="(obr)"> <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-j2ee-connector_1.5_spec/2.0.0</bundle> <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/1.1.1</bundle> <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-jta_1.1_spec/1.1.1</bundle> <bundle>mvn:com.atomikos/transactions-osgi/${atomikos.version}</bundle> </feature>
Through Atomikos, we will export three core services:
- javax.transaction.UserTransaction: This is used for transaction boundaries management.
- javax.transaction.TransactionManager: This adds suspend/resume functionality to support additional features such as spawning a new transaction (REQUIRES_NEW).
- org.springframework.transaction.PlatformTransactionManager: This is a convenient wrapper for both UserTransaction and TransactionManager
The Blueprint setup for these is:
<blueprint default-activation="eager" xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="startupTransactionService" value="false" /> <property name="forceShutdown" value="false" /> </bean> <bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="1800"/> </bean> <bean id="platformTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="transactionManager" /> <property name="userTransaction" ref="userTransaction" /> <property name="allowCustomIsolationLevels" value="true" /> </bean> <service ref="transactionManager" interface="javax.transaction.TransactionManager"> <service-properties> <entry key="osgi.jndi.service.name" value="io.modio.osgi.atomikos"/> </service-properties> </service> <service ref="userTransaction" interface="javax.transaction.UserTransaction"> <service-properties> <entry key="osgi.jndi.service.name" value="io.modio.osgi.atomikos"/> </service-properties> </service> <service ref="platformTransactionManager" interface="org.springframework.transaction.PlatformTransactionManager"> <service-properties> <entry key="osgi.jndi.service.name" value="io.modio.osgi.atomikos"/> </service-properties> </service> </blueprint>
All three services have an additional namespace qualifier to avoid a potential conflict with Apache Aries JTA which automatically exports similar services without service properties.
MySQL/JDBC Setup
The Blueprint context for the JDBC XA connection pool is shown below:
<blueprint default-activation="eager" xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"> <cm:property-placeholder persistent-id="io.modio.blog.osgi.atomikos"/> <!-- JDBC --> <bean id="xaDataSource" class="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"> <property name="url" value="${io.modio.blog.osgi.atomikos.db.url}" /> <property name="user" value="${io.modio.blog.osgi.atomikos.db.user}" /> <property name="password" value="${io.modio.blog.osgi.atomikos.db.password}" /> </bean> <bean id="xaPoolDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"> <property name="uniqueResourceName" value="test-db" /> <property name="testQuery" value="select 1" /> <property name="xaDataSource" ref="xaDataSource" /> <property name="minPoolSize" value="2" /> <property name="maxPoolSize" value="30" /> <property name="reapTimeout" value="0" /> </bean> <service ref="xaPoolDataSource" interface="javax.sql.DataSource"> <service-properties> <entry key="osgi.jndi.service.name" value="io.modio.blog.osgi.atomikos/db" /> </service-properties> </service> </blueprint>
The xaPoolDataSource service is published with an additional qualifier to avoid conflicts with other DataSources.
jdbc:mysql://host.domain.com:3306/database?pinGlobalTxToPhysicalConnection=true
ActiveMQ/JMS Setup
The Blueprint setup for the ActiveMQ/JMS XA connection pool is shown below:
<blueprint default-activation="eager" xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <cm:property-placeholder persistent-id="io.modio.blog.osgi.atomikos"/> <reference id="platformTransactionManager" interface="org.springframework.transaction.PlatformTransactionManager" filter="(osgi.jndi.service.name=io.modio.blog.osgi.atomikos)"/> <!-- Underlying connection to ActiveMQ --> <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="${io.modio.blog.osgi.atomikos.activemq.broker.url}" /> <property name="redeliveryPolicy"> <bean class="org.apache.activemq.RedeliveryPolicy"> <property name="initialRedeliveryDelay" value="1000" /> <property name="backOffMultiplier" value="5" /> <property name="useExponentialBackOff" value="true" /> <property name="maximumRedeliveries" value="5" /> </bean> </property> </bean> <!-- Connection factory wrapper to support auto-enlisting of XA resource --> <bean id="jmsXaPoolConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close"> <property name="uniqueResourceName" value="activemq-xa" /> <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" /> <property name="localTransactionMode" value="false" /> <property name="minPoolSize" value="2" /> <property name="maxPoolSize" value="100" /> <property name="reapTimeout" value="0" /> </bean> <service ref="jmsXaPoolConnectionFactory" interface="javax.jms.ConnectionFactory"> <service-properties> <entry key="osgi.jndi.service.name" value="io.modio.blog.osgi.atomikos/jms"/> </service-properties> </service> </blueprint>
Given the setup above, any service that uses connections drawn from these connection pools in the same thread through the PlatformTransactionManager or the TransactionManager will automatically enlist them in the same transaction.
Very nice article! Did you experience problems during shutdown of Karaf, i.e. the connection factory or transaction manager blueprint bundles shutting down too early? Is recovery working correctly?
Hi Sebastian, you’re spot-on in both observations.
Yes, we do experience problems during shutdown with Atomikos complaining about the JMS connection being lost. This appears to be a bug in Karaf (at least 3.0.1) not respecting the ‘start-level’ attribute when unloading bundles. As a result the ActiveMQ broker is shutdown before the Atomikos pool is closed. If the ActiveMQ broker is instantiated in a separate container, this issue disappears.
Atomikos 3.9.3 seems to have a synchronization bug in the recovery code, specifically in XATransactionalResource.endRecovery() method where it sometimes throws an NPE. There’s a bug description in Atomikos 3.9.4 release notes that might have addressed this issue (72990: Exception on timeout of coordinator), but with 3.9.4 not publicly available, this is not simple to test.