26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_ 27 #define _MEMORYSUBSCRIPTIONMANAGER_H_ 51 class SubscriptionInfo
// SubscriptionInfo constructor (fragment: this listing omits lines between the
// numbered statements, including part of the signature and initializer list).
// Stores the message handler and the ack types requested for the subscription.
56 unsigned requestedAckTypes_)
57 : _handler(messageHandler_)
58 , _requestedAckTypes(requestedAckTypes_)
// Search the options of the saved message _m for the replace option. When it is
// found, 7 characters (replaceLen) are erased from the local copy at that position.
// NOTE(review): braces are not visible here - confirm whether the setOptions()
// write-back below runs only when the option was found.
64 std::string options = _m.getOptions();
65 size_t replace = options.find(
"replace");
67 static const size_t replaceLen = 7;
68 if (replace != std::string::npos)
70 options.erase(replace, replaceLen);
71 _m.setOptions(options);
// Record whether the (possibly edited) local options string contains the pause option.
73 _paused = (options.find(
"pause") != std::string::npos);
// NOTE(review): the function enclosing the next statement is not visible in this
// listing (presumably the destructor - confirm). It clears the subscription-id
// Field obtained from the saved message.
80 _m.getSubscriptionId().
clear();
// SubscriptionInfo resubscribe fragment (signature not visible in this listing;
// callers at the Resubscriber functor pass a Client and a timeout).
// If this subscription is paused and a most-recent bookmark has been recorded,
// place that bookmark on the saved message before sending.
88 if (_paused && !_recent.empty())
89 _m.setBookmark(_recent);
// Put the originally requested ack types on the saved message, then send it
// through client_ with the stored handler and the supplied timeout.
94 _m.setAckTypeEnum(_requestedAckTypes);
95 client_.
send(_handler, _m, timeout_);
// Fragment that removes subId_ from the comma-separated id list held in _subId
// and returns _subId.empty(). NOTE(review): the signature is not visible in this
// listing; presumably this is removeSubId, which callers use as a
// "no ids remain" test - confirm. Loop bodies and braces are missing here.
111 size_t subIdLen = subId_.
len();
112 const char* subIdData = subId_.
data();
// Leading commas in subId_ are detected here (loop body not visible; presumably
// it advances subIdData and shrinks subIdLen - confirm).
113 while (subIdLen && *subIdData ==
',')
// Trailing commas in subId_ are detected here (loop body not visible).
118 while (subIdLen && subIdData[subIdLen-1] ==
',')
// Nothing left to look for, or the requested id is longer than the stored list:
// no removal is attempted and the current emptiness of _subId is reported.
122 if (subIdLen == 0 || subIdLen > _subId.len())
return _subId.empty();
// Scan _subId character by character, tracking where a candidate match starts
// (matchStart) and how many characters of subIdData have matched (matchCount).
124 size_t matchStart = 0;
125 size_t matchCount = 0;
126 for (
size_t i=0; i<_subId.len(); ++i)
// A comma ends the current list element; stop scanning once a whole-id match is complete.
128 if (_subId.data()[i] ==
',')
130 if (matchCount == subIdLen)
break;
// Compare the current character against the next expected character of the id
// (the branch bodies that update matchCount/matchStart/match are not visible).
137 if (_subId.data()[i] == subIdData[matchCount])
// A full match was found: compute the length of the list without the matched id.
146 if (match && matchCount == subIdLen)
148 size_t newLen = _subId.len() - matchCount;
// Commas directly following the matched id are detected here (loop body not
// visible; presumably they are counted as part of the removed span - confirm).
151 while (matchStart + matchCount < _subId.len() &&
152 _subId.data()[matchStart+matchCount] ==
',')
// Build the replacement list in a fresh buffer: first the part before the match...
157 char* buffer =
new char[newLen];
161 memcpy(buffer, _subId.data(), matchStart);
// ...then, if anything follows the removed span, the part after it.
164 if (matchStart + matchCount < _subId.len())
166 memcpy(buffer+matchStart,
167 _subId.data()+matchStart+matchCount,
168 _subId.len()-matchStart-matchCount);
// Clear the old subscription-id field, hand the new buffer to the message, and
// refresh the cached _subId from the message.
172 _m.getSubscriptionId().clear();
178 _m.assignSubscriptionId(buffer, newLen);
179 _subId = _m.getSubscriptionId();
// NOTE(review): the condition guarding the next statements is not visible
// (presumably the case where no ids remain - confirm). The subscription-id field
// is cleared and reset to an empty (NULL, 0) value, then _subId is refreshed.
186 _m.getSubscriptionId().clear();
191 _m.getSubscriptionId().assign(NULL, 0);
193 _subId = _m.getSubscriptionId();
// Report whether the stored id list is now empty.
197 return _subId.empty();
// Fragment (presumably the body of pause(), which the manager calls on an
// existing subscription - confirm; the signature is not visible here).
// Builds an options string that starts with Message::Options::Pause() and is
// followed by the saved message's current options. The statement that stores
// the result is not visible in this listing.
208 std::string opts(Message::Options::Pause());
209 opts.append(_m.getOptions());
// Returns the most-recent bookmark string for this subscription.
// A previously recorded _recent value is returned as-is. Otherwise a combined
// bookmark is built from per-publisher sequence numbers (see below).
// Fragment: braces and several statements are missing from this listing.
214 std::string getMostRecent(
Client& client_)
216 if (!_recent.empty())
return _recent;
// publisher id -> sequence number chosen for that publisher.
217 std::map<amps_uint64_t, amps_uint64_t> publishers;
// Walk the comma-separated ids stored in _subId; comma marks the end of the
// current id (or the end of the list when no comma remains).
218 const char* start = _subId.data();
219 const char* end = _subId.data() + _subId.len();
222 const char* comma = (
const char*)memchr(start,
',',
223 (
size_t)(end-start));
225 if(!comma) comma = end;
// NOTE(review): the definition of sidRecent is not visible in this listing
// (presumably the most-recent bookmark list obtained for the current id via
// client_ - confirm). It is treated as a comma-separated list of bookmarks.
233 const char* sidRecentStart = sidRecent.
data();
234 const char* sidRecentEnd = sidRecent.
data() + sidRecent.
len();
235 while (sidRecentStart < sidRecentEnd)
237 const char* sidRecentComma = (
const char*)
238 memchr(sidRecentStart,
',',
239 (
size_t)(sidRecentEnd-sidRecentStart));
241 if(!sidRecentComma) sidRecentComma = sidRecentEnd;
// An empty element (two adjacent commas or a leading comma) is stepped over.
242 if(sidRecentComma == sidRecentStart)
244 sidRecentStart = sidRecentComma + 1;
// The current element is wrapped as a bookmark (construction only partly visible)
// and split into its publisher id and sequence number.
248 (
size_t)(sidRecentComma-sidRecentStart));
249 amps_uint64_t publisher = (amps_uint64_t)0;
250 amps_uint64_t seq = (amps_uint64_t)0;
251 Field::parseBookmark(bookmark, publisher, seq);
// Keep the smallest sequence number seen for each publisher: store seq when the
// publisher is new or when the stored value is greater than seq.
252 if (publishers.count(publisher) == 0
253 || publishers[publisher] > seq)
255 publishers[publisher] = seq;
258 sidRecentStart = sidRecentComma + 1;
// Serialize the map as comma-separated entries of the form publisher|seq|,
// skipping the entry whose publisher and sequence are both zero. A comma is
// written only when something has already been written to the stream.
263 std::ostringstream os;
264 for(std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
265 i != publishers.end();
268 if (i->first == 0 && i->second == 0)
continue;
269 if (os.tellp() > 0) os <<
',';
270 os << i->first <<
'|' << i->second <<
"|";
276 void setMostRecent(
const std::string& recent_)
286 unsigned _requestedAckTypes;
// Functor used with std::for_each to re-place saved subscriptions.
// It captures the Client and timeout once and forwards them to
// SubscriptionInfo::resubscribe for every element visited.
// (Fragment: class header, braces and member declarations are not visible.)
298 Resubscriber(
Client& client_,
int timeout_)
299 : _client(client_), _timeout(timeout_) {}
// Overload for map-style elements: resubscribes the mapped SubscriptionInfo.
300 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
302 iter_.second->resubscribe(_client, _timeout);
// Overload for plain SubscriptionInfo pointers (e.g. list or set elements).
304 void operator()(SubscriptionInfo* iter_)
306 iter_->resubscribe(_client, _timeout);
// Functor used with std::for_each to release container contents.
// clearSubId_ selects the behavior for map-style elements (default: false).
// (Fragment: class header, braces and member declarations are not visible.)
314 Deleter(
bool clearSubId_ =
false) : _clearSubId(clearSubId_) {}
// Map-style element: when _clearSubId is set, only the key Field is cleared and
// the SubscriptionInfo is left untouched; otherwise the SubscriptionInfo is deleted.
// Note the element is taken by value, so iter_.first is a copy of the key Field.
315 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
317 if (_clearSubId) iter_.first.clear();
318 else delete iter_.second;
// Plain-pointer element overload (body not visible in this listing).
320 void operator()(SubscriptionInfo* iter_)
327 typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
// subscribe fragment: saves a subscription so it can be placed again if a
// disconnect occurs. The first parameter (the message handler) and many
// statements and braces are missing from this listing, so the nesting described
// below is inferred from the visible statements only - confirm against the file.
346 const Message& message_,
unsigned requestedAckTypes_)
// Take the manager lock, then wait in _lock.wait(10) steps until no
// resubscribe pass is in progress (_resubscribing == 0).
351 Lock<Mutex> l(_lock);
352 while(_resubscribing != 0) _lock.wait(10);
353 std::string options = message_.getOptions();
// Case 1: the options contain the resume option.
// A SubscriptionInfo with a default-constructed handler is created for it.
354 if (options.find(
"resume") != std::string::npos)
357 SubscriptionInfo* subInfo =
new SubscriptionInfo(
MessageHandler(),
// Walk the comma-separated ids in the new SubscriptionInfo's subId.
361 const char* start = subInfo->subId().data();
362 const char* end = subInfo->subId().data() + subInfo->subId().len();
365 const char* comma = (
const char*)memchr(start,
',',
366 (
size_t)(end-start));
368 if(!comma) comma = end;
375 (
size_t)(comma-start));
// Only ids not already present in _resumed are handled here (body not visible;
// presumably they are added to _resumed and the saved flag is set - confirm).
378 if (_resumed.find(sid) == _resumed.end())
// The new SubscriptionInfo is tracked in _resumedSet only when saved is true.
386 if (saved) _resumedSet.insert(subInfo);
// Case 2: the options contain the pause option.
// Walk the comma-separated ids in subId (its definition is not visible here).
389 else if (options.find(
"pause") != std::string::npos)
391 const char* start = subId.
data();
392 const char* end = subId.
data() + subId.
len();
396 const char* comma = (
const char*)memchr(start,
',',
397 (
size_t)(end-start));
399 if(!comma) comma = end;
406 (
size_t)(comma-start));
// If this id has a resume entry, drop it from _resumed and remove the id from
// that SubscriptionInfo; when removeSubId reports true the SubscriptionInfo is
// also dropped from _resumedSet.
407 SubscriptionMap::iterator resume = _resumed.find(sid);
408 if (resume != _resumed.end())
410 SubscriptionInfo* subPtr = resume->second;
412 _resumed.erase(resume);
415 if (subPtr->removeSubId(sid))
417 _resumedSet.erase(subPtr);
// If the id is already active: with the replace option the existing handler is
// reused; the existing entry is also asked to pause() (the exact branch
// structure between these statements is not visible).
423 SubscriptionMap::iterator item = _active.find(sid);
424 if(item != _active.end())
426 if (options.find(
"replace") != std::string::npos)
428 messageHandler = item->second->messageHandler();
434 item->second->pause();
// A new SubscriptionInfo is created and stored in _active under its own subId.
440 SubscriptionInfo* s =
new SubscriptionInfo(messageHandler, m,
443 _active[s->subId()] = s;
// Case 3 (presumably the plain else branch - confirm): if the id is already
// active its handler is reused, then a new SubscriptionInfo is stored in
// _active under its own subId.
449 SubscriptionMap::iterator item = _active.find(subId);
450 if(item != _active.end())
452 messageHandler = item->second->messageHandler();
456 SubscriptionInfo* s =
new SubscriptionInfo(messageHandler,
460 _active[s->subId()] = s;
// unsubscribe fragment: removes the subscription identified by subId_ from the
// manager. The statements that erase or delete the entries found are only partly
// visible in this listing.
470 Lock<Mutex> l(_lock);
// Active subscription with this id: fetch it, then wait in _lock.wait(10) steps
// until no resubscribe pass is running (the removal itself is not visible).
471 SubscriptionMap::iterator item = _active.find(subId_);
472 if(item != _active.end())
474 SubscriptionInfo* subPtr = item->second;
476 while(_resubscribing != 0) _lock.wait(10);
// Resume entry with this id: drop it from _resumed and remove the id from its
// SubscriptionInfo. When removeSubId reports true, the SubscriptionInfo is also
// dropped from _resumedSet, again after waiting out any running resubscribe pass.
479 item = _resumed.find(subId_);
480 if(item != _resumed.end())
482 SubscriptionInfo* subPtr = item->second;
484 _resumed.erase(item);
487 if (subPtr->removeSubId(subId_))
489 _resumedSet.erase(subPtr);
490 while(_resubscribing != 0) _lock.wait(10);
// clear fragment: clears all subscriptions from the manager.
// Takes the lock and waits until no resubscribe pass is running before
// releasing anything. (Statements that empty the containers are not visible.)
500 Lock<Mutex> l(_lock);
501 while(_resubscribing != 0) _lock.wait(10);
// _active and _resumedSet are visited with the default Deleter. _resumed is
// visited with Deleter(true), which clears only the key Field of each element
// and does not delete the mapped SubscriptionInfo.
502 std::for_each(_active.begin(), _active.end(), Deleter());
503 std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
504 std::for_each(_resumed.begin(), _resumed.end(), Deleter(
true));
// Manager resubscribe fragment: places all saved subscriptions on client_.
// The try block, scope braces and catch bodies are missing from this listing.
// The work list starts with the entries of _resumedSet; each active
// subscription is then pushed onto its front, so active subscriptions come
// before resume entries in the list.
515 std::list<SubscriptionInfo*> subscriptions;
// Marks a resubscribe pass as in progress through _resubscribing, which
// subscribe/unsubscribe/clear wait on (AtomicFlagFlip is declared elsewhere;
// presumably it resets the flag when it goes out of scope - confirm).
518 AtomicFlagFlip resubFlip(&_resubscribing);
// Snapshot the containers under the manager lock.
520 Lock<Mutex> l(_lock);
521 subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
522 for (SubscriptionMap::iterator iter = _active.begin();
523 iter != _active.end(); ++iter)
525 SubscriptionInfo* sub = iter->second;
// If a resume entry exists for this active subscription's id, copy that entry's
// most-recent bookmark onto the active subscription.
528 SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
531 if (resIter != _resumed.end())
533 sub->setMostRecent(resIter->second->getMostRecent(client_));
536 subscriptions.push_front(iter->second);
// Re-place every collected subscription using the configured timeout.
539 Resubscriber resubscriber(client_, _resubscriptionTimeout);
540 std::for_each(subscriptions.begin(), subscriptions.end(),
// Handlers for AMPSException and std::exception (bodies not visible here).
543 catch(
const AMPSException& e)
547 catch(
const std::exception& e)
// setResubscriptionTimeout fragment: stores the timeout used when trying to
// resubscribe after disconnect. (Any validation around this assignment is not
// visible in this listing.)
560 _resubscriptionTimeout = timeout_;
// getResubscriptionTimeout fragment: returns the timeout used when trying to
// resubscribe after disconnect.
569 return _resubscriptionTimeout;
// setDefaultResubscriptionTimeout fragment: the default is held in a
// function-local static, initialized from
// AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT on first use.
578 static int _defaultResubscriptionTimeout =
579 AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
// NOTE(review): lines between the initialization and this assignment are not
// visible - confirm whether the assignment is guarded by a check on timeout_.
582 _defaultResubscriptionTimeout = timeout_;
// The current default is returned to the caller.
584 return _defaultResubscriptionTimeout;
// Data members of the manager (fragment).
// Subscriptions currently saved as active, keyed by subscription id Field.
598 SubscriptionMap _active;
// Resume entries keyed by individual subscription id; several ids can map to
// the same SubscriptionInfo.
599 SubscriptionMap _resumed;
// The distinct SubscriptionInfo objects referenced by _resumed.
600 std::set<SubscriptionInfo*> _resumedSet;
// Non-zero while a resubscribe pass is in progress; other operations wait on it.
602 ATOMIC_TYPE_8 _resubscribing;
// Timeout passed to each SubscriptionInfo when resubscribing.
603 int _resubscriptionTimeout;
608 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:567
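Message encapsulates a single message sent to or received from an AMPS server, and provides methods for every header that can be present, whether or not that header will be populated or used in a particular context.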
Definition: Message.hpp:393
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:591
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client services such as the publish store.
Definition: ampsplusplus.hpp:4422
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:556
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepCopy of other Fields.
Definition: Field.hpp:196
Abstract base class to manage all subscriptions placed on a client so that they can be re-established if the client is disconnected and then reconnected.
Definition: ampsplusplus.hpp:1426
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
The main class for interacting with AMPS.
Definition: ampsplusplus.hpp:4233
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:513
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4526
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:345
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:498
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:468
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can be placed again when a reconnection occurs.
Definition: MemorySubscriptionManager.hpp:47
Definition: ampsplusplus.hpp:136
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:576
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: ampsplusplus.hpp:909