OmniEvents
SupplierAdmin.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // SupplierAdmin.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "SupplierAdmin.h"
25 
26 #include "EventChannel.h"
27 #include "ProxyPushConsumer.h"
28 #include "ProxyPullConsumer.h"
29 #include "Orb.h"
30 #include "PersistNode.h"
31 
32 #define MILLION 1000000
33 #define BILLION 1000000000
34 
35 namespace OmniEvents {
36 
// Returns the result of _pushConsumer->createObject() as a
// CosEventChannelAdmin::ProxyPushConsumer_ptr. _pushConsumer is dereferenced
// without a null check here.
// NOTE(review): the embedded listing numbers jump from 37 to 39, so the line
// carrying the function name and parameter list is absent from this extract -
// restore it from the upstream file before compiling.
37 CosEventChannelAdmin::ProxyPushConsumer_ptr
39 {
40  return _pushConsumer->createObject();
41 }
42 
43 
// Returns the result of _pullConsumer->createObject() as a
// CosEventChannelAdmin::ProxyPullConsumer_ptr.
// NOTE(review): listing lines 45 (the function signature) and 48 (the
// statement governed by `if(!_pullConsumer)`) are absent from this extract.
// As printed, the `if` appears to govern the return statement; that is an
// artefact of the missing line. Presumably line 48 created _pullConsumer on
// first use - confirm against the upstream file.
44 CosEventChannelAdmin::ProxyPullConsumer_ptr
46 {
47  if(!_pullConsumer)
49  return _pullConsumer->createObject();
50 }
51 
52 
// Constructor fragment: stores the channel reference, starts with no push or
// pull consumer and an empty queue, optionally arms _nextPull, and finally
// activates this servant under the id "SupplierAdmin".
// NOTE(review): listing line 53 (presumably the constructor name and opening
// parenthesis) is absent from this extract - restore it from upstream.
54  const EventChannel_i& channel,
55  PortableServer::POA_ptr poa
56 )
57 : Servant(poa),
58  _channel(channel),
59  _pushConsumer(NULL),
60  _pullConsumer(NULL),
61  _queue(),
62  _nextPull(0,0) // collect() treats _nextPull.first==0 as "no delay between pulls".
63 {
64  // Initialise _nextPull. Only set it if the cycle period is LESS than the
65  // pull retry period - otherwise just pull every cycle.
// NOTE(review): listing line 66, the condition governing the block below, is
// absent from this extract.
67  {
68  omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
69  }
70 
71  // Always create the ProxyPushConsumer_i default servant. This allows
72  // lazy clients to connect suppliers without having to go through the
73  // proper procedure - they can make up an appropriate ObjectId, call push()
74  // and it will just work (TM).
75  // Note: A SupplierAdmin_i is always created by the EventChannel to allow this
76  // behaviour.
// NOTE(review): listing line 77 is absent from this extract; given the comment
// above it presumably assigned _pushConsumer - confirm against upstream.
78 
79  activateObjectWithId("SupplierAdmin");
80 }
81 
82 
// Destructor body (the DB trace string below names it ~SupplierAdmin_i).
// Releases both consumer members and deletes every event still queued.
// NOTE(review): listing line 83, which carried the signature, is absent from
// this extract - restore it from upstream.
84 {
85  DB(20,"~SupplierAdmin_i()")
86  if(_pullConsumer)
87  {
88  _pullConsumer->_remove_ref(); // The pull consumer is released by dropping a reference...
89  _pullConsumer=NULL;
90  }
91  if(_pushConsumer)
92  {
93  delete _pushConsumer; // ...whereas the push consumer is deleted directly.
94  _pushConsumer=NULL;
95  }
// Events that were never handed out by collect() are still owned here.
96  for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
97  delete *i;
98 }
99 
100 
102 
103 
104 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
105 {
106  if(_pullConsumer)
107  {
108  _pullConsumer->collect();
109  if(0==_nextPull.first)
110  { // No delay between pulls.
111  _pullConsumer->triggerRequest();
112  }
113  else
114  { // Only trigger new pull() calls if `pullRetry' ms have passed.
115  pair<unsigned long,unsigned long> now;
116  omni_thread::get_time(&(now.first),&(now.second));
117  if(now>=_nextPull)
118  {
119  _pullConsumer->triggerRequest();
120 
121  CORBA::ULong p =_channel.pullRetryPeriod_ms();
122  do{
123  _nextPull.second += (p%1000)*MILLION; // nsec
124  _nextPull.first += p/1000 + _nextPull.second/BILLION; // sec
125  _nextPull.second %= BILLION; // nsec
126  } while(now>=_nextPull);
127  }
128  }
129  }
130  _pushConsumer->trigger();
131  // Pick up events from both pull & push consumers.
132  events=_queue;
133  _queue.clear();
134 }
135 
136 
// Performs one action if _pushConsumer is set and another if _pullConsumer is
// set; the actions themselves are not visible here.
// NOTE(review): listing lines 137 (the function signature), 140 and 142 (the
// statements governed by the two `if`s) are absent from this extract. As
// printed, the first `if` appears to govern the second and the second governs
// nothing; that is an artefact of the missing lines. Restore them from the
// upstream file before compiling.
138 {
139  if(_pushConsumer)
141  if(_pullConsumer)
143 }
144 
145 
// Rebuilds consumer proxies from persisted state: looks up the
// "ProxyPushConsumer" and "ProxyPullConsumer" children of `node' and, for each
// one that exists and has children of its own, calls reincarnate() on the
// matching consumer member.
// NOTE(review): listing line 146 (the function signature, which declares
// `node') is absent from this extract - restore it from upstream.
147 {
148  // Build Push Consumer proxies
149  PersistNode* pushcNode =node.child("ProxyPushConsumer");
150  if(pushcNode && !pushcNode->_child.empty())
151  {
152  assert(_pushConsumer!=NULL); // The push consumer must already exist at this point.
153  _pushConsumer->reincarnate(*pushcNode);
154  }
155 
156  // Build Pull Consumer proxies
157  PersistNode* pullcNode =node.child("ProxyPullConsumer");
158  if(pullcNode && !pullcNode->_child.empty())
159  {
// NOTE(review): listing line 161, the statement governed by the `if' below, is
// absent from this extract. As printed, the `if' appears to govern the
// reincarnate() call. Presumably line 161 created _pullConsumer - confirm
// against upstream.
160  if(!_pullConsumer)
162  _pullConsumer->reincarnate(*pullcNode);
163  }
164 }
165 
166 
167 void SupplierAdmin_i::output(ostream& os)
168 {
169  if(_pushConsumer)
170  _pushConsumer->output(os);
171  if(_pullConsumer)
172  _pullConsumer->output(os);
173 }
174 
175 
176 }; // end namespace OmniEvents