29 #ifndef MULTI_MBOX_HDR 30 #define MULTI_MBOX_HDR 35 #include <boost/thread.hpp> 36 #include <boost/thread/thread.hpp> 37 #include <boost/thread/mutex.hpp> 38 #include <boost/thread/condition.hpp> 52 boost::mutex::scoped_lock lock(
rc_mutex);
59 bool free(std::queue<MBoxMessage *> & free_list) {
60 boost::mutex::scoped_lock lock(
rc_mutex);
85 keep_freelist = _keep_freelist;
89 int subscriber_id = subscriber_count;
91 subscribers[subscriber_id] =
new Subscriber;
99 m->setReaderCount(subscriber_count);
101 for(i = 0; i < subscriber_count; i++) {
102 Subscriber * s = subscribers[i];
103 boost::mutex::scoped_lock lock(s->postmutex);
104 s->posted_list.push(m);
106 s->postcond.notify_all();
110 T *
get(
unsigned int subscriber_id) {
111 return getCommon(subscriber_id,
false);
115 return getCommon(subscriber_id,
true);
120 if(keep_freelist && m->checkMBoxTag(
this)) {
121 if(m->readyToDie()) {
122 boost::mutex::scoped_lock lock(free_lock);
125 else m->decReaderCount();
128 if(m->readyToDie())
delete m;
129 else m->decReaderCount();
137 boost::mutex::scoped_lock lock(free_lock);
138 if(!free_list.empty() && keep_freelist) {
139 ret = (T*) free_list.front();
148 boost::mutex::scoped_lock lock(free_lock);
158 typename std::map<int, typename MultiMBox< T >::Subscriber * >::iterator sl;
159 for(sl = subscribers.begin();
160 sl != subscribers.end();
162 Subscriber * s = sl->second;
164 boost::mutex::scoped_lock lock(s->postmutex);
165 int le = s->posted_list.size();
170 if(le > max_len) max_len = le;
182 bool flush(
unsigned int subscriber_id) {
184 while((dummy = getCommon(subscriber_id,
false)) != NULL) {
194 if(subscriber_id >= subscriber_count) {
202 Subscriber * s = subscribers[subscriber_id];
203 boost::mutex::scoped_lock lock(s->postmutex);
204 if(s->posted_list.empty()) {
209 while(s->posted_list.empty()) {
210 s->postcond.wait(s->postmutex);
217 ret = s->posted_list.front();
218 s->posted_list.pop();
bool flush(unsigned int subscriber_id)
Flush all items from this mailbox for this subscriber.
void setMBoxTag(void *tag)
unsigned int subscriber_count
std::map< int, Subscriber * > subscribers
T * getWait(unsigned int subscriber_id)
MultiMBox(bool _keep_freelist=true)
bool free(std::queue< MBoxMessage *> &free_list)
void setReaderCount(unsigned int rc)
std::queue< T * > posted_list
unsigned int inFlightCount()
unsigned int reader_count
boost::condition postcond
T * getCommon(unsigned int subscriber_id, bool wait)
bool checkMBoxTag(void *tag)
std::queue< MBoxMessage * > free_list