diff options
author | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
---|---|---|
committer | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
commit | 17a3ea880402338420699e03bcb24181e4ff3924 (patch) | |
tree | da666ef91e0b60d20aa0b01529644c136fd1f4ab /src/filterkv6/main.cpp | |
download | oeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip |
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/filterkv6/main.cpp')
-rw-r--r-- | src/filterkv6/main.cpp | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp new file mode 100644 index 0000000..a32220a --- /dev/null +++ b/src/filterkv6/main.cpp | |||
@@ -0,0 +1,106 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <deque> | ||
5 | #include <filesystem> | ||
6 | #include <format> | ||
7 | #include <fstream> | ||
8 | #include <iostream> | ||
9 | |||
10 | #include <arrow/api.h> | ||
11 | #include <arrow/compute/api.h> | ||
12 | #include <arrow/filesystem/api.h> | ||
13 | #include <arrow/dataset/api.h> | ||
14 | #include <arrow/io/api.h> | ||
15 | |||
16 | #include <tmi8/kv6_parquet.hpp> | ||
17 | |||
18 | namespace ds = arrow::dataset; | ||
19 | namespace cp = arrow::compute; | ||
20 | using namespace arrow; | ||
21 | |||
22 | arrow::Status processTables(std::string lineno) { | ||
23 | auto filesystem = std::make_shared<fs::LocalFileSystem>(); | ||
24 | |||
25 | fs::FileSelector selector; | ||
26 | selector.base_dir = std::filesystem::current_path(); | ||
27 | selector.recursive = false; | ||
28 | |||
29 | auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>()); | ||
30 | |||
31 | ARROW_ASSIGN_OR_RAISE(auto factory, | ||
32 | ds::FileSystemDatasetFactory::Make(filesystem, selector, format, | ||
33 | ds::FileSystemFactoryOptions())); | ||
34 | |||
35 | ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); | ||
36 | |||
37 | printf("Scanning dataset for line %s...\n", lineno.c_str()); | ||
38 | // Read specified columns with a row filter | ||
39 | ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); | ||
40 | ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({ | ||
41 | cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)), | ||
42 | cp::is_valid(cp::field_ref("rd_x")), | ||
43 | cp::is_valid(cp::field_ref("rd_y")), | ||
44 | }))); | ||
45 | |||
46 | ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); | ||
47 | ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); | ||
48 | |||
49 | puts("Finished loading data, computing stable sort indices..."); | ||
50 | |||
51 | arrow::Datum sort_indices; | ||
52 | cp::SortOptions sort_options; | ||
53 | sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) }; | ||
54 | ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options)); | ||
55 | puts("Finished computing stable sort indices, creating sorted table..."); | ||
56 | |||
57 | arrow::Datum sorted; | ||
58 | ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices })); | ||
59 | |||
60 | puts("Writing sorted table to disk..."); | ||
61 | ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet")); | ||
62 | puts("Syncing..."); | ||
63 | sync(); | ||
64 | puts("Done. Have a nice day."); | ||
65 | |||
66 | return arrow::Status::OK(); | ||
67 | } | ||
68 | |||
69 | #define NOTICE "Notice: This tool will fail if any non-Parquet files in are present in the\n" \ | ||
70 | " current working directory. It does not load files which are present in\n" \ | ||
71 | " any possible subdirectories." | ||
72 | |||
73 | const char help[] = | ||
74 | "Usage: %s <LINENO>\n" | ||
75 | "\n" | ||
76 | " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n" | ||
77 | NOTICE "\n"; | ||
78 | |||
79 | void exitHelp(const char *progname, int code = 1) { | ||
80 | printf(help, progname); | ||
81 | exit(code); | ||
82 | } | ||
83 | |||
84 | int main(int argc, char *argv[]) { | ||
85 | const char *progname = argv[0]; | ||
86 | if (argc != 2) { | ||
87 | puts("Error: incorrect number of arguments provided\n"); | ||
88 | exitHelp(progname); | ||
89 | } | ||
90 | char *lineno = argv[1]; | ||
91 | puts(NOTICE "\n"); | ||
92 | |||
93 | std::filesystem::path cwd = std::filesystem::current_path(); | ||
94 | std::filesystem::create_directory(cwd / "merged"); | ||
95 | |||
96 | puts("Running this program may take a while, especially on big datasets. If you're\n" | ||
97 | "processing the data of a single bus line over the course of multiple months,\n" | ||
98 | "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n" | ||
99 | "RAM available, to avoid overloading and subsequently freezing your system.\n"); | ||
100 | |||
101 | arrow::Status st = processTables(std::string(lineno)); | ||
102 | if (!st.ok()) { | ||
103 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
104 | return EXIT_FAILURE; | ||
105 | } | ||
106 | } | ||