AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
SOWRecoveryPointAdapter.hpp
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _SOWRECOVERYPOINTADAPTER_H_
27 #define _SOWRECOVERYPOINTADAPTER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <RecoveryPoint.hpp>
32 #include <RecoveryPointAdapter.hpp>
33 #include <assert.h>
34 #include <memory>
35 #include <string>
36 
37 using std::string;
38 
39 #define AMPS_SOW_STORE_DEFAULT_TOPIC "/ADMIN/bookmark_store"
40 #define AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD "clientName"
41 #define AMPS_SOW_STORE_DEFAULT_SUB_FIELD "subId"
42 #define AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD "bookmark"
43 
// Expands to the three catch clauses that terminate a try block inside an
// adapter method. Whatever was thrown is converted into a StoreException whose
// text is "<x>: <category> <what()>", where <x> is the context streamed by the
// caller. The StoreException is handed to _pExceptionListener when a listener
// is installed and _throwNotListen is false; in every other case it is thrown.
// The expansion relies on the names _throwNotListen and _pExceptionListener
// being visible at the point of use.
#define SOW_RECOVERY_HANDLE_EXCEPTION(x) \
  catch (const AMPSException& ex_) \
  { \
    std::ostringstream os; \
    os << x << ": AMPSException " << ex_.what(); \
    StoreException ex(os.str()); \
    if (_throwNotListen || !_pExceptionListener) \
    { \
      throw ex; \
    } \
    _pExceptionListener->exceptionThrown(ex); \
  } \
  catch (const std::exception& ex_) \
  { \
    std::ostringstream os; \
    os << x << ": std::exception " << ex_.what(); \
    StoreException ex(os.str()); \
    if (_throwNotListen || !_pExceptionListener) \
    { \
      throw ex; \
    } \
    _pExceptionListener->exceptionThrown(ex); \
  } \
  catch (...) \
  { \
    std::ostringstream os; \
    os << x << ": Unknown exception"; \
    StoreException ex(os.str()); \
    if (_throwNotListen || !_pExceptionListener) \
    { \
      throw ex; \
    } \
    _pExceptionListener->exceptionThrown(ex); \
  }
78 
79 namespace AMPS
80 {
81 
85  {
86  public:
87 
119  SOWRecoveryPointAdapter(const Client& storeClient_,
120  const string& trackedClientName_,
121  unsigned timeoutMillis_ = 5000,
122  bool useTimestamp_ = false,
123  bool closeClient_ = true,
124  bool updateFailureThrows_ = false,
125  const string& topic_ = AMPS_SOW_STORE_DEFAULT_TOPIC,
126  const string& clientNameField_ = AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD,
127  const string& subIdField_ = AMPS_SOW_STORE_DEFAULT_SUB_FIELD,
128  const string& bookmarkField_ = AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD
129  )
131  , _serializeLen(0)
132  , _serializeBuffer(0)
133  , _deserializeLen(0)
134  , _deserializeBuffer(0)
135  , _client(storeClient_)
136  , _trackedName(trackedClientName_)
137  , _topic(topic_)
138  , _nameField(clientNameField_)
139  , _subIdField(subIdField_)
140  , _bookmarkField(bookmarkField_)
141  , _timeoutMillis(timeoutMillis_)
142  , _cmd("publish")
143  , _closeClient(closeClient_)
144  , _executed(false)
145  , _throwNotListen(updateFailureThrows_)
146  , _useTimestamp(useTimestamp_)
147  , _closed(false)
148  {
149  _initSerialization();
150  _cmd.setTopic(_topic);
151  }
152 
153  virtual ~SOWRecoveryPointAdapter()
154  {
155  _close();
156  delete[] _serializeBuffer;
157  delete[] _deserializeBuffer;
158  }
159 
164  virtual bool next(RecoveryPoint& current_)
165  {
166  static Field emptyField;
167  try
168  {
169  if (!_executed)
170  {
171  Command cmd("sow");
172  cmd.setTopic(_topic)
173  .setFilter("/" + _nameField + "='" + _trackedName + "'")
174  .setTimeout(_timeoutMillis);
175  if (_useTimestamp)
176  {
177  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
178  + _bookmarkField + "],timestamp");
179  }
180  else
181  {
182  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
183  + _bookmarkField + "]");
184  }
185  _stream = _client.execute(cmd).timeout(_timeoutMillis);
186  _msIter = _stream.begin();
187  _executed = true;
188  }
189  else
190  {
191  ++_msIter;
192  }
193  if (_msIter == MessageStream::iterator())
194  {
195  return false;
196  }
197  Message m = *_msIter;
198  if (!m.isValid())
199  {
200  current_ = RecoveryPoint(NULL);
201  return false;
202  }
203  if (m.getCommand() == "group_begin")
204  {
205  return next(current_);
206  }
207  else if (m.getCommand() == "sow")
208  {
209  if (_useTimestamp)
210  {
211  current_ = RecoveryPoint(deserialize(m.getData(),
212  m.getTimestamp()));
213  }
214  else
215  {
216  current_ = RecoveryPoint(deserialize(m.getData(),
217  emptyField));
218  }
219  return true;
220  }
221  else if (m.getCommand() == "group_end" || m.getCommand() == "ack")
222  {
223  current_ = RecoveryPoint(NULL);
224  _msIter = MessageStream::iterator();
225  _stream = MessageStream();
226  return false;
227  }
228  }
229  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::next")
230  return false;
231  }
232 
235  virtual void update(RecoveryPoint& recoveryPoint_)
236  {
237  try
238  {
239  Field data = serialize(recoveryPoint_);
240  _cmd.setData(data.data(), data.len());
241  _client.execute(_cmd);
242  }
243  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::update")
244  }
245 
247  virtual void purge()
248  {
249  try
250  {
251  Message m = _client.sowDelete(_topic, "/" + _nameField
252  + "='" + _trackedName + "'");
253  }
254  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge")
255  }
256 
259  virtual void purge(const Field& subId_)
260  {
261  try
262  {
263  Message m = _client.sowDelete(_topic, "/" + _nameField + "='"
264  + _trackedName + "' and /"
265  + _subIdField + "='"
266  + subId_ + "'");
267  }
268  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge(subId)")
269  }
270 
272  virtual void close()
273  {
274  _close();
275  }
276 
284  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
285  {
286  _pExceptionListener = pListener_;
287  }
288  protected:
289  void _close()
290  {
291  if (_closed || !_client.isValid())
292  {
293  return;
294  }
295  try
296  {
297  _client.publishFlush();
298  }
299  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close publishFlush")
300  try
301  {
302  if (_closeClient)
303  {
304  _closed = true;
305  _client.disconnect();
306  _client = Client();
307  }
308  }
309  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close disconnect")
310  }
311 
312  void _initSerialization()
313  {
314  try
315  {
316  // Set up json serialization
317  if (_serializeLen == 0)
318  {
319  _serializeLen = (size_t) (_nameField.length()
320  + _trackedName.length()
321  + _subIdField.length()
322  + _bookmarkField.length()
323  + (AMPS_MAX_BOOKMARK_LEN * 4UL)
324  + SUBID_LEN + JSON_EXTRA);
325  _serializeLen += (128 - (_serializeLen % 128));
326  }
327  _serializeBuffer = new char[_serializeLen];
328  AMPS_snprintf(_serializeBuffer, _serializeLen,
329  "{\"%s\":\"%s\",\"%s\":\"", _nameField.c_str()
330  , _trackedName.c_str()
331  , _subIdField.c_str());
332  _serializeStart = JSON_START + _nameField.length()
333  + _trackedName.length() + _subIdField.length();
334  }
335  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::initSerialization")
336  }
337 
338  // Subclasses can override this to set up for something other than json
339  // serialization if not using json.
340  virtual void initSerialization()
341  {
342  _initSerialization();
343  }
344 
345  // Subclasses can override this function if not using json data type.
346  // It needs to return an allocated RecoveryPointImpl based on the data
347  // field from a sow message that contains only 2 fields: _subIdField and
348  // _bookmarkField. If you'd like more, override begin()
349  virtual RecoveryPointImpl* deserialize(const Field& data_,
350  const Field& timestamp_)
351  {
352  Field subId;
353  Field bookmark;
354  try
355  {
356  // We have 2 fields subId and bookmark and we only need the
357  // values. Find : then start ", then end ".
358  const char* start = (const char*)memchr((const void*)data_.data(),
359  (int)':', data_.len());
360  if (!start)
361  {
362  throw StoreException("Failure parsing json RecoveryPoint subId, no :");
363  }
364  size_t remain = data_.len() - (size_t)(start - data_.data());
365  start = (const char*)memchr((const void*)start, (int)'"', remain);
366  if (!start)
367  {
368  throw StoreException("Failure parsing json RecoveryPoint subId, no start \"");
369  }
370  ++start;
371  remain = data_.len() - (size_t)(start - data_.data());
372  const char* end = (const char*)memchr((const void*)start,
373  (int)'"', remain);
374  if (!end)
375  {
376  throw StoreException("Failure parsing json RecoveryPoint subId, no end \"");
377  }
378  size_t len = (size_t)(end - start);
379  subId = Field(start, len);
380  start = (const char*)memchr((const void*)start, (int)':', data_.len());
381  if (!start)
382  {
383  throw StoreException("Failure parsing json RecoveryPoint bookmark, no :");
384  }
385  remain = data_.len() - (size_t)(start - data_.data());
386  start = (const char*)memchr((const void*)start, (int)'"', remain);
387  if (!start)
388  {
389  throw StoreException("Failure parsing json RecoveryPoint bookmark, no start \"");
390  }
391  ++start;
392  remain = data_.len() - (size_t)(start - data_.data());
393  end = (const char*)memchr((const void*)start, (int)'"', remain);
394  if (!end)
395  {
396  throw StoreException("Failure parsing json RecoveryPoint bookmark, no end \"");
397  }
398  len = (size_t)(end - start);
399  if (_useTimestamp && !timestamp_.empty())
400  {
401  if (_deserializeLen < len + timestamp_.len())
402  {
403  delete[] _deserializeBuffer;
404  _deserializeBuffer = 0;
405  }
406  if (!_deserializeBuffer)
407  {
408  _deserializeLen = len + timestamp_.len() + 1;
409  _deserializeBuffer = new char[_deserializeLen];
410  }
411  memcpy((void*)_deserializeBuffer, (const void*)start, len);
412  _deserializeBuffer[len] = ',';
413  memcpy((void*)(_deserializeBuffer + len + 1),
414  (const void*)timestamp_.data(), timestamp_.len());
415  bookmark = Field(_deserializeBuffer, _deserializeLen);
416  }
417  else
418  {
419  bookmark = Field(start, len);
420  }
421  }
422  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::deserialize")
423  // Return a recovery point that will copy current field values and
424  // clear them when destructed.
425  return new FixedRecoveryPoint(subId, bookmark, true);
426  }
427 
428  virtual Field& serialize(const RecoveryPoint& recoveryPoint_)
429  {
430  try
431  {
432  Field subId = recoveryPoint_.getSubId();
433  Field bookmark = recoveryPoint_.getBookmark();
434  size_t fullLen = _serializeStart + subId.len()
435  + _bookmarkField.length() + bookmark.len() + JSON_END;
436  if (fullLen >= _serializeLen)
437  {
438  _serializeLen = fullLen + (128 - (fullLen % 128));
439  delete[] _serializeBuffer;
440  // This will reallocate the buffer and fill with predicate
441  initSerialization();
442  }
443  AMPS_snprintf(_serializeBuffer + _serializeStart,
444  _serializeLen - _serializeStart,
445  "%.*s\",\"%s\":\"%.*s\"}", (int)subId.len()
446  , subId.data()
447  , _bookmarkField.c_str()
448  , (int)bookmark.len()
449  , bookmark.data());
450  _serializeField.assign(_serializeBuffer, fullLen);
451  }
452  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::serialize")
453  return _serializeField;
454  }
455 
// Byte counts of the JSON punctuation around the stored values, plus the
// sub id allowance used when first sizing the serialization buffer. Each
// punctuation value is spelled as the sum of the characters it counts.
enum Constants : size_t
{
  // Fixed prefix {"n":"v","s":"  ->  1 '{' + 7 '"' + 2 ':' + 1 ','
  JSON_START = 1 + 7 + 2 + 1,
  // Variable tail ","b":"v"}  ->  1 '}' + 5 '"' + 1 ':' + 1 ','
  JSON_END = 1 + 5 + 1 + 1,
  // Whole record  ->  2 braces + 3 ':' + 12 '"' + 2 ','
  JSON_EXTRA = 2 + 3 + 12 + 2,
  // rough guess on typical max len
  SUBID_LEN = 64
};
463 
// Adapter state. The constructor's initializer list follows this declaration
// order.
464  private:
// Size in bytes used when allocating _serializeBuffer.
465  size_t _serializeLen;
// Offset in _serializeBuffer just past the fixed JSON prefix; serialize()
// writes the per-record tail starting here.
466  size_t _serializeStart;
// Field returned by serialize(); it views the bytes in _serializeBuffer.
467  Field _serializeField;
// Buffer allocated with new[] and released with delete[] in the destructor.
468  char* _serializeBuffer;
// Size recorded for _deserializeBuffer when it is allocated.
469  size_t _deserializeLen;
// Buffer in which deserialize() builds "bookmark,timestamp" when timestamps
// are in use; released with delete[] in the destructor.
470  char* _deserializeBuffer;
// Client used for the SOW query, the publishes and the deletes.
471  Client _client;
// Client name written into each record and matched in every filter.
472  std::string _trackedName;
// Topic that holds the records.
473  std::string _topic;
// Record field names for the client name, the sub id and the bookmark.
474  std::string _nameField;
475  std::string _subIdField;
476  std::string _bookmarkField;
// Timeout, in milliseconds, applied to the SOW query and its stream.
477  unsigned _timeoutMillis;
// "publish" command that update() fills with data and executes.
478  Command _cmd;
// Result stream of the SOW query started by next(), and the position in it.
479  MessageStream _stream;
480  MessageStream::iterator _msIter;
// Optional listener that receives StoreExceptions instead of them being thrown.
481  std::shared_ptr<const ExceptionListener> _pExceptionListener;
// True if closing the adapter should also disconnect _client.
482  bool _closeClient;
// True once next() has issued the SOW query.
483  bool _executed;
// True if failures are thrown even when a listener is installed.
484  bool _throwNotListen;
// True if timestamps are requested and appended to recovered bookmarks.
485  bool _useTimestamp;
// True once _close() has started disconnecting the client.
486  bool _closed;
487  };
488 } // namespace AMPS
489 #endif //_SOWRECOVERYPOINTADAPTER_H_
490 
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:633
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: SOWRecoveryPointAdapter.hpp:235
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4735
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1141
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8400
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: SOWRecoveryPointAdapter.hpp:84
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:666
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:587
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4814
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
RecoveryPointImpl virtual base class provides access to the subId and bookmark needed to restart a su...
Definition: RecoveryPoint.hpp:49
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:688
virtual void purge()
Remove all data from the storage.
Definition: SOWRecoveryPointAdapter.hpp:247
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: SOWRecoveryPointAdapter.hpp:164
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: SOWRecoveryPointAdapter.hpp:259
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Set an exception listener on this adapter that will be notified of all exceptions that occur rather t...
Definition: SOWRecoveryPointAdapter.hpp:284
SOWRecoveryPointAdapter(const Client &storeClient_, const string &trackedClientName_, unsigned timeoutMillis_=5000, bool useTimestamp_=false, bool closeClient_=true, bool updateFailureThrows_=false, const string &topic_=AMPS_SOW_STORE_DEFAULT_TOPIC, const string &clientNameField_=AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD, const string &subIdField_=AMPS_SOW_STORE_DEFAULT_SUB_FIELD, const string &bookmarkField_=AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD)
Create a SOWRecoveryPointAdapter for a BookmarkStore that writes updated RecoveryPoints to the server...
Definition: SOWRecoveryPointAdapter.hpp:119
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4690
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5365
virtual void close()
Take any necessary actions to close the associated storage.
Definition: SOWRecoveryPointAdapter.hpp:272
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4682
Field getTimestamp() const
Retrieves the value of the Timestamp header of the Message as a new Field.
Definition: Message.hpp:1376
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1406
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:581
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5001
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:49
Definition: ampsplusplus.hpp:103
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8371
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:438
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6401
const Field & getBookmark() const
Get the bookmark for this recovery point.
Definition: RecoveryPoint.hpp:91