RabbitMQ Batch Consumer Application
A Python application that consumes messages from a RabbitMQ topic exchange with batch processing and parallel execution using threading.
Features
- ✅ Batch Processing: Collects messages into batches of 10 before processing
- ✅ Prefetch Control: QoS prefetch count of 10 for optimal throughput
- ✅ Parallel Processing: Uses ThreadPoolExecutor for concurrent batch processing
- ✅ Async I/O: Built with aio_pika for efficient async RabbitMQ operations
- ✅ Error Handling: Individual message failures don’t block batch acknowledgment
- ✅ Configuration Management: Pydantic-settings for validated environment variables
- ✅ Dockerized: Complete Docker and Docker Compose setup
Installation
-
make init -
docker compose up -d -
make publish
Access RabbitMQ Management UI:
- URL: http://localhost:15672
- Username:
guest - Password:
guest
RabbitmqHandler class
- Message Collection: Messages are collected into a batch.
- Parallel Processing: Each batch is processed in a separate thread from the
ThreadPoolExecutor. - Individual Processing: The callback function is called for each message in the batch.
- Error Handling: If a message fails, we continue processing next messages.
- Ack/nack: Successful messages in the batch are acknowledged, and failed ones are nacked.
Architecture
┌─────────────┐
│ RabbitMQ │
│ Exchange │
└──────┬──────┘
│ (topic: events.#)
▼
┌─────────────┐
│ Queue │
└──────┬──────┘
│ (prefetch: 10)
▼
┌─────────────────────┐
│ RabbitmqHandler │
│ - Collects batch │
│ - Size: 10 │
└──────┬──────────────┘
│
▼
┌─────────────────────┐
│ ThreadPoolExecutor │
│ - Max workers: 4 │
│ - Parallel batches │
└──────┬──────────────┘
│
▼
┌─────────────────────┐
│ Callback Function │
│ - Process message │
│ - Print to console │
└─────────────────────┘
Troubleshooting
Consumer not receiving messages
- Check RabbitMQ is running:
docker ps. - Verify exchange and queue exist in RabbitMQ Management UI.
- Check routing key matches between publisher and consumer.
- Review logs:
docker compose logs -f appordocker compose logs -f rabbitmq.
Connection errors
- Ensure RabbitMQ is healthy:
docker compose ps. - Check credentials in environment variables.
- Verify network connectivity.
Messages not being acknowledged
- Check for errors in callback function.
- Review batch processing logs.
- Ensure
ThreadPoolExecutorhas available workers.