parQHV
Compute HyperVolumes using threads
tpool.c
/*
 *
 * Copyright (c) Year(s), 2013, Luis M. S. Russo and Alexandre
 * P. Francisco / KDBIO / INESC-ID, <qhv@kdbio.inesc-id.pt>
 *
 * Any published media that is related with the use of the distributed
 * software, or derived software, must contain a reference to "Extending
 * quick hypervolume. Luís M. S. Russo, Alexandre P. Francisco:
 * J. Heuristics 22(3): 245-271 (2016)".
 *
 * Permission to use, copy, modify, and/or distribute this software for
 * any purpose with or without fee is hereby granted, provided that the
 * above copyright notice and this permission notice appear in all
 * copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
 * WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
 * AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
 * DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
 * PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
 * TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 * PERFORMANCE OF THIS SOFTWARE.
 *
 */

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <assert.h>
#include <sys/types.h> /* basic system data types */
#include <limits.h>    /* INT_MAX */
#include <string.h>

#include "tpool.h"

/* This is the maximum number of cores in the PC. */
#ifndef CORES
#define CORES 64
#endif /* CORES */
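/* Because CORES is guarded by #ifndef above, the limit can be overridden
 * at build time, for example by compiling with -DCORES=8, without editing
 * this file. */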

/* The number of dimensions is imported from existing code. */

/* Maximum number of simultaneous jobs */
#define SIMJs (CORES+1)
/* Maximum number of simultaneous tasks */
/* The last SIMJs guarantees a task per job. */
#define SIMTs (4*SIMJs)

/* Structure that stores processable information of a task. */
struct task {
  toolBox* b;  /* Tool box of the worker that executes this task. */
  int jid;     /* Id of the job this task belongs to. */
  taskData d;
};

typedef struct task* task;

enum Qstate {active, stopped};

/* Pointers to the functions supplied by the client code. */
struct fns{
  void (*run)(toolBox* b, taskData* d, const jobData* jd);
  void (*openBox)(toolBox* b);
  void (*closeBox)(toolBox* b);
  int (*priority)(taskData* d);
  void (*jobCopy)(jobData* des, jobData* src);
  void (*taskCopy)(taskData* dst, taskData* src);
  void (*merge)(jobData* dst, taskData* src);
  void (*publish)(int jid, jobData* jd);
};

/* Job queue; SIMJs is the queue size. */
struct jobQueue{
  pthread_mutex_t* mutex;
  pthread_cond_t cond;
  int head;
  int tail;
  int C[SIMJs];
  jobData R[SIMJs];
};

struct taskQueue {
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  pthread_cond_t mem;
  int waiters;
  int minGrade;
  enum Qstate s;
  int memDrop;
  int memGet;
  int head;
  int tail;
  int heavyTail;
  task A[SIMTs];
};
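/*
 * Both queues are fixed-size circular buffers: head counts every element
 * ever enqueued and tail every element ever dequeued, so head - tail is
 * the current occupancy and logical position i lives in slot i % SIMTs
 * (or i % SIMJs).  A minimal sketch of that convention, independent of
 * this file:
 *
 *   int buf[8], head = 0, tail = 0;
 *   void put(int x) { assert(head - tail < 8); buf[head++ % 8] = x; }
 *   int  get(void)  { assert(head - tail > 0); return buf[tail++ % 8]; }
 */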

static struct fns F;
static struct taskQueue Q = {
  PTHREAD_MUTEX_INITIALIZER,
  PTHREAD_COND_INITIALIZER,
  PTHREAD_COND_INITIALIZER
};
static struct jobQueue Jq = {
  &(Q.mutex),
  PTHREAD_COND_INITIALIZER
};

/* An array of tool boxes, one for each worker. */
static toolBox toodles[CORES];

/* static void printState(void); */

static int activeQ(void);

/* #define pthread_mutex_lock(A) pthread_mutex_lock(A); printf("mutex Locked\n"); printState(); */
/* #define pthread_mutex_unlock(A) pthread_mutex_unlock(A); printf("mutex UnLocked\n") */

static void halt(void)
{
  int actv;

  pthread_mutex_lock(&Q.mutex);
  Q.s = stopped;
  actv = !activeQ();
  pthread_mutex_unlock(&Q.mutex);

  if(actv)
    pthread_cond_broadcast(&Q.cond); /* wake sleeping threads. */
}

static int resources(void)
{
  return ((Q.memGet - Q.heavyTail)*4)/SIMTs;
}

static int tasksN(void)
{
  return Q.head - Q.tail;
}

static int jobsN(void)
{
  return Jq.head - Jq.tail;
}

static int activeQ(void)
{
  return (active == Q.s) || (Jq.tail < Jq.head);
}

/* static void printState(void) */
/* { */
/* printf("Q info: %d jobs, %d tasks, %d waiters\n", jobsN(), tasksN(), Q.waiters ); */
/* } */

static task pop(void)
{
  task t;

  assert(tasksN() > 0);

  t = Q.A[Q.tail % SIMTs];
  Q.A[Q.tail % SIMTs] = NULL;
  Q.tail++;

  return t;
}

static int submit(int jid, taskData* d)
{
  int signal;

  assert(Jq.tail <= jid && jid <= Jq.head);

  (*F.merge)(&(Jq.R[jid % SIMJs]), d);
  /* Jq.R[jid % SIMJs]+=r; */
  Jq.C[jid % SIMJs]--;
  signal = (jobsN() == SIMJs && 0 == Jq.C[Jq.tail % SIMJs]);

  while(jobsN() > 0 && 0 == Jq.C[Jq.tail % SIMJs])
  {
    (*F.publish)(Jq.tail, &(Jq.R[Jq.tail % SIMJs]));
    /* Do not clean up, jobs are initialized */
    Jq.tail++;
  }

  return signal;
}

static int recycle(task t)
{
  int signal;

  signal = (resources() == 4);
  Q.A[Q.memDrop % SIMTs] = t;
  Q.memDrop++;
  Q.heavyTail++;
  assert(Q.heavyTail <= Q.tail);

  return signal;
}

static task queryTs(int p)
{
  task t;

  t = NULL;

  /* printf("Admission criteria: Q.minGrade %d < p %d\n", Q.minGrade, p); */

  if(Q.minGrade < p)
  { /* Hurray, you passed task admission */
    Q.minGrade += resources(); /* Complain about lack of resources */
    if(resources() < 4)
    { /* Huff, you still got the resource. */
      t = Q.A[Q.memGet % SIMTs];
      Q.memGet++;

      if(Q.memGet > Q.memDrop) /* But need to alloc it. */
      {
        Q.memDrop++;
        t = (task) calloc(1, sizeof(struct task));
      }

      assert(Q.memGet <= Q.memDrop);
    }
  }
  else /* We regret to inform you ... bla bla ... no task admission. */
    Q.minGrade += resources() - 3; /* Complain about wasted resources */

  return t;
}

static int post(int pri, int jid, taskData* d)
{
  int r;
  task t;
  int signal;
  int p;

  /* printf("Trying to post task\n"); */
  /* printState(); */

  signal = 0;

  p = (*F.priority)(d);
  if(0 == pri)
    p = INT_MAX;
  t = queryTs(p);

  if(0 == pri)
  {
    while(t == NULL)
    { /* LOOP until there is room to post task */
      pthread_cond_wait(&Q.mem, &Q.mutex);
      t = queryTs(p);
    }
  }

  r = (t != NULL);
  if(r)
  {
    /* printf("Success post\n"); */
    t->jid = jid;
    Jq.C[t->jid % SIMJs]++;
    pthread_mutex_unlock(&Q.mutex);
    (*F.taskCopy)(&(t->d), d);
    t->b = NULL;
    pthread_mutex_lock(&Q.mutex);
    signal = (activeQ() && Q.waiters > 0);
    Q.A[Q.head % SIMTs] = t;
    Q.head++;

    assert(Q.head <= Q.memGet);
    assert(Q.memGet <= Q.memDrop);
  }
  pthread_mutex_unlock(&Q.mutex); /* Releases the mutex in every case,
                                     including the lock taken by the caller. */

  if(signal)
    pthread_cond_signal(&Q.cond); /* wake sleeping threads. */

  return r;
}

void postJob(jobData* jd, taskData* d)
{
  int jid;

  pthread_mutex_lock(&Q.mutex);

  while(jobsN() == SIMJs) /* Job queue is full */
    pthread_cond_wait(&Jq.cond, Jq.mutex);

  jid = Jq.head;
  Jq.head++; /* Clean space is available, just update counter */

  (*F.jobCopy)(&(Jq.R[jid % SIMJs]), jd);

  post(0, jid, d);
}

void prun(taskData* d, int p)
{
  task t;

  t = (task) container_of(d, struct task, d); /* The d's mean different things. */

  /* Assuming taskData actually is in a task */
  if(p)
  { /* Try to parallelize the computation */
    pthread_mutex_lock(&Q.mutex);
    if(!post(1, t->jid, d)) /* Not crucial, use task priority */
      (*(F.run))(t->b, d, &(Jq.R[t->jid % SIMJs]));
  }
  else
    (*(F.run))(t->b, d, &(Jq.R[t->jid % SIMJs]));
}
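/*
 * container_of() is expected to be provided by tpool.h, which is not shown
 * on this page.  For illustration only, a conventional definition is:
 *
 *   #include <stddef.h>
 *   #define container_of(ptr, type, member) \
 *     ((type *)((char *)(ptr) - offsetof(type, member)))
 *
 * prun() relies on it to recover the enclosing struct task from the
 * address of its taskData member.
 */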

void* worker(void *threadid)
{
  task t;
  int tid;
  int signalMem;
  int signalJob;
  int signalDismiss;
  toolBox* b;

  tid = *((int*)threadid);

  b = &(toodles[tid-1]);
  (*F.openBox)(b);
  do
  {
    signalJob = 0;
    signalMem = 0;
    signalDismiss = 0;

    pthread_mutex_lock(&Q.mutex);
    while(activeQ() && 0 == tasksN())
    {
      Q.waiters++;
      pthread_cond_wait(&Q.cond, &Q.mutex);
      Q.waiters--;
      /* printf("Got a Job\n"); */
      /* printState(); */
    }

    if(activeQ())
    { /* Otherwise stop worker */
      t = pop();
      pthread_mutex_unlock(&Q.mutex);
      t->b = b;
      (*F.run)(t->b, &(t->d), &(Jq.R[t->jid % SIMJs]));
      pthread_mutex_lock(&Q.mutex);
      signalJob = submit(t->jid, &(t->d));
      signalMem = recycle(t);
      signalDismiss = !activeQ();
    }
    pthread_mutex_unlock(&Q.mutex);

    /* Signal events */
    if(signalJob)
      pthread_cond_signal(&Jq.cond); /* wake sleeping threads. */
    if(signalMem)
      pthread_cond_signal(&Q.mem); /* wake sleeping threads. */
    if(signalDismiss)
      pthread_cond_broadcast(&Q.cond); /* wake sleeping threads. */
  }
  while(activeQ());

  (*F.closeBox)(b);
  pthread_exit(NULL);
}

void launch(
  int w,
  void* (*console)(void *threadid),
  void (*run)(toolBox* b, taskData* d, const jobData* jd),
  void (*openBox)(toolBox* b),
  void (*closeBox)(toolBox* b),
  int (*priority)(taskData* d),
  void (*jobCopy)(jobData* des, jobData* src),
  void (*taskCopy)(taskData* dst, taskData* src),
  void (*merge)(jobData* dst, taskData* src),
  void (*publish)(int jid, jobData* jd)
  )
{
  int i;
  pthread_t wths[CORES];
  pthread_t cthread;
  int tids[CORES];

  F.run = run;
  F.openBox = openBox;
  F.closeBox = closeBox;
  F.priority = priority;
  F.jobCopy = jobCopy;
  F.taskCopy = taskCopy;
  F.merge = merge;
  F.publish = publish;

  if(CORES < w)
    w = CORES;
  if(1 > w)
    w = 1;

  /* start all the worker threads */
  printf("Using %d threads\n", w);
  /* toodles = (toolBox*) calloc(w, sizeof(toolBox)); */
  memset(toodles, 0x0, w*sizeof(toolBox));

  i = 0;
  while(i < w)
  {
    tids[i] = i+1;
    pthread_create(&wths[i], NULL, worker, &tids[i]);
    i++;
  }

  /* Create console only after you have the workers */

  pthread_create(&cthread, NULL, console, NULL);
  pthread_join(cthread, NULL);
  halt();

  /* After the console finishes, wait for the workers; halt() forces them
     to finish. */

  i = 0;
  while(i < w)
  {
    pthread_join(wths[i], NULL);
    i++;
  }

  i = 0;
  while(i < SIMTs)
  {
    if(NULL != Q.A[i])
      free(Q.A[i]);
    i++;
  }

  /* free(toodles); */
}
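/*
 * Illustrative sketch (not part of parQHV): one way client code might wire
 * the pool together.  The callback bodies below are deliberately trivial
 * because the real layout of jobData and taskData is defined in tdata.h and
 * is not assumed here; all "my*" names are hypothetical.
 */
#if 0
static void myRun(toolBox* b, taskData* d, const jobData* jd)
{
  /* Perform the work of one task; prun(d, 1) could be called here to
     offer sub-tasks back to the pool. */
  (void)b; (void)d; (void)jd;
}
static void myOpenBox(toolBox* b)  { (void)b; }  /* per-worker set-up */
static void myCloseBox(toolBox* b) { (void)b; }  /* per-worker tear-down */
static int  myPriority(taskData* d) { (void)d; return 1; }
static void myJobCopy(jobData* des, jobData* src)    { *des = *src; }
static void myTaskCopy(taskData* dst, taskData* src) { *dst = *src; }
static void myMerge(jobData* dst, taskData* src)
{
  /* Fold the partial result carried by src into dst. */
  (void)dst; (void)src;
}
static void myPublish(int jid, jobData* jd)
{
  (void)jd;
  printf("job %d done\n", jid);
}

static void* myConsole(void* arg)
{
  jobData jd;
  taskData td;

  (void)arg;
  memset(&jd, 0, sizeof jd);
  memset(&td, 0, sizeof td);
  /* Fill jd and td according to tdata.h before posting. */
  postJob(&jd, &td);
  return NULL;
}

int main(void)
{
  launch(4, myConsole, myRun, myOpenBox, myCloseBox, myPriority,
         myJobCopy, myTaskCopy, myMerge, myPublish);
  return 0;
}
#endif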