a亚洲精品_精品国产91乱码一区二区三区_亚洲精品在线免费观看视频_欧美日韩亚洲国产综合_久久久久久久久久久成人_在线区

首頁 > 編程 > C# > 正文

深入多線程之:深入生產(chǎn)者、消費(fèi)者隊(duì)列分析

2020-01-24 03:19:41
字體:
供稿:網(wǎng)友

上次我們使用AutoResetEvent實(shí)現(xiàn)了一個生產(chǎn)/消費(fèi)者隊(duì)列。這一次我們要使用Wait和Pulse方法來實(shí)現(xiàn)一個更強(qiáng)大的版本,它允許多個消費(fèi)者,每一個消費(fèi)者都在自己的線程中運(yùn)行。

我們使用數(shù)組來跟蹤線程。

Thread[] _workers;

通過跟蹤線程可以讓我們在所有的線程都結(jié)束后再結(jié)束我們的隊(duì)列任務(wù)。

每一個消費(fèi)者線程都執(zhí)行一個叫做Consume的方法,在一個for循環(huán)中,我們可以創(chuàng)建和啟動線程。例如:

復(fù)制代碼 代碼如下:

       public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

上次我們使用的是一個字符串來代表任務(wù),這次我們使用Action委托,它的定義如下:

Public delegate void Action();

為了表示一系列的任務(wù),我們使用Queue<T> 集合,例如:

Queue<Action> _itemQ = new Queue<Action>();

在我們調(diào)用生產(chǎn)(EnqueueItem)和消費(fèi)(Consume)方法前,還是完整的看一看代碼吧:

復(fù)制代碼 代碼如下:

class PCQueue
    {
        readonly object _locker = new object();
        Thread[] _workers;
        Queue<Action> _itemQ = new Queue<Action>(); //保存任務(wù)的隊(duì)列
        public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

        public void Shutdown(bool waitForWorkers)
        {
           //為每一個線程插入一個null item,可以是每一個worker 退出
            foreach (Thread worker in _workers)
                EnqueueItem(null);

           //等待所有的線程退出。
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待隊(duì)列中的線程
            }
        }

        void Consume()
        {
            while (true)
            {
                Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號。                    }
                    item = _itemQ.Dequeue();
                }

                if (item == null) return; //退出的信號
                item();
            }
        }
    }


我們可以有一個退出策略,插入一個null item作為consumer退出的信號。如果我們想要快速的退出,可以使用一個獨(dú)立的”cancel” 標(biāo)記,因?yàn)槲覀冎С侄鄠€consumers,所以我們必須為每一個consumer插入一個null item。

下面是Main方法。使用兩個consumer線程,然后讓這兩個consumers執(zhí)行10個委托

復(fù)制代碼 代碼如下:

public static void Main()
        {
            PCQueue q = new PCQueue(2);
            Console.WriteLine("Enqueuing 10 items...");

            for (int i = 0; i < 10; i++)
            {
                int itemNumber = i;
                q.EnqueueItem(() =>
                    {
                        Thread.Sleep(1000); //模擬耗時的工作
                        Console.WriteLine(" Task " + itemNumber);
                    });
            }

            q.Shutdown(true); //等待關(guān)閉
            Console.WriteLine();
            Console.WriteLine("Workers complete!");
        }

下面讓我們細(xì)致的看一看EnqueueItem方法:

復(fù)制代碼 代碼如下:

public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待隊(duì)列中的線程
            }
        }

因?yàn)槲覀兊年?duì)列_itemQ被多線程環(huán)境使用,因此在對_itemQ進(jìn)行讀取的時候需要加鎖lock.

因?yàn)槲覀儾迦肓艘粋€新的任務(wù),我們必須修改阻塞條件,也就是調(diào)用pulse方法,來喚醒調(diào)用了wait方法的線程。


出于對效率的考慮,當(dāng)插入一個Item的時候使用Pulse來代替PulseAll方法,因?yàn)榇蟛糠謺r候每一個Item只需要一個consumer來執(zhí)行。如果你有一個冰淇淋,你不可能叫30個睡眠的孩子都起來吃它,同樣,對于一個item,同時喚醒30個consumers一點(diǎn)好處都沒有。


讓我們再看看Consumer方法。

我們希望當(dāng)沒什么事情做的時候,線程阻塞就可以了,換句話說,隊(duì)列中沒有item的時候,線程就應(yīng)該阻塞。因此我們的阻塞條件是_itemQ.Count ==0;

復(fù)制代碼 代碼如下:

Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號。                    }
                    item = _itemQ.Dequeue();
                }

                if (item == null) return; //退出的信號
                item();


while循環(huán)退出的時候也意味著_itemQ 至少有一個item。我們必須在釋放鎖之前調(diào)用你哦個dequeue方法來獲取item,考慮下下面的代碼:
復(fù)制代碼 代碼如下:

lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號。                    }
                }
                //現(xiàn)在在這里可能被搶占,_itemQ可能被修改
                lock (_locker)
                {
                    item = _itemQ.Dequeue();
                }

在item被Dequeued后,我們就應(yīng)該立即釋放鎖了,如果我們在執(zhí)行task的時候,一直持有鎖,則會沒有必要的阻塞其他線程來獲取任務(wù)。


Wait Timeouts

在調(diào)用Wait方法的時候可以傳遞一個毫秒或Timespan的時間來設(shè)置超時。如果Wait超時了,那么Wait方法就會返回false。

帶有超時功能的Wait方法的主要步驟:

    釋放鎖。
    阻塞 直到 pulsed 或者超時。
    重新獲取鎖。

超時就好像CLR 在超時到了的時候自動的調(diào)用了 pulse方法一樣。

下面是使用超時的Wait的主要代碼:

         lock(_locker)

         while(<阻塞條件>)

                   Monitor.Wait(_locker,<超時時間>);


Monitor.Wait 方法返回一個bool值來代表是調(diào)用了pulse還是已經(jīng)超時了。

如果是true: 代表調(diào)用了pulse。

如果是false:代表超時了。

這對記錄日志很有用。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 精品一二区 | 精品无码久久久久国产 | 国产精品久久久久久久7电影 | 精品久久久久久久久久久 | 欧美精品福利视频 | 国产乱码精品一区二区 | 日韩欧美中文国 | 久久另类ts人妖一区二区 | 不卡久久 | av成人在线观看 | 国产不卡一 | 一级在线免费视频 | 一区二区三区高清 | 欧美日韩三区 | 成人a视频在线观看 | 狠狠躁夜夜躁人人爽天天天天97 | 欧美国产日韩在线观看 | 欧美日韩久久精品 | 91hd精品少妇| 久久久久久久国产 | 最近免费中文字幕在线视频2 | 亚洲最新中文字幕 | 久久99精品视频 | 欧美三级一区 | 免费黄色污网站 | 天堂精品一区 | 国产福利一区二区三区四区 | 国产成人在线视频 | 特级黄一级播放 | 成人精品视频 | av一区在线观看 | 免费三级网 | 草久网 | 午夜免费影视 | 亚洲一区二区av | 国产精品成人一区二区三区夜夜夜 | 成人欧美一区二区三区黑人 | 国产一级片免费观看 | 国产高清一级 | 日日夜夜免费精品视频 | 欧美日韩一区二区在线 |