AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
SOWRecoveryPointAdapter.hpp
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _SOWRECOVERYPOINTADAPTER_H_
27 #define _SOWRECOVERYPOINTADAPTER_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <RecoveryPoint.hpp>
32 #include <RecoveryPointAdapter.hpp>
33 #include <memory>
34 #include <string>
35 
36 using std::string;
37 
38 #define AMPS_SOW_STORE_DEFAULT_TOPIC "/ADMIN/bookmark_store"
39 #define AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD "clientName"
40 #define AMPS_SOW_STORE_DEFAULT_SUB_FIELD "subId"
41 #define AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD "bookmark"
42 
// SOW_RECOVERY_HANDLE_EXCEPTION(x): the catch clauses that follow a `try`
// block inside a SOWRecoveryPointAdapter member function. `x` is streamed
// into the message as the location prefix.
// Each handler (AMPSException, std::exception, anything else) builds a
// StoreException whose text is "<x>: <kind of exception> <what()>" and then
// either hands it to _pExceptionListener (when a listener is set and
// _throwNotListen is false) or throws it. The expansion refers to the
// members _throwNotListen and _pExceptionListener, so it can only be used
// where those names are in scope.
43 #define SOW_RECOVERY_HANDLE_EXCEPTION(x) \
44 catch (const AMPSException& ex_) \
45 { \
46  std::ostringstream os; \
47  os << x << ": AMPSException " << ex_.what(); \
48  StoreException ex(os.str()); \
49  if (!_throwNotListen && _pExceptionListener) \
50  { \
51  _pExceptionListener->exceptionThrown(ex); \
52  } \
53  else throw ex; \
54 } \
55 catch (const std::exception& ex_) \
56 { \
57  std::ostringstream os; \
58  os << x << ": std::exception " << ex_.what(); \
59  StoreException ex(os.str()); \
60  if (!_throwNotListen && _pExceptionListener) \
61  { \
62  _pExceptionListener->exceptionThrown(ex); \
63  } \
64  else throw ex; \
65 } \
66 catch (...) \
67 { \
68  std::ostringstream os; \
69  os << x << ": Unknown exception"; \
70  StoreException ex(os.str()); \
71  if (!_throwNotListen && _pExceptionListener) \
72  { \
73  _pExceptionListener->exceptionThrown(ex); \
74  } \
75  else throw ex; \
76 }
77 
78 namespace AMPS
79 {
80 
84 {
85 public:
86 
118  SOWRecoveryPointAdapter(const Client& storeClient_,
119  const string& trackedClientName_,
120  unsigned timeoutMillis_ = 5000,
121  bool useTimestamp_ = false,
122  bool closeClient_ = true,
123  bool updateFailureThrows_ = false,
124  const string& topic_ = AMPS_SOW_STORE_DEFAULT_TOPIC,
125  const string& clientNameField_ = AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD,
126  const string& subIdField_ = AMPS_SOW_STORE_DEFAULT_SUB_FIELD,
127  const string& bookmarkField_ = AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD
128  )
130  , _serializeLen(0)
131  , _serializeBuffer(0)
132  , _deserializeLen(0)
133  , _deserializeBuffer(0)
134  , _client(storeClient_)
135  , _trackedName(trackedClientName_)
136  , _topic(topic_)
137  , _nameField(clientNameField_)
138  , _subIdField(subIdField_)
139  , _bookmarkField(bookmarkField_)
140  , _timeoutMillis(timeoutMillis_)
141  , _cmd("publish")
142  , _closeClient(closeClient_)
143  , _executed(false)
144  , _throwNotListen(updateFailureThrows_)
145  , _useTimestamp(useTimestamp_)
146  , _closed(false)
147  {
148  _initSerialization();
149  _cmd.setTopic(_topic);
150  }
151 
152  virtual ~SOWRecoveryPointAdapter()
153  {
154  _close();
155  delete[] _serializeBuffer;
156  delete[] _deserializeBuffer;
157  }
158 
163  virtual bool next(RecoveryPoint& current_)
164  {
165  static Field emptyField;
166  try
167  {
168  if (!_executed)
169  {
170  Command cmd("sow");
171  cmd.setTopic(_topic)
172  .setFilter("/" + _nameField + "='" + _trackedName + "'")
173  .setTimeout(_timeoutMillis);
174  if (_useTimestamp)
175  {
176  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
177  + _bookmarkField + "],timestamp");
178  }
179  else
180  {
181  cmd.setOptions("select=[-/,+/" + _subIdField + ",+/"
182  + _bookmarkField + "]");
183  }
184  _stream = _client.execute(cmd).timeout(_timeoutMillis);
185  _msIter = _stream.begin();
186  _executed = true;
187  }
188  else
189  {
190  ++_msIter;
191  }
192  if (_msIter == MessageStream::iterator())
193  {
194  return false;
195  }
196  Message m = *_msIter;
197  if (!m.isValid())
198  {
199  current_ = RecoveryPoint(NULL);
200  return false;
201  }
202  if (m.getCommand() == "group_begin")
203  {
204  return next(current_);
205  }
206  else if (m.getCommand() == "sow")
207  {
208  if (_useTimestamp)
209  {
210  current_ = RecoveryPoint(deserialize(m.getData(),
211  m.getTimestamp()));
212  }
213  else
214  {
215  current_ = RecoveryPoint(deserialize(m.getData(),
216  emptyField));
217  }
218  return true;
219  }
220  else if (m.getCommand() == "group_end" || m.getCommand() == "ack")
221  {
222  current_ = RecoveryPoint(NULL);
223  _msIter = MessageStream::iterator();
224  _stream = MessageStream();
225  return false;
226  }
227  }
228  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::next")
229  return false;
230  }
231 
234  virtual void update(RecoveryPoint& recoveryPoint_)
235  {
236  try
237  {
238  Field data = serialize(recoveryPoint_);
239  _cmd.setData(data.data(), data.len());
240  _client.execute(_cmd);
241  }
242  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::update")
243  }
244 
246  virtual void purge()
247  {
248  try
249  {
250  Message m = _client.sowDelete(_topic, "/" + _nameField
251  + "='" + _trackedName + "'");
252  }
253  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge")
254  }
255 
258  virtual void purge(const Field& subId_)
259  {
260  try
261  {
262  Message m =_client.sowDelete(_topic, "/" + _nameField + "='"
263  + _trackedName + "' and /"
264  + _subIdField + "='"
265  + subId_ + "'");
266  }
267  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::purge(subId)")
268  }
269 
271  virtual void close()
272  {
273  _close();
274  }
275 
283  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
284  {
285  _pExceptionListener = pListener_;
286  }
287 protected:
288  void _close()
289  {
290  if (_closed || !_client.isValid()) return;
291  try
292  {
293  _client.publishFlush();
294  }
295  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close publishFlush")
296  try
297  {
298  if (_closeClient)
299  {
300  _closed = true;
301  _client.disconnect();
302  _client = Client();
303  }
304  }
305  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::close disconnect")
306  }
307 
308  void _initSerialization()
309  {
310  try
311  {
312  // Set up json serialization
313  if (_serializeLen == 0)
314  {
315  _serializeLen = (size_t) (_nameField.length()
316  + _trackedName.length()
317  + _subIdField.length()
318  + _bookmarkField.length()
319  + (AMPS_MAX_BOOKMARK_LEN * 4UL)
320  + SUBID_LEN + JSON_EXTRA);
321  _serializeLen += (128-(_serializeLen % 128));
322  }
323  _serializeBuffer = new char[_serializeLen];
324  AMPS_snprintf(_serializeBuffer, _serializeLen,
325  "{\"%s\":\"%s\",\"%s\":\"", _nameField.c_str()
326  , _trackedName.c_str()
327  , _subIdField.c_str());
328  _serializeStart = JSON_START + _nameField.length()
329  + _trackedName.length() + _subIdField.length();
330  }
331  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::initSerialization")
332  }
333 
334  // Subclasses can override this to set up for something other than json
335  // serialization if not using json.
336  virtual void initSerialization()
337  {
338  _initSerialization();
339  }
340 
341  // Subclasses can override this function if not using json data type.
342  // It needs to return an allocated RecoveryPointImpl based on the data
343  // field from a sow message that contains only 2 fields: _subIdField and
344  // _bookmarkField. If you'd like more, override begin()
345  virtual RecoveryPointImpl* deserialize(const Field& data_,
346  const Field& timestamp_)
347  {
348  Field subId;
349  Field bookmark;
350  try
351  {
352  // We have 2 fields subId and bookmark and we only need the
353  // values. Find : then start ", then end ".
354  const char* start = (const char*)memchr((const void*)data_.data(),
355  (int)':', data_.len());
356  size_t remain = data_.len() - (size_t)(start - data_.data());
357  start = (const char*)memchr((const void*)start, (int)'"', remain);
358  ++start;
359  remain = data_.len() - (size_t)(start - data_.data());
360  const char* end = (const char*)memchr((const void*)start,
361  (int)'"', remain);
362  size_t len = (size_t)(end - start);
363  subId = Field(start, len);
364  start = (const char*)memchr((const void*)start, (int)':', data_.len());
365  remain = data_.len() - (size_t)(start - data_.data());
366  start = (const char*)memchr((const void*)start, (int)'"', remain);
367  ++start;
368  remain = data_.len() - (size_t)(start - data_.data());
369  end = (const char*)memchr((const void*)start, (int)'"', remain);
370  len = (size_t)(end - start);
371  if (_useTimestamp && !timestamp_.empty())
372  {
373  if (_deserializeLen < len + timestamp_.len())
374  {
375  delete[] _deserializeBuffer;
376  _deserializeBuffer = 0;
377  }
378  if (!_deserializeBuffer)
379  {
380  _deserializeLen = len + timestamp_.len() + 1;
381  _deserializeBuffer = new char[_deserializeLen];
382  }
383  memcpy((void*)_deserializeBuffer, (const void*)start, len);
384  _deserializeBuffer[len] = ',';
385  memcpy((void*)(_deserializeBuffer+len+1),
386  (const void*)timestamp_.data(), timestamp_.len());
387  bookmark = Field(_deserializeBuffer, _deserializeLen);
388  }
389  else
390  {
391  bookmark = Field(start, len);
392  }
393  }
394  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::deserialize")
395  // Return a recovery point that will copy current field values and
396  // clear them when destructed.
397  return new FixedRecoveryPoint(subId, bookmark, true);
398  }
399 
400  virtual Field& serialize(const RecoveryPoint& recoveryPoint_)
401  {
402  try
403  {
404  Field subId = recoveryPoint_.getSubId();
405  Field bookmark = recoveryPoint_.getBookmark();
406  size_t fullLen = _serializeStart + subId.len()
407  + _bookmarkField.length() + bookmark.len() + JSON_END;
408  if (fullLen >= _serializeLen)
409  {
410  _serializeLen = fullLen + (128-(fullLen % 128));
411  delete[] _serializeBuffer;
412  // This will reallocate the buffer and fill with predicate
413  initSerialization();
414  }
415  AMPS_snprintf(_serializeBuffer + _serializeStart,
416  _serializeLen - _serializeStart,
417  "%.*s\",\"%s\":\"%.*s\"}", (int)subId.len()
418  , subId.data()
419  , _bookmarkField.c_str()
420  , (int)bookmark.len()
421  , bookmark.data());
422  _serializeField.assign(_serializeBuffer, fullLen);
423  }
424  SOW_RECOVERY_HANDLE_EXCEPTION("SOWRecoveryPoint::serialize")
425  return _serializeField;
426  }
427 
// Sizes used when laying out the JSON record in the serialization buffer.
// JSON_START is added to the prefix strings to get _serializeStart,
// JSON_END is added when computing the full record length in serialize(),
// and JSON_EXTRA and SUBID_LEN go into the default buffer size chosen by
// _initSerialization(). The trailing comments itemize the punctuation
// characters each count stands for.
428  enum Constants : size_t
429  {
430  JSON_START = 11, // '{', 7 '"', 2 ':', 1 ','
431  JSON_END = 8, // '}', 5 '"', 1 ':', 1 ','
432  JSON_EXTRA = 19, // '{', '}', 3 ':', 12 '"', 2 ','
433  SUBID_LEN = 64 // rough guess on typical max len
434  };
435 
436 private:
437  size_t _serializeLen;
438  size_t _serializeStart;
439  Field _serializeField;
440  char* _serializeBuffer;
441  size_t _deserializeLen;
442  char* _deserializeBuffer;
443  Client _client;
444  std::string _trackedName;
445  std::string _topic;
446  std::string _nameField;
447  std::string _subIdField;
448  std::string _bookmarkField;
449  unsigned _timeoutMillis;
450  Command _cmd;
451  MessageStream _stream;
452  MessageStream::iterator _msIter;
453  std::shared_ptr<const ExceptionListener> _pExceptionListener;
454  bool _closeClient;
455  bool _executed;
456  bool _throwNotListen;
457  bool _useTimestamp;
458  bool _closed;
459 };
460 } // namespace AMPS
461 #endif //_SOWRECOVERYPOINTADAPTER_H_
462 
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: SOWRecoveryPointAdapter.hpp:234
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4173
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7613
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: SOWRecoveryPointAdapter.hpp:83
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4249
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
RecoveryPointImpl virtual base class provides access to the subId and bookmark needed to restart a su...
Definition: RecoveryPoint.hpp:49
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
virtual void purge()
Remove all data from the storage.
Definition: SOWRecoveryPointAdapter.hpp:246
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: SOWRecoveryPointAdapter.hpp:163
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:588
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subsc...
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: SOWRecoveryPointAdapter.hpp:258
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Set an exception listener on this adapter that will be notified of all exceptions that occur rather t...
Definition: SOWRecoveryPointAdapter.hpp:283
SOWRecoveryPointAdapter(const Client &storeClient_, const string &trackedClientName_, unsigned timeoutMillis_=5000, bool useTimestamp_=false, bool closeClient_=true, bool updateFailureThrows_=false, const string &topic_=AMPS_SOW_STORE_DEFAULT_TOPIC, const string &clientNameField_=AMPS_SOW_STORE_DEFAULT_CLIENT_FIELD, const string &subIdField_=AMPS_SOW_STORE_DEFAULT_SUB_FIELD, const string &bookmarkField_=AMPS_SOW_STORE_DEFAULT_BOOKMARK_FIELD)
Create a SOWRecoveryPointAdapter for a BookmarkStore that writes updated RecoveryPoints to the server...
Definition: SOWRecoveryPointAdapter.hpp:118
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4140
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4792
virtual void close()
Take any necessary actions to close the associated storage.
Definition: SOWRecoveryPointAdapter.hpp:271
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4132
Field getTimestamp() const
Retrieves the value of the Timestamp header of the Message as a new Field.
Definition: Message.hpp:1238
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1268
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4428
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:574
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:560
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:49
Definition: ampsplusplus.hpp:103
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7584
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5796
const Field & getBookmark() const
Get the bookmark for this recovery point.
Definition: RecoveryPoint.hpp:91