25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "ampscrc.hpp" 38 template <
typename Func,
typename Object>
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
/// Placeholder handler: accepts an Object and does nothing.
/// Constructors in this file use it as the initial value of _callable when
/// no function object has been supplied.
52 static void noOpHandler(Object) {;}
54 typedef Func FunctionType;
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func,Object>::noOpHandler)
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
/// Delivers \p message to the wrapped handler.
/// The call visible here forwards \p message, together with the stored
/// _userData pointer, to the function pointer _func.
/// NOTE(review): the listing lines between the signature and the call are
/// missing; presumably _func is null-checked and _callable is used as the
/// fallback when AMPS_USE_FUNCTIONAL is defined -- confirm against the header.
/// \param message The object handed to the handler.
98 void invoke(Object message)
101 _func(message, _userData);
102 #ifdef AMPS_USE_FUNCTIONAL 113 _userData = rhs_._userData;
114 #ifdef AMPS_USE_FUNCTIONAL 115 _callable = rhs_._callable;
117 _isValid = rhs_._isValid;
/// Reports the value of the _isValid flag.
/// The function-object constructor in this file sets the flag to true.
/// \return The current value of _isValid.
122 bool isValid(
void)
const {
return _isValid; }
/// Returns the stored function _func.
/// _func is the value given to the (func_, userData_) constructor; the
/// function-object constructor initializes it to NULL instead.
/// \return The wrapped function of type Func.
123 Func
function(void)
const {
return _func; }
/// Returns the opaque user-data pointer stored with the function.
/// This is the pointer that invoke() passes as the second argument to _func.
/// \return The stored _userData pointer.
124 void* userData(
void)
const {
return _userData; }
/// Signature of a bare-function message handler: it receives the Message by
/// const reference plus an opaque void* user-data pointer, and returns nothing.
129 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
139 MessageHandler _emptyMessageHandler;
/// Pointer to a CRC routine taking a character buffer, a length, and an
/// amps_uint64_t, and returning an amps_uint64_t.
/// Callers in this file pass a Field's data(), its len(), and 0.
/// NOTE(review): the third argument is presumably the initial/seed CRC value
/// -- confirm against ampscrc.hpp.
140 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
146 MessageHandler _messageHandler;
147 unsigned _requestedAcks, _systemAcks, _terminationAck;
/// Default constructor: zeroes the requested, system and termination ack
/// values; _messageHandler is left default-constructed.
149 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
/// Copy constructor: copies the handler and the three ack values
/// (requested, system, termination) from \p rhs_.
/// \param rhs_ The route to copy.
150 MessageRoute(
const MessageRoute& rhs_) :
151 _messageHandler(rhs_._messageHandler),
152 _requestedAcks (rhs_._requestedAcks),
153 _systemAcks (rhs_._systemAcks),
154 _terminationAck(rhs_._terminationAck)
/// Copy assignment: overwrites this route's handler and its three ack
/// values with those of \p rhs_.
/// NOTE(review): the return statement is not visible in this listing;
/// presumably it returns *this -- confirm.
/// \param rhs_ The route to copy from.
156 const MessageRoute& operator=(
const MessageRoute& rhs_)
158 _messageHandler = rhs_._messageHandler;
159 _requestedAcks = rhs_._requestedAcks;
160 _systemAcks = rhs_._systemAcks;
161 _terminationAck = rhs_._terminationAck;
/// Builds a route from a handler and its ack masks, and derives the ack that
/// terminates the route.
/// NOTE(review): the listing lines that initialize _terminationAck and that
/// use isSubscribe_ are missing. Presumably _terminationAck starts at 1, so
/// the loop below leaves it at the highest bit set in the combined mask, and
/// the computation is skipped for subscriptions -- confirm against the header.
/// \param messageHandler_ Handler stored in _messageHandler.
/// \param requestedAcks_ Bit mask stored in _requestedAcks.
/// \param systemAcks_ Bit mask stored in _systemAcks.
/// \param isSubscribe_ Not used on any line visible here.
164 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
165 unsigned systemAcks_,
bool isSubscribe_) :
166 _messageHandler(messageHandler_),
167 _requestedAcks(requestedAcks_),
168 _systemAcks(systemAcks_),
// Combine both masks and drop the lowest bit; the loop then shifts
// _terminationAck left once for every bit position that remains.
175 unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
177 while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
/// Delivers an ack message to this route's handler if the ack type was
/// requested.
/// Returns 0 without invoking the handler when \p ackType_ shares no bits
/// with _requestedAcks. Otherwise the handler is invoked; a std::exception
/// escaping it is caught and its what() text is written to std::cerr.
/// NOTE(review): the `try` line and the final return are missing from this
/// listing; the value returned after a delivery is not visible here.
/// \param message_ The ack message passed to the handler.
/// \param ackType_ The ack type bit(s) of the message.
182 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
// Acks that the caller did not ask for are never delivered.
184 if ( (_requestedAcks & ackType_) == 0)
return 0;
186 _messageHandler.invoke(message_);
187 }
catch (std::exception &ex)
189 std::cerr << ex.what() << std::endl;
/// Tests whether \p ackType_ equals this route's _terminationAck value.
/// \param ackType_ The ack type to compare.
/// \return true when the values are equal.
193 bool isTerminationAck(
unsigned ackType_)
const 195 return ackType_ == _terminationAck;
/// Passes a data message to this route's handler via invoke().
/// NOTE(review): the return statement is missing from this listing;
/// presumably a count of delivered messages is returned -- confirm.
/// \param message_ The message passed to the handler.
197 unsigned deliverData(
const Message& message_)
199 _messageHandler.invoke(message_);
/// Read-only access to the handler stored in this route.
/// \return A const reference to _messageHandler.
202 const MessageHandler& getMessageHandler()
const 204 return _messageHandler;
/// Mutable access to the handler stored in this route.
/// \return A non-const reference to _messageHandler.
206 MessageHandler& getMessageHandler()
208 return _messageHandler;
214 : _previousCommandId(0),
215 _lookupGenerationCount(0),
219 _crc = AMPS::CRC<0>::crcNoSSE;
221 if(AMPS::CRC<0>::isSSE42Enabled())
223 _crc = AMPS::CRC<0>::crc;
227 _crc = AMPS::CRC<0>::crcNoSSE;
233 unsigned requestedAcks_,
unsigned systemAcks_,
bool isSubscribe_)
235 Lock<Mutex> lock(_lock);
236 RouteMap::iterator i = _routes.find(commandId_);
237 if (i == _routes.end()) {
238 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
243 if (isSubscribe_ && !i->second.isTerminationAck(0))
245 void *routeData = i->second.getMessageHandler().userData();;
246 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
249 Unlock<Mutex> u(_lock);
250 amps_invoke_remove_route_function(routeData);
/// Removes the route registered under \p commandId_, if any.
/// Takes _lock, looks the id up in _routes, and hands the iterator to
/// _removeRoute() when an entry exists.
/// \param commandId_ The id whose route should be removed.
/// \return false when no route is registered for the id; otherwise the
/// result of _removeRoute().
259 bool removeRoute(
const Field& commandId_)
261 Lock<Mutex> lock(_lock);
262 RouteMap::iterator i = _routes.find(commandId_);
263 if (i == _routes.end())
return false;
264 return _removeRoute(i);
269 AMPS_FETCH_ADD(&_generationCount,1);
270 std::vector<void*> removeData;
272 Lock<Mutex> lock(_lock);
273 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
279 void* data = i->second.getMessageHandler().userData();
280 removeData.push_back(data);
285 for (
size_t i=0; i<removeData.size(); ++i)
287 amps_invoke_remove_route_function(removeData[i]);
/// Tests, while holding _lock, whether _routes contains an entry for
/// \p commandId_.
/// \param commandId_ The id to look up.
/// \return true when an entry exists.
292 bool hasRoute(
const Field& commandId_)
const 294 Lock<Mutex> lock(_lock);
295 RouteMap::const_iterator it = _routes.find(commandId_);
296 return it != _routes.end();
/// Looks up the handler registered for \p commandId_ while holding _lock.
/// When an entry exists, \p result_ receives a copy of its handler.
/// NOTE(review): the branch structure and both return statements are missing
/// from this listing; presumably the _emptyMessageHandler assignment is the
/// not-found branch and the function returns whether a route was found --
/// confirm against the header.
/// \param commandId_ The id to look up.
/// \param result_ Output: the handler found, or _emptyMessageHandler.
300 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 302 Lock<Mutex> lock(_lock);
303 RouteMap::const_iterator it = _routes.find(commandId_);
304 if (it != _routes.end())
306 result_ = it->second.getMessageHandler();
311 result_ = _emptyMessageHandler;
323 MessageHandler handler;
325 class RouteCache :
public std::vector<RouteLookup>
327 RouteCache(
const RouteCache&);
328 void operator=(
const RouteCache&);
331 : _generationCount(0),
335 void invalidateCache(
void)
337 _generationCount = 0;
341 void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
343 _generationCount = generationCount_;
/// Tests whether this cache was filled for the given generation and hash.
/// \param generationCount_ Generation count to compare with _generationCount.
/// \param hashVal_ Hash value to compare with _hashVal.
/// \return true only when both stored values equal the arguments.
348 bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
const 350 return _generationCount == generationCount_ && _hashVal == hashVal_;
354 ATOMIC_TYPE _generationCount;
355 amps_uint64_t _hashVal;
/// Resolves a comma-delimited list of ids into handler lookups stored in
/// \p result_.
/// If \p result_ already holds the answer for the current _generationCount
/// and the CRC of the list, its size is returned immediately. Otherwise the
/// cache is re-tagged and the list is scanned under _lock: each entry
/// records the id's offset and length within the list and the handler
/// registered for it, or _emptyMessageHandler.
/// NOTE(review): several listing lines are missing, including the definition
/// of `subId`, the loop increment, the else branch and the final return --
/// confirm those details against the header.
/// \param commandIdList_ Field holding the comma-separated ids.
/// \param result_ Cache that is filled in and reused across calls.
361 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
// Hash the raw list text; together with the generation count this is the
// cache key.
366 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
367 if (result_.isCacheHit(_generationCount, listHash))
369 return result_.size();
371 result_.invalidateCache(_generationCount, listHash);
374 Lock<Mutex> lockGuard(_lock);
375 size_t resultCount = 0;
376 const char* pStart = commandIdList_.
data();
// Walk the list; `delimiter` advances to the next ',' or to the end.
377 for(
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
380 const char* delimiter = p;
381 while(delimiter != e && *delimiter !=
',')
386 result_.push_back(RouteLookup());
389 RouteLookup& result = result_[resultCount];
// Offsets are relative to the start of the list, not absolute pointers.
390 result.idOffset = (size_t)(p - pStart);
391 result.idLength = (size_t)(delimiter - p);
393 RouteMap::const_iterator it = _routes.find(subId);
394 if (it != _routes.end())
396 result.handler = it->second.getMessageHandler();
400 result.handler = _emptyMessageHandler;
406 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
409 unsigned messagesDelivered = 0;
416 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
419 !key.
empty() && messagesDelivered == 0)
421 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
424 !key.
empty() && messagesDelivered == 0)
426 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
428 return messagesDelivered;
/// Delivers a data message to the route registered for \p commandId_.
/// The most recently used route is remembered: if the CRC of the id and the
/// generation count both match the remembered values, the remembered route
/// is used directly. Otherwise the route is looked up in _routes under
/// _lock, remembered for the next call, and the message is delivered to it.
/// NOTE(review): braces and the `else` are missing from this listing, so the
/// exact nesting of the two paths should be confirmed against the header.
/// NOTE(review): the remembered entry is matched on the CRC value only, not
/// on the id bytes themselves.
/// \param dataMessage_ The message to deliver.
/// \param commandId_ The id used to find the route.
/// \return The accumulated result of the deliverData() calls that were made.
432 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
434 unsigned messagesDelivered = 0;
435 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
// Remembered-route check: same hash and no generation change since the
// route was stored.
436 if(_previousCommandId == hval &&
437 _lookupGenerationCount == _generationCount)
439 messagesDelivered += _previousHandler.deliverData(dataMessage_);
// Lookup in the route map, performed while holding _lock.
443 Lock<Mutex> lock(_lock);
444 RouteMap::iterator it = _routes.find(commandId_);
445 if(it != _routes.end())
// Remember this route for the next call.
447 _previousCommandId = hval;
448 _lookupGenerationCount = _generationCount;
449 _previousHandler = it->second;
450 messagesDelivered += it->second.deliverData(dataMessage_);
453 return messagesDelivered;
456 void invalidateCache(
void)
458 _previousCommandId = 0;
/// Removes every route whose termination ack is 0 and notifies the
/// remove-route callback for each one.
/// The generation count is bumped first. Under _lock, the ids and handler
/// user-data pointers of the matching routes are collected and each id is
/// looked up again in _routes. The lock is then released while
/// amps_invoke_remove_route_function() is called for each collected
/// user-data pointer.
/// NOTE(review): the lines that erase the entries (after the second find)
/// are missing from this listing. Routes with a termination ack of 0 are
/// presumably subscriptions -- confirm against the header.
461 void unsubscribeAll(
void)
// Bumping the generation makes values cached under the old count stale.
463 AMPS_FETCH_ADD(&_generationCount,1);
464 std::vector<Field> removeIds;
465 std::vector<void*> removeData;
466 Lock<Mutex> lock(_lock);
// First pass: only collect, so the map is not modified while iterating.
467 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
469 if (it->second.isTerminationAck(0))
471 removeIds.push_back(it->first);
472 removeData.push_back(it->second.getMessageHandler().userData());
475 for (
size_t i=0; i<removeIds.size(); ++i)
478 RouteMap::iterator it = _routes.find(removeIds[i]);
// The callbacks are invoked with _lock released.
484 Unlock<Mutex> u(_lock);
485 for (
size_t i=0; i<removeData.size(); ++i)
487 amps_invoke_remove_route_function(removeData[i]);
492 typedef std::map<Field, MessageRoute> RouteMap;
496 MessageRoute _previousHandler;
497 amps_uint64_t _previousCommandId;
498 mutable ATOMIC_TYPE _lookupGenerationCount;
499 mutable ATOMIC_TYPE _generationCount;
/// Delivers an ack to the route registered for \p commandId_, if one exists.
/// Holds _lock, finds the route, and adds the result of the route's
/// deliverAck() to the returned count. It then tests whether \p ackType_ is
/// the route's termination ack.
/// NOTE(review): the body of the termination-ack branch is missing from this
/// listing; presumably the route is removed there -- confirm.
/// \param ackMessage_ The ack message to deliver.
/// \param ackType_ The ack type of the message.
/// \param commandId_ The id used to find the route.
/// \return The accumulated deliverAck() result; 0 when no route is found.
503 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
505 Lock<Mutex> lock(_lock);
506 unsigned messagesDelivered = 0;
507 RouteMap::iterator it = _routes.find(commandId_);
508 if(it!=_routes.end())
510 MessageRoute& route = it->second;
511 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
512 if(route.isTerminationAck(ackType_))
518 return messagesDelivered;
520 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
522 Lock<Mutex> lock(_lock);
523 RouteMap::iterator it = _routes.find(commandId_);
524 if(it!=_routes.end())
526 MessageRoute& route = it->second;
527 if(route.isTerminationAck(ackType_))
/// Removes the route that \p it_ refers to and notifies the remove-route
/// callback.
/// Bumps the generation count, copies the key and the handler's user-data
/// pointer out of the map entry, then releases _lock around the call to
/// amps_invoke_remove_route_function().
/// NOTE(review): the erase of the entry, the use of `f`, and the return
/// statement are missing from this listing. The Unlock implies the caller
/// must already hold _lock -- confirm against the header.
/// \param it_ Iterator to the entry in _routes to remove.
537 bool _removeRoute(RouteMap::iterator& it_)
540 AMPS_FETCH_ADD(&_generationCount,1);
// Copy what is needed from the entry before it goes away.
542 Field f = it_->first;
543 void* routeData = it_->second.getMessageHandler().userData();
// The callback is invoked with _lock released.
548 Unlock<Mutex> u(_lock);
549 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Handler(const T &callback_)
Constructor for use with a C++ standard library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4132
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103