UniSet  2.6.0
UNetSender.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetSender_H_
18 #define UNetSender_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <string>
22 #include <vector>
23 #include <unordered_map>
24 #include "UniSetObject.h"
25 #include "Trigger.h"
26 #include "Mutex.h"
27 #include "SMInterface.h"
28 #include "SharedMemory.h"
29 #include "ThreadCreator.h"
30 #include "UDPCore.h"
31 #include "UDPPacket.h"
32 // --------------------------------------------------------------------------
33 namespace uniset
34 {
35  // -----------------------------------------------------------------------------
36  /*
37  * Распределение датчиков по пакетам
38  * =========================================================================
39  * Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
40  * Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
41  * Внутри каждой группы пакеты набираются по мере "заполнения".
42  *
43  * Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
44  * Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
45  * то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
46  * в свою очередь остальные продолжают "добивать" предыдущий пакет.
47  * В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
48  * существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
49  *
50  * ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
51  * Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
52  последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
53  На стороне UNetReceiver пакаеты с повторными номерами (т.е. уже обработанные) - откидываются.
54  *
55  *
56  * Создание соединения
57  * ======================================
58  * Попытка создать соединение производиться сразу в конструкторе, если это не получается,
59  * то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
60  * и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
61  * (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
62  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
63  * Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
64  * тогда при создании объекта UNetSender, в конструкторе будет
65  * выкинуто исключение при неудачной попытке создания соединения.
66  * \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
67  */
68  class UNetSender
69  {
70  public:
71  UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false,
72  const std::string& s_field = "", const std::string& s_fvalue = "", const std::string& prefix = "unet",
73  size_t maxDCount = UniSetUDP::MaxDCount, size_t maxACount = UniSetUDP::MaxACount );
74 
75  virtual ~UNetSender();
76 
77  typedef size_t sendfactor_t;
78 
79  struct UItem
80  {
81  UItem():
82  iotype(UniversalIO::UnknownIOType),
84  pack_num(0),
85  pack_ind(0),
86  pack_sendfactor(0) {}
87 
88  UniversalIO::IOType iotype;
90  IOController::IOStateList::iterator ioit;
91  size_t pack_num;
92  size_t pack_ind;
93  sendfactor_t pack_sendfactor = { 0 };
94  friend std::ostream& operator<<( std::ostream& os, UItem& p );
95  };
96 
97  typedef std::unordered_map<uniset::ObjectId, UItem> UItemMap;
98 
99  size_t getDataPackCount() const;
100 
101  void start();
102  void stop();
103 
104  void send() noexcept;
105 
106  struct PackMessage
107  {
108  PackMessage( UniSetUDP::UDPMessage&& m ) noexcept: msg(std::move(m)) {}
109  PackMessage( const UniSetUDP::UDPMessage& m ) = delete;
110 
111  PackMessage() noexcept {}
112 
115  };
116 
117  void real_send( PackMessage& mypack ) noexcept;
118 
120  void updateFromSM();
121 
123  void updateSensor( uniset::ObjectId id, long value );
124 
126  void updateItem( UItem& it, long value );
127 
128  inline void setSendPause( int msec )
129  {
130  sendpause = msec;
131  }
132  inline void setPackSendPause( int msec )
133  {
134  packsendpause = msec;
135  }
136 
137  void setCheckConnectionPause( int msec );
138 
140  void askSensors( UniversalIO::UIOCommand cmd );
141 
143  void initIterators();
144 
145  inline std::shared_ptr<DebugStream> getLog()
146  {
147  return unetlog;
148  }
149 
150  virtual const std::string getShortInfo() const;
151 
152  inline std::string getAddress() const
153  {
154  return addr;
155  }
156  inline int getPort() const
157  {
158  return port;
159  }
160 
161  inline size_t getADataSize() const
162  {
163  return maxAData;
164  }
165  inline size_t getDDataSize() const
166  {
167  return maxDData;
168  }
169 
170  protected:
171 
172  std::string s_field = { "" };
173  std::string s_fvalue = { "" };
174  std::string prefix = { "" };
175 
176  const std::shared_ptr<SMInterface> shm;
177  std::shared_ptr<DebugStream> unetlog;
178 
179  bool initItem( UniXML::iterator& it );
180  bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
181 
182  void readConfiguration();
183 
184  bool createConnection( bool throwEx );
185 
186  private:
187  UNetSender();
188 
189  std::shared_ptr<UDPSocketU> udp = { nullptr };
190  std::string addr;
191  int port = { 0 };
192  std::string s_host = { "" };
193  Poco::Net::SocketAddress saddr;
194 
195  std::string myname = { "" };
196  timeout_t sendpause = { 150 };
197  timeout_t packsendpause = { 5 };
198  timeout_t writeTimeout = { 1000 }; // msec
199  std::atomic_bool activated = { false };
200  PassiveTimer ptCheckConnection;
201 
202  typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
203 
204  // mypacks заполняется в начале и дальше с ним происходит только чтение
205  // поэтому mutex-ом его не защищаем
206  Packs mypacks;
207  std::unordered_map<sendfactor_t, size_t> packs_anum;
208  std::unordered_map<sendfactor_t, size_t> packs_dnum;
209  UItemMap items;
210  size_t packetnum = { 1 };
211  uint16_t lastcrc = { 0 };
212  UniSetUDP::UDPPacket s_msg;
213 
214  size_t maxAData = { UniSetUDP::MaxACount };
215  size_t maxDData = { UniSetUDP::MaxDCount };
216 
217  std::shared_ptr< ThreadCreator<UNetSender> > s_thr; // send thread
218 
219  size_t ncycle = { 0 };
221  };
222  // --------------------------------------------------------------------------
223 } // end of namespace uniset
224 // -----------------------------------------------------------------------------
225 #endif // UNetSender_H_
226 // -----------------------------------------------------------------------------