.Net下的MSMQ的同步異步調用_.Net教程
推薦:用.net動態創建類的實例用.net動態創建類的實例 看了網上很多關于DotNet動態創建類的實例的文章,我這里想總結一下,其實方法很簡單,就是用“Activator.CreateInstance”。但是這個方法需要待創建的類的
一、MSMQ簡介
MSMQ(微軟消息隊列)是Windows操作系統中消息應用程序的基礎,是用于創建分布式、松散連接的消息通訊應用程序的開發工具。消息隊列
和電子郵件有著很多相似處,他們都包含多個屬性,用于保存消息,消息類型中都指出發送者和接收者的地址;然而他們的用處卻有著很大的
區別:消息隊列的發送者和接收者是應用程序,而電子郵件的發送者和接收者通常是人。如同電子郵件一樣,消息隊列的發送和接收也不需要
發送者和接收者同時在場,可以存儲在消息隊列或是郵件服務器中。
二、消息隊列的安裝
默認情況下安裝操作系統是不安裝消息隊列的,你可以在控制面板中找到添加/刪除程序,然后選擇添加/刪除Windows組件一項,然后選擇應
用程序服務器,雙擊它進入詳細資料中選擇消息隊列一項進行安裝,如圖:
三、消息隊列類型
消息對列分為3類:
公共隊列
MachineName\QueueName
能被別的機器所訪問,如果你的多個項目中用到消息隊列,那么你可以把隊列定義為公共隊列
專用隊列
MachineName\Private$\QueueName
只針對于本機的程序才可以調用的隊列,有些情況下為了安全起見定義為私有隊列。
日志隊列
MachineName\QueueName\Journal$
四、消息隊列的創建
MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);
通過Path屬性引用消息隊列的代碼也十分簡單:
MessageQueue Mq=new MessageQueue();
Mq.Path=”.\\private$\\Mymq”;
使用 Create 方法可以在計算機上創建隊列:
System.Messaging.MessageQueue.Create(@".\private$\Mymq");
這里注意由于在C#中要記住用反斜杠將“\”轉義。
由于消息對列所放置的地方經常改變,所以建議消息隊列路徑不要寫死,建議放在配置文件中。
五、消息的發送
消息的發送可以分為簡單消息和復雜消息,簡單消息類型就是常用的數據類型,例如整型、字符串等數據;復雜消息的數據類型通常對應于系統中的復雜數據類型,例如結構,對象等等。
Mq.Send("Hello!");
在這里建議你可以事先定義一個對象類,然后發送這個對象類的實例對象,這樣以后無論在增加什么發送信息,只需在對象類中增加相應的屬性即可。
六、消息的接收和閱讀
(1)同步接收消息
接收消息的代碼很簡單:
Mq.Receive();
Mq.Receive(TimeSpan timeout); //設定超時時間
Mq.ReceiveById(ID);
Mq.Peek();
通過Receive方法接收消息同時永久性地從隊列中刪除消息;
通過Peek方法從隊列中取出消息而不從隊列中移除該消息。
如果知道消息的標識符(ID),還可以通過ReceiveById方法和PeekById方法完成相應的操作。
(2)異步接受消息
利用委托機制:MessQueue.ReceiveCompleted =new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
(3)消息閱讀
在應用程序能夠閱讀的消息和消息隊列中的消息格式不同,應用程序發送出去的消息經過序列化以后才發送給了消息隊列
而在接受端必須反序列化,利用下面的代碼可以實現:
public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
Console.WriteLine("Message: " (string)m.Body);
MessQueue.BeginReceive() ;
}
反序列化還有另一種寫法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
七、由于消息隊列的代碼有些是固定不便的,所以把這些代碼封裝成一個類方便以后使用:
以下為引用的內容: 5 |
UserObject的代碼
以下為引用的內容: 1using System; 2 3namespace Goody9807 4{ 5 /**//// <summary> 6 /// 用與在MQ上傳輸數據的對象 7 /// </summary> 8 public class UserObject 9 { 10 public UserObject() 11 { 12 // 13 // TODO: Add constructor logic here 14 // 15 } 16 17 private long _curUserID; 18 public long curUserID 19 { 20 get 21 { 22 return _curUserID; 23 } 24 set 25 { 26 _curUserID=value; 27 } 28 } 29 30 private string _curUserName=""; 31 public string curUserName 32 { 33 get 34 { 35 return _curUserName; 36 } 37 set 38 { 39 _curUserName=value; 40 } 41 } 42 43 private string _curEmail=""; 44 public string curEmail 45 { 46 get 47 { 48 return _curEmail; 49 } 50 set 51 { 52 _curEmail=value; 53 } 54 } 55 56 57 private long _oppUserID; 58 public long oppUserID 59 { 60 get 61 { 62 return _oppUserID; 63 } 64 set 65 { 66 _oppUserID=value; 67 } 68 } 69 70 private string _oppUserName=""; 71 public string oppUserName 72 { 73 get 74 { 75 return _oppUserName; 76 } 77 set 78 { 79 _oppUserName=value; 80 } 81 } 82 83 private string _oppEmail=""; 84 public string oppEmail 85 { 86 get 87 { 88 return _oppEmail; 89 } 90 set 91 { 92 _oppEmail=value; 93 } 94 } 95 96 private string _subject =""; 97 public string subject 98 { 99 get 100 { 101 return _subject; 102 } 103 set 104 { 105 _subject=value; 106 } 107 } 108 109 private string _body=""; 110 public string body 111 { 112 get 113 { 114 return _body; 115 } 116 set 117 { 118 _body=value; 119 } 120 } 121 } 122} 123 |
另一個同事寫的封裝類
以下為引用的內容: 1using System; 2 3using System.Threading; 4 5using System.Messaging; 6 7 8 9namespace Wapdm.SmsApp 10 11{ 12 13 /**//// <summary> 14 15 /// <para> 16 17 /// A Logger implementation that writes messages to a message queue. 18 19 /// The default event formatter used is an instance of XMLEventFormatter 20 21 /// </para> 22 23 /// </summary> 24 25 public sealed class MsgQueue 26 27 { 28 29 30 31 private const string BLANK_STRING = ""; 32 33 private const string PERIOD = @".\private$"; //"."; 34 35 private const string ELLIPSIS = ""; 36 37 38 39 private string serverAddress; 40 41 private string queueName; 42 43 private string queuePath; 44 45 46 47 private bool IsContextEnabled; 48 49 50 51 private MessageQueue queue; 52 53 54 55 private object queueMonitor = new object(); 56 57 58 59 private MsgQueue() {} 60 61 62 63 public static MsgQueue mq = null; 64 65 public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS]; 66 67 68 69 public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern) 70 71 { 72 73 if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null)) 74 75 { 76 77 throw new ArgumentNullException(); 78 79 } 80 81 ServerAddress = _serverAddress; 82 83 QueueName = _queueName; 84 85 IsContextEnabled = true; 86 87 } 88 89 90 91 public MsgQueue(string _serverAddress, string _queueName) 92 93 { 94 95 if ((_serverAddress == null) || (_queueName == null)) 96 97 { 98 99 throw new ArgumentNullException(); 100 101 } 102 103 ServerAddress = _serverAddress; 104 105 QueueName = _queueName; 106 107 IsContextEnabled = true; 108 109 } 110 111 112 113 public MsgQueue(string _queueName) 114 115 { 116 117 if (_queueName == null) 118 119 { 120 121 throw new ArgumentNullException(); 122 123 } 124 125 serverAddress = PERIOD; 126 127 QueueName = _queueName; 128 129 IsContextEnabled = true; 130 131 if ( IsContextEnabled == false ) 132 133 throw new ArgumentNullException(); 134 135 } 136 137 138 139 public string ServerAddress 140 141 { 142 143 get 144 145 { 146 147 return serverAddress; 148 149 } 150 151 set 152 153 { 154 155 if (value == null) 156 157 { 158 159 value = PERIOD; 160 161 } 162 163 value = value.Trim(); 164 165 if (value.Equals(BLANK_STRING)) 166 167 { 168 169 throw new ArgumentException("Invalid value (must contain non-whitespace characters)"); 170 171 } 172 173 lock (queueMonitor) 174 175 { 176 177 serverAddress = value; 178 179 queuePath = serverAddress '\\' queueName; 180 181 InitializeQueue(); 182 183 } 184 185 } 186 187 } 188 189 190 191 public string QueueName 192 193 { 194 195 get 196 197 { 198 199 return queueName; 200 201 } 202 203 set 204 205 { 206 207 if (value == null) 208 209 { 210 211 throw new ArgumentNullException(); 212 213 } 214 215 value = value.Trim(); 216 217 if (value.Equals(BLANK_STRING)) 218 219 { 220 221 throw new ArgumentException("Invalid value (must contain non-whitespace characters)"); 222 223 } 224 225 lock (queueMonitor) 226 227 { 228 229 queueName = value; 230 231 queuePath = serverAddress '\\' queueName; 232 233 InitializeQueue(); 234 235 } 236 237 } 238 239 } 240 241 242 243 private void InitializeQueue() 244 245 { 246 247 lock (queueMonitor) 248 249 { 250 251 if (queue != null) 252 253 { 254 255 try { queue.Close(); } 256 257 catch {} 258 259 queue = null; 260 261 } 262 263 264 265 try 266 267 { 268 269 if(!MessageQueue.Exists(queuePath)) 270 271 MessageQueue.Create(queuePath); 272 273 } 274 275 catch {} 276 277 try 278 279 { 280 281 queue = new MessageQueue(queuePath); 282 283 queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl); 284 285 queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)}); 286 287 } 288 289 catch (Exception e) 290 291 { 292 293 try { queue.Close(); } 294 295 catch {} 296 297 queue = null; 298 299 throw new ApplicationException("Couldn't open queue at '" queuePath "': " e.GetType().FullName ": " e.Message); 300 301 } 302 303 304 305 } 306 307 } 308 309 310 311 private void AcquireResources() 312 313 { 314 315 InitializeQueue(); 316 317 } 318 319 320 321 public void ReleaseResources() 322 323 { 324 325 lock (queueMonitor) 326 327 { 328 329 if (queue != null) 330 331 { 332 333 try 334 335 { 336 337 queue.Close(); 338 339 } 340 341 catch {} 342 343 queue = null; 344 345 } 346 347 } 348 349 } 350 351 352 353 //阻塞方式 354 355 public MoMsg Read( ) 356 357 { 358 359 MoMsg _event = null; 360 361 lock (queueMonitor) 362 363 { 364 365 if (queue == null) 366 367 { 368 369 InitializeQueue(); 370 371 } 372 373 try 374 375 { 376 377 Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒 378 379 _event = (MoMsg) (message.Body); 380 381 return _event; 382 383 } 384 385 catch (Exception ) 386 387 { 388 389 try { queue.Close(); } 390 391 catch {} 392 393 queue = null; 394 395 } 396 397 } 398 399 return null; 400 401 } 402 403 404 405 public void Write(MoMsg _event) 406 407 { 408 409 if (_event == null) 410 411 { 412 413 return; 414 415 } 416 417 lock (queueMonitor) 418 419 { 420 421 try 422 423 { 424 425 if (queue == null) 426 427 { 428 429 InitializeQueue(); 430 431 } 432 433 434 435 Message message = new Message(); 436 437 message.Priority = _event.Priority; 438 439 message.Recoverable = true; 440 441 message.Body = _event; //eventFormatter.Format(_event); 442 443 444 445 queue.Send(message); 446 447 } 448 449 catch (Exception e) 450 451 { 452 453 try { queue.Close(); } 454 455 catch {} 456 457 queue = null; 458 459 Util.Log.log("Couldn't write Message (" e.GetType().FullName ": " e.Message ")"); 460 461 } 462 463 } 464 465 } 466 467 468 469 public static bool statusTest() 470 471 { 472 473 bool reValue = false; 474 475 try 476 477 { 478 479 MessageEnumerator re = mq.queue.GetMessageEnumerator(); 480 481 bool rev = re.MoveNext(); 482 483 reValue = true; 484 485 } 486 487 catch 488 489 { 490 491 reValue = false; 492 493 } 494 495 496 497 return reValue; 498 499 } 500 501 502 503 public static void startListen() 504 505 { 506 507 mq = new MsgQueue(Util.MqName); 508 509 510 511 mq.queue.ReceiveCompleted =new ReceiveCompletedEventHandler(queue_ReceiveCompleted); 512 513 514 515 //異步方式,并發 516 517 for(int i=0; i<Util.MAX_WORKER_THREADS; i ) 518 519 { 520 521 // Begin asynchronous operations. 522 523 waitHandleArray[i] = 524 525 mq.queue.BeginReceive().AsyncWaitHandle; 526 527 } 528 529 530 531 return; 532 533 } 534 535 536 537 public static void stopListen() 538 539 { 540 541 542 543 for(int i=0;i<waitHandleArray.Length;i ) 544 545 { 546 547 try 548 549 { 550 551 waitHandleArray[i].Close(); 552 553 } 554 555 catch 556 557 { 558 559 //忽略錯誤 560 561 } 562 563 } 564 565 566 567 try 568 569 { 570 571 // Specify to wait for all operations to return. 572 573 WaitHandle.WaitAll(waitHandleArray,1000,false); 574 575 } 576 577 catch 578 579 { 580 581 //忽略錯誤 582 583 } 584 585 } 586 587 588 589 private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e) 590 591 { 592 593 // Connect to the queue. 594 595 MessageQueue mqq = (MessageQueue)sender; 596 597 598 599 // End the asynchronous Receive operation. 600 601 Message m = mqq.EndReceive(e.AsyncResult); 602 603 604 605 Util.ProcessMo((MoMsg)(m.Body)); 606 607 608 609 if(Util.isRunning) 610 611 { 612 613 // Restart the asynchronous Receive operation. 614 615 mqq.BeginReceive(); 616 617 } 618 619 620 621 return; 622 623 } 624 625 } 626 627} |
分享:ASP.NET 2.0 里輸出文本格式流在用 ASP.NET 編程時,打開一個頁面一般是通過指定超鏈接地址,調用指定的頁面文件(.html、.aspx)等方法。 但是,如果即將打開的頁面文件的內容是在程序中動態生成,或者是從數據庫的表里取出
- asp.net如何得到GRIDVIEW中某行某列值的方法
- .net SMTP發送Email實例(可帶附件)
- js實現廣告漂浮效果的小例子
- asp.net Repeater 數據綁定的具體實現
- Asp.Net 無刷新文件上傳并顯示進度條的實現方法及思路
- Asp.net獲取客戶端IP常見代碼存在的偽造IP問題探討
- VS2010 水晶報表的使用方法
- ASP.NET中操作SQL數據庫(連接字符串的配置及獲取)
- asp.net頁面傳值測試實例代碼
- DataGridView - DataGridViewCheckBoxCell的使用介紹
- asp.net中javascript的引用(直接引入和間接引入)
- 三層+存儲過程實現分頁示例代碼
- 相關鏈接:
- 教程說明:
.Net教程-.Net下的MSMQ的同步異步調用。