25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 28 #include "ampscrc.hpp" 38 template <
typename Func,
typename Object>
45 #ifdef AMPS_USE_FUNCTIONAL 46 std::function<void(Object)> _callable;
52 static void noOpHandler(Object) {;}
54 typedef Func FunctionType;
58 #ifdef AMPS_USE_FUNCTIONAL
59 , _callable(
Handler<Func,Object>::noOpHandler)
71 : _func(func_), _userData(userData_)
72 #ifdef AMPS_USE_FUNCTIONAL
73 , _callable(noOpHandler)
82 : _func(orig_._func), _userData(orig_._userData)
83 #ifdef AMPS_USE_FUNCTIONAL
84 , _callable(orig_._callable)
89 #ifdef AMPS_USE_FUNCTIONAL 94 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
98 void invoke(Object message)
101 _func(message, _userData);
102 #ifdef AMPS_USE_FUNCTIONAL 113 _userData = rhs_._userData;
114 #ifdef AMPS_USE_FUNCTIONAL 115 _callable = rhs_._callable;
117 _isValid = rhs_._isValid;
122 bool isValid(
void)
const {
return _isValid; }
123 Func
function(void)
const {
return _func; }
124 void* userData(
void)
const {
return _userData; }
129 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
139 MessageHandler _emptyMessageHandler;
140 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
146 MessageHandler _messageHandler;
147 unsigned _requestedAcks, _systemAcks, _terminationAck;
149 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
150 MessageRoute(
const MessageRoute& rhs_) :
151 _messageHandler(rhs_._messageHandler),
152 _requestedAcks (rhs_._requestedAcks),
153 _systemAcks (rhs_._systemAcks),
154 _terminationAck(rhs_._terminationAck)
156 const MessageRoute& operator=(
const MessageRoute& rhs_)
158 _messageHandler = rhs_._messageHandler;
159 _requestedAcks = rhs_._requestedAcks;
160 _systemAcks = rhs_._systemAcks;
161 _terminationAck = rhs_._terminationAck;
164 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
165 unsigned systemAcks_,
bool isSubscribe_) :
166 _messageHandler(messageHandler_),
167 _requestedAcks(requestedAcks_),
168 _systemAcks(systemAcks_),
175 unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
177 while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
182 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
184 if ( (_requestedAcks & ackType_) == 0)
return 0;
186 _messageHandler.invoke(message_);
187 }
catch (std::exception &ex)
189 std::cerr << ex.what() << std::endl;
193 bool isTerminationAck(
unsigned ackType_)
const 195 return ackType_ == _terminationAck;
197 unsigned deliverData(
const Message& message_)
199 _messageHandler.invoke(message_);
202 const MessageHandler& getMessageHandler()
const 204 return _messageHandler;
206 MessageHandler& getMessageHandler()
208 return _messageHandler;
214 : _previousCommandId(0),
215 _lookupGenerationCount(0),
219 _crc = AMPS::CRC<0>::crcNoSSE;
221 if(AMPS::CRC<0>::isSSE42Enabled())
223 _crc = AMPS::CRC<0>::crc;
227 _crc = AMPS::CRC<0>::crcNoSSE;
233 unsigned requestedAcks_,
unsigned systemAcks_,
bool isSubscribe_)
235 Lock<Mutex> lock(_lock);
236 RouteMap::iterator i = _routes.find(commandId_);
237 if (i == _routes.end()) {
238 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
242 if (isSubscribe_ && !i->second.isTerminationAck(0))
244 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
250 bool removeRoute(
const Field& commandId_)
252 Lock<Mutex> lock(_lock);
253 RouteMap::iterator i = _routes.find(commandId_);
254 if (i == _routes.end())
return false;
255 return _removeRoute(i);
260 AMPS_FETCH_ADD(&_generationCount,1);
261 std::vector<void*> removeData;
263 Lock<Mutex> lock(_lock);
264 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
270 void* data = i->second.getMessageHandler().userData();
271 removeData.push_back(data);
276 for (
size_t i=0; i<removeData.size(); ++i)
278 amps_invoke_remove_route_function(removeData[i]);
283 bool hasRoute(
const Field& commandId_)
const 285 Lock<Mutex> lock(_lock);
286 RouteMap::const_iterator it = _routes.find(commandId_);
287 return it != _routes.end();
291 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 293 Lock<Mutex> lock(_lock);
294 RouteMap::const_iterator it = _routes.find(commandId_);
295 if (it != _routes.end())
297 result_ = it->second.getMessageHandler();
302 result_ = _emptyMessageHandler;
314 MessageHandler handler;
316 class RouteCache :
public std::vector<RouteLookup>
318 RouteCache(
const RouteCache&);
319 void operator=(
const RouteCache&);
322 : _generationCount(0),
326 void invalidateCache(
void)
328 _generationCount = 0;
332 void invalidateCache(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
334 _generationCount = generationCount_;
339 bool isCacheHit(ATOMIC_TYPE generationCount_, amps_uint64_t hashVal_)
const 341 return _generationCount == generationCount_ && _hashVal == hashVal_;
345 ATOMIC_TYPE _generationCount;
346 amps_uint64_t _hashVal;
352 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
357 amps_uint64_t listHash = _crc(commandIdList_.
data(), commandIdList_.
len(), 0);
358 if (result_.isCacheHit(_generationCount, listHash))
360 return result_.size();
362 result_.invalidateCache(_generationCount, listHash);
365 Lock<Mutex> lockGuard(_lock);
366 size_t resultCount = 0;
367 const char* pStart = commandIdList_.
data();
368 for(
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
371 const char* delimiter = p;
372 while(delimiter != e && *delimiter !=
',')
377 result_.push_back(RouteLookup());
380 RouteLookup& result = result_[resultCount];
381 result.idOffset = (size_t)(p - pStart);
382 result.idLength = (size_t)(delimiter - p);
384 RouteMap::const_iterator it = _routes.find(subId);
385 if (it != _routes.end())
387 result.handler = it->second.getMessageHandler();
391 result.handler = _emptyMessageHandler;
397 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
400 unsigned messagesDelivered = 0;
407 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
410 !key.
empty() && messagesDelivered == 0)
412 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
415 !key.
empty() && messagesDelivered == 0)
417 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
419 return messagesDelivered;
423 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
425 unsigned messagesDelivered = 0;
426 amps_uint64_t hval = _crc(commandId_.
data(), commandId_.
len(), 0);
427 if(_previousCommandId == hval &&
428 _lookupGenerationCount == _generationCount)
430 messagesDelivered += _previousHandler.deliverData(dataMessage_);
434 Lock<Mutex> lock(_lock);
435 RouteMap::iterator it = _routes.find(commandId_);
436 if(it != _routes.end())
438 _previousCommandId = hval;
439 _lookupGenerationCount = _generationCount;
440 _previousHandler = it->second;
441 messagesDelivered += it->second.deliverData(dataMessage_);
444 return messagesDelivered;
447 void invalidateCache(
void)
449 _previousCommandId = 0;
452 void unsubscribeAll(
void)
454 AMPS_FETCH_ADD(&_generationCount,1);
455 std::vector<Field> removeIds;
456 std::vector<void*> removeData;
457 Lock<Mutex> lock(_lock);
458 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
460 if (it->second.isTerminationAck(0))
462 removeIds.push_back(it->first);
463 removeData.push_back(it->second.getMessageHandler().userData());
466 for (
size_t i=0; i<removeIds.size(); ++i)
469 RouteMap::iterator it = _routes.find(removeIds[i]);
475 Unlock<Mutex> u(_lock);
476 for (
size_t i=0; i<removeData.size(); ++i)
478 amps_invoke_remove_route_function(removeData[i]);
483 typedef std::map<Field, MessageRoute> RouteMap;
487 MessageRoute _previousHandler;
488 amps_uint64_t _previousCommandId;
489 mutable ATOMIC_TYPE _lookupGenerationCount;
490 mutable ATOMIC_TYPE _generationCount;
494 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
496 Lock<Mutex> lock(_lock);
497 unsigned messagesDelivered = 0;
498 RouteMap::iterator it = _routes.find(commandId_);
499 if(it!=_routes.end())
501 MessageRoute& route = it->second;
502 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
503 if(route.isTerminationAck(ackType_))
509 return messagesDelivered;
511 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
513 Lock<Mutex> lock(_lock);
514 RouteMap::iterator it = _routes.find(commandId_);
515 if(it!=_routes.end())
517 MessageRoute& route = it->second;
518 if(route.isTerminationAck(ackType_))
528 bool _removeRoute(RouteMap::iterator& it_)
531 AMPS_FETCH_ADD(&_generationCount,1);
533 Field f = it_->first;
534 void* routeData = it_->second.getMessageHandler().userData();
539 Unlock<Mutex> u(_lock);
540 amps_invoke_remove_route_function(routeData);
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as a deepCopy of another Field.
Definition: Field.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:93
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1011
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1012
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AMPS to remove routes once their termination ack has been received.
Definition: MessageRouter.hpp:136
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:70
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:39
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4136
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:57
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1114
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:81
Definition: ampsplusplus.hpp:103