AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.0
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <algorithm>
32 #include <list>
33 #include <map>
34 #include <memory>
35 #include <set>
36 
41 
42 namespace AMPS
43 {
44 
49 {
50 private:
51 
52  class SubscriptionInfo
53  {
54  public:
55  SubscriptionInfo(MessageHandler messageHandler_,
56  const Message& message_,
57  unsigned requestedAckTypes_)
58  : _handler(messageHandler_)
59  , _m(message_)
60  , _subId(message_.getSubscriptionId())
61  , _requestedAckTypes(requestedAckTypes_)
62  , _bookmark(!message_.getBookmark().empty())
63  , _clearSubId(false)
64  {
65  std::string options = _m.getOptions();
66  size_t replace = options.find("replace");
67  // AMPS should be ok if options contains ,,
68  static const size_t replaceLen = 7;
69  if (replace != std::string::npos)
70  {
71  options.erase(replace, replaceLen);
72  _m.setOptions(options);
73  }
74  _paused = (options.find("pause") != std::string::npos);
75  }
76 
77  ~SubscriptionInfo()
78  {
79  if (_clearSubId)
80  {
81  _m.getSubscriptionId().clear();
82  }
83  }
84 
85  void resubscribe(Client& client_, int timeout_, void* userData_)
86  {
87  if(_bookmark)
88  {
89  if (_paused && !_recent.empty())
90  _m.setBookmark(_recent);
91  else
92  _m.setBookmark(client_.getBookmarkStore().getMostRecent(_subId));
93  }
94  _m.newCommandId();
95  _m.setAckTypeEnum(_requestedAckTypes);
96  if (!userData_)
97  {
98  client_.send(_handler, _m, timeout_);
99  }
100  else
101  {
102  MessageHandler handler(_handler.function(), userData_);
103  client_.send(handler, _m, timeout_);
104  }
105  }
106 
107  MessageHandler messageHandler() const
108  {
109  return _handler;
110  }
111 
112  const Message::Field& subId() const
113  {
114  return _subId;
115  }
116 
117  // Returns true if the last subId is removed, false otherwise
118  bool removeSubId(const Message::Field& subId_)
119  {
120  size_t subIdLen = subId_.len();
121  const char* subIdData = subId_.data();
122  while (subIdLen && *subIdData == ',')
123  {
124  ++subIdData;
125  --subIdLen;
126  }
127  while (subIdLen && subIdData[subIdLen-1] == ',')
128  {
129  --subIdLen;
130  }
131  if (subIdLen == 0 || subIdLen > _subId.len()) return _subId.empty();
132  bool match = true;
133  size_t matchStart = 0;
134  size_t matchCount = 0;
135  for (size_t i=0; i<_subId.len(); ++i)
136  {
137  if (_subId.data()[i] == ',')
138  {
139  if (matchCount == subIdLen) break;
140  matchStart = i+1;
141  matchCount = 0;
142  match = true;
143  }
144  else if (match)
145  {
146  if (_subId.data()[i] == subIdData[matchCount])
147  ++matchCount;
148  else
149  {
150  matchCount = 0;
151  match = false;
152  }
153  }
154  }
155  if (match && matchCount == subIdLen)
156  {
157  size_t newLen = _subId.len() - matchCount;
158  if (newLen > 1) // More than just ,
159  {
160  while (matchStart + matchCount < _subId.len() &&
161  _subId.data()[matchStart+matchCount] == ',')
162  {
163  ++matchCount;
164  --newLen;
165  }
166  char* buffer = new char[newLen];
167  // Match is not first
168  if (matchStart > 0)
169  {
170  memcpy(buffer, _subId.data(), matchStart);
171  }
172  // Match is not last
173  if (matchStart + matchCount < _subId.len())
174  {
175  memcpy(buffer+matchStart,
176  _subId.data()+matchStart+matchCount,
177  _subId.len()-matchStart-matchCount);
178  }
179  if (_clearSubId)
180  {
181  _m.getSubscriptionId().clear();
182  }
183  else
184  {
185  _clearSubId = true;
186  }
187  _m.assignSubscriptionId(buffer, newLen);
188  _subId = _m.getSubscriptionId();
189  return false;
190  }
191  else
192  {
193  if (_clearSubId)
194  {
195  _m.getSubscriptionId().clear();
196  _clearSubId = false;
197  }
198  else
199  {
200  _m.getSubscriptionId().assign(NULL, 0);
201  }
202  _subId = _m.getSubscriptionId();
203  return true;
204  }
205  }
206  return _subId.empty();
207  }
208 
209  bool paused() const
210  {
211  return _paused;
212  }
213 
214  void pause()
215  {
216  if (_paused) return;
217  std::string opts(Message::Options::Pause());
218  opts.append(_m.getOptions());
219  _m.setOptions(opts);
220  _paused = true;
221  }
222 
223  std::string getMostRecent(Client& client_)
224  {
225  if (!_recent.empty()) return _recent;
226  std::map<amps_uint64_t, amps_uint64_t> publishers;
227  const char* start = _subId.data();
228  const char* end = _subId.data() + _subId.len();
229  while (start < end)
230  {
231  const char* comma = (const char*)memchr(start, ',',
232  (size_t)(end-start));
233  // No more commas found, just use start->end
234  if(!comma) comma = end;
235  if(comma == start)
236  {
237  start = comma + 1;
238  continue;
239  }
240  Message::Field sid(start, (size_t)(comma-start));
241  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
242  const char* sidRecentStart = sidRecent.data();
243  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
244  while (sidRecentStart < sidRecentEnd)
245  {
246  const char* sidRecentComma = (const char*)
247  memchr(sidRecentStart, ',',
248  (size_t)(sidRecentEnd-sidRecentStart));
249  // No more commas found, just use start->end
250  if(!sidRecentComma) sidRecentComma = sidRecentEnd;
251  if(sidRecentComma == sidRecentStart)
252  {
253  sidRecentStart = sidRecentComma + 1;
254  continue;
255  }
256  Message::Field bookmark(sidRecentStart,
257  (size_t)(sidRecentComma-sidRecentStart));
258  amps_uint64_t publisher = (amps_uint64_t)0;
259  amps_uint64_t seq = (amps_uint64_t)0;
260  Field::parseBookmark(bookmark, publisher, seq);
261  if (publishers.count(publisher) == 0
262  || publishers[publisher] > seq)
263  {
264  publishers[publisher] = seq;
265  }
266  // Move past comma
267  sidRecentStart = sidRecentComma + 1;
268  }
269  // Move past comma
270  start = comma + 1;
271  }
272  std::ostringstream os;
273  for(std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
274  i != publishers.end();
275  ++i)
276  {
277  if (i->first == 0 && i->second == 0) continue;
278  if (os.tellp() > 0) os << ',';
279  os << i->first << '|' << i->second << "|";
280  }
281  _recent = os.str();
282  return _recent;
283  }
284 
285  void setMostRecent(const std::string& recent_)
286  {
287  _recent = recent_;
288  }
289 
290  private:
291  std::string _recent;
292  MessageHandler _handler;
293  Message _m;
294  Message::Field _subId;
295  unsigned _requestedAckTypes;
296  bool _bookmark;
297  bool _paused;
298  bool _clearSubId;
299 
300  };//class SubscriptionInfo
301 
302  class Resubscriber
303  {
304  Client& _client;
305  int _timeout;
306  public:
307  Resubscriber(Client& client_, int timeout_)
308  : _client(client_)
309  , _timeout(timeout_)
310  { }
311  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
312  {
313  void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
314  iter_.second->resubscribe(_client, _timeout, data);
315  }
316  void operator()(SubscriptionInfo* iter_)
317  {
318  void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
319  iter_->resubscribe(_client, _timeout, data);
320  }
321  };
322 
323  class Deleter
324  {
325  bool _deleting;
326  bool _clearSubId;
327  public:
328  Deleter(bool deleting_, bool clearSubId_ = false)
329  : _deleting(deleting_)
330  , _clearSubId(clearSubId_)
331  { }
332  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
333  {
334  if (_clearSubId)
335  {
336  iter_.first.clear();
337  }
338  else
339  {
340  if (!_deleting)
341  {
342  amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
343  }
344  delete iter_.second;
345  }
346  }
347  void operator()(SubscriptionInfo* iter_)
348  {
349  amps_invoke_remove_route_function(iter_->messageHandler()
350  .userData());
351  delete iter_;
352  }
353  };
354 
355 public:
356  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
357 
359  : _resubscribing(0)
360  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
361  { ; }
362 
364  {
365  _clear(true);
366  }
367 
374  void subscribe(MessageHandler messageHandler_,
375  const Message& message_, unsigned requestedAckTypes_)
376  {
377  const Message::Field& subId = message_.getSubscriptionId();
378  if(!subId.empty())
379  {
380  Lock<Mutex> l(_lock);
381  while(_resubscribing != 0) _lock.wait(10);
382  std::string options = message_.getOptions();
383  if (options.find("resume") != std::string::npos)
384  {
385  // For a resume, we store each sub id with a single Subscription
386  SubscriptionInfo* subInfo = new SubscriptionInfo(MessageHandler(),
387  message_,
388  requestedAckTypes_);
389  bool saved = false;
390  Field fullSubId = subInfo->subId();
391  const char* start = fullSubId.data();
392  const char* end = fullSubId.data() + fullSubId.len();
393  while (start < end)
394  {
395  const char* comma = (const char*)memchr(start, ',',
396  (size_t)(end-start));
397  // No more commas found, just use start->end
398  if(!comma) comma = end;
399  if(comma == start)
400  {
401  start = comma + 1;
402  continue;
403  }
404  Message::Field sid = Message::Field(start,
405  (size_t)(comma-start));
406  // Calling resume on something already resumed is ignored,
407  // so don't update anything that exists.
408  if (_resumed.find(sid) == _resumed.end())
409  {
410  _resumed[sid.deepCopy()] = subInfo;
411  saved = true;
412  }
413  // Move past comma
414  start = comma + 1;
415  }
416  if (saved) _resumedSet.insert(subInfo);
417  else delete subInfo;
418  }
419  else if (options.find("pause") != std::string::npos)
420  {
421  const char* start = subId.data();
422  const char* end = subId.data() + subId.len();
423  while (start < end)
424  {
425  MessageHandler messageHandler = messageHandler_;
426  const char* comma = (const char*)memchr(start, ',',
427  (size_t)(end-start));
428  // No more commas found, just use start->end
429  if(!comma) comma = end;
430  if(comma == start)
431  {
432  start = comma + 1;
433  continue;
434  }
435  Message::Field sid = Message::Field(start,
436  (size_t)(comma-start));
437  SubscriptionMap::iterator resume = _resumed.find(sid);
438  if (resume != _resumed.end())
439  {
440  SubscriptionInfo* subPtr = resume->second;
441  Message::Field subField(resume->first);
442  _resumed.erase(resume); // Remove mapping for sid
443  subField.clear();
444  // If last subId, remove completely
445  if (subPtr->removeSubId(sid))
446  {
447  _resumedSet.erase(subPtr);
448  delete subPtr;
449  }
450  }
451  // Move past comma
452  start = comma + 1;
453  SubscriptionMap::iterator item = _active.find(sid);
454  if(item != _active.end())
455  {
456  if (options.find("replace") != std::string::npos)
457  {
458  messageHandler = item->second->messageHandler();
459  delete item->second;
460  _active.erase(item);
461  }
462  else
463  {
464  item->second->pause();
465  continue; // Leave current one
466  }
467  }
468  Message m = message_.deepCopy();
469  m.setSubscriptionId(sid.data(), sid.len());
470  SubscriptionInfo* s = new SubscriptionInfo(messageHandler, m,
471  requestedAckTypes_);
472  // Insert using the subId from s, which is deep copy of original
473  _active[s->subId()] = s;
474  }
475  }
476  else // Not a pause or resume
477  {
478  MessageHandler messageHandler = messageHandler_;
479  SubscriptionMap::iterator item = _active.find(subId);
480  if(item != _active.end())
481  {
482  messageHandler = item->second->messageHandler();
483  delete item->second;
484  _active.erase(item);
485  }
486  else
487  {
488  Unlock<Mutex> u(_lock);
489  void* data = amps_invoke_copy_route_function(messageHandler_
490  .userData());
491  if (data)
492  {
493  messageHandler = MessageHandler(messageHandler_.function(), data);
494  }
495  }
496  SubscriptionInfo* s = new SubscriptionInfo(messageHandler,
497  message_,
498  requestedAckTypes_);
499  // Insert using the subId from s, which is deep copy of original
500  _active[s->subId()] = s;
501  }
502  }
503  }
504 
508  void unsubscribe(const Message::Field& subId_)
509  {
510  Lock<Mutex> l(_lock);
511  SubscriptionMap::iterator item = _active.find(subId_);
512  if(item != _active.end())
513  {
514  SubscriptionInfo* subPtr = item->second;
515  _active.erase(item);
516  while(_resubscribing != 0) _lock.wait(10);
517  Unlock<Mutex> u(_lock);
518  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
519  delete subPtr;
520  }
521  item = _resumed.find(subId_);
522  if(item != _resumed.end())
523  {
524  SubscriptionInfo* subPtr = item->second;
525  Message::Field subField(item->first);
526  _resumed.erase(item);
527  subField.clear();
528  // If last subId, remove completely
529  if (subPtr->removeSubId(subId_))
530  {
531  _resumedSet.erase(subPtr);
532  while(_resubscribing != 0) _lock.wait(10);
533  Unlock<Mutex> u(_lock);
534  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
535  delete subPtr;
536  }
537  }
538  }
539 
542  void clear()
543  {
544  _clear();
545  }
546 
549  void _clear(bool deleting_=false)
550  {
551  Lock<Mutex> l(_lock);
552  while(_resubscribing != 0) _lock.wait(10);
553  // Settting _resubscribing keeps other threads from touching data
554  // even if lock isn't held. Don't want to hold lock when
555  // amps_invoke_remove_route_function is called.
556  AtomicFlagFlip resubFlip(&_resubscribing);
557  {
558  Unlock<Mutex> u(_lock);
559  std::for_each(_active.begin(), _active.end(), Deleter(deleting_));
560  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter(deleting_));
561  std::for_each(_resumed.begin(), _resumed.end(), Deleter(deleting_, true));
562  }
563  _active.clear();
564  _resumed.clear();
565  _resumedSet.clear();
566  }
567 
571  void resubscribe(Client& client_)
572  {
573  // At this point, it's better to throw an exception back to disconnect
574  // handling than to attempt a reconnect in send, so turn off retry.
575  bool retry = client_.getRetryOnDisconnect();
576  client_.setRetryOnDisconnect(false);
577  std::list<SubscriptionInfo*> subscriptions;
578  try
579  {
580  AtomicFlagFlip resubFlip(&_resubscribing);
581  {
582  Lock<Mutex> l(_lock);
583  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
584  for (SubscriptionMap::iterator iter = _active.begin();
585  iter != _active.end(); ++iter)
586  {
587  SubscriptionInfo* sub = iter->second;
588  if (sub->paused())
589  {
590  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
591  // All pause subs resuming together should be sent with
592  // bookmark as list of the resumes' most recents
593  if (resIter != _resumed.end())
594  {
595  sub->setMostRecent(resIter->second->getMostRecent(client_));
596  }
597  }
598  subscriptions.push_front(iter->second);
599  }
600  }
601  Resubscriber resubscriber(client_, _resubscriptionTimeout);
602  std::for_each(subscriptions.begin(), subscriptions.end(),
603  resubscriber);
604  client_.setRetryOnDisconnect(retry);
605  }
606  catch(const AMPSException& e)
607  {
608  client_.setRetryOnDisconnect(retry);
609  throw e;
610  }
611  catch(const std::exception& e)
612  {
613  client_.setRetryOnDisconnect(retry);
614  throw e;
615  }
616  }
617 
621  void setResubscriptionTimeout(int timeout_)
622  {
623  if (timeout_ >= 0)
624  {
625  _resubscriptionTimeout = timeout_;
626  }
627  }
628 
633  {
634  return _resubscriptionTimeout;
635  }
636 
641  static int setDefaultResubscriptionTimeout(int timeout_)
642  {
643  static int _defaultResubscriptionTimeout =
644  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
645  if (timeout_ >= 0)
646  {
647  _defaultResubscriptionTimeout = timeout_;
648  }
649  return _defaultResubscriptionTimeout;
650  }
651 
657  {
659  }
660 
661 private:
662 
663  SubscriptionMap _active;
664  SubscriptionMap _resumed;
665  std::set<SubscriptionInfo*> _resumedSet;
666  Mutex _lock;
667  ATOMIC_TYPE_8 _resubscribing;
668  int _resubscriptionTimeout;
669 }; //class MemorySubscriptionManager
670 
671 } // namespace AMPS
672 
673 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
674 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
void _clear(bool deleting_=false)
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:549
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:484
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:632
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:656
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4450
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6412
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:621
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1043
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4253
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1107
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:571
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4554
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:374
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:542
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:508
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:48
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:641
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1122
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6421