redis的多執行緒
目錄
- redis的三個執行緒主要是做什麼
-
- pthread_cond_wait原理
主要看下redis是怎麼使用多執行緒的
先說明下redis也是多執行緒的.但是redis的主執行緒處理業務.而其他三個執行緒跟主要功能是關係不到的
redis的三個執行緒主要是做什麼
初始化入口
void initServer(void) { ... bioInit(); ... }
初始化後redis其他後臺執行緒.
void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects * * 初始化 job 佇列,以及執行緒狀態 */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_condvar[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system * * 設定棧大小 */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. * * 建立執行緒 */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }
初始化三類執行緒. 這三類執行緒被認為是後臺執行.不影響主執行緒
- BIO_CLOSE_FILE . 關閉重寫之前的aof檔案.
- BIO_AOF_FSYNC . 定時重新整理資料到磁碟上.
- BIO_LAZY_FREE . 惰性刪除過期時間資料
redis為了保證其高效.一些比較耗時的動作會起執行緒或者程序來完成.不會阻塞在業務主執行緒上.
使用多執行緒的特點
- 建立3個執行緒.這個三個執行緒的功能互不影響
- 每個執行緒都有一個工作佇列.主執行緒生產任務放到任務隊裡.這三個執行緒消費這些任務.
- 任務佇列和取出消費的時候都得加鎖.防止競爭
- 使用條件變數來等待任務.以及通知
// 存放工作的佇列 static list *bio_jobs[REDIS_BIO_NUM_OPS];
bio_jobs是一個雙端連結串列結構
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]); // 將新工作推入佇列 listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; pthread_cond_signal(&bio_condvar[type]); pthread_mutex_unlock(&bio_mutex[type]); }
當有任務的時候.先把任務丟到redis工作佇列裡.這裡記得加鎖
void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) redisLog(REDIS_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. * * 取出(但不刪除)佇列中的首個任務 */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ // 執行任務 if (type == REDIS_BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == REDIS_BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); // 將執行完成的任務從佇列中刪除,並減少任務計數器 listDelNode(bio_jobs[type],ln); bio_pending[type]--; } }
- 操作前先上鎖
- 從工作任務裡取任務
- 解鎖
- 執行業務邏輯
- 執行完上鎖.重新pthread_cond_wait
條件變數
條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:
- 一個執行緒等待"條件變數的條件成立"而掛起;
- 另一個執行緒使"條件成立"(給出條件成立信號)。
==為了防止競爭,條件變數的使用總是和一個互斥鎖結合在一起==
pthread_cond_wait原理
就是說pthread_cond_wait(pthread_cond_tcond, pthread_mutex_t mutex)函式傳入的引數mutex用於保護條件,因為我們在呼叫pthread_cond_wait時,如果條件不成立我們就進入阻塞,但是進入阻塞這個期間,如果條件變數改變了的話,那我們就漏掉了這個條件。因為這個執行緒還沒有放到等待佇列上,所以呼叫pthread_cond_wait前要先鎖互斥量,即呼叫pthread_mutex_lock()。
==pthread_cond_wait在把執行緒放進阻塞佇列後,自動對mutex進行解鎖,使得其它執行緒可以獲得加鎖的權利。這樣其它執行緒才能對臨界資源進行訪問並在適當的時候喚醒這個阻塞的程序。當pthread_cond_wait返回的時候又自動給mutex加鎖==