Hi All,
According to what I have read, it should not matter whether a consumer receives messages from a queue synchronously or asynchronously: each message on a queue is delivered to exactly one consumer, and once the message is consumed it is removed from the queue.
However, I am not sure whether what I am seeing under JBoss is correct or whether something in the program is wrong (the program was copied from http://www.coredevelopers.net/library/jboss/jms-dev/dev-applications.jsp). Even after the message has been received from the queue, it is still listed when I look at the queue via JMX on JBoss (listMessages). Why is that the case? Shouldn't the message be deleted from the queue once it is consumed?
When I try a client with a synchronous receiver, the message is deleted from the JBoss queue and everything is fine.
Could it be that I need to do something specific in the MessageListener's onMessage method so that the message is removed from the queue?
The code of the asynchronous client is as follows:
=======================================================================
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Listens for a TextMessage from a Queue
 */
public class AsynchQueueToMessage
{
    InitialContext ctx;
    QueueConnectionFactory cf;
    QueueConnection connection;
    QueueSession session;
    Queue destination;
    QueueReceiver receiver;

    public static void main(String[] args) throws NamingException, JMSException
    {
        new AsynchQueueToMessage().run();
    }

    public void run() throws NamingException, JMSException
    {
        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        props.put(Context.PROVIDER_URL, "localhost:1099");
        ctx = new InitialContext(props);
        cf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
        destination = (Queue) ctx.lookup("queue/testQueue");
        connection = cf.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        receiver = session.createReceiver(destination);
        receiver.setMessageListener(new MyMessageListener());
        System.out.println("Waiting For A Message.");
        connection.start();
    }

    class MyMessageListener implements MessageListener
    {
        public void onMessage(Message msg)
        {
            try
            {
                TextMessage message = (TextMessage) msg;
                System.out.println("The message was: " + message.getText());
                connection.close();
                System.out.println("Done.");
            }
            catch (JMSException e)
            {
                e.printStackTrace();
            }
        }
    }
}
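
A possible explanation worth checking, as a sketch only: the javadoc for Session.AUTO_ACKNOWLEDGE says the session acknowledges a message when the listener that processed it successfully returns, and the listener above calls connection.close() before it returns. The toy model below is plain Java, not JMS. ToyQueue, ToyConnection and Listener are hypothetical stand-ins, and the rule that an acknowledgement is lost once the connection has been closed is an assumption about provider behaviour that has not been confirmed for JBoss.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: these classes are hypothetical stand-ins, not JMS or JBoss types.
class AckTimingSketch {

    /** Stand-in for a listener callback; it also receives the connection so it can close it. */
    interface Listener {
        void onMessage(String text, ToyConnection connection);
    }

    /** Stand-in for a connection that can no longer acknowledge once closed. */
    static class ToyConnection {
        private boolean open = true;

        void close() { open = false; }

        boolean isOpen() { return open; }
    }

    /** Stand-in for a server-side queue that keeps a message until it is acknowledged. */
    static class ToyQueue {
        private final Deque<String> stored = new ArrayDeque<>();

        void send(String text) { stored.addLast(text); }

        List<String> listMessages() { return new ArrayList<>(stored); }

        // Hands the head message to the listener, then acknowledges (removes) it
        // only if the connection is still open after the listener has returned.
        // ASSUMPTION: this is the behaviour being illustrated, not verified for JBoss.
        void deliver(Listener listener, ToyConnection connection) {
            String head = stored.peekFirst();
            if (head == null) {
                return;
            }
            listener.onMessage(head, connection);
            if (connection.isOpen()) {
                stored.removeFirst();
            }
        }
    }

    /** Sends one message, delivers it, and reports how many messages the queue still lists. */
    static int remainingAfterDelivery(boolean closeInsideListener) {
        ToyQueue queue = new ToyQueue();
        queue.send("hello");
        ToyConnection connection = new ToyConnection();
        queue.deliver((text, conn) -> {
            if (closeInsideListener) {
                conn.close(); // mirrors connection.close() inside onMessage
            }
        }, connection);
        connection.close(); // closing after the listener has returned
        return queue.listMessages().size();
    }

    public static void main(String[] args) {
        System.out.println("closed inside listener: " + remainingAfterDelivery(true));
        System.out.println("closed after listener:  " + remainingAfterDelivery(false));
    }
}
```

In this model the message stays listed only when the connection is closed inside the listener. If the real client behaves the same way, moving connection.close() out of onMessage (for example, having the listener signal the main thread, which then closes the connection) would be the thing to try.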