Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/arrow
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: main
Choose a base ref
...
head repository: viveris/arrow
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref
Able to merge. These branches can be automatically merged.
  • 1 commit
  • 2 files changed
  • 1 contributor

Commits on Nov 15, 2024

  1. Allow writing BYTE_ARRAY with converted type NONE

    This allows to store binary data of arbitrary length in a parquet file,
    without having to wrongly declare it as UTF-8.
    
    Fixes the writer part of #42971
    
    The reader part has already been fixed in 4d82549
    and this uses a similar implementation, but with a stricter set of
    "exceptions" (only byte arrays with NONE type are allowed).
    pulkomandy committed Nov 15, 2024
    Copy the full SHA
    22663c1 View commit details
Showing with 43 additions and 7 deletions.
  1. +23 −4 cpp/src/parquet/stream_writer.cc
  2. +20 −3 cpp/src/parquet/stream_writer_test.cc
27 changes: 23 additions & 4 deletions cpp/src/parquet/stream_writer.cc
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

#include "parquet/stream_writer.h"

#include <set>
#include <utility>

namespace parquet {
@@ -28,6 +29,19 @@ constexpr int16_t StreamWriter::kDefLevelOne;
constexpr int16_t StreamWriter::kRepLevelZero;
constexpr int64_t StreamWriter::kBatchSizeOne;

// The converted type may be NONE to store raw data in a byte array
// The following is a list of converted types which are allowed instead of the
// expected converted type.
// Each pair given is:
// {<StreamReader expected type>, <Parquet file converted type>}
// So for example {ConvertedType::INT_32, ConvertedType::NONE} means
// that if the StreamWriter was expecting the converted type INT_32,
// then it will allow to write a Parquet file using the converted type
// NONE.
//
static const std::set<std::pair<ConvertedType::type, ConvertedType::type>>
write_converted_type_exceptions = {{ConvertedType::UTF8, ConvertedType::NONE}};

StreamWriter::FixedStringView::FixedStringView(const char* data_ptr)
: data{data_ptr}, size{std::strlen(data_ptr)} {}

@@ -198,10 +212,15 @@ void StreamWriter::CheckColumn(Type::type physical_type,
"' not '" + TypeToString(physical_type) + "'");
}
if (converted_type != node->converted_type()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type[" +
ConvertedTypeToString(node->converted_type()) + "] not '" +
ConvertedTypeToString(converted_type) + "'");
// The converted type does not always match with the value
// provided so check the set of exceptions.
if (write_converted_type_exceptions.find({converted_type, node->converted_type()}) ==
write_converted_type_exceptions.end()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type[" +
ConvertedTypeToString(node->converted_type()) + "] not '" +
ConvertedTypeToString(converted_type) + "'");
}
}
// Length must be exact.
// A shorter length fixed array is not acceptable as it would
23 changes: 20 additions & 3 deletions cpp/src/parquet/stream_writer_test.cc
Original file line number Diff line number Diff line change
@@ -80,6 +80,9 @@ class TestStreamWriter : public ::testing::Test {
fields.push_back(schema::PrimitiveNode::Make("double_field", Repetition::REQUIRED,
Type::DOUBLE, ConvertedType::NONE));

fields.push_back(schema::PrimitiveNode::Make("bytes_field", Repetition::REQUIRED,
Type::BYTE_ARRAY, ConvertedType::NONE));

return std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
@@ -99,7 +102,7 @@ TEST_F(TestStreamWriter, DefaultConstructed) {
EXPECT_EQ(0, os.current_column());
EXPECT_EQ(0, os.current_row());
EXPECT_EQ(0, os.num_columns());
EXPECT_EQ(0, os.SkipColumns(10));
EXPECT_EQ(0, os.SkipColumns(11));
}

TEST_F(TestStreamWriter, TypeChecking) {
@@ -162,6 +165,14 @@ TEST_F(TestStreamWriter, TypeChecking) {
EXPECT_THROW(writer_ << 5.4f, ParquetException);
EXPECT_NO_THROW(writer_ << 5.4);

// Required type: Variable length byte array.
EXPECT_EQ(10, writer_.current_column());
EXPECT_THROW(writer_ << 5, ParquetException);
EXPECT_THROW(writer_ << char3_array, ParquetException);
EXPECT_THROW(writer_ << char4_array, ParquetException);
EXPECT_THROW(writer_ << char5_array, ParquetException);
EXPECT_NO_THROW(writer_ << std::string_view("\xff\0ok", 4));

EXPECT_EQ(0, writer_.current_row());
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
@@ -210,6 +221,10 @@ TEST_F(TestStreamWriter, RequiredFieldChecking) {
EXPECT_THROW(writer_ << optional<double>(), ParquetException);
EXPECT_NO_THROW(writer_ << optional<double>(5.4));

// Required field of type: Variable length byte array.
EXPECT_THROW(writer_ << optional<std::string>(), ParquetException);
EXPECT_NO_THROW(writer_ << optional<std::string>("ok"));

EXPECT_NO_THROW(writer_ << EndRow);
}

@@ -234,6 +249,7 @@ TEST_F(TestStreamWriter, EndRow) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) + 123));
EXPECT_NO_THROW(writer_ << 25.4f);
EXPECT_NO_THROW(writer_ << 3.3424);
EXPECT_NO_THROW(writer_ << "ok");
// Correct use of end row after all fields have been output.
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
@@ -272,6 +288,7 @@ TEST_F(TestStreamWriter, EndRowGroup) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) - i * i)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 42325.4f / float(i + 1)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 3.2342e5 / double(i + 1)) << "index: " << i;
EXPECT_NO_THROW(writer_ << std::to_string(i)) << "index: " << i;
EXPECT_NO_THROW(writer_ << EndRow) << "index: " << i;

if (i % 1000 == 0) {
@@ -293,7 +310,7 @@ TEST_F(TestStreamWriter, SkipColumns) {
writer_ << true << std::string("Cannot skip mandatory columns");
EXPECT_THROW(writer_.SkipColumns(1), ParquetException);
writer_ << 'x' << std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0 << "ok";
writer_ << EndRow;
}

@@ -304,7 +321,7 @@ TEST_F(TestStreamWriter, AppendNotImplemented) {
writer_ = StreamWriter{ParquetFileWriter::Open(outfile, GetSchema())};
writer_ << false << std::string("Just one row") << 'x'
<< std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0 << "ok";
writer_ << EndRow;
writer_ = StreamWriter{};