--- a/src/s3select/include/s3select_parquet_intrf.h +++ b/src/s3select/include/s3select_parquet_intrf.h @@ -26,6 +26,14 @@ #include "internal_file_decryptor.h" #include "encryption_internal.h" +#if ARROW_VERSION_MAJOR < 9 +#define _ARROW_FD fd_ +#define _ARROW_FD_TYPE int +#else +#define _ARROW_FD fd_.fd() +#define _ARROW_FD_TYPE arrow::internal::FileDescriptor +#endif + /******************************************/ /******************************************/ class optional_yield; @@ -164,7 +172,7 @@ std::mutex lock_; // File descriptor - int fd_; + _ARROW_FD_TYPE fd_; FileMode::type mode_; @@ -202,7 +210,7 @@ mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE; if (!truncate) { - ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_)); + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD)); } else { size_ = 0; } @@ -222,7 +230,11 @@ RETURN_NOT_OK(SetFileName(fd)); is_open_ = true; mode_ = FileMode::WRITE; + #if ARROW_VERSION_MAJOR < 9 fd_ = fd; + #else + fd_ = arrow::internal::FileDescriptor{fd}; + #endif return Status::OK(); } @@ -230,7 +242,7 @@ RETURN_NOT_OK(SetFileName(path)); ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_)); - ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_)); + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD)); is_open_ = true; mode_ = FileMode::READ; @@ -242,7 +254,11 @@ RETURN_NOT_OK(SetFileName(fd)); is_open_ = true; mode_ = FileMode::READ; + #if ARROW_VERSION_MAJOR < 9 fd_ = fd; + #else + fd_ = arrow::internal::FileDescriptor{fd}; + #endif return Status::OK(); } @@ -258,9 +274,13 @@ // Even if closing fails, the fd will likely be closed (perhaps it's // already closed). is_open_ = false; + #if ARROW_VERSION_MAJOR < 9 int fd = fd_; fd_ = -1; RETURN_NOT_OK(::arrow::internal::FileClose(fd)); + #else + RETURN_NOT_OK(fd_.Close()); + #endif } return Status::OK(); } @@ -268,7 +288,7 @@ Result Read(int64_t nbytes, void* out) override { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(CheckPositioned()); - return ::arrow::internal::FileRead(fd_, reinterpret_cast(out), nbytes); + return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast(out), nbytes); } Result ReadAt(int64_t position, int64_t nbytes, void* out) override { @@ -277,7 +297,7 @@ // ReadAt() leaves the file position undefined, so require that we seek // before calling Read() or Write(). need_seeking_.store(true); - return ::arrow::internal::FileReadAt(fd_, reinterpret_cast(out), position, + return ::arrow::internal::FileReadAt(_ARROW_FD, reinterpret_cast(out), position, nbytes); } @@ -286,7 +306,7 @@ if (pos < 0) { return Status::Invalid("Invalid position"); } - Status st = ::arrow::internal::FileSeek(fd_, pos); + Status st = ::arrow::internal::FileSeek(_ARROW_FD, pos); if (st.ok()) { need_seeking_.store(false); } @@ -295,7 +315,7 @@ Result Tell() const override { RETURN_NOT_OK(CheckClosed()); - return ::arrow::internal::FileTell(fd_); + return ::arrow::internal::FileTell(_ARROW_FD); } Status Write(const void* data, int64_t length) override { @@ -306,11 +326,11 @@ if (length < 0) { return Status::IOError("Length must be non-negative"); } - return ::arrow::internal::FileWrite(fd_, reinterpret_cast(data), + return ::arrow::internal::FileWrite(_ARROW_FD, reinterpret_cast(data), length); } - int fd() const override { return fd_; } + int fd() const override { return _ARROW_FD; } bool is_open() const override { return is_open_; } @@ -345,7 +365,7 @@ std::mutex lock_; // File descriptor - int fd_; + _ARROW_FD_TYPE fd_; FileMode::type mode_; @@ -411,7 +431,11 @@ // already closed). is_open_ = false; //int fd = fd_; + #if ARROW_VERSION_MAJOR < 9 fd_ = -1; + #else + fd_.Close(); + #endif //RETURN_NOT_OK(::arrow::internal::FileClose(fd)); } return Status::OK(); @@ -421,7 +445,7 @@ NOT_IMPLEMENT; RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(CheckPositioned()); - return ::arrow::internal::FileRead(fd_, reinterpret_cast(out), nbytes); + return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast(out), nbytes); } Result ReadAt(int64_t position, int64_t nbytes, void* out) { @@ -443,7 +467,7 @@ return Status::OK(); } - int fd() const { return fd_; } + int fd() const { return _ARROW_FD; } bool is_open() const { return is_open_; } @@ -467,7 +491,7 @@ std::mutex lock_; // File descriptor - int fd_; + _ARROW_FD_TYPE fd_; FileMode::type mode_; @@ -609,7 +633,7 @@ for (const auto& range : ranges) { RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length)); #if defined(POSIX_FADV_WILLNEED) - if (posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED)) { + if (posix_fadvise(_ARROW_FD, range.offset, range.length, POSIX_FADV_WILLNEED)) { return IOErrorFromErrno(errno, "posix_fadvise failed"); } #elif defined(F_RDADVISE) // macOS, BSD? @@ -617,7 +641,7 @@ off_t ra_offset; int ra_count; } radvisory{range.offset, static_cast(range.length)}; - if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) { + if (radvisory.ra_count > 0 && fcntl(_ARROW_FD, F_RDADVISE, &radvisory) == -1) { return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed"); } #endif @@ -970,6 +994,9 @@ CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, static_cast(i), meta_decryptor, data_decryptor); return PageReader::Open(stream, col->num_values(), col->compression(), + #if ARROW_VERSION_MAJOR > 8 + false, + #endif properties_.memory_pool(), &ctx); } @@ -985,6 +1012,9 @@ CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, static_cast(i), meta_decryptor, data_decryptor); return PageReader::Open(stream, col->num_values(), col->compression(), + #if ARROW_VERSION_MAJOR > 8 + false, + #endif properties_.memory_pool(), &ctx); }