projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
author Michael Klishin <michael@rabbitmq.com>
Thu Apr 17 15:21:51 2014 +0400 (2 days ago)
changeset 1101 99669cb4ec37
parent 1090 e6e992ccaf7b
parent 1073 2f8b0a1a5a5b
child 1102 3c506f0cd008
permissions -rw-r--r--
Update (c) year in files that were created on recently merged branches
     1 // This source code is dual-licensed under the Apache License, version
     2 // 2.0, and the Mozilla Public License, version 1.1.
     3 //
     4 // The APL v2.0:
     5 //
     6 //---------------------------------------------------------------------------
     7 //   Copyright (C) 2007-2014 GoPivotal, Inc.
     8 //
     9 //   Licensed under the Apache License, Version 2.0 (the "License");
    10 //   you may not use this file except in compliance with the License.
    11 //   You may obtain a copy of the License at
    12 //
    13 //       http://www.apache.org/licenses/LICENSE-2.0
    14 //
    15 //   Unless required by applicable law or agreed to in writing, software
    16 //   distributed under the License is distributed on an "AS IS" BASIS,
    17 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    18 //   See the License for the specific language governing permissions and
    19 //   limitations under the License.
    20 //---------------------------------------------------------------------------
    21 //
    22 // The MPL v1.1:
    23 //
    24 //---------------------------------------------------------------------------
    25 //  The contents of this file are subject to the Mozilla Public License
    26 //  Version 1.1 (the "License"); you may not use this file except in
    27 //  compliance with the License. You may obtain a copy of the License
    28 //  at http://www.mozilla.org/MPL/
    29 //
    30 //  Software distributed under the License is distributed on an "AS IS"
    31 //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
    32 //  the License for the specific language governing rights and
    33 //  limitations under the License.
    34 //
    35 //  The Original Code is RabbitMQ.
    36 //
    37 //  The Initial Developer of the Original Code is GoPivotal, Inc.
    38 //  Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    39 //---------------------------------------------------------------------------
    40 
    41 using System;
    42 using System.IO;
    43 using System.Collections;
    44 
    45 using RabbitMQ.Client;
    46 using RabbitMQ.Client.Exceptions;
    47 using RabbitMQ.Client.Events;
    48 using RabbitMQ.Util;
    49 
    50 namespace RabbitMQ.Client.MessagePatterns {
    ///<summary>Manages a subscription to a queue or exchange.</summary>
    ///<remarks>
    ///<para>
    /// This convenience class abstracts away from much of the detail
    /// involved in receiving messages from a queue or an exchange.
    ///</para>
    ///<para>
    /// Once created, the Subscription consumes from a queue (using a
    /// QueueingBasicConsumer). Received deliveries can be retrieved
    /// by calling Next(), or by using the Subscription as an
    /// IEnumerator in, for example, a foreach loop.
    ///</para>
    ///<para>
    /// Note that if the "noAck" option is enabled (which it is by
    /// default), then received deliveries are automatically acked
    /// within the server before they are even transmitted across the
    /// network to us. Calling Ack() on received events will always do
    /// the right thing: if "noAck" is enabled, nothing is done on an
    /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
    /// called with the correct parameters.
    ///</para>
    ///</remarks>
    public class Subscription: IEnumerable, IEnumerator, IDisposable {
        protected IModel m_model;

        ///<summary>Retrieve the IModel our subscription is carried by.</summary>
        public IModel Model { get { return m_model; } }

        protected string m_queueName;
        protected bool m_noAck;

        // Taken by Close() while it detaches m_consumer, so that only
        // one caller goes on to cancel the consumer.
        protected readonly object m_consumerLock = new object();
        // Taken around every write to m_latestEvent.
        protected readonly object m_eventLock = new object();
        // Set to null by Close(); Next() aliases it before use.
        protected volatile QueueingBasicConsumer m_consumer;
        protected string m_consumerTag;

        ///<summary>Retrieve the queue name we have subscribed to.</summary>
        public string QueueName { get { return m_queueName; } }
        ///<summary>Retrieve the IBasicConsumer that is receiving the
        ///messages from the server for us. Normally, you will not
        ///need to access this property - use Next() and friends
        ///instead. Null once Close() has been called.</summary>
        public IBasicConsumer Consumer { get { return m_consumer; } }
        ///<summary>Retrieve the consumer-tag that this subscription
        ///is using. Will usually be a server-generated
        ///name. Null once Close() has been called.</summary>
        public string ConsumerTag { get { return m_consumerTag; } }
        ///<summary>Returns true if we are in "noAck" mode, where
        ///calls to Ack() will be no-ops, and where the server acks
        ///messages before they are delivered to us. Returns false if
        ///we are in a mode where calls to Ack() are required, and
        ///where such calls will actually send an acknowledgement
        ///message across the network to the server.</summary>
        public bool NoAck { get { return m_noAck; } }

        protected BasicDeliverEventArgs m_latestEvent;

        ///<summary>Returns the most recent value returned by Next(),
        ///or null when either no values have been retrieved yet, the
        ///end of the subscription has been reached, or the most
        ///recent value has already been Ack()ed. See also the
        ///documentation for Ack().</summary>
        public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }

        ///<summary>Creates a new Subscription in "noAck" mode,
        ///consuming from a named queue.</summary>
        public Subscription(IModel model, string queueName)
            : this(model, queueName, true) {}

        ///<summary>Creates a new Subscription, with full control over
        ///both "noAck" mode and the name of the queue.</summary>
        ///<exception cref="ArgumentNullException">
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck)
        {
            Initialise(model, queueName, noAck);
            m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, m_consumer);
        }


        ///<summary>Creates a new Subscription, with full control over
        ///both "noAck" mode, the name of the queue, and the consumer tag.</summary>
        ///<exception cref="ArgumentNullException">
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck, string consumerTag)
        {
            Initialise(model, queueName, noAck);
            m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, consumerTag, m_consumer);
        }

        ///<summary>Shared constructor logic: validates the model,
        ///stores the settings and creates the QueueingBasicConsumer.
        ///Does not start consuming; the calling constructor issues
        ///the BasicConsume.</summary>
        private void Initialise(IModel model, string queueName, bool noAck)
        {
            if (model == null) {
                // Fail with a descriptive exception instead of the
                // NullReferenceException the BasicConsume call would raise.
                throw new ArgumentNullException("model");
            }

            m_model = model;
            m_queueName = queueName;
            m_noAck = noAck;
            m_consumer = new QueueingBasicConsumer(m_model);
            m_latestEvent = null;
        }

        ///<summary>Closes this Subscription, cancelling the consumer
        ///record in the server.</summary>
        ///<remarks>
        /// Only the first call has any effect; later calls find the
        /// consumer already detached and do nothing. The cancel is
        /// only sent if the model is still open, and an
        /// OperationInterruptedException raised while cancelling is
        /// ignored. ConsumerTag is reset to null in every case.
        ///</remarks>
        public void Close()
        {
            bool shouldCancelConsumer = false;

            lock (m_consumerLock) {
                if (m_consumer != null) {
                    shouldCancelConsumer = true;
                    m_consumer = null;
                }
            }

            if (!shouldCancelConsumer) {
                return;
            }

            try {
                if (m_model.IsOpen) {
                    m_model.BasicCancel(m_consumerTag);
                }
            } catch (OperationInterruptedException) {
                // We don't mind, here.
            } finally {
                // Clear the tag even when the cancel failed, so that it
                // never outlives the consumer that was detached above.
                m_consumerTag = null;
            }
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
        ///null.</summary>
        public void Ack()
        {
            Ack(m_latestEvent);
        }

        ///<summary>If we are not in "noAck" mode, calls
        ///IModel.BasicAck with the delivery-tag from <paramref name="evt"/>;
        ///otherwise, sends nothing to the server. if <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null.
        ///</summary>
        ///<remarks>
        ///<para>
        /// Does nothing at all when <paramref name="evt"/> is null.
        /// Nothing is sent to the server if the model is no longer open.
        ///</para>
        ///<para>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</para>
        ///</remarks>
        public void Ack(BasicDeliverEventArgs evt)
        {
            if (evt == null) {
                return;
            }

            if (!m_noAck && m_model.IsOpen) {
                m_model.BasicAck(evt.DeliveryTag, false);
            }

            ClearLatestEventIfSame(evt);
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Nack(BasicDeliverEventArgs, false, requeue). Causes LatestEvent to become
        ///null.</summary>
        public void Nack(bool requeue)
        {
            Nack(m_latestEvent, false, requeue);
        }


        ///<summary>If LatestEvent is non-null, passes it to
        ///Nack(BasicDeliverEventArgs, multiple, requeue). Causes LatestEvent to become
        ///null.</summary>
        public void Nack(bool multiple, bool requeue)
        {
            Nack(m_latestEvent, multiple, requeue);
        }

        ///<summary>If we are not in "noAck" mode, calls
        ///IModel.BasicNack with the delivery-tag from <paramref name="evt"/>;
        ///otherwise, sends nothing to the server. if <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null.
        ///</summary>
        ///<remarks>
        ///<para>
        /// Does nothing at all when <paramref name="evt"/> is null.
        /// Nothing is sent to the server if the model is no longer open.
        ///</para>
        ///<para>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</para>
        ///</remarks>
        public void Nack(BasicDeliverEventArgs evt,
                         bool multiple,
                         bool requeue)
        {
            if (evt == null) {
                return;
            }

            if (!m_noAck && m_model.IsOpen) {
                m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
            }

            ClearLatestEventIfSame(evt);
        }

        ///<summary>Sets LatestEvent to null if, and only if, it is
        ///still the very object <paramref name="evt"/>.</summary>
        ///<remarks>
        /// The comparison and the write happen under the same lock
        /// that MutateLatestEvent takes, so a delivery stored by a
        /// concurrent Next() between the two steps cannot be wiped.
        ///</remarks>
        private void ClearLatestEventIfSame(BasicDeliverEventArgs evt)
        {
            lock (m_eventLock) {
                if (evt == m_latestEvent) {
                    m_latestEvent = null;
                }
            }
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue.</summary>
        ///<remarks>
        ///<para>
        /// Returns null when the end of the stream is reached and on
        /// every subsequent call. End-of-stream can arise through the
        /// action of the Subscription.Close() method, or through the
        /// closure of the IModel or its underlying IConnection.
        ///</para>
        ///<para>
        /// Updates LatestEvent to the value returned.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all (but in "noAck"
        /// mode, the server will have auto-acknowledged each event
        /// before it is even sent across the wire to us).
        ///</para>
        ///</remarks>
        public BasicDeliverEventArgs Next()
        {
            // Alias the pointer as otherwise it may change out
            // from under us by the operation of Close() from
            // another thread.
            QueueingBasicConsumer consumer = m_consumer;
            BasicDeliverEventArgs next = null;
            try {
                if (consumer != null && !m_model.IsClosed) {
                    next = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                }
            } catch (EndOfStreamException) {
                // End of stream is reported to the caller as null.
                next = null;
            }
            MutateLatestEvent(next);
            // Return the local, not the field: another thread may Ack()
            // and clear LatestEvent before we get to read it back.
            return next;
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue, or times out after a specified number
        ///of milliseconds.</summary>
        ///<remarks>
        ///<para>
        /// Returns false if the timeout expires before either a
        /// delivery appears or the end-of-stream is reached. In that
        /// case the out parameter "result" is set to null, and
        /// LatestEvent is not updated.
        ///</para>
        ///<para>
        /// Also returns false, immediately and without waiting, if
        /// this Subscription has already been closed or the IModel is
        /// closed when the method is entered. In that case "result"
        /// is set to null and LatestEvent is reset to null.
        ///</para>
        ///<para>
        /// Returns true to indicate a delivery, or an end-of-stream
        /// that was detected while waiting on the queue.
        ///</para>
        ///<para>
        /// If a delivery is already waiting in the queue, or one
        /// arrives before the timeout expires, it is removed from the
        /// queue and placed in the "result" out parameter. If the
        /// end-of-stream is detected before the timeout expires,
        /// "result" is set to null.
        ///</para>
        ///<para>
        /// Whenever this method returns true, it updates LatestEvent
        /// to the value placed in "result" before returning.
        ///</para>
        ///<para>
        /// End-of-stream can arise through the action of the
        /// Subscription.Close() method, or through the closure of the
        /// IModel or its underlying IConnection.
        ///</para>
        ///<para>
        /// This method does not acknowledge any deliveries at all
        /// (but in "noAck" mode, the server will have
        /// auto-acknowledged each event before it is even sent across
        /// the wire to us).
        ///</para>
        ///<para>
        /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
        /// will be interpreted as a command to wait for an
        /// indefinitely long period of time for an item or the end of
        /// the stream to become available.
        ///</para>
        ///</remarks>
        public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
        {
            BasicDeliverEventArgs next = null;
            try {
                // Alias the pointer as otherwise it may change out
                // from under us by the operation of Close() from
                // another thread.
                QueueingBasicConsumer consumer = m_consumer;
                if (consumer == null || m_model.IsClosed) {
                    // Already closed: existing callers rely on this
                    // path returning false, so that is preserved.
                    MutateLatestEvent(null);
                    result = null;
                    return false;
                }

                if (!consumer.Queue.Dequeue(millisecondsTimeout, out next)) {
                    // Timed out; LatestEvent is deliberately left alone.
                    result = null;
                    return false;
                }
            } catch (EndOfStreamException) {
                // End of stream is reported as (true, null).
                next = null;
            }
            MutateLatestEvent(next);
            // Hand back the local, not the field: another thread may
            // Ack() and clear LatestEvent before we read it back.
            result = next;
            return true;
        }

        ///<summary>Implementation of the IEnumerable interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return this;
        }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        ///<remarks>
        ///<para>
        /// As per the IEnumerator interface definition, throws
        /// InvalidOperationException if LatestEvent is null.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        object IEnumerator.Current {
            get {
                // Read the field once, so that a concurrent Ack()
                // cannot turn a passed null check into a null result.
                BasicDeliverEventArgs current = m_latestEvent;
                if (current == null) {
                    throw new InvalidOperationException();
                }
                return current;
            }
        }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        ///<remarks>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        bool IEnumerator.MoveNext()
        {
            return Next() != null;
        }

        ///<summary>Dummy implementation of the IEnumerator interface,
        ///for permitting Subscription to be used in foreach loops;
        ///Reset()ting a Subscription doesn't make sense, so this
        ///method always throws InvalidOperationException.</summary>
        void IEnumerator.Reset()
        {
            // It really doesn't make sense to try to reset a subscription.
            throw new InvalidOperationException("Subscription.Reset() does not make sense");
        }

        ///<summary>Implementation of the IDisposable interface,
        ///permitting Subscription to be used in using
        ///statements. Simply calls Close().</summary>
        void IDisposable.Dispose()
        {
            Close();
        }

        ///<summary>Replaces LatestEvent with <paramref name="value"/>
        ///while holding m_eventLock.</summary>
        protected void MutateLatestEvent(BasicDeliverEventArgs value)
        {
            lock(m_eventLock)
            {
                m_latestEvent = value;
            }
        }
    }
   424 }