diff options
author | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
---|---|---|
committer | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
commit | 17a3ea880402338420699e03bcb24181e4ff3924 (patch) | |
tree | da666ef91e0b60d20aa0b01529644c136fd1f4ab /src/filterkv6 | |
download | oeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip |
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/filterkv6')
-rw-r--r-- | src/filterkv6/.envrc | 2 | ||||
-rw-r--r-- | src/filterkv6/Makefile | 21 | ||||
-rw-r--r-- | src/filterkv6/main.cpp | 106 |
3 files changed, 129 insertions, 0 deletions
diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/filterkv6/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile new file mode 100644 index 0000000..13bb38e --- /dev/null +++ b/src/filterkv6/Makefile | |||
@@ -0,0 +1,21 @@ | |||
1 | # Taken from: | ||
2 | # Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide | ||
3 | # for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, | ||
4 | # 2023. [Online]. Available: | ||
5 | # https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html | ||
6 | CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ | ||
7 | -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ | ||
8 | -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ | ||
9 | -D_GLIBCXX_ASSERTIONS \ | ||
10 | -fstrict-flex-arrays=3 \ | ||
11 | -fstack-clash-protection -fstack-protector-strong | ||
12 | LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \ | ||
13 | -Wl,-z,nodlopen -Wl,-z,noexecstack \ | ||
14 | -Wl,-z,relro -Wl,-z,now | ||
15 | |||
16 | filterkv6: main.cpp | ||
17 | $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) | ||
18 | |||
19 | .PHONY: clean | ||
20 | clean: | ||
21 | rm filterkv6 | ||
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp new file mode 100644 index 0000000..a32220a --- /dev/null +++ b/src/filterkv6/main.cpp | |||
@@ -0,0 +1,106 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <deque> | ||
5 | #include <filesystem> | ||
6 | #include <format> | ||
7 | #include <fstream> | ||
8 | #include <iostream> | ||
9 | |||
10 | #include <arrow/api.h> | ||
11 | #include <arrow/compute/api.h> | ||
12 | #include <arrow/filesystem/api.h> | ||
13 | #include <arrow/dataset/api.h> | ||
14 | #include <arrow/io/api.h> | ||
15 | |||
16 | #include <tmi8/kv6_parquet.hpp> | ||
17 | |||
18 | namespace ds = arrow::dataset; | ||
19 | namespace cp = arrow::compute; | ||
20 | using namespace arrow; | ||
21 | |||
22 | arrow::Status processTables(std::string lineno) { | ||
23 | auto filesystem = std::make_shared<fs::LocalFileSystem>(); | ||
24 | |||
25 | fs::FileSelector selector; | ||
26 | selector.base_dir = std::filesystem::current_path(); | ||
27 | selector.recursive = false; | ||
28 | |||
29 | auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>()); | ||
30 | |||
31 | ARROW_ASSIGN_OR_RAISE(auto factory, | ||
32 | ds::FileSystemDatasetFactory::Make(filesystem, selector, format, | ||
33 | ds::FileSystemFactoryOptions())); | ||
34 | |||
35 | ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); | ||
36 | |||
37 | printf("Scanning dataset for line %s...\n", lineno.c_str()); | ||
38 | // Read specified columns with a row filter | ||
39 | ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); | ||
40 | ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({ | ||
41 | cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)), | ||
42 | cp::is_valid(cp::field_ref("rd_x")), | ||
43 | cp::is_valid(cp::field_ref("rd_y")), | ||
44 | }))); | ||
45 | |||
46 | ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); | ||
47 | ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); | ||
48 | |||
49 | puts("Finished loading data, computing stable sort indices..."); | ||
50 | |||
51 | arrow::Datum sort_indices; | ||
52 | cp::SortOptions sort_options; | ||
53 | sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) }; | ||
54 | ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options)); | ||
55 | puts("Finished computing stable sort indices, creating sorted table..."); | ||
56 | |||
57 | arrow::Datum sorted; | ||
58 | ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices })); | ||
59 | |||
60 | puts("Writing sorted table to disk..."); | ||
61 | ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet")); | ||
62 | puts("Syncing..."); | ||
63 | sync(); | ||
64 | puts("Done. Have a nice day."); | ||
65 | |||
66 | return arrow::Status::OK(); | ||
67 | } | ||
68 | |||
69 | #define NOTICE "Notice: This tool will fail if any non-Parquet files in are present in the\n" \ | ||
70 | " current working directory. It does not load files which are present in\n" \ | ||
71 | " any possible subdirectories." | ||
72 | |||
73 | const char help[] = | ||
74 | "Usage: %s <LINENO>\n" | ||
75 | "\n" | ||
76 | " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n" | ||
77 | NOTICE "\n"; | ||
78 | |||
79 | void exitHelp(const char *progname, int code = 1) { | ||
80 | printf(help, progname); | ||
81 | exit(code); | ||
82 | } | ||
83 | |||
84 | int main(int argc, char *argv[]) { | ||
85 | const char *progname = argv[0]; | ||
86 | if (argc != 2) { | ||
87 | puts("Error: incorrect number of arguments provided\n"); | ||
88 | exitHelp(progname); | ||
89 | } | ||
90 | char *lineno = argv[1]; | ||
91 | puts(NOTICE "\n"); | ||
92 | |||
93 | std::filesystem::path cwd = std::filesystem::current_path(); | ||
94 | std::filesystem::create_directory(cwd / "merged"); | ||
95 | |||
96 | puts("Running this program may take a while, especially on big datasets. If you're\n" | ||
97 | "processing the data of a single bus line over the course of multiple months,\n" | ||
98 | "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n" | ||
99 | "RAM available, to avoid overloading and subsequently freezing your system.\n"); | ||
100 | |||
101 | arrow::Status st = processTables(std::string(lineno)); | ||
102 | if (!st.ok()) { | ||
103 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
104 | return EXIT_FAILURE; | ||
105 | } | ||
106 | } | ||