Skip to main content

限流优先级调度队列

本例程演示如何使用 librm 中的 ThrottledPrioQueue 类实现带优先级调度和截止时间丢弃的限流队列。

ThrottledPrioQueue 是一个通用的定频出队数据结构,适用于一切"生产速率高于消费速率、需要有序调度且过时数据可以丢弃"的场景,典型用例是 CAN 总线的定频发送(ThrottledCan 内部即使用此类)。

工作原理

每次调用 Process() 时,队列按以下步骤处理:

  1. 清理过期项:弹出所有 deadline 早于当前时间的项(过时数据直接丢弃)
  2. 限频检查:若距离上次成功出队的时间间隔未达到 1fmax\frac{1}{f_{max}},返回 nullopt
  3. 出队:取出优先级最高(优先级相同则 deadline 最早)的项,更新上次处理时间戳,返回负载数据

基础用法

#include <librm.hpp>
#include <iostream>

using namespace std::chrono_literals;
using clock = std::chrono::steady_clock;

struct Message {
int id;
std::string content;
};

// 创建队列:负载类型 Message,最大深度 16,限频 10 Hz
rm::modules::ThrottledPrioQueue<Message, 16> queue{10.0};

// 生产者:入队
// 参数:负载数据、优先级(0–255,越大越优先)、绝对截止时间点
queue.Push({"hello"}, 128, clock::now() + 500ms);
queue.Push({"urgent"}, 255, clock::now() + 100ms);
queue.Push({"low-prio"}, 64, clock::now() + 1s);

// 消费者:定期调用 Process(),按限频策略出队
while (true) {
auto result = queue.Process(); // 自动获取当前时间
if (result.has_value()) {
std::cout << "Got: " << result->content << std::endl;
}
// 注意:即使 Process() 返回 nullopt,也应继续循环,等待下一个间隔
}

指定时间点(高精度场景)

在中断或已知准确时间的上下文中,可以传入 time_point 避免重复调用 clock::now()

const auto now = clock::now();

// 使用同一个时间点处理多个队列,保证一致性
auto r1 = queue_a.Process(now);
auto r2 = queue_b.Process(now);

优先级调度规则

出队顺序遵循以下规则(大顶堆):

  1. 优先级数值越大,越先出队(范围 0–255)
  2. 优先级相同时,deadline 越早的越先出队(EDF,最早截止时间优先)
using namespace std::chrono_literals;
using clock = std::chrono::steady_clock;

rm::modules::ThrottledPrioQueue<int, 8> queue{100.0};

auto now = clock::now();

queue.Push(1, 128, now + 500ms); // 普通优先级
queue.Push(2, 255, now + 500ms); // 高优先级,先出队
queue.Push(3, 128, now + 100ms); // 同优先级,deadline 更早,比 1 先出队

// 出队顺序:2 → 3 → 1

截止时间丢弃

超过 deadline 仍未出队的项会在下次 Process() 时被自动清除,不会占用队列空间:

using namespace std::chrono_literals;
using clock = std::chrono::steady_clock;

rm::modules::ThrottledPrioQueue<int, 4> queue{1.0}; // 1 Hz

// 入队一个 10 ms 后过期的项
queue.Push(42, 128, clock::now() + 10ms);

// 等待超过 deadline
osDelay(20);

// Process() 会先清理过期项,此项被丢弃,返回 nullopt
auto result = queue.Process(); // nullopt
warning

Process() 清理过期项时,优先级堆并不是按 deadline 排序的,因此清理只能从堆顶开始。若堆顶项未过期,即使堆内存在更早过期的项,它们也不会在本次 Process() 中被清理——而是在它们自己成为堆顶时才被清理。

这意味着高优先级的过期项可能会暂时阻塞对堆内低优先级项的清理。在实际使用中,合理设置 deadline 和优先级可以避免此问题。

队列满时的行为

当队列已达最大深度时,Push() 返回 false,新数据被丢弃:

rm::modules::ThrottledPrioQueue<int, 2> queue{10.0};

bool ok1 = queue.Push(1, 128, clock::now() + 1s); // true
bool ok2 = queue.Push(2, 128, clock::now() + 1s); // true
bool ok3 = queue.Push(3, 128, clock::now() + 1s); // false,队列已满

查看队列状态与清空

queue.size();   // 当前队列中的项数
queue.empty(); // 是否为空
queue.Clear(); // 清空所有项

在 RTOS 任务中使用

#include "cmsis_os.h"
#include <librm.hpp>

using namespace std::chrono_literals;
using clock = std::chrono::steady_clock;

struct SensorData {
float value;
uint32_t sensor_id;
};

// 全局队列:最多缓存 32 条,限频 100 Hz
rm::modules::ThrottledPrioQueue<SensorData, 32> sensor_queue{100.0};

// 生产者任务(高频采样)
extern "C" void SensorTask(const void *) {
for (;;) {
SensorData data{ReadSensor(), 1};
// 紧急数据设置较短 deadline,确保及时处理或丢弃
sensor_queue.Push(data, 200, clock::now() + 50ms);
osDelay(1); // 1000 Hz 采样
}
}

// 消费者任务(定频处理)
extern "C" void ProcessTask(const void *) {
for (;;) {
auto result = sensor_queue.Process();
if (result.has_value()) {
ProcessData(result.value());
}
osDelay(1); // 驱动频率需高于限频上限(100 Hz)
}
}
info

ThrottledPrioQueue 本身不是线程安全的。如果生产者和消费者在不同任务中并发访问,需要在外部加互斥锁保护。