AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <algorithm>
32 #include <list>
33 #include <map>
34 #include <memory>
35 #include <set>
36 
41 
42 namespace AMPS
43 {
44 
49 {
50 private:
51 
52  class SubscriptionInfo
53  {
54  public:
55  SubscriptionInfo(MessageHandler messageHandler_,
56  const Message& message_,
57  unsigned requestedAckTypes_)
58  : _handler(messageHandler_)
59  , _m(message_)
60  , _subId(message_.getSubscriptionId())
61  , _requestedAckTypes(requestedAckTypes_)
62  , _bookmark(!message_.getBookmark().empty())
63  , _clearSubId(false)
64  {
65  std::string options = _m.getOptions();
66  size_t replace = options.find("replace");
67  // AMPS should be ok if options contains ,,
68  static const size_t replaceLen = 7;
69  if (replace != std::string::npos)
70  {
71  options.erase(replace, replaceLen);
72  _m.setOptions(options);
73  }
74  _paused = (options.find("pause") != std::string::npos);
75  }
76 
77  ~SubscriptionInfo()
78  {
79  if (_clearSubId)
80  {
81  _m.getSubscriptionId().clear();
82  }
83  }
84 
85  void resubscribe(Client& client_, int timeout_, void* userData_)
86  {
87  if(_bookmark)
88  {
89  if (_paused && !_recent.empty())
90  _m.setBookmark(_recent);
91  else
92  _m.setBookmark(client_.getBookmarkStore().getMostRecent(_subId));
93  }
94  _m.newCommandId();
95  _m.setAckTypeEnum(_requestedAckTypes);
96  if (!userData_)
97  {
98  client_.send(_handler, _m, timeout_);
99  }
100  else
101  {
102  MessageHandler handler(_handler.function(), userData_);
103  client_.send(handler, _m, timeout_);
104  }
105  }
106 
107  MessageHandler messageHandler() const
108  {
109  return _handler;
110  }
111 
112  const Message::Field& subId() const
113  {
114  return _subId;
115  }
116 
117  // Returns true if the last subId is removed, false otherwise
118  bool removeSubId(const Message::Field& subId_)
119  {
120  size_t subIdLen = subId_.len();
121  const char* subIdData = subId_.data();
122  while (subIdLen && *subIdData == ',')
123  {
124  ++subIdData;
125  --subIdLen;
126  }
127  while (subIdLen && subIdData[subIdLen-1] == ',')
128  {
129  --subIdLen;
130  }
131  if (subIdLen == 0 || subIdLen > _subId.len()) return _subId.empty();
132  bool match = true;
133  size_t matchStart = 0;
134  size_t matchCount = 0;
135  for (size_t i=0; i<_subId.len(); ++i)
136  {
137  if (_subId.data()[i] == ',')
138  {
139  if (matchCount == subIdLen) break;
140  matchStart = i+1;
141  matchCount = 0;
142  match = true;
143  }
144  else if (match)
145  {
146  if (_subId.data()[i] == subIdData[matchCount])
147  ++matchCount;
148  else
149  {
150  matchCount = 0;
151  match = false;
152  }
153  }
154  }
155  if (match && matchCount == subIdLen)
156  {
157  size_t newLen = _subId.len() - matchCount;
158  if (newLen > 1) // More than just ,
159  {
160  while (matchStart + matchCount < _subId.len() &&
161  _subId.data()[matchStart+matchCount] == ',')
162  {
163  ++matchCount;
164  --newLen;
165  }
166  char* buffer = new char[newLen];
167  // Match is not first
168  if (matchStart > 0)
169  {
170  memcpy(buffer, _subId.data(), matchStart);
171  }
172  // Match is not last
173  if (matchStart + matchCount < _subId.len())
174  {
175  memcpy(buffer+matchStart,
176  _subId.data()+matchStart+matchCount,
177  _subId.len()-matchStart-matchCount);
178  }
179  if (_clearSubId)
180  {
181  _m.getSubscriptionId().clear();
182  }
183  else
184  {
185  _clearSubId = true;
186  }
187  _m.assignSubscriptionId(buffer, newLen);
188  _subId = _m.getSubscriptionId();
189  return false;
190  }
191  else
192  {
193  if (_clearSubId)
194  {
195  _m.getSubscriptionId().clear();
196  _clearSubId = false;
197  }
198  else
199  {
200  _m.getSubscriptionId().assign(NULL, 0);
201  }
202  _subId = _m.getSubscriptionId();
203  return true;
204  }
205  }
206  return _subId.empty();
207  }
208 
209  bool paused() const
210  {
211  return _paused;
212  }
213 
214  void pause()
215  {
216  if (_paused) return;
217  std::string opts(Message::Options::Pause());
218  opts.append(_m.getOptions());
219  _m.setOptions(opts);
220  _paused = true;
221  }
222 
223  std::string getMostRecent(Client& client_)
224  {
225  if (!_recent.empty()) return _recent;
226  std::map<amps_uint64_t, amps_uint64_t> publishers;
227  const char* start = _subId.data();
228  const char* end = _subId.data() + _subId.len();
229  while (start < end)
230  {
231  const char* comma = (const char*)memchr(start, ',',
232  (size_t)(end-start));
233  // No more commas found, just use start->end
234  if(!comma) comma = end;
235  if(comma == start)
236  {
237  start = comma + 1;
238  continue;
239  }
240  Message::Field sid(start, (size_t)(comma-start));
241  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
242  const char* sidRecentStart = sidRecent.data();
243  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
244  while (sidRecentStart < sidRecentEnd)
245  {
246  const char* sidRecentComma = (const char*)
247  memchr(sidRecentStart, ',',
248  (size_t)(sidRecentEnd-sidRecentStart));
249  // No more commas found, just use start->end
250  if(!sidRecentComma) sidRecentComma = sidRecentEnd;
251  if(sidRecentComma == sidRecentStart)
252  {
253  sidRecentStart = sidRecentComma + 1;
254  continue;
255  }
256  Message::Field bookmark(sidRecentStart,
257  (size_t)(sidRecentComma-sidRecentStart));
258  amps_uint64_t publisher = (amps_uint64_t)0;
259  amps_uint64_t seq = (amps_uint64_t)0;
260  Field::parseBookmark(bookmark, publisher, seq);
261  if (publishers.count(publisher) == 0
262  || publishers[publisher] > seq)
263  {
264  publishers[publisher] = seq;
265  }
266  // Move past comma
267  sidRecentStart = sidRecentComma + 1;
268  }
269  // Move past comma
270  start = comma + 1;
271  }
272  std::ostringstream os;
273  for(std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
274  i != publishers.end();
275  ++i)
276  {
277  if (i->first == 0 && i->second == 0) continue;
278  if (os.tellp() > 0) os << ',';
279  os << i->first << '|' << i->second << "|";
280  }
281  _recent = os.str();
282  return _recent;
283  }
284 
285  void setMostRecent(const std::string& recent_)
286  {
287  _recent = recent_;
288  }
289 
290  private:
291  std::string _recent;
292  MessageHandler _handler;
293  Message _m;
294  Message::Field _subId;
295  unsigned _requestedAckTypes;
296  bool _bookmark;
297  bool _paused;
298  bool _clearSubId;
299 
300  };//class SubscriptionInfo
301 
302  class Resubscriber
303  {
304  Client& _client;
305  int _timeout;
306  public:
307  Resubscriber(Client& client_, int timeout_)
308  : _client(client_)
309  , _timeout(timeout_)
310  { }
311  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
312  {
313  void* data = amps_invoke_copy_route_function(iter_.second->messageHandler().userData());
314  iter_.second->resubscribe(_client, _timeout, data);
315  }
316  void operator()(SubscriptionInfo* iter_)
317  {
318  void* data = amps_invoke_copy_route_function(iter_->messageHandler().userData());
319  iter_->resubscribe(_client, _timeout, data);
320  }
321  };
322 
323  class Deleter
324  {
325  bool _clearSubId;
326  public:
327  Deleter(bool clearSubId_ = false)
328  : _clearSubId(clearSubId_)
329  { }
330  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
331  {
332  if (_clearSubId)
333  {
334  iter_.first.clear();
335  }
336  else
337  {
338  amps_invoke_remove_route_function(iter_.second->messageHandler().userData());
339  delete iter_.second;
340  }
341  }
342  void operator()(SubscriptionInfo* iter_)
343  {
344  delete iter_;
345  }
346  };
347 
348 public:
349  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
350 
352  : _resubscribing(0)
353  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
354  { ; }
355 
357  {
358  _clear();
359  }
360 
367  void subscribe(MessageHandler messageHandler_,
368  const Message& message_, unsigned requestedAckTypes_)
369  {
370  const Message::Field& subId = message_.getSubscriptionId();
371  if(!subId.empty())
372  {
373  Lock<Mutex> l(_lock);
374  while(_resubscribing != 0) _lock.wait(10);
375  std::string options = message_.getOptions();
376  if (options.find("resume") != std::string::npos)
377  {
378  // For a resume, we store each sub id with a single Subscription
379  SubscriptionInfo* subInfo = new SubscriptionInfo(MessageHandler(),
380  message_,
381  requestedAckTypes_);
382  bool saved = false;
383  Field fullSubId = subInfo->subId();
384  const char* start = fullSubId.data();
385  const char* end = fullSubId.data() + fullSubId.len();
386  while (start < end)
387  {
388  const char* comma = (const char*)memchr(start, ',',
389  (size_t)(end-start));
390  // No more commas found, just use start->end
391  if(!comma) comma = end;
392  if(comma == start)
393  {
394  start = comma + 1;
395  continue;
396  }
397  Message::Field sid = Message::Field(start,
398  (size_t)(comma-start));
399  // Calling resume on something already resumed is ignored,
400  // so don't update anything that exists.
401  if (_resumed.find(sid) == _resumed.end())
402  {
403  _resumed[sid.deepCopy()] = subInfo;
404  saved = true;
405  }
406  // Move past comma
407  start = comma + 1;
408  }
409  if (saved) _resumedSet.insert(subInfo);
410  else delete subInfo;
411  }
412  else if (options.find("pause") != std::string::npos)
413  {
414  const char* start = subId.data();
415  const char* end = subId.data() + subId.len();
416  while (start < end)
417  {
418  MessageHandler messageHandler = messageHandler_;
419  const char* comma = (const char*)memchr(start, ',',
420  (size_t)(end-start));
421  // No more commas found, just use start->end
422  if(!comma) comma = end;
423  if(comma == start)
424  {
425  start = comma + 1;
426  continue;
427  }
428  Message::Field sid = Message::Field(start,
429  (size_t)(comma-start));
430  SubscriptionMap::iterator resume = _resumed.find(sid);
431  if (resume != _resumed.end())
432  {
433  SubscriptionInfo* subPtr = resume->second;
434  Message::Field subField(resume->first);
435  _resumed.erase(resume); // Remove mapping for sid
436  subField.clear();
437  // If last subId, remove completely
438  if (subPtr->removeSubId(sid))
439  {
440  _resumedSet.erase(subPtr);
441  delete subPtr;
442  }
443  }
444  // Move past comma
445  start = comma + 1;
446  SubscriptionMap::iterator item = _active.find(sid);
447  if(item != _active.end())
448  {
449  if (options.find("replace") != std::string::npos)
450  {
451  messageHandler = item->second->messageHandler();
452  delete item->second;
453  _active.erase(item);
454  }
455  else
456  {
457  item->second->pause();
458  continue; // Leave current one
459  }
460  }
461  else
462  {
463  Unlock<Mutex> u(_lock);
464  void* data = amps_invoke_copy_route_function(messageHandler_
465  .userData());
466  if (data)
467  {
468  messageHandler = MessageHandler(messageHandler_.function(), data);
469  }
470  }
471  Message m = message_.deepCopy();
472  m.setSubscriptionId(sid.data(), sid.len());
473  SubscriptionInfo* s = new SubscriptionInfo(messageHandler, m,
474  requestedAckTypes_);
475  // Insert using the subId from s, which is deep copy of original
476  _active[s->subId()] = s;
477  }
478  }
479  else // Not a pause or resume
480  {
481  MessageHandler messageHandler = messageHandler_;
482  SubscriptionMap::iterator item = _active.find(subId);
483  if(item != _active.end())
484  {
485  messageHandler = item->second->messageHandler();
486  delete item->second;
487  _active.erase(item);
488  }
489  else
490  {
491  Unlock<Mutex> u(_lock);
492  void* data = amps_invoke_copy_route_function(messageHandler_
493  .userData());
494  if (data)
495  {
496  messageHandler = MessageHandler(messageHandler_.function(), data);
497  }
498  }
499  SubscriptionInfo* s = new SubscriptionInfo(messageHandler,
500  message_,
501  requestedAckTypes_);
502  // Insert using the subId from s, which is deep copy of original
503  _active[s->subId()] = s;
504  }
505  }
506  }
507 
511  void unsubscribe(const Message::Field& subId_)
512  {
513  Lock<Mutex> l(_lock);
514  SubscriptionMap::iterator item = _active.find(subId_);
515  if(item != _active.end())
516  {
517  SubscriptionInfo* subPtr = item->second;
518  _active.erase(item);
519  while(_resubscribing != 0) _lock.wait(10);
520  Unlock<Mutex> u(_lock);
521  amps_invoke_remove_route_function(subPtr->messageHandler().userData());
522  delete subPtr;
523  }
524  item = _resumed.find(subId_);
525  if(item != _resumed.end())
526  {
527  SubscriptionInfo* subPtr = item->second;
528  Message::Field subField(item->first);
529  _resumed.erase(item);
530  subField.clear();
531  // If last subId, remove completely
532  if (subPtr->removeSubId(subId_))
533  {
534  _resumedSet.erase(subPtr);
535  while(_resubscribing != 0) _lock.wait(10);
536  delete subPtr;
537  }
538  }
539  }
540 
543  void clear()
544  {
545  _clear();
546  }
547 
550  void _clear()
551  {
552  Lock<Mutex> l(_lock);
553  while(_resubscribing != 0) _lock.wait(10);
554  // Settting _resubscribing keeps other threads from touching data
555  // even if lock isn't held. Don't want to hold lock when
556  // amps_invoke_remove_route_function is called.
557  AtomicFlagFlip resubFlip(&_resubscribing);
558  {
559  Unlock<Mutex> u(_lock);
560  std::for_each(_active.begin(), _active.end(), Deleter());
561  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
562  std::for_each(_resumed.begin(), _resumed.end(), Deleter(true));
563  }
564  _active.clear();
565  _resumed.clear();
566  _resumedSet.clear();
567  }
568 
572  void resubscribe(Client& client_)
573  {
574  // At this point, it's better to throw an exception back to disconnect
575  // handling than to attempt a reconnect in send, so turn off retry.
576  bool retry = client_.getRetryOnDisconnect();
577  client_.setRetryOnDisconnect(false);
578  std::list<SubscriptionInfo*> subscriptions;
579  try
580  {
581  AtomicFlagFlip resubFlip(&_resubscribing);
582  {
583  Lock<Mutex> l(_lock);
584  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
585  for (SubscriptionMap::iterator iter = _active.begin();
586  iter != _active.end(); ++iter)
587  {
588  SubscriptionInfo* sub = iter->second;
589  if (sub->paused())
590  {
591  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
592  // All pause subs resuming together should be sent with
593  // bookmark as list of the resumes' most recents
594  if (resIter != _resumed.end())
595  {
596  sub->setMostRecent(resIter->second->getMostRecent(client_));
597  }
598  }
599  subscriptions.push_front(iter->second);
600  }
601  }
602  Resubscriber resubscriber(client_, _resubscriptionTimeout);
603  std::for_each(subscriptions.begin(), subscriptions.end(),
604  resubscriber);
605  client_.setRetryOnDisconnect(retry);
606  }
607  catch(const AMPSException& e)
608  {
609  client_.setRetryOnDisconnect(retry);
610  throw e;
611  }
612  catch(const std::exception& e)
613  {
614  client_.setRetryOnDisconnect(retry);
615  throw e;
616  }
617  }
618 
622  void setResubscriptionTimeout(int timeout_)
623  {
624  if (timeout_ >= 0)
625  {
626  _resubscriptionTimeout = timeout_;
627  }
628  }
629 
634  {
635  return _resubscriptionTimeout;
636  }
637 
642  static int setDefaultResubscriptionTimeout(int timeout_)
643  {
644  static int _defaultResubscriptionTimeout =
645  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
646  if (timeout_ >= 0)
647  {
648  _defaultResubscriptionTimeout = timeout_;
649  }
650  return _defaultResubscriptionTimeout;
651  }
652 
658  {
660  }
661 
662 private:
663 
664  SubscriptionMap _active;
665  SubscriptionMap _resumed;
666  std::set<SubscriptionInfo*> _resumedSet;
667  Mutex _lock;
668  ATOMIC_TYPE_8 _resubscribing;
669  int _resubscriptionTimeout;
670 }; //class MemorySubscriptionManager
671 
672 } // namespace AMPS
673 
674 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
675 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:633
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:657
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4446
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6408
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:622
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1043
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4249
void _clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:550
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:572
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4550
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:367
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:543
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:511
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:48
Definition: ampsplusplus.hpp:103
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:642
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6417