AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.0
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "ampscrc.hpp"
29 #include "util.hpp"
30 #include "Message.hpp"
31 
32 namespace AMPS
33 {
// Wrapper for a callback. A Handler holds either a bare function pointer
// (Func) together with an opaque user-data pointer, or -- when
// AMPS_USE_FUNCTIONAL is defined -- a std::function taking an Object.
//
// Template parameters:
//   Func   - function pointer type invoked as func(object, userData).
//   Object - the argument type delivered to the callback.
template <typename Func, typename Object>
class Handler
{
protected:
  friend class MessageStream;
  Func _func;        // bare function pointer, or NULL when unused
  void* _userData;   // passed back verbatim as the second argument of _func
#ifdef AMPS_USE_FUNCTIONAL
  std::function<void(Object)> _callable;  // used by invoke() when _func is NULL
#endif
  bool _isValid;     // false only for a default-constructed handler (or a copy of one)

public:
  // No-op function for handlers.
  static void noOpHandler(Object) {;}

  typedef Func FunctionType;

  // Null constructor: no function is wrapped and isValid() reports false.
  Handler() : _func(NULL), _userData(NULL)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(Handler<Func, Object>::noOpHandler)
#endif
    , _isValid(false)
  {
  }

  // Wraps a bare function pointer and the user data handed back on invoke().
  Handler(Func func_, void* userData_)
    : _func(func_), _userData(userData_)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(noOpHandler)
#endif
    , _isValid(true)
  {
  }

  // Copy constructor. The validity flag is copied from the source so that a
  // copy of an empty handler stays empty; this matches operator= below.
  // (Previously the flag was forced to true, so copying -- for example when a
  // std::vector of handlers reallocates -- turned empty handlers into
  // "valid" ones.)
  Handler(const Handler& orig_)
    : _func(orig_._func), _userData(orig_._userData)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(orig_._callable)
#endif
    , _isValid(orig_._isValid)
  {
  }

#ifdef AMPS_USE_FUNCTIONAL
  // Wraps any object that can be stored in a std::function<void(Object)>.
  template <typename T>
  Handler(const T& callback_)
    : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
  {
  }
#endif

  // Calls the wrapped callback with message. The function pointer wins when
  // it is set; otherwise the std::function is called if it is compiled in.
  // Without AMPS_USE_FUNCTIONAL a handler with a NULL function does nothing.
  void invoke(Object message)
  {
    if (_func)
      _func(message, _userData);
#ifdef AMPS_USE_FUNCTIONAL
    else
      _callable(message);
#endif
  }

  // Copies every member, including the validity flag, from rhs_.
  Handler& operator=(const Handler& rhs_)
  {
    if (this != &rhs_)
    {
      _func = rhs_._func;
      _userData = rhs_._userData;
#ifdef AMPS_USE_FUNCTIONAL
      _callable = rhs_._callable;
#endif
      _isValid = rhs_._isValid;
    }
    return *this;
  }

  // True unless this handler was default-constructed (or copied from one).
  bool isValid(void) const { return _isValid; }
  // The wrapped function pointer; NULL when none was supplied.
  Func function(void) const { return _func; }
  // The user-data pointer supplied at construction; NULL when none.
  void* userData(void) const { return _userData; }
};
126 class Message;
129 typedef void(*MessageHandlerFunc)(const Message&, void* userData);
130 
132 
137 {
138 private:
// Default-constructed handler; getRoute() and parseRoutes() hand this out
// when no route matches the requested id.
139  MessageHandler _emptyMessageHandler;
// Signature of the hash routine: (data, length, seed) -> 64-bit value.
140  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
141  // Function used to calculate the CRC if one is used
// Chosen once in the MessageRouter constructor; used to hash command ids
// for the lookup caches in deliverData() and parseRoutes().
142  CRCFunction _crc;
143 
144  class MessageRoute
145  {
146  MessageHandler _messageHandler;
147  unsigned _requestedAcks, _systemAcks, _terminationAck;
148  public:
149  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
150  MessageRoute(const MessageRoute& rhs_) :
151  _messageHandler(rhs_._messageHandler),
152  _requestedAcks (rhs_._requestedAcks),
153  _systemAcks (rhs_._systemAcks),
154  _terminationAck(rhs_._terminationAck)
155  {;}
156  const MessageRoute& operator=(const MessageRoute& rhs_)
157  {
158  _messageHandler = rhs_._messageHandler;
159  _requestedAcks = rhs_._requestedAcks;
160  _systemAcks = rhs_._systemAcks;
161  _terminationAck = rhs_._terminationAck;
162  return *this;
163  }
164  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
165  unsigned systemAcks_, bool isSubscribe_) :
166  _messageHandler(messageHandler_),
167  _requestedAcks(requestedAcks_),
168  _systemAcks(systemAcks_),
169  _terminationAck(0)
170  {
171  if(!isSubscribe_)
172  {
173  // The ack to terminate the route on is whatever the highest
174  // bit set in requestedAcks is.
175  unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
176  _terminationAck = 1;
177  while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
178  }
179  }
180 
181  // Deliver an ack to registered handler if the ack type was requested
182  unsigned deliverAck(const Message& message_, unsigned ackType_)
183  {
184  if ( (_requestedAcks & ackType_) == 0) return 0;
185  try {
186  _messageHandler.invoke(message_);
187  } catch (std::exception &ex)
188  {
189  std::cerr << ex.what() << std::endl;
190  }
191  return 1;
192  }
193  bool isTerminationAck(unsigned ackType_) const
194  {
195  return ackType_ == _terminationAck;
196  }
197  unsigned deliverData(const Message& message_)
198  {
199  _messageHandler.invoke(message_);
200  return 1;
201  }
202  const MessageHandler& getMessageHandler() const
203  {
204  return _messageHandler;
205  }
206  MessageHandler& getMessageHandler()
207  {
208  return _messageHandler;
209  }
210  };
211 
212 public:
213  MessageRouter()
214  : _previousCommandId(0),
215  _lookupGenerationCount(0),
216  _generationCount(0)
217  {
218 #ifndef AMPS_SSE_42
219  _crc = AMPS::CRC<0>::crcNoSSE;
220 #else
221  if(AMPS::CRC<0>::isSSE42Enabled())
222  {
223  _crc = AMPS::CRC<0>::crc;
224  }
225  else
226  {
227  _crc = AMPS::CRC<0>::crcNoSSE;
228  }
229 #endif
230  }
231 
232  void addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
233  unsigned requestedAcks_, unsigned systemAcks_, bool isSubscribe_)
234  {
235  Lock<Mutex> lock(_lock);
236  RouteMap::iterator i = _routes.find(commandId_);
237  if (i == _routes.end()) {
238  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
239  }
240  else {
241  // Only replace a non-subscribe with a subscribe
242  if (isSubscribe_ && !i->second.isTerminationAck(0))
243  {
244  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
245  }
246  }
247  }
248 
249  // returns true if a route was removed.
250  bool removeRoute(const Field& commandId_)
251  {
252  Lock<Mutex> lock(_lock);
253  RouteMap::iterator i = _routes.find(commandId_);
254  if (i == _routes.end()) return false;
255  return _removeRoute(i);
256  }
257 
258  void clear()
259  {
260  AMPS_FETCH_ADD(&_generationCount,1);
261  std::vector<void*> removeData;
262  {
263  Lock<Mutex> lock(_lock);
264  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
265  {
266  // Make a non-const copy of Field and clear it, which will clear i
267  // as well but won't actually affect the map, which is unaware that
268  // the key's shared pointer has been deleted.
269  Field f = i->first;
270  void* data = i->second.getMessageHandler().userData();
271  removeData.push_back(data);
272  f.clear();
273  }
274  _routes.clear();
275  }
276  for (size_t i=0; i<removeData.size(); ++i)
277  {
278  amps_invoke_remove_route_function(removeData[i]);
279  }
280  }
281 
282  // Returns true if a route exists for a single id.
283  bool hasRoute(const Field& commandId_) const
284  {
285  Lock<Mutex> lock(_lock);
286  RouteMap::const_iterator it = _routes.find(commandId_);
287  return it != _routes.end();
288  }
289 
290  // Look up a single route. Returns true and sets result_ to its handler when found; otherwise returns false and sets result_ to the empty handler.
291  bool getRoute(const Field& commandId_, MessageHandler& result_) const
292  {
293  Lock<Mutex> lock(_lock);
294  RouteMap::const_iterator it = _routes.find(commandId_);
295  if (it != _routes.end())
296  {
297  result_ = it->second.getMessageHandler();
298  return true;
299  }
300  else
301  {
302  result_ = _emptyMessageHandler;
303  return false;
304  }
305  }
306 
307  // RouteCache is the result type for a parseRoutes(); we do extra work
308  // to avoid hitting the map or its lock when the subids field on
309  // publish messages does not change.
// One parsed id from a comma-separated id list, as filled in by parseRoutes().
310  struct RouteLookup
311  {
// Offset of the id from the start of the list data.
312  size_t idOffset;
// Number of characters in the id (the delimiter is not included).
313  size_t idLength;
// Handler registered for the id, or the router's empty handler if none.
314  MessageHandler handler;
315  };
316  class RouteCache : public std::vector<RouteLookup>
317  {
318  RouteCache(const RouteCache&);
319  void operator=(const RouteCache&);
320  public:
321  RouteCache(void)
322  : _generationCount(0),
323  _hashVal(0)
324  {;}
325 
326  void invalidateCache(void)
327  {
328  _generationCount = 0;
329  _hashVal = 0;
330  clear();
331  }
332  void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
333  {
334  _generationCount = generationCount_;
335  _hashVal = hashVal_;
336  clear();
337  }
338 
339  bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_) const
340  {
341  return _generationCount == generationCount_ && _hashVal == hashVal_;
342  }
343 
344  private:
345  ATOMIC_TYPE _generationCount;
346  amps_uint64_t _hashVal;
347  };
348 
349  // Parses the command id list into the route lookup vector and assigns
350  // the found handlers into the list. Only intended to be called by the
351  // message handler thread. Returns the number of command/sub IDs parsed.
352  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
353  {
354  // Super shortcut: if the whole subID list is the same as the previous one,
355  // then assume the result_ contains all the right handlers already, and that
356  // the offsets and lengths of subIds are unchanged.
357  amps_uint64_t listHash = _crc(commandIdList_.data(), commandIdList_.len(), 0);
358  if (result_.isCacheHit(_generationCount, listHash))
359  {
360  return result_.size();
361  }
362  result_.invalidateCache(_generationCount, listHash);
363 
364  // Lock required now that we'll be using the route map.
365  Lock<Mutex> lockGuard(_lock);
366  size_t resultCount = 0;
367  const char* pStart = commandIdList_.data();
368  for(const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
369  ++p, ++resultCount)
370  {
371  const char* delimiter = p;
372  while(delimiter != e && *delimiter != ',')
373  {
374  ++delimiter;
375  }
376  AMPS::Field subId(p, (size_t)(delimiter-p));
377  result_.push_back(RouteLookup());
378  // Push back and then copy over fields; would emplace_back if available on
379  // all supported compilers.
380  RouteLookup& result = result_[resultCount];
381  result.idOffset = (size_t)(p - pStart);
382  result.idLength = (size_t)(delimiter - p);
383 
384  RouteMap::const_iterator it = _routes.find(subId);
385  if (it != _routes.end())
386  {
387  result.handler = it->second.getMessageHandler();
388  }
389  else
390  {
391  result.handler = _emptyMessageHandler;
392  }
393  p = delimiter;
394  }
395  return resultCount;
396  }
397  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
398  {
399  assert(ackMessage_.getCommand() == "ack");
400  unsigned messagesDelivered = 0;
401  Field key;
402 
403  // Call _deliverAck, which will deliver to any waiting handlers
404  // AND remove the route if it's a termination ack
405  if(key = ackMessage_.getCommandId(), !key.empty())
406  {
407  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
408  }
409  if(key = ackMessage_.getQueryID(),
410  !key.empty() && messagesDelivered == 0)
411  {
412  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
413  }
414  if(key = ackMessage_.getSubscriptionId(),
415  !key.empty() && messagesDelivered == 0)
416  {
417  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
418  }
419  return messagesDelivered;
420  }
421 
422  // deliverData may only be called by the message handler thread.
423  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
424  {
425  unsigned messagesDelivered = 0;
426  amps_uint64_t hval = _crc(commandId_.data(), commandId_.len(), 0);
427  if(_previousCommandId == hval &&
428  _lookupGenerationCount == _generationCount)
429  {
430  messagesDelivered += _previousHandler.deliverData(dataMessage_);
431  }
432  else
433  {
434  Lock<Mutex> lock(_lock);
435  RouteMap::iterator it = _routes.find(commandId_);
436  if(it != _routes.end())
437  {
438  _previousCommandId = hval;
439  _lookupGenerationCount = _generationCount;
440  _previousHandler = it->second;
441  messagesDelivered += it->second.deliverData(dataMessage_);
442  }
443  }
444  return messagesDelivered;
445  }
446 
447  void invalidateCache(void)
448  {
449  _previousCommandId = 0;
450  }
451 
452  void unsubscribeAll(void)
453  {
454  AMPS_FETCH_ADD(&_generationCount,1);
455  std::vector<Field> removeIds;
456  std::vector<void*> removeData;
457  Lock<Mutex> lock(_lock);
458  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
459  {
460  if (it->second.isTerminationAck(0))
461  {
462  removeIds.push_back(it->first);
463  removeData.push_back(it->second.getMessageHandler().userData());
464  }
465  }
466  for (size_t i=0; i<removeIds.size(); ++i)
467  {
468  // it can't be end() b/c we have the lock and found id above
469  RouteMap::iterator it = _routes.find(removeIds[i]);
470  // Make a non-const copy of Field and clear it, which will clear i as well
471  Field f = it->first; // -V783
472  f.clear();
473  _routes.erase(it);
474  }
475  Unlock<Mutex> u(_lock);
476  for (size_t i=0; i<removeData.size(); ++i)
477  {
478  amps_invoke_remove_route_function(removeData[i]);
479  }
480  }
481 
482 private:
// Route table: command/sub/query id -> route. addRoute() stores deep-copied
// keys, which is why removal code clears the key Field explicitly.
483  typedef std::map<Field, MessageRoute> RouteMap;
484  RouteMap _routes;
// Guards _routes; mutable so the const lookups (hasRoute, getRoute) can lock.
485  mutable Mutex _lock;
486 
// Single-entry cache used by deliverData(): the last route found, the hash
// of its id, and the generation count at the time of that lookup.
487  MessageRoute _previousHandler;
488  amps_uint64_t _previousCommandId;
489  mutable ATOMIC_TYPE _lookupGenerationCount;
// Advanced by clear(), unsubscribeAll() and _removeRoute() so that cached
// lookups can detect that routes were removed.
490  mutable ATOMIC_TYPE _generationCount;
491 
492  // Deliver the ack to any waiting handlers
493  // AND remove the route if it's a termination ack
494  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
495  {
496  Lock<Mutex> lock(_lock);
497  unsigned messagesDelivered = 0;
498  RouteMap::iterator it = _routes.find(commandId_);
499  if(it!=_routes.end())
500  {
501  MessageRoute& route = it->second;
502  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
503  if(route.isTerminationAck(ackType_))
504  {
505  _removeRoute(it);
506  ++messagesDelivered;
507  }
508  }
509  return messagesDelivered;
510  }
511  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
512  {
513  Lock<Mutex> lock(_lock);
514  RouteMap::iterator it = _routes.find(commandId_);
515  if(it!=_routes.end())
516  {
517  MessageRoute& route = it->second;
518  if(route.isTerminationAck(ackType_))
519  {
520  _removeRoute(it);
521  return 1U;
522  }
523  }
524  return 0U;
525  }
526 
527  // returns true if a route was removed.
528  bool _removeRoute(RouteMap::iterator& it_)
529  {
530  // Called with lock already held
531  AMPS_FETCH_ADD(&_generationCount,1);
532  // Make a non-const copy of Field and clear it, which will clear i as well
533  Field f = it_->first;
534  void* routeData = it_->second.getMessageHandler().userData();
535  _routes.erase(it_);
536  f.clear();
537  if (routeData)
538  {
539  Unlock<Mutex> u(_lock);
540  amps_invoke_remove_route_function(routeData);
541  }
542  return true;
543  }
544 
545 };
546 
547 
548 }
549 
550 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1011
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1012
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4136
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1114
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103