RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-test.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 class test_type : public scheduler_interface
17 {
18 public:
19 
20  typedef scheduler_interface::clock_type clock_type;
21 
22  struct test_type_state : public virtual_time<long, long>
23  {
24  typedef virtual_time<long, long> base;
25 
27  using base::schedule_relative;
28 
29  clock_type::time_point now() const {
30  return to_time_point(clock_now);
31  }
32 
33  virtual void schedule_absolute(long when, const schedulable& a) const
34  {
35  if (when <= base::clock_now)
36  when = base::clock_now + 1;
37 
38  return base::schedule_absolute(when, a);
39  }
40 
41  virtual long add(long absolute, long relative) const
42  {
43  return absolute + relative;
44  }
45 
46  virtual clock_type::time_point to_time_point(long absolute) const
47  {
48  return clock_type::time_point(std::chrono::milliseconds(absolute));
49  }
50 
51  virtual long to_relative(clock_type::duration d) const
52  {
53  return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
54  }
55  };
56 
57 private:
58  mutable std::shared_ptr<test_type_state> state;
59 
60 public:
61  struct test_type_worker : public worker_interface
62  {
63  mutable std::shared_ptr<test_type_state> state;
64 
65  typedef test_type_state::absolute absolute;
66  typedef test_type_state::relative relative;
67 
68  test_type_worker(std::shared_ptr<test_type_state> st)
69  : state(std::move(st))
70  {
71  }
72 
73  virtual clock_type::time_point now() const {
74  return state->now();
75  }
76 
77  virtual void schedule(const schedulable& scbl) const {
78  state->schedule_absolute(state->clock(), scbl);
79  }
80 
81  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82  state->schedule_relative(state->to_relative(when - now()), scbl);
83  }
84 
85  void schedule_absolute(absolute when, const schedulable& scbl) const {
86  state->schedule_absolute(when, scbl);
87  }
88 
89  void schedule_relative(relative when, const schedulable& scbl) const {
90  state->schedule_relative(when, scbl);
91  }
92 
93  bool is_enabled() const {return state->is_enabled();}
94  absolute clock() const {return state->clock();}
95 
96  void start() const
97  {
98  state->start();
99  }
100 
101  void stop() const
102  {
103  state->stop();
104  }
105 
106  void advance_to(absolute time) const
107  {
108  state->advance_to(time);
109  }
110 
111  void advance_by(relative time) const
112  {
113  state->advance_by(time);
114  }
115 
116  void sleep(relative time) const
117  {
118  state->sleep(time);
119  }
120 
121  template<class T>
122  subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
123  };
124 
125 public:
126  test_type()
127  : state(std::make_shared<test_type_state>())
128  {
129  }
130 
131  virtual clock_type::time_point now() const {
132  return state->now();
133  }
134 
135  virtual worker create_worker(composite_subscription cs) const {
136  return worker(cs, std::make_shared<test_type_worker>(state));
137  }
138 
139  bool is_enabled() const {return state->is_enabled();}
140  long clock() {
141  return state->clock();
142  }
143 
144  clock_type::time_point to_time_point(long absolute) const {
145  return state->to_time_point(absolute);
146  }
147 
148  std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
149  return std::make_shared<test_type_worker>(state);
150  }
151 
152  template<class T>
153  rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
154 
155  template<class T>
156  rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
157 };
158 
159 template<class T>
160 class mock_observer
161  : public rxt::detail::test_subject_base<T>
162 {
163  typedef typename rxn::notification<T> notification_type;
164  typedef rxn::recorded<typename notification_type::type> recorded_type;
165 
166 public:
167  explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
168  : sc(sc)
169  {
170  }
171 
172  std::shared_ptr<test_type::test_type_state> sc;
173  std::vector<recorded_type> m;
174 
175  virtual void on_subscribe(subscriber<T>) const {
176  std::terminate();
177  }
178  virtual std::vector<rxn::subscription> subscriptions() const {
179  std::terminate();
180  }
181 
182  virtual std::vector<recorded_type> messages() const {
183  return m;
184  }
185 };
186 
187 template<class T>
188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
189 {
190  typedef typename rxn::notification<T> notification_type;
191  typedef rxn::recorded<typename notification_type::type> recorded_type;
192 
193  auto ts = std::make_shared<mock_observer<T>>(state);
194 
195  return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
196  // on_next
197  [ts](T value)
198  {
199  ts->m.push_back(
200  recorded_type(ts->sc->clock(), notification_type::on_next(value)));
201  },
202  // on_error
203  [ts](std::exception_ptr e)
204  {
205  ts->m.push_back(
206  recorded_type(ts->sc->clock(), notification_type::on_error(e)));
207  },
208  // on_completed
209  [ts]()
210  {
211  ts->m.push_back(
212  recorded_type(ts->sc->clock(), notification_type::on_completed()));
213  })));
214 }
215 
216 template<class T>
217 class cold_observable
218  : public rxt::detail::test_subject_base<T>
219 {
220  typedef cold_observable<T> this_type;
221  std::shared_ptr<test_type::test_type_state> sc;
222  typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223  mutable std::vector<recorded_type> mv;
224  mutable std::vector<rxn::subscription> sv;
225  mutable worker controller;
226 
227 public:
228 
229  cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
230  : sc(sc)
231  , mv(std::move(mv))
232  , controller(w)
233  {
234  }
235 
236  template<class Iterator>
237  cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
238  : sc(sc)
239  , mv(begin, end)
240  , controller(w)
241  {
242  }
243 
244  virtual void on_subscribe(subscriber<T> o) const {
245  sv.push_back(rxn::subscription(sc->clock()));
246  auto index = sv.size() - 1;
247 
248  for (auto& message : mv) {
249  auto n = message.value();
250  sc->schedule_relative(message.time(), make_schedulable(
251  controller,
252  [n, o](const schedulable&) {
253  if (o.is_subscribed()) {
254  n->accept(o);
255  }
256  }));
257  }
258 
259  auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260  o.add([sharedThis, index]() {
261  sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
262  });
263  }
264 
265  virtual std::vector<rxn::subscription> subscriptions() const {
266  return sv;
267  }
268 
269  virtual std::vector<recorded_type> messages() const {
270  return mv;
271  }
272 };
273 
274 template<class T>
275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
276 {
277  auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278  return rxt::testable_observable<T>(co);
279 }
280 
281 template<class T>
282 class hot_observable
283  : public rxt::detail::test_subject_base<T>
284 {
285  typedef hot_observable<T> this_type;
286  std::shared_ptr<test_type::test_type_state> sc;
287  typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288  typedef subscriber<T> observer_type;
289  mutable std::vector<recorded_type> mv;
290  mutable std::vector<rxn::subscription> sv;
291  mutable std::list<observer_type> observers;
292  mutable worker controller;
293 
294 public:
295 
296  hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
297  : sc(sc)
298  , mv(mv)
299  , controller(w)
300  {
301  for (auto& message : mv) {
302  auto n = message.value();
303  sc->schedule_absolute(message.time(), make_schedulable(
304  controller,
305  [this, n](const schedulable&) {
306  auto local = this->observers;
307  for (auto& o : local) {
308  if (o.is_subscribed()) {
309  n->accept(o);
310  }
311  }
312  }));
313  }
314  }
315 
316  virtual ~hot_observable() {}
317 
318  virtual void on_subscribe(observer_type o) const {
319  auto olocation = observers.insert(observers.end(), o);
320 
321  sv.push_back(rxn::subscription(sc->clock()));
322  auto index = sv.size() - 1;
323 
324  auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325  o.add([sharedThis, index, olocation]() {
326  sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327  sharedThis->observers.erase(olocation);
328  });
329  }
330 
331  virtual std::vector<rxn::subscription> subscriptions() const {
332  return sv;
333  }
334 
335  virtual std::vector<recorded_type> messages() const {
336  return mv;
337  }
338 };
339 
340 template<class T>
341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
342 {
343  return rxt::testable_observable<T>(
344  std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)));
345 }
346 
347 template<class F>
348 struct is_create_source_function
349 {
350  struct not_void {};
351  template<class CF>
352  static auto check(int) -> decltype((*(CF*)nullptr)());
353  template<class CF>
354  static not_void check(...);
355 
356  static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
357 };
358 
359 }
360 
361 class test : public scheduler
362 {
363  std::shared_ptr<detail::test_type> tester;
364 public:
365 
366  explicit test(std::shared_ptr<detail::test_type> t)
367  : scheduler(std::static_pointer_cast<scheduler_interface>(t))
368  , tester(t)
369  {
370  }
371 
372  typedef detail::test_type::clock_type clock_type;
373 
374  static const long created_time = 100;
375  static const long subscribed_time = 200;
376  static const long unsubscribed_time = 1000;
377 
378  template<class T>
379  struct messages
380  {
384 
385  messages() {}
386 
387  template<typename U>
388  static recorded_type next(long ticks, U value) {
389  return recorded_type(ticks, notification_type::on_next(std::move(value)));
390  }
391 
392  static recorded_type completed(long ticks) {
394  }
395 
396  template<typename Exception>
397  static recorded_type error(long ticks, Exception&& e) {
398  return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
399  }
400 
401  static rxn::subscription subscribe(long subscribe, long unsubscribe) {
402  return rxn::subscription(subscribe, unsubscribe);
403  }
404  };
405 
406  class test_worker : public worker
407  {
408  std::shared_ptr<detail::test_type::test_type_worker> tester;
409  public:
410 
412  }
413 
414  explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
415  : worker(cs, std::static_pointer_cast<worker_interface>(t))
416  , tester(t)
417  {
418  }
419 
420  bool is_enabled() const {return tester->is_enabled();}
421  long clock() const {return tester->clock();}
422 
423  void schedule_absolute(long when, const schedulable& a) const {
424  tester->schedule_absolute(when, a);
425  }
426 
427  void schedule_relative(long when, const schedulable& a) const {
428  tester->schedule_relative(when, a);
429  }
430 
431  template<class Arg0, class... ArgN>
432  auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
433  -> typename std::enable_if<
434  (detail::is_action_function<Arg0>::value ||
437  tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
438  }
439 
440  template<class Arg0, class... ArgN>
441  auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
442  -> typename std::enable_if<
443  (detail::is_action_function<Arg0>::value ||
446  tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
447  }
448 
449  void advance_to(long time) const
450  {
451  tester->advance_to(time);
452  }
453 
454  void advance_by(long time) const
455  {
456  tester->advance_by(time);
457  }
458 
459  void sleep(long time) const
460  {
461  tester->sleep(time);
462  }
463 
464  template<class T, class F>
465  auto start(F createSource, long created, long subscribed, long unsubscribed) const
467  {
468  struct state_type
469  : public std::enable_shared_from_this<state_type>
470  {
471  typedef decltype(createSource()) source_type;
472 
473  std::unique_ptr<source_type> source;
475 
476  explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
477  : source()
478  , o(o)
479  {
480  }
481  };
482  auto state = std::make_shared<state_type>(this->make_subscriber<T>());
483 
484  schedule_absolute(created, [createSource, state](const schedulable&) {
485  state->source.reset(new typename state_type::source_type(createSource()));
486  });
487  schedule_absolute(subscribed, [state](const schedulable&) {
488  state->source->subscribe(state->o);
489  });
490  schedule_absolute(unsubscribed, [state](const schedulable&) {
491  state->o.unsubscribe();
492  });
493 
494  tester->start();
495 
496  return state->o;
497  }
498 
499  template<class T, class F>
500  auto start(F&& createSource, long unsubscribed) const
502  {
503  return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
504  }
505 
506  template<class T, class F>
507  auto start(F&& createSource) const
509  {
510  return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
511  }
512 
513  template<class F>
515  {
516  typedef decltype((*(F*)nullptr)()) source_type;
517  typedef typename source_type::value_type value_type;
518  typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
519  };
520 
521  template<class F>
522  auto start(F createSource, long created, long subscribed, long unsubscribed) const
523  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
524  {
525  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
526  }
527 
528  template<class F>
529  auto start(F createSource, long unsubscribed) const
530  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
531  {
532  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
533  }
534 
535  template<class F>
536  auto start(F createSource) const
537  -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
538  {
539  return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
540  }
541 
542  void start() const {
543  tester->start();
544  }
545 
546  template<class T>
548  return tester->make_subscriber<T>();
549  }
550  };
551 
552  clock_type::time_point now() const {
553  return tester->now();
554  }
555 
557  return test_worker(cs, tester->create_test_type_worker_interface());
558  }
559 
560  bool is_enabled() const {return tester->is_enabled();}
561  long clock() const {return tester->clock();}
562 
563  clock_type::time_point to_time_point(long absolute) const {
564  return tester->to_time_point(absolute);
565  }
566 
567  template<class T>
568  rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
569  return tester->make_hot_observable(std::move(messages));
570  }
571 
572  template<class T, std::size_t size>
573  auto make_hot_observable(const T (&arr) [size]) const
574  -> decltype(tester->make_hot_observable(std::vector<T>())) {
575  return tester->make_hot_observable(rxu::to_vector(arr));
576  }
577 
578  template<class T>
579  auto make_hot_observable(std::initializer_list<T> il) const
580  -> decltype(tester->make_hot_observable(std::vector<T>())) {
581  return tester->make_hot_observable(std::vector<T>(il));
582  }
583 
584  template<class T>
585  rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
586  return tester->make_cold_observable(std::move(messages));
587  }
588 
589  template<class T, std::size_t size>
590  auto make_cold_observable(const T (&arr) [size]) const
591  -> decltype(tester->make_cold_observable(std::vector<T>())) {
592  return tester->make_cold_observable(rxu::to_vector(arr));
593  }
594 
595  template<class T>
596  auto make_cold_observable(std::initializer_list<T> il) const
597  -> decltype(tester->make_cold_observable(std::vector<T>())) {
598  return tester->make_cold_observable(std::vector<T>(il));
599  }
600 };
601 
602 
603 inline test make_test() {
604  return test(std::make_shared<detail::test_type>());
605 }
606 
607 }
608 
611  return r;
612 }
613 
614 }
615 
616 #endif
Definition: rx-notification.hpp:253
Definition: rx-scheduler.hpp:163
static recorded_type completed(long ticks)
Definition: rx-test.hpp:392
auto make_hot_observable(const T(&arr) [size]) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:573
auto make_cold_observable(std::initializer_list< T > il) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:596
detail::test_type::clock_type clock_type
Definition: rx-test.hpp:372
void advance_to(long time) const
Definition: rx-test.hpp:449
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
~test_worker()
Definition: rx-test.hpp:411
test(std::shared_ptr< detail::test_type > t)
Definition: rx-test.hpp:366
static recorded_type next(long ticks, U value)
Definition: rx-test.hpp:388
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
rxt::testable_observable< T > make_cold_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:585
auto schedule_relative(long when, Arg0 &&a0, ArgN &&... an) const -> typename std::enable_if<(detail::is_action_function< Arg0 >::value||is_subscription< Arg0 >::value) &&!is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:441
std::vector< T > to_vector(const T(&arr) [size])
Definition: rx-util.hpp:40
auto make_cold_observable(const T(&arr) [size]) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:590
static rxn::subscription subscribe(long subscribe, long unsubscribe)
Definition: rx-test.hpp:401
bool is_enabled() const
Definition: rx-test.hpp:560
static type on_completed()
Definition: rx-notification.hpp:225
rxn::subscription subscription_type
Definition: rx-test.hpp:383
Definition: rx-notification.hpp:116
auto start(F &&createSource) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:507
rxn::notification< T > notification_type
Definition: rx-test.hpp:381
auto start(F createSource) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:536
auto start(F createSource, long unsubscribed) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:529
Definition: rx-subscription.hpp:31
auto make_hot_observable(std::initializer_list< T > il) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:579
long clock() const
Definition: rx-test.hpp:561
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
static const long created_time
Definition: rx-test.hpp:374
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
subscriber< T, rxt::testable_observer< T > > make_subscriber() const
Definition: rx-test.hpp:547
source_type::value_type value_type
Definition: rx-test.hpp:517
Definition: rx-predef.hpp:58
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
static type on_next(U value)
Definition: rx-notification.hpp:221
void start() const
Definition: rx-test.hpp:542
subscriber< value_type, rxt::testable_observer< value_type > > subscriber_type
Definition: rx-test.hpp:518
Definition: rx-notification.hpp:14
Definition: rx-test.hpp:406
void sleep(long time) const
Definition: rx-test.hpp:459
void advance_by(long time) const
Definition: rx-test.hpp:454
static recorded_type error(long ticks, Exception &&e)
Definition: rx-test.hpp:397
clock_type::time_point now() const
Definition: rx-test.hpp:552
decltype((*(F *) nullptr)()) typedef source_type
Definition: rx-test.hpp:516
static const long unsubscribed_time
Definition: rx-test.hpp:376
rxt::testable_observable< T > make_hot_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:568
rxn::recorded< typename notification_type::type > recorded_type
Definition: rx-test.hpp:382
Definition: rx-scheduler.hpp:353
auto start(F createSource, long created, long subscribed, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:465
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
virtual void schedule_absolute(typename base::absolute when, const schedulable &a) const
Definition: rx-virtualtime.hpp:206
long clock() const
Definition: rx-test.hpp:421
test_worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-test.hpp:556
clock_type::time_point to_time_point(long absolute) const
Definition: rx-test.hpp:563
identity_one_worker identity_test()
Definition: rx-test.hpp:609
auto start(F &&createSource, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:500
messages()
Definition: rx-test.hpp:385
void schedule_absolute(long when, const schedulable &a) const
Definition: rx-test.hpp:423
a source of values that records the time of each subscription/unsubscription and all the values and t...
Definition: rx-test.hpp:83
Definition: rx-coordination.hpp:114
Definition: rx-test.hpp:361
auto schedule_absolute(long when, Arg0 &&a0, ArgN &&... an) const -> typename std::enable_if<(detail::is_action_function< Arg0 >::value||is_subscription< Arg0 >::value) &&!is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:432
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
test_worker(composite_subscription cs, std::shared_ptr< detail::test_type::test_type_worker > t)
Definition: rx-test.hpp:414
void schedule_relative(long when, const schedulable &a) const
Definition: rx-test.hpp:427
Definition: rx-test.hpp:379
test make_test()
Definition: rx-test.hpp:603
Definition: rx-scheduler.hpp:426
static const long subscribed_time
Definition: rx-test.hpp:375
Definition: rx-test.hpp:53
bool is_enabled() const
Definition: rx-test.hpp:420
Definition: rx-scheduler.hpp:200