25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "ampscrc.hpp" 38 template <
typename Func,
typename Object>
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
52 static void noOpHandler(Object) {;}
54 typedef Func FunctionType;
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func, Object>::noOpHandler)
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
98 void invoke(Object message)
102 _func(message, _userData);
104 #ifdef AMPS_USE_FUNCTIONAL 117 _userData = rhs_._userData;
118 #ifdef AMPS_USE_FUNCTIONAL 119 _callable = rhs_._callable;
121 _isValid = rhs_._isValid;
126 bool isValid(
void)
const 130 Func
function(void)
const 134 void* userData(
void)
const 142 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
152 MessageHandler _emptyMessageHandler;
153 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
159 MessageHandler _messageHandler;
160 unsigned _requestedAcks, _systemAcks, _terminationAck;
162 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
163 MessageRoute(
const MessageRoute& rhs_) :
164 _messageHandler(rhs_._messageHandler),
165 _requestedAcks (rhs_._requestedAcks),
166 _systemAcks (rhs_._systemAcks),
167 _terminationAck(rhs_._terminationAck)
169 const MessageRoute& operator=(
const MessageRoute& rhs_)
171 _messageHandler = rhs_._messageHandler;
172 _requestedAcks = rhs_._requestedAcks;
173 _systemAcks = rhs_._systemAcks;
174 _terminationAck = rhs_._terminationAck;
177 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
178 unsigned systemAcks_,
bool isSubscribe_) :
179 _messageHandler(messageHandler_),
180 _requestedAcks(requestedAcks_),
181 _systemAcks(systemAcks_),
188 unsigned bitCounter = (requestedAcks_ | systemAcks_) >> 1;
190 while (bitCounter > 0)
193 _terminationAck <<= 1;
199 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
201 if ( (_requestedAcks & ackType_) == 0)
207 _messageHandler.invoke(message_);
209 catch (std::exception& ex)
211 std::cerr << ex.what() << std::endl;
215 bool isTerminationAck(
unsigned ackType_)
const 217 return ackType_ == _terminationAck;
219 unsigned deliverData(
const Message& message_)
221 _messageHandler.invoke(message_);
224 const MessageHandler& getMessageHandler()
const 226 return _messageHandler;
228 MessageHandler& getMessageHandler()
230 return _messageHandler;
236 : _previousCommandId(0),
237 _lookupGenerationCount(0),
241 _crc = AMPS::CRC<0>::crcNoSSE;
243 if (AMPS::CRC<0>::isSSE42Enabled())
245 _crc = AMPS::CRC<0>::crc;
249 _crc = AMPS::CRC<0>::crcNoSSE;
255 unsigned requestedAcks_,
unsigned systemAcks_,
bool isSubscribe_)
257 Lock<Mutex> lock(_lock);
258 RouteMap::iterator i = _routes.find(commandId_);
259 if (i == _routes.end())
261 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
267 if (isSubscribe_ && !i->second.isTerminationAck(0))
269 void* routeData = i->second.getMessageHandler().userData();;
270 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
273 Unlock<Mutex> u(_lock);
274 amps_invoke_remove_route_function(routeData);
283 bool removeRoute(
const Field& commandId_)
285 Lock<Mutex> lock(_lock);
286 RouteMap::iterator i = _routes.find(commandId_);
287 if (i == _routes.end())
291 return _removeRoute(i);
296 AMPS_FETCH_ADD(&_generationCount, 1);
297 std::vector<void*> removeData;
299 Lock<Mutex> lock(_lock);
300 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
306 void* data = i->second.getMessageHandler().userData();
307 removeData.push_back(data);
312 for (
size_t i = 0; i < removeData.size(); ++i)
314 amps_invoke_remove_route_function(removeData[i]);
319 bool hasRoute(
const Field& commandId_)
const 321 Lock<Mutex> lock(_lock);
322 RouteMap::const_iterator it = _routes.find(commandId_);
323 return it != _routes.end();
327 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 329 Lock<Mutex> lock(_lock);
330 RouteMap::const_iterator it = _routes.find(commandId_);
331 if (it != _routes.end())
333 result_ = it->second.getMessageHandler();
338 result_ = _emptyMessageHandler;
350 MessageHandler handler;
352 class RouteCache :
public std::vector<RouteLookup>
354 RouteCache(
const RouteCache&);
355 void operator=(
const RouteCache&);
358 : _generationCount(0),
362 void invalidateCache(
void)
364 _generationCount = 0;
368 void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
370 _generationCount = generationCount_;
375 bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
const 377 return _generationCount == generationCount_ && _hashVal == hashVal_;
381 ATOMIC_TYPE _generationCount;
382 amps_uint64_t _hashVal;
388 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
393 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
394 if (result_.isCacheHit(_generationCount, listHash))
396 return result_.size();
398 result_.invalidateCache(_generationCount, listHash);
401 Lock<Mutex> lockGuard(_lock);
402 size_t resultCount = 0;
403 const char* pStart = commandIdList_.
data();
404 for (
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
407 const char* delimiter = p;
408 while (delimiter != e && *delimiter !=
',')
413 #ifdef AMPS_USE_EMPLACE 414 result_.emplace_back(RouteLookup());
416 result_.push_back(RouteLookup());
420 RouteLookup& result = result_[resultCount];
421 result.idOffset = (size_t)(p - pStart);
422 result.idLength = (size_t)(delimiter - p);
424 RouteMap::const_iterator it = _routes.find(subId);
425 if (it != _routes.end())
427 result.handler = it->second.getMessageHandler();
431 result.handler = _emptyMessageHandler;
437 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
440 unsigned messagesDelivered = 0;
447 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
450 !key.
empty() && messagesDelivered == 0)
452 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
455 !key.
empty() && messagesDelivered == 0)
457 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
459 return messagesDelivered;
463 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
465 unsigned messagesDelivered = 0;
466 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
467 if (_previousCommandId == hval &&
468 _lookupGenerationCount == _generationCount)
470 messagesDelivered += _previousHandler.deliverData(dataMessage_);
474 Lock<Mutex> lock(_lock);
475 RouteMap::iterator it = _routes.find(commandId_);
476 if (it != _routes.end())
478 _previousCommandId = hval;
479 _lookupGenerationCount = _generationCount;
480 _previousHandler = it->second;
481 messagesDelivered += it->second.deliverData(dataMessage_);
484 return messagesDelivered;
487 void invalidateCache(
void)
489 _previousCommandId = 0;
492 void unsubscribeAll(
void)
494 AMPS_FETCH_ADD(&_generationCount, 1);
495 std::vector<Field> removeIds;
496 std::vector<void*> removeData;
497 Lock<Mutex> lock(_lock);
498 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
500 if (it->second.isTerminationAck(0))
502 removeIds.push_back(it->first);
503 removeData.push_back(it->second.getMessageHandler().userData());
506 for (
size_t i = 0; i < removeIds.size(); ++i)
509 RouteMap::iterator it = _routes.find(removeIds[i]);
515 Unlock<Mutex> u(_lock);
516 for (
size_t i = 0; i < removeData.size(); ++i)
518 amps_invoke_remove_route_function(removeData[i]);
523 typedef std::map<Field, MessageRoute> RouteMap;
527 MessageRoute _previousHandler;
528 amps_uint64_t _previousCommandId;
529 mutable ATOMIC_TYPE _lookupGenerationCount;
530 mutable ATOMIC_TYPE _generationCount;
534 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
536 Lock<Mutex> lock(_lock);
537 unsigned messagesDelivered = 0;
538 RouteMap::iterator it = _routes.find(commandId_);
539 if (it != _routes.end())
541 MessageRoute& route = it->second;
542 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
543 if (route.isTerminationAck(ackType_))
549 return messagesDelivered;
551 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
553 Lock<Mutex> lock(_lock);
554 RouteMap::iterator it = _routes.find(commandId_);
555 if (it != _routes.end())
557 MessageRoute& route = it->second;
558 if (route.isTerminationAck(ackType_))
568 bool _removeRoute(RouteMap::iterator& it_)
571 AMPS_FETCH_ADD(&_generationCount, 1);
573 Field f = it_->first;
574 void* routeData = it_->second.getMessageHandler().userData();
579 Unlock<Mutex> u(_lock);
580 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1141
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1248
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4682
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103