CCA
|
Node.cpp
Go to the documentation of this file.
00001 /* ============================================================ 00002 * 00003 * This file is a part of CCA project 00004 * 00005 * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de> 00006 * 00007 * This file may be licensed under the terms of the 00008 * GNU Lesser General Public License Version 3 (the ``LGPL''), 00009 * or (at your option) any later version. 00010 * 00011 * Software distributed under the License is distributed 00012 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either 00013 * express or implied. See the LGPL for the specific language 00014 * governing rights and limitations. 00015 * 00016 * You should have received a copy of the LGPL along with this 00017 * program. If not, go to http://www.gnu.org/licenses/lgpl.html 00018 * or write to the Free Software Foundation, Inc., 00019 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00020 * 00021 * The development of this software was supported by: 00022 * CoR-Lab, Research Institute for Cognition and Robotics 00023 * Bielefeld University 00024 * 00025 * ============================================================ */ 00026 00027 #include "Node.h" 00028 00029 #include <numeric> 00030 #include <iostream> 00031 00032 #include <boost/shared_ptr.hpp> 00033 #include <boost/bind.hpp> 00034 #include <boost/date_time/posix_time/posix_time.hpp> 00035 #include <boost/cstdint.hpp> 00036 00037 #include <rsc/logging/Logger.h> 00038 #include <rsc/misc/langutils.h> 00039 00040 #include <rsb/Event.h> 00041 #include <rsb/CommException.h> 00042 #include <rsb/converter/Repository.h> 00043 #include <rsb/converter/ProtocolBufferConverter.h> 00044 00045 #include "cca/buffer/SingleItemBuffer.h" 00046 #include "cca/buffer/SynchronizedQueue.h" 00047 00048 using namespace std; 00049 using namespace boost; 00050 using namespace rsb; 00051 using namespace rsc; 00052 using namespace rsc::logging; 00053 00054 namespace cca { 00055 00056 Node::Node(const string &nodename) : 00057 inputPorts(), outputPorts(), config(), userconfig(), name(nodename), 00058 ready(false), loaded(false), created(false), configured(false), 00059 strategy(), logger(Logger::getLogger("cca")), properties(), 00060 defaultProperties() { 00061 RSCINFO(logger, this->getName() + " component constructed."); 00062 this->create(); 00063 } 00064 00065 Node::~Node() { 00066 this->stop(); 00067 } 00068 00069 void Node::registerPort(const std::string &name, InputPortPtr port, 00070 bool optional) { 00071 RSCTRACE(logger, this->getName() + " registerPort() input port " + name); 00072 00073 if (!name.empty()) { 00074 inputPorts.insert(NamedInputPort(name, port)); 00075 } else { 00076 RSCERROR(logger, 00077 this->getName() + " can't register input port, name is missing"); 00078 } 00079 00080 port->makeOptional(optional); 00081 00082 port->setHandler( 00083 rsb::HandlerPtr( 00084 new rsb::EventFunctionHandler( 00085 bind(&Node::inputCallback, this, _1, name)))); 00086 } 00087 00088 void Node::registerPort(const std::string &name, OutputPortPtr port, 00089 bool optional) { 00090 RSCTRACE(logger, this->getName() + " registerPort() output port " + name); 00091 00092 if (!name.empty()) { 00093 outputPorts.insert(NamedOutputPort(name, port)); 00094 } else { 00095 RSCERROR(logger, 00096 this->getName() + " can't register output port, name is missing"); 00097 } 00098 00099 port->makeOptional(optional); 00100 } 00101 00102 bool Node::initialize() { 00103 RSCTRACE(logger, this->getName() + " initialize() Initializing component"); 00104 00105 if (!this->ready) { 00106 if (!this->created) { 00107 RSCDEBUG(logger, 00108 this->getName() + " initialize() " + "Can`t initialize component, because not fully " + "created yet"); 00109 return false; 00110 } 00111 if (!this->isConfigured()) { 00112 RSCERROR(logger, 00113 this->getName() + " initialize() " + "Can`t initialize component, because parts are " + "not configured yet"); 00114 return false; 00115 } 00116 if (!this->resume()) { 00117 return false; 00118 } 00119 } 00120 00121 this->onInitialize(); 00122 RSCINFO(logger, 00123 this->getName() + " initialize() " + "Component successfully initialized and ready to run"); 00124 00125 return true; 00126 } 00127 00128 bool Node::reset() { 00129 RSCTRACE(logger, this->getName() + " reset() Resetting component"); 00130 00131 RSCTRACE(logger, 00132 this->getName() + " initialize() Reset properties to its defaults"); 00133 this->properties = this->properties << this->defaultProperties; 00134 00135 RSCINFO(logger, this->getName() + " reset() Stopping component"); 00136 if (!this->stop()) { 00137 RSCERROR(logger, 00138 this->getName() + " reset() Failed to stop component."); 00139 return false; 00140 } 00141 00142 // Purge all buffers 00143 RSCINFO(logger, this->getName() + " reset() Empty all input ports"); 00144 for (NamedInputPorts::const_iterator it = inputPorts.begin(); 00145 it != inputPorts.end(); ++it) { 00146 RSCTRACE(logger, 00147 this->getName() + " reset() Purge input port " + it->first); 00148 if (it->second && it->second->isReady()) { 00149 it->second->purge(); 00150 } 00151 } 00152 00153 this->onReset(); 00154 00155 RSCINFO(logger, this->getName() + " reset() Re-initializing component"); 00156 return this->initialize(); 00157 } 00158 00159 void Node::onInitialize() { 00160 RSCTRACE(logger, this->getName() + " onInitialize() hook is empty"); 00161 } 00162 00163 string Node::getName() const { 00164 return this->name; 00165 } 00166 00167 StrategyPtr Node::getProcessingStrategy() const { 00168 return this->strategy; 00169 } 00170 00171 void Node::tick() { 00172 RSCTRACE(logger, this->getName() + " tick() Component received tick"); 00173 00174 if (!isConfigured()) { 00175 RSCFATAL(logger, 00176 this->getName() + " tick() " + "Component is not fully configured yet."); 00177 throw std::runtime_error( 00178 this->getName() + " tick() " 00179 + "Component is not configured yet."); 00180 } 00181 00182 this->onTick(); 00183 00184 if (this->strategy->listensToTicks()) { 00185 if (this->strategy->processOnTick()) { 00186 RSCTRACE(logger, 00187 this->getName() + " tick() " + "Strategy says to process on this tick"); 00188 this->process(); 00189 } else { 00190 RSCTRACE(logger, 00191 this->getName() + " tick() " + "Strategy says: Don`t process on this tick"); 00192 } 00193 } else { 00194 RSCTRACE(logger, 00195 this->getName() + " tick() " + "Strategy of this component doesn`t listen on Ticks"); 00196 } 00197 } 00198 00199 void Node::configureInputPort(const std::string &name, 00200 PortConfigurationPtr portcfg) { 00201 RSCTRACE(logger, this->getName() + " configureInputPort() entered"); 00202 00203 if (inputPorts.count(name) == 0) { 00204 throw(std::invalid_argument( 00205 str( 00206 format( 00207 "Can't configure input port '%1%' of node '%2%', port doesn't exist.") 00208 % name % getName()))); 00209 } 00210 00211 // Port 00212 try { 00213 inputPorts.at(name)->configure(portcfg); 00214 } catch (std::exception& e) { 00215 throw(std::runtime_error( 00216 str( 00217 format( 00218 "Can't configure input port '%1%' of node '%2%': %3%") 00219 % name % getName() % e.what()))); 00220 } 00221 } 00222 00223 void Node::configureInputPort(const std::string &name, 00224 const std::string &scope) { 00225 RSCTRACE(logger, this->getName() + " configureInputPort() entered"); 00226 00227 if (inputPorts.count(name) == 0) { 00228 throw(std::invalid_argument( 00229 str( 00230 format( 00231 "Can't configure input port '%1%' of node '%2%', port doesn't exist.") 00232 % name % getName()))); 00233 } 00234 00235 // Port 00236 try { 00237 inputPorts.at(name)->configure(scope); 00238 } catch (std::exception& e) { 00239 throw(std::runtime_error( 00240 str( 00241 format( 00242 "Can't configure input port '%1%' of node '%2%': %3%") 00243 % name % getName() % e.what()))); 00244 } 00245 } 00246 00247 void Node::configureOutputPort(const std::string &name, 00248 PortConfigurationPtr portcfg) { 00249 RSCTRACE(logger, this->getName() + "::configureOutputPort() entered"); 00250 00251 if (outputPorts.count(name) == 0) { 00252 throw(std::invalid_argument( 00253 str( 00254 boost::format( 00255 "Can't configure output port '%1%' of node '%2%', port doesn't exist.") 00256 % name % getName()))); 00257 } 00258 00259 // Port 00260 try { 00261 outputPorts.at(name)->configure(portcfg); 00262 } catch (std::exception& e) { 00263 throw(std::runtime_error( 00264 str( 00265 format( 00266 "Can't configure output port '%1%' of node '%2%': %3%") 00267 % name % getName() % e.what()))); 00268 } 00269 } 00270 00271 void Node::configureOutputPort(const std::string &name, 00272 const std::string &scope) { 00273 RSCTRACE(logger, this->getName() + "::configureOutputPort() entered"); 00274 00275 if (outputPorts.count(name) == 0) { 00276 throw(std::invalid_argument( 00277 str( 00278 boost::format( 00279 "Can't configure output port '%1%' of node '%2%', port doesn't exist.") 00280 % name % getName()))); 00281 } 00282 00283 // Port 00284 try { 00285 outputPorts.at(name)->configure(scope); 00286 } catch (std::exception& e) { 00287 throw(std::runtime_error( 00288 str( 00289 format( 00290 "Can't configure output port '%1%' of node '%2%': %3%") 00291 % name % getName() % e.what()))); 00292 } 00293 } 00294 00295 void Node::setProcessingStrategy(StrategyPtr Strategy) { 00296 RSCTRACE(logger, 00297 this->getName() + " setProcessingStrategy()" + " Processing strategy (re-)configured"); 00298 00299 this->strategy = Strategy; 00300 00301 if (this->inputPorts.empty() && this->strategy->listensToInputs()) { 00302 RSCWARN(logger, 00303 this->getName() + " isConfigured() Chosen strategy listens to " 00304 "inputs, but component does not have any input ports"); 00305 } 00306 } 00307 00308 bool Node::isConfigured() { 00309 RSCTRACE(logger, this->getName() + " isConfigured()"); 00310 if (this->configured) { 00311 return true; 00312 } 00313 00314 // Input ports configured 00315 for (NamedInputPorts::const_iterator it = inputPorts.begin(); 00316 it != inputPorts.end(); ++it) { 00317 if (!it->second->isReady() && !it->second->isOptional()) { 00318 RSCERROR(logger, 00319 this->getName() + " isConfigured() Input port '" + it->first + "' not configured yet, but mandatory"); 00320 return false; 00321 } 00322 } 00323 00324 // Output ports configured 00325 for (NamedOutputPorts::const_iterator it = outputPorts.begin(); 00326 it != outputPorts.end(); ++it) { 00327 if (!it->second->isReady() && !it->second->isOptional()) { 00328 RSCERROR(logger, 00329 this->getName() + " isConfigured() Output port '" + it->first + "' not configured yet, but mandatory"); 00330 return false; 00331 } 00332 } 00333 00334 // Processing strategy set 00335 if (!this->strategy) { 00336 RSCWARN(logger, 00337 this->getName() + " isConfigured() " + "Processing strategy not configured yet"); 00338 return false; 00339 } 00340 00341 this->configured = true; 00342 return this->configured; 00343 } 00344 00345 void Node::process() { 00346 RSCTRACE(logger, this->getName() + " process() Processing component"); 00347 00348 if (this->processing) { 00349 RSCWARN(logger, 00350 this->getName() + " process() Overshoot detected! Gets called faster then it can process."); 00351 } 00352 00353 if (!this->ready) { 00354 RSCINFO(logger, 00355 this->getName() + " process() " + "Component not ready yet, need to initialize"); 00356 if (!this->initialize()) { 00357 RSCFATAL(logger, 00358 this->getName() + " process() " + "failed to initialize component, can`t process."); 00359 throw std::runtime_error( 00360 this->getName() + " process() " 00361 + "failed to initialize component, can`t process."); 00362 } 00363 } 00364 00365 this->processing = true; 00366 try { 00367 this->onProcess(); 00368 } catch (...) { 00369 std::cout << std::endl << "ERROR in onProcess() of '" << this->getName() 00370 << "':" << std::endl << this->print() << std::endl 00371 << "onProcess() triggered by strategy '" 00372 << this->getProcessingStrategy()->print() << "'. Backtrace was:" 00373 << std::endl << std::endl; 00374 throw; 00375 } 00376 this->processing = false; 00377 } 00378 00379 void Node::inputCallback(EventPtr event, const std::string& name) { 00380 RSCTRACE(logger, 00381 this->getName() + " inputCallback() Received input on '" + event->getScopePtr()->toString() + "'"); 00382 if (!isConfigured()) { 00383 RSCERROR(logger, 00384 this->getName() + " inputCallback() " + "Can`t work yet, not fully configured."); 00385 } 00386 00387 // Receive and save input 00388 if (this->strategy->listensToInputs()) { 00389 if (this->strategy->processOnInputs(this->inputPorts)) { 00390 this->process(); 00391 } 00392 } 00393 00394 } 00395 00396 void Node::create() { 00397 /* Generic node creation goes here */ 00398 RSCTRACE(logger, this->getName() + " create() Creating component"); 00399 00400 this->onCreate(); 00401 this->created = true; 00402 } 00403 00404 void Node::onCreate() { 00405 /* Node-specific creation goes here */ 00406 RSCTRACE(logger, this->getName() + " onCreate() hook is empty"); 00407 } 00408 00409 bool Node::stop() { 00410 RSCTRACE(logger, this->getName() + " stop() Stopping."); 00411 this->onStop(); 00412 00413 /* Generic node stopping goes here */ 00414 00415 this->ready = false; 00416 00417 return !(this->ready); 00418 } 00419 00420 void Node::onStop() { 00421 /* Node-specific stopping goes here */ 00422 RSCTRACE(logger, this->getName() + " onStop() hook is empty"); 00423 } 00424 00425 void Node::onReset() { 00426 /* Node-specific stopping goes here */ 00427 RSCTRACE(logger, this->getName() + " onReset() hook is empty"); 00428 } 00429 00430 bool Node::resume() { 00431 RSCTRACE(logger, this->getName() + " resume()"); 00432 /* Generic node resuming goes here */ 00433 if (!this->isConfigured()) { 00434 return false; 00435 } 00436 00437 this->onResume(); 00438 this->ready = true; 00439 00440 return this->ready; 00441 } 00442 00443 void Node::onResume() { 00444 /* Node-specific resuming goes here */ 00445 RSCTRACE(logger, this->getName() + " onResume() hook is empty"); 00446 } 00447 00448 bool Node::load() { 00449 /* Generic node loading goes here */ 00450 00451 this->onLoad(); 00452 this->loaded = true; 00453 00454 return this->loaded; 00455 } 00456 00457 void Node::onLoad() { 00458 /* Node-specific loading goes here */ 00459 } 00460 00461 bool Node::persist() { 00462 this->onPersist(); 00463 00464 /* Generic node persisting goes here */ 00465 00466 return true; 00467 } 00468 00469 void Node::onPersist() { 00470 /* Node-specific persisting goes here */ 00471 } 00472 00473 void Node::onTick() { 00474 RSCTRACE(logger, this->getName() + " onTick() hook is empty."); 00475 } 00476 00477 bool Node::isReady() const { 00478 return this->ready; 00479 } 00480 00481 std::string Node::print() const { 00482 ComponentInfoPtr ci(new ComponentInfo(getName())); 00483 ci->displayStrategy(strategy); 00484 ci->displayDataPorts(inputPorts, outputPorts); 00485 ci->displayProperties(defaultProperties, properties); 00486 return ci->print(); 00487 } 00488 00489 bool Node::newValueAt(const std::string &name) const { 00490 NamedInputPorts::const_iterator ip = inputPorts.find(name); 00491 00492 return (!ip->second->empty() && ip->second->newItem()); 00493 } 00494 00495 } 00496 00497 ostream& operator<<(ostream& os, const cca::Node& node) { 00498 os.precision(3); // Precision when printing double values 00499 os << node.print(); 00500 return os; 00501 }