Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

ProxyPushSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
// Inverse of a scoped lock ('kcol' is 'lock' reversed): the constructor
// unlocks the mutex and the destructor locks it again, so the mutex is
// released for exactly the lifetime of this object.
// Only a reference to the mutex is stored, so the mutex must outlive this
// object.
// NOTE(review): presumably the constructing thread must already hold the
// mutex -- confirm against the omni_mutex documentation.
00035 class omni_mutex_kcol {
00036     omni_mutex& mutex;
00037 public:
00038     omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039     ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041     // dummy copy constructor and operator= to prevent copying
00042     omni_mutex_kcol(const omni_mutex_kcol&);
00043     omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045 
00046 
00047 //
00048 //  ProxyPushSupplierManager
00049 //
00050 
// Creates a new ProxyPushSupplier_i constructed from _managedPoa and _queue,
// records it in _servants and returns it as the servant.
// The 'oid' and 'poa' parameters are not used by this implementation.
// A PauseThenWake object is alive while _servants is modified.
// NOTE(review): PauseThenWake is declared outside this file; presumably it
// takes _lock and signals _condition when it goes out of scope -- confirm.
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053   const PortableServer::ObjectId& oid,
00054   PortableServer::POA_ptr         poa
00055 )
00056 {
00057   ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058   PauseThenWake p(this);
00059   _servants.insert(result);
00060   return result;
00061 }
00062 
// Holds _lock for the duration of the call and forwards all five arguments,
// unchanged, to ProxyManager::etherealize().
// run() iterates over _servants while holding the same _lock.
// NOTE(review): presumably the base-class call removes 'serv' from _servants,
// which is why the lock is needed here -- confirm in ProxyManager.
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065   const PortableServer::ObjectId& oid,
00066   PortableServer::POA_ptr         adapter,
00067   PortableServer::Servant         serv,
00068   CORBA::Boolean                  cleanup_in_progress,
00069   CORBA::Boolean                  remaining_activations
00070 )
00071 {
00072   omni_mutex_lock pause(_lock);
00073   ProxyManager::etherealize(oid,adapter,serv,
00074     cleanup_in_progress,remaining_activations);
00075 }
00076 
// Constructor.
//   parentPoa - passed to the ProxyManager base together with the name
//               "ProxyPushSupplier".
//   q         - event queue; stored by reference in _queue and later handed
//               to every ProxyPushSupplier_i created by incarnate().
// The omni_thread base is constructed with a NULL argument and PRIORITY_HIGH,
// _condition is bound to _lock, and _refCount starts at 1.
// start() is called from the constructor body.
// NOTE(review): presumably start() launches run() in a new thread, in which
// case that thread may begin before a derived-class constructor (if any) has
// finished -- confirm against the omnithread documentation.
00077 ProxyPushSupplierManager::ProxyPushSupplierManager(
00078   PortableServer::POA_ptr parentPoa,
00079   EventQueue& q
00080 )
00081 : ProxyManager(parentPoa,"ProxyPushSupplier"),
00082   omni_thread(NULL,PRIORITY_HIGH),
00083   _queue(q),
00084   _lock(),_condition(&_lock),
00085   _refCount(1)
00086 {
00087   start();
00088 }
00089 
// Destructor. Only emits a DB(20,...) trace message; no other cleanup is
// performed in this body.
00090 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00091 {
00092   DB(20,"~ProxyPushSupplierManager()")
00093 }
00094 
// Returns a CosEventChannelAdmin::ProxyPushSupplier reference obtained from
// createNarrowedReference<>(), passing _managedPoa and the repository id of
// the ProxyPushSupplier typecode.
// NOTE(review): createNarrowedReference is defined elsewhere; presumably the
// caller owns the returned reference -- confirm.
00095 CosEventChannelAdmin::ProxyPushSupplier_ptr
00096 ProxyPushSupplierManager::createObject()
00097 {  
00098   return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00099            _managedPoa.in(),
00100            CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00101          );
00102 }
00103 
// Calls disconnect_push_supplier() on every servant held in _servants.
// Each element is downcast with static_cast; incarnate() in this file only
// ever inserts ProxyPushSupplier_i objects.
// _lock is not taken in this function.
// NOTE(review): ProxyPushSupplier_i::disconnect_push_supplier() throws
// CORBA::OBJECT_NOT_EXIST when the proxy has no target; such an exception
// would leave this loop before the remaining servants are visited -- confirm
// that callers expect this.
00104 void ProxyPushSupplierManager::disconnect()
00105 {
00106   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00107   {
00108     Proxy* p =*i; // Sun's CC requires this temporary.
00109     ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00110     pps->disconnect_push_supplier();
00111   }
00112 }
00113 
// Worker loop: repeatedly calls trigger() on every servant in _servants.
// The void* argument is unused.
// _lock is acquired before the loop and released explicitly only around
// omni_thread::yield() (via omni_mutex_kcol).
// NOTE(review): presumably _condition.wait()/timedwait() also release _lock
// while blocked, as condition variables normally do -- confirm.
// The loop ends when _refCount is below 1 or when an exception that is not a
// CORBA::Exception is caught; CORBA exceptions are logged and the loop
// continues.
00114 void
00115 ProxyPushSupplierManager::run(void*)
00116 {
00117   // This loop repeatedly triggers all of the servants in turn. As long as
00118   // something happens each time, then we loop as fast as we can.
00119   // As soon as activity dries up, we start to wait longer and longer between
00120   // loops (up to a maximum). When there is no work to do, just block until
00121   // a new event arrives.
00122   //
00123   // Rationale: The faster we loop the more events we can deliver to each
00124   // consumer per second. However, when nothing is happening, this busy loop
00125   // just soaks up CPU and kills performance. The optimum sleep time varies
00126   // wildly from platform to platform, and also depends upon the typical ping
00127   // time to the consumers.
00128   //
00129   // This dynamic approach should deliver reasonable performance when things
00130   // are hectic, but not soak up too much CPU when not much is happening.
00131   //
00132   const unsigned long sleepTimeNanosec0 =0x8000;   // 33us (doubled before use)
00133   const unsigned long maxSleepNanosec   =0x800000; // 8.4ms
00134   unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00135 
00136   omni_mutex_lock conditionLock(_lock);
00137   while(true)
00138   {
00139     try {
            // Shutdown test: _remove_ref() decrements _refCount.
00140       if(_refCount<1)
00141           break;
00142 
            // trigger() only ever sets these flags to true, so after the
            // loop each one means "true for at least one servant".
00143       bool busy=false;
00144       bool waiting=false;
00145 
00146       // Trigger each servant in turn.
00147       for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00148       {
00149         Proxy* p =*i; // Sun's CC requires this temporary.
00150         ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00151         pps->trigger(busy,waiting);
00152       }
00153 
00154       if(busy)
00155       {
00156         // Something happened last time round. So we'll be optimistic and
00157         // immediately go round for another go. Yield first, just to let the
00158         // other kids get in if they need to.
00159         omni_mutex_kcol l(_lock); // 'lock' reversed!
00160         omni_thread::yield();
00161         // Reset the sleep time.
00162         sleepTimeNanosec=sleepTimeNanosec0;
00163       }
00164       else if(waiting)
00165       {
00166         // Nothing happened, so we'll wait for a bit and then give it another
00167         // go. Each time we wait for twice as long, up to the maximum.
00168         if(sleepTimeNanosec<maxSleepNanosec)
00169             sleepTimeNanosec<<=1; // (multiply by 2)
00170         unsigned long sec,nsec;
00171         omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00172         _condition.timedwait(sec,nsec);
00173       }
00174       else
00175       {
00176         // There is nothing to do, so block until a new event arrives.
00177         _condition.wait();
00178       }
00179 
00180     }
00181     catch (CORBA::SystemException& ex) {
00182       DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00183          IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00184     }
00185     catch (CORBA::Exception& ex) {
          // NOTE(review): the trailing '<<' inside IF_OMNIORB4(...) looks
          // intentional -- presumably the macro expands to its argument under
          // omniORB4, so the '<<' then applies to the "." literal that
          // follows, and to nothing otherwise. Confirm against the macro.
00186       DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00187          IF_OMNIORB4(": "<<ex._name()<<) ".")
00188     }
00189     catch(...) {
00190       DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00191       break;
00192     }
00193   }
00194 }
00195 
// Increments _refCount while holding _lock.
00196 void ProxyPushSupplierManager::_add_ref()
00197 {
00198   omni_mutex_lock pause(_lock);
00199   ++_refCount;
00200 }
00201 
// Decrements _refCount inside a PauseThenWake scope and logs when the new
// value is zero or negative. Nothing is deleted here; run() leaves its loop
// once it sees _refCount below 1.
// NOTE(review): presumably PauseThenWake holds _lock and wakes the run()
// thread when it is destroyed, so that the shutdown is noticed -- confirm.
00202 void ProxyPushSupplierManager::_remove_ref()
00203 {
00204   int myref;
00205   {
00206     PauseThenWake p(this);
00207     myref = --_refCount;
00208   }
      // The log messages use the local copy taken inside the scope above.
00209   if(myref<0)
00210       DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00211   else if(myref==0)
00212       DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00213 }
00214 
00215 
00216 //
00217 //  ProxyPushSupplier_i
00218 //
00219 
// Connects a push consumer to this proxy.
//   pushConsumer - consumer to deliver events to; a duplicate of the
//                  reference is stored in _target.
// Throws CORBA::BAD_PARAM if pushConsumer is nil.
// Throws CosEventChannelAdmin::AlreadyConnected if _target or _req is
// already non-nil.
// After storing the target, a deferred "_is_a" request is sent and this
// object is registered with Orb for the callback (see callback()); finally
// output() is called on WriteLock().os.
00220 void ProxyPushSupplier_i::connect_push_consumer(
00221   CosEventComm::PushConsumer_ptr pushConsumer)
00222 {
00223   if(CORBA::is_nil(pushConsumer))
00224       throw CORBA::BAD_PARAM();
00225   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00226       throw CosEventChannelAdmin::AlreadyConnected();
00227   _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00228 
00229   // Test to see whether pushConsumer is a ProxyPushConsumer.
00230   // If so, then we will aggressively try to reconnect, when we are reincarnated
00231   CORBA::Request_var req =_target->_request("_is_a");
00232   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00233   req->set_return_type(CORBA::_tc_boolean);
00234   req->send_deferred();
00235   Orb::inst().deferredRequest(req._retn(),this); // Register for callback
00236 
00237   output(WriteLock().os);
00238 }
00239 
00240 
// Disconnects this proxy.
// The key "ConsumerAdmin/ProxyPushSupplier" is erased and the object is
// deactivated first, regardless of connection state.
// If no target is connected, CORBA::OBJECT_NOT_EXIST is then thrown.
// Otherwise a deferred "disconnect_push_consumer" request is sent to the
// target, ownership of the request is passed to Orb, and _target is set to
// nil.
// _req is not modified here.
00241 void ProxyPushSupplier_i::disconnect_push_supplier()
00242 {
00243   DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00244   eraseKey("ConsumerAdmin/ProxyPushSupplier");
00245   deactivateObject();
00246   if(CORBA::is_nil(_target))
00247   {
00248     throw CORBA::OBJECT_NOT_EXIST(
00249       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00250       CORBA::COMPLETED_NO
00251     );
00252   }
00253   else
00254   {
00255     CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00256     req->send_deferred();
00257     Orb::inst().deferredRequest(req._retn());
00258     _target=CosEventComm::PushConsumer::_nil();
00259   }
00260 }
00261 
00262 
// Constructor.
//   poa - passed to the Proxy base class.
//   q   - event queue passed to the EventQueue::Reader base class.
// The proxy starts unconnected: _target is nil and _targetIsProxy is false.
00263 ProxyPushSupplier_i::ProxyPushSupplier_i(
00264   PortableServer::POA_ptr poa,
00265   EventQueue&             q
00266 )
00267 : Proxy(poa),
00268   EventQueue::Reader(q),
00269   _target(CosEventComm::PushConsumer::_nil()),
00270   _targetIsProxy(false)
00271 {
00272   // pass
00273 }
00274 
// Destructor. Only emits a DB(20,...) trace message.
00275 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00276 {
00277   DB(20,"~ProxyPushSupplier_i()")
00278 }
00279 
// Performs one step of event delivery for this proxy.
//   busy    - set to true when a response was consumed without error or a
//             new "push" request was sent; never set to false here.
//   waiting - set to true when a request is still outstanding on return;
//             never set to false here.
// Step 1: if the outstanding request (_req) has a response, examine it. On
//         an exception, report the failure, send a deferred
//         "disconnect_push_consumer", clear _target, erase the key,
//         deactivate the object and return without touching either flag.
// Step 2: if no request is outstanding, a target is connected and
//         moreEvents() is true, send the next event as a deferred "push".
// NOTE(review): disconnect_push_supplier() sets _target to nil without
// clearing _req, so the error branch below may call _target->_request() on a
// nil reference if a push was still in flight -- confirm whether this can
// happen and how the ORB reacts.
00280 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00281 {
00282   if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
00283   {
00284     CORBA::Environment_ptr env=_req->env(); // No need to free environment.
00285     if(!CORBA::is_nil(env) && env->exception())
00286     {
00287       // Shut down the connection
00288       CORBA::Exception* ex =env->exception(); // No need to free exception.
00289       DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00290       Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
            // 'ex' was obtained from _req's environment and is not used
            // after this reset.
00291       _req=CORBA::Request::_nil();
00292 
00293       // Try to notify the Consumer that the connection is closing.
00294       CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00295       req->send_deferred();
00296       Orb::inst().deferredRequest(req._retn());
00297 
00298       _target=CosEventComm::PushConsumer::_nil(); // disconnected.
00299       eraseKey("ConsumerAdmin/ProxyPushSupplier");
00300       deactivateObject();
00301       return; // No more work to do
00302     }
00303     _req=CORBA::Request::_nil();
00304     busy=true;
00305   }
00306   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00307   {
00308     _req=_target->_request("push");
00309     _req->add_in_arg() <<= *(nextEvent());
00310     _req->send_deferred();
00311     busy=true;
00312   }
00313   if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
00314       waiting=true;
00315 }
00316 
00317 
// Receives the reply to the deferred "_is_a" request that
// connect_push_consumer() registered with Orb.
//   req - the completed request; its return value is extracted as a boolean
//         directly into _targetIsProxy.
// If _targetIsProxy is already true, only a warning is logged and 'req' is
// not examined. If extraction succeeds and the result is true, the state is
// written with output() and a message is logged. If extraction fails,
// _targetIsProxy is forced back to false.
00318 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00319 {
00320   if(_targetIsProxy)
00321   {
00322     // There should only ever be one of these callbacks per proxy,
00323     // because each proxy should only be connected once.
00324     DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00325   }
00326   else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00327   {
00328     if(_targetIsProxy)
00329     {
00330       output(WriteLock().os);
00331       DB(15,"ProxyPushSupplier is federated.");
00332     }
00333   }
00334   else
00335   {
00336     DB(2,"ProxyPushSupplier got unexpected callback.");
00337     _targetIsProxy=false; // Reset it just to be sure.
00338   }
00339 }
00340 
00341 
// Restores this proxy from a PersistNode.
//   oid  - object id with which the proxy is activated.
//   node - supplies the "IOR" string attribute of the target consumer and
//          the "proxy" long attribute, which is converted to bool and stored
//          in _targetIsProxy.
// The IOR is converted to a PushConsumer reference before the object is
// activated. When the target is flagged as a proxy, the same IOR is
// converted to a ProxyPushConsumer and connect_push_supplier() is invoked on
// it with this object's reference.
// AlreadyConnected and TypeError are logged; OBJECT_NOT_EXIST, TRANSIENT and
// COMM_FAILURE are swallowed silently. Any other exception propagates to the
// caller.
00342 void ProxyPushSupplier_i::reincarnate(
00343   const string&      oid,
00344   const PersistNode& node
00345 )
00346 {
00347   try
00348   {
00349     using namespace CosEventChannelAdmin;
00350 
00351     string ior( node.attrString("IOR").c_str() );
00352     CosEventComm::PushConsumer_var pushConsumer =
00353       string_to_<CosEventComm::PushConsumer>(ior.c_str());
00354     // Do not activate until we know that we have read a valid target.
00355     activateObjectWithId(oid.c_str());
00356     _target=pushConsumer._retn();
00357     _targetIsProxy=bool(node.attrLong("proxy"));
00358 
00359     // If pushConsumer is a proxy, then try to reconnect.
00360     if(_targetIsProxy)
00361     {
00362       DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00363       // This will only work if the proxy is implemented in the same way as
00364       // omniEvents, so connect_() automatically creates a proxy.
00365       ProxyPushConsumer_var proxyCons =
00366         string_to_<ProxyPushConsumer>(ior.c_str());
00367       CosEventComm::PushSupplier_var thisSupp =_this();
00368       proxyCons->connect_push_supplier(thisSupp);
00369       DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00370     }
00371   }
00372   catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
00373     // The supplier doesn't need to be reconnected.
00374     DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00375   }
00376   catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
00377     // Don't know what to make of this...
00378     DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00379   }
00380   catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
00381   catch(CORBA::TRANSIENT&       ) {} // object 'pushConsumer' not responding.
00382   catch(CORBA::COMM_FAILURE&    ) {} // object 'pushConsumer' not responding.
00383 }
00384 
00385 
// Writes this proxy's record to 'os' by calling basicOutput() with the key
// "ConsumerAdmin/ProxyPushSupplier", the current target reference, and the
// extra text " proxy=1" when _targetIsProxy is true (NULL otherwise).
// NOTE(review): the " proxy=1" text presumably corresponds to the "proxy"
// attribute that reincarnate() reads back -- confirm against basicOutput().
00386 void ProxyPushSupplier_i::output(ostream &os)
00387 {
00388   basicOutput(
00389     os,"ConsumerAdmin/ProxyPushSupplier",
00390     _target.in(),
00391     _targetIsProxy? " proxy=1": NULL
00392   );
00393 }
00394 
00395 
00396 }; // end namespace OmniEvents

Generated on Fri Nov 19 17:42:21 2004 for OmniEvents by doxygen1.2.15