條件變量是線程之前同步的另一種機(jī)制。條件變量給多線程提供了一種會(huì)和的場(chǎng)所。當(dāng)條件變量和互斥鎖一起使用時(shí),允許線程以無競(jìng)爭(zhēng)的方式等待特定的條件發(fā)生。這樣大大減少了鎖競(jìng)爭(zhēng)引起的線程調(diào)度和線程等待。
消息隊(duì)列是服務(wù)器端開發(fā)過程中繞不開的一道坎,前面,我已經(jīng)實(shí)現(xiàn)了一個(gè)基于互斥鎖和三隊(duì)列的消息隊(duì)列,性能很不錯(cuò)。博客園中的其他園主也實(shí)現(xiàn)了很多基于環(huán)形隊(duì)列和lock-free的消息隊(duì)列,很不錯(cuò),今天我們將要實(shí)現(xiàn)一個(gè)基于雙緩沖、互斥鎖和條件變量的消息隊(duì)列;這個(gè)大概也參考了一下java的blockingqueue,在前面一個(gè)博客中有簡(jiǎn)單介紹!!基于三緩沖的隊(duì)列,雖然最大限度上解除了線程競(jìng)爭(zhēng),但是在玩家很少,消息很小的時(shí)候,需要添加一些buff去填充數(shù)據(jù),這大概也是其一個(gè)缺陷吧!
消息隊(duì)列在服務(wù)器開發(fā)過程中主要用于什么對(duì)象呢?
1: 我想大概就是通信層和邏輯層之間的交互,通信層接受到的網(wǎng)絡(luò)數(shù)據(jù),驗(yàn)證封包之后,通過消息隊(duì)列傳遞給邏輯層,邏輯層將處理結(jié)果封包再傳遞給通信層!
2:邏輯線程和數(shù)據(jù)庫IO線程的分離;數(shù)據(jù)庫IO線程負(fù)責(zé)對(duì)數(shù)據(jù)庫的讀寫更新,邏輯層對(duì)數(shù)據(jù)庫的操作,封裝成消息去請(qǐng)求數(shù)據(jù)庫IO線程,數(shù)據(jù)庫IO線程處理完之后,再交回給邏輯層。
3:日志;處理模式與方式2 類似。不過日志大概是不需要返回的!
給出源代碼:
BlockingQueue.h文件
/*
* BlockingQueue.h
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_
#include <queue>
#include <pthread.h>
typedef void* CommonItem;
class BlockingQueue
{
public:
BlockingQueue();
virtual ~BlockingQueue();
int peek(CommonItem &item);
int append(CommonItem item);
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
std::queue<CommonItem> _read_queue;
std::queue<CommonItem> _write_queue;
};
#endif /* BLOCKINGQUEUE_H_ */
BlockingQueue.cpp 文件代碼
/*
* BlockingQueue.cpp
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#include "BlockingQueue.h"
BlockingQueue::BlockingQueue()
{
pthread_mutex_init(&this->_mutex,NULL);
pthread_cond_init(&this->_cond,NULL);
}
BlockingQueue::~BlockingQueue()
{
pthread_mutex_destroy(&this->_mutex);
pthread_cond_destroy(&this->_cond);
}
int BlockingQueue::peek(CommonItem &item)
{
if( !this->_read_queue.empty() )
{
item = this->_read_queue.front();
this->_read_queue.pop();
}
else
{
pthread_mutex_lock(&this->_mutex);
while(this->_write_queue.empty())
{
pthread_cond_wait(&this->_cond,&this->_mutex);
}
while(!this->_write_queue.empty())
{
this->_read_queue.push(this->_write_queue.front());
this->_write_queue.pop();
}
pthread_mutex_unlock(&this->_mutex);
}
return 0;
}
int BlockingQueue::append(CommonItem item)
{
pthread_mutex_lock(&this->_mutex);
this->_write_queue.push(item);
pthread_cond_signal(&this->_cond);
pthread_mutex_unlock(&this->_mutex);
return 0;
}
測(cè)試代碼:
BlockingQueue _queue;
void* process(void* arg)
{
int i=0;
while(true)
{
int *j = new int();
*j = i;
_queue.append((void *)j);
i ++;
}
return NULL;
}
int main(int argc,char** argv)
{
pthread_t pid;
pthread_create(&pid,0,process,0);
long long int start = get_os_system_time();
int i = 0;
while(true)
{
int* j = NULL;
_queue.peek((void* &)j);
i ++;
if(j != NULL && (*j) == 100000)
{
long long int end = get_os_system_time();
printf("consume %d/n",end - start);
break;
}
}
return 0;
}