AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
MemorySubscriptionManager.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYSUBSCRIPTIONMANAGER_H_
27 #define _MEMORYSUBSCRIPTIONMANAGER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <algorithm>
32 #include <list>
33 #include <map>
34 #include <set>
35 
40 
41 namespace AMPS
42 {
43 
48 {
49 private:
50 
51  class SubscriptionInfo
52  {
53  public:
54  SubscriptionInfo(MessageHandler messageHandler_,
55  const Message& message_,
56  unsigned requestedAckTypes_)
57  : _handler(messageHandler_)
58  , _requestedAckTypes(requestedAckTypes_)
59  , _bookmark(!message_.getBookmark().empty())
60  , _clearSubId(false)
61  {
62  _m = message_;
63  _subId = _m.getSubscriptionId();
64  std::string options = _m.getOptions();
65  size_t replace = options.find("replace");
66  // AMPS should be ok if options contains ,,
67  static const size_t replaceLen = 7;
68  if (replace != std::string::npos)
69  {
70  options.erase(replace, replaceLen);
71  _m.setOptions(options);
72  }
73  _paused = (options.find("pause") != std::string::npos);
74  }
75 
76  ~SubscriptionInfo()
77  {
78  if (_clearSubId)
79  {
80  _m.getSubscriptionId().clear();
81  }
82  }
83 
84  void resubscribe(Client& client_, int timeout_)
85  {
86  if(_bookmark)
87  {
88  if (_paused && !_recent.empty())
89  _m.setBookmark(_recent);
90  else
91  _m.setBookmark(client_.getBookmarkStore().getMostRecent(_subId));
92  }
93  _m.newCommandId();
94  _m.setAckTypeEnum(_requestedAckTypes);
95  client_.send(_handler, _m, timeout_);
96  }
97 
98  MessageHandler messageHandler() const
99  {
100  return _handler;
101  }
102 
103  const Message::Field& subId() const
104  {
105  return _subId;
106  }
107 
108  // Returns true if the last subId is removed, false otherwise
109  bool removeSubId(const Message::Field& subId_)
110  {
111  size_t subIdLen = subId_.len();
112  const char* subIdData = subId_.data();
113  while (subIdLen && *subIdData == ',')
114  {
115  ++subIdData;
116  --subIdLen;
117  }
118  while (subIdLen && subIdData[subIdLen-1] == ',')
119  {
120  --subIdLen;
121  }
122  if (subIdLen == 0 || subIdLen > _subId.len()) return _subId.empty();
123  bool match = true;
124  size_t matchStart = 0;
125  size_t matchCount = 0;
126  for (size_t i=0; i<_subId.len(); ++i)
127  {
128  if (_subId.data()[i] == ',')
129  {
130  if (matchCount == subIdLen) break;
131  matchStart = i+1;
132  matchCount = 0;
133  match = true;
134  }
135  else if (match)
136  {
137  if (_subId.data()[i] == subIdData[matchCount])
138  ++matchCount;
139  else
140  {
141  matchCount = 0;
142  match = false;
143  }
144  }
145  }
146  if (match && matchCount == subIdLen)
147  {
148  size_t newLen = _subId.len() - matchCount;
149  if (newLen > 1) // More than just ,
150  {
151  while (matchStart + matchCount < _subId.len() &&
152  _subId.data()[matchStart+matchCount] == ',')
153  {
154  ++matchCount;
155  --newLen;
156  }
157  char* buffer = new char[newLen];
158  // Match is not first
159  if (matchStart > 0)
160  {
161  memcpy(buffer, _subId.data(), matchStart);
162  }
163  // Match is not last
164  if (matchStart + matchCount < _subId.len())
165  {
166  memcpy(buffer+matchStart,
167  _subId.data()+matchStart+matchCount,
168  _subId.len()-matchStart-matchCount);
169  }
170  if (_clearSubId)
171  {
172  _m.getSubscriptionId().clear();
173  }
174  else
175  {
176  _clearSubId = true;
177  }
178  _m.assignSubscriptionId(buffer, newLen);
179  _subId = _m.getSubscriptionId();
180  return false;
181  }
182  else
183  {
184  if (_clearSubId)
185  {
186  _m.getSubscriptionId().clear();
187  _clearSubId = false;
188  }
189  else
190  {
191  _m.getSubscriptionId().assign(NULL, 0);
192  }
193  _subId = _m.getSubscriptionId();
194  return true;
195  }
196  }
197  return _subId.empty();
198  }
199 
200  bool paused() const
201  {
202  return _paused;
203  }
204 
205  void pause()
206  {
207  if (_paused) return;
208  std::string opts(Message::Options::Pause());
209  opts.append(_m.getOptions());
210  _m.setOptions(opts);
211  _paused = true;
212  }
213 
214  std::string getMostRecent(Client& client_)
215  {
216  if (!_recent.empty()) return _recent;
217  std::map<amps_uint64_t, amps_uint64_t> publishers;
218  const char* start = _subId.data();
219  const char* end = _subId.data() + _subId.len();
220  while (start < end)
221  {
222  const char* comma = (const char*)memchr(start, ',',
223  (size_t)(end-start));
224  // No more commas found, just use start->end
225  if(!comma) comma = end;
226  if(comma == start)
227  {
228  start = comma + 1;
229  continue;
230  }
231  Message::Field sid(start, (size_t)(comma-start));
232  Message::Field sidRecent = client_.getBookmarkStore().getMostRecent(sid);
233  const char* sidRecentStart = sidRecent.data();
234  const char* sidRecentEnd = sidRecent.data() + sidRecent.len();
235  while (sidRecentStart < sidRecentEnd)
236  {
237  const char* sidRecentComma = (const char*)
238  memchr(sidRecentStart, ',',
239  (size_t)(sidRecentEnd-sidRecentStart));
240  // No more commas found, just use start->end
241  if(!sidRecentComma) sidRecentComma = sidRecentEnd;
242  if(sidRecentComma == sidRecentStart)
243  {
244  sidRecentStart = sidRecentComma + 1;
245  continue;
246  }
247  Message::Field bookmark(sidRecentStart,
248  (size_t)(sidRecentComma-sidRecentStart));
249  amps_uint64_t publisher = (amps_uint64_t)0;
250  amps_uint64_t seq = (amps_uint64_t)0;
251  Field::parseBookmark(bookmark, publisher, seq);
252  if (publishers.count(publisher) == 0
253  || publishers[publisher] > seq)
254  {
255  publishers[publisher] = seq;
256  }
257  // Move past comma
258  sidRecentStart = sidRecentComma + 1;
259  }
260  // Move past comma
261  start = comma + 1;
262  }
263  std::ostringstream os;
264  for(std::map<amps_uint64_t, amps_uint64_t>::iterator i = publishers.begin();
265  i != publishers.end();
266  ++i)
267  {
268  if (i->first == 0 && i->second == 0) continue;
269  if (os.tellp() > 0) os << ',';
270  os << i->first << '|' << i->second << "|";
271  }
272  _recent = os.str();
273  return _recent;
274  }
275 
276  void setMostRecent(const std::string& recent_)
277  {
278  _recent = recent_;
279  }
280 
281  private:
282  std::string _recent;
283  MessageHandler _handler;
284  Message _m;
285  Message::Field _subId;
286  unsigned _requestedAckTypes;
287  bool _bookmark;
288  bool _paused;
289  bool _clearSubId;
290 
291  };//class SubscriptionInfo
292 
293  class Resubscriber
294  {
295  Client& _client;
296  int _timeout;
297  public:
298  Resubscriber(Client& client_, int timeout_)
299  : _client(client_), _timeout(timeout_) {}
300  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
301  {
302  iter_.second->resubscribe(_client, _timeout);
303  }
304  void operator()(SubscriptionInfo* iter_)
305  {
306  iter_->resubscribe(_client, _timeout);
307  }
308  };
309 
310  class Deleter
311  {
312  bool _clearSubId;
313  public:
314  Deleter(bool clearSubId_ = false) : _clearSubId(clearSubId_) {}
315  void operator()(std::pair<Message::Field, SubscriptionInfo*> iter_)
316  {
317  if (_clearSubId) iter_.first.clear();
318  else delete iter_.second;
319  }
320  void operator()(SubscriptionInfo* iter_)
321  {
322  delete iter_;
323  }
324  };
325 
326 public:
// Container type used for both the active and the resumed subscription
// tables: maps a sub id Field to the SubscriptionInfo stored for it.
327  typedef std::map<Message::Field, SubscriptionInfo*, Message::Field::FieldHash> SubscriptionMap;
328 
330  : _resubscribing(0)
331  , _resubscriptionTimeout(getDefaultResubscriptionTimeout())
332  { ; }
333 
335  {
336  clear();
337  }
338 
345  void subscribe(MessageHandler messageHandler_,
346  const Message& message_, unsigned requestedAckTypes_)
347  {
348  const Message::Field& subId = message_.getSubscriptionId();
349  if(!subId.empty())
350  {
351  Lock<Mutex> l(_lock);
352  while(_resubscribing != 0) _lock.wait(10);
353  std::string options = message_.getOptions();
354  if (options.find("resume") != std::string::npos)
355  {
356  // For a resume, we store each sub id with a single Subscription
357  SubscriptionInfo* subInfo = new SubscriptionInfo(MessageHandler(),
358  message_,
359  requestedAckTypes_);
360  bool saved = false;
361  const char* start = subInfo->subId().data();
362  const char* end = subInfo->subId().data() + subInfo->subId().len();
363  while (start < end)
364  {
365  const char* comma = (const char*)memchr(start, ',',
366  (size_t)(end-start));
367  // No more commas found, just use start->end
368  if(!comma) comma = end;
369  if(comma == start)
370  {
371  start = comma + 1;
372  continue;
373  }
374  Message::Field sid = Message::Field(start,
375  (size_t)(comma-start));
376  // Calling resume on something already resumed is ignored,
377  // so don't update anything that exists.
378  if (_resumed.find(sid) == _resumed.end())
379  {
380  _resumed[sid.deepCopy()] = subInfo;
381  saved = true;
382  }
383  // Move past comma
384  start = comma + 1;
385  }
386  if (saved) _resumedSet.insert(subInfo);
387  else delete subInfo;
388  }
389  else if (options.find("pause") != std::string::npos)
390  {
391  const char* start = subId.data();
392  const char* end = subId.data() + subId.len();
393  while (start < end)
394  {
395  MessageHandler messageHandler = messageHandler_;
396  const char* comma = (const char*)memchr(start, ',',
397  (size_t)(end-start));
398  // No more commas found, just use start->end
399  if(!comma) comma = end;
400  if(comma == start)
401  {
402  start = comma + 1;
403  continue;
404  }
405  Message::Field sid = Message::Field(start,
406  (size_t)(comma-start));
407  SubscriptionMap::iterator resume = _resumed.find(sid);
408  if (resume != _resumed.end())
409  {
410  SubscriptionInfo* subPtr = resume->second;
411  Message::Field subField(resume->first);
412  _resumed.erase(resume); // Remove mapping for sid
413  subField.clear();
414  // If last subId, remove completely
415  if (subPtr->removeSubId(sid))
416  {
417  _resumedSet.erase(subPtr);
418  delete subPtr;
419  }
420  }
421  // Move past comma
422  start = comma + 1;
423  SubscriptionMap::iterator item = _active.find(sid);
424  if(item != _active.end())
425  {
426  if (options.find("replace") != std::string::npos)
427  {
428  messageHandler = item->second->messageHandler();
429  delete item->second;
430  _active.erase(item);
431  }
432  else
433  {
434  item->second->pause();
435  continue; // Leave current one
436  }
437  }
438  Message m = message_.deepCopy();
439  m.setSubscriptionId(sid.data(), sid.len());
440  SubscriptionInfo* s = new SubscriptionInfo(messageHandler, m,
441  requestedAckTypes_);
442  // Insert using the subId from s, which is deep copy of original
443  _active[s->subId()] = s;
444  }
445  }
446  else // Not a pause or resume
447  {
448  MessageHandler messageHandler = messageHandler_;
449  SubscriptionMap::iterator item = _active.find(subId);
450  if(item != _active.end())
451  {
452  messageHandler = item->second->messageHandler();
453  delete item->second;
454  _active.erase(item);
455  }
456  SubscriptionInfo* s = new SubscriptionInfo(messageHandler,
457  message_,
458  requestedAckTypes_);
459  // Insert using the subId from s, which is deep copy of original
460  _active[s->subId()] = s;
461  }
462  }
463  }
464 
468  void unsubscribe(const Message::Field& subId_)
469  {
470  Lock<Mutex> l(_lock);
471  SubscriptionMap::iterator item = _active.find(subId_);
472  if(item != _active.end())
473  {
474  SubscriptionInfo* subPtr = item->second;
475  _active.erase(item);
476  while(_resubscribing != 0) _lock.wait(10);
477  delete subPtr;
478  }
479  item = _resumed.find(subId_);
480  if(item != _resumed.end())
481  {
482  SubscriptionInfo* subPtr = item->second;
483  Message::Field subField(item->first);
484  _resumed.erase(item);
485  subField.clear();
486  // If last subId, remove completely
487  if (subPtr->removeSubId(subId_))
488  {
489  _resumedSet.erase(subPtr);
490  while(_resubscribing != 0) _lock.wait(10);
491  delete subPtr;
492  }
493  }
494  }
495 
498  void clear()
499  {
500  Lock<Mutex> l(_lock);
501  while(_resubscribing != 0) _lock.wait(10);
502  std::for_each(_active.begin(), _active.end(), Deleter());
503  std::for_each(_resumedSet.begin(), _resumedSet.end(), Deleter());
504  std::for_each(_resumed.begin(), _resumed.end(), Deleter(true));
505  _active.clear();
506  _resumed.clear();
507  _resumedSet.clear();
508  }
509 
513  void resubscribe(Client& client_)
514  {
515  std::list<SubscriptionInfo*> subscriptions;
516  try
517  {
518  AtomicFlagFlip resubFlip(&_resubscribing);
519  {
520  Lock<Mutex> l(_lock);
521  subscriptions.assign(_resumedSet.begin(), _resumedSet.end());
522  for (SubscriptionMap::iterator iter = _active.begin();
523  iter != _active.end(); ++iter)
524  {
525  SubscriptionInfo* sub = iter->second;
526  if (sub->paused())
527  {
528  SubscriptionMap::iterator resIter = _resumed.find(sub->subId());
529  // All pause subs resuming together should be sent with
530  // bookmark as list of the resumes' most recents
531  if (resIter != _resumed.end())
532  {
533  sub->setMostRecent(resIter->second->getMostRecent(client_));
534  }
535  }
536  subscriptions.push_front(iter->second);
537  }
538  }
539  Resubscriber resubscriber(client_, _resubscriptionTimeout);
540  std::for_each(subscriptions.begin(), subscriptions.end(),
541  resubscriber);
542  }
543  catch(const AMPSException& e)
544  {
545  throw e;
546  }
547  catch(const std::exception& e)
548  {
549  throw e;
550  }
551  }
552 
556  void setResubscriptionTimeout(int timeout_)
557  {
558  if (timeout_ >= 0)
559  {
560  _resubscriptionTimeout = timeout_;
561  }
562  }
563 
568  {
569  return _resubscriptionTimeout;
570  }
571 
576  static int setDefaultResubscriptionTimeout(int timeout_)
577  {
578  static int _defaultResubscriptionTimeout =
579  AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT;
580  if (timeout_ >= 0)
581  {
582  _defaultResubscriptionTimeout = timeout_;
583  }
584  return _defaultResubscriptionTimeout;
585  }
586 
592  {
594  }
595 
596 private:
597 
// Stored subscriptions keyed by the sub id of their SubscriptionInfo.
598  SubscriptionMap _active;
// Resumed sub ids: one deep-copied key per individual sub id; several keys
// may refer to the same SubscriptionInfo.
599  SubscriptionMap _resumed;
// Every distinct SubscriptionInfo referenced from _resumed, held once.
600  std::set<SubscriptionInfo*> _resumedSet;
// Held by subscribe(), unsubscribe() and clear(), and by resubscribe()
// while it collects the subscriptions to send.
601  Mutex _lock;
// Handed to AtomicFlagFlip for the duration of resubscribe(); the other
// methods poll until it is 0 before changing or deleting entries.
602  ATOMIC_TYPE_8 _resubscribing;
// Timeout passed to each send performed by resubscribe().
603  int _resubscriptionTimeout;
604 }; //class MemorySubscriptionManager
605 
606 } // namespace AMPS
607 
608 #endif //_MEMORYSUBSCRIPTIONMANAGER_H_
609 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
int getResubscriptionTimeout(void)
Gets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:567
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
static int getDefaultResubscriptionTimeout(void)
Gets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:591
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4422
void setResubscriptionTimeout(int timeout_)
Sets the timeout used when trying to resubscribe after disconnect.
Definition: MemorySubscriptionManager.hpp:556
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1426
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
The main class for interacting with AMPS.
Definition: ampsplusplus.hpp:4233
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
void resubscribe(Client &client_)
Place all saved subscriptions on the provided Client.
Definition: MemorySubscriptionManager.hpp:513
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4526
void subscribe(MessageHandler messageHandler_, const Message &message_, unsigned requestedAckTypes_)
Save a subscription so it can be placed again if a disconnect occurs.
Definition: MemorySubscriptionManager.hpp:345
void clear()
Clear all subscriptions from the manager.
Definition: MemorySubscriptionManager.hpp:498
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
void unsubscribe(const Message::Field &subId_)
Remove the subscription from the manager.
Definition: MemorySubscriptionManager.hpp:468
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
A SubscriptionManager implementation that maintains subscriptions placed in memory so that they can b...
Definition: MemorySubscriptionManager.hpp:47
Definition: ampsplusplus.hpp:136
static int setDefaultResubscriptionTimeout(int timeout_)
Sets the default timeout used by new MemorySubscriptionManager objects when trying to resubscribe aft...
Definition: MemorySubscriptionManager.hpp:576
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: ampsplusplus.hpp:909