5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP) 6 #define RXCPP_RX_SCHEDULER_TEST_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
16 class test_type :
public scheduler_interface
22 struct test_type_state :
public virtual_time<long, long>
24 typedef virtual_time<long, long> base;
27 using base::schedule_relative;
29 clock_type::time_point now()
const {
30 return to_time_point(clock_now);
33 virtual void schedule_absolute(
long when,
const schedulable& a)
const 35 if (when <= base::clock_now)
36 when = base::clock_now + 1;
41 virtual long add(
long absolute,
long relative)
const 43 return absolute + relative;
46 virtual clock_type::time_point to_time_point(
long absolute)
const 48 return clock_type::time_point(std::chrono::milliseconds(absolute));
51 virtual long to_relative(clock_type::duration d)
const 53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).
count());
58 mutable std::shared_ptr<test_type_state> state;
61 struct test_type_worker :
public worker_interface
63 mutable std::shared_ptr<test_type_state> state;
65 typedef test_type_state::absolute absolute;
66 typedef test_type_state::relative relative;
68 test_type_worker(std::shared_ptr<test_type_state> st)
69 : state(std::move(st))
73 virtual clock_type::time_point now()
const {
77 virtual void schedule(
const schedulable& scbl)
const {
78 state->schedule_absolute(state->clock(), scbl);
81 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
82 state->schedule_relative(state->to_relative(when - now()), scbl);
85 void schedule_absolute(absolute when,
const schedulable& scbl)
const {
86 state->schedule_absolute(when, scbl);
89 void schedule_relative(relative when,
const schedulable& scbl)
const {
90 state->schedule_relative(when, scbl);
93 bool is_enabled()
const {
return state->is_enabled();}
94 absolute clock()
const {
return state->clock();}
106 void advance_to(absolute time)
const 108 state->advance_to(time);
111 void advance_by(relative time)
const 113 state->advance_by(time);
116 void sleep(relative time)
const 127 : state(std::make_shared<test_type_state>())
131 virtual clock_type::time_point now()
const {
135 virtual worker create_worker(composite_subscription cs)
const {
136 return worker(cs, std::make_shared<test_type_worker>(state));
139 bool is_enabled()
const {
return state->is_enabled();}
141 return state->clock();
144 clock_type::time_point to_time_point(
long absolute)
const {
145 return state->to_time_point(absolute);
148 std::shared_ptr<test_type_worker> create_test_type_worker_interface()
const {
149 return std::make_shared<test_type_worker>(state);
153 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
156 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
161 :
public rxt::detail::test_subject_base<T>
163 typedef typename rxn::notification<T> notification_type;
164 typedef rxn::recorded<typename notification_type::type> recorded_type;
167 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
172 std::shared_ptr<test_type::test_type_state> sc;
173 std::vector<recorded_type> m;
175 virtual void on_subscribe(subscriber<T>)
const {
178 virtual std::vector<rxn::subscription> subscriptions()
const {
182 virtual std::vector<recorded_type> messages()
const {
190 typedef typename rxn::notification<T> notification_type;
191 typedef rxn::recorded<typename notification_type::type> recorded_type;
193 auto ts = std::make_shared<mock_observer<T>>(state);
195 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
200 recorded_type(ts->sc->clock(), notification_type::on_next(value)));
203 [ts](std::exception_ptr e)
206 recorded_type(ts->sc->clock(), notification_type::on_error(e)));
212 recorded_type(ts->sc->clock(), notification_type::on_completed()));
217 class cold_observable
218 :
public rxt::detail::test_subject_base<T>
220 typedef cold_observable<T> this_type;
221 std::shared_ptr<test_type::test_type_state> sc;
222 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223 mutable std::vector<recorded_type> mv;
224 mutable std::vector<rxn::subscription> sv;
225 mutable worker controller;
229 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
236 template<
class Iterator>
237 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
244 virtual void on_subscribe(subscriber<T> o)
const {
245 sv.push_back(rxn::subscription(sc->clock()));
246 auto index = sv.size() - 1;
248 for (
auto& message : mv) {
249 auto n = message.value();
252 [n, o](
const schedulable&) {
253 if (o.is_subscribed()) {
259 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260 o.add([sharedThis, index]() {
261 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
265 virtual std::vector<rxn::subscription> subscriptions()
const {
269 virtual std::vector<recorded_type> messages()
const {
275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const 277 auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278 return rxt::testable_observable<T>(co);
283 :
public rxt::detail::test_subject_base<T>
285 typedef hot_observable<T> this_type;
286 std::shared_ptr<test_type::test_type_state> sc;
287 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288 typedef subscriber<T> observer_type;
289 mutable std::vector<recorded_type> mv;
290 mutable std::vector<rxn::subscription> sv;
291 mutable std::list<observer_type> observers;
292 mutable worker controller;
296 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
301 for (
auto& message : mv) {
302 auto n = message.value();
305 [
this, n](
const schedulable&) {
306 auto local = this->observers;
307 for (
auto& o : local) {
308 if (o.is_subscribed()) {
316 virtual ~hot_observable() {}
318 virtual void on_subscribe(observer_type o)
const {
319 auto olocation = observers.insert(observers.end(), o);
321 sv.push_back(rxn::subscription(sc->clock()));
322 auto index = sv.size() - 1;
324 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325 o.add([sharedThis, index, olocation]() {
326 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327 sharedThis->observers.erase(olocation);
331 virtual std::vector<rxn::subscription> subscriptions()
const {
335 virtual std::vector<recorded_type> messages()
const {
341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const 343 return rxt::testable_observable<T>(
344 std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)));
348 struct is_create_source_function
352 static auto check(
int) -> decltype((*(CF*)
nullptr)());
354 static not_void check(...);
356 static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
363 std::shared_ptr<detail::test_type> tester;
366 explicit test(std::shared_ptr<detail::test_type> t)
396 template<
typename Exception>
408 std::shared_ptr<detail::test_type::test_type_worker> tester;
421 long clock()
const {
return tester->clock();}
424 tester->schedule_absolute(when, a);
428 tester->schedule_relative(when, a);
431 template<
class Arg0,
class... ArgN>
433 ->
typename std::enable_if<
434 (detail::is_action_function<Arg0>::value ||
437 tester->schedule_absolute(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
440 template<
class Arg0,
class... ArgN>
442 ->
typename std::enable_if<
443 (detail::is_action_function<Arg0>::value ||
446 tester->schedule_relative(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
451 tester->advance_to(time);
456 tester->advance_by(time);
464 template<
class T,
class F>
465 auto start(F createSource,
long created,
long subscribed,
long unsubscribed)
const 469 :
public std::enable_shared_from_this<state_type>
471 typedef decltype(createSource()) source_type;
473 std::unique_ptr<source_type> source;
482 auto state = std::make_shared<state_type>(this->make_subscriber<T>());
485 state->source.reset(
new typename state_type::source_type(createSource()));
488 state->source->subscribe(state->o);
491 state->o.unsubscribe();
499 template<
class T,
class F>
500 auto start(F&& createSource,
long unsubscribed)
const 506 template<
class T,
class F>
522 auto
start(F createSource,
long created,
long subscribed,
long unsubscribed) const
525 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
529 auto start(F createSource,
long unsubscribed)
const 530 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
537 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
548 return tester->make_subscriber<T>();
552 clock_type::time_point
now()
const {
553 return tester->now();
557 return test_worker(cs, tester->create_test_type_worker_interface());
561 long clock()
const {
return tester->clock();}
564 return tester->to_time_point(absolute);
569 return tester->make_hot_observable(std::move(
messages));
572 template<
class T, std::
size_t size>
574 -> decltype(tester->make_hot_observable(std::vector<T>())) {
580 -> decltype(tester->make_hot_observable(std::vector<T>())) {
581 return tester->make_hot_observable(std::vector<T>(il));
586 return tester->make_cold_observable(std::move(
messages));
589 template<
class T, std::
size_t size>
591 -> decltype(tester->make_cold_observable(std::vector<T>())) {
597 -> decltype(tester->make_cold_observable(std::vector<T>())) {
598 return tester->make_cold_observable(std::vector<T>(il));
604 return test(std::make_shared<detail::test_type>());
Definition: rx-notification.hpp:253
Definition: rx-scheduler.hpp:163
static recorded_type completed(long ticks)
Definition: rx-test.hpp:392
auto make_hot_observable(const T(&arr) [size]) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:573
auto make_cold_observable(std::initializer_list< T > il) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:596
detail::test_type::clock_type clock_type
Definition: rx-test.hpp:372
void advance_to(long time) const
Definition: rx-test.hpp:449
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
~test_worker()
Definition: rx-test.hpp:411
test(std::shared_ptr< detail::test_type > t)
Definition: rx-test.hpp:366
static recorded_type next(long ticks, U value)
Definition: rx-test.hpp:388
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:735
rxt::testable_observable< T > make_cold_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:585
auto schedule_relative(long when, Arg0 &&a0, ArgN &&... an) const -> typename std::enable_if<(detail::is_action_function< Arg0 >::value||is_subscription< Arg0 >::value) &&!is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:441
std::vector< T > to_vector(const T(&arr) [size])
Definition: rx-util.hpp:40
auto make_cold_observable(const T(&arr) [size]) const -> decltype(tester->make_cold_observable(std::vector< T >()))
Definition: rx-test.hpp:590
static rxn::subscription subscribe(long subscribe, long unsubscribe)
Definition: rx-test.hpp:401
bool is_enabled() const
Definition: rx-test.hpp:560
static type on_completed()
Definition: rx-notification.hpp:225
rxn::subscription subscription_type
Definition: rx-test.hpp:383
Definition: rx-notification.hpp:116
auto start(F &&createSource) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:507
rxn::notification< T > notification_type
Definition: rx-test.hpp:381
auto start(F createSource) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:536
auto start(F createSource, long unsubscribed) const -> typename std::enable_if< detail::is_create_source_function< F >::value, start_traits< F >>::type::subscriber_type
Definition: rx-test.hpp:529
Definition: rx-subscription.hpp:31
auto make_hot_observable(std::initializer_list< T > il) const -> decltype(tester->make_hot_observable(std::vector< T >()))
Definition: rx-test.hpp:579
long clock() const
Definition: rx-test.hpp:561
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
static const long created_time
Definition: rx-test.hpp:374
static type on_error(Exception &&e)
Definition: rx-notification.hpp:230
subscriber< T, rxt::testable_observer< T > > make_subscriber() const
Definition: rx-test.hpp:547
source_type::value_type value_type
Definition: rx-test.hpp:517
Definition: rx-predef.hpp:58
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
static type on_next(U value)
Definition: rx-notification.hpp:221
void start() const
Definition: rx-test.hpp:542
Definition: rx-test.hpp:514
subscriber< value_type, rxt::testable_observer< value_type > > subscriber_type
Definition: rx-test.hpp:518
Definition: rx-notification.hpp:14
Definition: rx-test.hpp:406
void sleep(long time) const
Definition: rx-test.hpp:459
void advance_by(long time) const
Definition: rx-test.hpp:454
static recorded_type error(long ticks, Exception &&e)
Definition: rx-test.hpp:397
clock_type::time_point now() const
Definition: rx-test.hpp:552
decltype((*(F *) nullptr)()) typedef source_type
Definition: rx-test.hpp:516
static const long unsubscribed_time
Definition: rx-test.hpp:376
rxt::testable_observable< T > make_hot_observable(std::vector< rxn::recorded< std::shared_ptr< rxn::detail::notification_base< T >>>> messages) const
Definition: rx-test.hpp:568
rxn::recorded< typename notification_type::type > recorded_type
Definition: rx-test.hpp:382
Definition: rx-scheduler.hpp:353
auto start(F createSource, long created, long subscribed, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:465
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
virtual void schedule_absolute(typename base::absolute when, const schedulable &a) const
Definition: rx-virtualtime.hpp:206
long clock() const
Definition: rx-test.hpp:421
test_worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-test.hpp:556
clock_type::time_point to_time_point(long absolute) const
Definition: rx-test.hpp:563
identity_one_worker identity_test()
Definition: rx-test.hpp:609
auto start(F &&createSource, long unsubscribed) const -> subscriber< T, rxt::testable_observer< T >>
Definition: rx-test.hpp:500
messages()
Definition: rx-test.hpp:385
void schedule_absolute(long when, const schedulable &a) const
Definition: rx-test.hpp:423
a source of values that records the time of each subscription/unsubscription and all the values and t...
Definition: rx-test.hpp:83
Definition: rx-coordination.hpp:114
Definition: rx-test.hpp:361
auto schedule_absolute(long when, Arg0 &&a0, ArgN &&... an) const -> typename std::enable_if<(detail::is_action_function< Arg0 >::value||is_subscription< Arg0 >::value) &&!is_schedulable< Arg0 >::value >::type
Definition: rx-test.hpp:432
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
test_worker(composite_subscription cs, std::shared_ptr< detail::test_type::test_type_worker > t)
Definition: rx-test.hpp:414
void schedule_relative(long when, const schedulable &a) const
Definition: rx-test.hpp:427
Definition: rx-test.hpp:379
test make_test()
Definition: rx-test.hpp:603
Definition: rx-scheduler.hpp:426
static const long subscribed_time
Definition: rx-test.hpp:375
Definition: rx-test.hpp:53
bool is_enabled() const
Definition: rx-test.hpp:420
Definition: rx-scheduler.hpp:200