26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_ 27 #define _MEMORYSUBSCRIPTIONMANAGER_H_ 52 class SubscriptionInfo
57 unsigned requestedAckTypes_)
58 : _handler(messageHandler_)
61 , _requestedAckTypes(requestedAckTypes_)
65 std::string options = _m.getOptions();
66 size_t replace = options.find(
"replace");
68 static const size_t replaceLen = 7;
69 if (replace != std::string::npos)
71 options.erase(replace, replaceLen);
72 _m.setOptions(options);
74 _paused = (options.find(
"pause") != std::string::npos);
81 _m.getSubscriptionId().clear();
89 if (_paused && !_recent.empty())
90 _m.setBookmark(_recent);
95 _m.setAckTypeEnum(_requestedAckTypes);
98 client_.
send(_handler, _m, timeout_);
103 client_.
send(handler, _m, timeout_);
120 size_t subIdLen = subId_.
len();
121 const char* subIdData = subId_.
data();
122 while (subIdLen && *subIdData ==
',')
127 while (subIdLen && subIdData[subIdLen-1] ==
',')
131 if (subIdLen == 0 || subIdLen > _subId.len())
return _subId.empty();
133 size_t matchStart = 0;
134 size_t matchCount = 0;
135 for (
size_t i=0; i<_subId.len(); ++i)
137 if (_subId.data()[i] ==
',')
139 if (matchCount == subIdLen)
break;
146 if (_subId.data()[i] == subIdData[matchCount])
155 if (match && matchCount == subIdLen)
157 size_t newLen = _subId.len() - matchCount;
160 while (matchStart + matchCount < _subId.len() &&
161 _subId.data()[matchStart+matchCount] ==
',')
166 char* buffer =
new char[newLen];
170 memcpy(buffer, _subId.data(), matchStart);
173 if (matchStart + matchCount < _subId.len())
175 memcpy(buffer+matchStart,
176 _subId.data()+matchStart+matchCount,
177 _subId.len()-matchStart-matchCount);
181 _m.getSubscriptionId().clear();
187 _m.assignSubscriptionId(buffer, newLen);
188 _subId = _m.getSubscriptionId();
195 _m.getSubscriptionId().clear();
200 _m.getSubscriptionId().assign(NULL, 0);
202 _subId = _m.getSubscriptionId();
206 return _subId.empty();
217 std::string opts(Message::Options::Pause());
218 opts.append(_m.getOptions());
223 std::string getMostRecent(
Client& client_)
225 if (!_recent.empty())
return _recent;
226 std::map<amps_uint64_t, amps_uint64_t> publishers;
227 const char* start = _subId.data();
228 const char* end = _subId.data() + _subId.len();
231 const char* comma = (
const char*)memchr(start,
',',
232 (
size_t)(end-start));
234 if(!comma) comma = end;
242 const char* sidRecentStart = sidRecent.
data();
243 const char* sidRecentEnd = sidRecent.
data() + sidRecent.
len();
244 while (sidRecentStart < sidRecentEnd)
246 const char* sidRecentComma = (
const char*)
247 memchr(sidRecentStart,
',',
248 (
size_t)(sidRecentEnd-sidRecentStart));
250 if(!sidRecentComma) sidRecentComma = sidRecentEnd;
251 if(sidRecentComma == sidRecentStart)
253 sidRecentStart = sidRecentComma + 1;
257 (
size_t)(sidRecentComma-sidRecentStart));
258 amps_uint64_t publisher = (amps_uint64_t)0;
259 amps_uint64_t seq = (amps_uint64_t)0;
260 Field::parseBookmark(bookmark, publisher, seq);
261 if (publishers.count(publisher) == 0
262 || publishers[publisher] > seq)
264 publishers[publisher] = seq;
267 sidRecentStart = sidRecentComma + 1;
272 std::ostringstream os;
273 for(std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
274 i != publishers.end();
277 if (i->first == 0 && i->second == 0)
continue;
278 if (os.tellp() > 0) os <<
',';
279 os << i->first <<
'|' << i->second <<
"|";
285 void setMostRecent(
const std::string& recent_)
295 unsigned _requestedAckTypes;
307 Resubscriber(
Client& client_,
int timeout_)
311 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
313 void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
314 iter_.second->resubscribe(_client, _timeout, data);
316 void operator()(SubscriptionInfo* iter_)
318 void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
319 iter_->resubscribe(_client, _timeout, data);
327 Deleter(
bool clearSubId_ =
false)
328 : _clearSubId(clearSubId_)
330 void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
338 amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
342 void operator()(SubscriptionInfo* iter_)
349 typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
368 const Message& message_,
unsigned requestedAckTypes_)
373 Lock<Mutex> l(_lock);
374 while(_resubscribing != 0) _lock.wait(10);
376 if (options.find(
"resume") != std::string::npos)
379 SubscriptionInfo* subInfo =
new SubscriptionInfo(
MessageHandler(),
383 Field fullSubId = subInfo->subId();
384 const char* start = fullSubId.
data();
385 const char* end = fullSubId.
data() + fullSubId.
len();
388 const char* comma = (
const char*)memchr(start,
',',
389 (
size_t)(end-start));
391 if(!comma) comma = end;
398 (
size_t)(comma-start));
401 if (_resumed.find(sid) == _resumed.end())
409 if (saved) _resumedSet.insert(subInfo);
412 else if (options.find(
"pause") != std::string::npos)
414 const char* start = subId.
data();
415 const char* end = subId.
data() + subId.
len();
419 const char* comma = (
const char*)memchr(start,
',',
420 (
size_t)(end-start));
422 if(!comma) comma = end;
429 (
size_t)(comma-start));
430 SubscriptionMap::iterator resume = _resumed.find(sid);
431 if (resume != _resumed.end())
433 SubscriptionInfo* subPtr = resume->second;
435 _resumed.erase(resume);
438 if (subPtr->removeSubId(sid))
440 _resumedSet.erase(subPtr);
446 SubscriptionMap::iterator item = _active.find(sid);
447 if(item != _active.end())
449 if (options.find(
"replace") != std::string::npos)
451 messageHandler = item->second->messageHandler();
457 item->second->pause();
463 Unlock<Mutex> u(_lock);
464 void* data = amps_invoke_copy_route_function(messageHandler_
468 messageHandler =
MessageHandler(messageHandler_.function(), data);
473 SubscriptionInfo* s =
new SubscriptionInfo(messageHandler, m,
476 _active[s->subId()] = s;
482 SubscriptionMap::iterator item = _active.find(subId);
483 if(item != _active.end())
485 messageHandler = item->second->messageHandler();
491 Unlock<Mutex> u(_lock);
492 void* data = amps_invoke_copy_route_function(messageHandler_
496 messageHandler =
MessageHandler(messageHandler_.function(), data);
499 SubscriptionInfo* s =
new SubscriptionInfo(messageHandler,
503 _active[s->subId()] = s;
513 Lock<Mutex> l(_lock);
514 SubscriptionMap::iterator item = _active.find(subId_);
515 if(item != _active.end())
517 SubscriptionInfo* subPtr = item->second;
519 while(_resubscribing != 0) _lock.wait(10);
520 Unlock<Mutex> u(_lock);
521 amps_invoke_remove_route_function(subPtr->messageHandler().userData());
524 item = _resumed.find(subId_);
525 if(item != _resumed.end())
527 SubscriptionInfo* subPtr = item->second;
529 _resumed.erase(item);
532 if (subPtr->removeSubId(subId_))
534 _resumedSet.erase(subPtr);
535 while(_resubscribing != 0) _lock.wait(10);
552 Lock<Mutex> l(_lock);
553 while(_resubscribing != 0) _lock.wait(10);
557 AtomicFlagFlip resubFlip(&_resubscribing);
559 Unlock<Mutex> u(_lock);
560 std::for_each(_active.begin(), _active.end(), Deleter());
561 std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
562 std::for_each(_resumed.begin(), _resumed.end(), Deleter(
true));
578 std::list<SubscriptionInfo*> subscriptions;
581 AtomicFlagFlip resubFlip(&_resubscribing);
583 Lock<Mutex> l(_lock);
584 subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
585 for (SubscriptionMap::iterator iter = _active.begin();
586 iter != _active.end(); ++iter)
588 SubscriptionInfo* sub = iter->second;
591 SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
594 if (resIter != _resumed.end())
596 sub->setMostRecent(resIter->second->getMostRecent(client_));
599 subscriptions.push_front(iter->second);
602 Resubscriber resubscriber(client_, _resubscriptionTimeout);
603 std::for_each(subscriptions.begin(), subscriptions.end(),
607 catch(
const AMPSException& e)
612 catch(
const std::exception& e)
626 _resubscriptionTimeout = timeout_;
635 return _resubscriptionTimeout;
644 static int _defaultResubscriptionTimeout =
645 AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
648 _defaultResubscriptionTimeout = timeout_;
650 return _defaultResubscriptionTimeout;
664 SubscriptionMap _active;
665 SubscriptionMap _resumed;
666 std::set<SubscriptionInfo*> _resumedSet;
668 ATOMIC_TYPE_8 _resubscribing;
669 int _resubscriptionTimeout;
674 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:633
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:657
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4446
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6408
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:622
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1043
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4249
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:550
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:572
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4550
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:367
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:543
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:511
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:48
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:642
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6417