Libosmium  2.9.0
Fast and flexible C++ library for working with OpenStreetMap data
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cassert>
37 #include <cstddef>
38 #include <exception>
39 #include <functional>
40 #include <future>
41 #include <initializer_list>
42 #include <memory>
43 #include <string>
44 #include <utility>
45 
47 #include <osmium/io/detail/output_format.hpp>
48 #include <osmium/io/detail/queue_util.hpp>
49 #include <osmium/io/detail/read_write.hpp>
50 #include <osmium/io/detail/write_thread.hpp>
51 #include <osmium/io/error.hpp>
52 #include <osmium/io/file.hpp>
53 #include <osmium/io/header.hpp>
55 #include <osmium/memory/buffer.hpp>
56 #include <osmium/thread/util.hpp>
57 #include <osmium/util/config.hpp>
58 
59 namespace osmium {
60 
61  namespace memory {
62  class Item;
63  } //namespace memory
64 
65  namespace io {
66 
67  namespace detail {
68 
69  inline size_t get_output_queue_size() noexcept {
70  size_t n = osmium::config::get_max_queue_size("OUTPUT", 20);
71  return n > 2 ? n : 2;
72  }
73 
74  } // namespace detail
75 
99  class Writer {
100 
101  static constexpr size_t default_buffer_size = 10 * 1024 * 1024;
102 
104 
105  detail::future_string_queue_type m_output_queue;
106 
107  std::unique_ptr<osmium::io::detail::OutputFormat> m_output;
108 
110 
112 
113  std::future<bool> m_write_future;
114 
116 
117  enum class status {
118  okay = 0, // normal writing
119  error = 1, // some error occurred while writing
120  closed = 2 // close() called successfully
121  } m_status;
122 
123  // This function will run in a separate thread.
124  static void write_thread(detail::future_string_queue_type& output_queue,
125  std::unique_ptr<osmium::io::Compressor>&& compressor,
126  std::promise<bool>&& write_promise) {
127  detail::WriteThread write_thread{output_queue,
128  std::move(compressor),
129  std::move(write_promise)};
130  write_thread();
131  }
132 
134  if (buffer && buffer.committed() > 0) {
135  m_output->write_buffer(std::move(buffer));
136  }
137  }
138 
139  void do_flush() {
140  osmium::thread::check_for_exception(m_write_future);
141  if (m_buffer && m_buffer.committed() > 0) {
144  using std::swap;
145  swap(m_buffer, buffer);
146 
147  m_output->write_buffer(std::move(buffer));
148  }
149  }
150 
151  template <typename TFunction, typename... TArgs>
152  void ensure_cleanup(TFunction func, TArgs&&... args) {
153  if (m_status != status::okay) {
154  throw io_error("Can not write to writer when in status 'closed' or 'error'");
155  }
156 
157  try {
158  func(std::forward<TArgs>(args)...);
159  } catch (...) {
161  detail::add_to_queue(m_output_queue, std::current_exception());
162  detail::add_end_of_data_to_queue(m_output_queue);
163  throw;
164  }
165  }
166 
167  struct options_type {
171  };
172 
173  static void set_option(options_type& options, const osmium::io::Header& header) {
174  options.header = header;
175  }
176 
177  static void set_option(options_type& options, overwrite value) {
178  options.allow_overwrite = value;
179  }
180 
181  static void set_option(options_type& options, fsync value) {
182  options.sync = value;
183  }
184 
185  public:
186 
210  template <typename... TArgs>
211  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
212  m_file(file.check()),
213  m_output_queue(detail::get_output_queue_size(), "raw_output"),
214  m_output(osmium::io::detail::OutputFormatFactory::instance().create_output(m_file, m_output_queue)),
215  m_buffer(),
216  m_buffer_size(default_buffer_size),
217  m_write_future(),
218  m_thread(),
219  m_status(status::okay) {
220  assert(!m_file.buffer()); // XXX can't handle pseudo-files
221 
222  options_type options;
223  (void)std::initializer_list<int>{
224  (set_option(options, args), 0)...
225  };
226 
227  std::unique_ptr<osmium::io::Compressor> compressor =
229  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
230  options.sync);
231 
232  std::promise<bool> write_promise;
233  m_write_future = write_promise.get_future();
234  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise)};
235 
236  ensure_cleanup([&](){
237  m_output->write_header(options.header);
238  });
239  }
240 
241  template <typename... TArgs>
242  explicit Writer(const std::string& filename, TArgs&&... args) :
243  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
244  }
245 
246  template <typename... TArgs>
247  explicit Writer(const char* filename, TArgs&&... args) :
248  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
249  }
250 
251  Writer(const Writer&) = delete;
252  Writer& operator=(const Writer&) = delete;
253 
254  Writer(Writer&&) = default;
255  Writer& operator=(Writer&&) = default;
256 
257  ~Writer() noexcept {
258  try {
259  close();
260  } catch (...) {
261  // Ignore any exceptions because destructor must not throw.
262  }
263  }
264 
268  size_t buffer_size() const noexcept {
269  return m_buffer_size;
270  }
271 
276  void set_buffer_size(size_t size) noexcept {
277  m_buffer_size = size;
278  }
279 
287  void flush() {
288  ensure_cleanup([&](){
289  do_flush();
290  });
291  }
292 
302  ensure_cleanup([&](){
303  do_flush();
304  do_write(std::move(buffer));
305  });
306  }
307 
315  void operator()(const osmium::memory::Item& item) {
316  ensure_cleanup([&](){
317  if (!m_buffer) {
320  }
321  try {
322  m_buffer.push_back(item);
323  } catch (const osmium::buffer_is_full&) {
324  do_flush();
325  m_buffer.push_back(item);
326  }
327  });
328  }
329 
339  void close() {
340  if (m_status == status::okay) {
341  ensure_cleanup([&](){
342  do_write(std::move(m_buffer));
343  m_output->write_end();
345  detail::add_end_of_data_to_queue(m_output_queue);
346  });
347  }
348 
349  if (m_write_future.valid()) {
350  m_write_future.get();
351  }
352  }
353 
354  }; // class Writer
355 
356  } // namespace io
357 
358 } // namespace osmium
359 
360 #endif // OSMIUM_IO_WRITER_HPP
fsync sync
Definition: writer.hpp:170
~Writer() noexcept
Definition: writer.hpp:257
void ensure_cleanup(TFunction func, TArgs &&...args)
Definition: writer.hpp:152
Definition: writer.hpp:167
static CompressionFactory & instance()
Definition: compression.hpp:172
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:133
void do_flush()
Definition: writer.hpp:139
Definition: reader_iterator.hpp:39
void swap(Buffer &lhs, Buffer &rhs)
Definition: buffer.hpp:731
osmium::thread::thread_handler m_thread
Definition: writer.hpp:115
osmium::io::Header header
Definition: writer.hpp:168
osmium::memory::Buffer m_buffer
Definition: writer.hpp:109
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:107
enum osmium::io::Writer::status m_status
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:276
Definition: file.hpp:72
Definition: item.hpp:105
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< bool > &&write_promise)
Definition: writer.hpp:124
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:301
Namespace for everything in the Osmium library.
Definition: assembler.hpp:73
status
Definition: writer.hpp:117
Definition: attr.hpp:333
size_t buffer_size() const noexcept
Definition: writer.hpp:268
Writer(const std::string &filename, TArgs &&...args)
Definition: writer.hpp:242
fsync
Definition: writer_options.hpp:51
static constexpr size_t default_buffer_size
Definition: writer.hpp:101
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:181
void push_back(const osmium::memory::Item &item)
Definition: buffer.hpp:488
Definition: error.hpp:44
size_t m_buffer_size
Definition: writer.hpp:111
void close()
Definition: writer.hpp:339
void flush()
Definition: writer.hpp:287
osmium::io::File m_file
Definition: writer.hpp:103
std::unique_ptr< osmium::io::Compressor > create_compressor(osmium::io::file_compression compression, TArgs &&...args)
Definition: compression.hpp:192
size_t committed() const noexcept
Definition: buffer.hpp:241
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
Definition: buffer.hpp:58
const char * buffer() const noexcept
Definition: file.hpp:156
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:177
Definition: writer.hpp:99
std::future< bool > m_write_future
Definition: writer.hpp:113
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Writer(const osmium::io::File &file, TArgs &&...args)
Definition: writer.hpp:211
Definition: header.hpp:49
overwrite allow_overwrite
Definition: writer.hpp:169
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:173
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:105
file_compression compression() const noexcept
Definition: file.hpp:289
File & filename(const std::string &filename)
Definition: file.hpp:307
Writer(const char *filename, TArgs &&...args)
Definition: writer.hpp:247
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:315
Definition: util.hpp:85
overwrite
Definition: writer_options.hpp:43
Writer & operator=(const Writer &)=delete