Introduce a message limit flag, and make it so that if there are no consumers and a named queue we declare the queue anyway. No more qos1/tx2 trick! bug24527
authorSimon MacMullen <simon@rabbitmq.com>
Wed, 08 Feb 2012 13:40:10 +0000
branchbug24527
changeset 1932e1b9b8ae268b
parent 1930 35ea468e2a67
child 1933 ce999c9ad57e
Introduce a message limit flag, and make it so that if there are no consumers and a named queue we declare the queue anyway. No more qos1/tx2 trick!
test/src/com/rabbitmq/examples/MulticastMain.java
test/src/com/rabbitmq/examples/perf/Consumer.java
test/src/com/rabbitmq/examples/perf/MulticastParams.java
test/src/com/rabbitmq/examples/perf/MulticastSet.java
test/src/com/rabbitmq/examples/perf/Producer.java
     1.1 --- a/test/src/com/rabbitmq/examples/MulticastMain.java	Tue Feb 07 15:36:52 2012 +0000
     1.2 +++ b/test/src/com/rabbitmq/examples/MulticastMain.java	Wed Feb 08 13:40:10 2012 +0000
     1.3 @@ -59,6 +59,7 @@
     1.4              int prefetchCount    = intArg(cmd, 'q', 0);
     1.5              int minMsgSize       = intArg(cmd, 's', 0);
     1.6              int timeLimit        = intArg(cmd, 'z', 0);
     1.7 +            int msgLimit         = intArg(cmd, 'C', 0);
     1.8              List<?> flags        = lstArg(cmd, 'f');
     1.9              int frameMax         = intArg(cmd, 'M', 0);
    1.10              int heartbeat        = intArg(cmd, 'b', 0);
    1.11 @@ -91,6 +92,7 @@
    1.12              p.setExclusive(        exclusive);
    1.13              p.setFlags(            flags);
    1.14              p.setMinMsgSize(       minMsgSize);
    1.15 +            p.setMsgLimit(         msgLimit);
    1.16              p.setPrefetchCount(    prefetchCount);
    1.17              p.setProducerCount(    producerCount);
    1.18              p.setProducerTxSize(   producerTxSize);
    1.19 @@ -136,6 +138,7 @@
    1.20          options.addOption(new Option("q", "qos",       true, "qos prefetch count"));
    1.21          options.addOption(new Option("s", "size",      true, "message size"));
    1.22          options.addOption(new Option("z", "time",      true, "time limit"));
    1.23 +        options.addOption(new Option("C", "messages",  true, "message limit"));
    1.24          Option flag =     new Option("f", "flag",      true, "message flag");
    1.25          flag.setArgs(Option.UNLIMITED_VALUES);
    1.26          options.addOption(flag);
     2.1 --- a/test/src/com/rabbitmq/examples/perf/Consumer.java	Tue Feb 07 15:36:52 2012 +0000
     2.2 +++ b/test/src/com/rabbitmq/examples/perf/Consumer.java	Wed Feb 08 13:40:10 2012 +0000
     2.3 @@ -35,11 +35,12 @@
     2.4      private int              txSize;
     2.5      private boolean          autoAck;
     2.6      private Stats stats;
     2.7 +    private int              msgLimit;
     2.8      private long             timeLimit;
     2.9  
    2.10      public Consumer(Channel channel, String id,
    2.11                      String queueName, int txSize, boolean autoAck,
    2.12 -                    Stats stats, int timeLimit) {
    2.13 +                    Stats stats, int msgLimit, int timeLimit) {
    2.14  
    2.15          this.channel   = channel;
    2.16          this.id        = id;
    2.17 @@ -47,6 +48,7 @@
    2.18          this.txSize    = txSize;
    2.19          this.autoAck   = autoAck;
    2.20          this.stats     = stats;
    2.21 +        this.msgLimit  = msgLimit;
    2.22          this.timeLimit = 1000L * timeLimit;
    2.23      }
    2.24  
    2.25 @@ -60,7 +62,8 @@
    2.26              q = new QueueingConsumer(channel);
    2.27              channel.basicConsume(queueName, autoAck, q);
    2.28  
    2.29 -            while (timeLimit == 0 || now < startTime + timeLimit) {
    2.30 +            while ((timeLimit == 0 || now < startTime + timeLimit) &&
    2.31 +                   totalMsgCount < msgLimit) {
    2.32                  QueueingConsumer.Delivery delivery;
    2.33                  try {
    2.34                      if (timeLimit == 0) {
     3.1 --- a/test/src/com/rabbitmq/examples/perf/MulticastParams.java	Tue Feb 07 15:36:52 2012 +0000
     3.2 +++ b/test/src/com/rabbitmq/examples/perf/MulticastParams.java	Wed Feb 08 13:40:10 2012 +0000
     3.3 @@ -30,6 +30,7 @@
     3.4  
     3.5      protected int timeLimit = 10;
     3.6      protected int rateLimit = 0;
     3.7 +    protected int msgLimit = 0;
     3.8  
     3.9      protected String exchangeName = "direct";
    3.10      protected String exchangeType = "direct";
    3.11 @@ -93,6 +94,10 @@
    3.12          this.timeLimit = timeLimit;
    3.13      }
    3.14  
    3.15 +    public void setMsgLimit(int msgLimit) {
    3.16 +        this.msgLimit = msgLimit;
    3.17 +    }
    3.18 +
    3.19      public void setFlags(List<?> flags) {
    3.20          this.flags = flags;
    3.21      }
     4.1 --- a/test/src/com/rabbitmq/examples/perf/MulticastSet.java	Tue Feb 07 15:36:52 2012 +0000
     4.2 +++ b/test/src/com/rabbitmq/examples/perf/MulticastSet.java	Wed Feb 08 13:40:10 2012 +0000
     4.3 @@ -63,10 +63,21 @@
     4.4              Thread t =
     4.5                  new Thread(new Consumer(channel, id, qName,
     4.6                                          p.consumerTxSize, p.autoAck,
     4.7 -                                        stats, p.timeLimit));
     4.8 +                                        stats, p.msgLimit, p.timeLimit));
     4.9              consumerThreads[i] = t;
    4.10          }
    4.11  
    4.12 +        if (p.consumerCount == 0 && !p.queueName.equals("")) {
    4.13 +            Connection conn = factory.newConnection();
    4.14 +            Channel channel = conn.createChannel();
    4.15 +            channel.queueDeclare(p.queueName,
    4.16 +                                 p.flags.contains("persistent"),
    4.17 +                                 p.exclusive, p.autoDelete,
    4.18 +                                 null).getQueue();
    4.19 +            channel.queueBind(p.queueName, p.exchangeName, id);
    4.20 +            conn.close();
    4.21 +        }
    4.22 +
    4.23          Thread[] producerThreads = new Thread[p.producerCount];
    4.24          Connection[] producerConnections = new Connection[p.producerCount];
    4.25          Channel[] producerChannels = new Channel[p.producerCount];
    4.26 @@ -83,7 +94,8 @@
    4.27              channel.exchangeDeclare(p.exchangeName, p.exchangeType);
    4.28              final Producer producer = new Producer(channel, p.exchangeName, id,
    4.29                                                     p.flags, p.producerTxSize,
    4.30 -                                                   p.rateLimit, p.minMsgSize, p.timeLimit,
    4.31 +                                                   p.rateLimit, p.msgLimit,
    4.32 +                                                   p.minMsgSize, p.timeLimit,
    4.33                                                     p.confirm, stats);
    4.34              channel.addReturnListener(producer);
    4.35              channel.addConfirmListener(producer);
     5.1 --- a/test/src/com/rabbitmq/examples/perf/Producer.java	Tue Feb 07 15:36:52 2012 +0000
     5.2 +++ b/test/src/com/rabbitmq/examples/perf/Producer.java	Wed Feb 08 13:40:10 2012 +0000
     5.3 @@ -42,6 +42,7 @@
     5.4      private boolean persistent;
     5.5      private int     txSize;
     5.6      private int     rateLimit;
     5.7 +    private int     msgLimit;
     5.8      private long    timeLimit;
     5.9  
    5.10      private Stats stats;
    5.11 @@ -58,7 +59,7 @@
    5.12  
    5.13      public Producer(Channel channel, String exchangeName, String id,
    5.14                      List<?> flags, int txSize,
    5.15 -                    int rateLimit, int minMsgSize, int timeLimit,
    5.16 +                    int rateLimit, int msgLimit, int minMsgSize, int timeLimit,
    5.17                      long confirm, Stats stats)
    5.18          throws IOException {
    5.19  
    5.20 @@ -70,6 +71,7 @@
    5.21          this.persistent   = flags.contains("persistent");
    5.22          this.txSize       = txSize;
    5.23          this.rateLimit    = rateLimit;
    5.24 +        this.msgLimit     = msgLimit;
    5.25          this.timeLimit    = 1000L * timeLimit;
    5.26          this.message      = new byte[minMsgSize];
    5.27          if (confirm > 0) {
    5.28 @@ -128,7 +130,8 @@
    5.29  
    5.30          try {
    5.31  
    5.32 -            while (timeLimit == 0 || now < startTime + timeLimit) {
    5.33 +            while ((timeLimit == 0 || now < startTime + timeLimit) &&
    5.34 +                   msgCount < msgLimit) {
    5.35                  if (confirmPool != null) {
    5.36                      confirmPool.acquire();
    5.37                  }