CCA
Node.cpp
Go to the documentation of this file.
00001 /* ============================================================
00002  *
00003  * This file is a part of CCA project
00004  *
00005  * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de>
00006  *
00007  * This file may be licensed under the terms of the
00008  * GNU Lesser General Public License Version 3 (the ``LGPL''),
00009  * or (at your option) any later version.
00010  *
00011  * Software distributed under the License is distributed
00012  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
00013  * express or implied. See the LGPL for the specific language
00014  * governing rights and limitations.
00015  *
00016  * You should have received a copy of the LGPL along with this
00017  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
00018  * or write to the Free Software Foundation, Inc.,
00019  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00020  *
00021  * The development of this software was supported by:
00022  *   CoR-Lab, Research Institute for Cognition and Robotics
00023  *     Bielefeld University
00024  *
00025  * ============================================================ */
00026 
00027 #include "Node.h"
00028 
00029 #include <numeric>
00030 #include <iostream>
00031 
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/bind.hpp>
00034 #include <boost/date_time/posix_time/posix_time.hpp>
00035 #include <boost/cstdint.hpp>
00036 
00037 #include <rsc/logging/Logger.h>
00038 #include <rsc/misc/langutils.h>
00039 
00040 #include <rsb/Event.h>
00041 #include <rsb/CommException.h>
00042 #include <rsb/converter/Repository.h>
00043 #include <rsb/converter/ProtocolBufferConverter.h>
00044 
00045 #include "cca/buffer/SingleItemBuffer.h"
00046 #include "cca/buffer/SynchronizedQueue.h"
00047 
00048 using namespace std;
00049 using namespace boost;
00050 using namespace rsb;
00051 using namespace rsc;
00052 using namespace rsc::logging;
00053 
00054 namespace cca {
00055 
00056 Node::Node(const string &nodename) :
00057         inputPorts(), outputPorts(), config(), userconfig(), name(nodename),
00058                 ready(false), loaded(false), created(false), configured(false),
00059                 strategy(), logger(Logger::getLogger("cca")), properties(),
00060                 defaultProperties() {
00061     RSCINFO(logger, this->getName() + " component constructed.");
00062     this->create();
00063 }
00064 
00065 Node::~Node() {
00066     this->stop();
00067 }
00068 
00069 void Node::registerPort(const std::string &name, InputPortPtr port,
00070         bool optional) {
00071     RSCTRACE(logger, this->getName() + " registerPort() input port " + name);
00072 
00073     if (!name.empty()) {
00074         inputPorts.insert(NamedInputPort(name, port));
00075     } else {
00076         RSCERROR(logger,
00077                 this->getName() + " can't register input port, name is missing");
00078     }
00079 
00080     port->makeOptional(optional);
00081 
00082     port->setHandler(
00083             rsb::HandlerPtr(
00084                     new rsb::EventFunctionHandler(
00085                             bind(&Node::inputCallback, this, _1, name))));
00086 }
00087 
00088 void Node::registerPort(const std::string &name, OutputPortPtr port,
00089         bool optional) {
00090     RSCTRACE(logger, this->getName() + " registerPort() output port " + name);
00091 
00092     if (!name.empty()) {
00093         outputPorts.insert(NamedOutputPort(name, port));
00094     } else {
00095         RSCERROR(logger,
00096                 this->getName() + " can't register output port, name is missing");
00097     }
00098 
00099     port->makeOptional(optional);
00100 }
00101 
00102 bool Node::initialize() {
00103     RSCTRACE(logger, this->getName() + " initialize() Initializing component");
00104 
00105     if (!this->ready) {
00106         if (!this->created) {
00107             RSCDEBUG(logger,
00108                     this->getName() + " initialize() " + "Can`t initialize component, because not fully " + "created yet");
00109             return false;
00110         }
00111         if (!this->isConfigured()) {
00112             RSCERROR(logger,
00113                     this->getName() + " initialize() " + "Can`t initialize component, because parts are " + "not configured yet");
00114             return false;
00115         }
00116         if (!this->resume()) {
00117             return false;
00118         }
00119     }
00120 
00121     this->onInitialize();
00122     RSCINFO(logger,
00123             this->getName() + " initialize() " + "Component successfully initialized and ready to run");
00124 
00125     return true;
00126 }
00127 
00128 bool Node::reset() {
00129     RSCTRACE(logger, this->getName() + " reset() Resetting component");
00130 
00131     RSCTRACE(logger,
00132             this->getName() + " initialize() Reset properties to its defaults");
00133     this->properties = this->properties << this->defaultProperties;
00134 
00135     RSCINFO(logger, this->getName() + " reset() Stopping component");
00136     if (!this->stop()) {
00137         RSCERROR(logger,
00138                 this->getName() + " reset() Failed to stop component.");
00139         return false;
00140     }
00141 
00142     // Purge all buffers
00143     RSCINFO(logger, this->getName() + " reset() Empty all input ports");
00144     for (NamedInputPorts::const_iterator it = inputPorts.begin();
00145             it != inputPorts.end(); ++it) {
00146         RSCTRACE(logger,
00147                 this->getName() + " reset() Purge input port " + it->first);
00148         if (it->second && it->second->isReady()) {
00149             it->second->purge();
00150         }
00151     }
00152 
00153     this->onReset();
00154 
00155     RSCINFO(logger, this->getName() + " reset() Re-initializing component");
00156     return this->initialize();
00157 }
00158 
00159 void Node::onInitialize() {
00160     RSCTRACE(logger, this->getName() + " onInitialize() hook is empty");
00161 }
00162 
00163 string Node::getName() const {
00164     return this->name;
00165 }
00166 
00167 StrategyPtr Node::getProcessingStrategy() const {
00168     return this->strategy;
00169 }
00170 
00171 void Node::tick() {
00172     RSCTRACE(logger, this->getName() + " tick() Component received tick");
00173 
00174     if (!isConfigured()) {
00175         RSCFATAL(logger,
00176                 this->getName() + " tick() " + "Component is not fully configured yet.");
00177         throw std::runtime_error(
00178                 this->getName() + " tick() "
00179                         + "Component is not configured yet.");
00180     }
00181 
00182     this->onTick();
00183 
00184     if (this->strategy->listensToTicks()) {
00185         if (this->strategy->processOnTick()) {
00186             RSCTRACE(logger,
00187                     this->getName() + " tick() " + "Strategy says to process on this tick");
00188             this->process();
00189         } else {
00190             RSCTRACE(logger,
00191                     this->getName() + " tick() " + "Strategy says: Don`t process on this tick");
00192         }
00193     } else {
00194         RSCTRACE(logger,
00195                 this->getName() + " tick() " + "Strategy of this component doesn`t listen on Ticks");
00196     }
00197 }
00198 
00199 void Node::configureInputPort(const std::string &name,
00200         PortConfigurationPtr portcfg) {
00201     RSCTRACE(logger, this->getName() + " configureInputPort() entered");
00202 
00203     if (inputPorts.count(name) == 0) {
00204         throw(std::invalid_argument(
00205                 str(
00206                         format(
00207                                 "Can't configure input port '%1%' of node '%2%', port doesn't exist.")
00208                                 % name % getName())));
00209     }
00210 
00211     // Port
00212     try {
00213         inputPorts.at(name)->configure(portcfg);
00214     } catch (std::exception& e) {
00215         throw(std::runtime_error(
00216                 str(
00217                         format(
00218                                 "Can't configure input port '%1%' of node '%2%': %3%")
00219                                 % name % getName() % e.what())));
00220     }
00221 }
00222 
00223 void Node::configureInputPort(const std::string &name,
00224         const std::string &scope) {
00225     RSCTRACE(logger, this->getName() + " configureInputPort() entered");
00226 
00227     if (inputPorts.count(name) == 0) {
00228         throw(std::invalid_argument(
00229                 str(
00230                         format(
00231                                 "Can't configure input port '%1%' of node '%2%', port doesn't exist.")
00232                                 % name % getName())));
00233     }
00234 
00235     // Port
00236     try {
00237         inputPorts.at(name)->configure(scope);
00238     } catch (std::exception& e) {
00239         throw(std::runtime_error(
00240                 str(
00241                         format(
00242                                 "Can't configure input port '%1%' of node '%2%': %3%")
00243                                 % name % getName() % e.what())));
00244     }
00245 }
00246 
00247 void Node::configureOutputPort(const std::string &name,
00248         PortConfigurationPtr portcfg) {
00249     RSCTRACE(logger, this->getName() + "::configureOutputPort() entered");
00250 
00251     if (outputPorts.count(name) == 0) {
00252         throw(std::invalid_argument(
00253                 str(
00254                         boost::format(
00255                                 "Can't configure output port '%1%' of node '%2%', port doesn't exist.")
00256                                 % name % getName())));
00257     }
00258 
00259 // Port
00260     try {
00261         outputPorts.at(name)->configure(portcfg);
00262     } catch (std::exception& e) {
00263         throw(std::runtime_error(
00264                 str(
00265                         format(
00266                                 "Can't configure output port '%1%' of node '%2%': %3%")
00267                                 % name % getName() % e.what())));
00268     }
00269 }
00270 
00271 void Node::configureOutputPort(const std::string &name,
00272         const std::string &scope) {
00273     RSCTRACE(logger, this->getName() + "::configureOutputPort() entered");
00274 
00275     if (outputPorts.count(name) == 0) {
00276         throw(std::invalid_argument(
00277                 str(
00278                         boost::format(
00279                                 "Can't configure output port '%1%' of node '%2%', port doesn't exist.")
00280                                 % name % getName())));
00281     }
00282 
00283 // Port
00284     try {
00285         outputPorts.at(name)->configure(scope);
00286     } catch (std::exception& e) {
00287         throw(std::runtime_error(
00288                 str(
00289                         format(
00290                                 "Can't configure output port '%1%' of node '%2%': %3%")
00291                                 % name % getName() % e.what())));
00292     }
00293 }
00294 
00295 void Node::setProcessingStrategy(StrategyPtr Strategy) {
00296     RSCTRACE(logger,
00297             this->getName() + " setProcessingStrategy()" + " Processing strategy (re-)configured");
00298 
00299     this->strategy = Strategy;
00300 
00301     if (this->inputPorts.empty() && this->strategy->listensToInputs()) {
00302         RSCWARN(logger,
00303                 this->getName() + " isConfigured() Chosen strategy listens to "
00304                 "inputs, but component does not have any input ports");
00305     }
00306 }
00307 
00308 bool Node::isConfigured() {
00309     RSCTRACE(logger, this->getName() + " isConfigured()");
00310     if (this->configured) {
00311         return true;
00312     }
00313 
00314 // Input ports configured
00315     for (NamedInputPorts::const_iterator it = inputPorts.begin();
00316             it != inputPorts.end(); ++it) {
00317         if (!it->second->isReady() && !it->second->isOptional()) {
00318             RSCERROR(logger,
00319                     this->getName() + " isConfigured() Input port '" + it->first + "' not configured yet, but mandatory");
00320             return false;
00321         }
00322     }
00323 
00324 // Output ports configured
00325     for (NamedOutputPorts::const_iterator it = outputPorts.begin();
00326             it != outputPorts.end(); ++it) {
00327         if (!it->second->isReady() && !it->second->isOptional()) {
00328             RSCERROR(logger,
00329                     this->getName() + " isConfigured() Output port '" + it->first + "' not configured yet, but mandatory");
00330             return false;
00331         }
00332     }
00333 
00334 // Processing strategy set
00335     if (!this->strategy) {
00336         RSCWARN(logger,
00337                 this->getName() + " isConfigured() " + "Processing strategy not configured yet");
00338         return false;
00339     }
00340 
00341     this->configured = true;
00342     return this->configured;
00343 }
00344 
00345 void Node::process() {
00346     RSCTRACE(logger, this->getName() + " process() Processing component");
00347 
00348     if (this->processing) {
00349         RSCWARN(logger,
00350                 this->getName() + " process() Overshoot detected! Gets called faster then it can process.");
00351     }
00352 
00353     if (!this->ready) {
00354         RSCINFO(logger,
00355                 this->getName() + " process() " + "Component not ready yet, need to initialize");
00356         if (!this->initialize()) {
00357             RSCFATAL(logger,
00358                     this->getName() + " process() " + "failed to initialize component, can`t process.");
00359             throw std::runtime_error(
00360                     this->getName() + " process() "
00361                             + "failed to initialize component, can`t process.");
00362         }
00363     }
00364 
00365     this->processing = true;
00366     try {
00367         this->onProcess();
00368     } catch (...) {
00369         std::cout << std::endl << "ERROR in onProcess() of '" << this->getName()
00370                 << "':" << std::endl << this->print() << std::endl
00371                 << "onProcess() triggered by strategy '"
00372                 << this->getProcessingStrategy()->print() << "'. Backtrace was:"
00373                 << std::endl << std::endl;
00374         throw;
00375     }
00376     this->processing = false;
00377 }
00378 
00379 void Node::inputCallback(EventPtr event, const std::string& name) {
00380     RSCTRACE(logger,
00381             this->getName() + " inputCallback() Received input on '" + event->getScopePtr()->toString() + "'");
00382     if (!isConfigured()) {
00383         RSCERROR(logger,
00384                 this->getName() + " inputCallback() " + "Can`t work yet, not fully configured.");
00385     }
00386 
00387     // Receive and save input
00388     if (this->strategy->listensToInputs()) {
00389         if (this->strategy->processOnInputs(this->inputPorts)) {
00390             this->process();
00391         }
00392     }
00393 
00394 }
00395 
00396 void Node::create() {
00397     /* Generic node creation goes here */
00398     RSCTRACE(logger, this->getName() + " create() Creating component");
00399 
00400     this->onCreate();
00401     this->created = true;
00402 }
00403 
00404 void Node::onCreate() {
00405     /* Node-specific creation goes here */
00406     RSCTRACE(logger, this->getName() + " onCreate() hook is empty");
00407 }
00408 
00409 bool Node::stop() {
00410     RSCTRACE(logger, this->getName() + " stop() Stopping.");
00411     this->onStop();
00412 
00413     /* Generic node stopping goes here */
00414 
00415     this->ready = false;
00416 
00417     return !(this->ready);
00418 }
00419 
00420 void Node::onStop() {
00421     /* Node-specific stopping goes here */
00422     RSCTRACE(logger, this->getName() + " onStop() hook is empty");
00423 }
00424 
00425 void Node::onReset() {
00426     /* Node-specific stopping goes here */
00427     RSCTRACE(logger, this->getName() + " onReset() hook is empty");
00428 }
00429 
00430 bool Node::resume() {
00431     RSCTRACE(logger, this->getName() + " resume()");
00432     /* Generic node resuming goes here */
00433     if (!this->isConfigured()) {
00434         return false;
00435     }
00436 
00437     this->onResume();
00438     this->ready = true;
00439 
00440     return this->ready;
00441 }
00442 
00443 void Node::onResume() {
00444     /* Node-specific resuming goes here */
00445     RSCTRACE(logger, this->getName() + " onResume() hook is empty");
00446 }
00447 
00448 bool Node::load() {
00449     /* Generic node loading goes here */
00450 
00451     this->onLoad();
00452     this->loaded = true;
00453 
00454     return this->loaded;
00455 }
00456 
00457 void Node::onLoad() {
00458     /* Node-specific loading goes here */
00459 }
00460 
00461 bool Node::persist() {
00462     this->onPersist();
00463 
00464     /* Generic node persisting goes here */
00465 
00466     return true;
00467 }
00468 
00469 void Node::onPersist() {
00470     /* Node-specific persisting goes here */
00471 }
00472 
00473 void Node::onTick() {
00474     RSCTRACE(logger, this->getName() + " onTick() hook is empty.");
00475 }
00476 
00477 bool Node::isReady() const {
00478     return this->ready;
00479 }
00480 
00481 std::string Node::print() const {
00482     ComponentInfoPtr ci(new ComponentInfo(getName()));
00483     ci->displayStrategy(strategy);
00484     ci->displayDataPorts(inputPorts, outputPorts);
00485     ci->displayProperties(defaultProperties, properties);
00486     return ci->print();
00487 }
00488 
00489 bool Node::newValueAt(const std::string &name) const {
00490     NamedInputPorts::const_iterator ip = inputPorts.find(name);
00491 
00492     return (!ip->second->empty() && ip->second->newItem());
00493 }
00494 
00495 }
00496 
00497 ostream& operator<<(ostream& os, const cca::Node& node) {
00498     os.precision(3); // Precision when printing double values
00499     os << node.print();
00500     return os;
00501 }