AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
MessageRouter.hpp
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _MESSAGEROUTER_HPP_
26 #define _MESSAGEROUTER_HPP_
27 #include <map>
28 #include "util.hpp"
29 #include "Message.hpp"
30 
31 namespace AMPS
32 {
37 template <typename Func, typename Object>
38 class Handler
39 {
40 protected:
41  friend class MessageStream;
42  Func _func;
43  void* _userData;
44 #ifdef AMPS_USE_FUNCTIONAL
45  std::function<void(Object)> _callable;
46 #endif
47  bool _isValid;
48 
49 public:
50  // No op function for handlers
51  static void noOpHandler(Object) {;}
52 
53  typedef Func FunctionType;
56  Handler() : _func(NULL), _userData(NULL)
57 #ifdef AMPS_USE_FUNCTIONAL
58  , _callable(Handler<Func,Object>::noOpHandler)
59 #endif
60  , _isValid(false)
61  {
62  }
63 
69  Handler(Func func_, void* userData_)
70  : _func(func_), _userData(userData_)
71 #ifdef AMPS_USE_FUNCTIONAL
72  , _callable(noOpHandler)
73 #endif
74  , _isValid(true)
75  {
76  }
77 
80  Handler(const Handler& orig_)
81  : _func(orig_._func), _userData(orig_._userData)
82 #ifdef AMPS_USE_FUNCTIONAL
83  , _callable(orig_._callable)
84 #endif
85  , _isValid(true)
86  {
87  }
88 #ifdef AMPS_USE_FUNCTIONAL
89  template <typename T>
92  Handler(const T& callback_)
93  : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
94  {
95  }
96 #endif
97  void invoke(Object message)
98  {
99  if (_func)
100  _func(message, _userData);
101 #ifdef AMPS_USE_FUNCTIONAL
102  else
103  _callable(message);
104 #endif
105  }
106 
107  bool isValid(void) const { return _isValid; }
108  Func function(void) const { return _func; }
109  void* userData(void) const { return _userData; }
110 };
111 class Message;
114 typedef void(*MessageHandlerFunc)(const Message&, void* userData);
115 
117 
122 {
123 private:
124  MessageHandler _emptyMessageHandler;
125 
126  class MessageRoute
127  {
128  MessageHandler _messageHandler;
129  unsigned _requestedAcks, _systemAcks, _terminationAck;
130  public:
131  MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
132  MessageRoute(const MessageRoute& rhs_) :
133  _messageHandler(rhs_._messageHandler),
134  _requestedAcks (rhs_._requestedAcks),
135  _systemAcks (rhs_._systemAcks),
136  _terminationAck(rhs_._terminationAck)
137  {;}
138  const MessageRoute& operator=(const MessageRoute& rhs_)
139  {
140  _messageHandler = rhs_._messageHandler;
141  _requestedAcks = rhs_._requestedAcks;
142  _systemAcks = rhs_._systemAcks;
143  _terminationAck = rhs_._terminationAck;
144  return *this;
145  }
146  MessageRoute(MessageHandler messageHandler_, unsigned requestedAcks_,
147  unsigned systemAcks_, bool isSubscribe_) :
148  _messageHandler(messageHandler_),
149  _requestedAcks(requestedAcks_),
150  _systemAcks(systemAcks_),
151  _terminationAck(0)
152  {
153  if(!isSubscribe_)
154  {
155  // The ack to terminate the route on is whatever the highest
156  // bit set in requestedAcks is.
157  unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
158  _terminationAck = 1;
159  while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
160  }
161  }
162 
163  // Deliver an ack to registered handler if the ack type was requested
164  unsigned deliverAck(const Message& message_, unsigned ackType_)
165  {
166  if ( (_requestedAcks & ackType_) == 0) return 0;
167  try {
168  _messageHandler.invoke(message_);
169  } catch (std::exception &ex)
170  {
171  std::cerr << ex.what() << std::endl;
172  }
173  return 1;
174  }
175  bool isTerminationAck(unsigned ackType_) const
176  {
177  return ackType_ == _terminationAck;
178  }
179  unsigned deliverData(const Message& message_)
180  {
181  _messageHandler.invoke(message_);
182  return 1;
183  }
184  const MessageHandler& getMessageHandler() const
185  {
186  return _messageHandler;
187  }
188  MessageHandler& getMessageHandler()
189  {
190  return _messageHandler;
191  }
192  };
193 
194 public:
195  MessageRouter()
196  : _previousCommandId(0),
197  _lookupGenerationCount(0),
198  _generationCount(0)
199  {;}
200 
201  void addRoute(const Field& commandId_, const AMPS::MessageHandler& messageHandler_,
202  unsigned requestedAcks_, unsigned systemAcks_, bool isSubscribe_)
203  {
204  Lock<Mutex> lock(_lock);
205  RouteMap::iterator i = _routes.find(commandId_);
206  if (i == _routes.end()) {
207  _routes[commandId_.deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
208  }
209  else {
210  i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
211  }
212  }
213 
214  // returns true if a route was removed.
215  bool removeRoute(const Field& commandId_)
216  {
217  Lock<Mutex> lock(_lock);
218  RouteMap::iterator i = _routes.find(commandId_);
219  if (i == _routes.end()) return false;
220  return _removeRoute(i);
221  }
222 
223  void clear()
224  {
225  AMPS_FETCH_ADD(&_generationCount,1);
226  Lock<Mutex> lock(_lock);
227  for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
228  {
229  // Make a non-const copy of Field and clear it, which will clear i as well
230  // but won't actually affect the map, which is unaware that the key's shared
231  // pointer has been deleted.
232  Field f = i->first;
233  f.clear();
234  }
235  _routes.clear();
236  }
237 
238  // Returns true if a route exists for a single id.
239  bool hasRoute(const Field& commandId_) const
240  {
241  Lock<Mutex> lock(_lock);
242  RouteMap::const_iterator it = _routes.find(commandId_);
243  return it != _routes.end();
244  }
245 
246  // Find a single route and return true if here, setting result_ to the handler.
247  bool getRoute(const Field& commandId_, MessageHandler& result_) const
248  {
249  Lock<Mutex> lock(_lock);
250  RouteMap::const_iterator it = _routes.find(commandId_);
251  if (it != _routes.end())
252  {
253  result_ = it->second.getMessageHandler();
254  return true;
255  }
256  else
257  {
258  result_ = _emptyMessageHandler;
259  return false;
260  }
261  }
262 
263  // RouteCache is the result type for a parseRoutes(); we do extra work
264  // to avoid hitting the map or its lock when the subids field on
265  // publish messages does not change.
266  struct RouteLookup
267  {
268  size_t idOffset;
269  size_t idLength;
270  MessageHandler handler;
271  };
272  class RouteCache : public std::vector<RouteLookup>
273  {
274  RouteCache(const RouteCache&);
275  void operator=(const RouteCache&);
276  public:
277  RouteCache(void)
278  : _generationCount(0),
279  _hashVal(0)
280  {;}
281 
282  void invalidateCache(void)
283  {
284  _generationCount = 0;
285  _hashVal = 0;
286  clear();
287  }
288  void invalidateCache(ATOMIC_TYPE generationCount_, size_t hashVal_)
289  {
290  _generationCount = generationCount_;
291  _hashVal = hashVal_;
292  clear();
293  }
294 
295  bool isCacheHit(ATOMIC_TYPE generationCount_, size_t hashVal_) const
296  {
297  return _generationCount == generationCount_ && _hashVal == hashVal_;
298  }
299 
300  private:
301  ATOMIC_TYPE _generationCount;
302  size_t _hashVal;
303  };
304 
305  // Parses the command id list into the route lookup vector and assigns
306  // the found handlers into the list. Only intended to be called by the
307  // message handler thread. Returns the number of command/sub IDs parsed.
308  size_t parseRoutes(const Field& commandIdList_, RouteCache& result_)
309  {
310  // Super shortcut: if the whole subID list is the same as the previous one,
311  // then assume the result_ contains all the right handlers already, and that
312  // the offsets and lengths of subIds are unchanged.
313  size_t listHash = commandIdList_.hash_function();
314  if (result_.isCacheHit(_generationCount, listHash))
315  {
316  return result_.size();
317  }
318  result_.invalidateCache(_generationCount, listHash);
319 
320  // Lock required now that we'll be using the route map.
321  Lock<Mutex> lockGuard(_lock);
322  size_t resultCount = 0;
323  const char* pStart = commandIdList_.data();
324  for(const char* p = pStart, *e = commandIdList_.len() + pStart; p < e;
325  ++p, ++resultCount)
326  {
327  const char* delimiter = p;
328  while(delimiter != e && *delimiter != ',')
329  {
330  ++delimiter;
331  }
332  AMPS::Field subId(p, (size_t)(delimiter-p));
333  result_.push_back(RouteLookup());
334  // Push back and then copy over fields; would emplace_back if available on
335  // all supported compilers.
336  RouteLookup& result = result_[resultCount];
337  result.idOffset = (size_t)(p - pStart);
338  result.idLength = (size_t)(delimiter - p);
339 
340  RouteMap::const_iterator it = _routes.find(subId);
341  if (it != _routes.end())
342  {
343  result.handler = it->second.getMessageHandler();
344  }
345  else
346  {
347  result.handler = _emptyMessageHandler;
348  }
349  p = delimiter;
350  }
351  return resultCount;
352  }
353  unsigned deliverAck(const Message& ackMessage_, unsigned ackType_)
354  {
355  assert(ackMessage_.getCommand() == "ack");
356  unsigned messagesDelivered = 0;
357  Field key;
358 
359  // Call _deliverAck, which will deliver to any waiting handlers
360  // AND remove the route if it's a termination ack
361  if(key = ackMessage_.getCommandId(), !key.empty())
362  {
363  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
364  }
365  if(key = ackMessage_.getQueryID(),
366  !key.empty() && messagesDelivered == 0)
367  {
368  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
369  }
370  if(key = ackMessage_.getSubscriptionId(),
371  !key.empty() && messagesDelivered == 0)
372  {
373  messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
374  }
375  return messagesDelivered;
376  }
377 
378  // deliverData may only be called by the message handler thread.
379  unsigned deliverData(const Message& dataMessage_, const Field& commandId_)
380  {
381  unsigned messagesDelivered = 0;
382  size_t hval = commandId_.hash_function();
383  if(_previousCommandId == hval &&
384  _lookupGenerationCount == _generationCount)
385  {
386  messagesDelivered += _previousHandler.deliverData(dataMessage_);
387  }
388  else
389  {
390  Lock<Mutex> lock(_lock);
391  RouteMap::iterator it = _routes.find(commandId_);
392  if(it != _routes.end())
393  {
394  _previousCommandId = hval;
395  _lookupGenerationCount = _generationCount;
396  _previousHandler = it->second;
397  messagesDelivered += it->second.deliverData(dataMessage_);
398  }
399  }
400  return messagesDelivered;
401  }
402 
403  void invalidateCache(void)
404  {
405  _previousCommandId = 0;
406  }
407 
408  void unsubscribeAll(void)
409  {
410  AMPS_FETCH_ADD(&_generationCount,1);
411  std::vector<Field> removeIds;
412  Lock<Mutex> lock(_lock);
413  for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
414  {
415  if (it->second.isTerminationAck(0)) removeIds.push_back(it->first);
416  }
417  for (size_t i=0; i<removeIds.size(); ++i)
418  {
419  RouteMap::iterator it = _routes.find(removeIds[i]);
420  // Make a non-const copy of Field and clear it, which will clear i as well
421  Field f = it->first;
422  f.clear();
423  _routes.erase(it);
424  }
425  }
426 
427 private:
428  typedef std::map<Field, MessageRoute> RouteMap;
429  RouteMap _routes;
430  mutable Mutex _lock;
431 
432  MessageRoute _previousHandler;
433  size_t _previousCommandId;
434  mutable ATOMIC_TYPE _lookupGenerationCount;
435  mutable ATOMIC_TYPE _generationCount;
436 
437  // Deliver the ack to any waiting handlers
438  // AND remove the route if it's a termination ack
439  unsigned _deliverAck(const Message& ackMessage_, unsigned ackType_, Field& commandId_)
440  {
441  Lock<Mutex> lock(_lock);
442  unsigned messagesDelivered = 0;
443  RouteMap::iterator it = _routes.find(commandId_);
444  if(it!=_routes.end())
445  {
446  MessageRoute& route = it->second;
447  messagesDelivered += route.deliverAck(ackMessage_, ackType_);
448  if(route.isTerminationAck(ackType_))
449  {
450  _removeRoute(it);
451  ++messagesDelivered;
452  }
453  }
454  return messagesDelivered;
455  }
456  unsigned _processAckForRemoval(unsigned ackType_, Field& commandId_)
457  {
458  Lock<Mutex> lock(_lock);
459  RouteMap::iterator it = _routes.find(commandId_);
460  if(it!=_routes.end())
461  {
462  MessageRoute& route = it->second;
463  if(route.isTerminationAck(ackType_))
464  {
465  _removeRoute(it);
466  return 1U;
467  }
468  }
469  return 0U;
470  }
471 
472  // returns true if a route was removed.
473  bool _removeRoute(RouteMap::iterator& it_)
474  {
475  // Called with lock already held
476  AMPS_FETCH_ADD(&_generationCount,1);
477  // Make a non-const copy of Field and clear it, which will clear i as well
478  Field f = it_->first;
479  f.clear();
480  _routes.erase(it_);
481  return true;
482  }
483 
484 };
485 
486 
487 }
488 
489 #endif
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:92
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:955
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:956
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:93
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:69
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:38
An iterable object representing the contents of an AMPS topic.
Definition: ampsplusplus.hpp:4154
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:56
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1057
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:80
Definition: ampsplusplus.hpp:136