jiegemena Blog

哪些事情,我相信!

线程安全队列,不阻塞队列

使用了 C++11 的标准库特性 特性说明

  1. 线程安全:使用互斥锁和条件变量保证线程安全

  2. 非阻塞操作:

try_pop() 立即返回,不阻塞 wait_and_pop() 可设置超时时间 3. 优雅停止:stop() 方法可以安全停止队列

  1. 高效:使用移动语义减少拷贝开销

  2. 模板化:支持任意数据类型

使用

#include "plltools/threadsafe_queue.h"
#include <iostream>
#include <thread>

void producer(plltools::ThreadSafeQueue<int>& queue) {
    for (int i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    queue.stop(); // 生产完成后停止队列
}

void consumer(plltools::ThreadSafeQueue<int>& queue) {
    int value;
    while (!queue.is_stopped()) {
        if (queue.wait_and_pop(value, std::chrono::milliseconds(500))) {
            std::cout << "Consumed: " << value << std::endl;
        }
    }
    
    // 处理剩余元素
    while (queue.try_pop(value)) {
        std::cout << "Processed remaining: " << value << std::endl;
    }
}

int main() {
    plltools::ThreadSafeQueue<int> queue;
    
    std::thread producer_thread(producer, std::ref(queue));
    std::thread consumer_thread(consumer, std::ref(queue));
    
    producer_thread.join();
    consumer_thread.join();
    
    return 0;
}

threadsafe_queue.h

#ifndef THREADSAFE_QUEUE_H
#define THREADSAFE_QUEUE_H

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>

namespace plltools {

template<typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() : _stop(false) {}
    ~ThreadSafeQueue() { stop(); }

    // 禁止拷贝和赋值
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    // 入队操作 (非阻塞)
    void push(T&& value) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_stop) {
            _queue.push(std::move(value));
            _cond.notify_one();
        }
    }

    // 出队操作 (非阻塞,立即返回)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty() || _stop) {
            return false;
        }
        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    // 等待出队操作 (带超时)
    bool wait_and_pop(T& value, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (timeout.count() > 0) {
            if (!_cond.wait_for(lock, timeout, [this]() { return !_queue.empty() || _stop; })) {
                return false;
            }
        } else {
            _cond.wait(lock, [this]() { return !_queue.empty() || _stop; });
        }

        if (_stop || _queue.empty()) {
            return false;
        }

        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    // 获取队列大小
    size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

    // 判断队列是否为空
    bool empty() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    // 停止队列,唤醒所有等待线程
    void stop() {
        std::lock_guard<std::mutex> lock(_mutex);
        _stop = true;
        _cond.notify_all();
    }

    // 检查队列是否已停止
    bool is_stopped() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _stop;
    }

private:
    mutable std::mutex _mutex;
    std::condition_variable _cond;
    std::queue<T> _queue;
    std::atomic<bool> _stop;
};

} // namespace plltools

#endif // THREADSAFE_QUEUE_H