AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
HAClientImpl.hpp
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HACLIENTIMPL_H_
27 #define _HACLIENTIMPL_H_
28 
29 #include <typeinfo>
30 #include <ampsplusplus.hpp>
31 #include <ServerChooser.hpp>
34 
35 namespace AMPS
36 {
37 
38 class HAClientImpl : public ClientImpl
39 {
40 public:
41  HAClientImpl(const std::string& name_)
42  : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
43  , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
44  , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
45  , _disconnected(false)
46  {
47 #ifdef AMPS_USE_FUNCTIONAL
48  setDisconnectHandler(HADisconnectHandler());
49 #else
50  setDisconnectHandler(AMPS::DisconnectHandler(&HADisconnectHandler::invoke, NULL));
51 #endif
52  setSubscriptionManager(new MemorySubscriptionManager());
53  }
54 
55  ~HAClientImpl()
56  {
57  _disconnected = true;
58  _cleanup();
59  }
60 
61  void setTimeout(int timeout_)
62  {
63  _timeout = timeout_;
64  }
65 
66  int getTimeout() const
67  {
68  return _timeout;
69  }
70 
71  unsigned int getReconnectDelay(void) const
72  {
73  return _reconnectDelay;
74  }
75 
76  void setReconnectDelay(unsigned int reconnectDelay_)
77  {
78  _reconnectDelay = reconnectDelay_;
79  setReconnectDelayStrategy(new FixedDelayStrategy(
80  (unsigned int)reconnectDelay_));
81  }
82 
83  void setReconnectDelayStrategy(const ReconnectDelayStrategy& strategy_)
84  {
85  _reconnectDelayStrategy = strategy_;
86  }
87 
88  ReconnectDelayStrategy getReconnectDelayStrategy(void) const
89  {
90  return _reconnectDelayStrategy;
91  }
92 
93  std::string getLogonOptions(void) const
94  {
95  return _logonOptions;
96  }
97 
98  void setLogonOptions(const std::string& logonOptions_)
99  {
100  _logonOptions = logonOptions_;
101  }
102 
103  void setLogonOptions(const char* logonOptions_)
104  {
105  _logonOptions = logonOptions_;
106  }
107 
108  ServerChooser getServerChooser() const
109  {
110  return _serverChooser;
111  }
112 
113  void setServerChooser(const ServerChooser& serverChooser_)
114  {
115  _serverChooser = serverChooser_;
116  }
117 
118  class HADisconnectHandler
119  {
120  public:
121  HADisconnectHandler() {}
122  static void invoke(Client& client, void* );
123 #ifdef AMPS_USE_FUNCTIONAL
124  void operator()(Client& client)
125  {
126  invoke(client, NULL);
127  }
128 #endif
129  };
130  void connectAndLogon()
131  {
132  Lock<Mutex> l(_connectAndLogonLock);
133  // AC-1030 In case this is called on a client after delay strategy caused a failure.
134  _reconnectDelayStrategy.reset();
135  try
136  {
137  while(true)
138  {
139  _disconnected = false;
140  connectAndLogonInternal();
141  try
142  {
143  // Resubscribe
144  if(_subscriptionManager)
145  {
146  Client c(this);
147  _subscriptionManager->resubscribe(c);
148  broadcastConnectionStateChanged(
149  ConnectionStateListener::Resubscribed);
150  }
151  return;
152  }
153  catch(const AMPSException& subEx)
154  {
155  // Keep receive thread from reconnecting
156  _disconnected = true;
157  _serverChooser.reportFailure(subEx, getConnectionInfo());
158  ClientImpl::setDisconnected();
159  }
160  }
161  }
162  catch(...)
163  {
164  // Failure, make sure we're disconnected
165  disconnect();
166  throw;
167  }
168  }
169 
170  virtual void connect(const std::string& /*uri*/)
171  {
172  connectAndLogon();
173  }
174 
175  virtual std::string logon(long /*timeout_*/, Authenticator& /*authenticator_*/,
176  const char* /*options_*/)
177  {
178  if (_disconnected) throw DisconnectedException("Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
179  throw AlreadyConnectedException("Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
180  }
181 
182  static void HADoNothingDisconnectHandler(amps_handle /*client*/,
183  void* /*data*/)
184  {
185  ;
186  }
187 
188  class DisconnectHandlerDisabler
189  {
190  public:
191  DisconnectHandlerDisabler()
192  : _pClient(NULL), _queueAckTimeout(0) { }
193  DisconnectHandlerDisabler(HAClientImpl* pClient_)
194  : _pClient(pClient_)
195  {
196  setHandler();
197  _queueAckTimeout = _pClient->getAckTimeout();
198  _pClient->setAckTimeout(0);
199  }
200  ~DisconnectHandlerDisabler() { clear(); }
201  void clear()
202  {
203  if(_pClient)
204  {
206  _pClient->getHandle(),
207  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
208  _pClient);
209  if (_queueAckTimeout)
210  {
211  _pClient->setAckTimeout(_queueAckTimeout);
212  _queueAckTimeout = 0;
213  }
214  _pClient = NULL;
215  }
216  }
217  void setClient(HAClientImpl* pClient_)
218  {
219  if (!_pClient)
220  {
221  _pClient = pClient_;
222  setHandler();
223  _queueAckTimeout = _pClient->getAckTimeout();
224  _pClient->setAckTimeout(0);
225  amps_client_disconnect(_pClient->getHandle());
226  }
227  }
228  void setHandler()
229  {
231  _pClient->getHandle(),
232  (amps_handler)HAClientImpl::HADoNothingDisconnectHandler,
233  NULL);
234  }
235  private:
236  HAClientImpl* _pClient;
237  int _queueAckTimeout;
238  };
239 
240  void connectAndLogonInternal()
241  {
242  if(!_serverChooser.isValid())
243  {
244  throw ConnectionException("No server chooser registered with HAClient");
245  }
246  {
247  DisconnectHandlerDisabler disconnectDisabler;
248  while(!_disconnected)
249  {
250  std::string uri = _serverChooser.getCurrentURI();
251  if(uri.empty())
252  {
253  throw ConnectionException("No AMPS instances available for connection. " + _serverChooser.getError());
254  }
255  Authenticator& auth = _serverChooser.getCurrentAuthenticator();
256  // Begin locked section - see AC-1017
257  Lock<Mutex> l(_connectLock);
258  _sleepBeforeConnecting(uri);
259  try
260  {
261  // Check if another thread disconnected or already connected
262  if(_disconnected || _connected)
263  {
264  return;
265  }
266  // Temporarily unset the disconnect handler since we will loop
267  disconnectDisabler.setClient((HAClientImpl*)this);
268  // Connect and logon while holding the _lock
269  {
270  Lock<Mutex> clientLock(_lock);
271  ClientImpl::_connect(uri);
272  if (_logonOptions.empty())
273  {
274  ClientImpl::_logon(_timeout, auth);
275  }
276  else
277  {
278  ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
279  }
280  }
281  disconnectDisabler.clear();
282  try
283  {
284  _serverChooser.reportSuccess(getConnectionInfo());
285  _reconnectDelayStrategy.reset();
286  }
287  catch(const AMPSException& e)
288  {
289  ClientImpl::disconnect();
290  throw AMPSException(e);
291  }
292  break;
293  }
294  catch(const AMPSException& ex)
295  {
296  Unlock<Mutex> u(_connectLock);
297  ConnectionInfo ci = getConnectionInfo();
298  // Substitute the URI on the connection info with the one we attempted
299  ci["client.uri"] = uri;
300  _serverChooser.reportFailure(ex,ci);
301  try
302  {
303  ClientImpl::setDisconnected();
304  }
305  catch (const std::exception& e)
306  {
307  try
308  {
309  _exceptionListener->exceptionThrown(e);
310  } catch (...) { } // -V565
311  }
312  catch (...)
313  {
314  try
315  {
316  _exceptionListener->exceptionThrown(UnknownException("Unknown exception calling setDisconnected"));
317  } catch (...) { } // -V565
318  }
319  }
320  }
321  }
322  return;
323  }
324 
325  ConnectionInfo gatherConnectionInfo() const
326  {
327  return getConnectionInfo();
328  }
329 
330  ConnectionInfo getConnectionInfo() const
331  {
332  ConnectionInfo info = ClientImpl::getConnectionInfo();
333  std::ostringstream writer;
334 
335  writer << getReconnectDelay();
336  info["haClient.reconnectDelay"] = writer.str();
337  writer.clear(); writer.str("");
338  writer << _timeout;
339  info["haClient.timeout"] = writer.str();
340 
341  return info;
342  }
343 
344  bool disconnected() const
345  {
346  return _disconnected;
347  }
348  private:
349 
350  void disconnect()
351  {
352  {
353  Lock<Mutex> l(_connectLock);
354  _disconnected = true;
355  }
356  ClientImpl::disconnect();
357  }
358  void _millisleep(unsigned int millis_)
359  {
360 #ifdef _WIN32
361  Sleep(millis_);
362 #else
363  struct timespec ts;
364  ts.tv_sec = millis_ / 1000;
365  ts.tv_nsec = millis_ % 1000 * 1000000;
366  nanosleep(&ts, NULL);
367 #endif
368  }
369  void _sleepBeforeConnecting(const std::string& uri_)
370  {
371  try
372  {
373  _millisleep(
374  _reconnectDelayStrategy.getConnectWaitDuration(uri_));
375  }
376  catch(const ConnectionException&)
377  {
378  throw;
379  }
380  catch(const std::exception& ex_)
381  {
382  _exceptionListener->exceptionThrown(ex_);
383  throw ConnectionException(ex_.what());
384  }
385  catch(...)
386  {
387  throw ConnectionException("Unknown exception thrown by "
388  "the HAClient's delay strategy.");
389  }
390  }
391 
392  Mutex _connectLock;
393  Mutex _connectAndLogonLock;
394  int _timeout;
395  unsigned int _reconnectDelay;
396  ReconnectDelayStrategy _reconnectDelayStrategy;
397  ServerChooser _serverChooser;
398  volatile bool _disconnected;
399  std::string _logonOptions;
400 
401 }; // class HAClientImpl
402 
403 }// namespace AMPS
404 
405 #endif //_HACLIENTIMPL_H_
406 
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103