OmniEvents
ProxyPullSupplier.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPullSupplier.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPullSupplier.h"
25 #include "EventChannel.h"
26 #include "Orb.h"
27 #include "omniEventsLog.h"
28 #include "PersistNode.h"
29 #include <assert.h>
30 
31 namespace OmniEvents {
32 
33 //
34 // ProxyPullSupplierManager
35 //
36 
37 PortableServer::Servant ProxyPullSupplierManager::incarnate(
38  const PortableServer::ObjectId& oid,
39  PortableServer::POA_ptr poa
40 )
41 {
42  // Evict the oldest proxy servant, if we have reached the maximum number.
43  if(_servants.size()>=_channel.maxNumProxies())
44  {
45  ProxyPullSupplier_i* oldest =NULL;
46  unsigned long age =0;
47  for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
48  if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
49  {
50  oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
51  age=oldest->timestamp();
52  }
53  DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
54  try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
55  }
56  // Make a new servant.
58  _servants.insert(result);
59  return result;
60 }
61 
63  const EventChannel_i& channel,
64  PortableServer::POA_ptr parentPoa,
65  EventQueue& q
66 )
67 : ProxyManager(parentPoa),
68  _queue(q),
69  _channel(channel)
70 {
71  ProxyManager::activate("ProxyPullSupplier");
72 }
73 
75 {
76  DB(20,"~ProxyPullSupplierManager()")
77 }
78 
80 
81 CosEventChannelAdmin::ProxyPullSupplier_ptr
82 ProxyPullSupplierManager::createObject()
83 {
84  return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
85  _managedPoa.in(),
86  CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
87  );
88 }
89 
91 {
92  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
93  {
94  ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
95  // We are in the EventChannel's thread.
96  // Make sure all calls go though the ProxyPullSupplier POA.
97  CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this();
99 
100  }
101 }
102 
103 
104 //
105 // ProxyPullSupplier_i
106 //
107 
108 // CORBA interface methods
109 
111  CosEventComm::PullConsumer_ptr pullConsumer
112 )
113 {
114  if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
115  throw CosEventChannelAdmin::AlreadyConnected();
116  touch();
117  _connected=true;
118  if(!CORBA::is_nil(pullConsumer))
119  _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
120 
122  {
123  WriteLock log;
124  output(log.os);
125  }
126 }
127 
129 {
130  DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
131  touch();
132  eraseKey("ConsumerAdmin/ProxyPullSupplier");
134  if(!_connected)
135  {
136  throw CORBA::OBJECT_NOT_EXIST(
137  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
138  CORBA::COMPLETED_NO
139  );
140  }
141  else if(!CORBA::is_nil(_target))
142  {
143  CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
144  _target=CosEventComm::PullConsumer::_nil();
145  req->send_deferred();
146  Orb::inst().deferredRequest(req._retn());
147  }
148 }
149 
151 {
152  if(!_connected)
153  throw CosEventComm::Disconnected();
154  touch();
155  if(moreEvents())
156  return new CORBA::Any(*nextEvent());
157  else
158  throw CORBA::TRANSIENT(
159  IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
160  CORBA::COMPLETED_NO
161  );
162 }
163 
164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
165 {
166  if(!_connected)
167  throw CosEventComm::Disconnected();
168  touch();
169  if(moreEvents())
170  {
171  has_event=1;
172  return new CORBA::Any(*nextEvent());
173  }
174  else
175  {
176  has_event=0;
177  return new CORBA::Any();
178  }
179 }
180 
181 //
182 
184  PortableServer::POA_ptr poa,
185  EventQueue& q
186 )
187 : Proxy(poa),
188  EventQueue::Reader(q),
189  _target(CosEventComm::PullConsumer::_nil()),
190  _connected(false),
191  _timestamp(0)
192 {
193  touch();
194 }
195 
197 {
198  DB(20,"~ProxyPullSupplier_i()")
199 }
200 
202  const string& oid,
203  const PersistNode& node
204 )
205 {
206  CosEventComm::PullConsumer_var pullConsumer =
207  string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
208  // Do not activate until we know that we have read a valid target.
209  activateObjectWithId(oid.c_str());
210  connect_pull_consumer(pullConsumer.in());
211 }
212 
214 {
215  basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
216 }
217 
219 {
220  unsigned long nsec; // dummy
221  omni_thread::get_time(&_timestamp,&nsec);
222 }
223 
224 }; // end namespace OmniEvents