25 #ifndef _MESSAGEROUTER_HPP_ 26 #define _MESSAGEROUTER_HPP_ 37 template <
typename Func,
typename Object>
44 #ifdef AMPS_USE_FUNCTIONAL 45 std::function<void(Object)> _callable;
51 static void noOpHandler(Object) {;}
53 typedef Func FunctionType;
57 #ifdef AMPS_USE_FUNCTIONAL
58 , _callable(
Handler<Func,Object>::noOpHandler)
70 : _func(func_), _userData(userData_)
71 #ifdef AMPS_USE_FUNCTIONAL
72 , _callable(noOpHandler)
81 : _func(orig_._func), _userData(orig_._userData)
82 #ifdef AMPS_USE_FUNCTIONAL
83 , _callable(orig_._callable)
88 #ifdef AMPS_USE_FUNCTIONAL 93 : _func(NULL), _userData(NULL), _callable(callback_), _isValid(true)
97 void invoke(Object message)
100 _func(message, _userData);
101 #ifdef AMPS_USE_FUNCTIONAL 107 bool isValid(
void)
const {
return _isValid; }
108 Func
function(void)
const {
return _func; }
109 void* userData(
void)
const {
return _userData; }
114 typedef void(*MessageHandlerFunc)(
const Message&,
void* userData);
124 MessageHandler _emptyMessageHandler;
128 MessageHandler _messageHandler;
129 unsigned _requestedAcks, _systemAcks, _terminationAck;
131 MessageRoute() : _requestedAcks(0), _systemAcks(0), _terminationAck(0) {;}
132 MessageRoute(
const MessageRoute& rhs_) :
133 _messageHandler(rhs_._messageHandler),
134 _requestedAcks (rhs_._requestedAcks),
135 _systemAcks (rhs_._systemAcks),
136 _terminationAck(rhs_._terminationAck)
138 const MessageRoute& operator=(
const MessageRoute& rhs_)
140 _messageHandler = rhs_._messageHandler;
141 _requestedAcks = rhs_._requestedAcks;
142 _systemAcks = rhs_._systemAcks;
143 _terminationAck = rhs_._terminationAck;
146 MessageRoute(MessageHandler messageHandler_,
unsigned requestedAcks_,
147 unsigned systemAcks_,
bool isSubscribe_) :
148 _messageHandler(messageHandler_),
149 _requestedAcks(requestedAcks_),
150 _systemAcks(systemAcks_),
157 unsigned bitCounter = (requestedAcks_|systemAcks_)>>1;
159 while(bitCounter > 0) { bitCounter>>=1; _terminationAck<<=1; }
164 unsigned deliverAck(
const Message& message_,
unsigned ackType_)
166 if ( (_requestedAcks & ackType_) == 0)
return 0;
168 _messageHandler.invoke(message_);
169 }
catch (std::exception &ex)
171 std::cerr << ex.what() << std::endl;
175 bool isTerminationAck(
unsigned ackType_)
const 177 return ackType_ == _terminationAck;
179 unsigned deliverData(
const Message& message_)
181 _messageHandler.invoke(message_);
184 const MessageHandler& getMessageHandler()
const 186 return _messageHandler;
188 MessageHandler& getMessageHandler()
190 return _messageHandler;
196 : _previousCommandId(0),
197 _lookupGenerationCount(0),
202 unsigned requestedAcks_,
unsigned systemAcks_,
bool isSubscribe_)
204 Lock<Mutex> lock(_lock);
205 RouteMap::iterator i = _routes.find(commandId_);
206 if (i == _routes.end()) {
207 _routes[commandId_.
deepCopy()] = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
210 i->second = MessageRoute(messageHandler_, requestedAcks_, systemAcks_, isSubscribe_);
215 bool removeRoute(
const Field& commandId_)
217 Lock<Mutex> lock(_lock);
218 RouteMap::iterator i = _routes.find(commandId_);
219 if (i == _routes.end())
return false;
220 return _removeRoute(i);
225 AMPS_FETCH_ADD(&_generationCount,1);
226 Lock<Mutex> lock(_lock);
227 for (RouteMap::iterator i = _routes.begin(); i != _routes.end(); ++i)
239 bool hasRoute(
const Field& commandId_)
const 241 Lock<Mutex> lock(_lock);
242 RouteMap::const_iterator it = _routes.find(commandId_);
243 return it != _routes.end();
247 bool getRoute(
const Field& commandId_, MessageHandler& result_)
const 249 Lock<Mutex> lock(_lock);
250 RouteMap::const_iterator it = _routes.find(commandId_);
251 if (it != _routes.end())
253 result_ = it->second.getMessageHandler();
258 result_ = _emptyMessageHandler;
270 MessageHandler handler;
272 class RouteCache :
public std::vector<RouteLookup>
274 RouteCache(
const RouteCache&);
275 void operator=(
const RouteCache&);
278 : _generationCount(0),
282 void invalidateCache(
void)
284 _generationCount = 0;
288 void invalidateCache(ATOMIC_TYPE generationCount_,
size_t hashVal_)
290 _generationCount = generationCount_;
295 bool isCacheHit(ATOMIC_TYPE generationCount_,
size_t hashVal_)
const 297 return _generationCount == generationCount_ && _hashVal == hashVal_;
301 ATOMIC_TYPE _generationCount;
308 size_t parseRoutes(
const Field& commandIdList_, RouteCache& result_)
313 size_t listHash = commandIdList_.hash_function();
314 if (result_.isCacheHit(_generationCount, listHash))
316 return result_.size();
318 result_.invalidateCache(_generationCount, listHash);
321 Lock<Mutex> lockGuard(_lock);
322 size_t resultCount = 0;
323 const char* pStart = commandIdList_.
data();
324 for(
const char* p = pStart, *e = commandIdList_.
len() + pStart; p < e;
327 const char* delimiter = p;
328 while(delimiter != e && *delimiter !=
',')
333 result_.push_back(RouteLookup());
336 RouteLookup& result = result_[resultCount];
337 result.idOffset = (size_t)(p - pStart);
338 result.idLength = (size_t)(delimiter - p);
340 RouteMap::const_iterator it = _routes.find(subId);
341 if (it != _routes.end())
343 result.handler = it->second.getMessageHandler();
347 result.handler = _emptyMessageHandler;
353 unsigned deliverAck(
const Message& ackMessage_,
unsigned ackType_)
356 unsigned messagesDelivered = 0;
363 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
366 !key.
empty() && messagesDelivered == 0)
368 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
371 !key.
empty() && messagesDelivered == 0)
373 messagesDelivered += _deliverAck(ackMessage_, ackType_, key);
375 return messagesDelivered;
379 unsigned deliverData(
const Message& dataMessage_,
const Field& commandId_)
381 unsigned messagesDelivered = 0;
382 size_t hval = commandId_.hash_function();
383 if(_previousCommandId == hval &&
384 _lookupGenerationCount == _generationCount)
386 messagesDelivered += _previousHandler.deliverData(dataMessage_);
390 Lock<Mutex> lock(_lock);
391 RouteMap::iterator it = _routes.find(commandId_);
392 if(it != _routes.end())
394 _previousCommandId = hval;
395 _lookupGenerationCount = _generationCount;
396 _previousHandler = it->second;
397 messagesDelivered += it->second.deliverData(dataMessage_);
400 return messagesDelivered;
403 void invalidateCache(
void)
405 _previousCommandId = 0;
408 void unsubscribeAll(
void)
410 AMPS_FETCH_ADD(&_generationCount,1);
411 std::vector<Field> removeIds;
412 Lock<Mutex> lock(_lock);
413 for (RouteMap::iterator it = _routes.begin(); it != _routes.end(); ++it)
415 if (it->second.isTerminationAck(0)) removeIds.push_back(it->first);
417 for (
size_t i=0; i<removeIds.size(); ++i)
419 RouteMap::iterator it = _routes.find(removeIds[i]);
428 typedef std::map<Field, MessageRoute> RouteMap;
432 MessageRoute _previousHandler;
433 size_t _previousCommandId;
434 mutable ATOMIC_TYPE _lookupGenerationCount;
435 mutable ATOMIC_TYPE _generationCount;
439 unsigned _deliverAck(
const Message& ackMessage_,
unsigned ackType_,
Field& commandId_)
441 Lock<Mutex> lock(_lock);
442 unsigned messagesDelivered = 0;
443 RouteMap::iterator it = _routes.find(commandId_);
444 if(it!=_routes.end())
446 MessageRoute& route = it->second;
447 messagesDelivered += route.deliverAck(ackMessage_, ackType_);
448 if(route.isTerminationAck(ackType_))
454 return messagesDelivered;
456 unsigned _processAckForRemoval(
unsigned ackType_,
Field& commandId_)
458 Lock<Mutex> lock(_lock);
459 RouteMap::iterator it = _routes.find(commandId_);
460 if(it!=_routes.end())
462 MessageRoute& route = it->second;
463 if(route.isTerminationAck(ackType_))
473 bool _removeRoute(RouteMap::iterator& it_)
476 AMPS_FETCH_ADD(&_generationCount,1);
478 Field f = it_->first;
Defines the AMPS::Message class and related classes.
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
Handler(const T &callback_)
Constructor for use with a standard c++ library function object.
Definition: MessageRouter.hpp:92
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:955
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:956
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
Handler(Func func_, void *userData_)
Constructor for use with a bare function pointer.
Definition: MessageRouter.hpp:69
Wrapper for callback functions in AMPS.
Definition: MessageRouter.hpp:38
An iterable object representing the contents of an AMPS topic.
Definition: ampsplusplus.hpp:4154
Handler()
Null constructor – no function is wrapped.
Definition: MessageRouter.hpp:56
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1057
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Handler(const Handler &orig_)
Copy constructor.
Definition: MessageRouter.hpp:80
Definition: ampsplusplus.hpp:136