1919
2020#include "sockhub.h"
2121
22+ inline void ShubAddSocket (Shub * shub , int fd )
23+ {
24+ #ifdef USE_EPOLL
25+ struct epoll_event ev ;
26+ ev .events = EPOLLIN ;
27+ ev .data .fd = fd ;
28+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
29+ shub -> params -> error_handler ("Failed to add socket to epoll set" , SHUB_FATAL_ERROR );
30+ }
31+ #else
32+ FD_SET (fd , & shub -> inset );
33+ if (fd > shub -> max_fd ) {
34+ shub -> max_fd = fd ;
35+ }
36+ #endif
37+ }
38+
39+
2240static void default_error_handler (char const * msg , ShubErrorSeverity severity )
2341{
2442 perror (msg );
@@ -68,7 +86,13 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
6886static void close_socket (Shub * shub , int fd )
6987{
7088 close (fd );
89+ #ifdef USE_EPOLL
90+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_DEL , fd , NULL ) < 0 ) {
91+ shub -> params -> error_handler ("Failed to add socket to epoll set" , SHUB_RECOVERABLE_ERROR );
92+ }
93+ #else
7194 FD_CLR (fd , & shub -> inset );
95+ #endif
7296}
7397
7498int ShubReadSocketEx (int sd , void * buf , int min_size , int max_size )
@@ -163,7 +187,8 @@ static void reconnect(Shub* shub)
163187 } else {
164188 int optval = 1 ;
165189 setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
166- FD_SET (shub -> output , & shub -> inset );
190+
191+ ShubAddSocket (shub , shub -> output );
167192 if (sep != NULL ) {
168193 * sep = ',' ;
169194 }
@@ -196,6 +221,7 @@ static void notify_disconnect(Shub* shub, int chan)
196221
197222static void recovery (Shub * shub )
198223{
224+ #ifndef USE_EPOLL
199225 int i , max_fd ;
200226
201227 for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
@@ -212,6 +238,7 @@ static void recovery(Shub* shub)
212238 }
213239 }
214240 }
241+ #endif
215242}
216243
217244void ShubInitialize (Shub * shub , ShubParams * params )
@@ -233,11 +260,14 @@ void ShubInitialize(Shub* shub, ShubParams* params)
233260 if (listen (shub -> input , params -> queue_size ) < 0 ) {
234261 shub -> params -> error_handler ("Failed to listen local socket" , SHUB_FATAL_ERROR );
235262 }
236- FD_ZERO (& shub -> inset );
237- FD_SET (shub -> input , & shub -> inset );
238-
239263 shub -> output = -1 ;
240- shub -> max_fd = shub -> input ;
264+ #ifdef USE_EPOLL
265+ shub -> epollfd = epoll_create (MAX_EVENTS );
266+ #else
267+ FD_ZERO (& shub -> inset );
268+ shub -> max_fd = 0 ;
269+ #endif
270+ ShubAddSocket (shub , shub -> input );
241271 reconnect (shub );
242272
243273 shub -> in_buffer = malloc (params -> buffer_size );
@@ -266,34 +296,42 @@ void ShubLoop(Shub* shub)
266296 sigprocmask (SIG_UNBLOCK , & sset , NULL );
267297
268298 while (!stop ) {
299+ int i , rc ;
300+ #ifdef USE_EPOLL
301+ struct epoll_event events [MAX_EVENTS ];
302+ rc = epoll_wait (shub -> epollfd , events , MAX_EVENTS , shub -> params -> delay );
303+ #else
269304 fd_set events ;
270305 struct timeval tm ;
271- int i , rc ;
272306 int max_fd = shub -> max_fd ;
273307
274308 tm .tv_sec = shub -> params -> delay /1000 ;
275309 tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
276310
277- events = shub -> inset ;
278311 rc = select (max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
312+ #endif
279313 if (rc < 0 ) {
280314 if (errno != EINTR ) {
281315 shub -> params -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
282316 recovery (shub );
283317 }
284318 } else {
285319 if (rc > 0 ) {
286- for (i = 0 ; i <= max_fd ; i ++ ) {
320+ #ifdef USE_EPOLL
321+ int j ;
322+ for (j = 0 ; j < rc ; j ++ ) {
323+ {
324+ i = events [j ].data .fd ;
325+ #else
326+ for (i = 0 ; i <= max_fd ; i ++ ) {
287327 if (FD_ISSET (i , & events )) {
328+ #endif
288329 if (i == shub -> input ) { /* accept incomming connection */
289330 int s = accept (i , NULL , NULL );
290331 if (s < 0 ) {
291332 shub -> params -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
292333 } else {
293- if (s > shub -> max_fd ) {
294- shub -> max_fd = s ;
295- }
296- FD_SET (s , & shub -> inset );
334+ ShubAddSocket (shub , i );
297335 }
298336 } else if (i == shub -> output ) { /* receive response from server */
299337 /* try to read as much as possible */
@@ -420,10 +458,10 @@ void ShubLoop(Shub* shub)
420458 do {
421459 unsigned int n = processed + size > buffer_size ? buffer_size - processed : size ;
422460 if (chan >= 0 && !ShubReadSocket (chan , shub -> in_buffer + processed , n )) {
423- char buf [1024 ];
424- sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
425- shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
426- //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
461+ char buf [1024 ];
462+ sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
463+ shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
464+ //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
427465 close_socket (shub , chan );
428466 if (hdr != NULL ) { /* if message header is not yet sent to the server... */
429467 /* ... then skip this message */
0 commit comments