CCA
SynchronizedQueue.h
Go to the documentation of this file.
00001 /* ============================================================
00002  *
00003  * This file is a part of CCA project
00004  *
00005  * Copyright (C) 2011 by Arne Nordmann <anordman at cor-lab dot uni-bielefeld dot de>
00006  *
00007  * This file may be licensed under the terms of the
00008  * GNU Lesser General Public License Version 3 (the ``LGPL''),
00009  * or (at your option) any later version.
00010  *
00011  * Software distributed under the License is distributed
00012  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
00013  * express or implied. See the LGPL for the specific language
00014  * governing rights and limitations.
00015  *
00016  * You should have received a copy of the LGPL along with this
00017  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
00018  * or write to the Free Software Foundation, Inc.,
00019  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00020  *
00021  * The development of this software was supported by:
00022  *   CoR-Lab, Research Institute for Cognition and Robotics
00023  *     Bielefeld University
00024  *
00025  * ============================================================ */
00026 
00027 #pragma once
00028 
00029 #include <string>
00030 #include <sstream>
00031 #include <iostream>
00032 #include <exception>
00033 
00034 #include <rsc/threading/SynchronizedQueue.h>
00035 
00036 #include "cca/buffer/Buffer.h"
00037 
00038 namespace cca {
00039 
00043 template<class DATATYPE>
00044 class SynchronizedQueue: public Buffer<DATATYPE> {
00045 public:
00046 
00048     typedef ::boost::shared_ptr<SynchronizedQueue<DATATYPE> > Ptr;
00049 
00051     typedef ::boost::shared_ptr<DATATYPE> DataPtr;
00052 
00053     SynchronizedQueue(bool alwaysKeepLatest = true) :
00054             Buffer<DATATYPE>(alwaysKeepLatest), queuesize(0), queue(), latest(),
00055                     latestItemMutex() {
00056     }
00057 
00058     SynchronizedQueue(unsigned int qsize, bool alwaysKeepLatest = true) :
00059             Buffer<DATATYPE>(alwaysKeepLatest), queuesize(qsize),
00060                     queue(queuesize), latest(), latestItemMutex() {
00061     }
00062 
00063     virtual ~SynchronizedQueue() {
00064     }
00065 
00069     std::string print() const {
00070         std::ostringstream outstream(std::ostringstream::out);
00071         outstream.precision(3); // Precision when printing double values
00072         outstream << "<ynchronizedQueue";
00073         if (this->keepLatest) {
00074             outstream << ", keep latest";
00075             if (this->warm) {
00076                 outstream << " (warm)";
00077             } else {
00078                 outstream << " (cold)";
00079             }
00080         }
00081         if (this->newItem()) {
00082             outstream << ", " << this->size() << " new item(s)";
00083         } else {
00084             outstream << ", no new items";
00085         }
00086         return outstream.str();
00087     }
00088 
00092     virtual DataPtr get() throw () {
00093         try {
00094             DataPtr item = this->queue.tryPop();
00095             this->newitem = false;
00096             return item;
00097         } catch (rsc::threading::QueueEmptyException) {
00098             if (this->keepLatest) {
00099                 if (this->warm) {
00100                     this->newitem = false;
00101                     boost::recursive_mutex::scoped_lock scoped_lock(
00102                             latestItemMutex);
00103                     return latest;
00104                 } else {
00105                     throw std::runtime_error(
00106                             "Synchronized Queue not yet filled.");
00107                 }
00108             } else {
00109                 throw std::runtime_error("Synchronized Queue is empty.");
00110             }
00111         }
00112     }
00113 
00117     virtual void add(DataPtr item) {
00118         this->queue.push(item);
00119         if (this->keepLatest) {
00120             boost::recursive_mutex::scoped_lock scoped_lock(latestItemMutex);
00121             this->latest = item;
00122         }
00123         this->newitem = true;
00124         this->warm = true;
00125     }
00126 
00133     virtual unsigned int size() const {
00134         return this->queue.size();
00135     }
00136 
00140     virtual void purge() {
00141         boost::recursive_mutex::scoped_lock scoped_lock(latestItemMutex);
00142         this->latest = DataPtr();
00143         this->numItems = 0;
00144         this->newitem = false;
00145         this->warm = false;
00146 
00147         // New empty queue
00148         // TODO: This has to be faster
00149         while (this->queue.size() > 0) {
00150             this->queue.tryPop();
00151         }
00152 
00153     }
00154 
00155 protected:
00160     SynchronizedQueue(SynchronizedQueue &buffer);
00161 
00166     void operator=(const SynchronizedQueue &buffer);
00167 
00168 private:
00169     unsigned int queuesize;
00170     rsc::threading::SynchronizedQueue<DataPtr> queue;
00171     DataPtr latest;
00172     mutable boost::recursive_mutex latestItemMutex;
00173 };
00174 
00175 }