30 namespace OmniEvents {
51 PortableServer::Servant
53 const PortableServer::ObjectId& oid,
54 PortableServer::POA_ptr poa
65 const PortableServer::ObjectId& oid,
66 PortableServer::POA_ptr adapter,
67 PortableServer::Servant serv,
68 CORBA::Boolean cleanup_in_progress,
69 CORBA::Boolean remaining_activations
76 omni_mutex_lock pause(
_lock);
78 assert(narrowed!=NULL);
79 set<Proxy*>::iterator pos =
_servants.find(narrowed);
83 narrowed->_remove_ref();
87 DB(1,
"\t\teh? - POA attempted to etherealize unknown servant.");
92 PortableServer::POA_ptr parentPoa,
96 omni_thread(NULL,PRIORITY_HIGH),
98 _lock(),_condition(&_lock),
107 DB(20,
"~ProxyPushSupplierManager()")
110 CosEventChannelAdmin::ProxyPushSupplier_ptr
113 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
150 const unsigned long sleepTimeNanosec0 =0x8000;
151 const unsigned long maxSleepNanosec =0x800000;
152 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
154 omni_mutex_lock conditionLock(
_lock);
179 sleepTimeNanosec=sleepTimeNanosec0;
185 if(sleepTimeNanosec<maxSleepNanosec)
186 sleepTimeNanosec<<=1;
187 unsigned long sec,nsec;
188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
198 catch (CORBA::SystemException& ex) {
199 DB(2,
"ProxyPushSupplierManager ignoring CORBA system exception"
202 catch (CORBA::Exception& ex) {
203 DB(2,
"ProxyPushSupplierManager ignoring CORBA exception"
207 DB(2,
"ProxyPushSupplierManager thread killed by unknown exception.")
216 #if OMNIEVENTS__DEBUG_REF_COUNTS
217 DB(20,
"ProxyPushSupplierManager::_add_ref()")
219 omni_mutex_lock pause(
_lock);
225 #if OMNIEVENTS__DEBUG_REF_COUNTS
226 DB(20,
"ProxyPushSupplierManager::_remove_ref()")
235 DB(2,
"ProxyPushSupplierManager has negative ref count! "<<myref)
239 DB(15,
"ProxyPushSupplierManager has zero ref count -- shutdown.")
250 CosEventComm::PushConsumer_ptr pushConsumer)
252 if(CORBA::is_nil(pushConsumer))
253 throw CORBA::BAD_PARAM();
254 if(!CORBA::is_nil(
_target) || !CORBA::is_nil(
_req))
255 throw CosEventChannelAdmin::AlreadyConnected();
256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
260 CORBA::Request_var req =
_target->_request(
"_is_a");
261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262 req->set_return_type(CORBA::_tc_boolean);
263 req->send_deferred();
276 DB(5,
"ProxyPushSupplier_i::disconnect_push_supplier()");
277 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
281 throw CORBA::OBJECT_NOT_EXIST(
288 CORBA::Request_var req=
_target->_request(
"disconnect_push_consumer");
289 _target=CosEventComm::PushConsumer::_nil();
290 req->send_deferred();
297 PortableServer::POA_ptr poa,
302 _target(CosEventComm::PushConsumer::_nil()),
303 _targetIsProxy(false)
310 DB(20,
"~ProxyPushSupplier_i()")
317 if(!CORBA::is_nil(_req) && _req->poll_response())
319 CORBA::Environment_ptr env=_req->env();
320 if(!CORBA::is_nil(env) && env->exception())
323 CORBA::Exception* ex =env->exception();
324 DB(10,
"ProxyPushSupplier got exception" IF_OMNIORB4(
": "<<ex->_name()) );
326 _req=CORBA::Request::_nil();
329 CORBA::Request_var req=_target->_request(
"disconnect_push_consumer");
330 req->send_deferred();
333 _target=CosEventComm::PushConsumer::_nil();
334 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
338 _req=CORBA::Request::_nil();
341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
343 _req=_target->_request(
"push");
344 _req->add_in_arg() <<= *(nextEvent());
345 _req->send_deferred();
348 if(!CORBA::is_nil(_req))
359 DB(2,
"WARNING: Multiple connections to ProxyPushSupplier.");
361 else if(req->return_value()>>=CORBA::Any::to_boolean(
_targetIsProxy))
367 DB(15,
"ProxyPushSupplier is federated.");
372 DB(2,
"ProxyPushSupplier got unexpected callback.");
385 using namespace CosEventChannelAdmin;
388 CosEventComm::PushConsumer_var pushConsumer =
389 string_to_<CosEventComm::PushConsumer>(ior.c_str());
399 DB(15,
"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
402 ProxyPushConsumer_var proxyCons =
403 string_to_<ProxyPushConsumer>(ior.c_str());
404 CosEventComm::PushSupplier_var thisSupp =_this();
405 proxyCons->connect_push_supplier(thisSupp);
406 DB(7,
"Reconnected ProxyPushSupplier: "<<oid.c_str())
409 catch(CosEventChannelAdmin::AlreadyConnected&){
411 DB(7,
"Remote ProxyPushConsumer already connected: "<<oid.c_str())
413 catch(CosEventChannelAdmin::TypeError&){
415 DB(2,
"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
417 catch(CORBA::OBJECT_NOT_EXIST&) {}
418 catch(CORBA::TRANSIENT& ) {}
419 catch(CORBA::COMM_FAILURE& ) {}
426 os,
"ConsumerAdmin/ProxyPushSupplier",