aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtmi8/src/kv6_parquet.cpp
diff options
context:
space:
mode:
authorLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
committerLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
commit17a3ea880402338420699e03bcb24181e4ff3924 (patch)
treeda666ef91e0b60d20aa0b01529644c136fd1f4ab /lib/libtmi8/src/kv6_parquet.cpp
downloadoeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz
oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip
Initial commit
Based on dc4ba6a
Diffstat (limited to 'lib/libtmi8/src/kv6_parquet.cpp')
-rw-r--r--lib/libtmi8/src/kv6_parquet.cpp102
1 files changed, 102 insertions, 0 deletions
diff --git a/lib/libtmi8/src/kv6_parquet.cpp b/lib/libtmi8/src/kv6_parquet.cpp
new file mode 100644
index 0000000..ca70b7f
--- /dev/null
+++ b/lib/libtmi8/src/kv6_parquet.cpp
@@ -0,0 +1,102 @@
1// vim:set sw=2 ts=2 sts et:
2
#include <tmi8/kv6_parquet.hpp>

#include <system_error>
4
5ParquetBuilder::ParquetBuilder() {
6 std::shared_ptr<arrow::Field> field_type, field_data_owner_code, field_line_planning_number, field_operating_day,
7 field_journey_number, field_reinforcement_number, field_timestamp, field_source,
8 field_punctuality, field_user_stop_code, field_passage_sequence_number,
9 field_vehicle_number, field_block_code, field_wheelchair_accessible,
10 field_number_of_coaches, field_rd_y, field_rd_x, field_distance_since_last_user_stop;
11 field_type = arrow::field("type", arrow::utf8());
12 field_data_owner_code = arrow::field("data_owner_code", arrow::utf8());
13 field_line_planning_number = arrow::field("line_planning_number", arrow::utf8());
14 field_operating_day = arrow::field("operating_day", arrow::date32());
15 field_journey_number = arrow::field("journey_number", arrow::uint32());
16 field_reinforcement_number = arrow::field("reinforcement_number", arrow::uint8());
17 field_timestamp = arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::SECOND));
18 field_source = arrow::field("source", arrow::utf8());
19 field_punctuality = arrow::field("punctuality", arrow::int16());
20 field_user_stop_code = arrow::field("user_stop_code", arrow::utf8());
21 field_passage_sequence_number = arrow::field("passage_sequence_number", arrow::uint16());
22 field_vehicle_number = arrow::field("vehicle_number", arrow::uint32());
23 field_block_code = arrow::field("block_code", arrow::uint32());
24 field_wheelchair_accessible = arrow::field("wheelchair_accessible", arrow::utf8());
25 field_number_of_coaches = arrow::field("number_of_coaches", arrow::uint8());
26 field_rd_y = arrow::field("rd_y", arrow::int32());
27 field_rd_x = arrow::field("rd_x", arrow::int32());
28 field_distance_since_last_user_stop = arrow::field("distance_since_last_user_stop", arrow::uint32());
29
30 schema = arrow::schema({ field_type, field_data_owner_code, field_line_planning_number,
31 field_operating_day, field_journey_number,
32 field_reinforcement_number, field_timestamp, field_source,
33 field_punctuality, field_user_stop_code,
34 field_passage_sequence_number, field_vehicle_number,
35 field_block_code, field_wheelchair_accessible,
36 field_number_of_coaches, field_rd_y, field_rd_x,
37 field_distance_since_last_user_stop });
38}
39
40arrow::Result<std::shared_ptr<arrow::Table>> ParquetBuilder::getTable() {
41 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> types, types.Finish());
42 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> data_owner_codes, data_owner_codes.Finish());
43 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> line_planning_numbers, line_planning_numbers.Finish());
44 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> operating_days, operating_days.Finish());
45 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> journey_numbers, journey_numbers.Finish());
46 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> reinforcement_numbers, reinforcement_numbers.Finish());
47 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> timestamps, timestamps.Finish());
48 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> sources, sources.Finish());
49 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> punctualities, punctualities.Finish());
50 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> user_stop_codes, user_stop_codes.Finish());
51 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> passage_sequence_numbers, passage_sequence_numbers.Finish());
52 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> vehicle_numbers, vehicle_numbers.Finish());
53 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> block_codes, block_codes.Finish());
54 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> wheelchair_accessibles, wheelchair_accessibles.Finish());
55 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> number_of_coaches, number_of_coaches.Finish());
56 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> rd_ys, rd_ys.Finish());
57 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> rd_xs, rd_xs.Finish());
58 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> distance_since_last_user_stops, distance_since_last_user_stops.Finish());
59
60 std::vector<std::shared_ptr<arrow::Array>> columns = { types, data_owner_codes, line_planning_numbers, operating_days,
61 journey_numbers, reinforcement_numbers, timestamps, sources,
62 punctualities, user_stop_codes, passage_sequence_numbers,
63 vehicle_numbers, block_codes, wheelchair_accessibles,
64 number_of_coaches, rd_ys, rd_xs,
65 distance_since_last_user_stops };
66 return arrow::Result(arrow::Table::Make(schema, columns));
67}
68
69arrow::Status writeArrowRecordsAsParquetFile(arrow::RecordBatchReader &rbr, std::filesystem::path filename) {
70 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder()
71 .compression(arrow::Compression::ZSTD)
72 ->created_by("oeuf-libtmi8")
73 ->version(parquet::ParquetVersion::PARQUET_2_6)
74 ->data_page_version(parquet::ParquetDataPageVersion::V2)
75 ->max_row_group_length(MAX_PARQUET_CHUNK)
76 ->build();
77
78 std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder()
79 .store_schema()->build();
80
81 std::shared_ptr<arrow::io::FileOutputStream> out_file;
82 std::string filename_str = filename;
83 ARROW_ASSIGN_OR_RAISE(out_file, arrow::io::FileOutputStream::Open(filename_str + ".part"));
84
85 ARROW_ASSIGN_OR_RAISE(auto writer,
86 parquet::arrow::FileWriter::Open(*rbr.schema(), arrow::default_memory_pool(), out_file, props, arrow_props));
87 for (const auto &batchr : rbr) {
88 ARROW_ASSIGN_OR_RAISE(auto batch, batchr);
89 ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
90 }
91 ARROW_RETURN_NOT_OK(writer->Close());
92 ARROW_RETURN_NOT_OK(out_file->Close());
93
94 std::filesystem::rename(filename_str + ".part", filename);
95
96 return arrow::Status::OK();
97}
98
99arrow::Status writeArrowTableAsParquetFile(const arrow::Table &table, std::filesystem::path filename) {
100 auto tbr = arrow::TableBatchReader(table);
101 return writeArrowRecordsAsParquetFile(tbr, filename);
102}