UniSet  2.6.0
LogSession.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -------------------------------------------------------------------------
17 #ifndef LogSession_H_
18 #define LogSession_H_
19 // -------------------------------------------------------------------------
20 #include <string>
21 #include <memory>
22 #include <queue>
23 #include <ev++.h>
24 #include "Poco/Net/StreamSocket.h"
25 #include "Mutex.h"
26 #include "DebugStream.h"
27 #include "UTCPCore.h"
28 #include "UTCPStream.h"
29 #include "LogAgregator.h"
30 #ifndef DISABLE_REST_API
31 #include <Poco/JSON/Object.h>
32 #endif
33 // -------------------------------------------------------------------------
34 namespace uniset
35 {
36 
38  class LogSession
39  {
40  public:
41 
42  LogSession( const Poco::Net::StreamSocket& s, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000, timeout_t checkConnectionTime = 10000 );
43  ~LogSession();
44 
45  typedef sigc::slot<void, LogSession*> FinalSlot;
46  void connectFinalSession( FinalSlot sl ) noexcept;
47 
48  // сигнал о приходе команды: std::string func( LogSession*, command, logname );
49  // \return какую-то информацию, которая будет послана client-у. Если return.empty(), то ничего послано не будет.
50  typedef sigc::signal<std::string, LogSession*, LogServerTypes::Command, const std::string& > LogSessionCommand_Signal;
51  LogSessionCommand_Signal signal_logsession_command();
52 
53  // прервать работу
54  void cancel() noexcept;
55 
56  inline std::string getClientAddress() const noexcept
57  {
58  return caddr;
59  }
60 
61  inline void setSessionLogLevel( Debug::type t ) noexcept
62  {
63  mylog.level(t);
64  }
65  inline void addSessionLogLevel( Debug::type t ) noexcept
66  {
67  mylog.addLevel(t);
68  }
69  inline void delSessionLogLevel( Debug::type t ) noexcept
70  {
71  mylog.delLevel(t);
72  }
73 
75  void setMaxBufSize( size_t num );
76  size_t getMaxBufSize() const noexcept;
77 
78  // запуск обработки входящих запросов
79  void run( const ev::loop_ref& loop ) noexcept;
80  void terminate();
81 
82  bool isAcive() const noexcept;
83 
84  std::string name() const noexcept;
85 
86  std::string getShortInfo() noexcept;
87 
88 #ifndef DISABLE_REST_API
89  Poco::JSON::Object::Ptr httpGetShortInfo();
90 #endif
91 
92  protected:
93  // LogSession( ost::TCPSocket& server );
94 
95  void event( ev::async& watcher, int revents ) noexcept;
96  void callback( ev::io& watcher, int revents ) noexcept;
97  void readEvent( ev::io& watcher ) noexcept;
98  void writeEvent( ev::io& watcher );
99  size_t readData( unsigned char* buf, int len );
100  void cmdProcessing( const std::string& cmdLogName, const LogServerTypes::lsMessage& msg );
101  void onCmdTimeout( ev::timer& watcher, int revents ) noexcept;
102  void onCheckConnectionTimer( ev::timer& watcher, int revents ) noexcept;
103  void final() noexcept;
104 
105  void logOnEvent( const std::string& s ) noexcept;
106 
107  timeout_t cmdTimeout = { 2000 };
108  float checkConnectionTime = { 10. }; // время на проверку живости соединения..(сек)
109 
110  // Т.к. сообщений может быть ОЧЕНЬ МНОГО.. сеть медленная
111  // очередь будет не успевать рассасываться,
112  // то потенциально может "скушаться" вся память.
113  // Поэтому приходиться ограничить доступное количество записей.
114  // Рассчитываем, что средний размер одного сообщения 150 символов (байт)
115  // тогда выделяем буфер на 200 сообщений (~ 30кB)
116  // На самом деле сообщения могут быть совершенно разные..
117  size_t maxRecordsNum = { 30000 }; // максимальное количество сообщение в очереди
118 
119  private:
120  std::queue<UTCPCore::Buffer*> logbuf;
121  std::mutex logbuf_mutex;
122  bool lostMsg = { false };
123 
124  // статистика по использованию буфера
125  size_t maxCount = { 0 }; // максимальное количество побывавшее в очереди
126  size_t minSizeMsg = { 0 }; // минимальная встретившаяся длинна сообщения
127  size_t maxSizeMsg = { 0 }; // максимальная встретившаяся длинна сообщения
128  size_t numLostMsg = { 0 }; // количество потерянных сообщений
129 
130  std::string peername = { "" };
131  std::string caddr = { "" };
132  std::shared_ptr<DebugStream> log;
133  std::shared_ptr<LogAgregator> alog;
134  sigc::connection conn;
135 
136  std::shared_ptr<UTCPStream> sock;
137 
138  ev::io io;
139  ev::timer cmdTimer;
140  ev::async asyncEvent;
141  ev::timer checkConnectionTimer;
142 
143  FinalSlot slFin;
144  std::atomic_bool cancelled = { false };
145 
146  LogSessionCommand_Signal m_command_sig;
147 
148  DebugStream mylog;
149  };
150  // -------------------------------------------------------------------------
151 } // end of uniset namespace
152 // -------------------------------------------------------------------------
153 #endif // LogSession_H_
154 // -------------------------------------------------------------------------