CCA
|
SynchronizedQueue.h
Go to the documentation of this file.
00001 /* ============================================================ 00002 * 00003 * This file is a part of CCA project 00004 * 00005 * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de> 00006 * 00007 * This file may be licensed under the terms of the 00008 * GNU Lesser General Public License Version 3 (the ``LGPL''), 00009 * or (at your option) any later version. 00010 * 00011 * Software distributed under the License is distributed 00012 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either 00013 * express or implied. See the LGPL for the specific language 00014 * governing rights and limitations. 00015 * 00016 * You should have received a copy of the LGPL along with this 00017 * program. If not, go to http://www.gnu.org/licenses/lgpl.html 00018 * or write to the Free Software Foundation, Inc., 00019 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00020 * 00021 * The development of this software was supported by: 00022 * CoR-Lab, Research Institute for Cognition and Robotics 00023 * Bielefeld University 00024 * 00025 * ============================================================ */ 00026 00027 #pragma once 00028 00029 #include <string> 00030 #include <sstream> 00031 #include <iostream> 00032 #include <exception> 00033 00034 #include <rsc/threading/SynchronizedQueue.h> 00035 00036 #include "cca/buffer/Buffer.h" 00037 00038 namespace cca { 00039 00043 template<class DATATYPE> 00044 class SynchronizedQueue: public Buffer<DATATYPE> { 00045 public: 00046 00048 typedef ::boost::shared_ptr<SynchronizedQueue<DATATYPE> > Ptr; 00049 00051 typedef ::boost::shared_ptr<DATATYPE> DataPtr; 00052 00053 SynchronizedQueue(bool alwaysKeepLatest = true) : 00054 Buffer<DATATYPE>(alwaysKeepLatest), queuesize(0), queue(), latest(), 00055 latestItemMutex() { 00056 } 00057 00058 SynchronizedQueue(unsigned int qsize, bool alwaysKeepLatest = true) : 00059 Buffer<DATATYPE>(alwaysKeepLatest), queuesize(qsize), 00060 queue(queuesize), latest(), latestItemMutex() { 00061 } 00062 00063 virtual ~SynchronizedQueue() { 00064 } 00065 00069 std::string print() const { 00070 std::ostringstream outstream(std::ostringstream::out); 00071 outstream.precision(3); // Precision when printing double values 00072 outstream << "<ynchronizedQueue"; 00073 if (this->keepLatest) { 00074 outstream << ", keep latest"; 00075 if (this->warm) { 00076 outstream << " (warm)"; 00077 } else { 00078 outstream << " (cold)"; 00079 } 00080 } 00081 if (this->newItem()) { 00082 outstream << ", " << this->size() << " new item(s)"; 00083 } else { 00084 outstream << ", no new items"; 00085 } 00086 return outstream.str(); 00087 } 00088 00092 virtual DataPtr get() throw () { 00093 try { 00094 DataPtr item = this->queue.tryPop(); 00095 this->newitem = false; 00096 return item; 00097 } catch (rsc::threading::QueueEmptyException) { 00098 if (this->keepLatest) { 00099 if (this->warm) { 00100 this->newitem = false; 00101 boost::recursive_mutex::scoped_lock scoped_lock( 00102 latestItemMutex); 00103 return latest; 00104 } else { 00105 throw std::runtime_error( 00106 "Synchronized Queue not yet filled."); 00107 } 00108 } else { 00109 throw std::runtime_error("Synchronized Queue is empty."); 00110 } 00111 } 00112 } 00113 00117 virtual void add(DataPtr item) { 00118 this->queue.push(item); 00119 if (this->keepLatest) { 00120 boost::recursive_mutex::scoped_lock scoped_lock(latestItemMutex); 00121 this->latest = item; 00122 } 00123 this->newitem = true; 00124 this->warm = true; 00125 } 00126 00133 virtual unsigned int size() const { 00134 return this->queue.size(); 00135 } 00136 00140 virtual void purge() { 00141 boost::recursive_mutex::scoped_lock scoped_lock(latestItemMutex); 00142 this->latest = DataPtr(); 00143 this->numItems = 0; 00144 this->newitem = false; 00145 this->warm = false; 00146 00147 // New empty queue 00148 // TODO: This has to be faster 00149 while (this->queue.size() > 0) { 00150 this->queue.tryPop(); 00151 } 00152 00153 } 00154 00155 protected: 00160 SynchronizedQueue(SynchronizedQueue &buffer); 00161 00166 void operator=(const SynchronizedQueue &buffer); 00167 00168 private: 00169 unsigned int queuesize; 00170 rsc::threading::SynchronizedQueue<DataPtr> queue; 00171 DataPtr latest; 00172 mutable boost::recursive_mutex latestItemMutex; 00173 }; 00174 00175 }