CCA
|
InputPort.h
Go to the documentation of this file.
00001 /* ============================================================ 00002 * 00003 * This file is a part of CCA project 00004 * 00005 * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de> 00006 * 00007 * This file may be licensed under the terms of the 00008 * GNU Lesser General Public License Version 3 (the ``LGPL''), 00009 * or (at your option) any later version. 00010 * 00011 * Software distributed under the License is distributed 00012 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either 00013 * express or implied. See the LGPL for the specific language 00014 * governing rights and limitations. 00015 * 00016 * You should have received a copy of the LGPL along with this 00017 * program. If not, go to http://www.gnu.org/licenses/lgpl.html 00018 * or write to the Free Software Foundation, Inc., 00019 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00020 * 00021 * The development of this software was supported by: 00022 * CoR-Lab, Research Institute for Cognition and Robotics 00023 * Bielefeld University 00024 * 00025 * ============================================================ */ 00026 00027 #pragma once 00028 00029 #include <boost/shared_ptr.hpp> 00030 #include <boost/bind.hpp> 00031 00032 #include <rsb/Listener.h> 00033 #include <rsb/Handler.h> 00034 #include <rsb/Event.h> 00035 00036 #include "cca/Port.h" 00037 #include "cca/buffer/SingleItemBuffer.h" 00038 #include "cca/buffer/SynchronizedQueue.h" 00039 00040 namespace cca { 00041 00042 class InputPortBase; 00043 typedef ::boost::shared_ptr<InputPortBase> InputPortPtr; 00044 00048 class InputPortBase: public Port { 00049 00050 public: 00051 00060 InputPortBase(bool optional = false, 00061 const std::string &datatype = "unknown"); 00062 00067 virtual void configureSpecifics(); 00068 00073 virtual void setHandler(::rsb::HandlerPtr handler); 00074 00078 virtual void purge() = 0; 00079 00086 virtual bool empty() const = 0; 00087 00094 virtual bool newItem() const = 0; 00095 00102 virtual unsigned int size() const = 0; 00103 00104 virtual ~InputPortBase(); 00105 00106 protected: 00107 00108 class InputHandler: public rsb::Handler { 00109 public: 00114 InputHandler(const std::string &datatype) : 00115 rsb::Handler(), callback(), handler(), mydatatype("unknown") { 00116 mydatatype = datatype; 00117 } 00118 00119 void setNextHandler(rsb::HandlerPtr h) { 00120 this->handler = h; 00121 } 00122 00123 void setInputCallback(const boost::function1<void, rsb::EventPtr> &c) { 00124 callback = c; 00125 } 00126 00127 void handle(rsb::EventPtr event) { 00128 this->callback(event); 00129 00130 rsb::HandlerPtr h = this->handler; 00131 if (h) { 00132 h->handle(event); 00133 } 00134 } 00135 00136 void printContents(std::ostream& stream) const { 00137 stream << "type = " << mydatatype; 00138 } 00139 00140 protected: 00141 boost::function1<void, rsb::EventPtr> callback; 00142 rsb::HandlerPtr handler; 00143 std::string mydatatype; 00144 }; 00145 00146 boost::shared_ptr<InputHandler> handler; 00147 00152 InputPortBase(InputPortBase &port); 00153 00158 void operator=(const InputPortBase &port); 00159 00161 ::rsb::ListenerPtr listener; 00162 rsc::logging::LoggerPtr logger; 00163 }; 00164 00168 template<typename DATATYPE> 00169 class InputPort: public InputPortBase, public Buffering<DATATYPE> { 00170 00171 public: 00172 00174 typedef ::boost::shared_ptr<DATATYPE> DataPtr; 00175 00177 typedef ::boost::shared_ptr<InputPort<DATATYPE> > Ptr; 00178 00179 typedef typename Buffer<DATATYPE>::Ptr BufferPtr; 00180 00186 static Ptr create() { 00187 return Ptr(new InputPort<DATATYPE>()); 00188 } 00189 00193 virtual void configureSpecifics() { 00194 RSCTRACE(logger, "InputPort::configureSpecifics()"); 00195 // Buffering 00196 if (config->getQueueSize() > 1) { 00197 this->buffer = typename Buffer<DATATYPE>::Ptr( 00198 new cca::SynchronizedQueue<DATATYPE>(config->getQueueSize(), 00199 config->keepLatest())); 00200 } else { 00201 this->buffer = typename Buffer<DATATYPE>::Ptr( 00202 new cca::SingleItemBuffer<DATATYPE>(config->keepLatest())); 00203 } 00204 00205 handler->setInputCallback( 00206 boost::bind(&InputPort<DATATYPE>::addEvent, this, _1)); 00207 00208 InputPortBase::configureSpecifics(); 00209 } 00210 00217 InputPort(bool optional = false) : 00218 InputPortBase(optional, rsc::runtime::typeName<DATATYPE>()), 00219 buffer(), 00220 logger(rsc::logging::Logger::getLogger("cca.inputport")) { 00221 } 00222 00228 virtual DataPtr get() throw () { 00229 if (this->configured) { 00230 return buffer->get(); 00231 } 00232 return DataPtr(); 00233 } 00234 00238 virtual void addEvent(rsb::EventPtr item) { 00239 if (this->configured) { 00240 buffer->add(boost::static_pointer_cast<DATATYPE>(item->getData())); 00241 } 00242 } 00243 00247 virtual void add(DataPtr item) { 00248 RSCTRACE(logger, "InputPort::add()"); 00249 if (this->configured) { 00250 buffer->add(item); 00251 } 00252 } 00253 00254 virtual bool empty() const { 00255 if (this->configured) { 00256 return buffer->empty(); 00257 } 00258 return true; 00259 } 00260 00261 virtual bool newItem() const { 00262 if (this->configured) { 00263 return buffer->newItem(); 00264 } 00265 return false; 00266 } 00267 00268 virtual unsigned int size() const { 00269 if (this->configured) { 00270 return buffer->size(); 00271 } 00272 return 0; 00273 } 00274 00275 virtual void purge() { 00276 if (this->configured) { 00277 buffer->purge(); 00278 } 00279 } 00280 00281 virtual std::string print() const { 00282 ::std::ostringstream outstream(::std::ostringstream::out); 00283 outstream << "<" << rsc::runtime::typeName<DATATYPE>() << ">"; 00284 if (configured) { 00285 outstream << " " << getConfig()->print(); 00286 outstream << "(" << getScopePtr()->toString() << ")"; 00287 } else { 00288 outstream << " <unconfigured>"; 00289 } 00290 if (optional) { 00291 outstream << " [optional]"; 00292 } 00293 return outstream.str(); 00294 } 00295 00296 virtual std::string bla() const { 00297 ::std::ostringstream outstream(::std::ostringstream::out); 00298 outstream << "<" << rsc::runtime::typeName<DATATYPE>() << ">"; 00299 return outstream.str(); 00300 } 00301 00302 virtual ~InputPort() { 00303 } 00304 00305 protected: 00306 00311 InputPort(InputPort &port); 00312 00317 void operator=(const InputPort &port); 00318 00320 BufferPtr buffer; 00321 rsc::logging::LoggerPtr logger; 00322 00323 }; 00324 00325 }