projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
author Michael Klishin <michael@rabbitmq.com>
Wed, 28 Jan 2015 04:56:11 +0300
changeset 1436 1f520069d3e0
parent 1102 3c506f0cd008
permissions -rw-r--r--
Try bumping these to improve CI stability
     1 // This source code is dual-licensed under the Apache License, version
     2 // 2.0, and the Mozilla Public License, version 1.1.
     3 //
     4 // The APL v2.0:
     5 //
     6 //---------------------------------------------------------------------------
     7 //   Copyright (C) 2007-2014 GoPivotal, Inc.
     8 //
     9 //   Licensed under the Apache License, Version 2.0 (the "License");
    10 //   you may not use this file except in compliance with the License.
    11 //   You may obtain a copy of the License at
    12 //
    13 //       http://www.apache.org/licenses/LICENSE-2.0
    14 //
    15 //   Unless required by applicable law or agreed to in writing, software
    16 //   distributed under the License is distributed on an "AS IS" BASIS,
    17 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    18 //   See the License for the specific language governing permissions and
    19 //   limitations under the License.
    20 //---------------------------------------------------------------------------
    21 //
    22 // The MPL v1.1:
    23 //
    24 //---------------------------------------------------------------------------
    25 //  The contents of this file are subject to the Mozilla Public License
    26 //  Version 1.1 (the "License"); you may not use this file except in
    27 //  compliance with the License. You may obtain a copy of the License
    28 //  at http://www.mozilla.org/MPL/
    29 //
    30 //  Software distributed under the License is distributed on an "AS IS"
    31 //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
    32 //  the License for the specific language governing rights and
    33 //  limitations under the License.
    34 //
    35 //  The Original Code is RabbitMQ.
    36 //
    37 //  The Initial Developer of the Original Code is GoPivotal, Inc.
    38 //  Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    39 //---------------------------------------------------------------------------
    40 
    41 using System;
    42 using System.Collections;
    43 using System.IO;
    44 using RabbitMQ.Client.Events;
    45 using RabbitMQ.Client.Exceptions;
    46 
    47 namespace RabbitMQ.Client.MessagePatterns
    48 {
    ///<summary>Manages a subscription to a queue or exchange.</summary>
    ///<remarks>
    ///<para>
    /// This convenience class abstracts away from much of the detail
    /// involved in receiving messages from a queue or an exchange.
    ///</para>
    ///<para>
    /// Once created, the Subscription consumes from a queue (using a
    /// QueueingBasicConsumer). Received deliveries can be retrieved
    /// by calling Next(), or by using the Subscription as an
    /// IEnumerator in, for example, a foreach loop.
    ///</para>
    ///<para>
    /// Note that if the "noAck" option is enabled (which it is by
    /// default), then received deliveries are automatically acked
    /// within the server before they are even transmitted across the
    /// network to us. Calling Ack() on received events will always do
    /// the right thing: if "noAck" is enabled, nothing is done on an
    /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
    /// called with the correct parameters.
    ///</para>
    ///</remarks>
    public class Subscription : IEnumerable, IEnumerator, IDisposable
    {
        ///<summary>Guards updates to LatestEvent and the hand-over of
        ///m_consumer performed by Close().</summary>
        protected readonly object m_eventLock = new object();

        ///<summary>The consumer feeding this subscription; set to null
        ///by Close().</summary>
        protected volatile QueueingBasicConsumer m_consumer;

        ///<summary>Creates a new Subscription in "noAck" mode,
        ///consuming from a named queue.</summary>
        public Subscription(IModel model, string queueName)
            : this(model, queueName, true)
        {
        }

        ///<summary>Creates a new Subscription, with full control over
        ///both "noAck" mode and the name of the queue.</summary>
        ///<exception cref="ArgumentNullException">
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck)
        {
            if (model == null)
            {
                throw new ArgumentNullException("model");
            }

            Model = model;
            QueueName = queueName;
            NoAck = noAck;
            m_consumer = new QueueingBasicConsumer(Model);
            ConsumerTag = Model.BasicConsume(QueueName, NoAck, m_consumer);
            LatestEvent = null;
        }

        ///<summary>Creates a new Subscription, with full control over
        ///both "noAck" mode, the name of the queue, and the consumer tag.</summary>
        ///<exception cref="ArgumentNullException">
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck, string consumerTag)
        {
            if (model == null)
            {
                throw new ArgumentNullException("model");
            }

            Model = model;
            QueueName = queueName;
            NoAck = noAck;
            m_consumer = new QueueingBasicConsumer(Model);
            ConsumerTag = Model.BasicConsume(QueueName, NoAck, consumerTag, m_consumer);
            LatestEvent = null;
        }

        ///<summary>Retrieve the IBasicConsumer that is receiving the
        ///messages from the server for us. Normally, you will not
        ///need to access this property - use Next() and friends
        ///instead. Returns null once Close() has been called.</summary>
        public IBasicConsumer Consumer
        {
            get { return m_consumer; }
        }

        ///<summary>Retrieve the consumer-tag that this subscription
        ///is using. Will usually be a server-generated
        ///name. Becomes null once Close() has completed.</summary>
        public string ConsumerTag { get; protected set; }

        ///<summary>Returns the most recent value returned by Next(),
        ///or null when either no values have been retrieved yet, the
        ///end of the subscription has been reached, or the most
        ///recent value has already been Ack()ed. See also the
        ///documentation for Ack().</summary>
        public BasicDeliverEventArgs LatestEvent { get; protected set; }

        ///<summary>Retrieve the IModel our subscription is carried by.</summary>
        public IModel Model { get; protected set; }

        ///<summary>Returns true if we are in "noAck" mode, where
        ///calls to Ack() will be no-ops, and where the server acks
        ///messages before they are delivered to us. Returns false if
        ///we are in a mode where calls to Ack() are required, and
        ///where such calls will actually send an acknowledgement
        ///message across the network to the server.</summary>
        public bool NoAck { get; protected set; }

        ///<summary>Retrieve the queue name we have subscribed to.</summary>
        public string QueueName { get; protected set; }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        ///<remarks>
        ///<para>
        /// As per the IEnumerator interface definition, throws
        /// InvalidOperationException if LatestEvent is null.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        object IEnumerator.Current
        {
            get
            {
                // Read the property once so the value that passed the
                // null check is the value that is returned.
                BasicDeliverEventArgs current = LatestEvent;
                if (current == null)
                {
                    throw new InvalidOperationException();
                }
                return current;
            }
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
        ///null.</summary>
        public void Ack()
        {
            Ack(LatestEvent);
        }

        ///<summary>If we are not in "noAck" mode and the model is open, calls
        ///IModel.BasicAck with the delivery-tag from <paramref name="evt"/>;
        ///otherwise, sends nothing to the server. If <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null.
        ///Does nothing at all when <paramref name="evt"/> is null.
        ///</summary>
        ///<remarks>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</remarks>
        public void Ack(BasicDeliverEventArgs evt)
        {
            if (evt == null)
            {
                return;
            }

            if (!NoAck && Model.IsOpen)
            {
                Model.BasicAck(evt.DeliveryTag, false);
            }

            ClearLatestEventIfCurrent(evt);
        }

        ///<summary>Closes this Subscription, cancelling the consumer
        ///record in the server.</summary>
        ///<remarks>
        ///<para>
        /// Only the first call has any effect; later calls return
        /// immediately. The cancel is sent only while the model is
        /// open, and an OperationInterruptedException raised while
        /// cancelling is deliberately ignored.
        ///</para>
        ///</remarks>
        public void Close()
        {
            // Test and clear the consumer in one step so that only one
            // caller ever proceeds to cancel it.
            bool shouldCancelConsumer;
            lock (m_eventLock)
            {
                shouldCancelConsumer = m_consumer != null;
                m_consumer = null;
            }

            if (!shouldCancelConsumer)
            {
                return;
            }

            try
            {
                if (Model.IsOpen)
                {
                    Model.BasicCancel(ConsumerTag);
                }
            }
            catch (OperationInterruptedException)
            {
                // We don't mind, here.
            }

            // The tag is meaningless once the subscription is closed,
            // whether or not the cancel reached the server.
            ConsumerTag = null;
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Nack(BasicDeliverEventArgs, false, requeue). Causes LatestEvent to become
        ///null.</summary>
        public void Nack(bool requeue)
        {
            Nack(LatestEvent, false, requeue);
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Nack(BasicDeliverEventArgs, multiple, requeue). Causes LatestEvent to become
        ///null.</summary>
        public void Nack(bool multiple, bool requeue)
        {
            Nack(LatestEvent, multiple, requeue);
        }

        ///<summary>If we are not in "noAck" mode and the model is open, calls
        ///IModel.BasicNack with the delivery-tag from <paramref name="evt"/>;
        ///otherwise, sends nothing to the server. If <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null.
        ///Does nothing at all when <paramref name="evt"/> is null.
        ///</summary>
        ///<remarks>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</remarks>
        public void Nack(BasicDeliverEventArgs evt, bool multiple, bool requeue)
        {
            if (evt == null)
            {
                return;
            }

            if (!NoAck && Model.IsOpen)
            {
                Model.BasicNack(evt.DeliveryTag, multiple, requeue);
            }

            ClearLatestEventIfCurrent(evt);
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue.</summary>
        ///<remarks>
        ///<para>
        /// Returns null when the end of the stream is reached and on
        /// every subsequent call. End-of-stream can arise through the
        /// action of the Subscription.Close() method, or through the
        /// closure of the IModel or its underlying IConnection.
        ///</para>
        ///<para>
        /// Updates LatestEvent to the value returned.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all (but in "noAck"
        /// mode, the server will have auto-acknowledged each event
        /// before it is even sent across the wire to us).
        ///</para>
        ///</remarks>
        public BasicDeliverEventArgs Next()
        {
            // Alias the pointer as otherwise it may change out
            // from under us by the operation of Close() from
            // another thread.
            QueueingBasicConsumer consumer = m_consumer;
            BasicDeliverEventArgs delivery = null;
            try
            {
                if (consumer != null && !Model.IsClosed)
                {
                    delivery = consumer.Queue.Dequeue();
                }
            }
            catch (EndOfStreamException)
            {
                // End of stream is reported to the caller as null.
                delivery = null;
            }

            MutateLatestEvent(delivery);

            // Return what this call obtained rather than re-reading
            // LatestEvent, which another thread may already have changed.
            return delivery;
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue, or times out after a specified number
        ///of milliseconds.</summary>
        ///<remarks>
        ///<para>
        /// Returns false, with the out parameter "result" set to
        /// null, in two situations. First, when the timeout expires
        /// before either a delivery appears or the end-of-stream is
        /// reached; LatestEvent is not updated in this case. Second,
        /// when the Subscription has already been closed or its
        /// IModel is closed at the time of the call; LatestEvent is
        /// set to null in this case.
        ///</para>
        ///<para>
        /// Returns true to indicate a delivery, or an end-of-stream
        /// that is detected while waiting for one.
        ///</para>
        ///<para>
        /// If a delivery is already waiting in the queue, or one
        /// arrives before the timeout expires, it is removed from the
        /// queue and placed in the "result" out parameter. If the
        /// end-of-stream is detected while waiting, "result" is set
        /// to null.
        ///</para>
        ///<para>
        /// Whenever this method returns true, it updates LatestEvent
        /// to the value placed in "result" before returning.
        ///</para>
        ///<para>
        /// End-of-stream can arise through the action of the
        /// Subscription.Close() method, or through the closure of the
        /// IModel or its underlying IConnection.
        ///</para>
        ///<para>
        /// This method does not acknowledge any deliveries at all
        /// (but in "noAck" mode, the server will have
        /// auto-acknowledged each event before it is even sent across
        /// the wire to us).
        ///</para>
        ///<para>
        /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
        /// will be interpreted as a command to wait for an
        /// indefinitely long period of time for an item or the end of
        /// the stream to become available.
        ///</para>
        ///</remarks>
        public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
        {
            // Alias the pointer as otherwise it may change out
            // from under us by the operation of Close() from
            // another thread.
            QueueingBasicConsumer consumer = m_consumer;
            BasicDeliverEventArgs delivery = null;
            try
            {
                if (consumer == null || Model.IsClosed)
                {
                    // Already closed: nothing can ever be delivered.
                    MutateLatestEvent(null);
                    result = null;
                    return false;
                }

                if (!consumer.Queue.Dequeue(millisecondsTimeout, out delivery))
                {
                    // Timed out; LatestEvent is intentionally left alone.
                    result = null;
                    return false;
                }
            }
            catch (EndOfStreamException)
            {
                // End of stream is reported as a successful call
                // that yields a null delivery.
                delivery = null;
            }

            MutateLatestEvent(delivery);
            result = delivery;
            return true;
        }

        ///<summary>Implementation of the IDisposable interface,
        ///permitting Subscription to be used in using
        ///statements. Simply calls Close().</summary>
        void IDisposable.Dispose()
        {
            Close();
        }

        ///<summary>Implementation of the IEnumerable interface, for
        ///permitting Subscription to be used in foreach
        ///loops. Returns this same Subscription instance.</summary>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return this;
        }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        ///<remarks>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        bool IEnumerator.MoveNext()
        {
            return Next() != null;
        }

        ///<summary>Dummy implementation of the IEnumerator interface,
        ///for permitting Subscription to be used in foreach loops;
        ///Reset()ting a Subscription doesn't make sense, so this
        ///method always throws InvalidOperationException.</summary>
        void IEnumerator.Reset()
        {
            // It really doesn't make sense to try to reset a subscription.
            throw new InvalidOperationException("Subscription.Reset() does not make sense");
        }

        ///<summary>Replaces LatestEvent with <paramref name="value"/>
        ///while holding m_eventLock.</summary>
        protected void MutateLatestEvent(BasicDeliverEventArgs value)
        {
            lock (m_eventLock)
            {
                LatestEvent = value;
            }
        }

        ///<summary>Sets LatestEvent to null if, and only if, it is
        ///still <paramref name="evt"/>. The comparison and the update
        ///happen under m_eventLock so that a delivery published
        ///concurrently by Next() is not discarded.</summary>
        private void ClearLatestEventIfCurrent(BasicDeliverEventArgs evt)
        {
            lock (m_eventLock)
            {
                if (evt == LatestEvent)
                {
                    LatestEvent = null;
                }
            }
        }
    }
   443 }