A wrapper around pika's Twisted connection to make it simpler to use.
Main assumptions:
- All messages are JSON. Non-JSON messages log an error and are acked immediately. You can publish non-JSON messages, any non-strings that are published are converted to JSON strings.
- All you really want to do is consume and publish.
Use pip:
pip install tx-easy-pika
from datetime import datetime
from txeasypika import QueueConnection
from twisted.internet import reactor
def callback(channel, tag, message):
''' This function will be called whenever the server sends us a message. '''
print "Received a message:"
print message
channel.basic_ack(tag)
queues = QueueConnection()
# Create an exchange
queues.exchange_declare(exchange="test-exchange")
# Set up a listener on a queue
queues.bind("my-queue", "test-exchange", "test-routing-key", callback)
# Now publish some messages to that queue
# Note that this message will not be published until we have started the
# Twisted reactor and we have successfully connected to Rabbit.
for i in range(5):
queues.basic_publish("test-exchange", "test-routing-key", {
"value": i,
"message_data": "This is a message",
"nested_field": {
"an_array": ["this has an array"],
"a_date": datetime.now()
}
})
# Start up the reactor
reactor.run()
Class to represent a connection to the queues.
Construct a new connection to the queues. Note that this is based on Twisted, so no connection is actually made until the reactor is running.
Arguments:
host
(str) - The server that RabbitMQ is on.port
(str) - The port that RabbitMQ is listening on.username
(str) - Username for RabbitMQ.password
(str) - Password for RabbitMQ.heartbeat
(int) - Heartbeat frequency in seconds for RabbitMQ.prefetch_count
(int) - How many messages to prefetch for this connection.log_level
(logging log level) - Logging level for Pika
Publish a message to the queues. This is a deferred method, the actual call will be made when the system has successfully connected to the AMQP server.
Arguments:
exchange
(str or unicode sequence of these characters: letters, digits, hyphen, underscore, period, or colon.) - The exchange to publish to.routing_key
(str) - The routing key to bind on.message
(anything) - The message to send. If this is not a string it will be serialized as JSON, and the properties.content_type field will be forced to be application/json.properties
(dict) - Dict of AMQP message properties.mandatory
(bool) - AMQP Mandatory flag.immediate
(bool) - AMQP Immediate flag.
Declare an exchange. This is a deferred method, the actual call will be made when the system has successfully connected to the AMQP server.
If passive
set, the server will reply with Declare-Ok if the exchange already
exists with the same name, and raise an error if not and if the exchange does
not already exist, the server MUST raise a channel exception with reply code
404 (not found).
Parameters:
callback
(method) – Call this method on Exchange.DeclareOkexchange
(str or unicode sequence of these characters: letters, digits, hyphen, underscore, period, or colon.) – The exchange name consists of a non-emptyexchange_type
(str) – The exchange type to usepassive
(bool) – Perform a declare or just check to see if it existsdurable
(bool) – Survive a reboot of RabbitMQauto_delete
(bool) – Remove when no more queues are bound to itinternal
(bool) – Can only be published to by other exchangesnowait
(bool) – Do not expect an Exchange.DeclareOk responsearguments
(dict) – Custom key/value pair arguments for the exchange
Declare a queue, create a binding on it. Optionally attach a function to call.
Note that the function will be called once for each call to bind()
queue_name
(str) - The name of the queue we want to declare.exchange
(str) - The exchange of the initial binding on the new queue.routing_key
(str) - The routing key of the initial binding on the new queue.callback
(function) - A function to call when a message is received on this queue. Arguments are (channel object, delivery tag, JSON-parsed message).arguments
(dict) - AMQP Arguments for declaring the queue.no_ack
(bool) - Whether this consumer will ack or not. Ignored ifcallback
is None.
Close the connection to the AMQP server.