AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "ampscrc.hpp"
29 #include "util.hpp"
30 #include "Message.hpp"
31 
32 namespace AMPS
33 {
38  template <typename Func, typename Object>
39  class Handler
40  {
41  protected:
42  friend class MessageStream;
43  Func _func;
44  void* _userData;
45 #ifdef AMPS_USE_FUNCTIONAL
46  std::function<void(Object)> _callable;
47 #endif
48  bool _isValid;
49 
50  public:
51  // No op function for handlers
52  static void noOpHandler(Object) {;}
53 
54  typedef Func FunctionType;
57  Handler() : _func(NULL), _userData(NULL)
58 #ifdef AMPS_USE_FUNCTIONAL
59  , _callable(Handler<Func, Object>::noOpHandler)
60 #endif
61  , _isValid(false)
62  {
63  }
64 
70  Handler(Func func_, void* userData_)
71  : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73  , _callable(noOpHandler)
74 #endif
75  , _isValid(true)
76  {
77  }
78 
81  Handler(const Handler& orig_)
82  : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84  , _callable(orig_._callable)
85 #endif
86  , _isValid(true)
87  {
88  }
89 #ifdef AMPS_USE_FUNCTIONAL
90  template <typename T>
93  Handler(const T& callback_)
94  : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
95  {
96  }
97 #endif
98  void invoke(Object message)
99  {
100  if (_func)
101  {
102  _func(message, _userData);
103  }
104 #ifdef AMPS_USE_FUNCTIONAL
105  else
106  {
107  _callable(message);
108  }
109 #endif
110  }
111 
112  Handler& operator=(const Handler& rhs_)
113  {
114  if (this != &rhs_)
115  {
116  _func = rhs_._func;
117  _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL
119  _callable = rhs_._callable;
120 #endif
121  _isValid = rhs_._isValid;
122  }
123  return *this;
124  }
125 
126  bool isValid(void) const
127  {
128  return _isValid;
129  }
130  Func function(void) const
131  {
132  return _func;
133  }
134  void* userData(void) const
135  {
136  return _userData;
137  }
138  };
139  class Message;
142  typedef void(*MessageHandlerFunc)(const Message&, void* userData);
143 
145 
150  {
151  private:
152  MessageHandler _emptyMessageHandler;
153  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
154  // Function used to calculate the CRC if one is used
155  CRCFunction _crc;
156 
157  class MessageRoute
158  {
159  MessageHandler _messageHandler;
160  unsigned _requestedAcks, _systemAcks, _terminationAck;
161  public:
162  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
163  MessageRoute(const MessageRoute& rhs_) :
164  _messageHandler(rhs_._messageHandler),
165  _requestedAcks (rhs_._requestedAcks),
166  _systemAcks (rhs_._systemAcks),
167  _terminationAck(rhs_._terminationAck)
168  {;}
169  const MessageRoute& operator=(const MessageRoute& rhs_)
170  {
171  _messageHandler = rhs_._messageHandler;
172  _requestedAcks = rhs_._requestedAcks;
173  _systemAcks = rhs_._systemAcks;
174  _terminationAck = rhs_._terminationAck;
175  return *this;
176  }
177  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
178  unsigned systemAcks_, bool isSubscribe_) :
179  _messageHandler(messageHandler_),
180  _requestedAcks(requestedAcks_),
181  _systemAcks(systemAcks_),
182  _terminationAck(0)
183  {
184  if (!isSubscribe_)
185  {
186  // The ack to terminate the route on is whatever the highest
187  // bit set in requestedAcks is.
188  unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
189  _terminationAck = 1;
190  while (bitCounter > 0)
191  {
192  bitCounter >>= 1;
193  _terminationAck <<= 1;
194  }
195  }
196  }
197 
198  // Deliver an ack to registered handler if the ack type was requested
199  unsigned deliverAck(const Message& message_, unsigned ackType_)
200  {
201  if ( (_requestedAcks & ackType_) == 0)
202  {
203  return 0;
204  }
205  try
206  {
207  _messageHandler.invoke(message_);
208  }
209  catch (std::exception& ex)
210  {
211  std::cerr << ex.what() << std::endl;
212  }
213  return 1;
214  }
215  bool isTerminationAck(unsigned ackType_) const
216  {
217  return ackType_ == _terminationAck;
218  }
219  unsigned deliverData(const Message& message_)
220  {
221  _messageHandler.invoke(message_);
222  return 1;
223  }
224  const MessageHandler& getMessageHandler() const
225  {
226  return _messageHandler;
227  }
228  MessageHandler& getMessageHandler()
229  {
230  return _messageHandler;
231  }
232  };
233 
234  public:
235  MessageRouter()
236  : _previousCommandId(0),
237  _lookupGenerationCount(0),
238  _generationCount(0)
239  {
240 #ifndef AMPS_SSE_42
241  _crc = AMPS::CRC<0>::crcNoSSE;
242 #else
243  if (AMPS::CRC<0>::isSSE42Enabled())
244  {
245  _crc = AMPS::CRC<0>::crc;
246  }
247  else
248  {
249  _crc = AMPS::CRC<0>::crcNoSSE;
250  }
251 #endif
252  }
253 
254  int addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
255  unsigned requestedAcks_, unsigned systemAcks_, bool isSubscribe_)
256  {
257  Lock<Mutex> lock(_lock);
258  RouteMap::iterator i = _routes.find(commandId_);
259  if (i == _routes.end())
260  {
261  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
262  return 1;
263  }
264  else
265  {
266  // Only replace a non-subscribe with a subscribe
267  if (isSubscribe_ && !i->second.isTerminationAck(0))
268  {
269  void* routeData = i->second.getMessageHandler().userData();;
270  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
271  if (routeData)
272  {
273  Unlock<Mutex> u(_lock);
274  amps_invoke_remove_route_function(routeData);
275  }
276  return 1;
277  }
278  }
279  return 0;
280  }
281 
282  // returns true if a route was removed.
283  bool removeRoute(const Field& commandId_)
284  {
285  Lock<Mutex> lock(_lock);
286  RouteMap::iterator i = _routes.find(commandId_);
287  if (i == _routes.end())
288  {
289  return false;
290  }
291  return _removeRoute(i);
292  }
293 
294  void clear()
295  {
296  AMPS_FETCH_ADD(&_generationCount, 1);
297  std::vector<void*> removeData;
298  {
299  Lock<Mutex> lock(_lock);
300  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
301  {
302  // Make a non-const copy of Field and clear it, which will clear i
303  // as well but won't actually affect the map, which is unaware that
304  // the key's shared pointer has been deleted.
305  Field f = i->first;
306  void* data = i->second.getMessageHandler().userData();
307  removeData.push_back(data);
308  f.clear();
309  }
310  _routes.clear();
311  }
312  for (size_t i = 0; i < removeData.size(); ++i)
313  {
314  amps_invoke_remove_route_function(removeData[i]);
315  }
316  }
317 
318  // Returns true if a route exists for a single id.
319  bool hasRoute(const Field& commandId_) const
320  {
321  Lock<Mutex> lock(_lock);
322  RouteMap::const_iterator it = _routes.find(commandId_);
323  return it != _routes.end();
324  }
325 
326  // Find a single route and return true if here, setting result_ to the handler.
327  bool getRoute(const Field& commandId_, MessageHandler& result_) const
328  {
329  Lock<Mutex> lock(_lock);
330  RouteMap::const_iterator it = _routes.find(commandId_);
331  if (it != _routes.end())
332  {
333  result_ = it->second.getMessageHandler();
334  return true;
335  }
336  else
337  {
338  result_ = _emptyMessageHandler;
339  return false;
340  }
341  }
342 
343  // RouteCache is the result type for a parseRoutes(); we do extra work
344  // to avoid hitting the map or its lock when the subids field on
345  // publish messages does not change.
346  struct RouteLookup
347  {
348  size_t idOffset;
349  size_t idLength;
350  MessageHandler handler;
351  };
352  class RouteCache : public std::vector<RouteLookup>
353  {
354  RouteCache(const RouteCache&);
355  void operator=(const RouteCache&);
356  public:
357  RouteCache(void)
358  : _generationCount(0),
359  _hashVal(0)
360  {;}
361 
362  void invalidateCache(void)
363  {
364  _generationCount = 0;
365  _hashVal = 0;
366  clear();
367  }
368  void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
369  {
370  _generationCount = generationCount_;
371  _hashVal = hashVal_;
372  clear();
373  }
374 
375  bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_) const
376  {
377  return _generationCount == generationCount_ && _hashVal == hashVal_;
378  }
379 
380  private:
381  ATOMIC_TYPE _generationCount;
382  amps_uint64_t _hashVal;
383  };
384 
385  // Parses the command id list into the route lookup vector and assigns
386  // the found handlers into the list. Only intended to be called by the
387  // message handler thread. Returns the number of command/sub IDs parsed.
388  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
389  {
390  // Super shortcut: if the whole subID list is the same as the previous one,
391  // then assume the result_ contains all the right handlers already, and that
392  // the offsets and lengths of subIds are unchanged.
393  amps_uint64_t listHash = _crc(commandIdList_.data(), commandIdList_.len(), 0);
394  if (result_.isCacheHit(_generationCount, listHash))
395  {
396  return result_.size();
397  }
398  result_.invalidateCache(_generationCount, listHash);
399 
400  // Lock required now that we'll be using the route map.
401  Lock<Mutex> lockGuard(_lock);
402  size_t resultCount = 0;
403  const char* pStart = commandIdList_.data();
404  for (const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
405  ++p, ++resultCount)
406  {
407  const char* delimiter = p;
408  while (delimiter != e && *delimiter != ',')
409  {
410  ++delimiter;
411  }
412  AMPS::Field subId(p, (size_t)(delimiter - p));
413 #ifdef AMPS_USE_EMPLACE
414  result_.emplace_back(RouteLookup());
415 #else
416  result_.push_back(RouteLookup());
417 #endif
418  // Push back and then copy over fields; would emplace_back if available on
419  // all supported compilers.
420  RouteLookup& result = result_[resultCount];
421  result.idOffset = (size_t)(p - pStart);
422  result.idLength = (size_t)(delimiter - p);
423 
424  RouteMap::const_iterator it = _routes.find(subId);
425  if (it != _routes.end())
426  {
427  result.handler = it->second.getMessageHandler();
428  }
429  else
430  {
431  result.handler = _emptyMessageHandler;
432  }
433  p = delimiter;
434  }
435  return resultCount;
436  }
437  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
438  {
439  assert(ackMessage_.getCommand() == "ack");
440  unsigned messagesDelivered = 0;
441  Field key;
442 
443  // Call _deliverAck, which will deliver to any waiting handlers
444  // AND remove the route if it's a termination ack
445  if (key = ackMessage_.getCommandId(), !key.empty())
446  {
447  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
448  }
449  if (key = ackMessage_.getQueryID(),
450  !key.empty() && messagesDelivered == 0)
451  {
452  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
453  }
454  if (key = ackMessage_.getSubscriptionId(),
455  !key.empty() && messagesDelivered == 0)
456  {
457  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
458  }
459  return messagesDelivered;
460  }
461 
462  // deliverData may only be called by the message handler thread.
463  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
464  {
465  unsigned messagesDelivered = 0;
466  amps_uint64_t hval = _crc(commandId_.data(), commandId_.len(), 0);
467  if (_previousCommandId == hval &&
468  _lookupGenerationCount == _generationCount)
469  {
470  messagesDelivered += _previousHandler.deliverData(dataMessage_);
471  }
472  else
473  {
474  Lock<Mutex> lock(_lock);
475  RouteMap::iterator it = _routes.find(commandId_);
476  if (it != _routes.end())
477  {
478  _previousCommandId = hval;
479  _lookupGenerationCount = _generationCount;
480  _previousHandler = it->second;
481  messagesDelivered += it->second.deliverData(dataMessage_);
482  }
483  }
484  return messagesDelivered;
485  }
486 
487  void invalidateCache(void)
488  {
489  _previousCommandId = 0;
490  }
491 
492  void unsubscribeAll(void)
493  {
494  AMPS_FETCH_ADD(&_generationCount, 1);
495  std::vector<Field> removeIds;
496  std::vector<void*> removeData;
497  Lock<Mutex> lock(_lock);
498  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
499  {
500  if (it->second.isTerminationAck(0))
501  {
502  removeIds.push_back(it->first);
503  removeData.push_back(it->second.getMessageHandler().userData());
504  }
505  }
506  for (size_t i = 0; i < removeIds.size(); ++i)
507  {
508  // it can't be end() b/c we have the lock and found id above
509  RouteMap::iterator it = _routes.find(removeIds[i]);
510  // Make a non-const copy of Field and clear it, which will clear i as well
511  Field f = it->first; // -V783
512  f.clear();
513  _routes.erase(it);
514  }
515  Unlock<Mutex> u(_lock);
516  for (size_t i = 0; i < removeData.size(); ++i)
517  {
518  amps_invoke_remove_route_function(removeData[i]);
519  }
520  }
521 
522  private:
523  typedef std::map<Field, MessageRoute> RouteMap;
524  RouteMap _routes;
525  mutable Mutex _lock;
526 
527  MessageRoute _previousHandler;
528  amps_uint64_t _previousCommandId;
529  mutable ATOMIC_TYPE _lookupGenerationCount;
530  mutable ATOMIC_TYPE _generationCount;
531 
532  // Deliver the ack to any waiting handlers
533  // AND remove the route if it's a termination ack
534  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
535  {
536  Lock<Mutex> lock(_lock);
537  unsigned messagesDelivered = 0;
538  RouteMap::iterator it = _routes.find(commandId_);
539  if (it != _routes.end())
540  {
541  MessageRoute& route = it->second;
542  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
543  if (route.isTerminationAck(ackType_))
544  {
545  _removeRoute(it);
546  ++messagesDelivered;
547  }
548  }
549  return messagesDelivered;
550  }
551  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
552  {
553  Lock<Mutex> lock(_lock);
554  RouteMap::iterator it = _routes.find(commandId_);
555  if (it != _routes.end())
556  {
557  MessageRoute& route = it->second;
558  if (route.isTerminationAck(ackType_))
559  {
560  _removeRoute(it);
561  return 1U;
562  }
563  }
564  return 0U;
565  }
566 
567  // returns true if a route was removed.
568  bool _removeRoute(RouteMap::iterator& it_)
569  {
570  // Called with lock already held
571  AMPS_FETCH_ADD(&_generationCount, 1);
572  // Make a non-const copy of Field and clear it, which will clear i as well
573  Field f = it_->first;
574  void* routeData = it_->second.getMessageHandler().userData();
575  _routes.erase(it_);
576  f.clear();
577  if (routeData)
578  {
579  Unlock<Mutex> u(_lock);
580  amps_invoke_remove_route_function(routeData);
581  }
582  return true;
583  }
584 
585  };
586 
587 
588 }
589 
590 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1141
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1248
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:127
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4682
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103