projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
author Michael Klishin <michael@rabbitmq.com>
Fri Jul 04 15:28:03 2014 +0400 (2 weeks ago)
changeset 1187 7b7019f3c0a1
parent 1098 9e4888f7cb23
permissions -rw-r--r--
merge stable into default
     1 // This source code is dual-licensed under the Apache License, version
     2 // 2.0, and the Mozilla Public License, version 1.1.
     3 //
     4 // The APL v2.0:
     5 //
     6 //---------------------------------------------------------------------------
     7 //   Copyright (C) 2007-2014 GoPivotal, Inc.
     8 //
     9 //   Licensed under the Apache License, Version 2.0 (the "License");
    10 //   you may not use this file except in compliance with the License.
    11 //   You may obtain a copy of the License at
    12 //
    13 //       http://www.apache.org/licenses/LICENSE-2.0
    14 //
    15 //   Unless required by applicable law or agreed to in writing, software
    16 //   distributed under the License is distributed on an "AS IS" BASIS,
    17 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    18 //   See the License for the specific language governing permissions and
    19 //   limitations under the License.
    20 //---------------------------------------------------------------------------
    21 //
    22 // The MPL v1.1:
    23 //
    24 //---------------------------------------------------------------------------
    25 //  The contents of this file are subject to the Mozilla Public License
    26 //  Version 1.1 (the "License"); you may not use this file except in
    27 //  compliance with the License. You may obtain a copy of the License
    28 //  at http://www.mozilla.org/MPL/
    29 //
    30 //  Software distributed under the License is distributed on an "AS IS"
    31 //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
    32 //  the License for the specific language governing rights and
    33 //  limitations under the License.
    34 //
    35 //  The Original Code is RabbitMQ.
    36 //
    37 //  The Initial Developer of the Original Code is GoPivotal, Inc.
    38 //  Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    39 //---------------------------------------------------------------------------
    40 
    41 using System;
    42 using System.IO;
    43 using System.Collections;
    44 
    45 using RabbitMQ.Client;
    46 using RabbitMQ.Client.Exceptions;
    47 using RabbitMQ.Client.Events;
    48 using RabbitMQ.Util;
    49 
    50 namespace RabbitMQ.Client.MessagePatterns {
    51     ///<summary>Manages a subscription to a queue or exchange.</summary>
    52     ///<remarks>
    53     ///<para>
    54     /// This convenience class abstracts away from much of the detail
    55     /// involved in receiving messages from a queue or an exchange.
    56     ///</para>
    57     ///<para>
    58     /// Once created, the Subscription consumes from a queue (using a
    59     /// QueueingBasicConsumer). Received deliveries can be retrieved
    60     /// by calling Next(), or by using the Subscription as an
    61     /// IEnumerator in, for example, a foreach loop.
    62     ///</para>
    63     ///<para>
    64     /// Note that if the "noAck" option is enabled (which it is by
    65     /// default), then received deliveries are automatically acked
    66     /// within the server before they are even transmitted across the
    67     /// network to us. Calling Ack() on received events will always do
    68     /// the right thing: if "noAck" is enabled, nothing is done on an
    69     /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
    70     /// called with the correct parameters.
    71     ///</para>
    72     ///</remarks>
    public class Subscription: IEnumerable, IEnumerator, IDisposable {
        ///<summary>The channel this subscription consumes on.</summary>
        protected IModel m_model;

        ///<summary>Retrieve the IModel our subscription is carried by.</summary>
        public IModel Model { get { return m_model; } }

        ///<summary>Name of the queue passed to IModel.BasicConsume.</summary>
        protected string m_queueName;
        ///<summary>The "noAck" flag passed to IModel.BasicConsume.</summary>
        protected bool m_noAck;

        ///<summary>Guards writes to m_latestEvent and the hand-over of
        ///m_consumer performed by Close().</summary>
        protected readonly object m_eventLock = new object();
        ///<summary>The consumer whose queue Next() reads from; set to
        ///null by Close().</summary>
        protected volatile QueueingBasicConsumer m_consumer;
        ///<summary>The tag returned by IModel.BasicConsume; set to
        ///null by Close().</summary>
        protected string m_consumerTag;

        ///<summary>Retrieve the queue name we have subscribed to.</summary>
        public string QueueName { get { return m_queueName; } }
        ///<summary>Retrieve the IBasicConsumer that is receiving the
        ///messages from the server for us. Normally, you will not
        ///need to access this property - use Next() and friends
        ///instead. Null once Close() has been called.</summary>
        public IBasicConsumer Consumer { get { return m_consumer; } }
        ///<summary>Retrieve the consumer-tag that this subscription
        ///is using. Will usually be a server-generated
        ///name. Null once Close() has been called.</summary>
        public string ConsumerTag { get { return m_consumerTag; } }
        ///<summary>Returns true if we are in "noAck" mode, where
        ///calls to Ack() will be no-ops, and where the server acks
        ///messages before they are delivered to us. Returns false if
        ///we are in a mode where calls to Ack() are required, and
        ///where such calls will actually send an acknowledgement
        ///message across the network to the server.</summary>
        public bool NoAck { get { return m_noAck; } }

        ///<summary>The delivery most recently handed out by Next(),
        ///or null. Written only while holding m_eventLock.</summary>
        protected BasicDeliverEventArgs m_latestEvent;

        ///<summary>Returns the most recent value returned by Next(),
        ///or null when either no values have been retrieved yet, the
        ///end of the subscription has been reached, or the most
        ///recent value has already been Ack()ed or Nack()ed. See also
        ///the documentation for Ack().</summary>
        public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }

        ///<summary>Creates a new Subscription in "noAck" mode,
        ///consuming from a named queue.</summary>
        ///<exception cref="ArgumentNullException">If
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName)
            : this(model, queueName, true) {}

        ///<summary>Creates a new Subscription, with full control over
        ///both "noAck" mode and the name of the queue.</summary>
        ///<exception cref="ArgumentNullException">If
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck)
        {
            PrepareConsumer(model, queueName, noAck);
            m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, m_consumer);
        }


        ///<summary>Creates a new Subscription, with full control over
        ///"noAck" mode, the name of the queue, and the consumer tag.</summary>
        ///<exception cref="ArgumentNullException">If
        ///<paramref name="model"/> is null.</exception>
        public Subscription(IModel model, string queueName, bool noAck, string consumerTag)
        {
            PrepareConsumer(model, queueName, noAck);
            m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, consumerTag, m_consumer);
        }

        ///<summary>Shared constructor logic: validates the model,
        ///stores the settings and creates the QueueingBasicConsumer.
        ///The caller is responsible for the BasicConsume call.</summary>
        private void PrepareConsumer(IModel model, string queueName, bool noAck)
        {
            if (model == null) {
                // Fail with a descriptive exception instead of the
                // NullReferenceException BasicConsume would raise.
                throw new ArgumentNullException("model");
            }

            m_model = model;
            m_queueName = queueName;
            m_noAck = noAck;
            m_consumer = new QueueingBasicConsumer(m_model);
            m_latestEvent = null;
        }

        ///<summary>Closes this Subscription, cancelling the consumer
        ///record in the server.</summary>
        ///<remarks>
        ///<para>
        /// Only the first call has any effect; later calls (including
        /// concurrent calls from other threads) return immediately.
        ///</para>
        ///<para>
        /// IModel.BasicCancel is only invoked while the model reports
        /// IsOpen. An OperationInterruptedException raised by the
        /// cancellation is deliberately ignored. The consumer tag is
        /// cleared whether or not the cancellation succeeded.
        ///</para>
        ///</remarks>
        public void Close()
        {
            // Claim the consumer atomically so that two racing
            // callers cannot both issue a BasicCancel.
            lock (m_eventLock) {
                if (m_consumer == null) {
                    return;
                }
                m_consumer = null;
            }

            try {
                if (m_model.IsOpen) {
                    m_model.BasicCancel(m_consumerTag);
                }
            } catch (OperationInterruptedException) {
                // We don't mind, here.
            } finally {
                m_consumerTag = null;
            }
        }

        ///<summary>If LatestEvent is non-null, passes it to
        ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
        ///null.</summary>
        public void Ack()
        {
            Ack(m_latestEvent);
        }

        ///<summary>If we are not in "noAck" mode and the model is
        ///open, calls IModel.BasicAck with the delivery-tag from
        ///<paramref name="evt"/>; otherwise, sends nothing to the
        ///server. If <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null. Does
        ///nothing at all when <paramref name="evt"/> is null.
        ///</summary>
        ///<remarks>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</remarks>
        public void Ack(BasicDeliverEventArgs evt)
        {
            if (evt == null) {
                return;
            }

            if (!m_noAck && m_model.IsOpen) {
                m_model.BasicAck(evt.DeliveryTag, false);
            }

            ClearLatestEventIfSame(evt);
        }

        ///<summary>Passes LatestEvent to
        ///Nack(BasicDeliverEventArgs, bool, bool) with "multiple" set
        ///to false. Does nothing if LatestEvent is null; otherwise
        ///causes LatestEvent to become null.</summary>
        public void Nack(bool requeue)
        {
            Nack(m_latestEvent, false, requeue);
        }


        ///<summary>Passes LatestEvent to
        ///Nack(BasicDeliverEventArgs, bool, bool). Does nothing if
        ///LatestEvent is null; otherwise causes LatestEvent to become
        ///null.</summary>
        public void Nack(bool multiple, bool requeue)
        {
            Nack(m_latestEvent, multiple, requeue);
        }

        ///<summary>If we are not in "noAck" mode and the model is
        ///open, calls IModel.BasicNack with the delivery-tag from
        ///<paramref name="evt"/>; otherwise, sends nothing to the
        ///server. If <paramref name="evt"/> is the same as LatestEvent
        ///by pointer comparison, sets LatestEvent to null. Does
        ///nothing at all when <paramref name="evt"/> is null.
        ///</summary>
        ///<remarks>
        ///Passing an event that did not originate with this Subscription's
        /// channel, will lead to unpredictable behaviour
        ///</remarks>
        public void Nack(BasicDeliverEventArgs evt,
                         bool multiple,
                         bool requeue)
        {
            if (evt == null) {
                return;
            }

            if (!m_noAck && m_model.IsOpen) {
                m_model.BasicNack(evt.DeliveryTag, multiple, requeue);
            }

            ClearLatestEventIfSame(evt);
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue.</summary>
        ///<remarks>
        ///<para>
        /// Returns null when the end of the stream is reached and on
        /// every subsequent call. End-of-stream can arise through the
        /// action of the Subscription.Close() method, or through the
        /// closure of the IModel or its underlying IConnection.
        ///</para>
        ///<para>
        /// Updates LatestEvent to the value returned.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all (but in "noAck"
        /// mode, the server will have auto-acknowledged each event
        /// before it is even sent across the wire to us).
        ///</para>
        ///</remarks>
        public BasicDeliverEventArgs Next()
        {
            // Alias the pointer as otherwise it may change out
            // from under us by the operation of Close() from
            // another thread.
            QueueingBasicConsumer consumer = m_consumer;
            BasicDeliverEventArgs next = null;
            try {
                if (consumer != null && !m_model.IsClosed) {
                    next = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                }
            } catch (EndOfStreamException) {
                next = null;
            }

            MutateLatestEvent(next);
            // Return the value we dequeued rather than re-reading
            // m_latestEvent: a concurrent Ack()/Nack()/Next() may have
            // changed the field already, which would lose the delivery.
            return next;
        }

        ///<summary>Retrieves the next incoming delivery in our
        ///subscription queue, or times out after a specified number
        ///of milliseconds.</summary>
        ///<remarks>
        ///<para>
        /// Returns false, with the out parameter "result" set to
        /// null, in two situations. First, if this Subscription has
        /// been closed or the IModel reports IsClosed when the method
        /// is entered; in that case LatestEvent is also set to null.
        /// Second, if the timeout expires before a delivery appears;
        /// in that case LatestEvent is not updated.
        ///</para>
        ///<para>
        /// Returns true to indicate a delivery, or an end-of-stream
        /// detected while waiting on the queue.
        ///</para>
        ///<para>
        /// If a delivery is already waiting in the queue, or one
        /// arrives before the timeout expires, it is removed from the
        /// queue and placed in the "result" out parameter. If the
        /// end-of-stream is detected while waiting, "result" is set
        /// to null.
        ///</para>
        ///<para>
        /// Whenever this method returns true, it updates LatestEvent
        /// to the value placed in "result" before returning.
        ///</para>
        ///<para>
        /// This method does not acknowledge any deliveries at all
        /// (but in "noAck" mode, the server will have
        /// auto-acknowledged each event before it is even sent across
        /// the wire to us).
        ///</para>
        ///<para>
        /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
        /// will be interpreted as a command to wait for an
        /// indefinitely long period of time for an item or the end of
        /// the stream to become available.
        ///</para>
        ///</remarks>
        public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
        {
            // Alias the pointer as otherwise it may change out
            // from under us by the operation of Close() from
            // another thread.
            QueueingBasicConsumer consumer = m_consumer;
            BasicDeliverEventArgs next = null;
            try {
                if (consumer == null || m_model.IsClosed) {
                    MutateLatestEvent(null);
                    result = null;
                    return false;
                }

                if (!consumer.Queue.Dequeue(millisecondsTimeout, out next)) {
                    // Timed out: leave LatestEvent untouched.
                    result = null;
                    return false;
                }
            } catch (EndOfStreamException) {
                next = null;
            }

            MutateLatestEvent(next);
            // Hand back the value we dequeued, not a re-read of
            // m_latestEvent, which another thread may have changed.
            result = next;
            return true;
        }

        ///<summary>Implementation of the IEnumerable interface, for
        ///permitting Subscription to be used in foreach
        ///loops. Returns this same instance.</summary>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return this;
        }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops.</summary>
        ///<remarks>
        ///<para>
        /// As per the IEnumerator interface definition, throws
        /// InvalidOperationException if LatestEvent is null.
        ///</para>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        object IEnumerator.Current {
            get {
                // Read the field once so that a concurrent update
                // cannot slip in between the check and the return.
                BasicDeliverEventArgs current = m_latestEvent;
                if (current == null) {
                    throw new InvalidOperationException();
                }
                return current;
            }
        }

        ///<summary>Implementation of the IEnumerator interface, for
        ///permitting Subscription to be used in foreach
        ///loops. Returns true when Next() yields a delivery.</summary>
        ///<remarks>
        ///<para>
        /// Does not acknowledge any deliveries at all. Ack() must be
        /// called explicitly on received deliveries.
        ///</para>
        ///</remarks>
        bool IEnumerator.MoveNext()
        {
            return Next() != null;
        }

        ///<summary>Dummy implementation of the IEnumerator interface,
        ///for permitting Subscription to be used in foreach loops;
        ///Reset()ting a Subscription doesn't make sense, so this
        ///method always throws InvalidOperationException.</summary>
        void IEnumerator.Reset()
        {
            // It really doesn't make sense to try to reset a subscription.
            throw new InvalidOperationException("Subscription.Reset() does not make sense");
        }

        ///<summary>Implementation of the IDisposable interface,
        ///permitting Subscription to be used in using
        ///statements. Simply calls Close().</summary>
        void IDisposable.Dispose()
        {
            Close();
        }

        ///<summary>Replaces LatestEvent with <paramref name="value"/>
        ///while holding m_eventLock.</summary>
        protected void MutateLatestEvent(BasicDeliverEventArgs value)
        {
            lock(m_eventLock)
            {
                m_latestEvent = value;
            }
        }

        ///<summary>Sets LatestEvent to null if, and only if, it still
        ///refers to <paramref name="evt"/>. The comparison and the
        ///update happen under m_eventLock so that a newer event
        ///stored concurrently by Next() is not wiped out.</summary>
        private void ClearLatestEventIfSame(BasicDeliverEventArgs evt)
        {
            lock (m_eventLock) {
                if (evt == m_latestEvent) {
                    m_latestEvent = null;
                }
            }
        }
    }
   421 }