Queues are bound to one or more exchanges. Messages that are published to the exchanges are routed to queues where the routing and binding keys match. Queues then store messages until they are consumed by clients.
Every queue is bound to the default exchange, which provides a simple and direct method of publishing messages. Other methods are discussed in Chapter 1, MRG Messaging Concepts.
Clients receive messages by subscribing to the queue that contains the messages they want to see. These subscribers can browse through messages without acquiring them, leaving messages on the queue for other subscribers to browse. Alternatively, clients can consume the messages, permanently removing them from the queue once they are read. This creates competition for messages: once a message has been removed from the queue, it is no longer available for other consumers to read.
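The difference between browsing and consuming can be sketched with a small in-memory model. This is only an illustration of the semantics described above, not the qpid client API; the class name BrowsableQueue and its methods are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical in-memory model of a queue; this is not the qpid client API.
class BrowsableQueue {
public:
    void publish(const std::string& body) { messages_.push_back(body); }

    // Browse: copy the message at `position` into `out` and leave it on the queue.
    bool browse(std::size_t position, std::string& out) const {
        if (position >= messages_.size()) return false;
        out = messages_[position];
        return true;
    }

    // Consume: move the oldest message into `out` and remove it permanently.
    bool consume(std::string& out) {
        if (messages_.empty()) return false;
        out = messages_.front();
        messages_.pop_front();
        return true;
    }

    std::size_t depth() const { return messages_.size(); }

private:
    std::deque<std::string> messages_;
};
```

In this model any number of browsers can read the same message, but only one consumer ever receives it, which is the competition for messages described above.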
Exclusive queues can only be used in one session at a time. When a queue is declared with the exclusive property set, that queue is not available for use in any other session until the session that declared the queue has been closed.
If the server receives a declare, bind, delete or subscribe request from another session for a queue that has been declared as exclusive, an exception will be raised and the requesting session will be ended.
When a queue is deleted the queue and any messages it contains are destroyed, and all bindings that refer to the queue are removed. When the broker receives a queue delete command for a particular queue, the following checks are made before the deletion occurs:
If ACL is enabled, the broker will check that the user who initiated the deletion has permission to do so.
If the ifEmpty flag is passed, the broker will raise an exception if the queue is not empty.
If the ifUnused flag is passed, the broker will raise an exception if the queue has subscribers.
If the queue is exclusive, the broker will check that the user who initiated the deletion owns the queue.
Once the queue has been deleted, the management object associated with the queue will remain. This makes it possible to see the deleted queue using qpid-tool or the MRG Management Console. These tools will show the queue with a timestamp of when it was deleted.
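The checks made before a queue is deleted can be sketched as a single function. This is a simplified model under stated assumptions: the struct and function names are hypothetical, the ACL decision is passed in as a plain boolean, and a C++ exception stands in for the exception the broker raises.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical model types; the names are illustrative, not broker internals.
struct QueueState {
    std::string owner;            // user that owns the queue (relevant if exclusive)
    bool exclusive;
    std::size_t messageCount;
    std::size_t subscriberCount;
};

struct DeleteRequest {
    std::string user;
    bool ifEmpty;
    bool ifUnused;
};

// Throws std::runtime_error if a check fails; returns normally if deletion may proceed.
// aclEnabled and aclAllows stand in for whatever the ACL configuration decides.
void checkQueueDelete(const QueueState& q, const DeleteRequest& req,
                      bool aclEnabled, bool aclAllows) {
    if (aclEnabled && !aclAllows)
        throw std::runtime_error("ACL denies queue delete");
    if (req.ifEmpty && q.messageCount > 0)
        throw std::runtime_error("queue is not empty");
    if (req.ifUnused && q.subscriberCount > 0)
        throw std::runtime_error("queue has subscribers");
    if (q.exclusive && q.owner != req.user)
        throw std::runtime_error("queue is owned by another user");
}

// Convenience wrapper that reports the outcome as a boolean.
bool deleteAllowed(const QueueState& q, const DeleteRequest& req,
                   bool aclEnabled, bool aclAllows) {
    try {
        checkQueueDelete(q, req, aclEnabled, aclAllows);
        return true;
    } catch (const std::runtime_error&) {
        return false;
    }
}
```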
When a queue is created, its lifecycle can be limited by marking it to be automatically deleted. This can be achieved by setting the auto-delete field. Auto-delete is handled differently for exclusive and non-exclusive queues:
An exclusive, auto-deleted queue is deleted when the session that declared it ends.
A non-exclusive, auto-deleted queue is deleted once the last subscriber is cancelled. It will not be deleted until at least one session has subscribed and then cancelled that subscription.
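The two auto-delete rules above can be modelled as follows. This is a sketch of the stated behavior only; the class AutoDeleteQueue and its methods are hypothetical and do not correspond to broker code.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of the auto-delete rules; not broker code.
class AutoDeleteQueue {
public:
    explicit AutoDeleteQueue(bool exclusive)
        : exclusive_(exclusive), subscribers_(0), deleted_(false) {}

    void subscribe() {
        assert(!deleted_);
        ++subscribers_;
    }

    // A non-exclusive queue is deleted when its last subscription is cancelled.
    // A queue that never had a subscriber never reaches this point, so it stays.
    void cancel() {
        assert(subscribers_ > 0);
        --subscribers_;
        if (!exclusive_ && subscribers_ == 0) deleted_ = true;
    }

    // An exclusive queue is deleted when the session that declared it ends.
    void declaringSessionEnded() {
        if (exclusive_) deleted_ = true;
    }

    bool deleted() const { return deleted_; }

private:
    bool exclusive_;
    std::size_t subscribers_;
    bool deleted_;
};
```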
Messages can be rejected by a subscribed client. Once a message has been rejected by a client, it will not be re-delivered. Messages can also be orphaned if they are left on a queue when that queue is deleted.
For both rejected and orphaned messages, the system can be configured to handle them using an alternate-exchange. An alternate exchange is specified when the queue is declared. Any rejected or orphaned messages will automatically be routed to the alternate exchange, to be re-routed to other bound queues or deleted if necessary. If no alternate exchange is specified, all rejected and orphaned messages will be automatically deleted.
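The handling of rejected and orphaned messages can be illustrated with a small model. The types ModelExchange and AltExchangeQueue are hypothetical stand-ins, and the exchange here only records what was routed to it rather than re-routing to bound queues.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for an exchange: it only records what was routed to it.
struct ModelExchange {
    std::vector<std::string> routed;
    void route(const std::string& body) { routed.push_back(body); }
};

class AltExchangeQueue {
public:
    // `alternate` may be null, meaning no alternate exchange was specified.
    explicit AltExchangeQueue(ModelExchange* alternate) : alternate_(alternate) {}

    void publish(const std::string& body) { messages_.push_back(body); }

    // A subscriber rejects the oldest message; it is never re-delivered.
    void rejectFront() {
        assert(!messages_.empty());
        std::string body = messages_.front();
        messages_.erase(messages_.begin());
        dispose(body);
    }

    // Deleting the queue orphans every message still on it.
    void deleteQueue() {
        for (std::size_t i = 0; i < messages_.size(); ++i) dispose(messages_[i]);
        messages_.clear();
    }

    std::size_t depth() const { return messages_.size(); }

private:
    // Route to the alternate exchange if there is one; otherwise the message is dropped.
    void dispose(const std::string& body) {
        if (alternate_) alternate_->route(body);
    }

    ModelExchange* alternate_;
    std::vector<std::string> messages_;
};
```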
A size limit can be set on a queue by specifying values for qpid.max_count and qpid.max_size when declaring the queue. By default, an exception will be raised when published messages exceed this limit.
The default behavior can be controlled by changing the qpid.policy_type option. The possible values for this option are:
reject
The publisher of a message that exceeds the limit receives an exception. This is the default behavior for all non-durable queues.
flow_to_disk
The content of messages that exceed the limit is freed from memory and held on disk. This occurs for both persistent and non-persistent messages and is the default behavior for durable queues.
ring
The oldest messages are removed to make room for newer messages.
ring_strict
Similar to the ring policy, but will not remove messages that have not yet been accepted by a client. If the limit is exceeded and the oldest message has not been accepted, the publisher will receive an exception.
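The reject, ring and ring_strict policies can be compared with a small model that enforces a message-count limit. This is a simplified reading of the descriptions above: flow_to_disk is not modelled, acceptance is reduced to a flag on each entry, and the names BoundedQueue, Policy and Entry are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <string>

enum class Policy { Reject, Ring, RingStrict };

struct Entry {
    std::string body;
    bool accepted;   // true once a client has accepted the message
};

// Hypothetical model of a queue with a count limit; not broker code.
class BoundedQueue {
public:
    BoundedQueue(std::size_t maxCount, Policy policy)
        : maxCount_(maxCount), policy_(policy) {
        assert(maxCount > 0);
    }

    // Throws std::overflow_error where the publisher would receive an exception.
    void publish(const std::string& body) {
        if (entries_.size() >= maxCount_) {
            if (policy_ == Policy::Reject)
                throw std::overflow_error("queue limit exceeded");
            if (policy_ == Policy::RingStrict && !entries_.front().accepted)
                throw std::overflow_error("oldest message not yet accepted");
            entries_.pop_front();   // ring behavior: drop the oldest message
        }
        entries_.push_back(Entry{body, false});
    }

    // Same as publish, but reports the outcome as a boolean.
    bool tryPublish(const std::string& body) {
        try {
            publish(body);
            return true;
        } catch (const std::overflow_error&) {
            return false;
        }
    }

    void acceptOldest() {
        assert(!entries_.empty());
        entries_.front().accepted = true;
    }

    const std::string& oldest() const {
        assert(!entries_.empty());
        return entries_.front().body;
    }

    std::size_t depth() const { return entries_.size(); }

private:
    std::size_t maxCount_;
    Policy policy_;
    std::deque<Entry> entries_;
};
```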
Under the AMQP model, exclusive, auto-deleted queues are often bound to an exchange that enables the queue owner to subscribe to messages for consumption. In this case, a queue owner might send itself messages using that exchange, but have no need to receive those messages.
To ignore locally published messages, a no-local key can be specified in the arguments to the declare used to create the queue. The value of this key is irrelevant; its presence alone will cause the correct behavior. This key will cause the queue to discard any messages that were published by the same connection as that of the session that owns the queue.
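The effect of the no-local key can be sketched as a filter applied at publish time. In this hypothetical model, connections are identified by a plain integer, and the class NoLocalQueue is illustrative only.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model: a connection is identified by a plain integer id.
class NoLocalQueue {
public:
    NoLocalQueue(int ownerConnection, bool noLocal)
        : ownerConnection_(ownerConnection), noLocal_(noLocal) {}

    // Returns true if the message was enqueued, false if it was discarded.
    bool publish(int publisherConnection, const std::string& body) {
        if (noLocal_ && publisherConnection == ownerConnection_) return false;
        messages_.push_back(body);
        return true;
    }

    std::size_t depth() const { return messages_.size(); }

private:
    int ownerConnection_;
    bool noLocal_;
    std::vector<std::string> messages_;
};
```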
The last value queue type causes logically updated versions of previous messages to appear to overwrite the older messages.
The last value queue uses the value of the qpid.LVQ_key to determine whether a newly published message is an update to an existing message on the queue. If this is the case, the new message will appear to overwrite the older message. A subscriber that requests messages after this has occurred will see only the newer message.
There are two types of Last Value Queue:
LVQ uses a header as a key. If the key matches, it replaces the message in the queue, unless the message with the matching key has already been acquired or browsed.
In these cases the new message is placed into the queue in FIFO order. If another message with the same key is received, the message that has not been accessed will be replaced. These two exceptions protect the consumer from missing the last update if a consumer or browser has accessed a message.
LVQ NO BROWSE also uses a header for a key. If the key matches, it replaces the message in the queue unless the message with the matching key has been acquired. In this case browsed messages are not invalidated, so updates to messages already browsed on a key will be missed. If a new subscription is created, the latest values will be seen.
To use this feature, add a qpid.last_value_queue or qpid.last_value_queue_no_browse key to the arguments of queue declare. The value of the key is user-selected and used only for key matching. Messages published to the queue then need to specify a value for the qpid.LVQ_key in the headers of messages they publish.
This example demonstrates the use of LVQ.
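#include <string>
#include "qpid/client/QueueOptions.h"
// Declare the queue with last value queue ordering.
QueueOptions qo;
qo.setOrdering(LVQ);
session.queueDeclare(arg::queue=queue, arg::arguments=qo);
// ...
// Retrieve the name of the header that is used as the LVQ key.
std::string key;
qo.getLVQKey(key);
// ...
// For each message, set the key in the application headers before transfer.
message.getHeaders().setString(key, "RHT");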
Example 3.1. Setting the LVQ
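The replacement rules for the two last value queue types can also be sketched as a standalone model. This sketch assumes that, for plain LVQ, a message is protected from replacement once it has been acquired or browsed, and that for LVQ NO BROWSE only acquisition protects it. The names LastValueQueue and LvqMessage are hypothetical and are not part of the qpid client API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct LvqMessage {
    std::string key;    // value carried in the LVQ key header
    std::string body;
    bool acquired;
    bool browsed;
};

// Hypothetical model of last value queue behavior; not the qpid client API.
class LastValueQueue {
public:
    // noBrowse selects the LVQ NO BROWSE variant.
    explicit LastValueQueue(bool noBrowse) : noBrowse_(noBrowse) {}

    void publish(const std::string& key, const std::string& body) {
        for (std::size_t i = 0; i < messages_.size(); ++i) {
            LvqMessage& m = messages_[i];
            bool isProtected = m.acquired || (!noBrowse_ && m.browsed);
            if (m.key == key && !isProtected) {
                m.body = body;   // the update overwrites the older value
                return;
            }
        }
        // No replaceable message with this key: enqueue in FIFO order.
        messages_.push_back(LvqMessage{key, body, false, false});
    }

    const std::string& browse(std::size_t i) {
        assert(i < messages_.size());
        messages_[i].browsed = true;
        return messages_[i].body;
    }

    void acquire(std::size_t i) {
        assert(i < messages_.size());
        messages_[i].acquired = true;
    }

    const std::string& bodyAt(std::size_t i) const {
        assert(i < messages_.size());
        return messages_[i].body;
    }

    std::size_t depth() const { return messages_.size(); }

private:
    bool noBrowse_;
    std::vector<LvqMessage> messages_;
};
```

With the plain variant, an update that arrives after a message has been browsed is queued behind it, so the browser can still pick up the newer value. With the no-browse variant the browsed message is overwritten in place.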
Queues can be defined as durable. A durable queue is stored on disk, and can survive a restart of the broker. However, messages on the queue must also be declared as persistent for them to be recovered.
There are other options that can be used with durable queues to control the sizing and tuning of the journal used to record queue state on disk. For more information, see Chapter 6, Persistence.
PersistLastNode is used if a cluster fails down to a single node. In this situation, a queue would treat all transient messages as persistent until additional nodes in the cluster are restored.
This mode will not be triggered if a cluster is started with only one node. It will only be triggered if active nodes fail until there is only one node remaining.
If this mode is used, queues must be configured to be durable; otherwise, their messages will not be persisted.
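This example demonstrates the use of Persist Last Node.
#include "qpid/client/QueueOptions.h"
QueueOptions qo;
// Enable Persist Last Node for this queue.
qo.setPersistLastNode();
// The queue must also be declared durable for this mode to take effect.
session.queueDeclare(arg::queue=queue, arg::durable=true,
arg::arguments=qo);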
Example 3.2. Using Persist Last Node
Configuring queue options
This section explains how to set queue options from the shell prompt using the qpid-config tool.
List information on all existing queues by using the queues command:
$ qpid-config queues
                                       Store Size
Durable  AutoDel  Excl  Bindings  (files x file pages)  Queue Name
=======================================================================================
N        N        N     1                               pub_start
N        N        N     1                               pub_done
N        N        N     1                               sub_ready
N        N        N     1                               sub_done
N        N        N     1                               perftest0
N        Y        N     2                               mgmt-3206ff16-fb29-4a30-82ea
N        Y        N     2                               repl-3206ff16-fb29-4a30-82ea
N        Y        N     2                               mgmt-df06c7a6-4ce7-426a-9f66
N        Y        N     2                               repl-df06c7a6-4ce7-426a-9f66
Queues are created using a command with this syntax:
$ qpid-config [options] add queue queue_name [add queue options]
The possible options are:
--durable
Makes the queue durable
--cluster-durable
Makes the queue durable if there is only one functioning cluster node
--file-count NUMBER
Set the number of files in the persistence journal for the queue. Defaults to 8
--file-size NUMBER
Set the number of pages in the file (each page is 64KB). Defaults to 24
--max-queue-size NUMBER
Maximum queue size in bytes.
--max-queue-count NUMBER
Maximum queue size in number of messages
--policy-type TYPE
Action to take when queue limit is reached.
TYPE can be:
reject
flow_to_disk
ring
ring_strict
--last-value-queue
Enable last value queue behavior on the queue
To delete a queue, use the del queue command with the name of the queue to remove:
$ qpid-config del queue queue_name
Applications create queues using AMQP's queue declare command. This command allows the durable, exclusive, auto-delete and alternate-exchange properties to be specified. Any qpidd-specific options can be passed in the arguments field. See the MRG Messaging API documentation for the client language you wish to use for more details.