Win32多线程初步——生产者与消费者、死锁、信号量

注:本文是将之前发布在新浪博客的文章转移到CSDN。因为新浪博客太难用了,CSDN的MarkDown脚本有助于代码排版。


  闲来无事,折腾一下多线程。首先想到的是“生产者与消费者”问题。本帖循序渐进,最终构造出一个合格的程序。(高手就没必要看下去了)涉及到的概念有: 子线程生存期、CloseHandle、WaitForMultipleObjects 以及CriticalSection。
  首先来看最直观的解决方案:

—————————————————————— 第一步 ——————————————————————

#include "stdafx.h"
#include "windows.h"
#include "iostream"

using namespace std;

int iRes = 0;
DWORD WINAPI Producer(LPVOID);
DWORD WINAPI Customer(LPVOID);

int main(){
    HANDLE hP;
    DWORD  idP;
    HANDLE hC;
    DWORD  idC;

    hP = CreateThread(NULL, 0, Producer, 0, 0, &idP);
    hC = CreateThread(NULL, 0, Customer, 0, 0, &idC);
    return 0;
}

DWORD WINAPI Producer(LPVOID){
    while(true){
        iRes++;
        cout << iRes << endl;
        Sleep(1000);
    }
    return 0;
}

DWORD WINAPI Customer(LPVOID){
    while (true){
        if (iRes > 0){
            --iRes;
            cout << iRes << "  消费1次" << endl;
            Sleep(1000);
        }else{
            cout <<" 啊哦,资源不足,等待" << endl;
            Sleep(1000);
        }
    }
    return 0;
}

  iRes即为资源,Producer线程方法不断“生产”;Customer线程方法不断“消费”。F5运行,哎呀,窗口咋一闪就没了?!牵扯出第一个问题:子线程的生存期。当主线程结束的时候,子线程也会被强制结束。上述代码中,我们创建完子线程后,主线程马上结束了,所以他下面的子线程也被强制结束了。皮之不存毛将焉附。。。
  我们使用Sleep方法是使主线程不会马上结束。从而给子线程执行的时间。除此之外,我们还需在CreateThread之后调用CloseHandle关闭线程对象句柄。

—————————————————————— 第二步 ——————————————————————

#include "stdafx.h"
#include "windows.h"
#include "iostream"

using namespace std;

int iRes = 0;
DWORD WINAPI Producer(LPVOID);
DWORD WINAPI Customer(LPVOID);

int main(){
    HANDLE hP;
    DWORD  idP;
    HANDLE hC;
    DWORD  idC;

    hP = CreateThread(NULL, 0, Producer, 0, 0, &idP);
    hC = CreateThread(NULL, 0, Customer, 0, 0, &idC);
    CloseHandle(hP);
    CreateThread(hC);
    Sleep(20 * 1000);
    return 0;
}

DWORD WINAPI Producer(LPVOID){
    while(true){
        iRes++;
        cout << iRes << endl;
        Sleep(1000);
    }
    return 0;
}

DWORD WINAPI Customer(LPVOID){
    while (true){
        if (iRes > 0){
            --iRes;
            cout << iRes << "  消费1次" << endl;
            Sleep(1000);
        }else{
            cout <<" 啊哦,资源不足,等待" << endl;
            Sleep(1000);
        }
    }
    return 0;
}

F5运行,哈哈有了!但是咋这么怪异?
这里写图片描述
竟然是从10开始?怎么突然就21了? 翻了一下书,两个线程在访问iRes的时候起了冲突,怎么办?用CriticalSection!
—————————————————————- 第三步( CriticalSection)————————————————————-

#include "stdafx.h"
#include "windows.h"
#include "iostream"

using namespace std;

int iRes = 0;
CRITICAL_SECTION g_CriticalSection;
DWORD WINAPI Producer(LPVOID);
DWORD WINAPI Customer(LPVOID);

int main(){
    HANDLE hP;
    DWORD  idP;
    HANDLE hC;
    DWORD  idC;

    InitializeCriticalSection(&g_CriticalSection);

    hP = CreateThread(NULL, 0, Producer, 0, 0, &idP);
    hC = CreateThread(NULL, 0, Customer, 0, 0, &idC);
    CloseHandle(hP);     CloseHandle(hC);
    Sleep(20 * 1000);

    DeleteCriticalSection(&g_CriticalSection);
    return 0;
}

DWORD WINAPI Producer(LPVOID){
    while(true){
        EnterCriticalSection(&g_CriticalSection);
            iRes++;
            cout << iRes << endl;
        LeaveCriticalSection(&g_CriticalSection);
        Sleep(1000);
    }
    return 0;
}

DWORD WINAPI Customer(LPVOID){
    while (true){
        if (iRes > 0){
            EnterCriticalSection(&g_CriticalSection);
                --iRes;
                cout << iRes << "  消费1次" << endl;
            LeaveCriticalSection(&g_CriticalSection);
            Sleep(1000);
        }else{
            cout <<" 啊哦,资源不足,等待" << endl;
            Sleep(1000);
        }
    }
    return 0;
}

F5,哎,好了!此消彼长真和谐。
这里写图片描述
等等,让主线程Sleep 20秒,有点不太雅观,有没有什么优雅的方法等待子线程结束?
————————————————— 第四步(WaitForMutipleObjects) ———————————————————-

int main(){
    DWORD  idP;
    DWORD  idC;
    a.hMutex = CreateMutex(NULL, false, NULL);
    b.hMutex = CreateMutex(NULL, false, NULL);

    hArr[0] = CreateThread(NULL, 0, Producer, 0, 0, &idP);
    hArr[1] = CreateThread(NULL, 0, Customer, 0, 0, &idC);

    WaitForMultipleObjects(2, hArr, true, INFINITE);
    return 0;
}

  这样以来,主线程就会在子线程全部结束后才结束。至此,我们的资源只有一种,现实中往往没这么简单:消费者在消耗资源的时候,可能同时需要访问多个资源。假如一个线程函数需要同时使用N个资源,用这个线程函数创建多个线程极有可能引发死锁——使用Mutex解决。

————————————————————第五步(死锁) ——————————————————————
我们先模拟一下死锁。

#include "stdafx.h"
#include"define.h"

extern AtomCout atomcout;

struct Res {
    Res() :data(0), cs() {}
    Res(int val):data(val), cs(){}
    int data;
    CSLocker::CriticalSection cs;
};

void SwapResData(Res& A, Res& B) {
    A.cs.Lock();
        B.cs.Lock();
            int tmp = A.data;
            A.data = B.data;
            B.data = tmp;
            atomcout.out("thread: ", ::GetCurrentThreadId(), " swap finished \n");
        B.cs.UnLock();
    A.cs.UnLock();
}

SwapResData方法需要同时使用两个资源,缺一不可;

Res a(5);
Res b(6);

int main(){
    DWORD  idP;
    DWORD  idC;
    hArr[0] = CreateThread(NULL, 0, SwaperA, 0, 0, &idP);
    hArr[1] = CreateThread(NULL, 0, SwaperB, 0, 0, &idC);

    WaitForMultipleObjects(2, hArr, true, INFINITE);
    return 0;
}

DWORD WINAPI SwaperA(LPVOID) {
    while (true) {
        SwapResData(a, b);
        Sleep(1000);
    }
    return 0;
}
DWORD WINAPI SwaperB(LPVOID) {
    while (true) {
        SwapResData(b, a);
        Sleep(1000);
    }
    return 0;
}

main方法创建了两个线程,一个SwapResData(a, b); 另一个SwapResData(b, a);
如果不发生死锁,这两个线程会一直执行。假如在某个瞬间,SwaperA线程锁住了a.cs、SwaperB线程锁住了b.cs, 结果,SwaperA一直等待SwaperB释放b.cs,而SwaperB一直等待SwaperA释放a.cs,双方就这么一直干耗下去。。。

图中每个线程循环了三次之后,死锁发生了
从上面的情境中,我们对照下死锁发生的必要条件:

1,互斥。(a或b一次只能被一个线程使用)
2,请求与保持。(SwaperA线程因获取不到b.cs而阻塞,但并不会释放已获取的a.cs)
3,不可剥夺。(SwaperA线程因获取不到b.cs而阻塞,系统、SwaperB无法剥夺SwaperA已占有的a.cs)
4,循环等待。(SwaperA与SwaperB相互等待对方的资源)

打破上述四个条件其中的任何一个,死锁就不会发生了,那么,如何打破呢。。。使用Mutex代替CriticalSection,要么同时拥有两个资源,要么啥都不要。

#pragma once
#include "stdafx.h"
#include"define.h"

extern AtomCout atomcout;

struct ResEx {
    ResEx() :data(0), hMutex(NULL) {}
    ResEx(int val) :data(val), hMutex(NULL) {}
    int data;
    HANDLE hMutex;
};

extern ResEx a;
extern ResEx b;
void SwapResDataEx(ResEx& A, ResEx& B) {
    HANDLE hArr[2];
    hArr[0] = A.hMutex;
    hArr[1] = B.hMutex;

    WaitForMultipleObjects(2, hArr, true, INFINITE); //同时等待两个mutex,任何一个得不到就堵塞在这里。
        int tmp = A.data;
        A.data = B.data;
        B.data = tmp;
        atomcout.out("thread: ", ::GetCurrentThreadId(), " swap finished  ", a.data, "==>", b.data, " \n");
    ReleaseMutex(hArr[0]);   //释放两个mutex
    ReleaseMutex(hArr[1]);
}

此时创建两个线程,分别调用SwapResDataEx(a, b); SwapResDataEx(b, a);
便永远不会死锁了。

ResEx a(5);
ResEx b(6);

int main(){
    DWORD  idP;
    DWORD  idC;
    a.hMutex = CreateMutex(NULL, false, NULL);
    b.hMutex = CreateMutex(NULL, false, NULL);

    hArr[0] = CreateThread(NULL, 0, SwaperA, 0, 0, &idP);
    hArr[1] = CreateThread(NULL, 0, SwaperB, 0, 0, &idC);

    WaitForMultipleObjects(2, hArr, true, INFINITE);
    return 0;
}

DWORD WINAPI SwaperA(LPVOID) {
    while (true) {
        SwapResDataEx(a, b);
        Sleep(1000);
    }
    return 0;
}
DWORD WINAPI SwaperB(LPVOID) {
    while (true) {
        SwapResDataEx(b, a);
        Sleep(1000);
    }
    return 0;
}

这里写图片描述

————————————————————第六步(事件与信号量) ——————————————————————
第三步我们使用临界区实现了生产者与消费者问题(后来为了方便验证死锁与解锁,使用Mutex + Swap函数做模拟,抛弃了生产者与消费者问题。)现在再次回到生产者与消费者问题,我们第三步使用CriticalSection的实现除了有可能引发死锁,还有一个缺陷:当消费者发现没有资源可以消耗的时候,只是干巴巴的等1000ms,这是非常低效、不靠谱的。接下来我们引入信号量: 使用CreateSemaphore创建一个信号量。
Mutex可以看做是信号量的特殊版本:一个Mutex负责一个资源、一块不可被中断的代码块,它的状态要么是0要么是1,它只能被锁住一次,之后他人想锁住,就得等待。而信号量Semaphore可以被锁住多次。接下来我们使用Semaphore实现生产者与消费者。

HANDLE hFull;
HANDLE hEmpty;
HANDLE hMutex;

ResEx a(5);
ResEx b(1);
int iRes = 0;

int main(){
    DWORD  idP;
    DWORD  idC;
    hFull = CreateSemaphore(NULL, 0, 10, NULL);
    hEmpty = CreateSemaphore(NULL, 10, 10, NULL);

    for(int i = 0; i<3; ++i){
        hArr[i+3] = CreateThread(NULL, 0, Producer, 0, 0, &idP);
        hArr[i]   = CreateThread(NULL, 0, Customer, 0, 0, &idC);
    }

    //Sleep(1000*50);
    WaitForMultipleObjects(6, hArr, true, INFINITE);
    return 0;
}

先看main函数,我们创建了一个mutex、两个Semaphore。为啥需要两个Semaphore呢?为啥第二个信号量创建时的初值 == 最大值呢?先保留疑问,看接下来的生产者与消费者的实现代码:

extern HANDLE hFull;
extern HANDLE hEmpty;
extern HANDLE hMutex;

extern int iRes;

DWORD WINAPI Producer(LPVOID) {
    int count = 50;//让生产者工作50次后结束
    while (count-- > 0) {
        WaitForSingleObject(hEmpty, INFINITE);
        WaitForSingleObject(hMutex, INFINITE);
        atomcout.out("Produc one, ", ++iRes,"\n");
        INFO("Producer a:, b: ");
        ReleaseMutex(hMutex);
        ReleaseSemaphore(hFull, 1, NULL);
    }
    return 0;
}

DWORD WINAPI Customer(LPVOID) {
    int count = 50;//让生产者工作50次后结束
    while (count-- > 0) {
        WaitForSingleObject(hFull, INFINITE);
        WaitForSingleObject(hMutex, INFINITE);
        atomcout.out( "Custom one, ", --iRes, "\n");
        INFO("Customer a:, b: ");
        ReleaseMutex(hMutex);
        ReleaseSemaphore(hEmpty, 1, NULL);
    }
    return 0;
}

信号量每锁住一次(WaitForXXXObject返回),计数便会减一;每释放一次(ReleaseSemaphore返回),计数便会加一。当计数为0时,便无法再锁住(WaitForXXXObject阻塞)。。使用两个信号量分别管理资源的产生与消耗两个动作,而mutex保证对iRes的操作是原子的。

生产者首先执行:WaitForSingleObject(hEmpty, INFINITE);由于hEmpty的初值为10,处于可激发态,进而马上将iRes增加,然后增加hFull的计数。
消费者等待hFull,直到hFull被激发。然后消耗iRes、增加hEmpty的计数。

接下来我们分析下:当消费者将资源消耗光的时候,生产者生产出资源,是如何自动唤醒消费者线程的。

Customer线程频繁执行,导致hFull计数被递减、hEmpty计数被递增,当hFull计数为0时,消费者线程组塞住了。
hFull为0,并不影响消费者线程的工作——它锁住的是hEmpty,然后生产资源、将hFull递增==》hFull转为激发态——Customer线程回复执行。

猜你喜欢

转载自blog.csdn.net/JohnnyMartin/article/details/80975075