Service Orchestration with Message Queues
Overview
This example demonstrates how to use our RabbitMQ Helper SDK (prt.mq) to:
- Declare and configure RabbitMQ exchanges and queues (if you have permissions)
- Publish messages (asynchronously or directly)
- Consume messages using background tasks (consumers)
- Register consumers automatically (production approach)
- Export topology for environments where you do not have MQ configuration privileges
- Leverage advanced features such as prefetch concurrency, dead-letter exchanges, headers-based routing, and more.
In addition, this notebook shows how to deploy a Practicus AI app that exposes APIs for publishing messages and runs background consumers. Use this guide as both an educational reference and a hands-on example of deploying a complete MQ-based application.
1. Introduction to RabbitMQ Basics
In a GenAI or multi-agent environment, components often communicate asynchronously:
- Multiple models or agents sharing inference results
- Coordinating tasks across microservices
- Aggregating data streams in near real-time
A robust message queue architecture ensures messages do not get lost and that producers/consumers can operate at different speeds without bottlenecking the entire system.
Key Terms
- Exchanges: Routing agents; messages from producers go here. The exchange then routes to one or more queues based on type/routing key/binding.
- Queues: Store messages until consumed. Durable queues (the default in our SDK) survive RabbitMQ restarts.
- Bindings: Define how an exchange routes messages to a queue (e.g., matching a routing key or header properties).
Our SDK wraps these concepts in a simpler interface, so you can focus on your application rather than boilerplate code.
For more details on RabbitMQ concepts, visit RabbitMQ AMQP Concepts.
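To make the exchange/queue/binding relationship concrete, here is a minimal pure-Python model of a direct exchange. This is a conceptual sketch only; none of these classes exist in the SDK or in RabbitMQ itself:

```python
from collections import defaultdict

class DirectExchange:
    """Toy model of a RabbitMQ direct exchange: exact routing-key matching."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound queue names
        self.queues = defaultdict(list)    # queue name -> stored messages

    def bind(self, queue: str, routing_key: str):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key: str, message: str):
        # Route to every queue bound with an exactly matching key; messages
        # published with an unbound key are silently dropped (unroutable).
        for queue in self.bindings.get(routing_key, []):
            self.queues[queue].append(message)

ex = DirectExchange()
ex.bind("my-first-queue", "my-routing-key")
ex.publish("my-routing-key", "hello")
ex.publish("unknown-key", "this message is unroutable")
print(ex.queues["my-first-queue"])  # ['hello']
```

The silent-drop behavior modeled in publish() is exactly why the verify_delivery / mandatory flag discussed later exists.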
2. Basic Configuration & Parameters
The MQConfig object holds all the connection and topology parameters:
- conn_str: AMQP connection string (e.g., amqp://user:pass@host/vhost).
- exchange: Name of the exchange (optional).
- queue: Name of the queue (required for consumers, optional for publishers if you have an exchange).
- exchange_type: "direct", "topic", "fanout", or "headers". Defaults to "direct".
- routing_key: Routing key to use when publishing.
- verify_delivery: If True, the SDK sets the mandatory flag so unroutable messages are returned by the broker (note that the current code does not attach a return callback).
- max_retries: Number of times a message is retried. Defaults to 5.
- dead_letter_exchange & dead_letter_routing_key: If configured, messages that exceed max_retries are sent to a dead-letter exchange.
- prefetch_count: Limits the number of messages delivered to a consumer before acknowledgment.
- headers: For headers-type exchanges (used both in publishing and binding for consuming).
- queue_ttl: Time-to-live for messages in a queue, in milliseconds.
# Parameters - Adjust As Needed
mq_user = "guest"
mq_pwd = "guest"
mq_host = "prt-mq-default"
mq_vhost = ""  # Recommended for fine-grained security ("/" is default if empty)
mq_conn_str = f"amqp://{mq_user}:{mq_pwd}@{mq_host}/{mq_vhost}"
# If True, run consumer (background task) code in this notebook
test_consumer = True
# Deployment parameters for our Practicus AI app (for publishing APIs and background consumers)
app_deployment_key = None
app_prefix = "apps"
test_api = True
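The parameters above are combined into an AMQP connection string. If your credentials ever contain characters such as @ or /, percent-encode them first; this is a property of AMQP URIs in general, not an SDK requirement (the password below is illustrative):

```python
from urllib.parse import quote

mq_user = "guest"
mq_pwd = "p@ss/word"  # example credential with characters that need escaping
mq_host = "prt-mq-default"
mq_vhost = ""  # empty string means the default "/" vhost

# Percent-encode user, password, and vhost so the URI parses unambiguously
mq_conn_str = (
    f"amqp://{quote(mq_user, safe='')}:{quote(mq_pwd, safe='')}"
    f"@{mq_host}/{quote(mq_vhost, safe='')}"
)
print(mq_conn_str)  # amqp://guest:p%40ss%2Fword@prt-mq-default/
```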
3. Applying or Exporting Your Topology
If you have administrator privileges on your RabbitMQ cluster, you can explicitly create (declare) exchanges, queues, and bindings via prt.mq.apply_topology(...). Otherwise, you can generate YAML manifests with prt.mq.export_topology(...) and supply them to an admin.
Example: Declaring Our First Exchange & Queue
import practicuscore as prt
# Our first MQConfig. We'll define an exchange and a queue.
mq_config_basic = prt.MQConfig(
    conn_str=mq_conn_str,
    exchange="my-first-exchange",
    routing_key="my-routing-key",  # Recommended
    queue="my-first-queue",
    exchange_type="direct",
)
await prt.mq.apply_topology(mq_config_basic)
print("Topology applied successfully.")
# If you don't have privileges, you could instead export the topology:
# prt.mq.export_topology(mq_config_basic)
4. Publishing Messages
Basic Asynchronous Publishing via an Exchange
When an exchange is provided in MQConfig, your message is routed by that exchange (and the routing key) to the bound queue(s).
The following example demonstrates sending a Pydantic model (MyMsg), which the SDK automatically serializes to JSON.
from pydantic import BaseModel
class MyMsg(BaseModel):
    text: str
async def async_publish_example():
    # Connect asynchronously
    conn = await prt.mq.connect(mq_config_basic)
    async with conn:
        msg = MyMsg(text="Hello from async publisher using exchange!")
        await prt.mq.publish(conn, msg)
        print("Message published asynchronously using exchange.")
# Run the async publisher example
await async_publish_example()
Publishing Directly to a Queue
If you omit exchange in MQConfig, the SDK publishes to RabbitMQ's default exchange, using the queue as the routing key (or routing_key if provided).
This approach is simpler but less flexible, as it does not allow for custom routing. Practicus AI recommends using exchanges for all message publishing.
# Direct-to-Queue Publishing Example
mq_config_direct = prt.MQConfig(
    conn_str=mq_conn_str,
    queue="my-direct-queue",
)
# Declare the queue (if privileges allow)
await prt.mq.apply_topology(mq_config_direct)
async def async_publish_direct_queue():
    conn = await prt.mq.connect(mq_config_direct)
    async with conn:
        msg = MyMsg(text="Hello directly to queue asynchronously!")
        await prt.mq.publish(conn, msg)
        print("Message published directly to queue asynchronously.")
await async_publish_direct_queue()
5. Consuming Messages
Our SDK uses the @prt.mq.consumer decorator to mark a function as a consumer. The consumer function can have one of two signatures:
- Single-parameter function: The SDK automatically decodes the message (or deserializes it into a Pydantic model if annotated).
- Two-parameter function (optional): The second parameter receives the raw aio_pika.IncomingMessage, which gives access to headers and properties.
Important:
- Consumers run in an infinite loop. To test them, run the consumer cell in a separate notebook, process, or interrupt the kernel when done.
- The SDK automatically acknowledges messages on success and rejects them (with requeue) on error until max_retries is reached.
# Example of a Basic Consumer (Single Parameter)
subscriber_config = prt.MQConfig(
    conn_str=mq_conn_str,
    exchange="my-first-exchange",
    routing_key="my-routing-key",
    queue="my-first-queue",
    exchange_type="direct",
    prefetch_count=10,
    max_retries=5,
)
@prt.mq.consumer(subscriber_config)
async def process_message(msg: MyMsg):
    """
    A consumer that expects a Pydantic model.
    """
    print("Received message:", msg.text)
Testing a Consumer in Jupyter
You can test a single consumer using prt.mq.test_consumer(...). Note: this will block further cell execution until you interrupt or restart the kernel. In production, consumers run in Practicus AI apps as background tasks.
if test_consumer:
    print("Starting consumer. This cell will block until the notebook kernel is interrupted or restarted.")
    await prt.mq.test_consumer(process_message)
import practicuscore as prt
# Example of a Consumer with Two Parameters
subscriber_config_two_params = prt.MQConfig(conn_str=mq_conn_str, queue="my-direct-queue")
@prt.mq.consumer(subscriber_config_two_params)
async def process_message_twoparams(body, incoming_msg):
    """
    A consumer that receives the decoded message and the raw IncomingMessage
    with properties such as the message id.
    """
    print("Received body:", body)
    print("IncomingMessage properties:")
    # aio_pika's Message.info() returns the message properties as a dict
    for key, value in incoming_msg.info().items():
        print(f"- {key}: {value}")
if test_consumer:
    print("Starting consumer. This cell will block until the notebook kernel is interrupted or restarted.")
    await prt.mq.test_consumer(process_message_twoparams)
6. Production-Ready Consumer (Subscriber) Registration
In a production scenario, you can create one or more Python files that define consumer functions (using the @prt.mq.consumer decorator). When the Practicus AI app starts, all consumers will be automatically loaded and run as background asynchronous jobs.
Example Consumer File:
# mqs/consume.py
import practicuscore as prt
from pydantic import BaseModel
class MyMsg(BaseModel):
    text: str

mq_conn_str = "..."

# Consumer configuration
mq_config = prt.MQConfig(
    conn_str=mq_conn_str,
    queue="my-first-queue",
)

@prt.mq.consumer(mq_config)
async def consume_message(message: MyMsg):
    print(f"Received message: {message.text}")

@prt.mq.consumer(mq_config)
async def consume_message_raw(message):
    # For raw message processing
    print(f"Received raw message: {message}")
Note: Since two consumers connect to the same queue, incoming messages are distributed round-robin: the first message is handled by consume_message(), the second by consume_message_raw(), the third by consume_message() again, and so on.
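The distribution pattern can be illustrated without a broker; the snippet below simulates how deliveries alternate between two competing consumers on one queue (a conceptual sketch, not SDK behavior):

```python
from itertools import cycle

# Two consumers competing on the same queue: the broker alternates deliveries
consumers = cycle(["consume_message", "consume_message_raw"])
deliveries = [(f"msg-{i}", next(consumers)) for i in range(4)]
for message, consumer in deliveries:
    print(f"{message} -> {consumer}()")
```

msg-0 and msg-2 land on consume_message(), msg-1 and msg-3 on consume_message_raw().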
import practicuscore as prt
# View application deployment settings and application prefixes
region = prt.get_default_region()
my_app_settings = prt.apps.get_deployment_setting_list()
print("Application deployment settings available:")
display(my_app_settings.to_pandas())
my_app_prefixes = prt.apps.get_prefix_list()
print("Application prefixes (groups) available:")
display(my_app_prefixes.to_pandas())
assert app_deployment_key, "Please select an app deployment setting."
assert app_prefix, "Please select an app prefix."
async def test_publisher():
    from apis.publish import MyMsg

    payload = MyMsg(text="Testing API")
    # Test the publishing API asynchronously with prt.apps.test_api()
    response = prt.apps.test_api("/publish", payload)
    print("Response:", response)

if test_api:
    await test_publisher()
7. Deploying the Practicus AI App
After testing your publishers and consumers locally, you can deploy the Practicus AI app. This deployment creates:
- An API endpoint to accept publish requests
- Background tasks that run your consumer functions
The following code deploys the app and returns both the UI and API URLs.
app_name = "my-mq-app"
visible_name = "My RabbitMQ App"
description = "Application that hosts APIs to publish to RabbitMQ and background consumers to process messages."
icon = "fa-stream"
app_url, api_url = prt.apps.deploy(
    deployment_setting_key=app_deployment_key,
    prefix=app_prefix,
    app_name=app_name,
    app_dir=None,
    visible_name=visible_name,
    description=description,
    icon=icon,
)
print("API Endpoint URL:", api_url)
def send_request():
    from apis.publish import MyMsg
    import requests

    token = prt.apps.get_session_token(api_url=api_url)
    publish_api_url = f"{api_url}publish/"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = MyMsg(text="Testing API")
    json_data = payload.model_dump_json(indent=2)
    print(f"Sending the following JSON to: {publish_api_url}")
    print(json_data)
    # json_data is already a serialized JSON string, so send it as the raw body
    resp = requests.post(publish_api_url, data=json_data, headers=headers)
    if resp.ok:
        print("Response:")
        print(resp.text)
    else:
        print("Error:", resp.status_code, resp.text)

send_request()
async def send_request_async():
    from apis.publish import MyMsg
    import httpx

    token = prt.apps.get_session_token(api_url=api_url)
    publish_api_url = f"{api_url}publish/"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = MyMsg(text="Testing API with an async call")
    json_data = payload.model_dump_json(indent=2)
    print(f"Sending (async) the following JSON to: {publish_api_url}")
    print(json_data)
    async with httpx.AsyncClient() as client:
        # Pass the serialized JSON string as the raw request body
        response = await client.post(publish_api_url, content=json_data, headers=headers)
        if response.status_code < 300:
            print("Response:")
            print(response.text)
        else:
            print("Error:", response.status_code, response.text)

await send_request_async()
Verifying Consumption
- After an API call, the message payload is forwarded to the designated RabbitMQ queue via the exchange.
- Consumers receive the messages in real time, automatically decoding and processing the content while logging the data and associated metadata.
- You can verify successful message consumption by monitoring your application's logs in Grafana or by checking logs directly from your Kubernetes pods.
- If multiple consumers are registered for the same queue (as in the consume.py example), messages are distributed round-robin: the first message is processed by consume_message(), the second by consume_message_raw(), the third by consume_message(), and so on. If you need both consumer functions to receive the same messages, configure two separate queues.
- This end-to-end flow confirms that your API is effectively integrated with the background consumer processes.
Cleaning up
Delete the deployed app
8. Exporting RabbitMQ Topology for Production
In production environments where you lack permissions to create RabbitMQ resources, use prt.mq.export_topology(config) to generate Kubernetes YAML files. You can then apply these via kubectl apply -f or use the provided shell scripts.
Naming Tip: Ensure that RabbitMQ resource names are RFC 1123 compliant (e.g., my-exchange or my.exchange instead of my_exchange) to avoid issues with Kubernetes resource naming.
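A quick pre-flight check before exporting can save a failed kubectl apply. The helper below validates dot-separated RFC 1123 labels; it is a hypothetical convenience function, not part of the SDK:

```python
import re

# One RFC 1123 DNS label: lowercase alphanumerics and '-', alphanumeric at both ends
RFC1123_LABEL = re.compile(r"^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$")

def is_rfc1123_name(name: str) -> bool:
    # Dot-separated names (e.g., my.exchange) are fine if every label is valid
    return bool(name) and all(RFC1123_LABEL.match(part) for part in name.split("."))

print(is_rfc1123_name("my-exchange"))  # True
print(is_rfc1123_name("my.exchange"))  # True
print(is_rfc1123_name("my_exchange"))  # False (underscores are not allowed)
```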
# Generating new RabbitMQ topology files for production
mq_user = "new-user"
mq_pwd = "new-user-password"
mq_host = "prt-mq-default"
mq_vhost = "some-vhost"
mq_conn_str = f"amqp://{mq_user}:{mq_pwd}@{mq_host}/{mq_vhost}"
new_config = prt.MQConfig(
    conn_str=mq_conn_str,
    exchange="new-exchange",
    routing_key="new-routing-key",
    queue="new-queue",
    exchange_type="direct",
)
prt.mq.export_topology(new_config)
# For more export options (e.g., hiding passwords), see the help for export_topology().
9. Advanced Features
Headers Exchange
If you set exchange_type="headers" in MQConfig, you can route messages based on custom headers:
- For publishing, set config.headers to attach custom headers with your message.
- For consuming, the same config.headers is used as binding arguments (with an optional x-match property).
Dead-Letter Exchanges (DLX)
Set dead_letter_exchange and optionally dead_letter_routing_key in MQConfig to route messages that exceed max_retries to a dead-letter exchange.
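The retry-then-dead-letter lifecycle can be sketched as a loop. The simulation below is illustrative only; in reality the broker performs the re-queuing and dead-lettering:

```python
max_retries = 5
dead_letter_queue = []  # stands in for the queue bound to the dead-letter exchange

def process(message: str):
    raise RuntimeError("handler failed")  # this handler always fails

message = "poison message"
attempts = 0
while True:
    try:
        process(message)
        break  # success would acknowledge the message
    except RuntimeError:
        attempts += 1
        if attempts >= max_retries:
            dead_letter_queue.append(message)  # exceeded max_retries -> DLX
            break
        # otherwise the broker re-queues the message for redelivery

print(attempts, dead_letter_queue)  # 5 ['poison message']
```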
Prefetch Concurrency
prefetch_count limits the number of messages delivered to a consumer before waiting for acknowledgments, helping you control load.
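The effect is similar to bounding concurrency with a semaphore, as this standalone asyncio sketch shows (run it as a script rather than in a cell with a live event loop; it is an analogy, not the SDK's mechanism):

```python
import asyncio

async def main() -> int:
    prefetch = asyncio.Semaphore(2)  # at most 2 unacknowledged messages in flight
    in_flight = 0
    peak = 0

    async def handle(i: int):
        nonlocal in_flight, peak
        async with prefetch:        # broker withholds deliveries beyond the limit
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0)  # simulate message processing
            in_flight -= 1          # leaving the block ~= acknowledging

    await asyncio.gather(*(handle(i) for i in range(6)))
    return peak

peak = asyncio.run(main())
print("peak in-flight:", peak)  # never exceeds the prefetch limit of 2
```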
Verify Delivery
With verify_delivery=True, the SDK sets the mandatory flag so that if a message is unroutable, the broker returns it. Note that our example implementation does not attach a return callback, so only general connection failures trigger retries.
Customizing MQ topology templates
If you would like to customize the way Practicus AI auto-generates MQ topology files, please update the corresponding ~/practicus/templates/mq_*.yaml Jinja template file.
10. Summary & Next Steps
In this example we have:
- Introduced RabbitMQ fundamentals (exchanges, queues, bindings).
- Demonstrated how to create or export topology using apply_topology and export_topology.
- Provided examples for asynchronous publishing (both via an exchange and direct to a queue).
- Shown how to define and test consumers using the SDK (including single and dual-parameter approaches).
- Explained advanced features such as headers exchanges, dead-lettering, prefetch concurrency, and verify delivery.
- Illustrated a production-ready pattern for registering consumers from a folder.
- Demonstrated how to deploy a Practicus AI app that exposes publishing APIs and runs background consumers.
Next Steps:
- Integrate these examples into your GenAI or microservices pipelines.
- For advanced tracing or logging, extend the SDK or integrate with external systems.
- Customize deployment parameters as needed and consider using secure credential management (e.g., prt.vault).
Supplementary Files
apis/publish.py
import practicuscore as prt
from pydantic import BaseModel
mq_conn = None
mq_user = "guest"
mq_pwd = "guest"
mq_host = "prt-mq-default"
mq_vhost = "" # Recommended for fine-grained security
mq_conn_str = f"amqp://{mq_user}:{mq_pwd}@{mq_host}/{mq_vhost}"
# Assuming queue(s) and binding(s) are already configured.
mq_config = prt.MQConfig(
    conn_str=mq_conn_str,
    exchange="my-first-exchange",
    routing_key="my-routing-key",  # Matches the binding declared in the notebook
)

class MyMsg(BaseModel):
    text: str

async def connect_mq():
    global mq_conn
    mq_conn = await prt.mq.connect(mq_config)

@prt.apps.api("/publish")
async def publish(payload: MyMsg, **kwargs):
    if mq_conn is None:
        await connect_mq()
    await prt.mq.publish(conn=mq_conn, msg=payload)
    return {"ok": True}
mqs/consume.py
import practicuscore as prt
from pydantic import BaseModel
class MyMsg(BaseModel):
    text: str

# In production, please use prt.vault to avoid hard-coding the secret in your code
mq_user = "guest"
mq_pwd = "guest"
mq_host = "prt-mq-default"
mq_vhost = ""  # Recommended for fine-grained security
mq_conn_str = f"amqp://{mq_user}:{mq_pwd}@{mq_host}/{mq_vhost}"

# Subscriber configuration
mq_config = prt.MQConfig(
    conn_str=mq_conn_str,
    queue="my-first-queue",
)

# If you haven't configured the MQ topology in advance:
# await prt.mq.apply_topology(mq_config)

@prt.mq.consumer(mq_config)
async def consume_message(message: MyMsg):
    """
    Consumes an incoming RabbitMQ message, deserializing it into a Pydantic model first.

    **Parameters:**
    - message (MyMsg): A Pydantic model instance representing the incoming message.

    **Returns:**
    - None
    """
    print(f"Received message: {message.text}")

@prt.mq.consumer(mq_config)
async def consume_message_raw(message):
    """
    Processes an incoming RabbitMQ message *without* deserializing it into a Pydantic model first.

    **Parameters:**
    - message: Binary data representing the incoming message (JSON-encoded in our test).

    **Returns:**
    - None
    """
    print(f"Received raw message: {message}")