OmniEvents
ConsumerAdmin.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ConsumerAdmin.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ConsumerAdmin.h"
25 
26 #include "EventChannel.h"
27 #include "ProxyPushSupplier.h"
28 #include "ProxyPullSupplier.h"
29 #include "Orb.h"
30 #include "PersistNode.h"
31 #include "Filter.h"
32 
33 namespace OmniEvents {
34 
35 
36 CosEventChannelAdmin::ProxyPushSupplier_ptr
38 {
39  if(!_pushSupplier)
41  return _pushSupplier->createObject();
42 }
43 
44 
45 CosEventChannelAdmin::ProxyPullSupplier_ptr
47 {
48  if(!_pullSupplier)
50  return _pullSupplier->createObject();
51 }
52 
53 
55  const EventChannel_i& channel,
56  PortableServer::POA_ptr poa
57 )
58 : Servant(poa),
59  _channel(channel),
60  _queue(channel.maxQueueLength()),
61  _pushSupplier(NULL),
62  _pullSupplier(NULL)
63 {
64  if(_channel.properties().hasAttr("FilterId"))
65  {
66  string rid =_channel.properties().attrString("FilterId");
67  _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
68  }
69  else if(_channel.properties().hasAttr("FilterKind"))
70  {
71  CORBA::TCKind kind =
72  CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
73  _queue.setFilter(new FilterByTCKind(kind));
74  }
75 
76  activateObjectWithId("ConsumerAdmin");
77 }
78 
79 
81 {
82  DB(20,"~ConsumerAdmin_i()")
83  if(_pushSupplier)
84  {
85  _pushSupplier->_remove_ref(); // terminates thread.
86  _pushSupplier=NULL;
87  }
88  if(_pullSupplier)
89  {
90  _pullSupplier->_remove_ref();
91  _pullSupplier=NULL;
92  }
93 }
94 
95 
97 
98 
99 void ConsumerAdmin_i::send(CORBA::Any* event)
100 {
102  _queue.append(event);
103 }
104 
105 
106 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
107 {
108  if(!events.empty())
109  {
111  for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
112  _queue.append( *i );
113  events.clear();
114  }
115 }
116 
117 
119 {
120  if(_pushSupplier)
122  if(_pullSupplier)
124 }
125 
126 
128 {
129  // Build Push Supplier proxies
130  PersistNode* pushsNode =node.child("ProxyPushSupplier");
131  if(pushsNode && !pushsNode->_child.empty())
132  {
134  _pushSupplier->reincarnate(*pushsNode);
135  }
136 
137  // Build Pull Supplier proxies
138  PersistNode* pullsNode =node.child("ProxyPullSupplier");
139  if(pullsNode && !pullsNode->_child.empty())
140  {
142  _pullSupplier->reincarnate(*pullsNode);
143  }
144 }
145 
146 
147 void ConsumerAdmin_i::output(ostream& os)
148 {
149  if(_pushSupplier)
150  {
151  omni_mutex_lock l(_pushSupplier->_lock);
152  _pushSupplier->output(os);
153  }
154  if(_pullSupplier)
155  {
156  _pullSupplier->output(os);
157  }
158 }
159 
160 
161 }; // end namespace OmniEvents