AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <algorithm>
32 #ifdef AMPS_USE_FUNCTIONAL
33  #include <forward_list>
34 #endif
35 #include <list>
36 #include <map>
37 #include <memory>
38 #include <set>
39 
44 
45 namespace AMPS
46 {
47 
52  {
53  protected:
54 
55  class SubscriptionInfo
56  {
57  public:
58  SubscriptionInfo(MessageHandler messageHandler_,
59  const Message& message_,
60  unsigned requestedAckTypes_)
61  : _handler(messageHandler_)
62  , _m(message_)
63  , _subId(message_.getSubscriptionId())
64  , _requestedAckTypes(requestedAckTypes_)
65  , _useBookmark(!message_.getBookmark().empty())
66  , _clearSubId(false)
67  {
68  std::string options = _m.getOptions();
69  size_t replace = options.find("replace");
70  // AMPS should be ok if options contains ,,
71  static const size_t replaceLen = 7;
72  if (replace != std::string::npos)
73  {
74  options.erase(replace, replaceLen);
75  _m.setOptions(options);
76  }
77  _paused = (options.find("pause") != std::string::npos);
78  }
79 
80  ~SubscriptionInfo()
81  {
82  if (_clearSubId)
83  {
84  _m.getSubscriptionId().clear();
85  }
86  }
87 
88  void resubscribe(Client& client_, int timeout_, void* userData_)
89  {
90  if (_useBookmark)
91  {
92  // Use the same bookmark for all members of a pause group
93  if (_paused && !_recent.empty())
94  {
95  _m.setBookmark(_recent);
96  }
97  else
98  {
99  _m.setBookmark(client_.getBookmarkStore().getMostRecent(_subId));
100  }
101  }
102  _m.newCommandId();
103  _m.setAckTypeEnum(_requestedAckTypes);
104  if (!userData_)
105  {
106  client_.send(_handler, _m, timeout_);
107  }
108  else
109  {
110  MessageHandler handler(_handler.function(), userData_);
111  client_.send(handler, _m, timeout_);
112  }
113  }
114 
115  MessageHandler messageHandler() const
116  {
117  return _handler;
118  }
119 
120  const Message::Field& subId() const
121  {
122  return _subId;
123  }
124 
125  // Returns true if the last subId is removed, false otherwise
126  bool removeSubId(const Message::Field& subId_)
127  {
128  size_t subIdLen = subId_.len();
129  const char* subIdData = subId_.data();
130  while (subIdLen && *subIdData == ',')
131  {
132  ++subIdData;
133  --subIdLen;
134  }
135  while (subIdLen && subIdData[subIdLen - 1] == ',')
136  {
137  --subIdLen;
138  }
139  if (subIdLen == 0 || subIdLen > _subId.len())
140  {
141  return _subId.empty();
142  }
143  bool match = true;
144  size_t matchStart = 0;
145  size_t matchCount = 0;
146  for (size_t i = 0; i < _subId.len(); ++i)
147  {
148  if (_subId.data()[i] == ',')
149  {
150  if (matchCount == subIdLen)
151  {
152  break;
153  }
154  matchStart = i + 1;
155  matchCount = 0;
156  match = true;
157  }
158  else if (match)
159  {
160  if (_subId.data()[i] == subIdData[matchCount])
161  {
162  ++matchCount;
163  }
164  else
165  {
166  matchCount = 0;
167  match = false;
168  }
169  }
170  }
171  if (match && matchCount == subIdLen)
172  {
173  size_t newLen = _subId.len() - matchCount;
174  if (newLen > 1) // More than just ,
175  {
176  while (matchStart + matchCount < _subId.len() &&
177  _subId.data()[matchStart + matchCount] == ',')
178  {
179  ++matchCount;
180  --newLen;
181  }
182  char* buffer = new char[newLen];
183  // Match is not first
184  if (matchStart > 0)
185  {
186  memcpy(buffer, _subId.data(), matchStart);
187  }
188  // Match is not last
189  if (matchStart + matchCount < _subId.len())
190  {
191  memcpy(buffer + matchStart,
192  _subId.data() + matchStart + matchCount,
193  _subId.len() - matchStart - matchCount);
194  }
195  if (_clearSubId)
196  {
197  _m.getSubscriptionId().clear();
198  }
199  else
200  {
201  _clearSubId = true;
202  }
203  _m.assignSubscriptionId(buffer, newLen);
204  _subId = _m.getSubscriptionId();
205  return false;
206  }
207  else
208  {
209  if (_clearSubId)
210  {
211  _m.getSubscriptionId().clear();
212  _clearSubId = false;
213  }
214  else
215  {
216  _m.getSubscriptionId().assign(NULL, 0);
217  }
218  _subId = _m.getSubscriptionId();
219  return true;
220  }
221  }
222  return _subId.empty();
223  }
224 
225  bool paused() const
226  {
227  return _paused;
228  }
229 
230  void pause()
231  {
232  if (_paused)
233  {
234  return;
235  }
236  std::string opts(Message::Options::Pause());
237  opts.append(_m.getOptions());
238  _m.setOptions(opts);
239  _paused = true;
240  }
241 
242  std::string getMostRecent(Client& client_)
243  {
244  if (!_recent.empty())
245  {
246  return _recent;
247  }
248  std::map<amps_uint64_t, amps_uint64_t> publishers;
249  const char* start = _subId.data();
250  const char* end = _subId.data() + _subId.len();
251  while (start < end)
252  {
253  const char* comma = (const char*)memchr(start, ',',
254  (size_t)(end - start));
255  // No more commas found, just use start->end
256  if (!comma)
257  {
258  comma = end;
259  }
260  if (comma == start)
261  {
262  start = comma + 1;
263  continue;
264  }
265  Message::Field sid(start, (size_t)(comma - start));
266  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
267  const char* sidRecentStart = sidRecent.data();
268  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
269  while (sidRecentStart < sidRecentEnd)
270  {
271  const char* sidRecentComma = (const char*)
272  memchr(sidRecentStart, ',',
273  (size_t)(sidRecentEnd - sidRecentStart));
274  // No more commas found, just use start->end
275  if (!sidRecentComma)
276  {
277  sidRecentComma = sidRecentEnd;
278  }
279  if (sidRecentComma == sidRecentStart)
280  {
281  sidRecentStart = sidRecentComma + 1;
282  continue;
283  }
284  Message::Field bookmark(sidRecentStart,
285  (size_t)(sidRecentComma - sidRecentStart));
286  amps_uint64_t publisher = (amps_uint64_t)0;
287  amps_uint64_t seq = (amps_uint64_t)0;
288  Field::parseBookmark(bookmark, publisher, seq);
289  if (publishers.count(publisher) == 0
290  || publishers[publisher] > seq)
291  {
292  publishers[publisher] = seq;
293  }
294  // Move past comma
295  sidRecentStart = sidRecentComma + 1;
296  }
297  // Move past comma
298  start = comma + 1;
299  }
300  std::ostringstream os;
301  for (std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
302  i != publishers.end();
303  ++i)
304  {
305  if (i->first == 0 && i->second == 0)
306  {
307  continue;
308  }
309  if (os.tellp() > 0)
310  {
311  os << ',';
312  }
313  os << i->first << '|' << i->second << "|";
314  }
315  _recent = os.str();
316  return _recent;
317  }
318 
319  void setMostRecent(const std::string& recent_)
320  {
321  _recent = recent_;
322  }
323 
324  private:
325  std::string _recent;
326  MessageHandler _handler;
327  Message _m;
328  Message::Field _subId;
329  unsigned _requestedAckTypes;
330  bool _useBookmark;
331  bool _paused;
332  bool _clearSubId;
333 
334  };//class SubscriptionInfo
335 
336  class Resubscriber
337  {
338  Client& _client;
339  int _timeout;
340  public:
341  Resubscriber(Client& client_, int timeout_)
342  : _client(client_)
343  , _timeout(timeout_)
344  { }
345  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
346  {
347  void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
348  iter_.second->resubscribe(_client, _timeout, data);
349  }
350  void operator()(SubscriptionInfo* iter_)
351  {
352  void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
353  iter_->resubscribe(_client, _timeout, data);
354  }
355  };
356 
357  class Deleter
358  {
359  bool _clearSubId;
360  public:
361  Deleter(bool clearSubId_ = false)
362  : _clearSubId(clearSubId_)
363  { }
364  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
365  {
366  if (_clearSubId)
367  {
368  iter_.first.clear();
369  }
370  else
371  {
372  amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
373  delete iter_.second;
374  }
375  }
376  void operator()(SubscriptionInfo* iter_)
377  {
378  delete iter_;
379  }
380  };
381 
382  virtual SubscriptionInfo* createSubscriptionInfo(MessageHandler messageHandler_,
383  const Message& message_,
384  unsigned requestedAckTypes_)
385  {
386  return new SubscriptionInfo(messageHandler_, message_,
387  requestedAckTypes_);
388  }
389 
390  public:
391  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
392 
394  : _resubscribing(0)
395  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
396  { ; }
397 
399  {
400  _clear();
401  }
402 
 // Save a subscription so it can be placed again if a disconnect occurs.
 // Messages with an empty subscription id are ignored. The call waits,
 // holding the lock, until no resubscribe/clear is in progress.
 // Three cases are handled, based on the message options:
 //  - "resume": one SubscriptionInfo is shared by every listed sub id that
 //    is not already resumed; it is tracked in _resumed and _resumedSet.
 //  - "pause": each listed sub id is removed from _resumed and then stored
 //    (or updated) individually in _active.
 //  - otherwise: the subscription is stored in _active, replacing any
 //    existing entry with the same sub id but keeping that entry's handler.
 // messageHandler_ is the handler to invoke for the subscription, message_
 // is the subscribe message being saved, and requestedAckTypes_ are the
 // ack types to request again on resubscribe.
409  void subscribe(MessageHandler messageHandler_,
410  const Message& message_, unsigned requestedAckTypes_)
411  {
412  const Message::Field& subId = message_.getSubscriptionId();
413  if (!subId.empty())
414  {
415  Lock<Mutex> l(_lock);
 // Do not modify the maps while a resubscribe or clear is running
416  while (_resubscribing != 0)
417  {
418  _lock.wait(10);
419  }
420  std::string options = message_.getOptions();
421  if (options.find("resume") != std::string::npos)
422  {
423  // For a resume, we store each sub id with a single Subscription
424  SubscriptionInfo* subInfo = createSubscriptionInfo(MessageHandler(),
425  message_,
426  requestedAckTypes_);
 // Tracks whether subInfo ended up referenced by at least one sub id
427  bool saved = false;
428  Field fullSubId = subInfo->subId();
429  const char* start = fullSubId.data();
430  const char* end = fullSubId.data() + fullSubId.len();
431  while (start < end)
432  {
433  const char* comma = (const char*)memchr(start, ',',
434  (size_t)(end - start));
435  // No more commas found, just use start->end
436  if (!comma)
437  {
438  comma = end;
439  }
 // Empty entry between commas, skip it
440  if (comma == start)
441  {
442  start = comma + 1;
443  continue;
444  }
445  Message::Field sid = Message::Field(start,
446  (size_t)(comma - start));
447  // Calling resume on something already resumed is ignored,
448  // so don't update anything that exists.
449  if (_resumed.find(sid) == _resumed.end())
450  {
 // The key is a deep copy, so it does not point into the message data
451  _resumed[sid.deepCopy()] = subInfo;
452  saved = true;
453  }
454  // Move past comma
455  start = comma + 1;
456  }
457  if (saved)
458  {
459  _resumedSet.insert(subInfo);
460  }
461  else
462  {
 // Every listed sub id was already resumed; nothing refers to subInfo
463  delete subInfo;
464  }
465  }
466  else if (options.find("pause") != std::string::npos)
467  {
468  const char* start = subId.data();
469  const char* end = subId.data() + subId.len();
470  while (start < end)
471  {
472  MessageHandler messageHandler = messageHandler_;
473  const char* comma = (const char*)memchr(start, ',',
474  (size_t)(end - start));
475  // No more commas found, just use start->end
476  if (!comma)
477  {
478  comma = end;
479  }
 // Empty entry between commas, skip it
480  if (comma == start)
481  {
482  start = comma + 1;
483  continue;
484  }
485  Message::Field sid = Message::Field(start,
486  (size_t)(comma - start));
 // A pause supersedes any saved resume for the same sub id
487  SubscriptionMap::iterator resume = _resumed.find(sid);
488  if (resume != _resumed.end())
489  {
490  SubscriptionInfo* subPtr = resume->second;
 // Copy the key Field so its data can be cleared after the erase
491  Message::Field subField(resume->first);
492  _resumed.erase(resume); // Remove mapping for sid
493  subField.clear();
494  // If last subId, remove completely
495  if (subPtr->removeSubId(sid))
496  {
497  _resumedSet.erase(subPtr);
498  delete subPtr;
499  }
500  }
501  // Move past comma
502  start = comma + 1;
503  SubscriptionMap::iterator item = _active.find(sid);
504  if (item != _active.end())
505  {
506  if (options.find("replace") != std::string::npos)
507  {
 // Replace the stored subscription but keep its existing handler
508  messageHandler = item->second->messageHandler();
509  delete item->second;
510  _active.erase(item);
511  }
512  else
513  {
514  item->second->pause();
515  continue; // Leave current one
516  }
517  }
518  else
519  {
 // New subscription: copy the route data with the lock released
520  Unlock<Mutex> u(_lock);
521  void* data = amps_invoke_copy_route_function(
522  messageHandler_.userData());
523  if (data)
524  {
525  messageHandler = MessageHandler(messageHandler_.function(), data);
526  }
527  }
 // Store a per-sub-id copy of the message carrying only this sub id
528  Message m = message_.deepCopy();
529  m.setSubscriptionId(sid.data(), sid.len());
530  SubscriptionInfo* s = createSubscriptionInfo(messageHandler, m,
531  requestedAckTypes_);
532  // Insert using the subId from s, which is deep copy of original
533  _active[s->subId()] = s;
534  }
535  }
536  else // Not a pause or resume
537  {
538  MessageHandler messageHandler = messageHandler_;
539  SubscriptionMap::iterator item = _active.find(subId);
540  if (item != _active.end())
541  {
 // Replace the stored subscription but keep its existing handler
542  messageHandler = item->second->messageHandler();
543  delete item->second;
544  _active.erase(item);
545  }
546  else
547  {
 // New subscription: copy the route data with the lock released
548  Unlock<Mutex> u(_lock);
549  void* data = amps_invoke_copy_route_function(
550  messageHandler_.userData());
551  if (data)
552  {
553  messageHandler = MessageHandler(messageHandler_.function(), data);
554  }
555  }
556  SubscriptionInfo* s = createSubscriptionInfo(messageHandler,
557  message_,
558  requestedAckTypes_);
559  // Insert using the subId from s, which is deep copy of original
560  _active[s->subId()] = s;
561  }
562  }
563  }
564 
 // Remove the subscription from the manager.
 // Looks the sub id up in both the active and the resumed maps. An active
 // entry is erased and deleted after its handler's route data is removed.
 // A resumed entry has its mapping erased and the id removed from the
 // shared SubscriptionInfo, which is deleted once its last id is gone.
 // subId_ is the subscription id to remove.
568  void unsubscribe(const Message::Field& subId_)
569  {
570  Lock<Mutex> l(_lock);
571  SubscriptionMap::iterator item = _active.find(subId_);
572  if (item != _active.end())
573  {
574  SubscriptionInfo* subPtr = item->second;
575  _active.erase(item);
 // Wait until no resubscribe/clear is in progress before destroying
576  while (_resubscribing != 0)
577  {
578  _lock.wait(10);
579  }
 // The route-removal callback is invoked with the lock released
580  Unlock<Mutex> u(_lock);
581  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
582  delete subPtr;
583  }
584  item = _resumed.find(subId_);
585  if (item != _resumed.end())
586  {
587  SubscriptionInfo* subPtr = item->second;
 // Copy the key Field so its data can be cleared after the erase
588  Message::Field subField(item->first);
589  _resumed.erase(item);
590  subField.clear();
591  // If last subId, remove completely
592  if (subPtr->removeSubId(subId_))
593  {
594  _resumedSet.erase(subPtr);
 // Wait until no resubscribe/clear is in progress before destroying
595  while (_resubscribing != 0)
596  {
597  _lock.wait(10);
598  }
599  delete subPtr;
600  }
601  }
602  }
603 
606  void clear()
607  {
608  _clear();
609  }
610 
 // Clear all subscriptions from the manager.
 // Waits until no resubscribe/clear is in progress, then deletes every
 // stored SubscriptionInfo, clears the keys of the resumed map and empties
 // all three containers.
613  void _clear()
614  {
615  Lock<Mutex> l(_lock);
616  while (_resubscribing != 0)
617  {
618  _lock.wait(10);
619  }
620  // Setting _resubscribing keeps other threads from touching data
621  // even if lock isn't held. Don't want to hold lock when
622  // amps_invoke_remove_route_function is called.
623  AtomicFlagFlip resubFlip(&_resubscribing);
624  {
625  Unlock<Mutex> u(_lock);
 // Active entries: remove route data and delete the SubscriptionInfo
626  std::for_each(_active.begin(), _active.end(), Deleter());
 // Resumed SubscriptionInfos are deleted once each through the set
627  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
 // Deleter(true) only clears the key Fields of the resumed map
628  std::for_each(_resumed.begin(), _resumed.end(), Deleter(true));
629  }
630  _active.clear();
631  _resumed.clear();
632  _resumedSet.clear();
633  }
634 
638  void resubscribe(Client& client_)
639  {
640  // At this point, it's better to throw an exception back to disconnect
641  // handling than to attempt a reconnect in send, so turn off retry.
642  bool retry = client_.getRetryOnDisconnect();
643  client_.setRetryOnDisconnect(false);
644 #ifdef AMPS_USE_FUNCTIONAL
645  std::forward_list<SubscriptionInfo*> subscriptions;
646 #else
647  std::list<SubscriptionInfo*> subscriptions;
648 #endif
649  try
650  {
651  AtomicFlagFlip resubFlip(&_resubscribing);
652  {
653  Lock<Mutex> l(_lock);
654  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
655  for (SubscriptionMap::iterator iter = _active.begin();
656  iter != _active.end(); ++iter)
657  {
658  SubscriptionInfo* sub = iter->second;
659  if (sub->paused())
660  {
661  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
662  // All pause subs resuming together should be sent with
663  // bookmark as list of the resumes' most recents
664  if (resIter != _resumed.end())
665  {
666  sub->setMostRecent(resIter->second->getMostRecent(client_));
667  }
668  }
669  subscriptions.push_front(iter->second);
670  }
671  }
672  Resubscriber resubscriber(client_, _resubscriptionTimeout);
673  std::for_each(subscriptions.begin(), subscriptions.end(),
674  resubscriber);
675  client_.setRetryOnDisconnect(retry);
676  }
677  catch (const AMPSException& e)
678  {
679  client_.setRetryOnDisconnect(retry);
680  throw e;
681  }
682  catch (const std::exception& e)
683  {
684  client_.setRetryOnDisconnect(retry);
685  throw e;
686  }
687  }
688 
692  void setResubscriptionTimeout(int timeout_)
693  {
694  if (timeout_ >= 0)
695  {
696  _resubscriptionTimeout = timeout_;
697  }
698  }
699 
704  {
705  return _resubscriptionTimeout;
706  }
707 
712  static int setDefaultResubscriptionTimeout(int timeout_)
713  {
714  static int _defaultResubscriptionTimeout =
715  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
716  if (timeout_ >= 0)
717  {
718  _defaultResubscriptionTimeout = timeout_;
719  }
720  return _defaultResubscriptionTimeout;
721  }
722 
728  {
730  }
731 
732  private:
733 
734  SubscriptionMap _active;
735  SubscriptionMap _resumed;
736  std::set<SubscriptionInfo*> _resumedSet;
737  Mutex _lock;
738  ATOMIC_TYPE_8 _resubscribing;
739  int _resubscriptionTimeout;
740  }; //class MemorySubscriptionManager
741 
742 } // namespace AMPS
743 
744 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
745 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:703
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:727
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5019
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7084
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:692
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1229
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4814
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:613
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:638
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5123
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:409
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:606
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:568
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:51
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:712
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7093