.Net下的MSMQ的同步異步調用_.Net教程

      編輯Tag賺U幣
      教程Tag:暫無Tag,歡迎添加,賺取U幣!

      推薦:用.net動態創建類的實例
      用.net動態創建類的實例 看了網上很多關于DotNet動態創建類的實例的文章,我這里想總結一下,其實方法很簡單,就是用“Activator.CreateInstance”。但是這個方法需要待創建的類的

      一、MSMQ簡介

      MSMQ(微軟消息隊列)是Windows操作系統中消息應用程序的基礎,是用于創建分布式、松散連接的消息通訊應用程序的開發工具。消息隊列

      和電子郵件有著很多相似處,他們都包含多個屬性,用于保存消息,消息類型中都指出發送者和接收者的地址;然而他們的用處卻有著很大的

      區別:消息隊列的發送者和接收者是應用程序,而電子郵件的發送者和接收者通常是人。如同電子郵件一樣,消息隊列的發送和接收也不需要

      發送者和接收者同時在場,可以存儲在消息隊列或是郵件服務器中。

      二、消息隊列的安裝

      默認情況下安裝操作系統是不安裝消息隊列的,你可以在控制面板中找到添加/刪除程序,然后選擇添加/刪除Windows組件一項,然后選擇應

      用程序服務器,雙擊它進入詳細資料中選擇消息隊列一項進行安裝,如圖:

      三、消息隊列類型

      消息對列分為3類:

      公共隊列

      MachineName\QueueName

      能被別的機器所訪問,如果你的多個項目中用到消息隊列,那么你可以把隊列定義為公共隊列

      專用隊列

      MachineName\Private$\QueueName

      只針對于本機的程序才可以調用的隊列,有些情況下為了安全起見定義為私有隊列。

      日志隊列

      MachineName\QueueName\Journal$

      四、消息隊列的創建

      MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);

      通過Path屬性引用消息隊列的代碼也十分簡單:

      MessageQueue Mq=new MessageQueue();

      Mq.Path=”.\\private$\\Mymq”;

      使用 Create 方法可以在計算機上創建隊列:

      System.Messaging.MessageQueue.Create(@".\private$\Mymq");

      這里注意由于在C#中要記住用反斜杠將“\”轉義。

      由于消息對列所放置的地方經常改變,所以建議消息隊列路徑不要寫死,建議放在配置文件中。

      五、消息的發送

      消息的發送可以分為簡單消息和復雜消息,簡單消息類型就是常用的數據類型,例如整型、字符串等數據;復雜消息的數據類型通常對應于系統中的復雜數據類型,例如結構,對象等等。

      Mq.Send("Hello!");

      在這里建議你可以事先定義一個對象類,然后發送這個對象類的實例對象,這樣以后無論在增加什么發送信息,只需在對象類中增加相應的屬性即可。

      六、消息的接收和閱讀

      (1)同步接收消息

      接收消息的代碼很簡單:

      Mq.Receive();
      Mq.Receive(TimeSpan timeout); //設定超時時間
      Mq.ReceiveById(ID);
      Mq.Peek();

      通過Receive方法接收消息同時永久性地從隊列中刪除消息;

      通過Peek方法從隊列中取出消息而不從隊列中移除該消息。

      如果知道消息的標識符(ID),還可以通過ReceiveById方法和PeekById方法完成相應的操作。

      (2)異步接受消息

      利用委托機制:MessQueue.ReceiveCompleted =new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

      (3)消息閱讀

      在應用程序能夠閱讀的消息和消息隊列中的消息格式不同,應用程序發送出去的消息經過序列化以后才發送給了消息隊列
      而在接受端必須反序列化,利用下面的代碼可以實現:

      public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
      {
      System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
      m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
      Console.WriteLine("Message: " (string)m.Body);
      MessQueue.BeginReceive() ;

      }

      反序列化還有另一種寫法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );

      七、由于消息隊列的代碼有些是固定不便的,所以把這些代碼封裝成一個類方便以后使用:

      以下為引用的內容:
      1using System;
      2using System.Messaging;
      3using System.Threading;

      5
      6namespace LoveStatusService
      7{
      8 /**//// <summary>
      9 /// Summary description for Msmq.
      10 /// </summary>
      11 public class Msmq
      12 {
      13 public Msmq()
      14 {
      15 //
      16 // TODO: Add constructor logic here
      17 //
      18 }
      19
      20
      21 private MessageQueue _messageQueue=null;
      22 //最大并發線程數
      23 private static int MAX_WORKER_THREADS=Convert.ToInt32( System.Configuration.ConfigurationSettings.AppSettings["MAX_WORKER_THREADS"].ToString());
      24 //Msmq路徑
      25 private static string MsmqPath=System.Configuration.ConfigurationSettings.AppSettings["LoveStatusMQPath"];
      26 //等待句柄
      27 private WaitHandle[] waitHandleArray = new WaitHandle[MAX_WORKER_THREADS];
      28 //任務類型
      29 //1. Send Email 2. Send Message 3. Send Email and Message
      30 private string TaskType=System.Configuration.ConfigurationSettings.AppSettings["TaskType"];
      31 public MessageQueue MessQueue
      32 {
      33 get
      34 {
      35
      36 if (_messageQueue==null)
      37 {
      38 if(MessageQueue.Exists(MsmqPath))
      39 {
      40 _messageQueue = new MessageQueue(MsmqPath);
      41 }
      42 else
      43 {
      44 _messageQueue = MessageQueue.Create(MsmqPath);
      45 }
      46 }
      47
      48
      49 return _messageQueue;
      50 }
      51 }
      52
      53
      54 Private Method#region Private Method
      55
      56 private void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
      57 {
      58 MessageQueue mqq = (MessageQueue)sender;
      59 System.Messaging.Message m = mqq.EndReceive(e.AsyncResult);
      60 //m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
      61 m.Formatter =new System.Messaging.XmlMessageFormatter(new Type[] {typeof(UserObject)}) ;
      62 //log.Info("Receive UserID: " (string)m.Body) ;
      63 UserObject obj=(UserObject)m.Body ;
      64 long curUserId=obj.curUserID ;
      65 long oppUserId=obj.oppUserID;
      66 string curUserName=obj.curUserName;
      67 string oppUserName=obj.oppUserName;
      68 string curEmail=obj.curEmail ;
      69 string oppEmail=obj.oppEmail;
      70 string subject =obj.subject ;
      71 string body=obj.body ;
      72 //AppLog.log.Info("curUserId:" curUserId) ;
      73 //AppLog.log.Info("oppUserId:" oppUserId) ;
      74 AppLog.log.Info("==type=" TaskType) ;
      75 switch(TaskType)
      76 {
      77 //Email
      78 case "1":
      79 EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
      80 AppLog.log.Info("==Send to==" oppEmail) ;
      81 break;
      82 //Message
      83 case "2":
      84 MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
      85 AppLog.log.Info("==Send Msg to==" oppUserId) ;
      86 break;
      87 //Email and Message
      88 case "3":
      89 EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
      90 AppLog.log.Info("==Send to==" oppEmail) ;
      91 MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
      92 AppLog.log.Info("==Send Msg to==" oppUserId) ;
      93 break;
      94 default:
      95 break;
      96
      97 }
      98 mqq.BeginReceive() ;
      99
      100 }
      101
      102 #endregion
      103
      104 Public Method#region Public Method
      105
      106 //一個將對象發送到隊列的方法,這里發送的是對象
      107 public void SendUserIDToMQ(object arr)
      108 {
      109 MessQueue.Send(arr) ;
      110 Console.WriteLine("Ok") ;
      111 Console.Read() ;
      112 }
      113
      114 //同步接受隊列內容的方法
      115 public void ReceiveFromMQ()
      116 {
      117 Message ms=new Message() ;
      118
      119 //ms=MessQueue.Peek();
      120 try
      121 {
      122 ms=MessQueue.Receive(new TimeSpan(0,0,5));
      123 if(ms!=null)
      124 {
      125 ms.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
      126 AppLog.log.Info((string)ms.Body) ;
      127 }
      128 }
      129 catch(Exception ex)
      130 {
      131
      132 }
      133
      134
      135 }
      136
      137 //開始監聽工作線程
      138 public void startListen()
      139 {
      140 AppLog.log.Info("--Thread--" MAX_WORKER_THREADS) ;
      141 MessQueue.ReceiveCompleted =new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
      142
      143 //異步方式,并發
      144
      145 for(int i=0; i<MAX_WORKER_THREADS; i )
      146 {
      147 // Begin asynchronous operations.
      148 waitHandleArray[i] = MessQueue.BeginReceive().AsyncWaitHandle;
      149 }
      150
      151 AppLog.log.Info("------Start Listen--------") ;
      152
      153 return;
      154
      155 }
      156
      157
      158 //停止監聽工作線程
      159 public void stopListen()
      160 {
      161
      162 for(int i=0;i<waitHandleArray.Length;i )
      163 {
      164
      165 try
      166 {
      167 waitHandleArray[i].Close();
      168 }
      169 catch
      170 {
      171 AppLog.log.Info("---waitHandleArray[i].Close() Error!-----") ;
      172 }
      173
      174 }
      175
      176 try
      177 {
      178 // Specify to wait for all operations to return.
      179 WaitHandle.WaitAll(waitHandleArray,1000,false);
      180 }
      181 catch
      182 {
      183 AppLog.log.Info("---WaitHandle.WaitAll Error!-----") ;
      184 }
      185 AppLog.log.Info("------Stop Listen--------") ;
      186
      187 }
      188
      189 #endregion
      190
      191
      192
      193
      194 }
      195}
      196

      UserObject的代碼

      以下為引用的內容:
      1using System;
      2
      3namespace Goody9807
      4{
      5 /**//// <summary>
      6 /// 用與在MQ上傳輸數據的對象
      7 /// </summary>
      8 public class UserObject
      9 {
      10 public UserObject()
      11 {
      12 //
      13 // TODO: Add constructor logic here
      14 //
      15 }
      16
      17 private long _curUserID;
      18 public long curUserID
      19 {
      20 get
      21 {
      22 return _curUserID;
      23 }
      24 set
      25 {
      26 _curUserID=value;
      27 }
      28 }
      29
      30 private string _curUserName="";
      31 public string curUserName
      32 {
      33 get
      34 {
      35 return _curUserName;
      36 }
      37 set
      38 {
      39 _curUserName=value;
      40 }
      41 }
      42
      43 private string _curEmail="";
      44 public string curEmail
      45 {
      46 get
      47 {
      48 return _curEmail;
      49 }
      50 set
      51 {
      52 _curEmail=value;
      53 }
      54 }
      55
      56
      57 private long _oppUserID;
      58 public long oppUserID
      59 {
      60 get
      61 {
      62 return _oppUserID;
      63 }
      64 set
      65 {
      66 _oppUserID=value;
      67 }
      68 }
      69
      70 private string _oppUserName="";
      71 public string oppUserName
      72 {
      73 get
      74 {
      75 return _oppUserName;
      76 }
      77 set
      78 {
      79 _oppUserName=value;
      80 }
      81 }
      82
      83 private string _oppEmail="";
      84 public string oppEmail
      85 {
      86 get
      87 {
      88 return _oppEmail;
      89 }
      90 set
      91 {
      92 _oppEmail=value;
      93 }
      94 }
      95
      96 private string _subject ="";
      97 public string subject
      98 {
      99 get
      100 {
      101 return _subject;
      102 }
      103 set
      104 {
      105 _subject=value;
      106 }
      107 }
      108
      109 private string _body="";
      110 public string body
      111 {
      112 get
      113 {
      114 return _body;
      115 }
      116 set
      117 {
      118 _body=value;
      119 }
      120 }
      121 }
      122}
      123

      另一個同事寫的封裝類

      以下為引用的內容:
      1using System;
      2
      3using System.Threading;
      4
      5using System.Messaging;
      6
      7
      8
      9namespace Wapdm.SmsApp
      10
      11{
      12
      13 /**//// <summary>
      14
      15 /// <para>
      16
      17 /// A Logger implementation that writes messages to a message queue.
      18
      19 /// The default event formatter used is an instance of XMLEventFormatter
      20
      21 /// </para>
      22
      23 /// </summary>
      24
      25 public sealed class MsgQueue
      26
      27 {
      28
      29
      30
      31 private const string BLANK_STRING = "";
      32
      33 private const string PERIOD = @".\private$"; //".";
      34
      35 private const string ELLIPSIS = "";
      36
      37
      38
      39 private string serverAddress;
      40
      41 private string queueName;
      42
      43 private string queuePath;
      44
      45
      46
      47 private bool IsContextEnabled;
      48
      49
      50
      51 private MessageQueue queue;
      52
      53
      54
      55 private object queueMonitor = new object();
      56
      57
      58
      59 private MsgQueue() {}
      60
      61
      62
      63 public static MsgQueue mq = null;
      64
      65 public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS];
      66
      67
      68
      69 public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern)
      70
      71 {
      72
      73 if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null))
      74
      75 {
      76
      77 throw new ArgumentNullException();
      78
      79 }
      80
      81 ServerAddress = _serverAddress;
      82
      83 QueueName = _queueName;
      84
      85 IsContextEnabled = true;
      86
      87 }
      88
      89
      90
      91 public MsgQueue(string _serverAddress, string _queueName)
      92
      93 {
      94
      95 if ((_serverAddress == null) || (_queueName == null))
      96
      97 {
      98
      99 throw new ArgumentNullException();
      100
      101 }
      102
      103 ServerAddress = _serverAddress;
      104
      105 QueueName = _queueName;
      106
      107 IsContextEnabled = true;
      108
      109 }
      110
      111
      112
      113 public MsgQueue(string _queueName)
      114
      115 {
      116
      117 if (_queueName == null)
      118
      119 {
      120
      121 throw new ArgumentNullException();
      122
      123 }
      124
      125 serverAddress = PERIOD;
      126
      127 QueueName = _queueName;
      128
      129 IsContextEnabled = true;
      130
      131 if ( IsContextEnabled == false )
      132
      133 throw new ArgumentNullException();
      134
      135 }
      136
      137
      138
      139 public string ServerAddress
      140
      141 {
      142
      143 get
      144
      145 {
      146
      147 return serverAddress;
      148
      149 }
      150
      151 set
      152
      153 {
      154
      155 if (value == null)
      156
      157 {
      158
      159 value = PERIOD;
      160
      161 }
      162
      163 value = value.Trim();
      164
      165 if (value.Equals(BLANK_STRING))
      166
      167 {
      168
      169 throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
      170
      171 }
      172
      173 lock (queueMonitor)
      174
      175 {
      176
      177 serverAddress = value;
      178
      179 queuePath = serverAddress '\\' queueName;
      180
      181 InitializeQueue();
      182
      183 }
      184
      185 }
      186
      187 }
      188
      189
      190
      191 public string QueueName
      192
      193 {
      194
      195 get
      196
      197 {
      198
      199 return queueName;
      200
      201 }
      202
      203 set
      204
      205 {
      206
      207 if (value == null)
      208
      209 {
      210
      211 throw new ArgumentNullException();
      212
      213 }
      214
      215 value = value.Trim();
      216
      217 if (value.Equals(BLANK_STRING))
      218
      219 {
      220
      221 throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
      222
      223 }
      224
      225 lock (queueMonitor)
      226
      227 {
      228
      229 queueName = value;
      230
      231 queuePath = serverAddress '\\' queueName;
      232
      233 InitializeQueue();
      234
      235 }
      236
      237 }
      238
      239 }
      240
      241
      242
      243 private void InitializeQueue()
      244
      245 {
      246
      247 lock (queueMonitor)
      248
      249 {
      250
      251 if (queue != null)
      252
      253 {
      254
      255 try { queue.Close(); }
      256
      257 catch {}
      258
      259 queue = null;
      260
      261 }
      262
      263
      264
      265 try
      266
      267 {
      268
      269 if(!MessageQueue.Exists(queuePath))
      270
      271 MessageQueue.Create(queuePath);
      272
      273 }
      274
      275 catch {}
      276
      277 try
      278
      279 {
      280
      281 queue = new MessageQueue(queuePath);
      282
      283 queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl);
      284
      285 queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)});
      286
      287 }
      288
      289 catch (Exception e)
      290
      291 {
      292
      293 try { queue.Close(); }
      294
      295 catch {}
      296
      297 queue = null;
      298
      299 throw new ApplicationException("Couldn't open queue at '" queuePath "': " e.GetType().FullName ": " e.Message);
      300
      301 }
      302
      303
      304
      305 }
      306
      307 }
      308
      309
      310
      311 private void AcquireResources()
      312
      313 {
      314
      315 InitializeQueue();
      316
      317 }
      318
      319
      320
      321 public void ReleaseResources()
      322
      323 {
      324
      325 lock (queueMonitor)
      326
      327 {
      328
      329 if (queue != null)
      330
      331 {
      332
      333 try
      334
      335 {
      336
      337 queue.Close();
      338
      339 }
      340
      341 catch {}
      342
      343 queue = null;
      344
      345 }
      346
      347 }
      348
      349 }
      350
      351
      352
      353 //阻塞方式
      354
      355 public MoMsg Read( )
      356
      357 {
      358
      359 MoMsg _event = null;
      360
      361 lock (queueMonitor)
      362
      363 {
      364
      365 if (queue == null)
      366
      367 {
      368
      369 InitializeQueue();
      370
      371 }
      372
      373 try
      374
      375 {
      376
      377 Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒
      378
      379 _event = (MoMsg) (message.Body);
      380
      381 return _event;
      382
      383 }
      384
      385 catch (Exception )
      386
      387 {
      388
      389 try { queue.Close(); }
      390
      391 catch {}
      392
      393 queue = null;
      394
      395 }
      396
      397 }
      398
      399 return null;
      400
      401 }
      402
      403
      404
      405 public void Write(MoMsg _event)
      406
      407 {
      408
      409 if (_event == null)
      410
      411 {
      412
      413 return;
      414
      415 }
      416
      417 lock (queueMonitor)
      418
      419 {
      420
      421 try
      422
      423 {
      424
      425 if (queue == null)
      426
      427 {
      428
      429 InitializeQueue();
      430
      431 }
      432
      433
      434
      435 Message message = new Message();
      436
      437 message.Priority = _event.Priority;
      438
      439 message.Recoverable = true;
      440
      441 message.Body = _event; //eventFormatter.Format(_event);
      442
      443
      444
      445 queue.Send(message);
      446
      447 }
      448
      449 catch (Exception e)
      450
      451 {
      452
      453 try { queue.Close(); }
      454
      455 catch {}
      456
      457 queue = null;
      458
      459 Util.Log.log("Couldn't write Message (" e.GetType().FullName ": " e.Message ")");
      460
      461 }
      462
      463 }
      464
      465 }
      466
      467
      468
      469 public static bool statusTest()
      470
      471 {
      472
      473 bool reValue = false;
      474
      475 try
      476
      477 {
      478
      479 MessageEnumerator re = mq.queue.GetMessageEnumerator();
      480
      481 bool rev = re.MoveNext();
      482
      483 reValue = true;
      484
      485 }
      486
      487 catch
      488
      489 {
      490
      491 reValue = false;
      492
      493 }
      494
      495
      496
      497 return reValue;
      498
      499 }
      500
      501
      502
      503 public static void startListen()
      504
      505 {
      506
      507 mq = new MsgQueue(Util.MqName);
      508
      509
      510
      511 mq.queue.ReceiveCompleted =new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
      512
      513
      514
      515 //異步方式,并發
      516
      517 for(int i=0; i<Util.MAX_WORKER_THREADS; i )
      518
      519 {
      520
      521 // Begin asynchronous operations.
      522
      523 waitHandleArray[i] =
      524
      525 mq.queue.BeginReceive().AsyncWaitHandle;
      526
      527 }
      528
      529
      530
      531 return;
      532
      533 }
      534
      535
      536
      537 public static void stopListen()
      538
      539 {
      540
      541
      542
      543 for(int i=0;i<waitHandleArray.Length;i )
      544
      545 {
      546
      547 try
      548
      549 {
      550
      551 waitHandleArray[i].Close();
      552
      553 }
      554
      555 catch
      556
      557 {
      558
      559 //忽略錯誤
      560
      561 }
      562
      563 }
      564
      565
      566
      567 try
      568
      569 {
      570
      571 // Specify to wait for all operations to return.
      572
      573 WaitHandle.WaitAll(waitHandleArray,1000,false);
      574
      575 }
      576
      577 catch
      578
      579 {
      580
      581 //忽略錯誤
      582
      583 }
      584
      585 }
      586
      587
      588
      589 private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
      590
      591 {
      592
      593 // Connect to the queue.
      594
      595 MessageQueue mqq = (MessageQueue)sender;
      596
      597
      598
      599 // End the asynchronous Receive operation.
      600
      601 Message m = mqq.EndReceive(e.AsyncResult);
      602
      603
      604
      605 Util.ProcessMo((MoMsg)(m.Body));
      606
      607
      608
      609 if(Util.isRunning)
      610
      611 {
      612
      613 // Restart the asynchronous Receive operation.
      614
      615 mqq.BeginReceive();
      616
      617 }
      618
      619
      620
      621 return;
      622
      623 }
      624
      625 }
      626
      627}

      分享:ASP.NET 2.0 里輸出文本格式流
      在用 ASP.NET 編程時,打開一個頁面一般是通過指定超鏈接地址,調用指定的頁面文件(.html、.aspx)等方法。 但是,如果即將打開的頁面文件的內容是在程序中動態生成,或者是從數據庫的表里取出

      來源:模板無憂//所屬分類:.Net教程/更新時間:2008-08-22
      相關.Net教程