UniSet  2.24.2
UNetReceiver.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetReceiver_H_
18 #define UNetReceiver_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <memory>
22 #include <string>
23 #include <vector>
24 #include <unordered_map>
25 #include <sigc++/sigc++.h>
26 #include <ev++.h>
27 #include "UniSetObject.h"
28 #include "Trigger.h"
29 #include "Mutex.h"
30 #include "SMInterface.h"
31 #include "SharedMemory.h"
32 #include "UDPPacket.h"
33 #include "CommonEventLoop.h"
34 #include "UNetTransport.h"
35 // --------------------------------------------------------------------------
36 namespace uniset
37 {
38  // -----------------------------------------------------------------------------
39  /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40  * ===============
41  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
42  * что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
43  * Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
44  * куда поместить пакет в буфере. Есть два индекса
45  * rnum - (read number) номер последнего обработанного пакета + 1
46  * wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
47  * WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
48  *
49  * При этом обработка ведётся по порядку (только пакеты идущие подряд)
50  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
51  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
52  * Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
53  * Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
54  * либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
55  *
56  * КЭШ
57  * ===
58  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
59  * Идея проста: сделан вектор размером с количеством принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
60  * Порядковый номер данных в пакете является индексом в кэше.
61  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
62  * ID который пришёл в пакете - элемент кэша обновляется.
63  * Если количество пришедших данных не совпадают с размером кэша - кэш обновляется.
64  *
65  * КЭШ (ДОПОЛНЕНИЕ)
66  * ===
67  * Т.к. в общем случае, данные могут быть разбиты на несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
68  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
69  * Кэш в map добавляется тогда, когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим он используется для этого пакета.
70  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
71  * т.е. на то что UNetSender не будет с течением времени менять количество отправляемых пакетов
72  * (работать будет, просто в map останутся лежать записи для неиспользуемых пакетов)
73  *
74  * ОПТИМИЗАЦИЯ
75  * ===
76  * В кэше так же хранится crc последних принятых данных. Если crc совпадает с тем, что пришло в пакете, то обработки не происходит.
77  * crc хранится отдельно для дискретных и отдельно для аналоговых датчиков.
78  * Эту оптимизацию можно отключить параметром --prefix-recv-ignore-crc или recvIgnoreCRC="1" в конф. файле.
79  *
80  * Обработка сбоев в номере пакетов
81  * =========================================================================
82  * Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
83  * то считается, что произошёл сбой или узел который посылал пакеты - перезагрузился
84  * Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
85  * реинициализация и обработка продолжается с нового номера.
86  *
87  * =========================================================================
88  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся, пакет не обрабатываем.
89  *
90  * Создание соединения (открытие сокета)
91  * ======================================
92  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
93  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
94  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
95  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
96  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
97  * Если такая логика не требуется, то можно задать в конструкторе
98  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
99  * выкинуто исключение при неудачной попытке создания соединения.
100  */
101  // -----------------------------------------------------------------------------
102  class UNetReceiver final:
103  protected EvWatcher,
104  public std::enable_shared_from_this<UNetReceiver>
105  {
106  public:
107  UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
108  , bool nocheckConnection = false
109  , const std::string& prefix = "unet" );
110  virtual ~UNetReceiver();
111 
112  void start();
113  void stop();
114 
115  inline std::string getName() const noexcept
116  {
117  return myname;
118  }
119 
120  // блокировать сохранение данных в SM
121  void setLockUpdate( bool st ) noexcept;
122  bool isLockUpdate() const noexcept;
123 
124  void resetTimeout() noexcept;
125 
126  bool isInitOK() const noexcept;
127  bool isRecvOK() const noexcept;
128  size_t getLostPacketsNum() const noexcept;
129 
130  void setReceiveTimeout( timeout_t msec ) noexcept;
131  void setUpdatePause( timeout_t msec ) noexcept;
132  void setLostTimeout( timeout_t msec ) noexcept;
133  void setPrepareTime( timeout_t msec ) noexcept;
134  void setCheckConnectionPause( timeout_t msec ) noexcept;
135  void setMaxDifferens( unsigned long set ) noexcept;
136  void setEvrunTimeout(timeout_t msec ) noexcept;
137  void setInitPause( timeout_t msec ) noexcept;
138  void setBufferSize( size_t sz ) noexcept;
139  void setMaxReceiveAtTime( size_t sz ) noexcept;
140  void setIgnoreCRC( bool set ) noexcept;
141 
142  void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
143  void setLostPacketsID( uniset::ObjectId id ) noexcept;
144 
145  void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
146 
147  inline std::string getTransportID() const noexcept
148  {
149  return transport->ID();
150  }
151 
153  enum Event
154  {
156  evTimeout
157  };
158 
159  typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
160  void connectEvent( EventSlot sl ) noexcept;
161 
162  // --------------------------------------------------------------------
163  inline std::shared_ptr<DebugStream> getLog() noexcept
164  {
165  return unetlog;
166  }
167 
168  std::string getShortInfo() const noexcept;
169 
170  protected:
171 
172  const std::shared_ptr<SMInterface> shm;
173  std::shared_ptr<DebugStream> unetlog;
174 
175  enum ReceiveRetCode
176  {
177  retOK = 0,
178  retError = 1,
179  retNoData = 2
180  };
181 
182  ReceiveRetCode receive() noexcept;
183  void update() noexcept;
184  void callback( ev::io& watcher, int revents ) noexcept;
185  void readEvent( ev::io& watcher ) noexcept;
186  void updateEvent( ev::periodic& watcher, int revents ) noexcept;
187  void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
188  void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
189  void onForceUpdate( ev::async& watcher, int revents ) noexcept;
190  void initEvent( ev::timer& watcher, int revents ) noexcept;
191  virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
192  virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
193  virtual std::string wname() const noexcept override
194  {
195  return myname;
196  }
197 
198  void initIterators() noexcept;
199  bool createConnection( bool throwEx = false );
200  bool checkConnection();
201  size_t rnext( size_t num );
202 
203  private:
204  UNetReceiver() {}
205 
206  timeout_t updatepause = { 100 };
208  std::unique_ptr<UNetReceiveTransport> transport;
209  std::string addr;
210  std::string myname;
211  ev::io evReceive;
212  ev::periodic evCheckConnection;
213  ev::periodic evStatistic;
214  ev::periodic evUpdate;
215  ev::timer evInitPause;
216  ev::async evForceUpdate;
217 
218  // счётчики для подсчёта статистики
219  size_t recvCount = { 0 };
220  size_t upCount = { 0 };
221  std::chrono::steady_clock::time_point t_start;
222  std::chrono::steady_clock::time_point t_end;
223  std::chrono::steady_clock::time_point t_stats;
224 
225  // текущая статистика
226  struct Stats
227  {
228  float recvPerSec = {0};
229  float upPerSec = {0};
230  size_t upProcessingTime_microsec = {0};
231  size_t recvProcessingTime_microsec = {0};
232  };
233 
234  Stats stats;
235 
236  // делаем loop общим.. одним на всех!
237  static CommonEventLoop loop;
238 
239  double checkConnectionTime = { 10.0 }; // sec
240  std::mutex checkConnMutex;
241 
242  PassiveTimer ptRecvTimeout;
243  PassiveTimer ptPrepare;
244  timeout_t recvTimeout = { 5000 }; // msec
245  timeout_t prepareTime = { 2000 };
246  timeout_t evrunTimeout = { 15000 };
247  timeout_t lostTimeout = { 200 };
248  size_t maxReceiveCount = { 5 }; // количество читаемых за один раз
249 
250  double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
251  std::atomic_bool initOK = { false };
252 
253  PassiveTimer ptLostTimeout;
254  size_t lostPackets = { 0 };
256  uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
257  IOController::IOStateList::iterator itRespond;
258  bool respondInvert = { false };
259  uniset::ObjectId sidLostPackets = { uniset::DefaultObjectId };
260  IOController::IOStateList::iterator itLostPackets;
261 
262  std::atomic_bool activated = { false };
263 
264  size_t cbufSize = { 100 };
265  std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
266  size_t wnum = { 1 };
267  size_t rnum = { 0 };
268  UniSetUDP::UDPMessage* pack; // текущий обрабатываемый пакет
269 
273  size_t maxDifferens = { 20 };
274 
275  std::atomic_bool lockUpdate = { false };
277  EventSlot slEvent;
278  Trigger trTimeout;
279  std::mutex tmMutex;
280 
281  struct CacheItem
282  {
283  long id = { uniset::DefaultObjectId };
284  IOController::IOStateList::iterator ioit;
285 
286  CacheItem():
287  id(uniset::DefaultObjectId) {}
288  };
289  typedef std::vector<CacheItem> CacheVec;
290 
291  struct CacheInfo
292  {
293  uint16_t crc;
294  CacheVec items;
295 
296  CacheInfo(): crc(0) {}
297  };
298 
299  // ключом является UDPMessage::getDataID()
300  typedef std::unordered_map<long, CacheInfo> CacheMap;
301  CacheMap d_icache_map;
302  CacheMap a_icache_map;
303  size_t cacheMissed; // количество промахов
304  bool ignoreCRC = { false };
306  CacheInfo* getDCache( UniSetUDP::UDPMessage* upack ) noexcept;
307  CacheInfo* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
308  };
309  // --------------------------------------------------------------------------
310 } // end of namespace uniset
311 // -----------------------------------------------------------------------------
312 #endif // UNetReceiver_H_
313 // -----------------------------------------------------------------------------
Definition: DebugStream.h:62
Definition: CommonEventLoop.h:19
Definition: UNetReceiver.h:105
Event
Definition: UNetReceiver.h:154
@ evTimeout
Definition: UNetReceiver.h:156
@ evOK
Definition: UNetReceiver.h:155
Definition: CommonEventLoop.h:15
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:70
long ObjectId
Definition: UniSetTypes_i.idl:30