Chapter 3. Queues

Queues are bound to one or more exchanges. Messages that are published to the exchanges are routed to queues where the routing and binding keys match. Queues then store messages until they are consumed by clients.
Every queue is bound to the default exchange, which provides a simple and direct method of publishing messages. Other methods are discussed in Chapter 1, MRG Messaging Concepts.
Clients receive messages by subscribing to the queue that contains the messages they want to see. These subscribers can browse through messages without acquiring them, leaving messages on the queue for other subscribers to browse. Alternatively, clients can consume the messages, permanently removing them from the queue once they are read. Consumers therefore compete for messages: once a message has been removed from the queue, it is no longer available for other consumers to read.
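The difference between browsing and consuming can be illustrated with a small in-memory model. This is only a sketch: ToyQueue and its methods are hypothetical names invented for the illustration and are not part of the Qpid client API.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy in-memory queue used only to illustrate browse vs. consume.
// It is not part of the Qpid client API.
class ToyQueue {
public:
    void publish(const std::string& msg) { messages_.push_back(msg); }

    // Browsing copies the messages out and leaves the queue untouched,
    // so other subscribers can still see them.
    std::vector<std::string> browse() const {
        return std::vector<std::string>(messages_.begin(), messages_.end());
    }

    // Consuming removes the oldest message from the queue.
    // Returns false if there was nothing to consume.
    bool consume(std::string& out) {
        if (messages_.empty()) return false;
        out = messages_.front();
        messages_.pop_front();
        return true;
    }

    std::size_t size() const { return messages_.size(); }

private:
    std::deque<std::string> messages_;
};
```

In this model any number of callers can browse the same message, but only one caller can ever consume it.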
Exclusive queues
Exclusive queues can only be used in one session at a time. When a queue is declared with the exclusive property set, that queue is not available for use in any other session until the session that declared the queue has been closed.
If the server receives a declare, bind, delete or subscribe request from another session for a queue that has been declared as exclusive, an exception will be raised and the requesting session will be ended.
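The ownership rule for exclusive queues can be sketched as follows. The class name, the integer session ids and the method names are assumptions made for the illustration; the broker's own bookkeeping is not shown in this chapter.

```cpp
// Toy model of exclusive ownership; session ids are plain integers here.
class ExclusiveQueue {
public:
    explicit ExclusiveQueue(int owningSession)
        : owner_(owningSession), ownerOpen_(true) {}

    // True if a declare, bind, delete or subscribe request from `session`
    // would be allowed; false where the broker would raise an exception.
    bool mayUse(int session) const {
        return !ownerOpen_ || session == owner_;
    }

    // Called when a session closes; exclusivity ends with the owning session.
    void onSessionClosed(int session) {
        if (session == owner_) ownerOpen_ = false;
    }

private:
    int owner_;
    bool ownerOpen_;
};
```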
Deleting Queues
When a queue is deleted, the queue and any messages it contains are destroyed, and all bindings that refer to the queue are removed. When the broker receives a queue delete command for a particular queue, the following checks are made before the deletion occurs:
  1. If ACL is enabled, the broker will check that the user who initiated the deletion has permission to do so.
  2. If the ifEmpty flag is passed, the broker will raise an exception if the queue is not empty.
  3. If the ifUnused flag is passed, the broker will raise an exception if the queue has subscribers.
  4. If the queue is exclusive, the broker will check that the user who initiated the deletion owns the queue.
Once the queue has been deleted, the management object associated with the queue will remain. This makes it possible to see the deleted queue using qpid-tool or the MRG Management Console. These tools will show the queue with a timestamp of when it was deleted.
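The pre-deletion checks listed above can be sketched as a single function. This is a model only: the struct and enum names are hypothetical, the ACL lookup is reduced to two booleans, and it assumes the checks are evaluated in the order in which they are listed.

```cpp
#include <cstddef>
#include <string>

// Outcome of the pre-deletion checks, in the order the checks are listed.
enum class DeleteCheck { Ok, AclDenied, NotEmpty, InUse, NotOwner };

// Hypothetical snapshot of the queue state the broker would consult.
struct QueueState {
    std::string owner;            // user that owns the queue
    bool exclusive;
    std::size_t messageCount;
    std::size_t subscriberCount;
};

// Hypothetical representation of a queue delete command.
struct DeleteRequest {
    std::string user;             // user who initiated the deletion
    bool ifEmpty;
    bool ifUnused;
};

// Returns the first check that fails, or DeleteCheck::Ok if deletion may
// proceed. aclEnabled and aclAllows stand in for the broker's ACL lookup.
DeleteCheck checkDelete(const QueueState& q, const DeleteRequest& r,
                        bool aclEnabled, bool aclAllows) {
    if (aclEnabled && !aclAllows) return DeleteCheck::AclDenied;
    if (r.ifEmpty && q.messageCount > 0) return DeleteCheck::NotEmpty;
    if (r.ifUnused && q.subscriberCount > 0) return DeleteCheck::InUse;
    if (q.exclusive && q.owner != r.user) return DeleteCheck::NotOwner;
    return DeleteCheck::Ok;
}
```

Every result other than DeleteCheck::Ok corresponds to a case where the queue is left in place.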
Automatically deleted queues
When a queue is created, its lifecycle can be limited by marking it to be automatically deleted. This can be achieved by setting the auto-delete field. Auto-delete is handled differently for exclusive and non-exclusive queues:
Rejected and orphaned messages
Messages can be rejected by a subscribed client. Once a message has been rejected by a client, it will not be re-delivered. Messages can also be orphaned if they are left on a queue when that queue is deleted.
For both rejected and orphaned messages, the system can be configured to handle them using an alternate exchange. An alternate exchange is specified when the queue is declared. Any rejected or orphaned messages will automatically be routed to the alternate exchange, to be re-routed to other bound queues or deleted if necessary. If no alternate exchange is specified, all rejected and orphaned messages will be automatically deleted.
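As a rough sketch of this behavior, the function below either forwards unwanted messages to an alternate exchange or drops them. ToyExchange and disposeUnwanted are hypothetical names, and the exchange is reduced to a list of the messages routed to it.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy stand-in for an exchange: it only records what was routed to it.
struct ToyExchange {
    std::vector<std::string> routed;
};

// Disposes of rejected or orphaned messages. If an alternate exchange was
// specified (non-null), every message is routed to it; otherwise the
// messages are simply dropped. Returns the number of messages re-routed.
std::size_t disposeUnwanted(std::vector<std::string>& messages,
                            ToyExchange* alternate) {
    std::size_t rerouted = 0;
    if (alternate != nullptr) {
        for (const std::string& m : messages) {
            alternate->routed.push_back(m);
            ++rerouted;
        }
    }
    messages.clear();  // in either case the messages leave the original queue
    return rerouted;
}
```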
Controlling queue size
A size limit can be set on a queue by specifying values for qpid.max_count and qpid.max_size when declaring the queue. By default, an exception will be raised when published messages exceed this limit.
The default behavior can be changed by setting the qpid.policy_type option. The possible values for this option are:
reject
The publisher of a message that exceeds the limit receives an exception. This is the default behavior for all non-durable queues.
flow_to_disk
The content of messages that exceed the limit is freed from memory and held on disk. This occurs for both persistent and non-persistent messages and is the default behavior for durable queues.
ring
The oldest messages are removed to make room for newer messages.
ring_strict
Similar to the ring policy, but will not remove messages that have not yet been accepted by a client. If the limit is exceeded and the oldest message has not been accepted, the publisher will receive an exception.
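The reject, ring and ring_strict policies can be compared with a small count-limited model. This is a sketch under stated assumptions: BoundedQueue, Policy and holdOldest are hypothetical names, only a message-count limit is modeled, flow_to_disk is left out, and ring_strict is interpreted as refusing to displace the oldest message while a client holds it without having accepted it.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

enum class Policy { Reject, Ring, RingStrict };

// Toy count-limited queue; flow_to_disk is not modeled.
class BoundedQueue {
public:
    BoundedQueue(std::size_t maxCount, Policy policy)
        : maxCount_(maxCount), policy_(policy) {
        assert(maxCount > 0);
    }

    // Returns true if the message was enqueued, false where the broker
    // would raise an exception to the publisher.
    bool publish(const std::string& body) {
        if (entries_.size() >= maxCount_) {
            if (policy_ == Policy::Reject) return false;
            // Assumption: ring_strict refuses to displace the oldest message
            // while a client holds it without having accepted it.
            if (policy_ == Policy::RingStrict && entries_.front().held) return false;
            entries_.pop_front();  // ring: drop the oldest message
        }
        entries_.push_back(Entry{body, false});
        return true;
    }

    // Marks the oldest message as delivered to a client but not yet accepted.
    void holdOldest() {
        if (!entries_.empty()) entries_.front().held = true;
    }

    // Precondition: the queue is not empty.
    const std::string& oldest() const { return entries_.front().body; }
    std::size_t size() const { return entries_.size(); }

private:
    struct Entry { std::string body; bool held; };
    std::size_t maxCount_;
    Policy policy_;
    std::deque<Entry> entries_;
};
```

With a limit of two messages, a third publish fails under Reject, displaces the oldest message under Ring, and fails under RingStrict only if the oldest message is currently held by a client.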
Ignoring locally published messages
Under the AMQP model, exclusive, auto-deleted queues are often bound to an exchange that enables the queue owner to subscribe to messages for consumption. In this case, a queue owner might send itself messages using that exchange, but have no need to receive those messages.
To ignore locally published messages, a no-local key can be specified in the arguments to the declare used to create the queue. The value of this key is irrelevant; its presence alone will cause the correct behavior. This key will cause the queue to discard any messages that were published by the same connection as that of the session that owns the queue.
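The filtering effect of the no-local key can be modeled as a comparison of connection identifiers. The names below (ToyMessage, NoLocalQueue) and the use of integers as connection ids are assumptions made for this sketch.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy message carrying the id of the connection that published it.
struct ToyMessage {
    int publisherConnection;
    std::string body;
};

// Toy queue declared with the no-local key by a session on ownerConnection.
class NoLocalQueue {
public:
    explicit NoLocalQueue(int ownerConnection)
        : ownerConnection_(ownerConnection) {}

    // Returns true if the message was kept, false if it was discarded
    // because it came from the queue owner's own connection.
    bool publish(const ToyMessage& m) {
        if (m.publisherConnection == ownerConnection_) return false;
        messages_.push_back(m);
        return true;
    }

    std::size_t size() const { return messages_.size(); }

private:
    int ownerConnection_;
    std::vector<ToyMessage> messages_;
};
```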
Last value queues
The last value queue type causes logically updated versions of previous messages to appear to overwrite the older messages.
The last value queue uses the value of the qpid.LVQ_key to determine whether a newly published message is an update to an existing message on the queue. If this is the case, the new message will appear to overwrite the older message. A subscriber that requests messages after this has occurred will see only the newer message.
There are two types of Last Value Queue:
LVQ uses a header as a key. If the key matches, the new message replaces the message in the queue, unless:
  • the message with the matching key has been acquired
  • the message with the matching key has been browsed
In these cases the new message is placed into the queue in FIFO order. If another message with the same key is then received, the message that has not been accessed will be replaced. These two exceptions protect the consumer from missing the last update if a consumer or browser has accessed a message.
LVQ NO BROWSE also uses a header for a key. If the key matches, the new message replaces the message in the queue unless the message with the matching key has been acquired. In this case browsed messages are not invalidated, so updates to messages already browsed on a key will be missed. If a new subscription is created, the latest values will be seen.
To use this feature, add a qpid.last_value_queue or qpid.last_value_queue_no_browse key to the arguments of queue declare. The value of the key is user-selected and used only for key matching. Messages published to the queue then need to specify a value for the qpid.LVQ_key in the headers of messages they publish.
This example demonstrates the use of LVQ.
#include "qpid/client/QueueOptions.h"

QueueOptions qo;
qo.setOrdering(LVQ);

session.queueDeclare(arg::queue=queue, arg::arguments=qo);

// .....
string key;
qo.getLVQKey(key);

// ....
// For each message, set the key in the application headers before transfer
message.getHeaders().setString(key, "RHT");
Example 3.1. Setting the LVQ
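The replacement rules for the two last value queue types can also be modeled without a broker. The sketch below is an illustration only: ToyLvq, LvqMode and the mark methods are hypothetical names, and the acquired and browsed states are set by hand rather than by real subscribers.

```cpp
#include <cstddef>
#include <string>
#include <vector>

enum class LvqMode { Lvq, LvqNoBrowse };

struct LvqEntry {
    std::string key;
    std::string value;
    bool acquired;
    bool browsed;
};

// Toy last value queue; not the broker's implementation.
class ToyLvq {
public:
    explicit ToyLvq(LvqMode mode) : mode_(mode) {}

    // Replaces an existing entry with the same key in place, unless that
    // entry is protected; otherwise appends the message in FIFO order.
    void publish(const std::string& key, const std::string& value) {
        for (LvqEntry& e : entries_) {
            if (e.key == key && !isProtected(e)) {
                e.value = value;
                return;
            }
        }
        entries_.push_back(LvqEntry{key, value, false, false});
    }

    void markBrowsed(std::size_t i) { entries_.at(i).browsed = true; }
    void markAcquired(std::size_t i) { entries_.at(i).acquired = true; }
    const std::vector<LvqEntry>& entries() const { return entries_; }

private:
    // Acquired messages are never replaced; browsed messages are protected
    // only in plain LVQ mode.
    bool isProtected(const LvqEntry& e) const {
        return e.acquired || (mode_ == LvqMode::Lvq && e.browsed);
    }

    LvqMode mode_;
    std::vector<LvqEntry> entries_;
};
```

In plain LVQ mode, an update to a browsed key is appended as a new entry, and later updates replace that new entry. In LVQ NO BROWSE mode, the browsed entry itself is overwritten.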

Durable queues
Queues can be defined as durable. A durable queue is stored on disk, and can survive a restart of the broker. However, messages on the queue must also be declared as persistent for them to be recovered.
There are other options that can be used with durable queues to control the sizing and tuning of the journal used to record queue state on disk. For more information, see Chapter 6, Persistence.
Enforcing persistence on the last node in a cluster
PersistLastNode is used if a cluster fails down to a single node. In this situation, a queue treats all transient messages as persistent until additional nodes in the cluster are restored.
This mode will not be triggered if a cluster is started with only one node. It will only be triggered if active nodes fail until there is only one node remaining.
If this mode is used, queues must be configured to be durable; otherwise, their messages will not be persisted.
This example demonstrates the use of Persist Last Node.
#include "qpid/client/QueueOptions.h"

QueueOptions qo;
qo.setPersistLastNode();

session.queueDeclare(arg::queue=queue, arg::durable=true,
                     arg::arguments=qo);
Example 3.2. Using Persist Last Node
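The conditions under which this mode is triggered can be sketched as a small state tracker. LastNodeTracker and its methods are hypothetical names for this illustration, and the way the broker learns about cluster membership is reduced to a single callback.

```cpp
#include <cstddef>

// Toy tracker for the "last node" condition described above.
class LastNodeTracker {
public:
    LastNodeTracker() : previous_(0), lastNodeMode_(false) {}

    // Called with the number of active nodes whenever membership changes.
    void onMembershipChange(std::size_t activeNodes) {
        if (activeNodes > 1) {
            lastNodeMode_ = false;   // other nodes are available again
        } else if (activeNodes == 1 && previous_ > 1) {
            lastNodeMode_ = true;    // the cluster failed down to one node
        }
        previous_ = activeNodes;
    }

    bool lastNodeMode() const { return lastNodeMode_; }

    // Whether a message would be written to disk in this model: the queue
    // must be durable, and the message must be persistent or the mode active.
    bool persists(bool messagePersistent, bool queueDurable) const {
        return queueDurable && (messagePersistent || lastNodeMode_);
    }

private:
    std::size_t previous_;
    bool lastNodeMode_;
};
```

A cluster that starts with a single node never enters the mode in this model, because the node count did not fall to one from a higher value.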

Configuring queue options
This section explains how to set queue options from the shell prompt using the qpid-config tool.

Note

Information on obtaining and using the qpid-config tool is in Section 2.2.1, “Using qpid-config”.
  1. List information on all existing queues by using the queues command:
    $ qpid-config queues
    
                                           Store Size
    Durable  AutoDel  Excl  Bindings  (files x file pages)  Queue Name
    =======================================================================================
     N         N      N         1                          pub_start
     N         N      N         1                          pub_done
     N         N      N         1                          sub_ready
     N         N      N         1                          sub_done
     N         N      N         1                          perftest0
     N         Y      N         2                          mgmt-3206ff16-fb29-4a30-82ea
     N         Y      N         2                          repl-3206ff16-fb29-4a30-82ea
     N         Y      N         2                          mgmt-df06c7a6-4ce7-426a-9f66
     N         Y      N         2                          repl-df06c7a6-4ce7-426a-9f66
    
  2. Queues are created using a command with this syntax:
    $ qpid-config [options] add queue queue_name [add queue options]
    
    The possible options are:
    --durable
    Makes the queue durable (see Chapter 6, Persistence for more information about durable queues).
    --cluster-durable
    Makes the queue durable if there is only one functioning cluster node.
    --file-count NUMBER
    Set the number of files in the persistence journal for the queue. Defaults to 8.
    --file-size NUMBER
    Set the number of pages in the file (each page is 64KB). Defaults to 24.
    --max-queue-size NUMBER
    Maximum queue size in bytes.
    --max-queue-count NUMBER
    Maximum queue size in number of messages.
    --policy-type TYPE
    Action to take when queue limit is reached. TYPE can be:
    • reject
    • flow_to_disk
    • ring
    • ring_strict
    --last-value-queue
    Enable last value queue behavior on the queue
  3. To delete a queue, use the del queue command with the name of the queue to remove:
    $ qpid-config del queue queue_name
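The --file-count and --file-size options described above together determine the raw size of a queue's journal files. The arithmetic can be sketched as follows, assuming that 64KB means 64 * 1024 bytes; the function name is hypothetical, and the result is the total size of the files rather than the usable message capacity.

```cpp
#include <cstdint>

// Page size given in the --file-size description, taken as 64 * 1024 bytes.
constexpr std::uint64_t kPageBytes = 64 * 1024;

// Raw size in bytes of all journal files for one queue.
constexpr std::uint64_t journalBytes(std::uint64_t fileCount,
                                     std::uint64_t pagesPerFile) {
    return fileCount * pagesPerFile * kPageBytes;
}

// With the defaults: 8 files * 24 pages * 65536 bytes = 12582912 bytes (12 MiB).
static_assert(journalBytes(8, 24) == 12582912ULL, "default journal size");
```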
    
Creating queues from within applications
Applications create queues using AMQP's queue declare command. This command allows the durable, exclusive, auto-delete and alternate-exchange properties to be specified. Any qpidd-specific options can be passed in the arguments field. See the MRG Messaging API documentation for the client language you wish to use for more details.