RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-window_toggle.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
28 #if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP)
29 #define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP
30 
31 #include "../rx-includes.hpp"
32 
33 namespace rxcpp {
34 
35 namespace operators {
36 
37 namespace detail {
38 
// Tag type that carries the argument pack of a rejected window_toggle call.
39 template<class... AN>
40 struct window_toggle_invalid_arguments {};
41 
// Placeholder operator for an invalid argument list; its nested `type` is the
// observable type used as the return type of the catch-all member overload.
42 template<class... AN>
43 struct window_toggle_invalid : public rxo::operator_base<window_toggle_invalid_arguments<AN...>> {
44  using type = observable<window_toggle_invalid_arguments<AN...>, window_toggle_invalid<AN...>>;
45 };
// Shorthand for window_toggle_invalid<AN...>::type.
46 template<class... AN>
47 using window_toggle_invalid_t = typename window_toggle_invalid<AN...>::type;
48 
// Operator implementation for window_toggle.
//
// Each value emitted by the `openings` observable opens a new window (a
// subject whose observable is pushed to the destination). The value is passed
// to the closing selector, and the observable it returns closes that window
// when it emits or completes. Source values are forwarded to every window that
// is open at the time they are processed.
//
// T               - value type of the source observable
// Openings        - observable whose emissions open windows
// ClosingSelector - callable: openings value -> observable that closes the window
// Coordination    - coordination used to create the coordinator/worker
49 template<class T, class Openings, class ClosingSelector, class Coordination>
50 struct window_toggle
51 {
52  typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
53 
54  using source_value_type = rxu::decay_t<T>;
55  using coordination_type = rxu::decay_t<Coordination>;
56  using coordinator_type = typename coordination_type::coordinator_type;
57  using openings_type = rxu::decay_t<Openings>;
58  using openings_value_type = typename openings_type::value_type;
59  using closing_selector_type = rxu::decay_t<ClosingSelector>;
 // type of the observable returned by the closing selector for one opening value
60  using closings_type = rxu::result_of_t<closing_selector_type(openings_value_type)>;
61  using closings_value_type = typename closings_type::value_type;
62 
 // Immutable configuration captured when the operator is created and copied
 // into each subscription's state.
63  struct window_toggle_values
64  {
65  window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
66  : openings(opens)
67  , closingSelector(closes)
68  , coordination(c)
69  {
70  }
71  openings_type openings;
72  mutable closing_selector_type closingSelector;
73  coordination_type coordination;
74  };
75  window_toggle_values initial;
76 
77  window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
78  : initial(opens, closes, coordination)
79  {
80  }
81 
 // Observer attached to the source observable for one subscription.
 // All copies of the observer share the same state through `state`.
82  template<class Subscriber>
83  struct window_toggle_observer
84  {
85  typedef window_toggle_observer<Subscriber> this_type;
86  typedef rxu::decay_t<T> value_type;
87  typedef rxu::decay_t<Subscriber> dest_type;
88  typedef observer<T, this_type> observer_type;
89 
 // Per-subscription state: the operator configuration plus the destination,
 // the coordinator, its worker and the list of currently open windows.
90  struct window_toggle_subscriber_values : public window_toggle_values
91  {
92  window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
93  : window_toggle_values(v)
94  , cs(std::move(cs))
95  , dest(std::move(d))
96  , coordinator(std::move(c))
 // initialized from the member `coordinator`, which is declared before `worker`
97  , worker(coordinator.get_worker())
98  {
99  }
100  composite_subscription cs;
101  dest_type dest;
102  coordinator_type coordinator;
103  rxsc::worker worker;
 // one subject per open window; entries are erased when a window closes
104  mutable std::list<rxcpp::subjects::subject<T>> subj;
105  };
106  std::shared_ptr<window_toggle_subscriber_values> state;
107 
 // Builds the shared state and subscribes to the `openings` observable.
 // If preparing the openings source or its sink fails (on_exception returns
 // an empty result), the constructor returns without subscribing.
108  window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
109  : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
110  {
111  auto localState = state;
112 
113  composite_subscription innercs;
114 
115  // when the out observer is unsubscribed all the
116  // inner subscriptions are unsubscribed as well
117  auto innerscope = localState->dest.add(innercs);
118 
 // when the openings subscription ends, detach it from the destination again
119  innercs.add([=](){
120  localState->dest.remove(innerscope);
121  });
122 
123  auto source = on_exception(
124  [&](){return localState->coordinator.in(localState->openings);},
125  localState->dest);
126  if (source.empty()) {
127  return;
128  }
129 
130  // this subscribe does not share the observer subscription
131  // so that when it is unsubscribed the observer can be called
132  // until the inner subscriptions have finished
133  auto sink = make_subscriber<openings_value_type>(
134  localState->dest,
135  innercs,
136  // on_next
137  [localState](const openings_value_type& ov) {
 // ask the selector for the observable that will close this window
138  auto closer = localState->closingSelector(ov);
139 
 // open a new window and hand its observable to the destination
140  auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>());
141  localState->dest.on_next(it->get_observable().as_dynamic());
142 
143  composite_subscription innercs;
144 
145  // when the out observer is unsubscribed all the
146  // inner subscriptions are unsubscribed as well
147  auto innerscope = localState->dest.add(innercs);
148 
149  innercs.add([=](){
150  localState->dest.remove(innerscope);
151  });
152 
153  auto source = localState->coordinator.in(closer);
154 
 // The list iterator is shared so that `close` runs its body at most once:
 // the first call resets the stored iterator to end(), later calls do nothing.
155  auto sit = std::make_shared<decltype(it)>(it);
156  auto close = [localState, sit]() {
157  auto it = *sit;
158  *sit = localState->subj.end();
159  if (it != localState->subj.end()) {
160  it->get_subscriber().on_completed();
161  localState->subj.erase(it);
162  }
163  };
164 
165  // this subscribe does not share the observer subscription
166  // so that when it is unsubscribed the observer can be called
167  // until the inner subscriptions have finished
168  auto sink = make_subscriber<closings_value_type>(
169  localState->dest,
170  innercs,
171  // on_next
 // the first closing value closes the window and ends this inner subscription
172  [close, innercs](closings_value_type) {
173  close();
174  innercs.unsubscribe();
175  },
176  // on_error
177  [localState](std::exception_ptr e) {
178  localState->dest.on_error(e);
179  },
180  // on_completed
 // completion of the closing observable also closes the window
181  close
182  );
183  auto selectedSink = localState->coordinator.out(sink);
184  source.subscribe(std::move(selectedSink));
185  },
186  // on_error
187  [localState](std::exception_ptr e) {
188  localState->dest.on_error(e);
189  },
190  // on_completed
 // completion of the openings observable is deliberately ignored here
191  []() {
192  }
193  );
194  auto selectedSink = on_exception(
195  [&](){return localState->coordinator.out(sink);},
196  localState->dest);
197  if (selectedSink.empty()) {
198  return;
199  }
200  source->subscribe(std::move(selectedSink.get()));
201  }
202 
 // Schedules delivery of `v` to every window that is open when the work runs.
203  void on_next(T v) const {
204  auto localState = state;
205  auto work = [v, localState](const rxsc::schedulable&){
206  for (auto s : localState->subj) {
207  s.get_subscriber().on_next(v);
208  }
209  };
210  auto selectedWork = on_exception(
211  [&](){return localState->coordinator.act(work);},
212  localState->dest);
213  if (selectedWork.empty()) {
214  return;
215  }
216  localState->worker.schedule(selectedWork.get());
217  }
218 
 // Schedules delivery of the error to every open window and then to the destination.
219  void on_error(std::exception_ptr e) const {
220  auto localState = state;
221  auto work = [e, localState](const rxsc::schedulable&){
222  for (auto s : localState->subj) {
223  s.get_subscriber().on_error(e);
224  }
225  localState->dest.on_error(e);
226  };
227  auto selectedWork = on_exception(
228  [&](){return localState->coordinator.act(work);},
229  localState->dest);
230  if (selectedWork.empty()) {
231  return;
232  }
233  localState->worker.schedule(selectedWork.get());
234  }
235 
 // Schedules completion of every open window and then of the destination.
236  void on_completed() const {
237  auto localState = state;
238  auto work = [localState](const rxsc::schedulable&){
239  for (auto s : localState->subj) {
240  s.get_subscriber().on_completed();
241  }
242  localState->dest.on_completed();
243  };
244  auto selectedWork = on_exception(
245  [&](){return localState->coordinator.act(work);},
246  localState->dest);
247  if (selectedWork.empty()) {
248  return;
249  }
250  localState->worker.schedule(selectedWork.get());
251  }
252 
 // Creates the source-side subscriber: a fresh composite_subscription and a
 // coordinator made from the configured coordination, wrapped around a new observer.
253  static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
254  auto cs = composite_subscription();
255  auto coordinator = v.coordination.create_coordinator();
256 
257  return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
258  }
259  };
260 
 // Called with the destination subscriber; returns the subscriber that is to
 // be attached to the source observable.
261  template<class Subscriber>
262  auto operator()(Subscriber dest) const
263  -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
264  return window_toggle_observer<Subscriber>::make(std::move(dest), initial);
265  }
266 };
267 
268 }
269 
// Pipeable factory for the window_toggle operator.
// Packs the forwarded arguments (openings, closing selector, optional
// coordination) into a tuple held by an operator_factory tagged with
// window_toggle_tag.
// Listing line 274 (trailing return type and opening brace) was missing from
// this listing and is restored here.
272 template<class... AN>
273 auto window_toggle(AN&&... an)
274  -> operator_factory<window_toggle_tag, AN...> {
275  return operator_factory<window_toggle_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
276 }
277 
278 }
279 
// Overload set that maps window_toggle(...) argument lists onto the
// detail::window_toggle operator.
// Listing lines 281 (struct header) and 303 (coordination constraint) were
// missing from this listing and are restored here.
280 template<>
281 struct member_overload<window_toggle_tag>
282 {
 // (openings, closingSelector): uses identity_immediate() as the coordination.
 // Enabled only when the source, the openings and the selector's result are
 // all observables. The result is lifted as an observable of observable<SourceValue>.
283  template<class Observable, class Openings, class ClosingSelector,
284  class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
285  class OpeningsType = rxu::decay_t<Openings>,
286  class OpeningsValueType = typename OpeningsType::value_type,
287  class Enabled = rxu::enable_if_all_true_type_t<
288  all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>>,
289  class SourceValue = rxu::value_type_t<Observable>,
290  class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, identity_one_worker>,
291  class Value = observable<SourceValue>>
292  static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector)
293  -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()))) {
294  return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()));
295  }
296 
 // (openings, closingSelector, coordination): same as above with a
 // caller-supplied coordination, which must satisfy is_coordination.
297  template<class Observable, class Openings, class ClosingSelector, class Coordination,
298  class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
299  class OpeningsType = rxu::decay_t<Openings>,
300  class OpeningsValueType = typename OpeningsType::value_type,
301  class Enabled = rxu::enable_if_all_true_type_t<
302  all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>,
303  is_coordination<Coordination>>,
304  class SourceValue = rxu::value_type_t<Observable>,
305  class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, rxu::decay_t<Coordination>>,
306  class Value = observable<SourceValue>>
307  static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector, Coordination&& cn)
308  -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)))) {
309  return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)));
310  }
311 
 // Catch-all for any other argument list. Instantiating it trips the
 // static_assert below, so misuse is reported at compile time with a readable
 // message; the terminate/return statements are never meant to execute.
312  template<class... AN>
313  static operators::detail::window_toggle_invalid_t<AN...> member(AN...) {
314  std::terminate();
315  return {};
316  static_assert(sizeof...(AN) == 10000, "window_toggle takes (Openings, ClosingSelector, optional Coordination)");
317  }
318 };
319 
320 }
321 
322 #endif
Definition: rx-util.hpp:90
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), identity_immediate())))
Definition: rx-window_toggle.hpp:292
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:104
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:510
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
static auto member(Observable &&o, Openings &&openings, ClosingSelector &&closingSelector, Coordination &&cn) -> decltype(o.template lift< Value >(WindowToggle(std::forward< Openings >(openings), std::forward< ClosingSelector >(closingSelector), std::forward< Coordination >(cn))))
Definition: rx-window_toggle.hpp:307
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:639
Definition: rx-operators.hpp:494
Definition: rx-coordination.hpp:114
auto window_toggle(AN &&... an) -> operator_factory< window_toggle_tag, AN... >
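Return an observable that emits a new window observable each time the openings observable emits; each window collects items from this observable until the observable returned by the closing selector emits or completes.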
Definition: rx-window_toggle.hpp:273
typename std::result_of< TN... >::type result_of_t
Definition: rx-util.hpp:37
static operators::detail::window_toggle_invalid_t< AN... > member(AN...)
Definition: rx-window_toggle.hpp:313
Definition: rx-coordination.hpp:37