WvStreams
wvsubprocqueue.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * A way to enqueue a series of WvSubProc objects. See wvsubprocqueue.h.
6  */
7 #include "wvsubprocqueue.h"
8 #include <unistd.h>
9 #include <assert.h>
10 
11 
12 WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning)
13 {
14  maxrunning = _maxrunning;
15 }
16 
17 
18 WvSubProcQueue::~WvSubProcQueue()
19 {
20 }
21 
22 
23 void WvSubProcQueue::add(void *cookie, WvSubProc *proc)
24 {
25  assert(proc);
26  assert(!proc->running);
27  if (cookie)
28  {
29  // search for other enqueued objects with this cookie
30  EntList::Iter i(waitq);
31  for (i.rewind(); i.next(); )
32  {
33  if (i->cookie == cookie)
34  {
35  // already enqueued; mark it as "redo" unless it's already
36  // the last one. That way we guarantee it'll still run
37  // in the future from now, and it'll come later than anything
38  // else in the queue, but it won't pointlessly run twice at
39  // the end.
40  Ent *e = i.ptr();
41  if (i.next())
42  e->redo = true;
43  delete proc;
44  return;
45  }
46  }
47  }
48 
49  waitq.append(new Ent(cookie, proc), true);
50 }
51 
52 
53 void WvSubProcQueue::add(void *cookie,
54  const char *cmd, const char * const *argv)
55 {
56  WvSubProc *p = new WvSubProc;
57  p->preparev(cmd, argv);
58  add(cookie, p);
59 }
60 
61 
62 bool WvSubProcQueue::cookie_running()
63 {
64  EntList::Iter i(runq);
65  for (i.rewind(); i.next(); )
66  if (i->cookie)
67  return true;
68  return false;
69 }
70 
71 
73 {
74  int started = 0;
75 
76  //fprintf(stderr, "go: %d waiting, %d running\n",
77  // waitq.count(), runq.count());
78 
79  // first we need to clean up any finished processes
80  {
81  EntList::Iter i(runq);
82  for (i.rewind(); i.next(); )
83  {
84  Ent *e = i.ptr();
85 
86  e->proc->wait(0, true);
87  if (!e->proc->running)
88  {
89  if (e->redo)
90  {
91  // someone re-enqueued this task while it was
92  // waiting/running
93  e->redo = false;
94  i.xunlink(false);
95  waitq.append(e, true);
96  }
97  else
98  i.xunlink();
99  }
100  }
101  }
102 
103  while (!waitq.isempty() && runq.count() < maxrunning)
104  {
105  EntList::Iter i(waitq);
106  for (i.rewind(); i.next(); )
107  {
108  // elements with cookies are "sync points" in the queue;
109  // they guarantee that everything before that point has
110  // finished running before they run, and don't let anything
111  // after them run until they've finished.
112  if (i->cookie && !runq.isempty())
113  goto out;
114  if (cookie_running())
115  goto out;
116 
117  // jump it into the running queue, but be careful not to
118  // delete the object when removing!
119  Ent *e = i.ptr();
120  i.xunlink(false);
121  runq.append(e, true);
122  e->proc->start_again();
123  started++;
124  break;
125  }
126  }
127 
128 out:
129  assert(runq.count() <= maxrunning);
130  return started;
131 }
132 
133 
134 unsigned WvSubProcQueue::running() const
135 {
136  return runq.count();
137 }
138 
139 
141 {
142  return runq.count() + waitq.count();
143 }
144 
145 
147 {
148  return runq.isempty() && waitq.isempty();
149 }
150 
151 
153 {
154  while (!isempty())
155  {
156  go();
157  if (!isempty())
158  usleep(100*1000);
159  }
160 }
WvSubProcQueue::add
void add(void *cookie, WvSubProc *proc)
Enqueue a process.
Definition: wvsubprocqueue.cc:23
WvSubProcQueue::finish
void finish()
Wait synchronously for all processes in the entire queue to finish.
Definition: wvsubprocqueue.cc:152
WvSubProc
Definition: wvsubproc.h:29
WvSubProcQueue::running
unsigned running() const
Return the number of currently running processes.
Definition: wvsubprocqueue.cc:134
WvSubProcQueue::isempty
bool isempty() const
True if there are no unfinished (ie. running or waiting) processes.
Definition: wvsubprocqueue.cc:146
WvSubProcQueue::go
int go()
Clean up after any running processes in the queue, and start running additional processes if any are ...
Definition: wvsubprocqueue.cc:72
WvSubProcQueue::remaining
unsigned remaining() const
Return the number of unfinished (ie. running or waiting) processes.
Definition: wvsubprocqueue.cc:140
WvSubProcQueue::WvSubProcQueue
WvSubProcQueue(unsigned _maxrunning)
Create a WvSubProcQueue.
Definition: wvsubprocqueue.cc:12