LogDevice API
BufferedWriter.h
Go to the documentation of this file.
1 
8 #pragma once
9 
10 #include <chrono>
11 #include <memory>
12 #include <tuple>
13 #include <vector>
14 
15 #include "logdevice/include/Client.h"
16 #include "logdevice/include/types.h"
17 
18 namespace facebook { namespace logdevice {
19 
60 class BufferedWriterImpl;
61 
63  public:
69  public:
// Opaque per-append pointer supplied by the caller of append(); nothing in
// this header dereferences it.
70  using Context = void*;
// List of (context, payload string) pairs; the callbacks below receive it
// as `contexts_and_payloads`.
71  using ContextSet = std::vector<std::pair<Context, std::string>>;
72 
// Success callback hook; this default implementation does nothing.
// Receives the log id, the (context, payload) pairs and the record
// attributes. The pairs carry no payloads when
// LogOptions::destroy_payloads is set.
// NOTE(review): presumably invoked once per successfully written batch --
// confirm against the implementation.
81  virtual void onSuccess(logid_t /*log_id*/,
82  ContextSet /*contexts_and_payloads*/,
83  const DataRecordAttributes& /*attrs*/) {}
// Failure callback hook; this default implementation does nothing.
// Receives the log id, the (context, payload) pairs and a Status. The pairs
// carry no payloads when LogOptions::destroy_payloads is set.
// NOTE(review): presumably invoked once a batch has failed for good (no
// retries left or onRetry() returned DENY) -- confirm against the
// implementation.
92  virtual void onFailure(logid_t /*log_id*/,
93  ContextSet /*contexts_and_payloads*/,
94  Status /*status*/) {}
95 
// Value returned by onRetry(): DENY stops retrying that particular batch,
// ALLOW does not veto the retry.
96  enum class RetryDecision { ALLOW, DENY };
97 
// Retry hook; this default implementation always returns
// RetryDecision::ALLOW. Override it and return DENY to stop retrying a
// particular batch (see LogOptions::retry_count). The pairs carry no
// payloads when LogOptions::destroy_payloads is set.
// NOTE(review): `status` is presumably the error of the failed attempt --
// confirm.
106  virtual RetryDecision onRetry(logid_t /*log_id*/,
107  const ContextSet& /*contexts_and_payloads*/,
108  Status /*status*/) {
109  return RetryDecision::ALLOW;
110  }
111 
// Virtual so that a subclass is destroyed correctly when deleted through an
// AppendCallback pointer.
112  virtual ~AppendCallback() {}
113  };
// Arguments of a single append, bundled into one value so that several can
// be passed together to the std::vector<Append> overload of append().
114  struct Append {
// Takes payload and attrs by value and moves them into the members; attrs
// defaults to a default-constructed AppendAttributes.
115  Append(logid_t log_id,
116  std::string payload,
117  AppendCallback::Context context,
118  AppendAttributes attrs = AppendAttributes())
119  : log_id(log_id),
120  payload(std::move(payload)),
121  context(context),
122  attrs(std::move(attrs)) {}
123 
// Move-only type: copy construction is deleted, move construction and move
// assignment are defaulted.
124  Append(const Append&) = delete;
125  Append(Append&&) = default;
126  Append& operator=(Append&&) = default;
// Log the payload is destined for.
127  logid_t log_id;
// Payload string, owned by this object.
128  std::string payload;
// Opaque caller pointer (AppendCallback::Context is void*).
129  AppendCallback::Context context;
// Attributes passed along with the append.
130  AppendAttributes attrs;
131  };
// Settings that control how buffered writes for a log are flushed, ordered,
// retried and compressed. Every field has an in-class default.
132  struct LogOptions {
133  // TODO: Remove it from this struct.
134  using Compression = facebook::logdevice::Compression;
135 
136  LogOptions() {}
137 
138  // Flush buffered writes for a log when the oldest has been buffered this
139  // long (negative for no trigger).
140  std::chrono::milliseconds time_trigger{-1};
141 
142  // Flush buffered writes for a log as soon as there are this many payload
143  // bytes buffered (negative for no trigger).
144  ssize_t size_trigger = -1;
145 
146  enum class Mode {
147  // Write each batch independently (also applies to retries if
148  // configured).
149  //
150  // This is the default mode which allows for highest throughput, but can
151  // cause writes to get reordered. For example, suppose two batches 1
152  // and 2 get sent out, 1 fails and 2 succeeds. After 1 is retried, the
153  // contents of the log would be 21 (or 121 if the very first write
154  // actually succeeded but we could not get confirmation).
155  INDEPENDENT,
156 
157  // Only allow one batch at a time to be inflight to LogDevice servers.
158  // This fixes the ordering issue in the INDEPENDENT mode. It is
159  // especially useful for stream processing cases where records in
160  // LogDevice need to end up in the same order as in the input stream.
161  //
162  // The size and time triggers are relaxed (batches can get larger and
163  // delayed while one is already in flight). This mode possibly limits
164  // throughput under certain conditions (extremely high throughput on a
165  // single log and/or errors writing to LogDevice).
166  ONE_AT_A_TIME,
167  };
168  Mode mode = Mode::INDEPENDENT;
169  // Max number of times to retry (0 for no retrying, negative for
170  // unlimited). You may also manually track retries and have onRetry()
171  // return DENY to stop retrying a particular batch.
172  int retry_count = 0;
173  // Initial delay before retrying (negative for a default 2x the append
174  // timeout). Subsequent retries are made after successively larger delays
175  // (exponential backoff with a factor of 2) up to retry_max_delay.
176  std::chrono::milliseconds retry_initial_delay{-1};
177  // Max delay when retrying (negative for no limit). Defaults to 60000 ms.
178  std::chrono::milliseconds retry_max_delay{60000};
179 
180  // Compression codec. Defaults to LZ4.
181  Compression compression = Compression::LZ4;
182 
183  // If set to true, will destroy individual payloads immediately after they
184  // are batched together. onSuccess(), onFailure() and onRetry() callbacks
185  // will not contain payloads.
186  bool destroy_payloads = false;
187 
188  // Returns "independent" or "one_at_a_time".
189  static std::string modeToString(Mode mode);
190  // Returns 0 on success, -1 on error.
191  static int parseMode(const char* str, Mode* out_mode);
192 
193  // COMPAT. Use `compressionToString` and `parseCompression` from
194  // facebook::logdevice namespace.
195  static std::string compressionToString(Compression c);
196  static int parseCompression(const char* str, Compression* out_c);
197  };
198 
// Options accepted by create(): everything in LogOptions plus a memory
// budget (memory_limit_mb).
199  // NOTE: It is possible to have different options for each log but currently
200  // this feature is not supported in the interface.
201  struct Options : public LogOptions {
202  Options() : LogOptions() {}
203 
204  // Approximate memory budget for buffered and inflight writes. If an
205  // append() call would exceed this limit, it fails fast with E::NOBUFS.
206  //
207  // Accounting is not completely accurate for performance reasons. There
208  // is internal overhead per batch and there may be pathological cases
209  // where actual memory usage exceeds the limit. However, in most cases it
210  // should stay well under.
211  //
212  // Negative for no limit (the default is -1). The unit is presumably
213  // megabytes, going by the `_mb` suffix -- confirm.
213  int32_t memory_limit_mb = -1;
214  };
215 
// Factory function. The default constructor is private, so this is how a
// BufferedWriter is obtained; the result is owned through std::unique_ptr.
// `options` defaults to a default-constructed Options.
// NOTE(review): `callback` is a raw pointer and its ownership is not visible
// here -- presumably the caller must keep it alive for as long as the writer
// exists; confirm.
222  static std::unique_ptr<BufferedWriter> create(std::shared_ptr<Client> client,
223  AppendCallback* callback,
224  Options options = Options());
225 
// Submits one payload for `logid`. `payload` and `attrs` are taken by rvalue
// reference; `callback_context` is the opaque AppendCallback::Context for
// this payload. According to the comment on Options::memory_limit_mb, a call
// that would exceed the memory budget fails fast with E::NOBUFS.
// NOTE(review): the int result is presumably 0 on success and -1 on failure
// -- confirm.
234  int append(logid_t logid,
235  std::string&& payload,
236  AppendCallback::Context callback_context,
237  AppendAttributes&& attrs = AppendAttributes());
238 
// Overload that takes several Append objects at once, by rvalue reference.
// NOTE(review): the returned vector presumably holds one Status per element
// of `appends`, in the same order -- confirm.
249  std::vector<Status> append(std::vector<Append>&& appends);
250 
// NOTE(review): presumably requests a flush of the buffered writes of all
// logs without waiting for the time/size triggers; the meaning of the int
// result is not visible here -- confirm both against the implementation.
262  int flushAll();
263 
// Virtual destructor. create() hands instances out as
// std::unique_ptr<BufferedWriter>, and impl() downcasts to
// BufferedWriterImpl, so destruction presumably happens through this base
// type.
268  virtual ~BufferedWriter() {}
269 
270  private:
// Private default constructor: client code cannot construct a
// BufferedWriter directly and goes through create() instead.
271  BufferedWriter() {} // can be constructed by the factory only
// Non-copyable: copy construction and copy assignment are deleted.
272  BufferedWriter(const BufferedWriter&) = delete;
273  BufferedWriter& operator=(const BufferedWriter&) = delete;
274 
// BufferedWriterImpl is granted access to the private members above.
275  friend class BufferedWriterImpl;
// Returns `this` viewed as the implementation class; only declared here.
276  BufferedWriterImpl* impl(); // downcasts (this)
277 };
278 }} // namespace facebook::logdevice
Definition: BufferedWriter.h:114
int append(logid_t logid, std::string &&payload, AppendCallback::Context callback_context, AppendAttributes &&attrs=AppendAttributes())
static std::unique_ptr< BufferedWriter > create(std::shared_ptr< Client > client, AppendCallback *callback, Options options=Options())
Definition: AsyncReader.h:16
virtual void onSuccess(logid_t, ContextSet, const DataRecordAttributes &)
Definition: BufferedWriter.h:81
Definition: BufferedWriter.h:62
Definition: BufferedWriter.h:132
virtual void onFailure(logid_t, ContextSet, Status)
Definition: BufferedWriter.h:92
Definition: BufferedWriter.h:201
virtual ~BufferedWriter()
Definition: BufferedWriter.h:268
virtual RetryDecision onRetry(logid_t, const ContextSet &, Status)
Definition: BufferedWriter.h:106