长沙极速数控设备维修有限公司

使用C ++ 11线程支持库创建带有事件循环,消息队列和计时器的辅助线程

时间:2020-09-14 15:31 来源:互联网 作者:源码搜藏 浏览: 次 收藏 挑错 推荐 打印

事件循环(有时称为消息循环)是等待并调度传入事件的线程。 线程阻塞等待请求的到达,然后将事件分派给事件处理程序函数。 循环通常使用消息队列来保存传入消息。 依次对每个消息进行出队,解码,然后执行操作。 事件循环是实现进程间通信的一种方法。 所有

事件循环(有时称为消息循环)是等待并调度传入事件的线程。线程阻塞等待请求的到达,然后将事件分派给事件处理程序函数。循环通常使用消息队列来保存传入消息。依次对每个消息进行出队,解码,然后执行操作。事件循环是实现进程间通信的一种方法。

所有操作系统都支持多线程应用程序。每个操作系统都有用于创建线程,消息队列和计时器的唯一函数调用。随着C ++ 11线程支持库的出现,现在可以创建可移植的代码并避免特定于OS的函数调用。本文提供了一个简单的示例,说明如何仅依靠C ++标准库来创建线程事件循环,消息队列和计时器服务。任何支持线程库的C ++ 11编译器都应该能够编译附加的源代码。

背景

通常,我需要一个线程来充当事件循环。线程将入站消息出队,并根据唯一的消息标识符将数据调度到适当的函数处理程序。能够调用功能的计时器支持对于低速轮询很方便,如果在预期的时间内没有发生任何事情,则可以生成超时。很多时候,辅助线程是在启动时创建的,直到应用程序终止后才被销毁。

该实现的关键要求是,传入消息必须在同一线程实例上执行。尽管std::async 可以使用池中的临时线程,但是此类确保所有传入消息使用同一线程。例如,可以使用不是线程安全的代码来实现子系统。单个WorkerThread 实例用于安全地将函数调用分派到子系统中。

乍一看,C ++线程支持似乎缺少一些关键功能。是的,std::thread 可以拆分一个线程,但是没有线程安全队列,也没有计时器-大多数OS都提供的服务。我将展示如何使用C ++标准库创建这些“缺失”功能,并提供许多程序员熟悉的事件处理循环。

工作线程

WorkerThread 类封装了所有必要的事件循环机制。一个简单的类接口允许线程创建,将消息发布到事件循环以及最终的线程终止。界面如下图所示:

class WorkerThread
{
public:
    /// Constructor
    WorkerThread(const char* threadName);

    /// Destructor
    ~WorkerThread();

    /// Called once to create the worker thread
    /// @return True if thread is created. False otherwise. 
    bool CreateThread();

    /// Called once a program exit to exit the worker thread
    void ExitThread();

    /// Get the ID of this thread instance
    /// @return The worker thread ID
    std::thread::id GetThreadId();

    /// Get the ID of the currently executing thread
    /// @return The current thread ID
    static std::thread::id GetCurrentThreadId();

    /// Add a message to the thread queue
    /// @param[in] data - thread specific message information
    void PostMsg(std::shared_ptr<UserData> msg);

private:
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    /// Entry point for the worker thread
    void Process();

    /// Entry point for timer thread
    void TimerThread();

    std::unique_ptr<std::thread> m_thread;
    std::queue<std::shared_ptr<ThreadMsg>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_timerExit;
    const char* THREAD_NAME;
};

首先要注意的是,std::thread 它用于创建主工作线程。主要工作线程功能是Process()

bool WorkerThread::CreateThread()
{
    if (!m_thread)
        m_thread = new thread(&WorkerThread::Process, this);
    return true;
}

事件循环

Process() 事件循环如下所示。该线程依赖于a std::queue<ThreadMsg*> 消息队列。std::queue 不是线程安全的,因此对队列的所有访问都必须由互斥保护。std::condition_variable 用于暂停线程,直到收到新消息已添加到队列的通知。

void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->id)
        {
            case MSG_POST_USER_DATA:
            {
                ASSERT_TRUE(msg->msg != NULL);

                auto userData = std::static_pointer_cast<UserData>(msg->msg);
                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                break;
            }

            case MSG_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case MSG_EXIT_THREAD:
            {
                m_timerExit = true;
                timerThread.join();
                return;
            }

            default:
                ASSERT();
        }
    }
}

PostMsg() ThreadMsg 在堆上创建一个新对象,将该消息添加到队列中,然后使用条件变量通知工作线程。

void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));

    // Add user data msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

循环将继续处理消息,直到MSG_EXIT_THREAD 收到并退出线程为止

void WorkerThread::ExitThread()
{
    if (!m_thread)
        return;

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));

    // Put exit thread message into the queue
    {
        lock_guard<mutex> lock(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }

    m_thread->join();
    m_thread = nullptr;
}

事件循环(Win32)

下面的代码片段将std::thread 上面事件循环与使用Windows API的类似Win32版本进行了对比注意GetMessage() API用于代替std::queue使用将消息发布到OS消息队列PostThreadMessage()最后,timerSetEvent() 用于将WM_USER_TIMER 消息放入队列。所有这些服务均由OS提供。std::thread WorkerThread 此处介绍实现避免了原始OS调用,但实现功能与Win32版本相同,而仅依赖于C ++标准库。

unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;

    // Start periodic timer
    MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired, 
                       reinterpret_cast<DWORD>(this), TIME_PERIODIC);

    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg.wParam != NULL);

                // Convert the ThreadMsg void* data back to a UserData*
                const UserData* userData = static_cast<const UserData*>(msg.wParam);

                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                // Delete dynamic data passed through message queue
                delete userData;
                break;
            }

            case WM_USER_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case WM_EXIT_THREAD:
                timeKillEvent(timerId);
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}

计时器

使用辅助专用线程将低分辨率定期计时器消息插入队列。计时器线程在内部创建Process()

void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

...

计时器线程的唯一责任是MSG_TIMER 每250ms 插入一条消息。在此实现中,无法防止计时器线程将多个计时器消息注入到队列中。如果工作线程落后并且无法足够快地服务于消息队列,则可能发生这种情况。根据工作线程,处理负载以及计时器消息的插入速度,可以采用其他逻辑来防止泛滥队列。

void WorkerThread::TimerThread()
{
    while (!m_timerExit)
    {
        // Sleep for 250mS then put a MSG_TIMER into the message queue
        std::this_thread::sleep_for(250ms);

        std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));

        // Add timer msg to queue and notify worker thread
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }
}

用法

main()下面函数显示了如何使用WorkerThread 该类。创建两个工作线程,并将消息发布到每个工作线程。短暂延迟后,两个线程均退出。

// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");

int main(void)
{    
    // Create worker threads
    workerThread1.CreateThread();
    workerThread2.CreateThread();

    // Create message to send to worker thread 1
    std::shared_ptr<UserData> userData1(new UserData());
    userData1->msg = "Hello world";
    userData1->year = 2017;

    // Post the message to worker thread 1
    workerThread1.PostMsg(userData1);

    // Create message to send to worker thread 2
    std::shared_ptr<UserData> userData2(new UserData());
    userData2->msg = "Goodbye world";
    userData2->year = 2017;

    // Post the message to worker thread 2
    workerThread2.PostMsg(userData2);

    // Give time for messages processing on worker threads
    this_thread::sleep_for(1s);

    workerThread1.ExitThread();
    workerThread2.ExitThread();

    return 0;
}

结论

C ++线程支持库提供了独立于平台的方式来编写多线程应用程序代码,而无需依赖于特定于操作系统的API。WorkerThread 这里介绍类是事件循环的基本实现,但所有基础知识都已准备就绪,可以进行扩展。

使用C ++ 11线程支持库创建带有事件循环,消息队列和计时器的辅助线程 转载http://citizenmori.com/appboke/52393.html

技术博客阅读排行

最新文章