diff options
author | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
---|---|---|
committer | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
commit | 17a3ea880402338420699e03bcb24181e4ff3924 (patch) | |
tree | da666ef91e0b60d20aa0b01529644c136fd1f4ab /lib/libtmi8/src/kv6_parquet.cpp | |
download | oeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip |
Initial commit
Based on dc4ba6a
Diffstat (limited to 'lib/libtmi8/src/kv6_parquet.cpp')
-rw-r--r-- | lib/libtmi8/src/kv6_parquet.cpp | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/lib/libtmi8/src/kv6_parquet.cpp b/lib/libtmi8/src/kv6_parquet.cpp new file mode 100644 index 0000000..ca70b7f --- /dev/null +++ b/lib/libtmi8/src/kv6_parquet.cpp | |||
@@ -0,0 +1,102 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
#include <tmi8/kv6_parquet.hpp>

#include <system_error>
4 | |||
5 | ParquetBuilder::ParquetBuilder() { | ||
6 | std::shared_ptr<arrow::Field> field_type, field_data_owner_code, field_line_planning_number, field_operating_day, | ||
7 | field_journey_number, field_reinforcement_number, field_timestamp, field_source, | ||
8 | field_punctuality, field_user_stop_code, field_passage_sequence_number, | ||
9 | field_vehicle_number, field_block_code, field_wheelchair_accessible, | ||
10 | field_number_of_coaches, field_rd_y, field_rd_x, field_distance_since_last_user_stop; | ||
11 | field_type = arrow::field("type", arrow::utf8()); | ||
12 | field_data_owner_code = arrow::field("data_owner_code", arrow::utf8()); | ||
13 | field_line_planning_number = arrow::field("line_planning_number", arrow::utf8()); | ||
14 | field_operating_day = arrow::field("operating_day", arrow::date32()); | ||
15 | field_journey_number = arrow::field("journey_number", arrow::uint32()); | ||
16 | field_reinforcement_number = arrow::field("reinforcement_number", arrow::uint8()); | ||
17 | field_timestamp = arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::SECOND)); | ||
18 | field_source = arrow::field("source", arrow::utf8()); | ||
19 | field_punctuality = arrow::field("punctuality", arrow::int16()); | ||
20 | field_user_stop_code = arrow::field("user_stop_code", arrow::utf8()); | ||
21 | field_passage_sequence_number = arrow::field("passage_sequence_number", arrow::uint16()); | ||
22 | field_vehicle_number = arrow::field("vehicle_number", arrow::uint32()); | ||
23 | field_block_code = arrow::field("block_code", arrow::uint32()); | ||
24 | field_wheelchair_accessible = arrow::field("wheelchair_accessible", arrow::utf8()); | ||
25 | field_number_of_coaches = arrow::field("number_of_coaches", arrow::uint8()); | ||
26 | field_rd_y = arrow::field("rd_y", arrow::int32()); | ||
27 | field_rd_x = arrow::field("rd_x", arrow::int32()); | ||
28 | field_distance_since_last_user_stop = arrow::field("distance_since_last_user_stop", arrow::uint32()); | ||
29 | |||
30 | schema = arrow::schema({ field_type, field_data_owner_code, field_line_planning_number, | ||
31 | field_operating_day, field_journey_number, | ||
32 | field_reinforcement_number, field_timestamp, field_source, | ||
33 | field_punctuality, field_user_stop_code, | ||
34 | field_passage_sequence_number, field_vehicle_number, | ||
35 | field_block_code, field_wheelchair_accessible, | ||
36 | field_number_of_coaches, field_rd_y, field_rd_x, | ||
37 | field_distance_since_last_user_stop }); | ||
38 | } | ||
39 | |||
40 | arrow::Result<std::shared_ptr<arrow::Table>> ParquetBuilder::getTable() { | ||
41 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> types, types.Finish()); | ||
42 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> data_owner_codes, data_owner_codes.Finish()); | ||
43 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> line_planning_numbers, line_planning_numbers.Finish()); | ||
44 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> operating_days, operating_days.Finish()); | ||
45 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> journey_numbers, journey_numbers.Finish()); | ||
46 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> reinforcement_numbers, reinforcement_numbers.Finish()); | ||
47 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> timestamps, timestamps.Finish()); | ||
48 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> sources, sources.Finish()); | ||
49 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> punctualities, punctualities.Finish()); | ||
50 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> user_stop_codes, user_stop_codes.Finish()); | ||
51 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> passage_sequence_numbers, passage_sequence_numbers.Finish()); | ||
52 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> vehicle_numbers, vehicle_numbers.Finish()); | ||
53 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> block_codes, block_codes.Finish()); | ||
54 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> wheelchair_accessibles, wheelchair_accessibles.Finish()); | ||
55 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> number_of_coaches, number_of_coaches.Finish()); | ||
56 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> rd_ys, rd_ys.Finish()); | ||
57 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> rd_xs, rd_xs.Finish()); | ||
58 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> distance_since_last_user_stops, distance_since_last_user_stops.Finish()); | ||
59 | |||
60 | std::vector<std::shared_ptr<arrow::Array>> columns = { types, data_owner_codes, line_planning_numbers, operating_days, | ||
61 | journey_numbers, reinforcement_numbers, timestamps, sources, | ||
62 | punctualities, user_stop_codes, passage_sequence_numbers, | ||
63 | vehicle_numbers, block_codes, wheelchair_accessibles, | ||
64 | number_of_coaches, rd_ys, rd_xs, | ||
65 | distance_since_last_user_stops }; | ||
66 | return arrow::Result(arrow::Table::Make(schema, columns)); | ||
67 | } | ||
68 | |||
69 | arrow::Status writeArrowRecordsAsParquetFile(arrow::RecordBatchReader &rbr, std::filesystem::path filename) { | ||
70 | std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder() | ||
71 | .compression(arrow::Compression::ZSTD) | ||
72 | ->created_by("oeuf-libtmi8") | ||
73 | ->version(parquet::ParquetVersion::PARQUET_2_6) | ||
74 | ->data_page_version(parquet::ParquetDataPageVersion::V2) | ||
75 | ->max_row_group_length(MAX_PARQUET_CHUNK) | ||
76 | ->build(); | ||
77 | |||
78 | std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder() | ||
79 | .store_schema()->build(); | ||
80 | |||
81 | std::shared_ptr<arrow::io::FileOutputStream> out_file; | ||
82 | std::string filename_str = filename; | ||
83 | ARROW_ASSIGN_OR_RAISE(out_file, arrow::io::FileOutputStream::Open(filename_str + ".part")); | ||
84 | |||
85 | ARROW_ASSIGN_OR_RAISE(auto writer, | ||
86 | parquet::arrow::FileWriter::Open(*rbr.schema(), arrow::default_memory_pool(), out_file, props, arrow_props)); | ||
87 | for (const auto &batchr : rbr) { | ||
88 | ARROW_ASSIGN_OR_RAISE(auto batch, batchr); | ||
89 | ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); | ||
90 | } | ||
91 | ARROW_RETURN_NOT_OK(writer->Close()); | ||
92 | ARROW_RETURN_NOT_OK(out_file->Close()); | ||
93 | |||
94 | std::filesystem::rename(filename_str + ".part", filename); | ||
95 | |||
96 | return arrow::Status::OK(); | ||
97 | } | ||
98 | |||
99 | arrow::Status writeArrowTableAsParquetFile(const arrow::Table &table, std::filesystem::path filename) { | ||
100 | auto tbr = arrow::TableBatchReader(table); | ||
101 | return writeArrowRecordsAsParquetFile(tbr, filename); | ||
102 | } | ||