WvStreams
wvdbusconn.cc
1 /* -*- Mode: C++ -*-
2  * Worldvisions Weaver Software:
3  * Copyright (C) 2004-2006 Net Integration Technologies, Inc.
4  *
5  * Pathfinder Software:
6  * Copyright (C) 2007, Carillon Information Security Inc.
7  *
8  * This library is licensed under the LGPL, please read LICENSE for details.
9  *
10  */
11 #include "wvdbusconn.h"
12 #include "wvmoniker.h"
13 #include "wvstrutils.h"
14 #undef interface // windows
15 #include <dbus/dbus.h>
16 
17 
18 static WvString translate(WvStringParm dbus_moniker)
19 {
20  WvStringList l;
21  WvStringList::Iter i(l);
22 
23  if (!strncasecmp(dbus_moniker, "unix:", 5))
24  {
25  WvString path, tmpdir;
26  l.split(dbus_moniker+5, ",");
27  for (i.rewind(); i.next(); )
28  {
29  if (!strncasecmp(*i, "path=", 5))
30  path = *i + 5;
31  else if (!strncasecmp(*i, "abstract=", 9))
32  path = WvString("@%s", *i + 9);
33  else if (!strncasecmp(*i, "tmpdir=", 7))
34  tmpdir = *i + 7;
35  }
36  if (!!path)
37  return WvString("unix:%s", path);
38  else if (!!tmpdir)
39  return WvString("unix:%s/dbus.sock", tmpdir);
40  }
41  else if (!strncasecmp(dbus_moniker, "tcp:", 4))
42  {
43  WvString host, port, family;
44  l.split(dbus_moniker+4, ",");
45  for (i.rewind(); i.next(); )
46  {
47  if (!strncasecmp(*i, "family=", 7))
48  family = *i + 7;
49  else if (!strncasecmp(*i, "host=", 5))
50  host = *i + 5;
51  else if (!strncasecmp(*i, "port=", 5))
52  port = *i + 5;
53  }
54  if (!!host && !!port)
55  return WvString("tcp:%s:%s", host, port);
56  else if (!!host)
57  return WvString("tcp:%s", host);
58  else if (!!port)
59  return WvString("tcp:0.0.0.0:%s", port); // localhost
60  }
61 
62  return dbus_moniker; // unrecognized
63 }
64 
65 
66 static IWvStream *stream_creator(WvStringParm _s, IObject *)
67 {
68  WvString s(_s);
69 
70  if (!strcasecmp(s, "starter"))
71  {
72  WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
73  if (!!startbus)
74  return IWvStream::create(translate(startbus));
75  else
76  {
77  WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
78  if (!!starttype && !strcasecmp(starttype, "system"))
79  s = "system";
80  else if (!!starttype && !strcasecmp(starttype, "session"))
81  s = "session";
82  }
83  }
84 
85  if (!strcasecmp(s, "system"))
86  {
87  // NOTE: the environment variable for the address of the system
88  // bus is very often not set-- in that case, look in your dbus
89  // system bus config file (e.g. /etc/dbus-1/system.conf) for the
90  // raw address and either set this environment variable to that, or
91  // pass in the address directly
92  WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
93  if (!!bus)
94  return IWvStream::create(translate(bus));
95  }
96 
97  if (!strcasecmp(s, "session"))
98  {
99  WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
100  if (!!bus)
101  return IWvStream::create(translate(bus));
102  }
103 
104  return IWvStream::create(translate(s));
105 }
106 
107 static WvMoniker<IWvStream> reg("dbus", stream_creator);
108 
109 
110 static int conncount;
111 
112 WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
113  : WvStreamClone(_cloned),
114  log(WvString("DBus %s%s",
115  _client ? "" : "s",
116  ++conncount), WvLog::Debug5),
117  pending(10)
118 {
119  init(_auth, _client);
120 }
121 
122 
123 WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
124  : WvStreamClone(IWvStream::create(moniker)),
125  log(WvString("DBus %s%s",
126  _client ? "" : "s",
127  ++conncount), WvLog::Debug5),
128  pending(10)
129 {
130  log("Connecting to '%s'\n", moniker);
131  init(_auth, _client);
132 }
133 
134 
135 void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
136 {
137  log("Initializing.\n");
138  client = _client;
139  auth = _auth ? _auth : new WvDBusClientAuth;
140  authorized = in_post_select = false;
141  if (!client) set_uniquename(WvString(":%s.0", conncount));
142 
143  if (!isok()) return;
144 
145  delay_output(true);
146 
147  // this will get enqueued until later, but we want to make sure it
148  // comes before anything the user tries to send - including anything
149  // goofy they enqueue in the authorization part.
150  if (client)
151  send_hello();
152 
153  try_auth();
154 }
155 
157 {
158  log("Shutting down.\n");
159  if (geterr())
160  log("Error was: %s\n", errstr());
161 
162  close();
163 
164  delete auth;
165 }
166 
167 
169 {
170  if (!closed)
171  log("Closing.\n");
173 }
174 
175 
177 {
178  return _uniquename;
179 }
180 
181 
182 void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
183  time_t msec_timeout)
184 {
185  uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
186  DBUS_NAME_FLAG_REPLACE_EXISTING);
187  WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
188  "org.freedesktop.DBus", "RequestName");
189  msg.append(name).append(flags);
190  send(msg, onreply, msec_timeout);
191 }
192 
193 
195 {
196  msg.marshal(out_queue);
197  if (authorized)
198  {
199  log(" >> %s\n", msg);
200  write(out_queue);
201  }
202  else
203  log(" .> %s\n", msg);
204  return msg.get_serial();
205 }
206 
207 
208 void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
209  time_t msec_timeout)
210 {
211  send(msg);
212  if (onreply)
213  add_pending(msg, onreply, msec_timeout);
214 }
215 
216 
218 {
219 public:
220  WvDBusMsg *reply;
221 
222  xxReplyWaiter()
223  { reply = NULL; }
224  ~xxReplyWaiter()
225  { delete reply; }
226  bool reply_wait(WvDBusMsg &msg)
227  { reply = new WvDBusMsg(msg); return true; }
228 };
229 
230 
232  wv::function<void(uint32_t)> serial_cb)
233 {
234  xxReplyWaiter rw;
235 
236  send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
237  msec_timeout);
238  if (serial_cb)
239  serial_cb(msg.get_serial());
240  while (!rw.reply && isok())
241  runonce();
242  if (!rw.reply)
243  return WvDBusError(msg, DBUS_ERROR_FAILED,
244  WvString("Connection closed (%s) "
245  "while waiting for reply.",
246  errstr()));
247  else
248  return *rw.reply;
249 }
250 
251 
252 void WvDBusConn::out(WvStringParm s)
253 {
254  log(" >> %s", s);
255  print(s);
256 }
257 
258 
259 const char *WvDBusConn::in()
260 {
261  const char *s = trim_string(getline(0));
262  if (s)
263  log("<< %s\n", s);
264  return s;
265 }
266 
267 
268 void WvDBusConn::send_hello()
269 {
270  WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
271  "org.freedesktop.DBus", "Hello");
272  send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
273  WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
274  "org.freedesktop.DBus", "AddMatch");
275  msg2.append("type='signal'");
276  send(msg2); // don't need to monitor this for completion
277 }
278 
279 
280 void WvDBusConn::set_uniquename(WvStringParm s)
281 {
282  // we want to print the message before switching log.app, so that we
283  // can trace which log.app turned into which
284  log("Assigned name '%s'\n", s);
285  _uniquename = s;
286  log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
287 }
288 
289 
290 void WvDBusConn::try_auth()
291 {
292  bool done = auth->authorize(*this);
293  if (done)
294  {
295  // ready to send messages!
296  if (out_queue.used())
297  {
298  log(" >> (sending enqueued messages)\n");
299  write(out_queue);
300  }
301 
302  authorized = true;
303  }
304 }
305 
306 
307 void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
308 {
309  callbacks.append(new CallbackInfo(pri, cb, cookie), true);
310 }
311 
312 
313 void WvDBusConn::del_callback(void *cookie)
314 {
315  // remember, there might be more than one callback with the same cookie.
316  CallbackInfoList::Iter i(callbacks);
317  for (i.rewind(); i.next(); )
318  if (i->cookie == cookie)
319  i.xunlink();
320 }
321 
322 
323 int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
324 {
325  return a->pri - b->pri;
326 }
327 
329 {
330  log("<< %s\n", msg);
331 
332  // handle replies
333  uint32_t rserial = msg.get_replyserial();
334  if (rserial)
335  {
336  Pending *p = pending[rserial];
337  if (p)
338  {
339  p->cb(msg);
340  pending.remove(p);
341  return true; // handled it
342  }
343  }
344 
345  // handle all the generic filters
346  CallbackInfoList::Sorter i(callbacks, priority_order);
347  for (i.rewind(); i.next(); )
348  {
349  bool handled = i->cb(msg);
350  if (handled) return true;
351  }
352 
353  return false; // couldn't handle the message, sorry
354 }
355 
356 
357 WvDBusClientAuth::WvDBusClientAuth()
358 {
359  sent_request = false;
360 }
361 
362 
363 wvuid_t WvDBusClientAuth::get_uid()
364 {
365  return wvgetuid();
366 }
367 
368 
370 {
371  if (!sent_request)
372  {
373  c.write("\0", 1);
374  WvString uid = get_uid();
375  c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
376  sent_request = true;
377  }
378  else
379  {
380  const char *line = c.in();
381  if (line)
382  {
383  if (!strncasecmp(line, "OK ", 3))
384  {
385  c.out("BEGIN\r\n");
386  return true;
387  }
388  else if (!strncasecmp(line, "ERROR ", 6))
389  c.seterr("Auth failed: %s", line);
390  else
391  c.seterr("Unknown AUTH response: '%s'", line);
392  }
393  }
394 
395  return false;
396 }
397 
398 
399 time_t WvDBusConn::mintimeout_msec()
400 {
401  WvTime when = 0;
402  PendingDict::Iter i(pending);
403  for (i.rewind(); i.next(); )
404  {
405  if (!when || when > i->valid_until)
406  when = i->valid_until;
407  }
408  if (!when)
409  return -1;
410  else if (when <= wvstime())
411  return 0;
412  else
413  return msecdiff(when, wvstime());
414 }
415 
416 
417 bool WvDBusConn::post_select(SelectInfo &si)
418 {
419  bool ready = WvStreamClone::post_select(si);
420  if (si.inherit_request) return ready;
421 
422  if (in_post_select) return false;
423  in_post_select = true;
424 
425  if (!authorized && ready)
426  try_auth();
427 
428  if (!alarm_remaining())
429  {
430  WvTime now = wvstime();
431  PendingDict::Iter i(pending);
432  for (i.rewind(); i.next(); )
433  {
434  if (now > i->valid_until)
435  {
436  log("Expiring %s\n", i->msg);
437  expire_pending(i.ptr());
438  i.rewind();
439  }
440  }
441  }
442 
443  if (authorized && ready)
444  {
445  // put this in a loop so that wvdbusd can forward packets rapidly.
446  // Otherwise TCP_NODELAY kicks in, because we do a select() loop
447  // between packets, which causes delay_output() to flush.
448  bool ran;
449  do
450  {
451  ran = false;
452  size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
453  size_t amt = needed - in_queue.used();
454  if (amt < 4096)
455  amt = 4096;
456  read(in_queue, amt);
457  WvDBusMsg *m;
458  while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
459  {
460  ran = true;
461  filter_func(*m);
462  delete m;
463  }
464  } while (ran);
465  }
466 
467  alarm(mintimeout_msec());
468  in_post_select = false;
469  return false;
470 }
471 
472 
474 {
475  return !out_queue.used() && pending.isempty();
476 }
477 
478 
479 void WvDBusConn::expire_pending(Pending *p)
480 {
481  if (p)
482  {
483  WvDBusCallback xcb(p->cb);
484  pending.remove(p); // prevent accidental recursion
485  WvDBusError e(p->msg, DBUS_ERROR_FAILED,
486  "Timed out while waiting for reply");
487  xcb(e);
488  }
489 }
490 
491 
492 void WvDBusConn::cancel_pending(uint32_t serial)
493 {
494  Pending *p = pending[serial];
495  if (p)
496  {
497  WvDBusCallback xcb(p->cb);
498  WvDBusMsg msg(p->msg);
499  pending.remove(p); // prevent accidental recursion
500  WvDBusError e(msg, DBUS_ERROR_FAILED,
501  "Canceled while waiting for reply");
502  xcb(e);
503  }
504 }
505 
506 
507 void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
508  time_t msec_timeout)
509 {
510  uint32_t serial = msg.get_serial();
511  assert(serial);
512  if (pending[serial])
513  cancel_pending(serial);
514  pending.add(new Pending(msg, cb, msec_timeout), true);
515  alarm(mintimeout_msec());
516 }
517 
518 
519 bool WvDBusConn::_registered(WvDBusMsg &msg)
520 {
521  WvDBusMsg::Iter i(msg);
522  _uniquename = i.getnext().get_str();
523  set_uniquename(_uniquename);
524  return true;
525 }
526 
WvStream::delay_output
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition: wvstream.h:246
WvDBusMsg::demarshal
static WvDBusMsg * demarshal(WvBuf &buf)
Demarshals a new WvDBusMsg from a buffer containing its binary DBus protocol representation.
Definition: wvdbusmarshal.cc:27
WvStream::alarm_remaining
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition: wvstream.cc:1057
WvDBusConn::WvDBusConn
WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth=NULL, bool _client=true)
Creates a new dbus connection using the given WvStreams moniker.
Definition: wvdbusconn.cc:123
WvStream::runonce
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition: wvstream.h:391
WvStream::write
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
WvDBusConn::add_callback
void add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie=NULL)
Adds a callback to the connection: all received messages will be sent to all callbacks to look at and...
Definition: wvdbusconn.cc:307
WvDBusConn::send
uint32_t send(WvDBusMsg msg)
Send a message on the bus, not expecting any reply.
Definition: wvdbusconn.cc:194
WvDBusConn::del_callback
void del_callback(void *cookie)
Delete all callbacks that have the given cookie.
Definition: wvdbusconn.cc:313
WvDBusError
Definition: wvdbusmsg.h:299
WvStreamClone::close
virtual void close()
Close this stream.
Definition: wvstreamclone.cc:83
WvDBusMsg::append
WvDBusMsg & append(const char *s)
The following methods are designed to allow appending various arguments to the message.
Definition: wvdbusmsg.cc:461
WvDBusConn::request_name
void request_name(WvStringParm name, const WvDBusCallback &onreply=0, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT)
Request the given service name on DBus.
Definition: wvdbusconn.cc:182
IWvDBusAuth
Definition: wvdbusconn.h:34
xxReplyWaiter
Definition: wvdbusconn.cc:217
WvTime
Based on (and interchangeable with) struct timeval.
Definition: wvtimeutils.h:17
WvDBusClientAuth::authorize
virtual bool authorize(WvDBusConn &c)
Main action callback.
Definition: wvdbusconn.cc:369
IWvStream
Definition: iwvstream.h:24
trim_string
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition: strutils.cc:59
WvDBusConn::uniquename
WvString uniquename() const
Return this connection's unique name on the bus, assigned by the server at connect time.
Definition: wvdbusconn.cc:176
WvDBusMsg
Definition: wvdbusmsg.h:28
WvDBusMsg::marshal
void marshal(WvBuf &buf)
Locks this message, encodes it in DBus binary protocol format, and adds it to the given buffer.
Definition: wvdbusmarshal.cc:83
WvStream::seterr
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
WvString
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
WvLog
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's.
Definition: wvlog.h:56
WvDBusMsg::Iter
Definition: wvdbusmsg.h:176
wvstrutils.h
WvStream::getline
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
WvMoniker
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition: wvmoniker.h:61
WvDBusConn::close
virtual void close()
Close the underlying stream.
Definition: wvdbusconn.cc:168
WvDBusConn::isidle
bool isidle()
Returns true if there are no outstanding messages that have not received (or timed out) their reply.
Definition: wvdbusconn.cc:473
WvStream::read
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
WvStreamClone
WvStreamClone simply forwards all requests to the "cloned" stream.
Definition: wvstreamclone.h:23
IObject
Definition: IObject.h:65
WvStream::alarm
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition: wvstream.cc:1048
WvHexEncoder
A hex encoder.
Definition: wvhex.h:21
WvDBusConn::~WvDBusConn
virtual ~WvDBusConn()
Release this connection.
Definition: wvdbusconn.cc:156
WvStreamClone::geterr
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition: wvstreamclone.cc:149
WvStreamClone::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstreamclone.cc:136
WvDBusConn::filter_func
virtual bool filter_func(WvDBusMsg &msg)
Called by for each received message.
Definition: wvdbusconn.cc:328
WvBufBaseCommonImpl::used
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
WvDBusClientAuth
Definition: wvdbusconn.h:55
WvStringList
This is a WvList of WvStrings, and is a really handy way to parse strings.
Definition: wvstringlist.h:27
IWvDBusAuth::authorize
virtual bool authorize(WvDBusConn &c)=0
Main action callback.
WvStringList::split
void split(WvStringParm s, const char *splitchars=" \t\r\n", int limit=0)
split s and form a list ignoring splitchars (except at beginning and end) ie.
Definition: wvstringlist.cc:19
WvDBusConn::send_and_wait
WvDBusMsg send_and_wait(WvDBusMsg msg, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT, wv::function< void(uint32_t)> serial_cb=0)
Send a message on the bus and wait for a reply to come in, returning the message when it does.
Definition: wvdbusconn.cc:231
WvStreamClone::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstreamclone.cc:222
WvDBusConn::CallbackPri
CallbackPri
The priority level of a callback registration.
Definition: wvdbusconn.h:170
WvDBusMsg::demarshal_bytes_needed
static size_t demarshal_bytes_needed(WvBuf &buf)
Given a buffer containing what might be the header of a DBus message, checks how many bytes need to b...
Definition: wvdbusmarshal.cc:70
WvDBusConn
Definition: wvdbusconn.h:65