AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
HAClientImpl.hpp
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HACLIENTIMPL_H_
27 #define _HACLIENTIMPL_H_
28 
29 #include <typeinfo>
30 #include <ampsplusplus.hpp>
31 #include <ServerChooser.hpp>
34 
35 namespace AMPS
36 {
37 
38  class HAClientImpl : public ClientImpl
39  {
40  public:
41  HAClientImpl(const std::string& name_)
42  : ClientImpl(name_), _timeout(AMPS_HACLIENT_TIMEOUT_DEFAULT)
43  , _reconnectDelay(AMPS_HACLIENT_RECONNECT_DEFAULT)
44  , _reconnectDelayStrategy(new ExponentialDelayStrategy(_reconnectDelay))
45  , _disconnected(false)
46  {
47 #ifdef AMPS_USE_FUNCTIONAL
48  setDisconnectHandler(HADisconnectHandler());
49 #else
50  setDisconnectHandler(AMPS::DisconnectHandler(&HADisconnectHandler::invoke, NULL));
51 #endif
52  setSubscriptionManager(new MemorySubscriptionManager());
53  }
54 
55  ~HAClientImpl()
56  {
57  _disconnected = true;
58  _cleanup();
59  }
60 
61  void setTimeout(int timeout_)
62  {
63  _timeout = timeout_;
64  }
65 
66  int getTimeout() const
67  {
68  return _timeout;
69  }
70 
71  unsigned int getReconnectDelay(void) const
72  {
73  return _reconnectDelay;
74  }
75 
76  void setReconnectDelay(unsigned int reconnectDelay_)
77  {
78  _reconnectDelay = reconnectDelay_;
79  setReconnectDelayStrategy(new FixedDelayStrategy(
80  (unsigned int)reconnectDelay_));
81  }
82 
83  void setReconnectDelayStrategy(const ReconnectDelayStrategy& strategy_)
84  {
85  _reconnectDelayStrategy = strategy_;
86  }
87 
88  ReconnectDelayStrategy getReconnectDelayStrategy(void) const
89  {
90  return _reconnectDelayStrategy;
91  }
92 
93  std::string getLogonOptions(void) const
94  {
95  return _logonOptions;
96  }
97 
98  void setLogonOptions(const std::string& logonOptions_)
99  {
100  _logonOptions = logonOptions_;
101  }
102 
103  void setLogonOptions(const char* logonOptions_)
104  {
105  _logonOptions = logonOptions_;
106  }
107 
108  ServerChooser getServerChooser() const
109  {
110  return _serverChooser;
111  }
112 
113  void setServerChooser(const ServerChooser& serverChooser_)
114  {
115  _serverChooser = serverChooser_;
116  }
117 
118  class HADisconnectHandler
119  {
120  public:
121  HADisconnectHandler() {}
122  static void invoke(Client& client, void* );
123 #ifdef AMPS_USE_FUNCTIONAL
124  void operator()(Client& client)
125  {
126  invoke(client, NULL);
127  }
128 #endif
129  };
130  void connectAndLogon()
131  {
132  Lock<Mutex> l(_connectAndLogonLock);
133  // AC-1030 In case this is called on a client after delay strategy caused a failure.
134  _reconnectDelayStrategy.reset();
135  try
136  {
137  while (true)
138  {
139  _disconnected = false;
140  connectAndLogonInternal();
141  try
142  {
143  // Resubscribe
144  if (_subscriptionManager)
145  {
146  Client c(this);
147  _subscriptionManager->resubscribe(c);
148  broadcastConnectionStateChanged(
149  ConnectionStateListener::Resubscribed);
150  }
151  return;
152  }
153  catch (const AMPSException& subEx)
154  {
155  // Keep receive thread from reconnecting
156  _disconnected = true;
157  _serverChooser.reportFailure(subEx, getConnectionInfo());
158  ClientImpl::setDisconnected();
159  }
160  }
161  }
162  catch (...)
163  {
164  // Failure, make sure we're disconnected
165  disconnect();
166  throw;
167  }
168  }
169 
170  virtual void connect(const std::string& /*uri*/)
171  {
172  connectAndLogon();
173  }
174 
175  virtual std::string logon(long /*timeout_*/, Authenticator& /*authenticator_*/,
176  const char* /*options_*/)
177  {
178  if (_disconnected)
179  {
180  throw DisconnectedException("Attempt to call logon on a disconnected HAClient. Use connectAndLogon() instead.");
181  }
182  throw AlreadyConnectedException("Attempt to call logon on an HAClient. Use connectAndLogon() instead.");
183  }
184 
185  static void HADoNothingDisconnectHandler(amps_handle /*client*/,
186  void* /*data*/)
187  {
188  ;
189  }
190 
191  class DisconnectHandlerDisabler
192  {
193  public:
194  DisconnectHandlerDisabler()
195  : _pClient(NULL), _queueAckTimeout(0) { }
196  DisconnectHandlerDisabler(HAClientImpl* pClient_)
197  : _pClient(pClient_)
198  {
199  setHandler();
200  _queueAckTimeout = _pClient->getAckTimeout();
201  _pClient->setAckTimeout(0);
202  }
203  ~DisconnectHandlerDisabler()
204  {
205  clear();
206  }
207  void clear()
208  {
209  if (_pClient)
210  {
212  _pClient->getHandle(),
213  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
214  _pClient);
215  if (_queueAckTimeout)
216  {
217  _pClient->setAckTimeout(_queueAckTimeout);
218  _queueAckTimeout = 0;
219  }
220  _pClient = NULL;
221  }
222  }
223  void setClient(HAClientImpl* pClient_)
224  {
225  if (!_pClient)
226  {
227  _pClient = pClient_;
228  setHandler();
229  _queueAckTimeout = _pClient->getAckTimeout();
230  _pClient->setAckTimeout(0);
231  amps_client_disconnect(_pClient->getHandle());
232  }
233  }
234  void setHandler()
235  {
237  _pClient->getHandle(),
238  (amps_handler)HAClientImpl::HADoNothingDisconnectHandler,
239  _pClient);
240  }
241  private:
242  HAClientImpl* _pClient;
243  int _queueAckTimeout;
244  };
245 
246  void connectAndLogonInternal()
247  {
248  if (!_serverChooser.isValid())
249  {
250  throw ConnectionException("No server chooser registered with HAClient");
251  }
252  {
253  DisconnectHandlerDisabler disconnectDisabler;
254  while (!_disconnected)
255  {
256  std::string uri = _serverChooser.getCurrentURI();
257  if (uri.empty())
258  {
259  throw ConnectionException("No AMPS instances available for connection. " + _serverChooser.getError());
260  }
261  Authenticator& auth = _serverChooser.getCurrentAuthenticator();
262  // Begin locked section - see AC-1017
263  Lock<Mutex> l(_connectLock);
264  _sleepBeforeConnecting(uri);
265  try
266  {
267  // Check if another thread disconnected or already connected
268  if (_disconnected || _connected)
269  {
270  return;
271  }
272  // Temporarily unset the disconnect handler since we will loop
273  disconnectDisabler.setClient((HAClientImpl*)this);
274  // Connect and logon while holding the _lock
275  {
276  Lock<Mutex> clientLock(_lock);
277  ClientImpl::_connect(uri);
278  if (_logonOptions.empty())
279  {
280  ClientImpl::_logon(_timeout, auth);
281  }
282  else
283  {
284  ClientImpl::_logon(_timeout, auth, _logonOptions.c_str());
285  }
286  }
287  disconnectDisabler.clear();
288  try
289  {
290  _serverChooser.reportSuccess(getConnectionInfo());
291  _reconnectDelayStrategy.reset();
292  }
293  catch (const AMPSException& e)
294  {
295  ClientImpl::disconnect();
296  throw AMPSException(e);
297  }
298  break;
299  }
300  catch (const AMPSException& ex)
301  {
302  Unlock<Mutex> u(_connectLock);
303  ConnectionInfo ci = getConnectionInfo();
304  // Substitute the URI on the connection info with the one we attempted
305  ci["client.uri"] = uri;
306  _serverChooser.reportFailure(ex, ci);
307  try
308  {
309  ClientImpl::setDisconnected();
310  }
311  catch (const std::exception& e)
312  {
313  try
314  {
315  _exceptionListener->exceptionThrown(e);
316  }
317  catch (...) { } // -V565
318  }
319  catch (...)
320  {
321  try
322  {
323  _exceptionListener->exceptionThrown(UnknownException("Unknown exception calling setDisconnected"));
324  }
325  catch (...) { } // -V565
326  }
327  }
328  }
329  }
330  return;
331  }
332 
333  ConnectionInfo gatherConnectionInfo() const
334  {
335  return getConnectionInfo();
336  }
337 
338  ConnectionInfo getConnectionInfo() const
339  {
340  ConnectionInfo info = ClientImpl::getConnectionInfo();
341  std::ostringstream writer;
342 
343  writer << getReconnectDelay();
344  info["haClient.reconnectDelay"] = writer.str();
345  writer.clear(); writer.str("");
346  writer << _timeout;
347  info["haClient.timeout"] = writer.str();
348 
349  return info;
350  }
351 
352  bool disconnected() const
353  {
354  return _disconnected;
355  }
356  private:
357 
358  void disconnect()
359  {
360  {
361  Lock<Mutex> l(_connectLock);
362  _disconnected = true;
363  }
364  ClientImpl::disconnect();
365  }
366  void _millisleep(unsigned int millis_)
367  {
368  if (millis_ == 0)
369  {
370  return;
371  }
372  double waitTime = (double)millis_;
373  Timer timer(waitTime);
374  timer.start();
375  while (!timer.checkAndGetRemaining(&waitTime))
376  {
377  if (waitTime - 1000.0 > 0.0)
378  {
379  AMPS_USLEEP(1000000);
380  }
381  else
382  {
383  AMPS_USLEEP(1000UL * (unsigned int)waitTime);
384  }
385  amps_invoke_waiting_function();
386  }
387  }
388  void _sleepBeforeConnecting(const std::string& uri_)
389  {
390  try
391  {
392  _millisleep(
393  _reconnectDelayStrategy.getConnectWaitDuration(uri_));
394  }
395  catch (const ConnectionException&)
396  {
397  throw;
398  }
399  catch (const std::exception& ex_)
400  {
401  _exceptionListener->exceptionThrown(ex_);
402  throw ConnectionException(ex_.what());
403  }
404  catch (...)
405  {
406  throw ConnectionException("Unknown exception thrown by "
407  "the HAClient's delay strategy.");
408  }
409  }
410 
411  Mutex _connectLock;
412  Mutex _connectAndLogonLock;
413  int _timeout;
414  unsigned int _reconnectDelay;
415  ReconnectDelayStrategy _reconnectDelayStrategy;
416  ServerChooser _serverChooser;
417  volatile bool _disconnected;
418  std::string _logonOptions;
419 
420  }; // class HAClientImpl
421 
422 }// namespace AMPS
423 
424 #endif //_HACLIENTIMPL_H_
425 
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Provides AMPS::MemorySubscriptionManager, used by an AMPS::HAClient to resubmit subscriptions if conn...
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:196
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::ReconnectDelayStrategy, called by an AMPS::HAClient to determine how long to wait betw...
Provides AMPS::ServerChooser, the abstract base class that defines the interface that an AMPS::HAClie...
Definition: ampsplusplus.hpp:103