AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "ampscrc.hpp"
29 #include "util.hpp"
30 #include "Message.hpp"
31 
32 namespace AMPS
33 {
/// Wrapper for callback functions in AMPS.
///
/// A Handler holds either a bare function pointer plus an opaque user-data
/// pointer or, when AMPS_USE_FUNCTIONAL is defined, a std::function object.
/// It also records whether anything was actually wrapped (see isValid()).
///
/// \tparam Func   Function pointer type, called as func(Object, void*).
/// \tparam Object Argument type handed to the wrapped callback.
template <typename Func, typename Object>
class Handler
{
protected:
  friend class MessageStream;
  Func _func;        // Bare function pointer; NULL when unused.
  void* _userData;   // Passed back verbatim as the second argument of _func.
#ifdef AMPS_USE_FUNCTIONAL
  std::function<void(Object)> _callable;  // Used by invoke() when _func is NULL.
#endif
  bool _isValid;     // False only for handlers that wrap nothing.

public:
  /// No op function for handlers.
  static void noOpHandler(Object) {;}

  typedef Func FunctionType;

  /// Null constructor -- no function is wrapped, and isValid() is false.
  Handler() : _func(NULL), _userData(NULL)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(Handler<Func, Object>::noOpHandler)
#endif
    , _isValid(false)
  {
  }

  /// Constructor for use with a bare function pointer.
  /// \param func_     The function to call from invoke().
  /// \param userData_ Opaque pointer passed to func_ on every invocation.
  Handler(Func func_, void* userData_)
    : _func(func_), _userData(userData_)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(noOpHandler)
#endif
    , _isValid(true)
  {
  }

  /// Copy constructor.
  /// The validity flag is copied from orig_ (it used to be forced to true),
  /// so a copy of an empty handler is still reported as empty. This matches
  /// operator= and makes isValid() independent of copy elision.
  Handler(const Handler& orig_)
    : _func(orig_._func), _userData(orig_._userData)
#ifdef AMPS_USE_FUNCTIONAL
    , _callable(orig_._callable)
#endif
    , _isValid(orig_._isValid)
  {
  }

#ifdef AMPS_USE_FUNCTIONAL
  /// Constructor for use with a standard c++ library function object.
  /// \param callback_ Any object convertible to std::function<void(Object)>.
  template <typename T>
  Handler(const T& callback_)
    : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
  {
  }
#endif

  /// Calls the wrapped callback with message.
  /// The function pointer wins when it is set; otherwise the std::function
  /// is called when AMPS_USE_FUNCTIONAL is defined, and nothing happens when
  /// it is not.
  void invoke(Object message)
  {
    if (_func)
      _func(message, _userData);
#ifdef AMPS_USE_FUNCTIONAL
    else
      _callable(message);
#endif
  }

  /// Copy assignment; copies the callback, user data and validity flag.
  Handler& operator=(const Handler& rhs_)
  {
    if (this != &rhs_)
    {
      _func = rhs_._func;
      _userData = rhs_._userData;
#ifdef AMPS_USE_FUNCTIONAL
      _callable = rhs_._callable;
#endif
      _isValid = rhs_._isValid;
    }
    return *this;
  }

  /// \return true if this handler was constructed around a callback.
  bool isValid(void) const { return _isValid; }
  /// \return the wrapped function pointer (NULL if none).
  Func function(void) const { return _func; }
  /// \return the user-data pointer supplied at construction (NULL if none).
  void* userData(void) const { return _userData; }
};
126 class Message;
129 typedef void(*MessageHandlerFunc)(const Message&, void* userData);
130 
132 
137 {
138 private:
139  MessageHandler _emptyMessageHandler;
140  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
141  // Function used to calculate the CRC if one is used
142  CRCFunction _crc;
143 
144  class MessageRoute
145  {
146  MessageHandler _messageHandler;
147  unsigned _requestedAcks, _systemAcks, _terminationAck;
148  public:
149  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
150  MessageRoute(const MessageRoute& rhs_) :
151  _messageHandler(rhs_._messageHandler),
152  _requestedAcks (rhs_._requestedAcks),
153  _systemAcks (rhs_._systemAcks),
154  _terminationAck(rhs_._terminationAck)
155  {;}
156  const MessageRoute& operator=(const MessageRoute& rhs_)
157  {
158  _messageHandler = rhs_._messageHandler;
159  _requestedAcks = rhs_._requestedAcks;
160  _systemAcks = rhs_._systemAcks;
161  _terminationAck = rhs_._terminationAck;
162  return *this;
163  }
164  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
165  unsigned systemAcks_, bool isSubscribe_) :
166  _messageHandler(messageHandler_),
167  _requestedAcks(requestedAcks_),
168  _systemAcks(systemAcks_),
169  _terminationAck(0)
170  {
171  if(!isSubscribe_)
172  {
173  // The ack to terminate the route on is whatever the highest
174  // bit set in requestedAcks is.
175  unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
176  _terminationAck = 1;
177  while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
178  }
179  }
180 
181  // Deliver an ack to registered handler if the ack type was requested
182  unsigned deliverAck(const Message& message_, unsigned ackType_)
183  {
184  if ( (_requestedAcks & ackType_) == 0) return 0;
185  try {
186  _messageHandler.invoke(message_);
187  } catch (std::exception &ex)
188  {
189  std::cerr << ex.what() << std::endl;
190  }
191  return 1;
192  }
193  bool isTerminationAck(unsigned ackType_) const
194  {
195  return ackType_ == _terminationAck;
196  }
197  unsigned deliverData(const Message& message_)
198  {
199  _messageHandler.invoke(message_);
200  return 1;
201  }
202  const MessageHandler& getMessageHandler() const
203  {
204  return _messageHandler;
205  }
206  MessageHandler& getMessageHandler()
207  {
208  return _messageHandler;
209  }
210  };
211 
212 public:
213  MessageRouter()
214  : _previousCommandId(0),
215  _lookupGenerationCount(0),
216  _generationCount(0)
217  {
218 #ifndef AMPS_SSE_42
219  _crc = AMPS::CRC<0>::crcNoSSE;
220 #else
221  if(AMPS::CRC<0>::isSSE42Enabled())
222  {
223  _crc = AMPS::CRC<0>::crc;
224  }
225  else
226  {
227  _crc = AMPS::CRC<0>::crcNoSSE;
228  }
229 #endif
230  }
231 
232  int addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
233  unsigned requestedAcks_, unsigned systemAcks_, bool isSubscribe_)
234  {
235  Lock<Mutex> lock(_lock);
236  RouteMap::iterator i = _routes.find(commandId_);
237  if (i == _routes.end()) {
238  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
239  return 1;
240  }
241  else {
242  // Only replace a non-subscribe with a subscribe
243  if (isSubscribe_ && !i->second.isTerminationAck(0))
244  {
245  void *routeData = i->second.getMessageHandler().userData();;
246  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
247  if (routeData)
248  {
249  Unlock<Mutex> u(_lock);
250  amps_invoke_remove_route_function(routeData);
251  }
252  return 1;
253  }
254  }
255  return 0;
256  }
257 
258  // returns true if a route was removed.
259  bool removeRoute(const Field& commandId_)
260  {
261  Lock<Mutex> lock(_lock);
262  RouteMap::iterator i = _routes.find(commandId_);
263  if (i == _routes.end()) return false;
264  return _removeRoute(i);
265  }
266 
267  void clear()
268  {
269  AMPS_FETCH_ADD(&_generationCount,1);
270  std::vector<void*> removeData;
271  {
272  Lock<Mutex> lock(_lock);
273  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
274  {
275  // Make a non-const copy of Field and clear it, which will clear i
276  // as well but won't actually affect the map, which is unaware that
277  // the key's shared pointer has been deleted.
278  Field f = i->first;
279  void* data = i->second.getMessageHandler().userData();
280  removeData.push_back(data);
281  f.clear();
282  }
283  _routes.clear();
284  }
285  for (size_t i=0; i<removeData.size(); ++i)
286  {
287  amps_invoke_remove_route_function(removeData[i]);
288  }
289  }
290 
291  // Returns true if a route exists for a single id.
292  bool hasRoute(const Field& commandId_) const
293  {
294  Lock<Mutex> lock(_lock);
295  RouteMap::const_iterator it = _routes.find(commandId_);
296  return it != _routes.end();
297  }
298 
299  // Find a single route and return true if here, setting result_ to the handler.
300  bool getRoute(const Field& commandId_, MessageHandler& result_) const
301  {
302  Lock<Mutex> lock(_lock);
303  RouteMap::const_iterator it = _routes.find(commandId_);
304  if (it != _routes.end())
305  {
306  result_ = it->second.getMessageHandler();
307  return true;
308  }
309  else
310  {
311  result_ = _emptyMessageHandler;
312  return false;
313  }
314  }
315 
316  // RouteCache is the result type for a parseRoutes(); we do extra work
317  // to avoid hitting the map or its lock when the subids field on
318  // publish messages does not change.
319  struct RouteLookup
320  {
321  size_t idOffset;
322  size_t idLength;
323  MessageHandler handler;
324  };
325  class RouteCache : public std::vector<RouteLookup>
326  {
327  RouteCache(const RouteCache&);
328  void operator=(const RouteCache&);
329  public:
330  RouteCache(void)
331  : _generationCount(0),
332  _hashVal(0)
333  {;}
334 
335  void invalidateCache(void)
336  {
337  _generationCount = 0;
338  _hashVal = 0;
339  clear();
340  }
341  void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
342  {
343  _generationCount = generationCount_;
344  _hashVal = hashVal_;
345  clear();
346  }
347 
348  bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_) const
349  {
350  return _generationCount == generationCount_ && _hashVal == hashVal_;
351  }
352 
353  private:
354  ATOMIC_TYPE _generationCount;
355  amps_uint64_t _hashVal;
356  };
357 
358  // Parses the command id list into the route lookup vector and assigns
359  // the found handlers into the list. Only intended to be called by the
360  // message handler thread. Returns the number of command/sub IDs parsed.
361  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
362  {
363  // Super shortcut: if the whole subID list is the same as the previous one,
364  // then assume the result_ contains all the right handlers already, and that
365  // the offsets and lengths of subIds are unchanged.
366  amps_uint64_t listHash = _crc(commandIdList_.data(), commandIdList_.len(), 0);
367  if (result_.isCacheHit(_generationCount, listHash))
368  {
369  return result_.size();
370  }
371  result_.invalidateCache(_generationCount, listHash);
372 
373  // Lock required now that we'll be using the route map.
374  Lock<Mutex> lockGuard(_lock);
375  size_t resultCount = 0;
376  const char* pStart = commandIdList_.data();
377  for(const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
378  ++p, ++resultCount)
379  {
380  const char* delimiter = p;
381  while(delimiter != e && *delimiter != ',')
382  {
383  ++delimiter;
384  }
385  AMPS::Field subId(p, (size_t)(delimiter-p));
386  result_.push_back(RouteLookup());
387  // Push back and then copy over fields; would emplace_back if available on
388  // all supported compilers.
389  RouteLookup& result = result_[resultCount];
390  result.idOffset = (size_t)(p - pStart);
391  result.idLength = (size_t)(delimiter - p);
392 
393  RouteMap::const_iterator it = _routes.find(subId);
394  if (it != _routes.end())
395  {
396  result.handler = it->second.getMessageHandler();
397  }
398  else
399  {
400  result.handler = _emptyMessageHandler;
401  }
402  p = delimiter;
403  }
404  return resultCount;
405  }
406  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
407  {
408  assert(ackMessage_.getCommand() == "ack");
409  unsigned messagesDelivered = 0;
410  Field key;
411 
412  // Call _deliverAck, which will deliver to any waiting handlers
413  // AND remove the route if it's a termination ack
414  if(key = ackMessage_.getCommandId(), !key.empty())
415  {
416  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
417  }
418  if(key = ackMessage_.getQueryID(),
419  !key.empty() && messagesDelivered == 0)
420  {
421  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
422  }
423  if(key = ackMessage_.getSubscriptionId(),
424  !key.empty() && messagesDelivered == 0)
425  {
426  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
427  }
428  return messagesDelivered;
429  }
430 
431  // deliverData may only be called by the message handler thread.
432  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
433  {
434  unsigned messagesDelivered = 0;
435  amps_uint64_t hval = _crc(commandId_.data(), commandId_.len(), 0);
436  if(_previousCommandId == hval &&
437  _lookupGenerationCount == _generationCount)
438  {
439  messagesDelivered += _previousHandler.deliverData(dataMessage_);
440  }
441  else
442  {
443  Lock<Mutex> lock(_lock);
444  RouteMap::iterator it = _routes.find(commandId_);
445  if(it != _routes.end())
446  {
447  _previousCommandId = hval;
448  _lookupGenerationCount = _generationCount;
449  _previousHandler = it->second;
450  messagesDelivered += it->second.deliverData(dataMessage_);
451  }
452  }
453  return messagesDelivered;
454  }
455 
456  void invalidateCache(void)
457  {
458  _previousCommandId = 0;
459  }
460 
461  void unsubscribeAll(void)
462  {
463  AMPS_FETCH_ADD(&_generationCount,1);
464  std::vector<Field> removeIds;
465  std::vector<void*> removeData;
466  Lock<Mutex> lock(_lock);
467  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
468  {
469  if (it->second.isTerminationAck(0))
470  {
471  removeIds.push_back(it->first);
472  removeData.push_back(it->second.getMessageHandler().userData());
473  }
474  }
475  for (size_t i=0; i<removeIds.size(); ++i)
476  {
477  // it can't be end() b/c we have the lock and found id above
478  RouteMap::iterator it = _routes.find(removeIds[i]);
479  // Make a non-const copy of Field and clear it, which will clear i as well
480  Field f = it->first; // -V783
481  f.clear();
482  _routes.erase(it);
483  }
484  Unlock<Mutex> u(_lock);
485  for (size_t i=0; i<removeData.size(); ++i)
486  {
487  amps_invoke_remove_route_function(removeData[i]);
488  }
489  }
490 
491 private:
492  typedef std::map<Field, MessageRoute> RouteMap;
493  RouteMap _routes;
494  mutable Mutex _lock;
495 
496  MessageRoute _previousHandler;
497  amps_uint64_t _previousCommandId;
498  mutable ATOMIC_TYPE _lookupGenerationCount;
499  mutable ATOMIC_TYPE _generationCount;
500 
501  // Deliver the ack to any waiting handlers
502  // AND remove the route if it's a termination ack
503  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
504  {
505  Lock<Mutex> lock(_lock);
506  unsigned messagesDelivered = 0;
507  RouteMap::iterator it = _routes.find(commandId_);
508  if(it!=_routes.end())
509  {
510  MessageRoute& route = it->second;
511  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
512  if(route.isTerminationAck(ackType_))
513  {
514  _removeRoute(it);
515  ++messagesDelivered;
516  }
517  }
518  return messagesDelivered;
519  }
520  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
521  {
522  Lock<Mutex> lock(_lock);
523  RouteMap::iterator it = _routes.find(commandId_);
524  if(it!=_routes.end())
525  {
526  MessageRoute& route = it->second;
527  if(route.isTerminationAck(ackType_))
528  {
529  _removeRoute(it);
530  return 1U;
531  }
532  }
533  return 0U;
534  }
535 
536  // returns true if a route was removed.
537  bool _removeRoute(RouteMap::iterator& it_)
538  {
539  // Called with lock already held
540  AMPS_FETCH_ADD(&_generationCount,1);
541  // Make a non-const copy of Field and clear it, which will clear i as well
542  Field f = it_->first;
543  void* routeData = it_->second.getMessageHandler().userData();
544  _routes.erase(it_);
545  f.clear();
546  if (routeData)
547  {
548  Unlock<Mutex> u(_lock);
549  amps_invoke_remove_route_function(routeData);
550  }
551  return true;
552  }
553 
554 };
555 
556 
557 }
558 
559 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4132
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103