Libosmium  2.17.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  return osmium::config::get_max_queue_size("INPUT", 20);
76  }
77 
78  inline std::size_t get_osmdata_queue_size() noexcept {
79  return osmium::config::get_max_queue_size("OSMDATA", 20);
80  }
81 
82  } // namespace detail
83 
90  class Reader {
91 
92  // The Reader::read() function reads from a queue of buffers which
93  // can contain nested buffers. These nested buffers will be in
94  // here, because read() can only return a single unnested buffer.
95  osmium::memory::Buffer m_back_buffers{};
96 
98 
100 
101  detail::ParserFactory::create_parser_type m_creator;
102 
103  enum class status {
104  okay = 0, // normal reading
105  error = 1, // some error occurred while reading
106  closed = 2, // close() called
107  eof = 3 // eof of file was reached without error
109 
110  int m_childpid = 0;
111 
112  detail::future_string_queue_type m_input_queue;
113 
114  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
115 
116  osmium::io::detail::ReadThreadManager m_read_thread_manager;
117 
118  detail::future_buffer_queue_type m_osmdata_queue;
119  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
120 
121  std::future<osmium::io::Header> m_header_future{};
123 
125 
126  std::size_t m_file_size = 0;
127 
131 
132  void set_option(osmium::thread::Pool& pool) noexcept {
133  m_pool = &pool;
134  }
135 
137  m_read_which_entities = value;
138  }
139 
140  void set_option(osmium::io::read_meta value) noexcept {
141  // Ignore this setting if we have a history/change file,
142  // because if this is set to "no", we don't see the difference
143  // between visible and deleted objects.
145  m_read_metadata = value;
146  }
147  }
148 
149  void set_option(osmium::io::buffers_type value) noexcept {
150  m_buffers_kind = value;
151  }
152 
153  // This function will run in a separate thread.
155  const detail::ParserFactory::create_parser_type& creator,
156  detail::future_string_queue_type& input_queue,
157  detail::future_buffer_queue_type& osmdata_queue,
158  std::promise<osmium::io::Header>&& header_promise,
159  osmium::osm_entity_bits::type read_which_entities,
160  osmium::io::read_meta read_metadata,
161  osmium::io::buffers_type buffers_kind) {
162  std::promise<osmium::io::Header> promise{std::move(header_promise)};
163  osmium::io::detail::parser_arguments args = {
164  pool,
165  input_queue,
166  osmdata_queue,
167  promise,
168  read_which_entities,
169  read_metadata,
170  buffers_kind
171  };
172  creator(args)->parse();
173  }
174 
175 #ifndef _WIN32
/**
 * Fork and run a command in the child process. The standard output of
 * the command is connected to a pipe whose read end is returned.
 *
 * The command is looked up through PATH and called as
 * `command -g filename`.
 *
 * @param command Name of the command to run.
 * @param filename Argument given to the command.
 * @param childpid Pointer to where the pid of the child is stored.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if the pipe can not be created or the
 *         fork fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (::pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }
    const pid_t pid = ::fork();
    if (pid < 0) {
        // Keep the reason for the failure, close() could overwrite
        // errno. Close both ends so the pipe is not leaked.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }
    if (pid == 0) { // child
        // The child must leave through _exit(), never exit(): exit()
        // would run the atexit handlers of the parent and flush stdio
        // buffers that were duplicated by fork().

        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        // The read end is not needed in the child. The loop above has
        // already closed it unless its number is 32 or larger.
        ::close(pipefd[0]);

        if (::dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout
            ::_exit(1);
        }
        // Drop the original descriptor. This also frees the slots 0
        // or 2 if the pipe happened to land there, so the opens below
        // get the descriptors they are meant for.
        if (pipefd[1] != 1) {
            ::close(pipefd[1]);
        }

        // open() returns the lowest free descriptor: first 0, then 2.
        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);

        // execlp() only returns if it failed.
        ::_exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
222 #endif
223 
232  static int open_input_file_or_url(const std::string& filename, int* childpid) {
233  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
234  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
235 #ifndef _WIN32
236  return execute("curl", filename, childpid);
237 #else
238  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
239 #endif
240  }
241  const int fd = osmium::io::detail::open_for_reading(filename);
242 #if __linux__
243  if (fd >= 0) {
244  // Tell the kernel we are going to read this file sequentially
245  ::posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
246  }
247 #endif
248  return fd;
249  }
250 
251  public:
252 
293  template <typename... TArgs>
294  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
295  m_file(file.check()),
296  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
297  m_input_queue(detail::get_input_queue_size(), "raw_input"),
298  m_decompressor(m_file.buffer() ?
299  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
300  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
302  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
305 
306  (void)std::initializer_list<int>{
307  (set_option(args), 0)...
308  };
309 
310  if (!m_pool) {
312  }
313 
314  std::promise<osmium::io::Header> header_promise;
315  m_header_future = header_promise.get_future();
317  std::ref(m_input_queue), std::ref(m_osmdata_queue),
318  std::move(header_promise), m_read_which_entities,
320  }
321 
322  template <typename... TArgs>
323  explicit Reader(const std::string& filename, TArgs&&... args) :
324  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
325  }
326 
327  template <typename... TArgs>
328  explicit Reader(const char* filename, TArgs&&... args) :
329  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
330  }
331 
332  Reader(const Reader&) = delete;
333  Reader& operator=(const Reader&) = delete;
334 
335  Reader(Reader&&) = delete;
336  Reader& operator=(Reader&&) = delete;
337 
338  ~Reader() noexcept {
339  try {
340  close();
341  } catch (...) {
342  // Ignore any exceptions because destructor must not throw.
343  }
344  }
345 
354  void close() {
356 
357  m_read_thread_manager.stop();
358 
359  m_osmdata_queue_wrapper.drain();
360 
361  try {
362  m_read_thread_manager.close();
363  } catch (...) {
364  // Ignore any exceptions.
365  }
366 
367 #ifndef _WIN32
368  if (m_childpid) {
369  int status = 0;
370  const pid_t pid = ::waitpid(m_childpid, &status, 0);
371 #pragma GCC diagnostic push
372 #pragma GCC diagnostic ignored "-Wold-style-cast"
373  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
374  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
375  }
376 #pragma GCC diagnostic pop
377  m_childpid = 0;
378  }
379 #endif
380  }
381 
389  if (m_status == status::error) {
390  throw io_error{"Can not get header from reader when in status 'error'"};
391  }
392 
393  try {
394  if (m_header_future.valid()) {
395  m_header = m_header_future.get();
396  }
397  } catch (...) {
398  close();
400  throw;
401  }
402 
403  return m_header;
404  }
405 
414  osmium::memory::Buffer read() {
415  osmium::memory::Buffer buffer;
416 
417  // If there are buffers on the stack, return those first.
418  if (m_back_buffers) {
419  if (m_back_buffers.has_nested_buffers()) {
420  buffer = std::move(*m_back_buffers.get_last_nested());
421  } else {
422  buffer = std::move(m_back_buffers);
423  m_back_buffers = osmium::memory::Buffer{};
424  }
425  return buffer;
426  }
427 
428  if (m_status != status::okay) {
429  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
430  }
431 
434  return buffer;
435  }
436 
437  try {
438  // m_input_format.read() can return an invalid buffer to signal EOF,
439  // or a valid buffer with or without data. A valid buffer
440  // without data is not an error, it just means we have to
441  // keep getting the next buffer until there is one with data.
442  while (true) {
443  buffer = m_osmdata_queue_wrapper.pop();
444  if (detail::at_end_of_data(buffer)) {
446  m_read_thread_manager.close();
447  return buffer;
448  }
449  if (buffer.has_nested_buffers()) {
450  m_back_buffers = std::move(buffer);
451  buffer = std::move(*m_back_buffers.get_last_nested());
452  }
453  if (buffer.committed() > 0) {
454  return buffer;
455  }
456  }
457  } catch (...) {
458  close();
460  throw;
461  }
462  }
463 
468  bool eof() const {
470  }
471 
476  std::size_t file_size() const noexcept {
477  return m_file_size;
478  }
479 
494  std::size_t offset() const noexcept {
495  return m_decompressor->offset();
496  }
497 
498  }; // class Reader
499 
508  template <typename... TArgs>
509  osmium::memory::Buffer read_file(TArgs&&... args) {
510  osmium::memory::Buffer buffer{1024 * 1024, osmium::memory::Buffer::auto_grow::yes};
511 
512  Reader reader{std::forward<TArgs>(args)...};
513  while (auto read_buffer = reader.read()) {
514  buffer.add_buffer(read_buffer);
515  buffer.commit();
516  }
517 
518  return buffer;
519  }
520 
521  } // namespace io
522 
523 } // namespace osmium
524 
525 #endif // OSMIUM_IO_READER_HPP
Definition: compression.hpp:141
Definition: file.hpp:72
bool has_multiple_object_versions() const noexcept
Definition: file.hpp:303
Definition: header.hpp:68
Definition: reader.hpp:90
osmium::memory::Buffer read()
Definition: reader.hpp:414
osmium::io::buffers_type m_buffers_kind
Definition: reader.hpp:130
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:112
osmium::memory::Buffer m_back_buffers
Definition: reader.hpp:95
int m_childpid
Definition: reader.hpp:110
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:140
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:118
std::size_t m_file_size
Definition: reader.hpp:126
Reader & operator=(Reader &&)=delete
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:132
void set_option(osmium::io::buffers_type value) noexcept
Definition: reader.hpp:149
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:187
enum osmium::io::Reader::status m_status
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:232
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:114
status
Definition: reader.hpp:103
static void parser_thread(osmium::thread::Pool &pool, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata, osmium::io::buffers_type buffers_kind)
Definition: reader.hpp:154
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:328
osmium::io::Header m_header
Definition: reader.hpp:122
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:101
Reader & operator=(const Reader &)=delete
osmium::io::Header header()
Definition: reader.hpp:388
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:119
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:294
std::size_t file_size() const noexcept
Definition: reader.hpp:476
Reader(Reader &&)=delete
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:136
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:121
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:116
bool eof() const
Definition: reader.hpp:468
Reader(const Reader &)=delete
std::size_t offset() const noexcept
Definition: reader.hpp:494
osmium::thread::thread_handler m_thread
Definition: reader.hpp:124
void close()
Definition: reader.hpp:354
~Reader() noexcept
Definition: reader.hpp:338
osmium::io::File m_file
Definition: reader.hpp:97
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:128
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:323
osmium::io::read_meta m_read_metadata
Definition: reader.hpp:129
osmium::thread::Pool * m_pool
Definition: reader.hpp:99
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:509
buffers_type
Definition: file_format.hpp:60
read_meta
Definition: file_format.hpp:55
type
Definition: entity_bits.hpp:63
@ all
object or changeset
Definition: entity_bits.hpp:76
@ nothing
Definition: entity_bits.hpp:67
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551
Definition: error.hpp:44