Skip to content

Commit

Permalink
store variadic buffer sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Oct 25, 2023
1 parent b0bcb16 commit 0afb739
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 18 deletions.
66 changes: 54 additions & 12 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"
#include "arrow/util/small_vector.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"
Expand All @@ -52,6 +53,8 @@ namespace arrow {
using internal::checked_cast;
using internal::checked_pointer_cast;

using internal::Zip;

using internal::SmallVector;
using internal::StaticVector;

Expand Down Expand Up @@ -522,13 +525,14 @@ namespace {

struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData> {
// The buffers are owned by the ArrayData member
StaticVector<const void*, 3> buffers_;
SmallVector<const void*, 3> buffers_;
struct ArrowArray dictionary_;
SmallVector<struct ArrowArray, 1> children_;
SmallVector<struct ArrowArray*, 4> child_pointers_;

std::shared_ptr<ArrayData> data_;
std::shared_ptr<Device::SyncEvent> sync_;
std::vector<int64_t> variadic_buffer_sizes_;

ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
Expand Down Expand Up @@ -571,12 +575,28 @@ struct ArrayExporter {
--n_buffers;
++buffers_begin;
}

bool need_variadic_buffer_sizes =
data->type->id() == Type::BINARY_VIEW || data->type->id() == Type::STRING_VIEW;
if (need_variadic_buffer_sizes) {
++n_buffers;
}

export_.buffers_.resize(n_buffers);
std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(),
[](const std::shared_ptr<Buffer>& buffer) -> const void* {
return buffer ? buffer->data() : nullptr;
});

if (need_variadic_buffer_sizes) {
auto variadic_buffers = util::span(data->buffers).subspan(2);
export_.variadic_buffer_sizes_.resize(variadic_buffers.size());
for (auto [size, buf] : Zip(export_.variadic_buffer_sizes_, variadic_buffers)) {
size = buf->size();
}
export_.buffers_.back() = export_.variadic_buffer_sizes_.data();
}

// Export dictionary
if (data->dictionary != nullptr) {
dict_exporter_ = std::make_unique<ArrayExporter>();
Expand Down Expand Up @@ -796,7 +816,7 @@ Status InvalidFormatString(std::string_view v) {

class FormatStringParser {
public:
FormatStringParser() {}
FormatStringParser() = default;

explicit FormatStringParser(std::string_view v) : view_(v), index_(0) {}

Expand Down Expand Up @@ -942,8 +962,6 @@ Result<DecodedMetadata> DecodeMetadata(const char* metadata) {
}

struct SchemaImporter {
SchemaImporter() : c_struct_(nullptr), guard_(nullptr) {}

Status Import(struct ArrowSchema* src) {
if (ArrowSchemaIsReleased(src)) {
return Status::Invalid("Cannot import released ArrowSchema");
Expand Down Expand Up @@ -1355,8 +1373,8 @@ struct SchemaImporter {
return Status::OK();
}

struct ArrowSchema* c_struct_;
SchemaExportGuard guard_;
struct ArrowSchema* c_struct_{nullptr};
SchemaExportGuard guard_{nullptr};
FormatStringParser f_parser_;
int64_t recursion_level_;
std::vector<SchemaImporter> child_importers_;
Expand Down Expand Up @@ -1424,7 +1442,7 @@ class ImportedBuffer : public Buffer {
std::shared_ptr<ImportedArrayData> import)
: Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {}

~ImportedBuffer() override {}
~ImportedBuffer() override = default;

std::shared_ptr<Device::SyncEvent> device_sync_event() override {
return import_->device_sync_;
Expand All @@ -1436,9 +1454,7 @@ class ImportedBuffer : public Buffer {

struct ArrayImporter {
explicit ArrayImporter(const std::shared_ptr<DataType>& type)
: type_(type),
zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
device_type_(DeviceAllocationType::kCPU) {}
: type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)) {}

Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) {
ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
Expand Down Expand Up @@ -1586,6 +1602,10 @@ struct ArrayImporter {

Status Visit(const LargeBinaryType& type) { return ImportStringLike(type); }

Status Visit(const StringViewType& type) { return ImportBinaryView(type); }

Status Visit(const BinaryViewType& type) { return ImportBinaryView(type); }

Status Visit(const ListType& type) { return ImportListLike(type); }

Status Visit(const LargeListType& type) { return ImportListLike(type); }
Expand Down Expand Up @@ -1664,6 +1684,28 @@ struct ArrayImporter {
return Status::OK();
}

Status ImportBinaryView(const BinaryViewType&) {
RETURN_NOT_OK(CheckNoChildren());
if (c_struct_->n_buffers < 3) {
return Status::Invalid("Expected at least 3 buffers for imported type ",
type_->ToString(), ", ArrowArray struct has ",
c_struct_->n_buffers);
}
RETURN_NOT_OK(AllocateArrayData());
RETURN_NOT_OK(ImportNullBitmap());
RETURN_NOT_OK(ImportFixedSizeBuffer(1, BinaryViewType::kSize));

// The last C data buffer stores buffer sizes, and shouldn't be imported
auto* buffer_sizes =
static_cast<const int64_t*>(c_struct_->buffers[c_struct_->n_buffers - 1]);

for (int32_t buffer_id = 2; buffer_id < c_struct_->n_buffers - 1; ++buffer_id) {
RETURN_NOT_OK(ImportBuffer(buffer_id, buffer_sizes[buffer_id - 2]));
}
data_->buffers.pop_back();
return Status::OK();
}

template <typename StringType>
Status ImportStringLike(const StringType& type) {
RETURN_NOT_OK(CheckNoChildren());
Expand Down Expand Up @@ -1808,7 +1850,7 @@ struct ArrayImporter {
std::shared_ptr<Buffer> zero_size_buffer_;

std::shared_ptr<MemoryManager> memory_mgr_;
DeviceAllocationType device_type_;
DeviceAllocationType device_type_{};
};

} // namespace
Expand Down Expand Up @@ -2014,7 +2056,7 @@ class ArrayStreamBatchReader : public RecordBatchReader {
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() {
~ArrayStreamBatchReader() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"

// TODO(GH-37221): Remove these ifdef checks when compute dependency is removed
#ifdef ARROW_COMPUTE
Expand All @@ -57,6 +58,7 @@ using internal::ArrayStreamExportTraits;
using internal::checked_cast;
using internal::SchemaExportGuard;
using internal::SchemaExportTraits;
using internal::Zip;

template <typename T>
struct ExportTraits {};
Expand Down Expand Up @@ -558,12 +560,24 @@ struct ArrayExportChecker {
--expected_n_buffers;
++expected_buffers;
}
ASSERT_EQ(c_export->n_buffers, expected_n_buffers);
bool has_variadic_buffer_sizes = expected_data.type->id() == Type::STRING_VIEW ||
expected_data.type->id() == Type::BINARY_VIEW;
ASSERT_EQ(c_export->n_buffers, expected_n_buffers + has_variadic_buffer_sizes);
ASSERT_NE(c_export->buffers, nullptr);
for (int64_t i = 0; i < c_export->n_buffers; ++i) {

for (int64_t i = 0; i < expected_n_buffers; ++i) {
auto expected_ptr = expected_buffers[i] ? expected_buffers[i]->data() : nullptr;
ASSERT_EQ(c_export->buffers[i], expected_ptr);
}
if (has_variadic_buffer_sizes) {
auto variadic_buffers = util::span(expected_data.buffers).subspan(2);
auto variadic_buffer_sizes = util::span(
static_cast<const int64_t*>(c_export->buffers[c_export->n_buffers - 1]),
variadic_buffers.size());
for (auto [buf, size] : Zip(variadic_buffers, variadic_buffer_sizes)) {
ASSERT_EQ(buf->size(), size);
}
}

if (expected_data.dictionary != nullptr) {
// Recurse into dictionary
Expand Down
2 changes: 1 addition & 1 deletion dev/archery/archery/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
BOOL = ArrowBool()


@click.group()
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option("--debug", type=BOOL, is_flag=True, default=False,
help="Increase logging with debugging output.")
@click.option("--pdb", type=BOOL, is_flag=True, default=False,
Expand Down
4 changes: 1 addition & 3 deletions dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1862,9 +1862,7 @@ def _temp_path():
.skip_tester('Go')
.skip_tester('Java')
.skip_tester('JS')
.skip_tester('Rust')
.skip_format(SKIP_C_SCHEMA, 'C++')
.skip_format(SKIP_C_ARRAY, 'C++'),
.skip_tester('Rust'),

generate_extension_case()
.skip_tester('C#')
Expand Down
8 changes: 8 additions & 0 deletions docs/source/format/CDataInterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,14 @@ parameterized extension types).
The ``ArrowArray`` structure exported from an extension array simply points
to the storage data of the extension array.

Binary view arrays
------------------

For binary or utf-8 view arrays, an extra buffer is appended which stores
the lengths of each variadic data buffer as ``int64_t``. This buffer is
necessary since these buffer lengths are not trivially extractable from
other data in an array of binary or utf-8 view type.

.. _c-data-interface-semantics:

Semantics
Expand Down

0 comments on commit 0afb739

Please sign in to comment.