aboutsummaryrefslogtreecommitdiffstats
path: root/src/filterkv6
diff options
context:
space:
mode:
Diffstat (limited to 'src/filterkv6')
-rw-r--r--src/filterkv6/.envrc2
-rw-r--r--src/filterkv6/Makefile21
-rw-r--r--src/filterkv6/main.cpp106
3 files changed, 129 insertions, 0 deletions
diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/filterkv6/.envrc
@@ -0,0 +1,2 @@
# Environment setup for this tool's directory (an .envrc, presumably consumed by direnv — confirm).
# Pull in the environment file located two directories up.
1source_env ../../
# The Makefile in this directory adds -Werror to CXXFLAGS when DEVMODE is non-empty.
2export DEVMODE=1
diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile
new file mode 100644
index 0000000..13bb38e
--- /dev/null
+++ b/src/filterkv6/Makefile
@@ -0,0 +1,21 @@
# Build rules for the filterkv6 tool.
#
# The hardening flags below are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html

# Compiler flags. -Werror is only added when DEVMODE is set to a non-empty
# value in the environment, so that warnings do not break non-development builds.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong

# Libraries to link against, followed by linker hardening options.
LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

# Build the position-independent executable from main.cpp. The libraries in
# LDFLAGS are placed after the source file on the command line.
filterkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
# -f keeps `make clean` from failing when the binary has not been built yet.
clean:
	rm -f filterkv6
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp
new file mode 100644
index 0000000..a32220a
--- /dev/null
+++ b/src/filterkv6/main.cpp
@@ -0,0 +1,106 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <deque>
5#include <filesystem>
6#include <format>
7#include <fstream>
8#include <iostream>
9
10#include <arrow/api.h>
11#include <arrow/compute/api.h>
12#include <arrow/filesystem/api.h>
13#include <arrow/dataset/api.h>
14#include <arrow/io/api.h>
15
16#include <tmi8/kv6_parquet.hpp>
17
18namespace ds = arrow::dataset;
19namespace cp = arrow::compute;
20using namespace arrow;
21
22arrow::Status processTables(std::string lineno) {
23 auto filesystem = std::make_shared<fs::LocalFileSystem>();
24
25 fs::FileSelector selector;
26 selector.base_dir = std::filesystem::current_path();
27 selector.recursive = false;
28
29 auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>());
30
31 ARROW_ASSIGN_OR_RAISE(auto factory,
32 ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
33 ds::FileSystemFactoryOptions()));
34
35 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
36
37 printf("Scanning dataset for line %s...\n", lineno.c_str());
38 // Read specified columns with a row filter
39 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
40 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({
41 cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)),
42 cp::is_valid(cp::field_ref("rd_x")),
43 cp::is_valid(cp::field_ref("rd_y")),
44 })));
45
46 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
47 ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
48
49 puts("Finished loading data, computing stable sort indices...");
50
51 arrow::Datum sort_indices;
52 cp::SortOptions sort_options;
53 sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) };
54 ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options));
55 puts("Finished computing stable sort indices, creating sorted table...");
56
57 arrow::Datum sorted;
58 ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices }));
59
60 puts("Writing sorted table to disk...");
61 ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet"));
62 puts("Syncing...");
63 sync();
64 puts("Done. Have a nice day.");
65
66 return arrow::Status::OK();
67}
68
// Caveat about which files in the working directory are picked up. It is a
// macro rather than a constant because it is spliced into other string
// literals through adjacent-literal concatenation.
#define NOTICE "Notice: This tool will fail if any non-Parquet files are present in the\n" \
               " current working directory. It does not load files which are present in\n" \
               " any possible subdirectories."

// printf-style usage text; its single %s is the program name.
const char help[] =
  "Usage: %s <LINENO>\n"
  "\n"
  " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n"
  NOTICE "\n";
78
79void exitHelp(const char *progname, int code = 1) {
80 printf(help, progname);
81 exit(code);
82}
83
84int main(int argc, char *argv[]) {
85 const char *progname = argv[0];
86 if (argc != 2) {
87 puts("Error: incorrect number of arguments provided\n");
88 exitHelp(progname);
89 }
90 char *lineno = argv[1];
91 puts(NOTICE "\n");
92
93 std::filesystem::path cwd = std::filesystem::current_path();
94 std::filesystem::create_directory(cwd / "merged");
95
96 puts("Running this program may take a while, especially on big datasets. If you're\n"
97 "processing the data of a single bus line over the course of multiple months,\n"
98 "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n"
99 "RAM available, to avoid overloading and subsequently freezing your system.\n");
100
101 arrow::Status st = processTables(std::string(lineno));
102 if (!st.ok()) {
103 std::cerr << "Failed to process tables: " << st << std::endl;
104 return EXIT_FAILURE;
105 }
106}