From 17a3ea880402338420699e03bcb24181e4ff3924 Mon Sep 17 00:00:00 2001 From: Rutger Broekhoff Date: Thu, 2 May 2024 20:27:40 +0200 Subject: Initial commit Based on dc4ba6a --- lib/libtmi8/src/kv6_parquet.cpp | 102 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 lib/libtmi8/src/kv6_parquet.cpp (limited to 'lib/libtmi8/src/kv6_parquet.cpp') diff --git a/lib/libtmi8/src/kv6_parquet.cpp b/lib/libtmi8/src/kv6_parquet.cpp new file mode 100644 index 0000000..ca70b7f --- /dev/null +++ b/lib/libtmi8/src/kv6_parquet.cpp @@ -0,0 +1,102 @@ +// vim:set sw=2 ts=2 sts et: + +#include + +ParquetBuilder::ParquetBuilder() { + std::shared_ptr field_type, field_data_owner_code, field_line_planning_number, field_operating_day, + field_journey_number, field_reinforcement_number, field_timestamp, field_source, + field_punctuality, field_user_stop_code, field_passage_sequence_number, + field_vehicle_number, field_block_code, field_wheelchair_accessible, + field_number_of_coaches, field_rd_y, field_rd_x, field_distance_since_last_user_stop; + field_type = arrow::field("type", arrow::utf8()); + field_data_owner_code = arrow::field("data_owner_code", arrow::utf8()); + field_line_planning_number = arrow::field("line_planning_number", arrow::utf8()); + field_operating_day = arrow::field("operating_day", arrow::date32()); + field_journey_number = arrow::field("journey_number", arrow::uint32()); + field_reinforcement_number = arrow::field("reinforcement_number", arrow::uint8()); + field_timestamp = arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::SECOND)); + field_source = arrow::field("source", arrow::utf8()); + field_punctuality = arrow::field("punctuality", arrow::int16()); + field_user_stop_code = arrow::field("user_stop_code", arrow::utf8()); + field_passage_sequence_number = arrow::field("passage_sequence_number", arrow::uint16()); + field_vehicle_number = arrow::field("vehicle_number", arrow::uint32()); + field_block_code = arrow::field("block_code", arrow::uint32()); + field_wheelchair_accessible = arrow::field("wheelchair_accessible", arrow::utf8()); + field_number_of_coaches = arrow::field("number_of_coaches", arrow::uint8()); + field_rd_y = arrow::field("rd_y", arrow::int32()); + field_rd_x = arrow::field("rd_x", arrow::int32()); + field_distance_since_last_user_stop = arrow::field("distance_since_last_user_stop", arrow::uint32()); + + schema = arrow::schema({ field_type, field_data_owner_code, field_line_planning_number, + field_operating_day, field_journey_number, + field_reinforcement_number, field_timestamp, field_source, + field_punctuality, field_user_stop_code, + field_passage_sequence_number, field_vehicle_number, + field_block_code, field_wheelchair_accessible, + field_number_of_coaches, field_rd_y, field_rd_x, + field_distance_since_last_user_stop }); +} + +arrow::Result> ParquetBuilder::getTable() { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr types, types.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr data_owner_codes, data_owner_codes.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr line_planning_numbers, line_planning_numbers.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr operating_days, operating_days.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr journey_numbers, journey_numbers.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr reinforcement_numbers, reinforcement_numbers.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr timestamps, timestamps.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr sources, sources.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr punctualities, punctualities.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr user_stop_codes, user_stop_codes.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr passage_sequence_numbers, passage_sequence_numbers.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr vehicle_numbers, vehicle_numbers.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr block_codes, block_codes.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr wheelchair_accessibles, wheelchair_accessibles.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr number_of_coaches, number_of_coaches.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr rd_ys, rd_ys.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr rd_xs, rd_xs.Finish()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr distance_since_last_user_stops, distance_since_last_user_stops.Finish()); + + std::vector> columns = { types, data_owner_codes, line_planning_numbers, operating_days, + journey_numbers, reinforcement_numbers, timestamps, sources, + punctualities, user_stop_codes, passage_sequence_numbers, + vehicle_numbers, block_codes, wheelchair_accessibles, + number_of_coaches, rd_ys, rd_xs, + distance_since_last_user_stops }; + return arrow::Result(arrow::Table::Make(schema, columns)); +} + +arrow::Status writeArrowRecordsAsParquetFile(arrow::RecordBatchReader &rbr, std::filesystem::path filename) { + std::shared_ptr props = parquet::WriterProperties::Builder() + .compression(arrow::Compression::ZSTD) + ->created_by("oeuf-libtmi8") + ->version(parquet::ParquetVersion::PARQUET_2_6) + ->data_page_version(parquet::ParquetDataPageVersion::V2) + ->max_row_group_length(MAX_PARQUET_CHUNK) + ->build(); + + std::shared_ptr arrow_props = parquet::ArrowWriterProperties::Builder() + .store_schema()->build(); + + std::shared_ptr out_file; + std::string filename_str = filename; + ARROW_ASSIGN_OR_RAISE(out_file, arrow::io::FileOutputStream::Open(filename_str + ".part")); + + ARROW_ASSIGN_OR_RAISE(auto writer, + parquet::arrow::FileWriter::Open(*rbr.schema(), arrow::default_memory_pool(), out_file, props, arrow_props)); + for (const auto &batchr : rbr) { + ARROW_ASSIGN_OR_RAISE(auto batch, batchr); + ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } + ARROW_RETURN_NOT_OK(writer->Close()); + ARROW_RETURN_NOT_OK(out_file->Close()); + + std::filesystem::rename(filename_str + ".part", filename); + + return arrow::Status::OK(); +} + +arrow::Status writeArrowTableAsParquetFile(const arrow::Table &table, std::filesystem::path filename) { + auto tbr = arrow::TableBatchReader(table); + return writeArrowRecordsAsParquetFile(tbr, filename); +} -- cgit v1.2.3