ONJava.com -- The Independent Source for Enterprise Java
oreilly.comSafari Books Online.Conferences.

advertisement

AddThis Social Bookmark Button
Article:
  Designing Messaging Applications with Temporary Queues
Subject:   Code on the client side
Date:   2007-04-24 17:44:09
From:   ThribhuvanThakur
Response to: Code on the client side

Here is the prototype.
We start off by initiating a JNDI lookup for the Queue Connection Factory

Access the resources by looking up the JNDI tree

Get the Connection Factory reference

Get the connection to JMS Server

Get the reference to the static destination that MDB is listening on


Create a Session using Connection Object

Create the Temporary Destinations using Session Object

Create the Message - in our example a text message

Set the temporary queue name as a message header - JMSReplyTo


Create Receiver - We are now ready to listen on the temporary Queue


Create worker thread pass in Session, Receiver and the Connection so that it makes the blocking call


Start the thread.

At this point a new thread is spawned and waiting for the reply

Create a sender to the static queue using the session

send the message to the static destination

Assuming the MDB is active and accepting requests, you could receive the reply within a second or a couple of seconds, depending upon the vendor and the complexity of the workflow you are dealing with on the Facade side


Clean up

The worker thread will close the session and the receiver passed in, but not the connection

Close the Connection as soon as you are done with all the threads


IMPORTANT NOTE: YOU CAN'T PASS IN THE SAME SESSION TO MORE THAN ONE THREAD, EVEN THOUGH YOU CAN PASS IN THE SAME CONNECTION OBJECT, OTHERWISE, YOU GET javax.jms.IllegalStateException SINCE JMS SESSIONS ARE NOT MULTITHREADED AS DESCRIBED IN THE ARTICLE



/** We start off by initiating a JNDI lookup for the Queue Connection Factory */
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, VendorSpecificContextFactory);
env.put(Context.PROVIDER_URL, JMS_URL);
env.put(Context.SECURITY_PRINCIPAL, USERNAME);
env.put(Context.SECURITY_CREDENTIALS, PASSWORD);
Context context = new InitialContext(env);
System.out.println("Go the Initial Context");

/** Access the resources by looking up the JNDI tree */
/** Get the Connection Factory reference */
QueueConnectionFactory qcf = (QueueConnectionFactory) context.lookup(QCFNAME);
System.out.println("Go the QCF");

/** Get the connection to JMS Server */
javax.jms.QueueConnection qConnection = qcf.createQueueConnection(USERNAME, PASSWORD);

/** Get the reference to the static destination that MDB is listening on */
javax.jms.Queue staticQueue = (javax.jms.Queue) context.lookup(STATIC_QUEUE_NAME);

/**
 * Create the Session that owns the temporary queue and its receiver.
 * Declared as QueueSession so no further casts are needed. This session is
 * handed to the worker thread below; once the worker is started, this thread
 * must not use it again because a JMS session is a single-threaded context.
 */
QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

/** Create the Temporary Destination using the Session Object */
javax.jms.Queue replyTempQueue = qSession.createTemporaryQueue();
System.out.println("Temporary Queue Name:" + replyTempQueue);

/** Create a Message - in our example a text message (done before the session is handed off) */
TextMessage message = qSession.createTextMessage();
message.setText("sampletext"); // in most cases it is an xml

/** Optional - set message properties - if any, for example user, department */
message.setStringProperty(USER, USER);
message.setStringProperty(DEPT, DEPT);

/** Set the temporary queue as a message header - JMSReplyTo */
message.setJMSReplyTo(replyTempQueue); // mandatory in the context of temporary destinations

/**
 * Assuming the MDB is active and accepting requests, the reply could arrive within
 * a second or a couple of seconds - it depends upon the vendor and on the complexity
 * of the workflow you are dealing with on the Facade side.
 */

/** Create Receiver - We are now ready to listen on the temporary Queue */
QueueReceiver receiver = qSession.createReceiver(replyTempQueue);

/** Create worker thread that makes the blocking call */
RequestHelper worker = new RequestHelper(qSession, qConnection, replyTempQueue, receiver);
worker.start();
/** At this point a new thread is spawned and waiting for the reply; qSession now belongs to it */

/**
 * Create the sender on a session of its own. The connection may be shared between
 * threads, but qSession is in use by the worker, so sending through it from this
 * thread would be concurrent use of one session.
 */
QueueSession sendSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = sendSession.createSender(staticQueue);
try
{
    /**
     * Message expiration is configured on the producer: the provider overwrites the
     * JMSExpiration header on send, so calling message.setJMSExpiration() has no effect.
     */
    sender.setTimeToLive(30000); // message will expire 30 seconds after it is sent

    /** send the message to the static destination */
    sender.send(message);
    System.out.println("Message Send to Static Queue:" + staticQueue.getQueueName());
}
finally
{
    /** Closing the session also closes the sender created from it */
    sendSession.close();
}

/** NOTE: qConnection is still open here; close it once all worker threads are done. */


/**
 * Worker thread that waits for a single reply on the receiver it is given.
 *
 * When run, it starts the connection, blocks in receive() until a message arrives,
 * prints the text of the reply, and then closes the receiver and the session that
 * were passed in. The connection is started here but never closed by this class;
 * the creator of the connection remains responsible for closing it.
 */
public class RequestHelper extends Thread
{
    private final Connection conn;
    private final MessageConsumer receiver;
    private final Session qSession;
    private final Queue tempQueue; /** Used only for logging purposes */
    private final String threadId; /** Used only for logging purposes */

    /**
     * @param session       session that created the receiver; closed by this thread when it finishes
     * @param conn          connection to start before receiving; not closed by this thread
     * @param tmpQue        queue the receiver listens on; used only for logging
     * @param queueReceiver receiver to block on; closed by this thread when it finishes
     * @throws JMSException if the queue name cannot be read for the log line
     */
    public RequestHelper(Session session, Connection conn, Queue tmpQue, QueueReceiver queueReceiver) throws JMSException
    {
        this.receiver = queueReceiver;
        this.conn = conn;
        this.qSession = session;
        this.threadId = getName();
        this.tempQueue = tmpQue;
        System.out.println("Temporay QueueName:" + threadId + ":" + tmpQue.getQueueName());
    }

    /**
     * Starts message delivery, waits for one reply and prints it.
     * The receiver and session are always closed, even when receiving fails.
     */
    public void run()
    {
        try
        {
            System.out.println(threadId + ":Calling Connection.start");
            conn.start();
            System.out.println(threadId + ":Connection.Start Called");
            System.out.println(threadId + ":Delivery of Messages Started");

            System.out.println(threadId + ":Waiting for Messages on Receiver");
            Message reply = receiver.receive();
            System.out.println(threadId + ":Processing of Response started, Unpacking TextMessage");

            // receive() returns null if the consumer is closed while waiting, and the
            // responder is free to send a non-text message; guard before casting.
            if (reply instanceof TextMessage)
            {
                String receivedText = ((TextMessage) reply).getText();
                System.out.println (threadId + ":Successfully Completed Processing of Response" + threadId + ":" + receivedText );
            }
            else
            {
                System.out.println(threadId + ":Response is not a TextMessage:" + reply);
            }
        }
        catch (JMSException exp)
        {
            exp.printStackTrace();
        }
        finally
        {
            closeResources();
        }
    }

    /** Closes the receiver and then the session; a failure in one does not skip the other. */
    private void closeResources()
    {
        try
        {
            receiver.close();
        }
        catch (JMSException exp)
        {
            exp.printStackTrace();
        }
        try
        {
            qSession.close();
        }
        catch (JMSException exp)
        {
            exp.printStackTrace();
        }
    }
}