CCA
InputPort.h
Go to the documentation of this file.
00001 /* ============================================================
00002  *
00003  * This file is a part of CCA project
00004  *
00005  * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de>
00006  *
00007  * This file may be licensed under the terms of the
00008  * GNU Lesser General Public License Version 3 (the ``LGPL''),
00009  * or (at your option) any later version.
00010  *
00011  * Software distributed under the License is distributed
00012  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
00013  * express or implied. See the LGPL for the specific language
00014  * governing rights and limitations.
00015  *
00016  * You should have received a copy of the LGPL along with this
00017  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
00018  * or write to the Free Software Foundation, Inc.,
00019  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00020  *
00021  * The development of this software was supported by:
00022  *   CoR-Lab, Research Institute for Cognition and Robotics
00023  *     Bielefeld University
00024  *
00025  * ============================================================ */
00026 
00027 #pragma once
00028 
00029 #include <boost/shared_ptr.hpp>
00030 #include <boost/bind.hpp>
00031 
00032 #include <rsb/Listener.h>
00033 #include <rsb/Handler.h>
00034 #include <rsb/Event.h>
00035 
00036 #include "cca/Port.h"
00037 #include "cca/buffer/SingleItemBuffer.h"
00038 #include "cca/buffer/SynchronizedQueue.h"
00039 
00040 namespace cca {
00041 
00042 class InputPortBase;
00043 typedef ::boost::shared_ptr<InputPortBase> InputPortPtr;
00044 
00048 class InputPortBase: public Port {
00049 
00050 public:
00051 
00060     InputPortBase(bool optional = false,
00061             const std::string &datatype = "unknown");
00062 
00067     virtual void configureSpecifics();
00068 
00073     virtual void setHandler(::rsb::HandlerPtr handler);
00074 
00078     virtual void purge() = 0;
00079 
00086     virtual bool empty() const = 0;
00087 
00094     virtual bool newItem() const = 0;
00095 
00102     virtual unsigned int size() const = 0;
00103 
00104     virtual ~InputPortBase();
00105 
00106 protected:
00107 
00108     class InputHandler: public rsb::Handler {
00109     public:
00114         InputHandler(const std::string &datatype) :
00115                 rsb::Handler(), callback(), handler(), mydatatype("unknown") {
00116             mydatatype = datatype;
00117         }
00118 
00119         void setNextHandler(rsb::HandlerPtr h) {
00120             this->handler = h;
00121         }
00122 
00123         void setInputCallback(const boost::function1<void, rsb::EventPtr> &c) {
00124             callback = c;
00125         }
00126 
00127         void handle(rsb::EventPtr event) {
00128             this->callback(event);
00129 
00130             rsb::HandlerPtr h = this->handler;
00131             if (h) {
00132                 h->handle(event);
00133             }
00134         }
00135 
00136         void printContents(std::ostream& stream) const {
00137             stream << "type = " << mydatatype;
00138         }
00139 
00140     protected:
00141         boost::function1<void, rsb::EventPtr> callback;
00142         rsb::HandlerPtr handler;
00143         std::string mydatatype;
00144     };
00145 
00146     boost::shared_ptr<InputHandler> handler;
00147 
00152     InputPortBase(InputPortBase &port);
00153 
00158     void operator=(const InputPortBase &port);
00159 
00161     ::rsb::ListenerPtr listener;
00162     rsc::logging::LoggerPtr logger;
00163 };
00164 
00168 template<typename DATATYPE>
00169 class InputPort: public InputPortBase, public Buffering<DATATYPE> {
00170 
00171 public:
00172 
00174     typedef ::boost::shared_ptr<DATATYPE> DataPtr;
00175 
00177     typedef ::boost::shared_ptr<InputPort<DATATYPE> > Ptr;
00178 
00179     typedef typename Buffer<DATATYPE>::Ptr BufferPtr;
00180 
00186     static Ptr create() {
00187         return Ptr(new InputPort<DATATYPE>());
00188     }
00189 
00193     virtual void configureSpecifics() {
00194         RSCTRACE(logger, "InputPort::configureSpecifics()");
00195         // Buffering
00196         if (config->getQueueSize() > 1) {
00197             this->buffer = typename Buffer<DATATYPE>::Ptr(
00198                     new cca::SynchronizedQueue<DATATYPE>(config->getQueueSize(),
00199                             config->keepLatest()));
00200         } else {
00201             this->buffer = typename Buffer<DATATYPE>::Ptr(
00202                     new cca::SingleItemBuffer<DATATYPE>(config->keepLatest()));
00203         }
00204 
00205         handler->setInputCallback(
00206                 boost::bind(&InputPort<DATATYPE>::addEvent, this, _1));
00207 
00208         InputPortBase::configureSpecifics();
00209     }
00210 
00217     InputPort(bool optional = false) :
00218             InputPortBase(optional, rsc::runtime::typeName<DATATYPE>()),
00219                     buffer(),
00220                     logger(rsc::logging::Logger::getLogger("cca.inputport")) {
00221     }
00222 
00228     virtual DataPtr get() throw () {
00229         if (this->configured) {
00230             return buffer->get();
00231         }
00232         return DataPtr();
00233     }
00234 
00238     virtual void addEvent(rsb::EventPtr item) {
00239         if (this->configured) {
00240             buffer->add(boost::static_pointer_cast<DATATYPE>(item->getData()));
00241         }
00242     }
00243 
00247     virtual void add(DataPtr item) {
00248         RSCTRACE(logger, "InputPort::add()");
00249         if (this->configured) {
00250             buffer->add(item);
00251         }
00252     }
00253 
00254     virtual bool empty() const {
00255         if (this->configured) {
00256             return buffer->empty();
00257         }
00258         return true;
00259     }
00260 
00261     virtual bool newItem() const {
00262         if (this->configured) {
00263             return buffer->newItem();
00264         }
00265         return false;
00266     }
00267 
00268     virtual unsigned int size() const {
00269         if (this->configured) {
00270             return buffer->size();
00271         }
00272         return 0;
00273     }
00274 
00275     virtual void purge() {
00276         if (this->configured) {
00277             buffer->purge();
00278         }
00279     }
00280 
00281     virtual std::string print() const {
00282         ::std::ostringstream outstream(::std::ostringstream::out);
00283         outstream << "<" << rsc::runtime::typeName<DATATYPE>() << ">";
00284         if (configured) {
00285             outstream << " " << getConfig()->print();
00286             outstream << "(" << getScopePtr()->toString() << ")";
00287         } else {
00288             outstream << " <unconfigured>";
00289         }
00290         if (optional) {
00291             outstream << " [optional]";
00292         }
00293         return outstream.str();
00294     }
00295 
00296     virtual std::string bla() const {
00297         ::std::ostringstream outstream(::std::ostringstream::out);
00298         outstream << "<" << rsc::runtime::typeName<DATATYPE>() << ">";
00299         return outstream.str();
00300     }
00301 
00302     virtual ~InputPort() {
00303     }
00304 
00305 protected:
00306 
00311     InputPort(InputPort &port);
00312 
00317     void operator=(const InputPort &port);
00318 
00320     BufferPtr buffer;
00321     rsc::logging::LoggerPtr logger;
00322 
00323 };
00324 
00325 }