项目中用到了IBM WebSphere MQ,因为需求也很简单,所以只做了很简单的封装,支持向指定的主机的指定的队列发和收消息。

用法:

WQWraper wraper = null;
/// <summary>
///
/// </summary>
private void InitWraper() //创建连接
{
string hostname = this.tbConnection.Text; //Server地址
string channel = this.tbChannel.Text; //Channel 名
string qManager = this.tbQMng.Text; //Queue Manager名
string qqueue = “a1″;

WQServer server = new WQServer(hostname, channel, qqueue, qManager, 1414);
wraper = new WQWraper(server);
}

private void button1_Click(object sender, EventArgs e) //发送消息
{
wraper.sender(tbMsg.Text.Trim());

}
private void button2_Click(object sender, EventArgs e) //获取消息
{
string msg = string.Empty;
wraper.receiver(ref msg);
tbReceive.Text = msg;
}

WQWraper类:向指定主机的指定队列读取和发送消息

 /// <summary>
    /// 向指定主机的指定队列读取和发送消息
    /// </summary>
    public class WQWraper
    {
        private WQServer _server=null;
  
        public WQServer WQSERVER
        {
            get {
                return _server;
            }
        }
        public WQWraper(WQServer server)
        {
            _server = server;
            Init();
        }
        protected MQQueueManager queueManager = null;
        protected MQQueue queue = null;

        /// <summary>
        ///   MQEnvironment初始化
        /// </summary>
        private void Init()
        {

            MQEnvironment.Hostname = _server.HostName;

            MQEnvironment.Channel = _server.Channel;

            MQEnvironment.Port = _server.Port;

            MQEnvironment.properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);

         

        }
        /// <summary>
        /// 向指定的主机指定的队列发送消
        /// </summary>
  
        public void sender(string sendMessage)
        {
            try
            {
                /*连接到队列管理器*/

                queueManager = new MQQueueManager(_server.QueueManagerName);

                LogService.log.Debug(”Create Queue Manager Sucess”);
              
                 /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,我们也设置了选项去应对不成功的情况*/

 

      int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING ;

      /*打开队列*/

      queue = queueManager.AccessQueue(_server.QueueName   , openOptions);

      /*设置放置消息选项,使用默认设置*/

      MQPutMessageOptions pmo = new MQPutMessageOptions() ;

      /*创建消息,MQMessage类包含实际消息数据的数据缓冲区和描述消息的所有MQMD参数*/

      MQMessage outMsg = new MQMessage() ;

      /*设置MQMD(Manager Queue Message Description)格式字段*/

      outMsg.Format = MQC.MQFMT_STRING ;

      /*准备用户数据消息*/

   

      outMsg.WriteString(sendMessage) ;
      /*队列上放置消息*/

      queue.Put(outMsg, pmo) ;
      /*提交事务处理*/

      queueManager.Commit() ;

 LogService.log.Debug(”The message has been successfully put!”);

 

      }
   catch (MQException ex)
    {

 

     LogService.log.Debug(”An MQ Error Occured:Completion Code is :\t” +ex.StackTrace) ;

 

    }

    catch (IOException ioe)
    {
  LogService.log.Debug(ioe.StackTrace);

    }

    finally

    {

      try
      {

        if (queue != null)

        {

          queue.Close() ;

 

         LogService.log.Debug(”Close the queue:[" + _server.QueueName + "] successfully”) ;

 

        }

        if (queueManager != null)

        {

          queueManager.Close() ;

          queueManager.Disconnect() ;

          LogService.log.Debug(”Disconnect the queue manager:[" + _server.QueueManagerName + "] successfully”) ;
        }

 

      }

 

      catch (MQException mqe)
      {
          LogService.log.Debug(mqe.StackTrace);

      }
    }

 

        }
       
        /// <summary>
        /// 从指定的主机指定的队列中取出消息
        /// </summary>
        public void receiver(ref string recvMessage)
        {
            try
            {
                 /*连接到队列管理器*/
                if(queueManager==null){
                    queueManager = new MQQueueManager(_server.QueueManagerName) ;
                }

                LogService.log.Debug(”Access the queue manager:[" + _server.QueueManagerName + "] successfully”);
                /*设置打开选项以便打开用于输出的队列,如果队列管理器已经停止,设置应对不成功的情况*/

 

                int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;

 

      if (queue == null)
      {
          queue = queueManager.AccessQueue(_server.QueueName, openOptions, null, null, null);

      }
                 /*设置放置消息选项,使用默认设置*/

 

      MQGetMessageOptions gmo = new MQGetMessageOptions();

      /*在同步点控制下获取消息*/

 

      gmo.Options = gmo.Options + MQC.MQGMO_SYNCPOINT;

 

      /*如果在队列上没有消息则等待*/

 

      gmo.Options = gmo.Options + MQC.MQGMO_WAIT;

 

      /*如果队列管理器停顿则失败*/

 

      gmo.Options = gmo.Options + MQC.MQGMO_FAIL_IF_QUIESCING;

 

      /*设置等待时间间隔*/

 

      gmo.WaitInterval = 3000;

 /*创建MQMessage类*/

 

      MQMessage inMsg = new MQMessage() ;

 

      /*从队列到消息缓冲区获取消息*/

 

      queue.Get(inMsg, gmo) ;

 

 

 

      /*从消息读取用户数据*/

 

      recvMessage = inMsg.ReadString(inMsg.MessageLength);

 

      LogService.log.Debug(”The message from the Queue is :” + recvMessage);

 

 

 

      /*提交事务*/

 

      queueManager.Commit() ;

 

 

 

      LogService.log.Debug(”The Message has been successfully put”);

 

            }
   catch (MQException ex)

    {

     LogService.log.Debug(”An MQ Error Occured:Completion Code is :\t” +   ex.StackTrace) ;

    }

 

    catch (IOException ioe)

    {
       
     LogService.log.Debug(ioe.StackTrace ) ;
    }

 

    finally

 

    {

 

      try

 

      {

 

        if (queue != null)

        {

    queue.Close();
 LogService.log.Debug(”Close the queue:[" + _server.QueueName + "] successfully”) ;

 

        }

        if (queueManager != null)

 

        {

 

          queueManager.Close() ;

 

          queueManager.Disconnect() ;

 

          LogService.log.Debug(”Disconnect the queue manager:[" +  _server.QueueManagerName + "] successfully”) ;

 

        }

 

      }

 

      catch (MQException mqe)

 

      {
          LogService.log.Debug(mqe.StackTrace);

 

      }

 

 

 

    }

 
        }

WQServer类:指定的主机实体类

/// <summary>
    /// 指定的主机
    /// </summary>
   public class WQServer
    {
       private string _HostName;
       private string _Channel;
       private string _QueueName;
       private string _QueueManagerName;
       private int _Port;

       public string HostName
       {
           get { return _HostName; }
       }
       public string Channel
       {
           get { return _Channel; }
       }
       public string QueueName {
           get { return _QueueName; }
       }
       public string QueueManagerName {
           get { return _QueueManagerName; }
       }
       public int Port {
           get { return _Port; }
       }
       public WQServer(String strHostName, String strChannel,  String strQueueName, String strQueueManagerName,int intPort)
       {
           _HostName = strHostName;
        
         _Channel=strChannel;

         _QueueName=strQueueName;

        _QueueManagerName=strQueueManagerName;

         _Port = intPort;
       }

Most Commented Posts