Wednesday, 10 March 2021

How to use RabbitMQ

 

Concepts

Exchanges


Publish and subscribe messages

Python publisher with pika

pip install pika

import json
from datetime import datetime

import pika


def rabbit_publish():
    print("rabbit publish")

    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)

    channel = connection.channel()  # start a channel
    channel.queue_declare(queue='queue1')  # create the queue if it does not exist yet

    data = {
        "id": int(datetime.now().timestamp()) % 100,
    }

    # send a message via the default exchange (exchange=''), routed by queue name
    channel.basic_publish(exchange='', routing_key='queue1', body=json.dumps(data))
    print("[x] Message sent")
    connection.close()

Python subscriber with pika

import json
import time

import pika
from pika.adapters.blocking_connection import BlockingChannel


def rabbit_subscribe():
    def process(msg):
        print("Processing")
        data = json.loads(msg)
        print(" [x] Received:", data["id"])

        time.sleep(3)  # simulate slow work
        print(f"Processing finished: #{data['id']}")
        return True

    def callback(ch: BlockingChannel,
                 method: pika.spec.Basic.Deliver,
                 properties: pika.spec.BasicProperties,
                 body: bytes):
        # acknowledge only after the message was processed successfully
        if process(body):
            ch.basic_ack(method.delivery_tag, multiple=False)

    print("rabbit subscribe")
    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # declare the queue here too, so the subscriber can start before the publisher
    channel.queue_declare(queue='queue1')
    channel.basic_consume('queue1', callback, auto_ack=False)

    # start consuming (blocks)
    channel.start_consuming()
    connection.close()

Types of Exchanges

Direct Exchange
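
A direct exchange delivers a message to the queues whose binding key exactly matches the message's routing key. The following is a minimal sketch of that setup with pika, not a definitive implementation. The helper names (setup_direct_exchange, publish_direct) and the exchange, queue, and key names in the usage note are assumptions made for illustration. The functions take an already opened channel so the sketch stays independent of any particular connection.

```python
import json


def setup_direct_exchange(channel, exchange, bindings):
    """Declare a direct exchange and bind one queue per routing key.

    `bindings` maps routing key -> queue name (hypothetical names).
    """
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    for routing_key, queue in bindings.items():
        channel.queue_declare(queue=queue)
        # The binding key is what the exchange compares to the routing key.
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)


def publish_direct(channel, exchange, routing_key, payload):
    """Publish a JSON payload; only queues bound with this exact key receive it."""
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(payload),
    )
```

With a channel obtained as in the publisher above, setup_direct_exchange(channel, "orders", {"created": "queue_created"}) followed by publish_direct(channel, "orders", "created", {"id": 7}) would route the message to queue_created only.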


Default exchange

Fanout Exchange
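
A fanout exchange copies every message to all queues bound to it and ignores the routing key. The sketch below shows one way this could look with pika. The helper names (setup_fanout_exchange, broadcast) and the exchange and queue names in the usage note are hypothetical, and the functions again expect an already opened channel.

```python
import json


def setup_fanout_exchange(channel, exchange, queues):
    """Declare a fanout exchange and bind every queue in `queues` to it."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    for queue in queues:
        channel.queue_declare(queue=queue)
        # Fanout ignores routing keys, so the binding does not set one.
        channel.queue_bind(queue=queue, exchange=exchange)


def broadcast(channel, exchange, payload):
    """Publish a JSON payload that every bound queue receives."""
    channel.basic_publish(
        exchange=exchange,
        routing_key="",  # ignored by fanout exchanges
        body=json.dumps(payload),
    )
```

For example, setup_fanout_exchange(channel, "events", ["queue1", "queue2"]) followed by broadcast(channel, "events", {"id": 1}) would place one copy of the message in each of the two queues.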
