|
UniSet
2.2.1
|
00001 #ifndef UNetReceiver_H_ 00002 #define UNetReceiver_H_ 00003 // ----------------------------------------------------------------------------- 00004 #include <ostream> 00005 #include <memory> 00006 #include <string> 00007 #include <queue> 00008 #include <unordered_map> 00009 #include <cc++/socket.h> 00010 #include <sigc++/sigc++.h> 00011 #include "UniSetObject.h" 00012 #include "Trigger.h" 00013 #include "Mutex.h" 00014 #include "SMInterface.h" 00015 #include "SharedMemory.h" 00016 #include "ThreadCreator.h" 00017 #include "UDPPacket.h" 00018 // ----------------------------------------------------------------------------- 00019 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. 00020 * =============== 00021 * Собственно реализация сделана так: 00022 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности 00023 * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета 00024 * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", 00025 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) 00026 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. 00027 * Всё это реализовано в функции UNetReceiver::real_update() 00028 * 00029 * КЭШ 00030 * === 00031 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. 00032 * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо). 00033 * Порядковый номер данных в пакете является индексом в кэше. 00034 * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, 00035 * ID который пришёл в пакете - элемент кэша обновляется. 00036 * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется. 00037 * 00038 * КЭШ (ДОПОЛНЕНИЕ) 00039 * === 00040 * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный 00041 * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()). 00042 * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета. 00043 * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов, 00044 * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов. 00045 * 00046 * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) 00047 * ========================================================================= 00048 * Для защиты от сбоя счётика сделана следующая логика: 00049 * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, 00050 * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. 00051 * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, 00052 * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. 00053 * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься 00054 * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные 00055 * затирают старые, если их не успели вынуть и обработать. 00056 * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. 00057 * ========================================================================= 00058 * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.. 00059 */ 00060 // ----------------------------------------------------------------------------- 00061 class UNetReceiver: 00062 public std::enable_shared_from_this<UNetReceiver> 00063 { 00064 public: 00065 UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi ); 00066 virtual ~UNetReceiver(); 00067 00068 void start(); 00069 void stop(); 00070 00071 void receive(); 00072 void update(); 00073 00074 inline const std::string getName() const 00075 { 00076 return myname; 00077 } 00078 00079 // блокировать сохранение данных в SM 00080 void setLockUpdate( bool st ); 00081 inline bool isLockUpdate() const 00082 { 00083 return lockUpdate; 00084 } 00085 00086 void resetTimeout(); 00087 00088 inline bool isRecvOK() const 00089 { 00090 return !ptRecvTimeout.checkTime(); 00091 } 00092 inline unsigned long getLostPacketsNum() const 00093 { 00094 return lostPackets; 00095 } 00096 00097 void setReceiveTimeout( timeout_t msec ); 00098 void setReceivePause( timeout_t msec ); 00099 void setUpdatePause( timeout_t msec ); 00100 void setLostTimeout( timeout_t msec ); 00101 void setPrepareTime( timeout_t msec ); 00102 void setMaxDifferens( unsigned long set ); 00103 00104 void setRespondID( UniSetTypes::ObjectId id, bool invert = false ); 00105 void setLostPacketsID( UniSetTypes::ObjectId id ); 00106 00107 void setMaxProcessingCount( int set ); 00108 00109 void forceUpdate(); // пересохранить очередной пакет в SM даже если данные не менялись 00110 00111 inline ost::IPV4Address getAddress() const 00112 { 00113 return addr; 00114 } 00115 inline ost::tpport_t getPort() const 00116 { 00117 return port; 00118 } 00119 00121 enum Event 00122 { 00123 evOK, 00124 evTimeout 00125 }; 00126 00127 typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot; 00128 void connectEvent( EventSlot sl ); 00129 00130 inline std::shared_ptr<DebugStream> getLog() 00131 { 00132 return unetlog; 00133 } 00134 00135 virtual const std::string getShortInfo() const; 00136 00137 protected: 00138 00139 const std::shared_ptr<SMInterface> shm; 00140 std::shared_ptr<DebugStream> unetlog; 00141 00142 bool recv(); 00143 void step(); 00144 void real_update(); 00145 00146 void initIterators(); 00147 00148 public: 00149 00150 // функция определения приоритетного сообщения для обработки 00151 struct PacketCompare: 00152 public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool> 00153 { 00154 inline bool operator()(const UniSetUDP::UDPMessage& lhs, 00155 const UniSetUDP::UDPMessage& rhs) const 00156 { 00157 return lhs.num > rhs.num; 00158 } 00159 }; 00160 typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue; 00161 00162 private: 00163 UNetReceiver(); 00164 00165 timeout_t recvpause = { 10 }; 00166 timeout_t updatepause = { 100 }; 00168 std::shared_ptr<ost::UDPReceive> udp; 00169 ost::IPV4Address addr; 00170 ost::tpport_t port = { 0 }; 00171 std::string myname; 00172 00173 UniSetTypes::uniset_rwmutex pollMutex; 00174 PassiveTimer ptRecvTimeout; 00175 PassiveTimer ptPrepare; 00176 timeout_t recvTimeout = { 5000 }; // msec 00177 timeout_t prepareTime = { 2000 }; 00178 timeout_t lostTimeout = { 200 }; 00179 PassiveTimer ptLostTimeout; 00180 size_t lostPackets = { 0 }; 00182 UniSetTypes::ObjectId sidRespond = { UniSetTypes::DefaultObjectId }; 00183 IOController::IOStateList::iterator itRespond; 00184 bool respondInvert = { false }; 00185 UniSetTypes::ObjectId sidLostPackets; 00186 IOController::IOStateList::iterator itLostPackets; 00187 00188 std::atomic_bool activated = { false }; 00189 00190 std::shared_ptr< ThreadCreator<UNetReceiver> > r_thr; // receive thread 00191 std::shared_ptr< ThreadCreator<UNetReceiver> > u_thr; // update thread 00192 00193 PacketQueue qpack; 00194 UniSetUDP::UDPMessage pack; 00195 UniSetUDP::UDPPacket r_buf; 00196 UniSetTypes::uniset_rwmutex packMutex; 00197 size_t pnum = { 0 }; 00202 size_t maxDifferens = { 20 }; 00203 00204 PacketQueue qtmp; 00205 bool waitClean = { false }; 00206 size_t rnum = { 0 }; 00208 size_t maxProcessingCount = { 100 }; 00210 bool lockUpdate = { false }; 00211 UniSetTypes::uniset_rwmutex lockMutex; 00212 00213 EventSlot slEvent; 00214 Trigger trTimeout; 00215 UniSetTypes::uniset_rwmutex tmMutex; 00216 00217 struct CacheItem 00218 { 00219 long id; 00220 IOController::IOStateList::iterator ioit; 00221 UniversalIO::IOType iotype; 00222 00223 CacheItem(): 00224 id(UniSetTypes::DefaultObjectId), iotype(UniversalIO::UnknownIOType) {} 00225 }; 00226 00227 typedef std::vector<CacheItem> CacheVec; 00228 struct CacheInfo 00229 { 00230 CacheInfo(): 00231 cache_init_ok(false) 00232 { 00233 } 00234 bool cache_init_ok; 00235 CacheVec cache; 00236 }; 00237 // ключом является UDPMessage::getDataID() 00238 typedef std::unordered_map<long, CacheInfo> CacheMap; 00239 CacheMap d_icache_map; 00240 CacheMap a_icache_map; 00242 bool d_cache_init_ok = { false }; 00243 bool a_cache_init_ok = { false }; 00244 00245 void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ); 00246 void initACache( UniSetUDP::UDPMessage& pack, bool force = false ); 00247 }; 00248 // ----------------------------------------------------------------------------- 00249 #endif // UNetReceiver_H_ 00250 // -----------------------------------------------------------------------------
1.7.6.1