ASSA::Reactor Class Reference

#include <Reactor.h>

List of all members.

Public Member Functions

 Reactor ()
 Constructor.
 ~Reactor ()
 Destructor.
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
void stopReactor (void)
 Stop Reactor's activity.
void deactivate (void)
 Deactivate Reactor.

Private Types

typedef std::map< u_int,
EventHandler * > 
Fd2Eh_Map_Type
 no cloning
typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter

Private Member Functions

 Reactor (const Reactor &)
Reactoroperator= (const Reactor &)
 no cloning
void adjust_maxfdp1 (handler_t fd_)
 Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
bool handleError (void)
 Handle error in select(2) loop appropriately.
bool dispatch (int minimum_)
 Notify all EventHandlers registered on respecful events occured.
int isAnyReady (void)
 Return number of file descriptors ready accross all sets.
bool checkFDs (void)
 Check mask for bad file descriptors.
void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.

Private Attributes

int m_fd_setsize
 Max number of open files per process.
handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.
bool m_active
 Flag that indicates whether Reactor is active or had been stopped.
Fd2Eh_Map_Type m_readSet
 Event handlers awaiting on READ_EVENT.
Fd2Eh_Map_Type m_writeSet
 Event handlers awaiting on WRITE_EVENT.
Fd2Eh_Map_Type m_exceptSet
 Event handlers awaiting on EXCEPT_EVENT.
MaskSet m_waitSet
 Handlers to wait for event on.
MaskSet m_readySet
 Handlers that are ready for processing.
TimerQueue m_tqueue
 The queue of Timers.


Detailed Description

Definition at line 57 of file Reactor.h.


Member Typedef Documentation

typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private]

no cloning

Definition at line 154 of file Reactor.h.

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private]

Definition at line 155 of file Reactor.h.


Constructor & Destructor Documentation

Reactor::Reactor (  ) 

Constructor.

Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h.

Initialize winsock2 library

Definition at line 24 of file Reactor.cpp.

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.

00024            : 
00025     m_fd_setsize  (1024), 
00026     m_maxfd_plus1 (0), 
00027     m_active      (true)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030 
00034 #if defined(WIN32)
00035     m_fd_setsize = FD_SETSIZE;
00036 
00037 #else  // POSIX
00038     struct rlimit rlim;
00039     rlim.rlim_max = 0;
00040 
00041     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042         m_fd_setsize = rlim.rlim_cur;
00043     }
00044 #endif
00045 
00048 #if defined (WIN32)             
00049     WSADATA data;
00050     WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }

Reactor::~Reactor (  ) 

Destructor.

Definition at line 55 of file Reactor.cpp.

References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

00056 {   
00057     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00058 
00059     m_readSet.clear   ();
00060     m_writeSet.clear  ();
00061     m_exceptSet.clear ();
00062     deactivate ();
00063 }

ASSA::Reactor::Reactor ( const Reactor  )  [private]


Member Function Documentation

TimerId Reactor::registerTimerHandler ( EventHandler eh_,
const TimeVal tv_,
const std::string &  name_ = "<unknown>" 
)

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
tv_ Timeout value
name_ Name of the timer
Returns:
Timer ID that can be used to cancel timer and find out its name.

Definition at line 67 of file Reactor.cpp.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00070 {
00071     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00072     Assure_return (eh_);
00073 
00074     TimeVal now (TimeVal::gettimeofday());
00075     TimeVal t (now + timeout_);
00076 
00077     DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",  
00078         timeout_.sec(),timeout_.msec()));
00079     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00080     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00081 
00082     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00083 
00084     DL((REACT,"---Modified Timer Queue----\n"));
00085     m_tqueue.dump();
00086     DL((REACT,"---------------------------\n"));
00087 
00088     return (tid);
00089 }

bool Reactor::registerIOHandler ( EventHandler eh_,
handler_t  fd_,
EventType  et_ = RWE_EVENTS 
)

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns:
true if success, false if error

Definition at line 93 of file Reactor.cpp.

References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::RemoteLogger::log_open(), and ASSA::Acceptor< SERVICE_HANDLER, PEER_ACCEPTOR >::open().

00094 {
00095     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00096 
00097     std::ostringstream msg;
00098     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00099 
00100     if (isReadEvent (et_)) 
00101     {
00102         if (!m_waitSet.m_rset.setFd (fd_)) 
00103         {
00104             DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00105             return (false);
00106         }
00107         m_readSet[fd_] = eh_;
00108         msg << "READ_EVENT";
00109     }
00110 
00111     if (isWriteEvent (et_)) 
00112     {
00113         if (!m_waitSet.m_wset.setFd (fd_)) 
00114         {
00115             DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00116             return (false);
00117         }
00118         m_writeSet[fd_] = eh_;
00119         msg << " WRITE_EVENT";
00120     }
00121 
00122     if (isExceptEvent (et_)) 
00123     {
00124         if (!m_waitSet.m_eset.setFd (fd_)) 
00125         {
00126             DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00127             return (false);
00128         }
00129         m_exceptSet[fd_] = eh_;
00130         msg << " EXCEPT_EVENT";
00131     }
00132     msg << std::ends;
00133 
00134     DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 
00135         eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00136 
00137 #if !defined (WIN32)
00138     if (m_maxfd_plus1 < fd_+1) {
00139         m_maxfd_plus1 = fd_+1;
00140         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00141     }
00142 #endif
00143 
00144     DL((REACT,"Modified waitSet:\n"));
00145     m_waitSet.dump ();
00146 
00147     return (true);
00148 }

bool Reactor::removeHandler ( EventHandler eh_,
EventType  et_ = ALL_EVENTS 
)

Remove Event handler from reactor for either all I/O events or timeout event or both.

Remove handler from all events that matches event_.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters:
eh_ Pointer to the EventHandler
et_ Event Type to remove. Default will remove Event Handler for all events.
Returns:
true if success, false if wasn't registered for any events.

Definition at line 173 of file Reactor.cpp.

References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor().

00174 {
00175     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00176 
00177     bool ret = false;
00178     handler_t fd;
00179     Fd2Eh_Map_Iter iter;
00180 
00181     if (eh_ == NULL) {
00182         return false;
00183     }
00184 
00185     if (isTimeoutEvent (event_)) {
00186         ret = m_tqueue.remove (eh_);
00187         ret = true;
00188     }
00189 
00190     if (isReadEvent (event_)) {
00191         iter = m_readSet.begin ();
00192         while (iter != m_readSet.end ()) {
00193             if ((*iter).second == eh_) {
00194                 fd = (*iter).first;
00195                 m_readSet.erase (iter);
00196                 m_waitSet.m_rset.clear (fd);
00197                 ret = true;
00198                 break;
00199             }
00200             iter++;
00201         }
00202     } 
00203     
00204     if (isWriteEvent (event_)) {
00205         iter = m_writeSet.begin ();
00206         while (iter != m_writeSet.end ()) {
00207             if ((*iter).second == eh_) {
00208                 fd = (*iter).first;
00209                 m_writeSet.erase (iter);
00210                 m_waitSet.m_wset.clear (fd);
00211                 ret = true;
00212                 break;
00213             }
00214             iter++;
00215         }
00216     }
00217 
00218     if (isExceptEvent (event_)) {
00219         iter = m_exceptSet.begin ();
00220         while (iter != m_exceptSet.end ()) {
00221             if ((*iter).second == eh_) {
00222                 fd = (*iter).first;
00223                 m_exceptSet.erase (iter);
00224                 m_waitSet.m_eset.clear (fd);
00225                 ret = true;
00226                 break;
00227             }
00228             iter++;
00229         }
00230     }
00231 
00232     if (ret == true) {
00233         DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
00234         eh_->handle_close (fd);
00235     }
00236 
00237     adjust_maxfdp1 (fd);
00238 
00239     DL((REACT,"Modifies waitSet:\n"));
00240     m_waitSet.dump ();
00241 
00242     return (ret);
00243 }

bool Reactor::removeTimerHandler ( TimerId  id_  ) 

Remove Timer event from the queue.

This removes particular event.

Parameters:
id_ Timer Id returned by registerTimer.
Returns:
true if timer found and removed; false otherwise

Definition at line 152 of file Reactor.cpp.

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().

00153 {
00154     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00155     bool ret;
00156 
00157     if ((ret = m_tqueue.remove (tid_))) {
00158         DL((REACT,"---Modified Timer Queue----\n"));
00159         m_tqueue.dump();
00160         DL((REACT,"---------------------------\n"));
00161     }
00162     else {
00163         EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00164     }
00165     return (ret);
00166 }

bool Reactor::removeIOHandler ( handler_t  fd_  ) 

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters:
fd_ File descriptor
Returns:
true on success, false if fd_ is out of range

We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().

Definition at line 247 of file Reactor.cpp.

References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

00248 {
00249     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00250 
00251     bool ret = false;
00252     EventHandler*  ehp = NULL;
00253     Fd2Eh_Map_Iter iter;
00254 
00255     Assure_return (ASSA::is_valid_handler (fd_));
00256 
00257     DL((REACT,"Removing handler for fd=%d\n",fd_));
00258 
00263     if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 
00264     {
00265         ehp = (*iter).second;
00266         m_readSet.erase (iter);
00267         m_waitSet.m_rset.clear (fd_);
00268         m_readySet.m_rset.clear (fd_);
00269         if (m_readSet.size () > 0) {
00270             iter = m_readSet.end ();
00271             iter--;
00272         }
00273         ret = true;
00274     }
00275 
00276     if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 
00277     {
00278         ehp = (*iter).second;
00279         m_writeSet.erase (iter);
00280         m_waitSet.m_wset.clear (fd_);
00281         m_readySet.m_wset.clear (fd_);
00282         if (m_writeSet.size () > 0) {
00283             iter = m_writeSet.end ();
00284             iter--;
00285         }
00286         ret = true;
00287     }
00288 
00289     if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 
00290     {
00291         ehp = (*iter).second;
00292         m_exceptSet.erase (iter);
00293         m_waitSet.m_eset.clear (fd_);
00294         m_readySet.m_eset.clear (fd_);
00295         if (m_exceptSet.size () > 0) {
00296             iter = m_exceptSet.end ();
00297             iter--;
00298         }
00299         ret = true;
00300     }
00301 
00302     if (ret == true && ehp != NULL) {
00303         DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
00304         ehp->handle_close (fd_);
00305     }
00306 
00307     adjust_maxfdp1 (fd_);
00308 
00309     DL((REACT,"Modifies waitSet:\n"));
00310     m_waitSet.dump ();
00311 
00312     return (ret);
00313 }

void Reactor::waitForEvents ( void   ) 

Main waiting loop that blocks indefinitely processing events.

Block forever version.

Definition at line 470 of file Reactor.cpp.

References m_active.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00471 {
00472     while ( m_active ) {
00473         waitForEvents ((TimeVal*) NULL);
00474     }
00475 }

void Reactor::waitForEvents ( TimeVal tv_  ) 

Wait for events for time specified.

===================================================================== | select() | errno | Events | Behavior | |===================================================================| | < 0 | EINTR | Interrup by signal | Retry | +----------+-------+---------------------+--------------------------+ | < 0 | EBADF | Bad file descriptor | Remove bad fds and retry | | | | | and retry | +----------+-------+---------------------+--------------------------+ | < 0 | others| Some other error | Fall through | +----------+-------+---------------------+--------------------------+ | == 0 | 0 | Timed out | Fall through | +----------+-------+---------------------+--------------------------+ | > 0 | 0 | Got some work to do | Fall through | +-------------------------------------------------------------------+

Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.

Parameters:
tv_ [RW] is time to wait for.

Definition at line 495 of file Reactor.cpp.

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

00496 {
00497     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00498 
00499     TimerCountdown traceTime (tv_);
00500     DL((REACT,"======================================\n"));
00501 
00502     /*--- Expire all stale Timers ---*/
00503     m_tqueue.expire (TimeVal::gettimeofday ());
00504 
00505     /* Test to see if Reactor has been deactivated as a result
00506      * of processing done by any TimerHandlers.
00507      */
00508     if (!m_active) {
00509         return;
00510     }
00511 
00512     int      nReady;
00513     TimeVal  delay;
00514     TimeVal* dlp = &delay;
00515 
00516     /*---
00517       In case if not all data have been processed by the EventHandler,
00518       and EventHandler stated so in its callback's return value
00519       to dispatcher (), it will be called again. This way 
00520       underlying file/socket stream can efficiently utilize its
00521       buffering mechaninsm.
00522       ---*/
00523     if ((nReady = isAnyReady ())) {
00524         DL((REACT,"isAnyReady returned: %d\n",nReady));
00525         dispatch (nReady);
00526         return;
00527     }
00528 
00529     DL((REACT,"=== m_waitSet ===\n"));
00530     m_waitSet.dump ();
00531 
00532     do {
00533         m_readySet.reset ();
00534         DL ((REACT,"m_readySet after reset():\n"));
00535         m_readySet.dump ();
00536 
00537         m_readySet = m_waitSet;
00538         DL ((REACT,"m_readySet after assign:\n"));
00539         m_readySet.dump ();
00540 
00541         calculateTimeout (dlp, tv_);
00542 
00543         nReady = ::select (m_maxfd_plus1, 
00544                            &m_readySet.m_rset,
00545                            &m_readySet.m_wset, 
00546                            &m_readySet.m_eset, 
00547                            dlp);
00548         DL((REACT,"::select() returned: %d\n",nReady));
00549 
00550         m_readySet.sync ();
00551         DL ((REACT,"m_readySet after select:\n"));
00552         m_readySet.dump ();
00553 
00554     } 
00555     while (nReady < 0 && handleError ());
00556 
00557     dispatch (nReady);
00558 }

void Reactor::stopReactor ( void   ) 

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 667 of file Reactor.cpp.

References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

00668 { 
00669     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00670 
00671     m_active = false; 
00672 
00673     Fd2Eh_Map_Iter iter;
00674     EventHandler* ehp;
00675 
00676     while (m_readSet.size () > 0) {
00677         iter = m_readSet.begin ();
00678         ehp = (*iter).second;
00679         removeHandler (ehp);
00680     }
00681 
00682     while (m_writeSet.size () > 0) {
00683         iter = m_writeSet.begin ();
00684         ehp = (*iter).second;
00685         removeHandler (ehp);
00686     }
00687 
00688     while (m_exceptSet.size () > 0) {
00689         iter = m_exceptSet.begin ();
00690         ehp = (*iter).second;
00691         removeHandler (ehp);
00692     }
00693 }

void ASSA::Reactor::deactivate ( void   )  [inline]

Deactivate Reactor.

This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.

Definition at line 234 of file Reactor.h.

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().

00234 {  m_active = false; }

Reactor& ASSA::Reactor::operator= ( const Reactor  )  [private]

no cloning

void Reactor::adjust_maxfdp1 ( handler_t  fd_  )  [private]

Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).

If the socket descriptor that has just been eliminated was the maxfd+1, we readjust to the next highest.

Win32 implementation of select() ignores this value altogether.

Definition at line 701 of file Reactor.cpp.

References DL, m_maxfd_plus1, m_waitSet, ASSA::MaskSet::max_fd(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by removeHandler(), and removeIOHandler().

00702 {
00703 #if !defined (WIN32)  /* POSIX */
00704 
00705     trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00706 
00707     if (m_maxfd_plus1 == fd_ + 1) 
00708     {
00709         m_maxfd_plus1 = m_waitSet.max_fd () + 1;
00710         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00711     }
00712 #endif
00713 }

bool Reactor::handleError ( void   )  [private]

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 341 of file Reactor.cpp.

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00342 {
00343     trace_with_mask("Reactor::handleError",REACTTRACE);
00344 
00347     if ( !m_active ) {
00348         DL((REACT,"Received cmd to stop Reactor\n"));
00349         return (false);
00350     }
00351 
00352     /*---
00353       TODO: If select(2) returns before time expires, with
00354       a descriptor ready or with EINTR, timeval is not
00355       going to be updated with number of seconds remaining.
00356       This is true for all systems except Linux, which will
00357       do so. Therefore, to restart correctly in case of
00358       EINTR, we ought to take time measurement before and
00359       after select, and try to select() for remaining time.
00360     
00361       For now, we restart with the initial timing value.
00362       ---*/
00363     /*---
00364       BSD kernel never restarts select(2). SVR4 will restart if
00365       the SA_RESTART flag is specified when the signal handler
00366       for the signal delivered is installed. This means taht for
00367       portability, we must handle signal interrupts.
00368       ---*/
00369 
00370     if ( errno == EINTR ) {
00371         EL((REACT,"EINTR: interrupted select(2)\n"));
00372         /*
00373           If I was sitting in select(2) and received SIGTERM,
00374           the signal handler would have set m_active to 'false',
00375           and this function would have returned 'false' as above.
00376           For any other non-critical signals (USR1,...),
00377           we retry select.
00378         */
00379         return (true);
00380     }
00381     /*
00382       EBADF - bad file number. One of the file descriptors does
00383       not reference an open file to open(), close(), ioctl().
00384       This can happen if user closed fd and forgot to remove
00385       handler from Reactor.
00386     */
00387     if ( errno == EBADF ) {
00388         DL((REACT,"EBADF: bad file descriptor\n"));
00389         return (checkFDs ());
00390     }
00391     /*
00392       Any other error from select
00393     */
00394 #if defined (WIN32) 
00395     DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00396 #else
00397     EL((ASSAERR,"select(3) error\n"));
00398 #endif
00399     return (false);
00400 }

bool Reactor::dispatch ( int  minimum_  )  [private]

Notify all EventHandlers registered on respecful events occured.

Many UNIX systems will count a particular file descriptor in the ready_ only ONCE, even if it was flagged by select(2) in, say, both read and write masks.

Parameters:
minimum_ number of file descriptors ready.

Definition at line 626 of file Reactor.cpp.

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00627 {
00628     trace_with_mask("Reactor::dispatch", REACTTRACE);
00629 
00630     m_tqueue.expire (TimeVal::gettimeofday ());
00631 
00632     if ( ready_ < 0 ) 
00633     {
00634 #if !defined (WIN32)
00635         EL((ASSAERR,"::select(3) error\n"));
00636 #endif
00637         return (false);
00638     }
00639     if ( ready_ == 0 ) {
00640         return (true);
00641     }
00642 
00643     DL((REACT,"Dispatching %d FDs.\n",ready_));
00644     DL((REACT,"m_readySet:\n"));
00645     m_readySet.dump ();
00646 
00647     /*--- Writes first ---*/
00648     dispatchHandler (m_readySet.m_wset, 
00649                      m_writeSet, 
00650                      &EventHandler::handle_write);
00651 
00652     /*--- Exceptions next ---*/
00653     dispatchHandler (m_readySet.m_eset, 
00654                      m_exceptSet, 
00655                      &EventHandler::handle_except);
00656 
00657     /*--- Finally, the Reads ---*/
00658     dispatchHandler (m_readySet.m_rset, 
00659                      m_readSet, 
00660                      &EventHandler::handle_read);
00661 
00662     return (true);
00663 }

int Reactor::isAnyReady ( void   )  [private]

Return number of file descriptors ready accross all sets.

Definition at line 404 of file Reactor.cpp.

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00405 {
00406     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00407 
00408     int n = m_readySet.m_rset.numSet () +
00409         m_readySet.m_wset.numSet () +
00410         m_readySet.m_eset.numSet ();
00411 
00412     if ( n > 0 ) {
00413         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00414         m_readySet.dump ();
00415     }
00416     return (n);
00417 }

bool Reactor::checkFDs ( void   )  [private]

Check mask for bad file descriptors.

Returns:
true if any fd(s) were found and removed; false otherwise

Definition at line 317 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

00318 {
00319     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00320     
00321     bool num_removed = false;
00322     FdSet mask;
00323     timeval poll = { 0, 0 };
00324 
00325     for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00326         if ( m_readSet[fd] != NULL ) {
00327             mask.setFd (fd);
00328             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00329                 removeIOHandler (fd);
00330                 num_removed = true;
00331                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00332             }
00333             mask.clear (fd);
00334         }
00335     }
00336     return (num_removed);
00337 }

void Reactor::dispatchHandler ( FdSet mask_,
Fd2Eh_Map_Type fdSet_,
EH_IO_Callback  callback_ 
) [private]

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

This spot needs re-thinking.

When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.

WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode).

Definition at line 568 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

00569 {
00570     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00571 
00572     int ret = 0;
00573     handler_t fd;
00574     EventHandler* ehp = NULL;
00575     std::string eh_id;
00576 
00577     Fd2Eh_Map_Iter iter = fdSet_.begin ();
00578 
00579     while (iter != fdSet_.end ()) 
00580     {
00581         fd  = (*iter).first;
00582         ehp = (*iter).second;
00583 
00584         if (mask_.isSet (fd) && ehp != NULL) 
00585         {
00586             eh_id = ehp->get_id ();
00587             DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00588                 eh_id.c_str (), fd));
00589 
00590             ret = (ehp->*callback_) (fd); /* Fire up a callback */
00591 
00592             if (ret == -1) {
00593                 removeIOHandler (fd);
00594             }
00595             else if (ret > 0) {
00596                 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00597                     ret, fd, eh_id.c_str ()));
00598                 //return;   <-- would starve other connections
00599             }
00600             else {
00601                 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 
00602                     eh_id.c_str (), fd));
00603                 mask_.clear (fd);
00604             }
00611             iter = fdSet_.begin ();
00612         }
00613         else {
00614             iter++;
00615         }
00616     }
00617 }

void Reactor::calculateTimeout ( TimeVal *&  howlong_,
TimeVal maxwait_ 
) [private]

Calculate closest timeout.

If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.

Parameters:
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 421 of file Reactor.cpp.

References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().

Referenced by waitForEvents().

00422 {
00423     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00424 
00425     TimeVal now;
00426     TimeVal tv;
00427 
00428     if (m_tqueue.isEmpty () ) {
00429         howlong_ = maxwait_;
00430         goto done;
00431     }
00432     now = TimeVal::gettimeofday ();
00433     tv = m_tqueue.top ();
00434     
00435     if (tv < now) {
00436         /*--- 
00437           It took too long to get here (fraction of a millisecond), 
00438           and top timer had already expired. In this case,
00439           perform non-blocking select in order to drain the timer queue.
00440           ---*/
00441         *howlong_ = 0;
00442     }
00443     else {  
00444         DL((REACT,"--------- Timer Queue ----------\n"));
00445         m_tqueue.dump();
00446         DL((REACT,"--------------------------------\n"));
00447 
00448         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00449             *howlong_ = tv - now;
00450         }
00451         else {
00452             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00453         }
00454     }
00455 
00456  done:
00457     if (howlong_ != NULL) {
00458         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00459     }
00460     else {
00461         DL((REACT,"delay (forever)\n"));
00462     }
00463 }


Member Data Documentation

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 200 of file Reactor.h.

Referenced by checkFDs(), and Reactor().

Max file descriptor number (in all sets) plus 1.

This value is ignored by WIN32 implementation of select()

Definition at line 206 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().

bool ASSA::Reactor::m_active [private]

Flag that indicates whether Reactor is active or had been stopped.

Definition at line 209 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().

Event handlers awaiting on READ_EVENT.

Definition at line 212 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

Event handlers awaiting on WRITE_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

Event handlers awaiting on EXCEPT_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

Handlers to wait for event on.

Definition at line 221 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

Handlers that are ready for processing.

Definition at line 224 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

The queue of Timers.

Definition at line 227 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().


The documentation for this class was generated from the following files:

Generated on Sun Mar 2 15:15:48 2008 for libassa by  doxygen 1.5.5