UniSet  2.2.1
UNetReceiver.h
00001 #ifndef UNetReceiver_H_
00002 #define UNetReceiver_H_
00003 // -----------------------------------------------------------------------------
00004 #include <ostream>
00005 #include <memory>
00006 #include <string>
00007 #include <queue>
00008 #include <unordered_map>
00009 #include <cc++/socket.h>
00010 #include <sigc++/sigc++.h>
00011 #include "UniSetObject.h"
00012 #include "Trigger.h"
00013 #include "Mutex.h"
00014 #include "SMInterface.h"
00015 #include "SharedMemory.h"
00016 #include "ThreadCreator.h"
00017 #include "UDPPacket.h"
00018 // -----------------------------------------------------------------------------
00019 /*  Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
00020  * ===============
00021  * Собственно реализация сделана так:
00022  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
00023  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
00024  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
00025  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
00026  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
00027  * Всё это реализовано в функции UNetReceiver::real_update()
00028  *
00029  * КЭШ
00030  * ===
00031  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
00032  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
00033  * Порядковый номер данных в пакете является индексом в кэше.
00034  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
00035  * ID который пришёл в пакете - элемент кэша обновляется.
00036  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
00037  *
00038  * КЭШ (ДОПОЛНЕНИЕ)
00039  * ===
00040  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
00041  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
00042  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
00043  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов,
00044  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
00045  *
00046  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
00047  * =========================================================================
00048  * Для защиты от сбоя счётика сделана следующая логика:
00049  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
00050  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
00051  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
00052  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
00053  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
00054  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
00055  * затирают старые, если их не успели вынуть и обработать.
00056  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
00057  * =========================================================================
00058  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
00059 */
00060 // -----------------------------------------------------------------------------
00061 class UNetReceiver:
00062     public std::enable_shared_from_this<UNetReceiver>
00063 {
00064     public:
00065         UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi );
00066         virtual ~UNetReceiver();
00067 
00068         void start();
00069         void stop();
00070 
00071         void receive();
00072         void update();
00073 
00074         inline const std::string getName() const
00075         {
00076             return myname;
00077         }
00078 
00079         // блокировать сохранение данных в SM
00080         void setLockUpdate( bool st );
00081         inline bool isLockUpdate() const
00082         {
00083             return lockUpdate;
00084         }
00085 
00086         void resetTimeout();
00087 
00088         inline bool isRecvOK() const
00089         {
00090             return !ptRecvTimeout.checkTime();
00091         }
00092         inline unsigned long getLostPacketsNum() const
00093         {
00094             return lostPackets;
00095         }
00096 
00097         void setReceiveTimeout( timeout_t msec );
00098         void setReceivePause( timeout_t msec );
00099         void setUpdatePause( timeout_t msec );
00100         void setLostTimeout( timeout_t msec );
00101         void setPrepareTime( timeout_t msec );
00102         void setMaxDifferens( unsigned long set );
00103 
00104         void setRespondID( UniSetTypes::ObjectId id, bool invert = false );
00105         void setLostPacketsID( UniSetTypes::ObjectId id );
00106 
00107         void setMaxProcessingCount( int set );
00108 
00109         void forceUpdate(); // пересохранить очередной пакет в SM даже если данные не менялись
00110 
00111         inline ost::IPV4Address getAddress() const
00112         {
00113             return addr;
00114         }
00115         inline ost::tpport_t getPort() const
00116         {
00117             return port;
00118         }
00119 
00121         enum Event
00122         {
00123             evOK,        
00124             evTimeout    
00125         };
00126 
00127         typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
00128         void connectEvent( EventSlot sl );
00129 
00130         inline std::shared_ptr<DebugStream> getLog()
00131         {
00132             return unetlog;
00133         }
00134 
00135         virtual const std::string getShortInfo() const;
00136 
00137     protected:
00138 
00139         const std::shared_ptr<SMInterface> shm;
00140         std::shared_ptr<DebugStream> unetlog;
00141 
00142         bool recv();
00143         void step();
00144         void real_update();
00145 
00146         void initIterators();
00147 
00148     public:
00149 
00150         // функция определения приоритетного сообщения для обработки
00151         struct PacketCompare:
00152             public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
00153         {
00154             inline bool operator()(const UniSetUDP::UDPMessage& lhs,
00155                                    const UniSetUDP::UDPMessage& rhs) const
00156             {
00157                 return lhs.num > rhs.num;
00158             }
00159         };
00160         typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
00161 
00162     private:
00163         UNetReceiver();
00164 
00165         timeout_t recvpause = { 10 };      
00166         timeout_t updatepause = { 100 };    
00168         std::shared_ptr<ost::UDPReceive> udp;
00169         ost::IPV4Address addr;
00170         ost::tpport_t port = { 0 };
00171         std::string myname;
00172 
00173         UniSetTypes::uniset_rwmutex pollMutex;
00174         PassiveTimer ptRecvTimeout;
00175         PassiveTimer ptPrepare;
00176         timeout_t recvTimeout = { 5000 }; // msec
00177         timeout_t prepareTime = { 2000 };
00178         timeout_t lostTimeout = { 200 };
00179         PassiveTimer ptLostTimeout;
00180         size_t lostPackets = { 0 }; 
00182         UniSetTypes::ObjectId sidRespond = { UniSetTypes::DefaultObjectId };
00183         IOController::IOStateList::iterator itRespond;
00184         bool respondInvert = { false };
00185         UniSetTypes::ObjectId sidLostPackets;
00186         IOController::IOStateList::iterator itLostPackets;
00187 
00188         std::atomic_bool activated = { false };
00189 
00190         std::shared_ptr< ThreadCreator<UNetReceiver> > r_thr;        // receive thread
00191         std::shared_ptr< ThreadCreator<UNetReceiver> > u_thr;        // update thread
00192 
00193         PacketQueue qpack;    
00194         UniSetUDP::UDPMessage pack;        
00195         UniSetUDP::UDPPacket r_buf;
00196         UniSetTypes::uniset_rwmutex packMutex; 
00197         size_t pnum = { 0 };    
00202         size_t maxDifferens = { 20 };
00203 
00204         PacketQueue qtmp;    
00205         bool waitClean = { false };    
00206         size_t rnum = { 0 };    
00208         size_t maxProcessingCount = { 100 }; 
00210         bool lockUpdate = { false }; 
00211         UniSetTypes::uniset_rwmutex lockMutex;
00212 
00213         EventSlot slEvent;
00214         Trigger trTimeout;
00215         UniSetTypes::uniset_rwmutex tmMutex;
00216 
00217         struct CacheItem
00218         {
00219             long id;
00220             IOController::IOStateList::iterator ioit;
00221             UniversalIO::IOType iotype;
00222 
00223             CacheItem():
00224                 id(UniSetTypes::DefaultObjectId), iotype(UniversalIO::UnknownIOType) {}
00225         };
00226 
00227         typedef std::vector<CacheItem> CacheVec;
00228         struct CacheInfo
00229         {
00230             CacheInfo():
00231                 cache_init_ok(false)
00232             {
00233             }
00234             bool cache_init_ok;
00235             CacheVec cache;
00236         };
00237         // ключом является UDPMessage::getDataID()
00238         typedef std::unordered_map<long, CacheInfo> CacheMap;
00239         CacheMap d_icache_map;     
00240         CacheMap a_icache_map;     
00242         bool d_cache_init_ok = { false };
00243         bool a_cache_init_ok = { false };
00244 
00245         void initDCache( UniSetUDP::UDPMessage& pack, bool force = false );
00246         void initACache( UniSetUDP::UDPMessage& pack, bool force = false );
00247 };
00248 // -----------------------------------------------------------------------------
00249 #endif // UNetReceiver_H_
00250 // -----------------------------------------------------------------------------