Skip to content

Commit

Permalink
Merge pull request #429 from apache/get_serialized_size
Browse files Browse the repository at this point in the history
Get serialized size
  • Loading branch information
AlexanderSaydakov authored May 10, 2024
2 parents 145ab0d + 994e882 commit 85254b7
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 25 deletions.
2 changes: 1 addition & 1 deletion theta/include/theta_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class theta_build_helper{
// consistent way of initializing theta from p
// avoids multiplication if p == 1 since it might not yield MAX_THETA exactly
static uint64_t starting_theta_from_p(float p) {
if (p < 1) return static_cast<uint64_t>(theta_constants::MAX_THETA * p);
if (p < 1) return static_cast<uint64_t>(static_cast<double>(theta_constants::MAX_THETA) * p);
return theta_constants::MAX_THETA;
}

Expand Down
19 changes: 18 additions & 1 deletion theta/include/theta_sketch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc<Allocator> {
virtual uint32_t get_num_retained() const;
virtual uint16_t get_seed_hash() const;

/**
* Computes maximum serialized size in bytes
* @param lg_k nominal number of entries in the sketch
*/
static size_t get_max_serialized_size_bytes(uint8_t lg_k);

/**
* Computes size in bytes required to serialize the current state of the sketch.
* Computing compressed size is expensive. It takes iterating over all retained hashes,
* and the actual serialization will have to look at them again.
* @param compressed if true compressed size is returned (if applicable)
*/
size_t get_serialized_size_bytes(bool compressed = false) const;

/**
* This method serializes the sketch into a given stream in a binary form
* @param os output stream
Expand Down Expand Up @@ -486,8 +500,11 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc<Allocator> {
uint64_t theta_;
std::vector<uint64_t, Allocator> entries_;

uint8_t get_preamble_longs(bool compressed) const;
bool is_suitable_for_compression() const;
uint8_t compute_min_leading_zeros() const;
uint8_t compute_entry_bits() const;
uint8_t get_num_entries_bytes() const;
size_t get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const;
void serialize_version_4(std::ostream& os) const;
vector_bytes serialize_version_4(unsigned header_size_bytes = 0) const;

Expand Down
65 changes: 44 additions & 21 deletions theta/include/theta_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
#include <vector>
#include <stdexcept>

#include "serde.hpp"
#include "binomial_bounds.hpp"
#include "theta_helpers.hpp"
#include "count_zeros.hpp"
#include "bit_packing.hpp"
#include "memory_operations.hpp"

namespace datasketches {

Expand Down Expand Up @@ -341,6 +341,39 @@ auto compact_theta_sketch_alloc<A>::end() const -> const_iterator {
template<typename A>
void compact_theta_sketch_alloc<A>::print_specifics(std::ostringstream&) const {}

template<typename A>
uint8_t compact_theta_sketch_alloc<A>::get_preamble_longs(bool compressed) const {
if (compressed) {
return this->is_estimation_mode() ? 2 : 1;
}
return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_max_serialized_size_bytes(uint8_t lg_k) {
return sizeof(uint64_t) * (3 + update_theta_sketch_alloc<A>::theta_table::get_capacity(lg_k + 1, lg_k));
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_serialized_size_bytes(bool compressed) const {
if (compressed && is_suitable_for_compression()) {
return get_compressed_serialized_size_bytes(compute_entry_bits(), get_num_entries_bytes());
}
return sizeof(uint64_t) * get_preamble_longs(false) + sizeof(uint64_t) * entries_.size();
}

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
template<typename A>
uint8_t compact_theta_sketch_alloc<A>::get_num_entries_bytes() const {
return whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const {
const size_t compressed_bits = entry_bits * entries_.size();
return sizeof(uint64_t) * get_preamble_longs(true) + num_entries_bytes + whole_bytes_to_hold_bits(compressed_bits);
}

template<typename A>
void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {
const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
Expand All @@ -366,12 +399,10 @@ void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {

template<typename A>
auto compact_theta_sketch_alloc<A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs
+ sizeof(uint64_t) * entries_.size();
const size_t size = get_serialized_size_bytes() + header_size_bytes;
vector_bytes bytes(size, 0, entries_.get_allocator());
uint8_t* ptr = bytes.data() + header_size_bytes;

const uint8_t preamble_longs = get_preamble_longs(false);
*ptr++ = preamble_longs;
*ptr++ = UNCOMPRESSED_SERIAL_VERSION;
*ptr++ = SKETCH_TYPE;
Expand Down Expand Up @@ -413,7 +444,7 @@ auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned header_size_by
}

template<typename A>
uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
uint8_t compact_theta_sketch_alloc<A>::compute_entry_bits() const {
// compression is based on leading zeros in deltas between ordered hash values
// assumes ordered sketch
uint64_t previous = 0;
Expand All @@ -423,16 +454,14 @@ uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
ored |= delta;
previous = entry;
}
return count_leading_zeros_in_u64(ored);
return 64 - count_leading_zeros_in_u64(ored);
}

template<typename A>
void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const {
const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
const uint8_t entry_bits = 64 - compute_min_leading_zeros();

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));
const uint8_t entry_bits = compute_entry_bits();
const uint8_t num_entries_bytes = get_num_entries_bytes();

write(os, preamble_longs);
write(os, COMPRESSED_SERIAL_VERSION);
Expand Down Expand Up @@ -483,19 +512,13 @@ void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const

template<typename A>
auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes {
const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
const uint8_t entry_bits = 64 - compute_min_leading_zeros();
const size_t compressed_bits = entry_bits * entries_.size();

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));

const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + num_entries_bytes
+ whole_bytes_to_hold_bits(compressed_bits);
const uint8_t entry_bits = compute_entry_bits();
const uint8_t num_entries_bytes = get_num_entries_bytes();
const size_t size = get_compressed_serialized_size_bytes(entry_bits, num_entries_bytes) + header_size_bytes;
vector_bytes bytes(size, 0, entries_.get_allocator());
uint8_t* ptr = bytes.data() + header_size_bytes;

*ptr++ = preamble_longs;
*ptr++ = get_preamble_longs(true);
*ptr++ = COMPRESSED_SERIAL_VERSION;
*ptr++ = SKETCH_TYPE;
*ptr++ = entry_bits;
Expand Down
34 changes: 32 additions & 2 deletions theta/test/theta_sketch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,11 @@ TEST_CASE("theta sketch: serialize deserialize stream and bytes equivalence", "[
for (int i = 0; i < n; i++) update_sketch.update(i);

std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
update_sketch.compact().serialize(s);
auto bytes = update_sketch.compact().serialize();
auto compact_sketch = update_sketch.compact();
compact_sketch.serialize(s);
auto bytes = compact_sketch.serialize();
REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes());
for (size_t i = 0; i < bytes.size(); ++i) {
REQUIRE(((char*)bytes.data())[i] == (char)s.get());
}
Expand Down Expand Up @@ -521,6 +523,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
auto compact_sketch = update_sketch.compact();

auto bytes = compact_sketch.serialize_compressed();
REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes(true));
{ // deserialize bytes
auto deserialized_sketch = compact_theta_sketch::deserialize(bytes.data(), bytes.size());
REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
Expand All @@ -544,6 +547,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {

std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
compact_sketch.serialize_compressed(s);
REQUIRE(static_cast<size_t>(s.tellp()) == compact_sketch.get_serialized_size_bytes(true));
auto deserialized_sketch = compact_theta_sketch::deserialize(s);
REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta());
Expand All @@ -554,4 +558,30 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
}
}

// The sketch reaches capacity for the first time at 2 * K * 15/16,
// but at that point it is still in exact mode, so the serialized size is not the maximum
// (theta in not serialized in the exact mode).
// So we need to catch the second time, but some updates will be ignored in the estimation mode,
// so we update more than enough times keeping track of the maximum.
// Potentially the exact number of updates to reach the peak can be figured out given this particular sequence,
// but not assuming that might be even better (say, in case we change the load factor or hash function
// or just out of principle not to rely on implementation details too much).
TEST_CASE("max serialized size", "[theta_sketch]") {
const uint8_t lg_k = 10;
auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build();
int value = 0;

// this will go over the first peak, which is not the highest
for (int i = 0; i < (1 << lg_k) * 2; ++i) sketch.update(value++);

// this will to over the second peak keeping track of the max size
size_t max_size_bytes = 0;
for (int i = 0; i < (1 << lg_k) * 2; ++i) {
sketch.update(value++);
auto bytes = sketch.compact().serialize();
max_size_bytes = std::max(max_size_bytes, bytes.size());
}
REQUIRE(max_size_bytes == compact_theta_sketch::get_max_serialized_size_bytes(lg_k));
}

} /* namespace datasketches */

0 comments on commit 85254b7

Please sign in to comment.