LogDevice API
BufferedWriter.h
1 
8 #pragma once
9 
10 #include <chrono>
11 #include <memory>
12 #include <tuple>
13 #include <vector>
14 
15 #include "logdevice/include/Client.h"
16 #include "logdevice/include/types.h"
17 
18 namespace facebook { namespace logdevice {
19 
55 class BufferedWriterImpl;
56 
58  public:
64  public:
65  using Context = void*;
66  using ContextSet = std::vector<std::pair<Context, std::string>>;
67 
76  virtual void onSuccess(logid_t /*log_id*/,
77  ContextSet /*contexts*/,
78  const DataRecordAttributes& /*attrs*/) {}
87  virtual void onFailure(logid_t /*log_id*/,
88  ContextSet /*contexts*/,
89  Status /*status*/) {}
90 
// Verdict returned by onRetry().
enum class RetryDecision {
  ALLOW, // the default returned by the base-class onRetry()
  DENY,  // stop retrying this particular batch
};
92 
101  virtual RetryDecision onRetry(logid_t /*log_id*/,
102  const ContextSet& /*contexts*/,
103  Status /*status*/) {
104  return RetryDecision::ALLOW;
105  }
106 
107  virtual ~AppendCallback() {}
108  };
109  struct Append {
110  Append(logid_t log_id,
111  std::string payload,
112  AppendCallback::Context context,
113  AppendAttributes attrs = AppendAttributes())
114  : log_id(log_id),
115  payload(std::move(payload)),
116  context(context),
117  attrs(std::move(attrs)) {}
118 
119  Append(const Append&) = delete;
120  Append(Append&&) = default;
121  Append& operator=(Append&&) = default;
122  logid_t log_id;
123  std::string payload;
124  AppendCallback::Context context;
125  AppendAttributes attrs;
126  };
127  struct LogOptions {
128  // TODO: Remove it from this struct.
129  using Compression = facebook::logdevice::Compression;
130 
131  LogOptions() {}
132 
133  // Flush buffered writes for a log when the oldest has been buffered this
134  // long (negative for no trigger)
135  std::chrono::milliseconds time_trigger{-1};
136 
137  // Flush buffered writes for a log as soon there are this many payload
138  // bytes buffered (negative for no trigger)
139  ssize_t size_trigger = -1;
140 
141  enum class Mode {
142  // Write each batch independently (also applies to retries if
143  // configured).
144  //
145  // This is the default mode which allows for highest throughput, but can
146  // cause writes to get reordered. For example, suppose two batches 1
147  // and 2 get sent out, 1 fails and 2 succeeds. After 1 is retried, the
148  // contents of the log would be 21 (or 121 if the very first write
149  // actually succeeded but we could not get confirmation).
150  INDEPENDENT,
151 
152  // Only allow one batch at a time to be inflight to LogDevice servers.
153  // This fixes the ordering issue in the INDEPENDENT mode. It is
154  // especially useful for stream processing cases where records in
155  // LogDevice need to end up in the same order as in the input stream.
156  //
157  // The size and time triggers are relaxed (batches can get larger and
158  // delayed while one is already in flight). This mode possibly limits
159  // throughput under certain conditions (extremely high throughput on a
160  // single log and/or errors writing to LogDevice).
161  ONE_AT_A_TIME,
162  };
163  Mode mode = Mode::INDEPENDENT;
164  // Max number of times to retry (0 for no retrying, negative for
165  // unlimited). You may also manually track retries and have onRetry()
166  // return DENY to stop retrying a particular batch.
167  int retry_count = 0;
168  // Initial delay before retrying (negative for a default 2x the append
169  // timeout). Subsequent retries are made after successively larger delays
170  // (exponential backoff with a factor of 2) up to retry_max_delay
171  std::chrono::milliseconds retry_initial_delay{-1};
172  // Max delay when retrying (negative for no limit)
173  std::chrono::milliseconds retry_max_delay{60000};
174 
175  // Compression codec.
176  Compression compression = Compression::LZ4;
177 
178  // If set to true, will destroy individual payloads immediately after they
179  // are batched together. onSuccess(), onFailure() and onRetry() callbacks
180  // will not contain payloads.
181  bool destroy_payloads = false;
182 
183  // Returns "independent" or "one_at_a_time".
184  static std::string modeToString(Mode mode);
185  // Returns 0 on success, -1 on error.
186  static int parseMode(const char* str, Mode* out_mode);
187 
188  // COMPAT. Use `compressionToString` and `parseCompression` from
189  // facebook::logdevice namespace.
190  static std::string compressionToString(Compression c);
191  static int parseCompression(const char* str, Compression* out_c);
192  };
193 
194  // NOTE: It is possible to have different options for each log but currently
195  // this feature is not supported in the interface.
196  struct Options : public LogOptions {
197  Options() : LogOptions() {}
198 
199  // Approximate memory budget for buffered and inflight writes. If an
200  // append() call would exceed this limit, it fails fast with E::NOBUFS.
201  //
202  // Accounting is not completely accurate for performance reasons. There
203  // is internal overhead per batch and there may be pathological cases
204  // where actual memory usage exceeds the limit. However, in most cases it
205  // should stay well under.
206  //
207  // Negative for no limit.
208  int32_t memory_limit_mb = -1;
209  };
210 
217  static std::unique_ptr<BufferedWriter> create(std::shared_ptr<Client> client,
218  AppendCallback* callback,
219  Options options = Options());
220 
229  int append(logid_t logid,
230  std::string&& payload,
231  AppendCallback::Context callback_context,
232  AppendAttributes&& attrs = AppendAttributes());
233 
244  std::vector<Status> append(std::vector<Append>&& appends);
245 
// NOTE(review): presumably flushes whatever is currently buffered for every
// log without waiting for the time/size triggers; the meaning of the int
// result is not visible in this header -- confirm against the implementation.
int
flushAll();
258 
263  virtual ~BufferedWriter() {}
264 
265  private:
266  BufferedWriter() {} // can be constructed by the factory only
267  BufferedWriter(const BufferedWriter&) = delete;
268  BufferedWriter& operator=(const BufferedWriter&) = delete;
269 
270  friend class BufferedWriterImpl;
271  BufferedWriterImpl* impl(); // downcasts (this)
272 };
273 }} // namespace facebook::logdevice
Definition: BufferedWriter.h:109
int append(logid_t logid, std::string &&payload, AppendCallback::Context callback_context, AppendAttributes &&attrs=AppendAttributes())
static std::unique_ptr< BufferedWriter > create(std::shared_ptr< Client > client, AppendCallback *callback, Options options=Options())
Definition: AsyncReader.h:16
virtual void onSuccess(logid_t, ContextSet, const DataRecordAttributes &)
Definition: BufferedWriter.h:76
Definition: BufferedWriter.h:57
Definition: BufferedWriter.h:127
virtual void onFailure(logid_t, ContextSet, Status)
Definition: BufferedWriter.h:87
Definition: BufferedWriter.h:196
virtual ~BufferedWriter()
Definition: BufferedWriter.h:263
virtual RetryDecision onRetry(logid_t, const ContextSet &, Status)
Definition: BufferedWriter.h:101