rss· 投稿· 设为首页· 加入收藏· 繁體版
当前位置: 火魔网 » 程序开发 » JavaEE

结合spring使用ActiveMQ进行异步消息调用(二)

spring 对于Jms1.1有一个简单的框架,可以调用。主要是JmsTemplate这个类。   我们应该有一个监听程序去监听activemq的消息,监听到的消息去放到一个队列,有一个多线程的处理程序去处理队列里的消息。当然还要有一个发送消息的程序。 为了在web应用程序开始就启动这个消息监听程序。建一个servlet.

/**
 * Servlet context listener that creates the {@link JMSMsgConsumer} when the
 * listener itself is instantiated, so message consumption begins as soon as
 * the web container loads this class.
 */
public class ActiveMQConsumerServlet implements ServletContextListener {

    // Same Logger type the other classes in this listing use; the original
    // snippet referenced "logger" without showing its declaration.
    private static final Logger logger = Logger.getLogger(ActiveMQConsumerServlet.class);

    JMSMsgConsumer jmsMsgConsumer;

    /**
     * Creates the JMS consumer. A failure is logged (with its stack trace)
     * instead of being propagated, so a broker problem does not prevent the
     * listener from being constructed.
     */
    public ActiveMQConsumerServlet() {
        try {
            jmsMsgConsumer = new JMSMsgConsumer();
            logger.info("ActiveMQConsumer has start");
        } catch (Exception e) {
            // Log once, at error level, and pass the throwable separately so
            // the stack trace is kept.
            logger.error("ActiveMQConsumer start Error", e);
        }
    }

    /**
     * Nothing to do here: the consumer is already created by the constructor.
     *
     * @param event the context event supplied by the container (unused)
     */
    public void contextInitialized(javax.servlet.ServletContextEvent event) {
        // Intentionally empty.
    }

    /**
     * Nothing to do here.
     * NOTE(review): JMSMsgConsumer exposes no shutdown method in this listing,
     * so its JMS connection and worker thread are not released on undeploy.
     *
     * @param event the context event supplied by the container (unused)
     */
    public void contextDestroyed(javax.servlet.ServletContextEvent event) {
        // Intentionally empty.
    }
}

消息监听类,初始化时,加载配置。触发消息的方法onMessage,应该将消息加入队列。
public class JMSMsgConsumer implements MessageListener {     ApplicationContext mAppContext = null;     Logger mLog = Logger.getLogger(this.getClass());     private String mConfFile;     private MessageQueueWorker mMessageQueueWorker;   public JMSMsgConsumer() {
        setConfigurationFile("JMSMessaging.xml");
        init();    
    private void init()      
        mLog.info("Creating the JMSMsgConsumer");         mAppContext = new ClassPathXmlApplicationContext(getConfigurationFile());
        mMessageQueueWorker = new MessageQueueWorker();
        Thread t = new Thread(mMessageQueueWorker);
        t.start();         initializeJMS();         mLog.info("Created the JMSMsgConsumer");          public void setConfigurationFile(String confFile) {
        mConfFile = confFile;     
        public String getConfigurationFile() {
        return mConfFile;     
        public void setMessageQueueWorker(MessageQueueWorker msgQueueWorker)         mMessageQueueWorker = msgQueueWorker;          public MessageQueueWorker getMessageQueueWorker()         return mMessageQueueWorker;    
    private void initializeJMS() {
        try {
            mLog.info("Initialing the JMS configurations");
            JmsTemplate template = (JmsTemplate) mAppContext
            .getBean("receiveJmsTemplate");
            Queue queue = (Queue) mAppContext.getBean("receiveDestination");
            ConnectionFactory conFactory = template.getConnectionFactory();
            Connection con = conFactory.createConnection();
                   Session session = con
            .createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer cons = session.createConsumer(queue);
            cons.setMessageListener(this);
            con.start();
            mLog.info("Initialized the JMS configurations");
        } catch (Exception e) {
            mLog.error("Error in initializing the JMS " + e);     }        public void onMessage(Message msg) {        mLog.info("on  message ");
         mMessageQueueWorker.addMessage(msg);   }
消息队列处理类:主要是在run方法中取出队列里的消息,并调用消息处理类进行处理。
public class MessageQueueWorker implements Runnable {
    Queue mMessageQueue;     Integer mThreadIndex = 0;     MessageProcessor mMessageProcessor;     private final int MAX_THREAD_COUNT = 20;     Logger mLog = Logger.getLogger(this.getClass());     public MessageQueueWorker() {
        mLog.info("Starting the Message worker");
        mMessageQueue = new LinkedList();
        mLog.info("Starting the Message worker");
    }        public void addMessage(Message msg) {
        synchronized (this) {
            mLog.info("Adding the message to the queue");
            mMessageQueue.add(msg);
            notifyAll();     }        public void setMessageQueue(Queue msgQueue) {
        mMessageQueue = msgQueue;
    }     public void run() {
        try {
            while (true) {
                if (mMessageQueue.size() == 0) {
                    synchronized (this) {
                        wait();                 } else {
                    synchronized (mThreadIndex) {
                        Message msg = (Message) mMessageQueue.poll();
                        if (mThreadIndex == MAX_THREAD_COUNT) {
                            throw new Exception(
                            "The number of threads in the pool are exhausted");                         if (mThreadIndex < MAX_THREAD_COUNT) {
                            XMLMessageProcessor msgProcessor = new XMLMessageProcessor(
                                    this);
                            msgProcessor.setMessage(msg);
                            Thread t = new Thread(msgProcessor,
                                    "MessageWorkerThread#" + mThreadIndex);
                            t.start();                         mThreadIndex++;                 }         } catch (Exception e) {
            e.printStackTrace();     }        public void notifyWorkComplete() {
        synchronized (mThreadIndex) {
           // mLog.info("Notified the work complete");             mThreadIndex--;
            if (mLog.isDebugEnabled())
            {  mLog.info("Current thread number " + mThreadIndex);}     }     public MessageProcessor getMessageProcessor() {
        return mMessageProcessor;
    }     public void setMessageProcessor(MessageProcessor messageProcessor) {
        mMessageProcessor = messageProcessor;
    } }   消息处理类处理消息,我这里主要是接受一个XML进行处理,处理的过程和内容就不讲了
public class XMLMessageProcessor implements MessageProcessor, Runnable {     MessageQueueWorker mMessageWorker;     DownloadService ds;     Message mMessage;     private static XmlOptions m_validationOptions;     Logger mLog = Logger.getLogger(this.getClass());       public XMLMessageProcessor() {
            }     public XMLMessageProcessor(MessageQueueWorker msgWorker) {
        mMessageWorker = msgWorker;
    }        public void processMessage() {
        try {
            TextMessage msg = (TextMessage) mMessage;
            if (mLog.isDebugEnabled()) {
                mLog
                        .debug("onMessage(Message) - CLIENT: The received text message is "
                                + msg.getText().trim());           /* 略过xml处理的内容 */            
            } catch (Exception e) {
            mLog.info(e);     } }        
顶一下
(0)
踩一下
(0)