AMQP Adapter
While the core rabbitpy API strives to provide an easy-to-use, Pythonic interface
for RabbitMQ, some developers may prefer a less opinionated AMQP interface. The
rabbitpy.AMQP adapter provides a more traditional AMQP client library API,
similar to that of libraries like pika.
New in version 0.26.
Example
The following example will connect to RabbitMQ and use the rabbitpy.AMQP
adapter to consume and acknowledge messages.
import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        amqp = rabbitpy.AMQP(channel)
        for message in amqp.basic_consume('queue-name'):
            print(message)
            # Acknowledge the message once it has been handled
            message.ack()
API Documentation
- class rabbitpy.AMQP(channel)
The AMQP Adapter provides a more generic, non-opinionated interface to RabbitMQ by providing methods that map to the AMQP API.
- Parameters
channel (rabbitpy.channel.Channel) – The channel to use
- basic_ack(delivery_tag=0, multiple=False)
Acknowledge one or more messages
This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
- Parameters
delivery_tag (int|long) – Server-assigned delivery tag
multiple (bool) – Acknowledge multiple messages
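As a rough illustration of the multiple flag, the sketch below handles messages one by one but sends only one basic_ack per batch, relying on multiple=True to acknowledge everything up to and including the given delivery tag. The helper name ack_in_batches, the handle callback, and the batch_size parameter are hypothetical; the code also assumes that amqp is a rabbitpy.AMQP instance and that each message exposes a delivery_tag attribute.

```python
def ack_in_batches(amqp, messages, handle, batch_size=10):
    """Handle each message, acknowledging in groups (hypothetical helper).

    Assumes `amqp` offers basic_ack(delivery_tag, multiple) as documented
    above and that every message has a `delivery_tag` attribute.
    Returns the number of basic_ack calls that were sent.
    """
    acks_sent = 0
    pending = 0
    last_tag = None
    for message in messages:
        handle(message)
        last_tag = message.delivery_tag
        pending += 1
        if pending >= batch_size:
            # multiple=True covers every unacknowledged message up to last_tag
            amqp.basic_ack(delivery_tag=last_tag, multiple=True)
            acks_sent += 1
            pending = 0
    if pending:
        # Flush the final, partially filled batch
        amqp.basic_ack(delivery_tag=last_tag, multiple=True)
        acks_sent += 1
    return acks_sent
```

Batching reduces the number of frames sent to the server, at the cost of redelivering a whole batch if the channel closes before the acknowledgement goes out.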
- basic_cancel(consumer_tag='', nowait=False)
End a queue consumer
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
- Parameters
consumer_tag (str) – Consumer tag
nowait (bool) – Do not send a reply method
- basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, arguments=None)
Start a queue consumer
This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.
This method acts as a generator, yielding messages as they are delivered from the server.
Example use:
    for message in amqp.basic_consume(queue_name):
        print(message.body)
        message.ack()
- Parameters
queue (str) – The queue name to consume from
consumer_tag (str) – The consumer tag
no_local (bool) – Do not deliver own messages
no_ack (bool) – No acknowledgement needed
exclusive (bool) – Request exclusive access
nowait (bool) – Do not send a reply method
arguments (dict) – Arguments for declaration
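Because basic_consume behaves as a generator, a caller can stop consuming by leaving the loop. The sketch below collects a fixed number of message bodies and then breaks out. The helper name consume_some and its limit parameter are hypothetical, and the code assumes that amqp is a rabbitpy.AMQP instance whose messages expose body and ack(), as in the example above. Whether leaving the loop also cancels the consumer on the server is not stated in this reference, so cancellation is left to the caller.

```python
def consume_some(amqp, queue, limit):
    """Collect up to `limit` message bodies from `queue` (hypothetical helper)."""
    bodies = []
    if limit <= 0:
        # Nothing requested, so do not start a consumer at all
        return bodies
    for message in amqp.basic_consume(queue):
        bodies.append(message.body)
        message.ack()
        if len(bodies) >= limit:
            # Stop iterating once enough messages have been collected
            break
    return bodies
```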
- basic_get(queue='', no_ack=False)
Direct access to a queue
This method provides direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.
- Parameters
queue (str) – The queue name
no_ack (bool) – No acknowledgement needed
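A polling loop built on basic_get might look like the following sketch, which drains a queue until it appears empty or an upper bound is reached. The helper name drain_queue and the max_messages parameter are hypothetical. The code assumes that basic_get returns None when the queue has no messages and that returned messages expose body and delivery_tag attributes; neither is confirmed by this reference.

```python
def drain_queue(amqp, queue, max_messages=100):
    """Fetch messages one at a time until the queue looks empty.

    Assumption: basic_get returns None when no message is available.
    max_messages bounds the loop in case that assumption does not hold.
    """
    bodies = []
    while len(bodies) < max_messages:
        message = amqp.basic_get(queue)
        if message is None:
            break
        bodies.append(message.body)
        # Acknowledge each message explicitly, since no_ack defaults to False
        amqp.basic_ack(delivery_tag=message.delivery_tag)
    return bodies
```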
- basic_nack(delivery_tag=0, multiple=False, requeue=True)
Reject one or more incoming messages.
This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. This method is also used by the server to inform publishers on channels in confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages.
- Parameters
delivery_tag (int|long) – Server-assigned delivery tag
multiple (bool) – Reject multiple messages
requeue (bool) – Requeue the message
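One way to combine basic_ack and basic_nack is sketched below: a message is acknowledged when a handler succeeds and negatively acknowledged when it raises. The helper name process_delivery, the handler callback, and the requeue_on_error parameter are hypothetical, and the code assumes that amqp is a rabbitpy.AMQP instance and that the message exposes a delivery_tag attribute.

```python
def process_delivery(amqp, message, handler, requeue_on_error=False):
    """Run `handler` and ack or nack the message based on the outcome.

    Returns True if the message was acknowledged, False if it was rejected.
    """
    try:
        handler(message)
    except Exception:
        # Reject the message; requeue only if the caller asked for it
        amqp.basic_nack(delivery_tag=message.delivery_tag,
                        requeue=requeue_on_error)
        return False
    amqp.basic_ack(delivery_tag=message.delivery_tag)
    return True
```

Defaulting requeue_on_error to False is a design choice in this sketch: it avoids a tight redelivery loop when a message can never be processed.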
- basic_publish(exchange='', routing_key='', body='', properties=None, mandatory=False, immediate=False)
Publish a message
This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
- Parameters
exchange (str) – The exchange name
routing_key (str) – Message routing key
body (str|bytes) – The message body
properties (dict) – AMQP message properties
mandatory (bool) – Indicate mandatory routing
immediate (bool) – Request immediate delivery
- Returns
bool or None
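The following sketch shows one way to publish a JSON document with message properties. The helper name publish_json is hypothetical, and the property keys content_type and delivery_mode are assumed to follow the standard AMQP basic property names; this reference only states that properties is a dict.

```python
import json


def publish_json(amqp, exchange, routing_key, payload):
    """Serialize `payload` to JSON and publish it (hypothetical helper).

    Returns whatever basic_publish returns (documented as bool or None).
    """
    body = json.dumps(payload, sort_keys=True)
    properties = {
        'content_type': 'application/json',  # assumed property key
        'delivery_mode': 2,                  # 2 marks the message persistent in AMQP
    }
    return amqp.basic_publish(exchange=exchange,
                              routing_key=routing_key,
                              body=body,
                              properties=properties)
```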
- basic_qos(prefetch_size=0, prefetch_count=0, global_flag=False)
Specify quality of service
This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.
- Parameters
prefetch_size (int|long) – Prefetch window in octets
prefetch_count (int) – Prefetch window in messages
global_flag (bool) – Apply to entire connection
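A typical use of basic_qos is to cap the number of unacknowledged messages before a consumer starts. A minimal sketch, assuming amqp is a rabbitpy.AMQP instance; the helper name start_consumer and its default of 10 are hypothetical.

```python
def start_consumer(amqp, queue, prefetch_count=10):
    """Set a prefetch window in messages, then start consuming.

    Returns the iterable produced by basic_consume.
    """
    # Limit how many unacknowledged messages the server sends ahead
    amqp.basic_qos(prefetch_count=prefetch_count)
    return amqp.basic_consume(queue)
```

Setting the limit before the consumer starts means it is already in place when the first messages are delivered.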
- basic_recover(requeue=False)
Redeliver unacknowledged messages
This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.
- Parameters
requeue (bool) – Requeue the message
- basic_reject(delivery_tag=0, requeue=True)
Reject an incoming message
This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
- Parameters
delivery_tag (int|long) – Server-assigned delivery tag
requeue (bool) – Requeue the message
- confirm_select()
This method sets the channel to use publisher acknowledgements. The client can only use this method on a non-transactional channel.
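The sketch below enables publisher acknowledgements once and then publishes a batch, collecting any body that was not positively confirmed. The helper name publish_with_confirms is hypothetical. The code assumes that, with confirms enabled, basic_publish returns True for a confirmed message; this reference documents the return type only as bool or None.

```python
def publish_with_confirms(amqp, exchange, routing_key, bodies):
    """Publish each body and return those that were not confirmed.

    Assumption: basic_publish returns True once the server confirms
    the message when the channel is in confirm mode.
    """
    amqp.confirm_select()
    unconfirmed = []
    for body in bodies:
        result = amqp.basic_publish(exchange=exchange,
                                    routing_key=routing_key,
                                    body=body)
        if result is not True:
            # Treat False and None alike: no positive confirmation was seen
            unconfirmed.append(body)
    return unconfirmed
```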
- exchange_bind(destination='', source='', routing_key='', nowait=False, arguments=None)
Bind exchange to an exchange.
This method binds an exchange to an exchange.
- Parameters
destination (str) – The destination exchange name
source (str) – The source exchange name
routing_key (str) – The routing key to bind with
nowait (bool) – Do not send a reply method
arguments (dict) – Optional arguments
- exchange_declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None)
Verify exchange exists, create if needed
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
- Parameters
exchange (str) – The exchange name
exchange_type (str) – Exchange type
passive (bool) – Do not create exchange
durable (bool) – Request a durable exchange
auto_delete (bool) – Automatically delete when not in use
internal (bool) – Deprecated
nowait (bool) – Do not send a reply method
arguments (dict) – Arguments for declaration
- exchange_delete(exchange='', if_unused=False, nowait=False)
Delete an exchange
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.
- Parameters
exchange (str) – The exchange name
if_unused (bool) – Delete only if unused
nowait (bool) – Do not send a reply method
- exchange_unbind(destination='', source='', routing_key='', nowait=False, arguments=None)
Unbind an exchange from an exchange.
This method unbinds an exchange from an exchange.
- Parameters
destination (str) – The destination exchange name
source (str) – The source exchange name
routing_key (str) – The routing key of the binding to remove
nowait (bool) – Do not send a reply method
arguments (dict) – Optional arguments
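exchange_bind and exchange_unbind can be paired to move a binding from one source exchange to another. In the sketch below, the helper name move_exchange_binding and its parameters are hypothetical, and amqp is assumed to be a rabbitpy.AMQP instance.

```python
def move_exchange_binding(amqp, destination, old_source, new_source,
                          routing_key):
    """Re-point `destination` from `old_source` to `new_source`."""
    # Bind to the new source first so there is no window without a binding
    amqp.exchange_bind(destination=destination,
                       source=new_source,
                       routing_key=routing_key)
    amqp.exchange_unbind(destination=destination,
                         source=old_source,
                         routing_key=routing_key)
```

Binding before unbinding trades a brief period in which both bindings are active for the guarantee that the destination is never left unbound.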
- queue_bind(queue='', exchange='', routing_key='', nowait=False, arguments=None)
Bind queue to an exchange
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.
- Parameters
queue (str) – The queue name
exchange (str) – Name of the exchange to bind to
routing_key (str) – Message routing key
nowait (bool) – Do not send a reply method
arguments (dict) – Arguments for binding
- queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, nowait=False, arguments=None)
Declare queue, create if needed
This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
- Parameters
queue (str) – The queue name
passive (bool) – Do not create queue
durable (bool) – Request a durable queue
exclusive (bool) – Request an exclusive queue
auto_delete (bool) – Auto-delete queue when unused
nowait (bool) – Do not send a reply method
arguments (dict) – Arguments for declaration
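The declaration methods are usually called together when an application starts. The sketch below declares a durable direct exchange and a durable queue, then binds them. The helper name declare_work_queue and its parameters are hypothetical, and amqp is assumed to be a rabbitpy.AMQP instance.

```python
def declare_work_queue(amqp, exchange, queue, routing_key):
    """Declare an exchange and a queue, then bind the queue to the exchange."""
    # The exchange and queue must exist before they can be bound
    amqp.exchange_declare(exchange=exchange,
                          exchange_type='direct',
                          durable=True)
    amqp.queue_declare(queue=queue, durable=True)
    amqp.queue_bind(queue=queue,
                    exchange=exchange,
                    routing_key=routing_key)
```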
- queue_delete(queue='', if_unused=False, if_empty=False, nowait=False)
Delete a queue
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
- Parameters
queue (str) – The queue name
if_unused (bool) – Delete only if unused
if_empty (bool) – Delete only if empty
nowait (bool) – Do not send a reply method
- queue_purge(queue='', nowait=False)
Purge a queue
This method removes all messages from a queue which are not awaiting acknowledgment.
- Parameters
queue (str) – The queue name
nowait (bool) – Do not send a reply method
- queue_unbind(queue='', exchange='', routing_key='', arguments=None)
Unbind a queue from an exchange
This method unbinds a queue from an exchange.
- Parameters
queue (str) – The queue name
exchange (str) – The exchange name
routing_key (str) – Routing key of binding
arguments (dict) – Arguments of binding
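When a queue is bound with several routing keys, each binding is removed with its own queue_unbind call. A minimal sketch, assuming amqp is a rabbitpy.AMQP instance; the helper name unbind_routing_keys is hypothetical.

```python
def unbind_routing_keys(amqp, queue, exchange, routing_keys):
    """Remove one binding per routing key and return the keys removed."""
    removed = []
    for key in routing_keys:
        amqp.queue_unbind(queue=queue, exchange=exchange, routing_key=key)
        removed.append(key)
    return removed
```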
- tx_commit()
Commit the current transaction
This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.
- tx_rollback()
Abandon the current transaction
This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.
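The commit and rollback calls can be combined so that a batch of publications is either committed as a whole or abandoned. In the sketch below, the helper name publish_batch_in_transaction and the encode callback are hypothetical. The code assumes the channel is already in transactional mode; how a channel enters that mode is not covered by this reference.

```python
def publish_batch_in_transaction(amqp, exchange, routing_key, items, encode):
    """Publish every item, committing only if all of them succeed.

    Assumes the channel behind `amqp` is already transactional.
    Returns the number of messages published and committed.
    """
    published = 0
    try:
        for item in items:
            amqp.basic_publish(exchange=exchange,
                               routing_key=routing_key,
                               body=encode(item))
            published += 1
    except Exception:
        # Abandon everything published so far, then re-raise for the caller
        amqp.tx_rollback()
        raise
    amqp.tx_commit()
    return published
```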