26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_ 27 #define _MEMORYSUBSCRIPTIONMANAGER_H_ 32 #ifdef AMPS_USE_FUNCTIONAL 33 #include <forward_list> 55 class SubscriptionInfo
60 unsigned requestedAckTypes_)
61 : _handler(messageHandler_)
64 , _requestedAckTypes(requestedAckTypes_)
68 std::string options = _m.getOptions();
69 size_t replace = options.find(
"replace");
71 static const size_t replaceLen = 7;
72 if (replace != std::string::npos)
74 options.erase(replace, replaceLen);
75 _m.setOptions(options);
77 _paused = (options.find(
"pause") != std::string::npos);
84 _m.getSubscriptionId().clear();
93 if (_paused && !_recent.empty())
95 _m.setBookmark(_recent);
103 _m.setAckTypeEnum(_requestedAckTypes);
106 client_.
send(_handler, _m, timeout_);
111 client_.
send(handler, _m, timeout_);
128 size_t subIdLen = subId_.
len();
129 const char* subIdData = subId_.
data();
130 while (subIdLen && *subIdData ==
',')
135 while (subIdLen && subIdData[subIdLen - 1] ==
',')
139 if (subIdLen == 0 || subIdLen > _subId.len())
141 return _subId.empty();
144 size_t matchStart = 0;
145 size_t matchCount = 0;
146 for (
size_t i = 0; i < _subId.len(); ++i)
148 if (_subId.data()[i] ==
',')
150 if (matchCount == subIdLen)
160 if (_subId.data()[i] == subIdData[matchCount])
171 if (match && matchCount == subIdLen)
173 size_t newLen = _subId.len() - matchCount;
176 while (matchStart + matchCount < _subId.len() &&
177 _subId.data()[matchStart + matchCount] ==
',')
182 char* buffer =
new char[newLen];
186 memcpy(buffer, _subId.data(), matchStart);
189 if (matchStart + matchCount < _subId.len())
191 memcpy(buffer + matchStart,
192 _subId.data() + matchStart + matchCount,
193 _subId.len() - matchStart - matchCount);
197 _m.getSubscriptionId().clear();
203 _m.assignSubscriptionId(buffer, newLen);
204 _subId = _m.getSubscriptionId();
211 _m.getSubscriptionId().clear();
216 _m.getSubscriptionId().assign(NULL, 0);
218 _subId = _m.getSubscriptionId();
222 return _subId.empty();
236 std::string opts(Message::Options::Pause());
237 opts.append(_m.getOptions());
242 std::string getMostRecent(
Client& client_)
244 if (!_recent.empty())
248 std::map<amps_uint64_t, amps_uint64_t> publishers;
249 const char* start = _subId.data();
250 const char* end = _subId.data() + _subId.len();
253 const char* comma = (
const char*)memchr(start,
',',
254 (
size_t)(end - start));
267 const char* sidRecentStart = sidRecent.
data();
268 const char* sidRecentEnd = sidRecent.
data() + sidRecent.
len();
269 while (sidRecentStart < sidRecentEnd)
271 const char* sidRecentComma = (
const char*)
272 memchr(sidRecentStart,
',',
273 (
size_t)(sidRecentEnd - sidRecentStart));
277 sidRecentComma = sidRecentEnd;
279 if (sidRecentComma == sidRecentStart)
281 sidRecentStart = sidRecentComma + 1;
285 (
size_t)(sidRecentComma - sidRecentStart));
286 amps_uint64_t publisher = (amps_uint64_t)0;
287 amps_uint64_t seq = (amps_uint64_t)0;
288 Field::parseBookmark(bookmark, publisher, seq);
289 if (publishers.count(publisher) == 0
290 || publishers[publisher] > seq)
292 publishers[publisher] = seq;
295 sidRecentStart = sidRecentComma + 1;
300 std::ostringstream os;
301 for (std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
302 i != publishers.end();
305 if (i->first == 0 && i->second == 0)
313 os << i->first <<
'|' << i->second <<
"|";
319 void setMostRecent(
const std::string& recent_)
329 unsigned _requestedAckTypes;
341 Resubscriber(
Client& client_,
int timeout_)
345 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
347 void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
348 iter_.second->resubscribe(_client, _timeout, data);
350 void operator()(SubscriptionInfo* iter_)
352 void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
353 iter_->resubscribe(_client, _timeout, data);
361 Deleter(
bool clearSubId_ =
false)
362 : _clearSubId(clearSubId_)
364 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
372 amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
376 void operator()(SubscriptionInfo* iter_)
382 virtual SubscriptionInfo* createSubscriptionInfo(
MessageHandler messageHandler_,
384 unsigned requestedAckTypes_)
386 return new SubscriptionInfo(messageHandler_, message_,
391 typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
410 const Message& message_,
unsigned requestedAckTypes_)
415 Lock<Mutex> l(_lock);
416 while (_resubscribing != 0)
421 if (options.find(
"resume") != std::string::npos)
424 SubscriptionInfo* subInfo = createSubscriptionInfo(
MessageHandler(),
428 Field fullSubId = subInfo->subId();
429 const char* start = fullSubId.
data();
430 const char* end = fullSubId.
data() + fullSubId.
len();
433 const char* comma = (
const char*)memchr(start,
',',
434 (
size_t)(end - start));
446 (
size_t)(comma - start));
449 if (_resumed.find(sid) == _resumed.end())
459 _resumedSet.insert(subInfo);
466 else if (options.find(
"pause") != std::string::npos)
468 const char* start = subId.
data();
469 const char* end = subId.
data() + subId.
len();
473 const char* comma = (
const char*)memchr(start,
',',
474 (
size_t)(end - start));
486 (
size_t)(comma - start));
487 SubscriptionMap::iterator resume = _resumed.find(sid);
488 if (resume != _resumed.end())
490 SubscriptionInfo* subPtr = resume->second;
492 _resumed.erase(resume);
495 if (subPtr->removeSubId(sid))
497 _resumedSet.erase(subPtr);
503 SubscriptionMap::iterator item = _active.find(sid);
504 if (item != _active.end())
506 if (options.find(
"replace") != std::string::npos)
508 messageHandler = item->second->messageHandler();
514 item->second->pause();
520 Unlock<Mutex> u(_lock);
521 void* data = amps_invoke_copy_route_function(
522 messageHandler_.userData());
525 messageHandler =
MessageHandler(messageHandler_.function(), data);
530 SubscriptionInfo* s = createSubscriptionInfo(messageHandler, m,
533 _active[s->subId()] = s;
539 SubscriptionMap::iterator item = _active.find(subId);
540 if (item != _active.end())
542 messageHandler = item->second->messageHandler();
548 Unlock<Mutex> u(_lock);
549 void* data = amps_invoke_copy_route_function(
550 messageHandler_.userData());
553 messageHandler =
MessageHandler(messageHandler_.function(), data);
556 SubscriptionInfo* s = createSubscriptionInfo(messageHandler,
560 _active[s->subId()] = s;
570 Lock<Mutex> l(_lock);
571 SubscriptionMap::iterator item = _active.find(subId_);
572 if (item != _active.end())
574 SubscriptionInfo* subPtr = item->second;
576 while (_resubscribing != 0)
580 Unlock<Mutex> u(_lock);
581 amps_invoke_remove_route_function(subPtr->messageHandler().userData());
584 item = _resumed.find(subId_);
585 if (item != _resumed.end())
587 SubscriptionInfo* subPtr = item->second;
589 _resumed.erase(item);
592 if (subPtr->removeSubId(subId_))
594 _resumedSet.erase(subPtr);
595 while (_resubscribing != 0)
615 Lock<Mutex> l(_lock);
616 while (_resubscribing != 0)
623 AtomicFlagFlip resubFlip(&_resubscribing);
625 Unlock<Mutex> u(_lock);
626 std::for_each(_active.begin(), _active.end(), Deleter());
627 std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
628 std::for_each(_resumed.begin(), _resumed.end(), Deleter(
true));
644 #ifdef AMPS_USE_FUNCTIONAL 645 std::forward_list<SubscriptionInfo*> subscriptions;
647 std::list<SubscriptionInfo*> subscriptions;
651 AtomicFlagFlip resubFlip(&_resubscribing);
653 Lock<Mutex> l(_lock);
654 subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
655 for (SubscriptionMap::iterator iter = _active.begin();
656 iter != _active.end(); ++iter)
658 SubscriptionInfo* sub = iter->second;
661 SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
664 if (resIter != _resumed.end())
666 sub->setMostRecent(resIter->second->getMostRecent(client_));
669 subscriptions.push_front(iter->second);
672 Resubscriber resubscriber(client_, _resubscriptionTimeout);
673 std::for_each(subscriptions.begin(), subscriptions.end(),
677 catch (
const AMPSException& e)
682 catch (
const std::exception& e)
696 _resubscriptionTimeout = timeout_;
705 return _resubscriptionTimeout;
714 static int _defaultResubscriptionTimeout =
715 AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
718 _defaultResubscriptionTimeout = timeout_;
720 return _defaultResubscriptionTimeout;
734 SubscriptionMap _active;
735 SubscriptionMap _resumed;
736 std::set<SubscriptionInfo*> _resumedSet;
738 ATOMIC_TYPE_8 _resubscribing;
739 int _resubscriptionTimeout;
744 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:703
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:727
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5019
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7084
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:692
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1229
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4814
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:613
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:638
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5123
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:409
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:606
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:568
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:51
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:712
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7093