LogDevice API
BufferedWriter.h
Go to the documentation of this file.
1 
8 #pragma once
9 
10 #include <chrono>
11 #include <memory>
12 #include <tuple>
13 #include <variant>
14 #include <vector>
15 
16 #include "logdevice/include/Client.h"
17 #include "logdevice/include/types.h"
18 
19 namespace facebook { namespace logdevice {
20 
61 class BufferedWriterImpl;
62 class BufferedWriterAppendSink;
63 
65  public:
// Payload of a single append: holds either a std::string or a PayloadGroup.
67  using PayloadVariant = std::variant<std::string, PayloadGroup>;
68 
74  public:
// Opaque pointer chosen by the caller; it is the type of Append::context and
// of the callback_context argument of append().
75  using Context = void*;
// List of (Context, PayloadVariant) pairs, the form in which contexts and
// payloads are handed to the AppendCallback methods.
76  using ContextSet = std::vector<std::pair<Context, PayloadVariant>>;
77 
94  virtual void onSuccess(logid_t /*log_id*/,
95  ContextSet /*contexts_and_payloads*/,
96  const DataRecordAttributes& /*attrs*/) {}
108  virtual void onFailure(logid_t /*log_id*/,
109  ContextSet /*contexts_and_payloads*/,
110  Status /*status*/) {}
111 
// Verdict returned by onRetry(). Returning DENY stops retrying that batch;
// ALLOW is what the default onRetry() returns.
enum class RetryDecision {
  ALLOW,
  DENY,
};
113 
125  virtual RetryDecision onRetry(logid_t /*log_id*/,
126  const ContextSet& /*contexts_and_payloads*/,
127  Status /*status*/) {
128  return RetryDecision::ALLOW;
129  }
130 
131  virtual ~AppendCallback() {}
132  };
133  struct Append {
134  Append(logid_t log_id,
135  std::string payload,
136  AppendCallback::Context context,
137  AppendAttributes attrs = AppendAttributes())
138  : log_id(log_id),
139  payload(std::move(payload)),
140  context(context),
141  attrs(std::move(attrs)) {}
142 
143  Append(logid_t log_id,
144  PayloadGroup&& payload,
145  AppendCallback::Context context,
146  AppendAttributes attrs = AppendAttributes())
147  : log_id(log_id),
148  payload(std::move(payload)),
149  context(context),
150  attrs(std::move(attrs)) {}
151 
152  Append(const Append&) = delete;
153  Append(Append&&) = default;
154  Append& operator=(Append&&) = default;
155  logid_t log_id;
156  PayloadVariant payload;
157  AppendCallback::Context context;
158  AppendAttributes attrs;
159  };
// Tuning knobs for buffered writes: flush triggers, batching mode, retry
// policy, compression codec and payload retention. Every field has an
// in-class default, so a default-constructed LogOptions carries the values
// shown below.
160  struct LogOptions {
161  // TODO: Remove it from this struct.
162  using Compression = facebook::logdevice::Compression;
163 
164  LogOptions() {}
165 
166  // Flush buffered writes for a log when the oldest has been buffered this
167  // long (negative for no trigger)
168  std::chrono::milliseconds time_trigger{-1};
169 
170  // Flush buffered writes for a log as soon as there are this many payload
171  // bytes buffered (negative for no trigger)
172  ssize_t size_trigger = -1;
173 
174  enum class Mode {
175  // Write each batch independently (also applies to retries if
176  // configured).
177  //
178  // This is the default mode which allows for highest throughput, but can
179  // cause writes to get reordered. For example, suppose two batches 1
180  // and 2 get sent out, 1 fails and 2 succeeds. After 1 is retried, the
181  // contents of the log would be 21 (or 121 if the very first write
182  // actually succeeded but we could not get confirmation).
183  INDEPENDENT,
184 
185  // Only allow one batch at a time to be inflight to LogDevice servers.
186  // This fixes the ordering issue in the INDEPENDENT mode. It is
187  // especially useful for stream processing cases where records in
188  // LogDevice need to end up in the same order as in the input stream.
189  //
190  // The size and time triggers are relaxed (batches can get larger and
191  // delayed while one is already in flight). This mode possibly limits
192  // throughput under certain conditions (extremely high throughput on a
193  // single log and/or errors writing to LogDevice).
194  ONE_AT_A_TIME,
195 
196  // Enables FIFO ordering for appends to a log from the same
197  // BufferedWriter. STREAM mode does not guarantee exactly-once semantics
198  // and hence records may be duplicated. Precisely, for records r_1, r_2,
199  // ... appended using BufferedWriter::append(), the first occurrence of
200  // record r_i in the read stream is before the first occurrence of
201  // r_{i+1}. In the event of a failure, we recover a continuous prefix
202  // of records accepted by BufferedWriter::append().
203  //
204  // Callbacks to the BufferedWriter happen in order of accepted append
205  // messages. LSN reported back via the callback is not necessarily the
206  // first occurrence of the record and hence need not increase
207  // monotonically. It is not recommended to start reading directly from the
208  // LSN returned, since the reader may encounter gaps. We recommend reading
209  // from the beginning and caching the largest sequence number encountered,
210  // to consume records in FIFO order. We currently do not expose any
211  // sequence number to readers and hence it must be encoded as part of the
212  // payload to support exactly-once FIFO consumption.
213  STREAM,
214  };
215  Mode mode = Mode::INDEPENDENT;
216  // Max number of times to retry (0 for no retrying, negative for
217  // unlimited). You may also manually track retries and have onRetry()
218  // return DENY to stop retrying a particular batch.
219  int retry_count = 0;
220  // Initial delay before retrying (negative for a default 2x the append
221  // timeout). Subsequent retries are made after successively larger delays
222  // (exponential backoff with a factor of 2) up to retry_max_delay.
223  std::chrono::milliseconds retry_initial_delay{-1};
224  // Max delay when retrying (negative for no limit)
225  std::chrono::milliseconds retry_max_delay{60000};
226 
227  // Compression codec.
228  Compression compression = Compression::LZ4;
229 
230  // If set to true, will destroy individual payloads immediately after they
231  // are batched together. onSuccess(), onFailure() and onRetry() callbacks
232  // will not contain payloads.
233  bool destroy_payloads = false;
234 
235  // Returns the string form of `mode`, e.g. "independent" or "one_at_a_time".
// NOTE(review): Mode also has STREAM; the string used for it is defined in
// the implementation and is not visible here - confirm and list it.
236  static std::string modeToString(Mode mode);
237  // Returns 0 on success, -1 on error.
// On success the parsed value is presumably written to *out_mode - confirm
// against the implementation.
238  static int parseMode(const char* str, Mode* out_mode);
239 
240  // COMPAT. Use `compressionToString` and `parseCompression` from
241  // facebook::logdevice namespace.
242  static std::string compressionToString(Compression c);
243  static int parseCompression(const char* str, Compression* out_c);
244  };
245 
246  // NOTE: It is possible to have different options for each log but currently
247  // this feature is not supported in the interface.
248  struct Options : public LogOptions {
249  Options() : LogOptions() {}
250 
251  // Approximate memory budget for buffered and inflight writes. If an
252  // append() call would exceed this limit, it fails fast with E::NOBUFS.
253  //
254  // Accounting is not completely accurate for performance reasons. There
255  // is internal overhead per batch and there may be pathological cases
256  // where actual memory usage exceeds the limit. However, in most cases it
257  // should stay well under.
258  //
259  // Negative for no limit.
260  int32_t memory_limit_mb = -1;
261  };
262 
// Factory for BufferedWriter instances; the constructor is private, so this
// is how callers obtain one.
//   client   - shared handle to the Client
//   callback - AppendCallback object, passed as a raw pointer
//   options  - writer options; defaults to a default-constructed Options
// NOTE(review): `callback` is a raw pointer and this header does not state
// who owns it - presumably it must outlive the returned writer; confirm.
271  static std::unique_ptr<BufferedWriter> create(std::shared_ptr<Client> client,
272  AppendCallback* callback,
273  Options options = Options());
274 
// Single-payload append with a std::string payload. `payload` and `attrs`
// are rvalue references, so the caller gives them up.
//   logid            - target log
//   payload          - bytes to append
//   callback_context - opaque pointer, see AppendCallback::Context
//   attrs            - append attributes; defaults to AppendAttributes()
// Per the Options::memory_limit_mb comment, an append() call that would
// exceed the memory budget fails fast with E::NOBUFS.
// NOTE(review): the int return convention is not visible in this header -
// presumably 0 on success and -1 on failure; confirm.
286  int append(logid_t logid,
287  std::string&& payload,
288  AppendCallback::Context callback_context,
289  AppendAttributes&& attrs = AppendAttributes());
// Same as the overload above, but the payload is a PayloadGroup.
290  int append(logid_t logid,
291  PayloadGroup&& payload_group,
292  AppendCallback::Context callback_context,
293  AppendAttributes&& attrs = AppendAttributes());
294 
// Multi-append variant: takes a vector of Append entries by rvalue reference
// and returns a vector of Status values.
// NOTE(review): presumably one Status per entry of `appends`, in the same
// order - confirm against the implementation.
305  std::vector<Status> append(std::vector<Append>&& appends);
306 
// NOTE(review): judging by the name, requests a flush of the buffered writes
// for all logs; the exact semantics and the meaning of the int result are
// not visible in this header - confirm against the implementation.
318  int flushAll();
319 
324  virtual ~BufferedWriter() {}
325 
326  private:
// Private default constructor: outside code cannot instantiate the class
// directly and has to go through create().
327  BufferedWriter() {} // can be constructed by the factory only
// Non-copyable: copy construction and copy assignment are both deleted.
328  BufferedWriter(const BufferedWriter&) = delete;
329  BufferedWriter& operator=(const BufferedWriter&) = delete;
330 
// Friendship gives BufferedWriterImpl access to the private members above.
331  friend class BufferedWriterImpl;
// Returns this object as a BufferedWriterImpl pointer.
// NOTE(review): the downcast presumes every BufferedWriter is really a
// BufferedWriterImpl, which fits the private constructor plus friend
// declaration - confirm in the implementation file.
332  BufferedWriterImpl* impl(); // downcasts (this)
333 };
334 }} // namespace facebook::logdevice
Definition: BufferedWriter.h:133
int append(logid_t logid, std::string &&payload, AppendCallback::Context callback_context, AppendAttributes &&attrs=AppendAttributes())
static std::unique_ptr< BufferedWriter > create(std::shared_ptr< Client > client, AppendCallback *callback, Options options=Options())
std::variant< std::string, PayloadGroup > PayloadVariant
Definition: BufferedWriter.h:67
Definition: AsyncReader.h:16
virtual void onSuccess(logid_t, ContextSet, const DataRecordAttributes &)
Definition: BufferedWriter.h:94
Definition: BufferedWriter.h:64
Definition: BufferedWriter.h:160
virtual void onFailure(logid_t, ContextSet, Status)
Definition: BufferedWriter.h:108
Definition: BufferedWriter.h:248
virtual ~BufferedWriter()
Definition: BufferedWriter.h:324
virtual RetryDecision onRetry(logid_t, const ContextSet &, Status)
Definition: BufferedWriter.h:125