Friday, October 7, 2011

Oracle AQ with Spring

In my current project we use a Java SEDA. The MOM to support this is IBM (Websphere) MQ. Our most used object is the queue, which enables us to handle events asynchronously and by multiple consumers which greatly improves scalability and robustness.

From a Java point of view the MOM implementation is really not that important, as it is accessed via the JMS API. So whether its Websphere MQ, JBoss MQ, ... as long as it has JMS support its pretty transparent. We do use some minor MQ specific extensions (PCF) to get queue depths and the likes, but that is more from an operational management point of view.

The choice of MQ was made before I joined the project, probably because other legacy subsystems have the least trouble dealing with MQ since they are already IBM based. Although we don't benefit a lot from it possibilities, since there is no QueueManager to QueueManager communication or the likes in which MQ is really strong. But it has to be said that MQ is a solid and mature product with a lot of possibilities.

The downside is probably its price (especially if you under-use it) and that it requires specific MQ knowledge to operate and maintain a running instance. For example; moving messages from a queue to another natively on Solaris is not a trivial thing if your not into the MQ administration (no, the 'MOVE' command is not supported on MQ Solaris).

Since we are using 2 resources most of the time, this also implies that our backends are running XA transactions to make 2PC work between our MOM and RDBMS (Oracle).
A while ago someone threw the idea on the table to switch to Oracle AQ (Advanced Queues) which is the Oracle MOM implementation. I'm not going in the area of comparing MQ vs AQ, but the fact is that AQ supports JMS and is a fully fledged MOM (It also has Topics, btw), so on paper it is more then enough for our usages.

Cool detail is that JMS Connection that you obtain is actually backed by a normal (JDBC) database connection. In fact, what happens is that the AQ driver uses a datasource under the hood. If you do a Queue.publish() or a Queue.read() the AQ driver will translate that to stored procedure and send them through the SQL datasource you instantiated it with. This also means we could drop our XA, since we only need to enlist a single resource for both our MOM and RDBMS access.


To set this up my first idea was to look for a resource adapter (RAR) which would enable AQ via the application server (Webshere MQ also ships with a JEE RAR). At that point I did not knew how it would handle the JDBC connection sharing if connection would be made via the RAR, but anyway. Quickly I found out that there is no real AQ resource adapter available for other JEE servers then Oracle AS itself (for this I was using Glassfish btw).

There is: genericjmsra but you cannot use it "properties based" like you enter the uri/username/password of the MOM. See here for its AQ specific manual:

Quote:

Oracle JMS client does not allow creation of ConnectionFactory, QueueConnectionFactory or
TopicConnectionFactory utilizing JavaBean approach. The factory creation is only possible through AQjmsFactory class provided in the Oracle jms client api. However fortunately, Oracle does support the JNDI lookup approach. We will be focusing on the JNDI approach for Oracle AQ and glassfish integration

This means you need an Oracle LDAP server in which some remote objects are published which are then looked up by the RA. So sharing the same JDBC connection for relational access and AQ will certainly not be possible this way.

Fortunately you can use the AQJmsFactory (that's the main factory which you feed a datasource and it gives you back a JMS ConnectionFactory) directly from your code, but that would require some boiler plate code as the AQJmsFactory checks that the actual connection is a direct Oracle connection.

If you are using a JDBC pool, like for example C3PO or Commons DBCP they will wrap the connections (in order to suppress closes etc) and these connections will be rejected because they are no direct instance of the Oracle connection. Thankfully a new Spring module was released at the right time and comes to the rescue with: Spring jdbc-extensions.

This is that boiler plate you want to seamlessly integrate Oracle AQ with your existing Spring managed datasource and transactions. The extension will make sure the Oracle AQJmsFactory is given a proxy which will be an instance of the Oracle connection. The proxy enables us to control what we give the Oracle AQ implementation.

For example when it tries to call 'close' we will suppress the call, since we know it will be handled by transaction manager (datasource,hibernate, jta, ...) later on. If your interested in this check the source at: org.springframework.data.jdbc.config.oracle.AqJmsFactoryBeanFactory.
That is the custom namespace handler for the AQ Spring XML config which creates the appropriate beans to do the boiler plate.

In this first example we create a scenario in which an event is received (Q1), a database record is inserted (T1) and a second event is published (Q2). All of this should run in one transaction, so if there is a failure at any point everything should be reverted (1 message back on Q1, no records in T1, and no messages on Q2). If everything succeeds, the message from Q1 should be processed, the record inserted and a new message published on Q2.

To start I'm going to setup the two AQ queue's and their queue table:

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'Q1_T', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'Q1',  Queue_table => 'Q1_T', max_retries => 2147483647);
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'Q1');

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'Q2_T', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'Q2',  Queue_table => 'Q2_T', max_retries => 2147483647);
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'Q2');

On AQ each Queue needs to have a corresponding queue table. The queue table is the table where the data is physically stored. You will never talk to a queue table directly, but you can use it with DML to query them via your favorite database IDE. On each you can specifiy additional properties, on the queue table you have to specifiy which payload it will have. On the queue itself you can specifiy after how many unsuccesful dequeues the message is moved to the exception queue.

In our project we make use of an application level failover and DLQ management system with separate queueing. So we don't need this feature. There is however no way to turn this off, so we've chosen the max setting (which is Integer.MAX_VALUE). Btw; the exception queues are generated automatically, you have no control over them.

To check if everything is created:

select * from all_queues where name like 'Q1%' or name like 'AQ$_Q1%' or name like 'Q2%' or name like 'AQ$_Q2%'
The results:

NAME QUEUE_TABLE QID QUEUE_TYPE MAX_RETRIES
Q2 Q2_T 365831 NORMAL_QUEUE 2147483647
AQ$_Q2_T_E Q2_T 365830 EXCEPTION_QUEUE 0
Q1 Q1_T 365816 NORMAL_QUEUE 2147483647
AQ$_Q1_T_E Q1_T 365815 EXCEPTION_QUEUE 0

Next we'll setup our Spring config. The goal is to create a message consumer that listens for messages on Q1 and processes them. Our processing will consist of inserting a record in T1 and putting a message on Q2.
 <!-- Sets up the JMS ConnectionFactory, in this case backed by Oracle AQ -->
 <bean id="oracleNativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.SimpleNativeJdbcExtractor"/>
 <orcl:aq-jms-connection-factory id="connectionFactory" data-source="dataSource" use-local-data-source-transaction="true" native-jdbc-extractor="oracleNativeJdbcExtractor"/>

 <tx:annotation-driven/>

 <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" lazy-init="true">
  <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
  <property name="url" value="jdbc:oracle:thin:host:port:SID"/>
  <property name="username" value="Scott"/>
  <property name="password" value="Tiger"/>
 </bean>

 <!-- Using DataSourceTxManager, but could also be HibernateTxManager or JtaTxManager -->
 <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
  <property name="dataSource" ref="dataSource"/>
 </bean>

 <!-- You can also construct the JMSTemplate in code, but we'll do it here so its all together in one place -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="defaultDestinationName" value="Q2"/>
  <property name="sessionTransacted" value="true"/>
 </bean>

 <bean id="myMessageListener" class="be.error.jms.MyMessageListener">
  <property name="dataSource" ref="dataSource"/>
  <property name="jmsTemplate" ref="jmsTemplate"/>
 </bean>

 <!-- Once it is started, it will try to read messages from Q1 and let 'messageListener' process them -->
 <bean id="messageListenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="transactionManager" ref="transactionManager"/>
  <property name="destinationName" value="Q1"/>
  <property name="messageListener" ref="myMessageListener"/>
  <property name="sessionTransacted" value="true"/>
 </bean>
As you can see the magic is in the orcl:aq-jms-connection-factory which will make a JmsConnectionFactory available under the id 'connectionFactory' and using our datasource to do the AQ queueing.

Very important: if you don't want to spend half a day investigating some weird transaction behaviour (I even mistakenly thought it was a bug and pointed that out here) I suggest to read this:

In my configuration you will see that the 'sessionTransacted' is set to "true" for the JmsTemplate and for the DefaultMessageListenerContainer. This makes sense as we are running outside of a JEE managed environment and we want to have local transactions for our JMS operations. The theory behind it is however a bit more complex.

When running outside of a JEE managed environment you have the choice of letting your session interaction be part of a local transaction. This is controlled by the sessionTransacted setting (it maps directly on the JMS API). This means that if you consume messages from different objects belonging to the same session, they will be controlled in a single transaction.
For example, I create QueueSession #1 and I use it to consume a message from Q1 and consume another message from Q2. After consuming both messages, I can issue a session.rollback() and everything is brought back to its initial state. If I would have used no transactions, I would be working with an acknowledgement mode. Suppose I would have chosen CLIENT_ACKNOWLEDGE then I had to acknowledge on message level whether my message was successfully consumed. So I would have first retrieved message #1 from Q1 and then message #2 to Q2 (all via QueueSession #1). In the end I would have to do:

messageOne.acknowledge();
//system crashes here
messageTwo.acknowledge();
This could of course create inconsistency as in my example messageOne was marked consumed but messageTwo wasn't. This is only a problem if your unit of work should be treated in an atomic way. If it is you should use at least local transactions.

When you want to consume/produce messages from a Queue and do interaction with another resource (RDBMS) for example you should use a distributed transactionmanager (in our case that would mean JTA). But remember that we are not dealing with different resources here, it all comes down to a single database connection. So in our case the "local transaction" is a bit "longer local" then it would normally be as it also includes all our (SQL) calls made to that same database connection as the JMS infrastructure is using.

In our case the DataSourceTransactionManager will control the local transaction, and that includes JMS operations as well as SQL operations issued via JDBC. It is that component which will call commit or rollback. there is no need for intermediate commits on the queueSession.

So basically: by setting sessionTransacted to true, no one performs intermediate commits and leaves everything to whoever controls the transaction, in our case DataSourceTransactionManager.
Make sure you use JdbcTemplate for direct JDBC access and JmsTemplate for MOM access. Make sure sessionTransacted is set to true when you should create JmsTemplate in code. Also, the DefaultMessageListenerContainer is a JMS receiver and must also be sessionTransacted for the same reason.

You might want to be tempted to remove the sessionTransacted from the JmsTemplate and DefaultMessageListenerContainer if you are running in an JEE environment. The JMS API says that the values to sessionTransacted and acknowledgementMode are ignored in such case.
While this is true in general, it is not true in this case. The Oracle AQ will not properly detected that it is running in a JEE JTA environment if you are using anything else then Oracle AS. If you remove the property, then the driver will perform intermediate commits and your transaction will be broken. So also in JEE mode you will have to leave this set to true!

But don't worry, in the JTA case your datasource will then be XA enabled and the transactionmaanger performing commmits will be the JtaTranasctionManager. As far as the AQ driver is concerned it sees no difference (all transaction handling an coordination is done at an higher level).

Also, I'm using a DataSourceTransactionManager here, since I only require direct JDBC access.
If you would be using hibernate, you could use HibernateTransactionManager. You could then do AQ, plain JDBC access and work with hibernate's SessionFactory at the same time.
If you would still have another resource (maybe a 2nd RDBMS) and still want XA, you can simply plugin the JTA transaction manager without any problem (its just a matter of switching configuration).

For the Java messageListener part, this is all standard:

public class MyMessageListener implements SessionAwareMessageListener<Message> {

 private DataSource dataSource;
 private JmsTemplate jmsTemplate;


 @Override
 public void onMessage(Message message, Session session) throws JMSException {
  //Message received from Q1 via 'messageListenerContainer'
  TextMessage textMessage = (TextMessage) message;
  System.out.println("Received message with content:" + textMessage.getText());

  //Insert its content into T1
  new JdbcTemplate(dataSource).update("insert into T1 values (?)", textMessage.getText());
  System.out.println("Inserted into table T1");

  //Publish a message to Q2
  jmsTemplate.send(new MessageCreator() {
   @Override
   public Message createMessage(Session session) throws JMSException {
    TextMessage textMessage = session.createTextMessage();
    textMessage.setText("payload");
    return textMessage;
   }
  });
  System.out.println("Sended message to Q2");
 }

 public void setDataSource(DataSource dataSource) {
  this.dataSource = dataSource;
 }

 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
}
I then created a small forever blocking test case to quickly fire up the application context so that the DefaultMessageListenerContainer could start looking for messages on Q1.

@Test
@ContextConfiguration(locations = { "classpath:/spring/aq-test.xml" })
public class OracleAqTransactionResourceTest extends AbstractTestNGSpringContextTests {

 @Autowired
 private DataSource dataSource;
 private JdbcTemplate jdbcTemplate;

 @BeforeMethod
 public void setup() {
  jdbcTemplate = new JdbcTemplate(dataSource);
 }

 public void testSingleTransaction() {
  System.out.println("Running...");
  blockUntillReadyOrTimeout();
  System.out.println("Done.");
 }

 private void blockUntillReadyOrTimeout() {
  while (true) {
   try {
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }
 }
}
After launching the test, I inject a message into Q1 (I use Oracle SQL developer):

DECLARE
    msg SYS.AQ$_JMS_TEXT_MESSAGE;
    queue_options       DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_id RAW(30);

BEGIN
      msg := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT();  
      msg.set_text('testing 123');
      DBMS_AQ.ENQUEUE(
        queue_name => 'Q1',
        enqueue_options => queue_options,
        message_properties => message_properties,
        payload => msg,
        msgid => message_id);
        commit;
END;
And off we go:

Running...
Received message with content:testing 123
Inserted into table T1
Sended message to Q2
In oracle we see that the message is present on Q2 (at least its queue table):
And that a record is inserted into T1:

You are free to play with some transaction scenario's, as creating multiple (possibly nested) transactions, let them rollback etc. I performed 5 scenario's and they all worked fine.

PS. make sure you use at least spring-jdbc 1.0_M2 (or up) since we discovered a small bug in M1 which could cost you some time to investigate :)