Priority Queue

🧪 Beta Feature
RedisPriorityQueue is currently experimental. The API may change in future releases based on feedback.
Only JSON-serializable value types are supported: str, int, float, and bool.

RedisPriorityQueue is a priority queue field backed by a Redis Sorted Set. Items with lower priority scores are popped first.

Unlike regular Redis types (RedisStr, RedisList, etc.), the priority queue stores its data in a separate Redis key rather than inline with the model's JSON. This means it operates as a pure Redis proxy with no local state.

from pydantic import Field
from rapyer import AtomicRedisModel
from rapyer.types import RedisPriorityQueue


class JobQueue(AtomicRedisModel):
    name: str = "default"
    tasks: RedisPriorityQueue[str] = Field(default_factory=RedisPriorityQueue)

Basic Usage

queue = JobQueue(name="emails")
await queue.asave()

# Push items with a priority score
await queue.tasks.apush("send_welcome", priority=1.0)
await queue.tasks.apush("send_digest", priority=3.0)
await queue.tasks.apush("send_alert", priority=2.0)

# Pop returns the item with the lowest score
task = await queue.tasks.apop()  # "send_welcome"
task = await queue.tasks.apop()  # "send_alert"
task = await queue.tasks.apop()  # "send_digest"
task = await queue.tasks.apop()  # None (empty)
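
The ordering above follows from Redis Sorted Set semantics: each member is unique, carries a single float score, and the member with the lowest score is popped first (as with ZPOPMIN). The sketch below models that behaviour with a plain dict so it runs without Redis. SortedSetModel and its method names are hypothetical and not part of rapyer. It also shows that adding an existing member to a sorted set updates its score instead of creating a duplicate; whether RedisPriorityQueue behaves exactly this way for repeated pushes is an assumption.

```python
from typing import Optional


class SortedSetModel:
    """In-memory stand-in for a Redis Sorted Set (illustrative only)."""

    def __init__(self) -> None:
        self._scores: dict[str, float] = {}

    def zadd(self, member: str, score: float) -> None:
        # Like ZADD: members are unique, so re-adding overwrites the score.
        self._scores[member] = score

    def zpopmin(self) -> Optional[str]:
        # Like ZPOPMIN: remove and return the member with the lowest score.
        # Ties fall back to comparing members, approximating Redis's
        # lexicographic ordering of equal-score members.
        if not self._scores:
            return None
        member = min(self._scores, key=lambda m: (self._scores[m], m))
        del self._scores[member]
        return member


zset = SortedSetModel()
zset.zadd("send_welcome", 1.0)
zset.zadd("send_digest", 3.0)
zset.zadd("send_alert", 2.0)
zset.zadd("send_digest", 0.5)  # same member: score is updated, no duplicate

order = [zset.zpopmin() for _ in range(4)]
print(order)  # ['send_digest', 'send_welcome', 'send_alert', None]
```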

Generic Type Support

RedisPriorityQueue accepts any of the supported value types (str, int, float, or bool) as its generic parameter:

class IntQueue(AtomicRedisModel):
    scores: RedisPriorityQueue[int] = Field(default_factory=RedisPriorityQueue)

class FloatQueue(AtomicRedisModel):
    measurements: RedisPriorityQueue[float] = Field(default_factory=RedisPriorityQueue)
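
Because only JSON-serializable values are supported, it seems reasonable to assume that values are JSON-encoded before being stored as sorted-set members, although the exact encoding rapyer uses is not stated here. Under that assumption, the sketch below shows how Python values that compare equal can still map to distinct members. encode_member and decode_member are hypothetical helpers for illustration only.

```python
import json
from typing import Union

Value = Union[str, int, float, bool]


def encode_member(value: Value) -> str:
    # Hypothetical helper: JSON-encode a value for use as a set member.
    return json.dumps(value)


def decode_member(raw: str) -> Value:
    # Hypothetical helper: decode a member back into a Python value.
    return json.loads(raw)


samples = ["1", 1, 1.0, True]
encoded = [encode_member(v) for v in samples]
print(encoded)  # ['"1"', '1', '1.0', 'true']

# In Python, 1 == 1.0 == True, but their JSON encodings all differ,
# so they would be stored as separate members under this assumption.
assert len(set(encoded)) == 4

decoded = [decode_member(raw) for raw in encoded]
```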

Operations

| Operation | Method | Description |
|---|---|---|
| push | await queue.tasks.apush(value, priority) | Add an item with a priority score |
| push many | await queue.tasks.apush_many(items) | Add multiple items at once |
| pop | await queue.tasks.apop() | Remove and return the item with the lowest priority score (None if empty) |
| peek | await queue.tasks.apeek() | Return the item with the lowest priority score without removing it |
| size | await queue.tasks.asize() | Return the number of items |
| items | await queue.tasks.aitems() | Return all items sorted by priority (ascending) |
| remove | await queue.tasks.aremove(value) | Remove a specific value |
| clear | await queue.tasks.aclear() | Remove all items |
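
A common pattern built from these operations is a worker loop that pops until the queue is empty, relying on apop() returning None for an empty queue as in the basic usage example. The sketch below uses a hypothetical in-memory FakeQueue that exposes only apush and apop, so it runs without Redis. With a real model you would pass queue.tasks instead, assuming it behaves as documented above.

```python
import asyncio
from typing import Optional


class FakeQueue:
    """Hypothetical in-memory stand-in with the apush/apop shape used above."""

    def __init__(self) -> None:
        self._scores: dict[str, float] = {}

    async def apush(self, value: str, priority: float) -> None:
        self._scores[value] = priority

    async def apop(self) -> Optional[str]:
        if not self._scores:
            return None
        value = min(self._scores, key=lambda v: (self._scores[v], v))
        del self._scores[value]
        return value


async def drain(queue) -> list[str]:
    """Pop items in priority order until the queue reports empty."""
    handled = []
    # Compare against None explicitly: values such as 0, False, or ""
    # are falsy but would still be legitimate queue items.
    while (task := await queue.apop()) is not None:
        handled.append(task)  # replace with real processing
    return handled


async def main() -> list[str]:
    queue = FakeQueue()
    await queue.apush("send_digest", priority=3.0)
    await queue.apush("send_welcome", priority=1.0)
    await queue.apush("send_alert", priority=2.0)
    return await drain(queue)


handled = asyncio.run(main())
print(handled)  # ['send_welcome', 'send_alert', 'send_digest']
```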

Batch Push

Use PriorityQueueItem to push multiple items in a single Redis command:

from rapyer.types.priority_queue import PriorityQueueItem

await queue.tasks.apush_many([
    PriorityQueueItem(value="task_a", priority=1.0),
    PriorityQueueItem(value="task_b", priority=2.0),
    PriorityQueueItem(value="task_c", priority=3.0),
])
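
Batching matters because every separate push is a network round trip. At the Redis level, ZADD accepts many score/member pairs in one call, and redis-py takes them as a {member: score} mapping. The sketch below shows how a list of items could be collapsed into such a mapping. Item is a hypothetical stand-in for PriorityQueueItem, to_zadd_mapping is not a rapyer function, and how rapyer builds the command internally is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Item:
    """Hypothetical stand-in for PriorityQueueItem."""

    value: str
    priority: float


def to_zadd_mapping(items: list[Item]) -> dict[str, float]:
    # One mapping means one ZADD call instead of one call per item.
    # If a value appears twice, the later priority wins, since a dict
    # (like a sorted set) holds each member only once.
    return {item.value: item.priority for item in items}


batch = [
    Item(value="task_a", priority=1.0),
    Item(value="task_b", priority=2.0),
    Item(value="task_a", priority=5.0),  # duplicate value
]
mapping = to_zadd_mapping(batch)
print(mapping)  # {'task_a': 5.0, 'task_b': 2.0}
```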

Listing Items

aitems() returns a list of PriorityQueueItem objects sorted by priority (ascending):

items = await queue.tasks.aitems()
for item in items:
    print(f"{item.value} (priority: {item.priority})")

Optional Fields

Priority queue fields can be optional:

from typing import Optional


class Worker(AtomicRedisModel):
    name: str = "default"
    tasks: Optional[RedisPriorityQueue[str]] = None

Set the field after initialization to start using it:

worker = Worker()
await worker.asave()

worker.tasks = RedisPriorityQueue()
await worker.tasks.apush("job_1", priority=1.0)

How It Works

RedisPriorityQueue is a special field type. It stores data in a separate Redis key derived from the parent model's key:

__rapyer_special__:{ModelName}:{pk}:tasks

This means:

  • Save/delete are automatic: calling asave() or adelete() on the parent model also handles the priority queue's Redis key and TTL.
  • TTL is inherited: if the parent model has a TTL configured, the queue's key gets the same expiration.
  • No local state: all operations (apush, apop, etc.) go directly to Redis.
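
The key pattern shown above can be sketched as a small helper, which may be handy when looking for the queue's data by hand. special_key is hypothetical and not part of rapyer. It assumes the last segment is the field name (tasks in the pattern) and that the primary key is used verbatim.

```python
SPECIAL_PREFIX = "__rapyer_special__"


def special_key(model_name: str, pk: str, field_name: str) -> str:
    # Hypothetical helper mirroring the documented key pattern:
    # __rapyer_special__:{ModelName}:{pk}:{field}
    return f"{SPECIAL_PREFIX}:{model_name}:{pk}:{field_name}"


key = special_key("JobQueue", "abc123", "tasks")
print(key)  # __rapyer_special__:JobQueue:abc123:tasks
```

With a key like this, the underlying sorted set can be inspected in redis-cli, for example with ZRANGE <key> 0 -1 WITHSCORES.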

Cannot use aupdate() with special fields

Special fields manage their own Redis storage and cannot be passed to aupdate(). Use the field's own methods instead.

# ❌ This raises UpdateAtomicModelError
await model.aupdate(tasks=some_value)

# ✅ Use the field's methods directly
await model.tasks.apush("value", priority=1.0)