33 #include <sys/types.h>
47 #define SIMJs (CORES+1)
50 #define SIMTs (4*SIMJs)
63 enum Qstate {active, stopped};
74 void (*publish)(
int jid,
jobData* jd);
105 PTHREAD_MUTEX_INITIALIZER,
106 PTHREAD_COND_INITIALIZER,
107 PTHREAD_COND_INITIALIZER
111 PTHREAD_COND_INITIALIZER
125 static int activeQ(
void);
135 static void halt(
void)
139 pthread_mutex_lock(&Q.
mutex);
142 pthread_mutex_unlock(&Q.
mutex);
145 pthread_cond_broadcast(&Q.
cond);
154 static int resources(
void)
163 static int tasksN(
void)
172 static int jobsN(
void)
181 static int activeQ(
void)
183 return (active == Q.
s) || (Jq.
tail < Jq.
head);
196 static task pop(
void)
200 assert(tasksN() > 0);
202 t = Q.
A[Q.
tail % SIMTs];
203 Q.
A[Q.
tail % SIMTs] = NULL;
217 static int submit(
int jid,
taskData* d)
222 assert(Jq.
tail <= jid && jid <= Jq.
head);
224 (*F.merge)(&(Jq.
R[jid % SIMJs]), d);
228 signal = (jobsN() == SIMJs && 0 == Jq.
C[Jq.
tail % SIMJs]);
230 while(jobsN() > 0 && 0 == Jq.
C[Jq.
tail % SIMJs])
232 (*F.publish)(Jq.
tail, &(Jq.
R[Jq.
tail % SIMJs]));
247 static int recycle(task t)
251 signal = (resources() == 4);
269 static task queryTs(
int p)
288 t = (task) calloc(1,
sizeof(
struct task));
312 static int post(
int pri,
int jid,
taskData* d)
333 pthread_cond_wait(&Q.
mem, &Q.
mutex);
343 Jq.
C[t->
jid % SIMJs]++;
344 pthread_mutex_unlock(&Q.
mutex);
345 (*F.taskCopy)(&(t->d), d);
348 pthread_mutex_lock(&Q.
mutex);
349 signal = (activeQ() && Q.
waiters > 0);
350 Q.
A[Q.
head % SIMTs] = t;
356 pthread_mutex_unlock(&Q.
mutex);
359 pthread_cond_signal(&Q.
cond);
374 pthread_mutex_lock(&Q.
mutex);
376 while(jobsN() == SIMJs)
382 (*F.jobCopy)(&(Jq.
R[jid % SIMJs]), jd);
391 t = (task) container_of(d,
struct task, d);
396 pthread_mutex_lock(&Q.
mutex);
397 if(!post(1, t->
jid, d))
398 (*(F.run))(t->
b, d, &(Jq.
R[t->
jid % SIMJs]));
401 (*(F.run))(t->
b, d, &(Jq.
R[t->
jid % SIMJs]));
404 void* worker(
void *threadid)
414 tid = *((
int*)threadid);
416 b = &(toodles[tid-1]);
425 pthread_mutex_lock(&Q.
mutex);
426 while(activeQ() && 0 == tasksN())
438 pthread_mutex_unlock(&Q.
mutex);
440 (*F.run)(t->
b, &(t->d), &(Jq.
R[t->
jid % SIMJs]));
442 pthread_mutex_lock(&Q.
mutex);
443 signalJob = submit(t->
jid, &(t->d));
444 signalMem = recycle(t);
445 signalDismiss = !activeQ();
447 pthread_mutex_unlock(&Q.
mutex);
451 pthread_cond_signal(&Jq.
cond);
453 pthread_cond_signal(&Q.
mem);
455 pthread_cond_broadcast(&Q.
cond);
466 void* (*console)(
void *threadid),
474 void (*publish)(
int jid,
jobData* jd)
478 pthread_t wths[CORES];
484 F.closeBox = closeBox;
487 F.taskCopy = taskCopy;
497 printf(
"Using %d threads\n", w);
499 memset(toodles, 0x0, w*
sizeof(
toolBox));
505 pthread_create(&wths[i], NULL, worker, &tids[i]);
511 pthread_create(&cthread, NULL, console, NULL);
512 pthread_join(cthread, NULL);
520 pthread_join(wths[i], NULL);
int(* priority)(taskData *d)
toolBox * b
Pass job Id to thread.
Structure that stores processable information of a task.
int memDrop
The array is organized in the following order: