Process Hacker
workqueue.c
Go to the documentation of this file.
1 /*
2  * Process Hacker -
3  * thread pool
4  *
5  * Copyright (C) 2009-2015 wj32
6  *
7  * This file is part of Process Hacker.
8  *
9  * Process Hacker is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * Process Hacker is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Process Hacker. If not, see <http://www.gnu.org/licenses/>.
21  */
22 
23 #define _PH_WORKQUEUE_PRIVATE
24 #include <phbase.h>
25 #include <phintrnl.h>
26 
// Forward declarations of three functions. The lines carrying their return
// types and names are absent from this listing. Judging by the parameter lists
// they presumably match the semaphore getter, the thread creator and the thread
// start routine defined further down in this file -- TODO confirm.
28  _Inout_ PPH_WORK_QUEUE WorkQueue
29  );
30 
32  _Inout_ PPH_WORK_QUEUE WorkQueue
33  );
34 
36  _In_ PVOID Parameter
37  );
38 
// Free list that work queue items are allocated from and returned to.
39 static PH_FREE_LIST PhWorkQueueItemFreeList;
// Shared work queue used by the last routine in this file; it is set up on
// first use, guarded by PhGlobalWorkQueueInitOnce.
40 static PH_WORK_QUEUE PhGlobalWorkQueue;
41 static PH_INITONCE PhGlobalWorkQueueInitOnce = PH_INITONCE_INIT;
42 #ifdef DEBUG
// DEBUG builds only: list of the work queues that have been initialized and not
// yet deleted. All accesses in this file take PhDbgWorkQueueListLock exclusively.
43 PPH_LIST PhDbgWorkQueueList;
44 PH_QUEUED_LOCK PhDbgWorkQueueListLock = PH_QUEUED_LOCK_INIT;
45 #endif
46 
// Module setup: initializes the free list used for work queue items and, in
// DEBUG builds, creates the list that tracks live work queues.
// NOTE(review): the line with the return type and function name is missing from
// this listing.
48  VOID
49  )
50 {
    // Items are sizeof(PH_WORK_QUEUE_ITEM) bytes each. The third argument (32)
    // is presumably the free list capacity -- confirm against PhInitializeFreeList.
51  PhInitializeFreeList(&PhWorkQueueItemFreeList, sizeof(PH_WORK_QUEUE_ITEM), 32);
52 
53 #ifdef DEBUG
    // The argument (4) is presumably the initial capacity of the list -- TODO confirm.
54  PhDbgWorkQueueList = PhCreateList(4);
55 #endif
56 }
57 
// Allocates a work queue item from the free list and fills in its callback
// fields. Returns the new item.
//
// Function       - routine that is later invoked with Context when the item is
//                  executed.
// Context        - value passed to Function; may be NULL.
// DeleteFunction - optional routine invoked with (Function, Context) when the
//                  item is destroyed; may be NULL.
//
// The ListEntry member is not touched here; the caller links the item into a
// queue. The allocation result is used without a NULL check in this block.
59  _In_ PUSER_THREAD_START_ROUTINE Function,
60  _In_opt_ PVOID Context,
61  _In_opt_ PPH_WORK_QUEUE_ITEM_DELETE_FUNCTION DeleteFunction
62  )
63 {
64  PPH_WORK_QUEUE_ITEM workQueueItem;
65 
66  workQueueItem = PhAllocateFromFreeList(&PhWorkQueueItemFreeList);
67  workQueueItem->Function = Function;
68  workQueueItem->Context = Context;
69  workQueueItem->DeleteFunction = DeleteFunction;
70 
71  return workQueueItem;
72 }
73 
// Destroys a work queue item: runs its delete callback, if one was supplied,
// and then returns the item storage to the free list.
//
// WorkQueueItem - the item to destroy; it must not be used after this call.
75  _In_ PPH_WORK_QUEUE_ITEM WorkQueueItem
76  )
77 {
    // The delete callback receives the item function and context, not the item itself.
78  if (WorkQueueItem->DeleteFunction)
79  WorkQueueItem->DeleteFunction(WorkQueueItem->Function, WorkQueueItem->Context);
80 
81  PhFreeToFreeList(&PhWorkQueueItemFreeList, WorkQueueItem);
82 }
83 
// Runs a work queue item by calling its Function with its Context. Any value
// returned by Function is discarded.
//
// WorkQueueItem - the item to execute.
85  _Inout_ PPH_WORK_QUEUE_ITEM WorkQueueItem
86  )
87 {
88  WorkQueueItem->Function(WorkQueueItem->Context);
89 }
90 
// Initializes a work queue structure. No threads and no semaphore are created
// here; other routines in this file create them on demand.
// NOTE(review): the line with the return type and function name is missing from
// this listing.
//
// WorkQueue      - structure to initialize.
// MinimumThreads - idle worker threads do not exit while the thread count is at
//                  or below this value (unless the queue is terminating).
// MaximumThreads - no new worker thread is created once the thread count
//                  reaches this value.
// NoWorkTimeout  - how long a worker waits for work before it considers
//                  exiting; the worker converts it with PhTimeoutFromMilliseconds.
102  _Out_ PPH_WORK_QUEUE WorkQueue,
103  _In_ ULONG MinimumThreads,
104  _In_ ULONG MaximumThreads,
105  _In_ ULONG NoWorkTimeout
106  )
107 {
     // Each worker thread holds rundown protection; the delete routine waits on it.
108  PhInitializeRundownProtection(&WorkQueue->RundownProtect);
109  WorkQueue->Terminating = FALSE;
110 
     // Pending items live on QueueListHead, guarded by QueueLock.
     // QueueEmptyCondition is what the wait routine blocks on.
111  InitializeListHead(&WorkQueue->QueueListHead);
112  PhInitializeQueuedLock(&WorkQueue->QueueLock);
113  PhInitializeQueuedLock(&WorkQueue->QueueEmptyCondition);
114 
115  WorkQueue->MinimumThreads = MinimumThreads;
116  WorkQueue->MaximumThreads = MaximumThreads;
117  WorkQueue->NoWorkTimeout = NoWorkTimeout;
118 
     // StateLock is held by the queueing routine around thread creation.
119  PhInitializeQueuedLock(&WorkQueue->StateLock);
120 
     // The semaphore stays NULL until the getter routine creates it.
121  WorkQueue->SemaphoreHandle = NULL;
122  WorkQueue->CurrentThreads = 0;
123  WorkQueue->BusyCount = 0;
124 
125 #ifdef DEBUG
     // Register the queue in the debug list.
126  PhAcquireQueuedLockExclusive(&PhDbgWorkQueueListLock);
127  PhAddItemList(PhDbgWorkQueueList, WorkQueue);
128  PhReleaseQueuedLockExclusive(&PhDbgWorkQueueListLock);
129 #endif
130 }
131 
// Shuts down a work queue: signals the worker threads to stop, waits for them
// to finish, destroys the items that were never executed and closes the
// semaphore. The WorkQueue structure itself is not freed here.
// NOTE(review): the line with the return type and function name is missing from
// this listing.
//
// WorkQueue - the queue to shut down.
138  _Inout_ PPH_WORK_QUEUE WorkQueue
139  )
140 {
141  PLIST_ENTRY listEntry;
142  PPH_WORK_QUEUE_ITEM workQueueItem;
143 #ifdef DEBUG
144  ULONG index;
145 #endif
146 
147 #ifdef DEBUG
     // Unregister the queue from the debug list, if it is present there.
148  PhAcquireQueuedLockExclusive(&PhDbgWorkQueueListLock);
149  if ((index = PhFindItemList(PhDbgWorkQueueList, WorkQueue)) != -1)
150  PhRemoveItemList(PhDbgWorkQueueList, index);
151  PhReleaseQueuedLockExclusive(&PhDbgWorkQueueListLock);
152 #endif
153 
154  // Wait for all worker threads to exit.
155 
     // Workers test Terminating before and after waiting on the semaphore.
156  WorkQueue->Terminating = TRUE;
157  MemoryBarrier();
158 
     // Release the semaphore once per current thread so blocked workers wake up.
     // CurrentThreads is read here without taking StateLock, and the status
     // returned by NtReleaseSemaphore is ignored.
159  if (WorkQueue->SemaphoreHandle)
160  NtReleaseSemaphore(WorkQueue->SemaphoreHandle, WorkQueue->CurrentThreads, NULL);
161 
     // Each worker thread acquired rundown protection when it was created.
162  PhWaitForRundownProtection(&WorkQueue->RundownProtect);
163 
164  // Free all un-executed work items.
165 
166  listEntry = WorkQueue->QueueListHead.Flink;
167 
168  while (listEntry != &WorkQueue->QueueListHead)
169  {
170  workQueueItem = CONTAINING_RECORD(listEntry, PH_WORK_QUEUE_ITEM, ListEntry);
     // Advance before destroying: the link is stored inside the item being freed.
171  listEntry = listEntry->Flink;
172  PhpDestroyWorkQueueItem(workQueueItem);
173  }
     // The items are not unlinked, so QueueListHead still refers to freed items
     // after this loop; the list must not be walked again.
174 
175  if (WorkQueue->SemaphoreHandle)
176  NtClose(WorkQueue->SemaphoreHandle);
177 }
178 
// Blocks the caller until the pending-item list of the work queue is empty.
// NOTE(review): the line with the return type and function name is missing from
// this listing.
//
// The loop only tests the list. A worker removes an item from the list before
// it executes the item, so items may still be running when this returns.
//
// WorkQueue - the queue to wait on.
185  _Inout_ PPH_WORK_QUEUE WorkQueue
186  )
187 {
188  PhAcquireQueuedLockExclusive(&WorkQueue->QueueLock);
189 
     // Re-test after every wake-up; the final NULL argument is presumably a
     // timeout meaning wait indefinitely -- TODO confirm.
190  while (!IsListEmpty(&WorkQueue->QueueListHead))
191  PhWaitForCondition(&WorkQueue->QueueEmptyCondition, &WorkQueue->QueueLock, NULL);
192 
193  PhReleaseQueuedLockExclusive(&WorkQueue->QueueLock);
194 }
195 
// Returns the semaphore handle of the work queue, creating the semaphore on
// first use.
//
// WorkQueue - the queue whose semaphore is requested.
197  _Inout_ PPH_WORK_QUEUE WorkQueue
198  )
199 {
200  HANDLE semaphoreHandle;
201 
202  semaphoreHandle = WorkQueue->SemaphoreHandle;
203 
204  if (!semaphoreHandle)
205  {
     // Initial count 0, maximum count MAXLONG. The returned status is not
     // checked; the only guard against failure is the assert below.
206  NtCreateSemaphore(&semaphoreHandle, SEMAPHORE_ALL_ACCESS, NULL, 0, MAXLONG);
207  assert(semaphoreHandle);
208 
     // NOTE(review): embedded line 209, which opens this conditional, is missing
     // from the listing. The surviving arguments and the comparison with NULL
     // suggest an interlocked compare-exchange that stores the new handle only
     // if WorkQueue->SemaphoreHandle is still NULL -- TODO confirm.
210  &WorkQueue->SemaphoreHandle,
211  semaphoreHandle,
212  NULL
213  ) != NULL)
214  {
215  // Someone else created the semaphore before we did.
216  NtClose(semaphoreHandle);
217  semaphoreHandle = WorkQueue->SemaphoreHandle;
218  }
219  }
220 
221  return semaphoreHandle;
222 }
223 
// Starts one additional worker thread for the work queue.
// Returns TRUE if the thread was created. Returns FALSE if rundown protection
// could not be acquired or if thread creation failed.
//
// CurrentThreads is incremented with a plain ++ here; the caller in the
// queueing routine holds StateLock around this call.
//
// WorkQueue - the queue that the new thread will serve.
225  _Inout_ PPH_WORK_QUEUE WorkQueue
226  )
227 {
228  HANDLE threadHandle;
229 
230  // Make sure the structure doesn't get deleted while the thread is running.
231  if (!PhAcquireRundownProtection(&WorkQueue->RundownProtect))
232  return FALSE;
233 
     // The queue pointer is handed to the thread start routine as its parameter.
234  threadHandle = PhCreateThread(0, PhpWorkQueueThreadStart, WorkQueue);
235 
236  if (threadHandle)
237  {
238  PHLIB_INC_STATISTIC(WqWorkQueueThreadsCreated);
239  WorkQueue->CurrentThreads++;
     // Only the handle is closed; this function does not wait for the thread.
240  NtClose(threadHandle);
241 
242  return TRUE;
243  }
244  else
245  {
246  PHLIB_INC_STATISTIC(WqWorkQueueThreadsCreateFailed);
     // No thread will run, so give back the protection acquired above.
247  PhReleaseRundownProtection(&WorkQueue->RundownProtect);
248  return FALSE;
249  }
250 }
251 
// Worker thread routine. Loops waiting on the queue semaphore; on a successful
// wait it removes one item from the queue, executes it and destroys it. The
// thread leaves the loop when the queue is terminating, or when there are more
// threads than allowed and it is not needed. Always returns STATUS_SUCCESS.
//
// NOTE(review): several interior lines (embedded 271, 281, 309, 314, 316, 336,
// 344 and 351) are missing from this listing. The notes below about them are
// assumptions to be confirmed against the real file.
//
// Parameter - the PPH_WORK_QUEUE this thread serves, as passed by the thread
//             creation routine.
253  _In_ PVOID Parameter
254  )
255 {
256  PPH_WORK_QUEUE workQueue = (PPH_WORK_QUEUE)Parameter;
257 
258  while (TRUE)
259  {
260  NTSTATUS status;
261  HANDLE semaphoreHandle;
262  LARGE_INTEGER timeout;
263  PPH_WORK_QUEUE_ITEM workQueueItem = NULL;
264 
265  // Check if we have more threads than the limit.
266  if (workQueue->CurrentThreads > workQueue->MaximumThreads)
267  {
268  BOOLEAN terminate = FALSE;
269 
270  // Lock and re-check.
     // NOTE(review): embedded line 271 is missing; presumably it acquires the
     // lock that protects CurrentThreads (StateLock elsewhere in this file).
272 
273  // Check the minimum as well.
274  if (workQueue->CurrentThreads > workQueue->MaximumThreads &&
275  workQueue->CurrentThreads > workQueue->MinimumThreads)
276  {
277  workQueue->CurrentThreads--;
278  terminate = TRUE;
279  }
280 
     // NOTE(review): embedded line 281 is missing; presumably the matching
     // lock release.
282 
283  if (terminate)
284  break;
285  }
286 
287  semaphoreHandle = PhpGetSemaphoreWorkQueue(workQueue);
288 
289  if (!workQueue->Terminating)
290  {
291  // Wait for work.
292  status = NtWaitForSingleObject(
293  semaphoreHandle,
294  FALSE,
295  PhTimeoutFromMilliseconds(&timeout, workQueue->NoWorkTimeout)
296  );
297  }
298  else
299  {
     // Skip the wait; any status other than STATUS_WAIT_0 sends control to
     // the termination branch below.
300  status = STATUS_UNSUCCESSFUL;
301  }
302 
     // Terminating is tested again because it may have been set during the wait.
303  if (status == STATUS_WAIT_0 && !workQueue->Terminating)
304  {
305  PLIST_ENTRY listEntry;
306 
307  // Dequeue the work item.
308 
     // NOTE(review): embedded line 309 is missing; presumably it acquires
     // QueueLock, which the enqueue routine holds while inserting.
310 
311  listEntry = RemoveHeadList(&workQueue->QueueListHead);
312 
313  if (IsListEmpty(&workQueue->QueueListHead))
     // NOTE(review): embedded line 314, the body of the if above, is missing;
     // presumably it wakes waiters on QueueEmptyCondition. Embedded line 316 is
     // also missing; presumably it releases QueueLock.
315 
317 
318  // Make sure we got work.
     // Presumably the removal yields the list head itself when the list was
     // empty, which is the case this comparison filters out -- TODO confirm.
319  if (listEntry != &workQueue->QueueListHead)
320  {
321  workQueueItem = CONTAINING_RECORD(listEntry, PH_WORK_QUEUE_ITEM, ListEntry);
322 
323  PhpExecuteWorkQueueItem(workQueueItem);
     // Balances the increment performed when the item was enqueued.
324  _InterlockedDecrement(&workQueue->BusyCount);
325 
326  PhpDestroyWorkQueueItem(workQueueItem);
327  }
328  }
329  else
330  {
331  BOOLEAN terminate = FALSE;
332 
333  // No work arrived before the timeout passed, or we are terminating, or some error occurred.
334  // Terminate the thread.
335 
     // NOTE(review): embedded line 336 is missing; presumably a lock acquire
     // protecting CurrentThreads.
337 
     // When not terminating, the thread only exits if that still leaves at
     // least MinimumThreads workers; otherwise it loops and waits again.
338  if (workQueue->Terminating || workQueue->CurrentThreads > workQueue->MinimumThreads)
339  {
340  workQueue->CurrentThreads--;
341  terminate = TRUE;
342  }
343 
     // NOTE(review): embedded line 344 is missing; presumably the matching
     // lock release.
345 
346  if (terminate)
347  break;
348  }
349  }
350 
     // NOTE(review): embedded line 351 is missing; presumably it releases the
     // rundown protection acquired when this thread was created.
352 
353  return STATUS_SUCCESS;
354 }
355 
// Queues a work item that has no delete callback. This is a thin wrapper that
// forwards to PhQueueItemWorkQueueEx with a NULL DeleteFunction.
// NOTE(review): the line with the return type and function name is missing from
// this listing.
//
// WorkQueue - the queue to add the item to.
// Function  - routine to run on a worker thread.
// Context   - value passed to Function; may be NULL.
364  _Inout_ PPH_WORK_QUEUE WorkQueue,
365  _In_ PUSER_THREAD_START_ROUTINE Function,
366  _In_opt_ PVOID Context
367  )
368 {
369  PhQueueItemWorkQueueEx(WorkQueue, Function, Context, NULL);
370 }
371 
// Queues a work item: creates the item, appends it to the pending list, signals
// the semaphore once, and starts another worker thread if all current workers
// appear busy and the thread limit has not been reached.
//
// WorkQueue      - the queue to add the item to.
// Function       - routine to run on a worker thread.
// Context        - value passed to Function; may be NULL.
// DeleteFunction - optional routine called with (Function, Context) when the
//                  item is destroyed; may be NULL.
381  _Inout_ PPH_WORK_QUEUE WorkQueue,
382  _In_ PUSER_THREAD_START_ROUTINE Function,
383  _In_opt_ PVOID Context,
384  _In_opt_ PPH_WORK_QUEUE_ITEM_DELETE_FUNCTION DeleteFunction
385  )
386 {
387  PPH_WORK_QUEUE_ITEM workQueueItem;
388 
389  workQueueItem = PhpCreateWorkQueueItem(Function, Context, DeleteFunction);
390 
391  // Enqueue the work item.
392  PhAcquireQueuedLockExclusive(&WorkQueue->QueueLock);
393  InsertTailList(&WorkQueue->QueueListHead, &workQueueItem->ListEntry);
     // BusyCount covers pending as well as running items: it is raised here
     // and lowered by a worker only after the item has been executed.
394  _InterlockedIncrement(&WorkQueue->BusyCount);
395  PhReleaseQueuedLockExclusive(&WorkQueue->QueueLock);
396  // Signal the semaphore once to let a worker thread continue.
     // The status returned by NtReleaseSemaphore is ignored.
397  NtReleaseSemaphore(PhpGetSemaphoreWorkQueue(WorkQueue), 1, NULL);
398 
399  PHLIB_INC_STATISTIC(WqWorkItemsQueued);
400 
401  // Check if all worker threads are currently busy, and if we can create more threads.
     // This first test reads the counters without holding StateLock.
402  if (WorkQueue->BusyCount >= WorkQueue->CurrentThreads &&
403  WorkQueue->CurrentThreads < WorkQueue->MaximumThreads)
404  {
405  // Lock and re-check.
406  PhAcquireQueuedLockExclusive(&WorkQueue->StateLock);
407 
     // Only the thread limit is re-tested under the lock, not BusyCount. The
     // result of the thread creation call is ignored; on failure the item
     // simply stays queued.
408  if (WorkQueue->CurrentThreads < WorkQueue->MaximumThreads)
409  PhpCreateWorkQueueThread(WorkQueue);
410 
411  PhReleaseQueuedLockExclusive(&WorkQueue->StateLock);
412  }
413 }
414 
// Queues a work item on the shared PhGlobalWorkQueue, setting that queue up the
// first time this routine runs.
// NOTE(review): the line with the return type and function name, and the two
// lines naming the callees (embedded 428 and 437), are missing from this listing.
//
// Function - routine to run on a worker thread.
// Context  - value passed to Function; may be NULL.
422  _In_ PUSER_THREAD_START_ROUTINE Function,
423  _In_opt_ PVOID Context
424  )
425 {
     // One-time initialization guarded by PhGlobalWorkQueueInitOnce.
426  if (PhBeginInitOnce(&PhGlobalWorkQueueInitOnce))
427  {
     // Presumably a call to the work queue initializer, which would make the
     // arguments minimum threads 0, maximum threads 3 and a no-work timeout
     // of 1000 -- TODO confirm against the missing line.
429  &PhGlobalWorkQueue,
430  0,
431  3,
432  1000
433  );
434  PhEndInitOnce(&PhGlobalWorkQueueInitOnce);
435  }
436 
     // Presumably a call to the queueing wrapper without a delete callback,
     // given the three arguments -- TODO confirm against the missing line.
438  &PhGlobalWorkQueue,
439  Function,
440  Context
441  );
442 }