diff options
| author | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
|---|---|---|
| committer | Rutger Broekhoff | 2024-05-02 20:27:40 +0200 |
| commit | 17a3ea880402338420699e03bcb24181e4ff3924 (patch) | |
| tree | da666ef91e0b60d20aa0b01529644c136fd1f4ab /src/filterkv6 | |
| download | oeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip | |
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/filterkv6')
| -rw-r--r-- | src/filterkv6/.envrc | 2 | ||||
| -rw-r--r-- | src/filterkv6/Makefile | 21 | ||||
| -rw-r--r-- | src/filterkv6/main.cpp | 106 |
3 files changed, 129 insertions, 0 deletions
diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/filterkv6/.envrc | |||
| @@ -0,0 +1,2 @@ | |||
| 1 | source_env ../../ | ||
| 2 | export DEVMODE=1 | ||
diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile new file mode 100644 index 0000000..13bb38e --- /dev/null +++ b/src/filterkv6/Makefile | |||
| @@ -0,0 +1,21 @@ | |||
# Build rules for the filterkv6 tool.
#
# The hardening flags below are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set in the environment.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

# Libraries come after the source file on the command line so that the linker
# can resolve the symbols main.cpp refers to.
filterkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# -f keeps `make clean` from failing when the binary was never built.
	rm -f filterkv6
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp new file mode 100644 index 0000000..a32220a --- /dev/null +++ b/src/filterkv6/main.cpp | |||
| @@ -0,0 +1,106 @@ | |||
| 1 | // vim:set sw=2 ts=2 sts et: | ||
| 2 | |||
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <string>
#include <system_error>

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/dataset/api.h>
#include <arrow/io/api.h>

#include <tmi8/kv6_parquet.hpp>
| 17 | |||
| 18 | namespace ds = arrow::dataset; | ||
| 19 | namespace cp = arrow::compute; | ||
| 20 | using namespace arrow; | ||
| 21 | |||
| 22 | arrow::Status processTables(std::string lineno) { | ||
| 23 | auto filesystem = std::make_shared<fs::LocalFileSystem>(); | ||
| 24 | |||
| 25 | fs::FileSelector selector; | ||
| 26 | selector.base_dir = std::filesystem::current_path(); | ||
| 27 | selector.recursive = false; | ||
| 28 | |||
| 29 | auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>()); | ||
| 30 | |||
| 31 | ARROW_ASSIGN_OR_RAISE(auto factory, | ||
| 32 | ds::FileSystemDatasetFactory::Make(filesystem, selector, format, | ||
| 33 | ds::FileSystemFactoryOptions())); | ||
| 34 | |||
| 35 | ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); | ||
| 36 | |||
| 37 | printf("Scanning dataset for line %s...\n", lineno.c_str()); | ||
| 38 | // Read specified columns with a row filter | ||
| 39 | ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); | ||
| 40 | ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({ | ||
| 41 | cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)), | ||
| 42 | cp::is_valid(cp::field_ref("rd_x")), | ||
| 43 | cp::is_valid(cp::field_ref("rd_y")), | ||
| 44 | }))); | ||
| 45 | |||
| 46 | ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); | ||
| 47 | ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); | ||
| 48 | |||
| 49 | puts("Finished loading data, computing stable sort indices..."); | ||
| 50 | |||
| 51 | arrow::Datum sort_indices; | ||
| 52 | cp::SortOptions sort_options; | ||
| 53 | sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) }; | ||
| 54 | ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options)); | ||
| 55 | puts("Finished computing stable sort indices, creating sorted table..."); | ||
| 56 | |||
| 57 | arrow::Datum sorted; | ||
| 58 | ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices })); | ||
| 59 | |||
| 60 | puts("Writing sorted table to disk..."); | ||
| 61 | ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet")); | ||
| 62 | puts("Syncing..."); | ||
| 63 | sync(); | ||
| 64 | puts("Done. Have a nice day."); | ||
| 65 | |||
| 66 | return arrow::Status::OK(); | ||
| 67 | } | ||
| 68 | |||
// Warning shown both in the usage text and at start-up. It has to be a macro
// (rather than a constant) because it is glued onto adjacent string literals
// at compile time.
#define NOTICE "Notice: This tool will fail if any non-Parquet files are present in the\n" \
               " current working directory. It does not load files which are present in\n" \
               " any possible subdirectories."

// printf-style usage text; the single %s is replaced by the program name.
const char help[] =
  "Usage: %s <LINENO>\n"
  "\n"
  " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n"
  NOTICE "\n";
| 78 | |||
| 79 | void exitHelp(const char *progname, int code = 1) { | ||
| 80 | printf(help, progname); | ||
| 81 | exit(code); | ||
| 82 | } | ||
| 83 | |||
| 84 | int main(int argc, char *argv[]) { | ||
| 85 | const char *progname = argv[0]; | ||
| 86 | if (argc != 2) { | ||
| 87 | puts("Error: incorrect number of arguments provided\n"); | ||
| 88 | exitHelp(progname); | ||
| 89 | } | ||
| 90 | char *lineno = argv[1]; | ||
| 91 | puts(NOTICE "\n"); | ||
| 92 | |||
| 93 | std::filesystem::path cwd = std::filesystem::current_path(); | ||
| 94 | std::filesystem::create_directory(cwd / "merged"); | ||
| 95 | |||
| 96 | puts("Running this program may take a while, especially on big datasets. If you're\n" | ||
| 97 | "processing the data of a single bus line over the course of multiple months,\n" | ||
| 98 | "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n" | ||
| 99 | "RAM available, to avoid overloading and subsequently freezing your system.\n"); | ||
| 100 | |||
| 101 | arrow::Status st = processTables(std::string(lineno)); | ||
| 102 | if (!st.ok()) { | ||
| 103 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
| 104 | return EXIT_FAILURE; | ||
| 105 | } | ||
| 106 | } | ||