线程安全队列,不阻塞队列
使用了 C++11 的标准库特性 特性说明
-
线程安全:使用互斥锁和条件变量保证线程安全
-
非阻塞操作:
try_pop() 立即返回,不阻塞 wait_and_pop() 可设置超时时间 3. 优雅停止:stop() 方法可以安全停止队列
-
高效:使用移动语义减少拷贝开销
-
模板化:支持任意数据类型
使用
#include "plltools/threadsafe_queue.h"
#include <iostream>
#include <thread>
void producer(plltools::ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 10; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
queue.stop(); // 生产完成后停止队列
}
void consumer(plltools::ThreadSafeQueue<int>& queue) {
int value;
while (!queue.is_stopped()) {
if (queue.wait_and_pop(value, std::chrono::milliseconds(500))) {
std::cout << "Consumed: " << value << std::endl;
}
}
// 处理剩余元素
while (queue.try_pop(value)) {
std::cout << "Processed remaining: " << value << std::endl;
}
}
int main() {
plltools::ThreadSafeQueue<int> queue;
std::thread producer_thread(producer, std::ref(queue));
std::thread consumer_thread(consumer, std::ref(queue));
producer_thread.join();
consumer_thread.join();
return 0;
}
threadsafe_queue.h
#ifndef THREADSAFE_QUEUE_H
#define THREADSAFE_QUEUE_H
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
namespace plltools {
template<typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() : _stop(false) {}
~ThreadSafeQueue() { stop(); }
// 禁止拷贝和赋值
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// 入队操作 (非阻塞)
void push(T&& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (!_stop) {
_queue.push(std::move(value));
_cond.notify_one();
}
}
// 出队操作 (非阻塞,立即返回)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty() || _stop) {
return false;
}
value = std::move(_queue.front());
_queue.pop();
return true;
}
// 等待出队操作 (带超时)
bool wait_and_pop(T& value, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
std::unique_lock<std::mutex> lock(_mutex);
if (timeout.count() > 0) {
if (!_cond.wait_for(lock, timeout, [this]() { return !_queue.empty() || _stop; })) {
return false;
}
} else {
_cond.wait(lock, [this]() { return !_queue.empty() || _stop; });
}
if (_stop || _queue.empty()) {
return false;
}
value = std::move(_queue.front());
_queue.pop();
return true;
}
// 获取队列大小
size_t size() const {
std::lock_guard<std::mutex> lock(_mutex);
return _queue.size();
}
// 判断队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(_mutex);
return _queue.empty();
}
// 停止队列,唤醒所有等待线程
void stop() {
std::lock_guard<std::mutex> lock(_mutex);
_stop = true;
_cond.notify_all();
}
// 检查队列是否已停止
bool is_stopped() const {
std::lock_guard<std::mutex> lock(_mutex);
return _stop;
}
private:
mutable std::mutex _mutex;
std::condition_variable _cond;
std::queue<T> _queue;
std::atomic<bool> _stop;
};
} // namespace plltools
#endif // THREADSAFE_QUEUE_H