aboutsummaryrefslogtreecommitdiffstats
path: root/src/filterkv6/main.cpp
diff options
context:
space:
mode:
authorLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
committerLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
commit17a3ea880402338420699e03bcb24181e4ff3924 (patch)
treeda666ef91e0b60d20aa0b01529644c136fd1f4ab /src/filterkv6/main.cpp
downloadoeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz
oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/filterkv6/main.cpp')
-rw-r--r--src/filterkv6/main.cpp106
1 files changed, 106 insertions, 0 deletions
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp
new file mode 100644
index 0000000..a32220a
--- /dev/null
+++ b/src/filterkv6/main.cpp
@@ -0,0 +1,106 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <deque>
5#include <filesystem>
6#include <format>
7#include <fstream>
8#include <iostream>
9
10#include <arrow/api.h>
11#include <arrow/compute/api.h>
12#include <arrow/filesystem/api.h>
13#include <arrow/dataset/api.h>
14#include <arrow/io/api.h>
15
16#include <tmi8/kv6_parquet.hpp>
17
18namespace ds = arrow::dataset;
19namespace cp = arrow::compute;
20using namespace arrow;
21
22arrow::Status processTables(std::string lineno) {
23 auto filesystem = std::make_shared<fs::LocalFileSystem>();
24
25 fs::FileSelector selector;
26 selector.base_dir = std::filesystem::current_path();
27 selector.recursive = false;
28
29 auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>());
30
31 ARROW_ASSIGN_OR_RAISE(auto factory,
32 ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
33 ds::FileSystemFactoryOptions()));
34
35 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
36
37 printf("Scanning dataset for line %s...\n", lineno.c_str());
38 // Read specified columns with a row filter
39 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
40 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({
41 cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)),
42 cp::is_valid(cp::field_ref("rd_x")),
43 cp::is_valid(cp::field_ref("rd_y")),
44 })));
45
46 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
47 ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
48
49 puts("Finished loading data, computing stable sort indices...");
50
51 arrow::Datum sort_indices;
52 cp::SortOptions sort_options;
53 sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) };
54 ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options));
55 puts("Finished computing stable sort indices, creating sorted table...");
56
57 arrow::Datum sorted;
58 ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices }));
59
60 puts("Writing sorted table to disk...");
61 ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet"));
62 puts("Syncing...");
63 sync();
64 puts("Done. Have a nice day.");
65
66 return arrow::Status::OK();
67}
68
// Warning text shown to the user; it is embedded in the usage text and also
// printed on its own at start-up. It contains no trailing newline and no
// printf conversion specifiers, so it is safe inside a printf format string.
#define NOTICE "Notice: This tool will fail if any non-Parquet files are present in the\n" \
               " current working directory. It does not load files which are present in\n" \
               " any possible subdirectories."
72
73const char help[] =
74 "Usage: %s <LINENO>\n"
75 "\n"
76 " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n"
77 NOTICE "\n";
78
79void exitHelp(const char *progname, int code = 1) {
80 printf(help, progname);
81 exit(code);
82}
83
84int main(int argc, char *argv[]) {
85 const char *progname = argv[0];
86 if (argc != 2) {
87 puts("Error: incorrect number of arguments provided\n");
88 exitHelp(progname);
89 }
90 char *lineno = argv[1];
91 puts(NOTICE "\n");
92
93 std::filesystem::path cwd = std::filesystem::current_path();
94 std::filesystem::create_directory(cwd / "merged");
95
96 puts("Running this program may take a while, especially on big datasets. If you're\n"
97 "processing the data of a single bus line over the course of multiple months,\n"
98 "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n"
99 "RAM available, to avoid overloading and subsequently freezing your system.\n");
100
101 arrow::Status st = processTables(std::string(lineno));
102 if (!st.ok()) {
103 std::cerr << "Failed to process tables: " << st << std::endl;
104 return EXIT_FAILURE;
105 }
106}