Diffstat (limited to 'src/bundleparquet/main.cpp')
 -rw-r--r--  src/bundleparquet/main.cpp | 213
 1 file changed, 213 insertions(+), 0 deletions(-)
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp
new file mode 100644
index 0000000..05fd881
--- /dev/null
+++ b/src/bundleparquet/main.cpp
@@ -0,0 +1,213 @@
// vim:set sw=2 ts=2 sts et:

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>

#include <nlohmann/json.hpp>

#include <prometheus/counter.h>
#include <prometheus/gateway.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>

#include <tmi8/kv6_parquet.hpp>

#include "spliturl.hpp"

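// A merged file is only written once at least MIN_COMBINED_ROWS rows are
// available, and a single merge is never allowed to grow beyond
// MAX_COMBINED_ROWS rows.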
static const int MIN_COMBINED_ROWS = 1000000; // one million
static const int MAX_COMBINED_ROWS = 2000000; // two million

struct FileMetadata {
  int64_t min_timestamp = 0;
  int64_t max_timestamp = 0;
  int64_t rows_written  = 0;
};

struct File {
  FileMetadata metadata;
  std::filesystem::path filename;
};

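// Each parquet file has a sidecar <filename>.meta.json describing its
// contents, e.g. (illustrative values):
//
//   { "min_timestamp": 1700000000, "max_timestamp": 1700003600, "rows_written": 123456 }
//
// readMetadataOf parses such a sidecar; a missing or malformed sidecar makes
// the JSON parser throw, which callers are expected to handle.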
FileMetadata readMetadataOf(const std::filesystem::path &filename) {
  std::string meta_filename = std::string(filename) + ".meta.json";
  std::ifstream meta_file(meta_filename, std::ifstream::in | std::ifstream::binary);
  nlohmann::json meta_json;
  meta_file >> meta_json;
  FileMetadata meta = {
    .min_timestamp = meta_json["min_timestamp"],
    .max_timestamp = meta_json["max_timestamp"],
    .rows_written  = meta_json["rows_written"],
  };
  return meta;
}

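// Merges a prefix of `files` (which must be sorted chronologically) into a
// single parquet file under merged/, removing the processed entries from the
// deque. Returns without writing anything if there is too little data yet.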
arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  if (files.empty()) {
    std::cerr << "Did not find any files" << std::endl;
    return arrow::Status::OK();
  }

  int64_t rows = 0;

  std::vector<std::shared_ptr<arrow::Table>> tables;
  std::vector<std::filesystem::path> processed;
  int64_t min_timestamp = std::numeric_limits<int64_t>::max();
  int64_t max_timestamp = 0;

  bool over_capacity_risk = false;
  auto it = files.begin();
  while (it != files.end()) {
    const std::filesystem::path &filename = it->filename;
    const FileMetadata &metadata = it->metadata;

    // Stop before this file would push the batch over capacity; checking
    // first avoids opening a file we will not merge and keeps its
    // timestamps out of the merged file's metadata.
    if (rows + metadata.rows_written > MAX_COMBINED_ROWS) {
      over_capacity_risk = true;
      break;
    }

    std::shared_ptr<arrow::io::RandomAccessFile> input;
    ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename));

    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));

    if (metadata.min_timestamp < min_timestamp)
      min_timestamp = metadata.min_timestamp;
    if (metadata.max_timestamp > max_timestamp)
      max_timestamp = metadata.max_timestamp;

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    tables.push_back(table);
    processed.push_back(filename);
    rows += metadata.rows_written;
    it = files.erase(it);
  }

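  // Three possible outcomes from here: not enough rows yet (wait for more
  // data), a single file that is already over capacity on its own (promote
  // it unmerged), or a batch worth merging.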
  if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) {
    std::cerr << "Found files, but not enough to satisfy the minimum number of rows for the combined file" << std::endl;
    std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so "
              << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl;
    return arrow::Status::OK();
  } else if (rows == 0 && over_capacity_risk) {
    // A single file already exceeds the row cap: move it (and its sidecar)
    // into merged/ as-is instead of rewriting it.
    const std::filesystem::path &filename = files.front().filename;
    std::filesystem::rename(filename, "merged" / filename);
    std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json");
    rows_written.Increment(static_cast<double>(files.front().metadata.rows_written));
    files.pop_front();
    return arrow::Status::OK();
  }

  // The default options specify that the schemas are not unified, which is
  // luckily exactly what we want :)
  std::shared_ptr<arrow::Table> merged_table;
  ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now());
  std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename));

  std::cerr << "Wrote merged table to " << filename << std::endl;

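  // Write the sidecar to a .part file first and rename it into place, so a
  // concurrent reader never observes a partially written .meta.json.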
  std::ofstream metaf(filename + ".meta.json.part", std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written",  rows },
  };
  metaf << meta;
  metaf.close();
  std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json");

  std::cerr << "Wrote merged table metadata" << std::endl;
  rows_written.Increment(static_cast<double>(rows));

  // The inputs are now fully represented by the merged file; drop them
  // along with their sidecars.
  for (const std::filesystem::path &filename : processed) {
    std::filesystem::remove(filename);
    std::filesystem::remove(std::string(filename) + ".meta.json");
  }

  std::cerr << "Successfully wrote merged table and metadata, and deleted old files" << std::endl;

  return arrow::Status::OK();
}

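// Repeatedly merge batches until the queue is drained. Every call to
// processFirstTables either consumes at least one file or empties the
// queue, so this loop terminates.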
arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  while (!files.empty())
    ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written));
  return arrow::Status::OK();
}

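// The archiver runs as a one-shot batch job: scan the working directory for
// finished oeuf-*.parquet chunks, merge them into large files under merged/,
// and report progress to a Prometheus push gateway (configured through the
// PROMETHEUS_PUSH_URL environment variable).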
int main(int argc, char *argv[]) {
  std::filesystem::path cwd = std::filesystem::current_path();
  std::filesystem::create_directory(cwd / "merged");

  const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL");
  if (!prom_push_url || strlen(prom_push_url) == 0) {
    std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl;
    return EXIT_FAILURE;
  }

  // splitUrl breaks the URL into the scheme+host and port+path parts that
  // the prometheus-cpp Gateway constructor expects.
  std::string split_err;
  auto split_prom_push_url = splitUrl(prom_push_url, &split_err);
  if (!split_prom_push_url) {
    std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: "
              << split_err << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":"
            << split_prom_push_url->portpath << std::endl;

  prometheus::Gateway gateway{split_prom_push_url->schemehost,
                              split_prom_push_url->portpath,
                              "oeuf-archiver"};

  auto registry = std::make_shared<prometheus::Registry>();
  prometheus::Gauge &rows_available = prometheus::BuildGauge()
    .Name("archiver_rows_available")
    .Help("Number of rows available to the archiver")
    .Register(*registry)
    .Add({});
  prometheus::Counter &rows_written = prometheus::BuildCounter()
    .Name("archiver_rows_written")
    .Help("Number of rows written by the archiver")
    .Register(*registry)
    .Add({});
  gateway.RegisterCollectable(registry);

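  // Collect finished chunks from the working directory. The "+00:00" suffix
  // matches the UTC offset of an ISO 8601 timestamp, so only UTC-stamped
  // files (e.g. oeuf-2024-01-01T00:00:00+00:00.parquet) are picked up.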
  std::deque<File> files;
  for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) {
    if (!dir_entry.is_regular_file()) continue;
    std::filesystem::path filename = dir_entry.path().filename();
    const std::string filename_str = filename;
    if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) {
      try {
        FileMetadata meta = readMetadataOf(filename);
        File file = { .metadata = meta, .filename = filename };
        files.push_back(file);

        rows_available.Increment(static_cast<double>(meta.rows_written));
      } catch (const std::exception &e) {
        std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl;
        return EXIT_FAILURE;
      }
    }
  }

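  // Filenames embed an ISO 8601 timestamp, so sorting lexicographically by
  // name also sorts the files chronologically.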
  std::sort(files.begin(), files.end(),
            [](const File &f1, const File &f2) { return f1.filename < f2.filename; });
  arrow::Status st = processTables(files, rows_written);
  if (!st.ok()) {
    std::cerr << "Failed to process tables: " << st << std::endl;
    return EXIT_FAILURE;
  }

  // Push the final metric values to the gateway. Push() returns the HTTP
  // status code of the request; a failed push is only worth a warning,
  // since the archiving work itself already succeeded.
  int push_status = gateway.Push();
  if (push_status != 200)
    std::cerr << "Warning: failed to push metrics (status " << push_status << ")" << std::endl;
}