Spring 为 JMS 1.1 提供了一个简单的封装框架,可以直接调用,其核心是 JmsTemplate 这个类。
我们需要一个监听程序来监听 ActiveMQ 的消息,把监听到的消息放入一个队列,再由一个多线程的处理程序去处理队列里的消息。当然,还要有一个发送消息的程序。
为了在 web 应用程序启动时就启动这个消息监听程序,需要建一个实现 ServletContextListener 的监听类(这里命名为 ActiveMQConsumerServlet)。
public class ActiveMQConsumerServlet implements
ServletContextListener {
JMSMsgConsumer jmsMsgConsumer;
public
ActiveMQConsumerServlet()
{
try {
jmsMsgConsumer=new JMSMsgConsumer();
logger.info("ActiveMQConsumer has start");
} catch (Exception e) {
logger.info("ActiveMQConsumer start Error");
logger.error(e);
}
}
消息监听类:初始化时加载配置;收到消息时会触发 onMessage 方法,该方法应将消息加入队列。
public class JMSMsgConsumer implements MessageListener {
ApplicationContext mAppContext = null;
Logger mLog
= Logger.getLogger(this.getClass());
private
String mConfFile;
private
MessageQueueWorker mMessageQueueWorker;
public JMSMsgConsumer() {
setConfigurationFile("JMSMessaging.xml");
init();
private void
init()
mLog.info("Creating the JMSMsgConsumer");
mAppContext = new
ClassPathXmlApplicationContext(getConfigurationFile());
mMessageQueueWorker = new MessageQueueWorker();
Thread t = new Thread(mMessageQueueWorker);
t.start();
initializeJMS();
mLog.info("Created the JMSMsgConsumer");
public void
setConfigurationFile(String confFile) {
mConfFile = confFile;
public String getConfigurationFile() {
return mConfFile;
public void setMessageQueueWorker(MessageQueueWorker
msgQueueWorker)
mMessageQueueWorker = msgQueueWorker;
public
MessageQueueWorker getMessageQueueWorker()
return mMessageQueueWorker;
private void
initializeJMS() {
try {
mLog.info("Initialing the JMS configurations");
JmsTemplate template = (JmsTemplate) mAppContext
.getBean("receiveJmsTemplate");
Queue queue = (Queue)
mAppContext.getBean("receiveDestination");
ConnectionFactory conFactory =
template.getConnectionFactory();
Connection con = conFactory.createConnection();
Session session = con
.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue);
cons.setMessageListener(this);
con.start();
mLog.info("Initialized the JMS configurations");
} catch (Exception e) {
mLog.error("Error in initializing the JMS " + e);
}
public void onMessage(Message msg) {
mLog.info("on message ");
mMessageQueueWorker.addMessage(msg);
}
消息队列处理类:主要是在 run 方法中调用消息处理类,来处理队列里的消息。
public class MessageQueueWorker implements Runnable {
Queue
mMessageQueue;
Integer
mThreadIndex = 0;
MessageProcessor mMessageProcessor;
private
final int MAX_THREAD_COUNT = 20;
Logger
mLog = Logger.getLogger(this.getClass());
public
MessageQueueWorker() {
mLog.info("Starting the Message worker");
mMessageQueue = new LinkedList();
mLog.info("Starting the Message worker");
}
public void addMessage(Message msg) {
synchronized (this) {
mLog.info("Adding the message to the queue");
mMessageQueue.add(msg);
notifyAll();
}
public void setMessageQueue(Queue msgQueue) {
mMessageQueue = msgQueue;
}
public
void run() {
try {
while (true) {
if (mMessageQueue.size() == 0) {
synchronized (this) {
wait();
} else {
synchronized (mThreadIndex) {
Message msg = (Message) mMessageQueue.poll();
if (mThreadIndex == MAX_THREAD_COUNT) {
throw new Exception(
"The number of threads in the pool are exhausted");
if (mThreadIndex < MAX_THREAD_COUNT) {
XMLMessageProcessor msgProcessor = new XMLMessageProcessor(
this);
msgProcessor.setMessage(msg);
Thread t = new Thread(msgProcessor,
"MessageWorkerThread#" + mThreadIndex);
t.start();
mThreadIndex++;
}
} catch (Exception e) {
e.printStackTrace();
}
public void notifyWorkComplete() {
synchronized (mThreadIndex) {
// mLog.info("Notified the work complete");
mThreadIndex--;
if (mLog.isDebugEnabled())
{ mLog.info("Current thread number " +
mThreadIndex);}
}
public
MessageProcessor getMessageProcessor() {
return mMessageProcessor;
}
public
void setMessageProcessor(MessageProcessor messageProcessor) {
mMessageProcessor = messageProcessor;
}
}
消息处理类负责处理消息。我这里主要是接收一个 XML 进行处理,处理的过程和内容就不讲了。
public class XMLMessageProcessor implements MessageProcessor,
Runnable {
MessageQueueWorker mMessageWorker;
DownloadService ds;
Message
mMessage;
private
static XmlOptions m_validationOptions;
Logger
mLog = Logger.getLogger(this.getClass());
public XMLMessageProcessor() {
}
public
XMLMessageProcessor(MessageQueueWorker msgWorker) {
mMessageWorker = msgWorker;
}
public
void processMessage() {
try {
TextMessage msg = (TextMessage) mMessage;
if (mLog.isDebugEnabled()) {
mLog
.debug("onMessage(Message) - CLIENT: The received text message is
"
+ msg.getText().trim());
/*
略过xml处理的内容
*/
}
catch (Exception e) {
mLog.info(e);
}
}