Diffstat (limited to 'src/bundleparquet/main.cpp')
-rw-r--r--  src/bundleparquet/main.cpp  213
1 file changed, 213 insertions(+), 0 deletions(-)
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp
new file mode 100644
index 0000000..05fd881
--- /dev/null
+++ b/src/bundleparquet/main.cpp
@@ -0,0 +1,213 @@
// vim:set sw=2 ts=2 sts et:

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <limits>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>

#include <nlohmann/json.hpp>

#include <prometheus/counter.h>
#include <prometheus/gateway.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>

#include <tmi8/kv6_parquet.hpp>

#include "spliturl.hpp"

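// Batch input files until a merged file holds at least MIN_COMBINED_ROWS
// rows; never let a single merged file grow beyond MAX_COMBINED_ROWS.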
static const int MIN_COMBINED_ROWS = 1000000; // one million
static const int MAX_COMBINED_ROWS = 2000000; // two million

struct FileMetadata {
  int64_t min_timestamp = 0;
  int64_t max_timestamp = 0;
  int64_t rows_written = 0;
};

struct File {
  FileMetadata metadata;
  std::filesystem::path filename;
};

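// Reads the sidecar metadata stored next to a Parquet file as
// <filename>.meta.json, expected to look like:
//   { "min_timestamp": ..., "max_timestamp": ..., "rows_written": ... }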
FileMetadata readMetadataOf(std::filesystem::path filename) {
  std::string meta_filename = std::string(filename) + ".meta.json";
  std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary);
  nlohmann::json meta_json;
  meta_file >> meta_json;
  FileMetadata meta = {
    .min_timestamp = meta_json["min_timestamp"],
    .max_timestamp = meta_json["max_timestamp"],
    .rows_written = meta_json["rows_written"],
  };
  return meta;
}

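// Merges a prefix of `files` into a single Parquet file under merged/,
// erasing the consumed entries from the deque. Returns without writing
// when fewer than MIN_COMBINED_ROWS rows are available; a single file that
// alone exceeds MAX_COMBINED_ROWS is moved into merged/ unchanged.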
arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  if (files.empty()) {
    std::cerr << "Did not find any files" << std::endl;
    return arrow::Status::OK();
  }

  int64_t rows = 0;

  std::vector<std::shared_ptr<arrow::Table>> tables;
  std::vector<std::filesystem::path> processed;
  int64_t min_timestamp = std::numeric_limits<int64_t>::max();
  int64_t max_timestamp = 0;

  bool over_capacity_risk = false;
  auto it = files.begin();
  while (it != files.end()) {
    const std::filesystem::path &filename = it->filename;
    const FileMetadata &metadata = it->metadata;

    // Check the row budget before opening the file or folding its
    // timestamps into the merged metadata; otherwise a file that is
    // skipped here would still widen the merged file's timestamp range.
    if (rows + metadata.rows_written > MAX_COMBINED_ROWS) {
      over_capacity_risk = true;
      break;
    }

    std::shared_ptr<arrow::io::RandomAccessFile> input;
    ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename));

    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));

    if (metadata.min_timestamp < min_timestamp)
      min_timestamp = metadata.min_timestamp;
    if (metadata.max_timestamp > max_timestamp)
      max_timestamp = metadata.max_timestamp;

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    tables.push_back(table);
    processed.push_back(filename);
    rows += metadata.rows_written;
    it = files.erase(it);
  }

  if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) {
    std::cerr << "Found files, but not enough to reach the minimum number of rows for the combined file" << std::endl;
    std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so "
              << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl;
    return arrow::Status::OK();
  } else if (rows == 0 && over_capacity_risk) {
    // The first file alone already exceeds MAX_COMBINED_ROWS: move it (and
    // its sidecar metadata) into merged/ as-is instead of rewriting it.
    const std::filesystem::path &filename = files.front().filename;
    std::filesystem::rename(filename, "merged" / filename);
    std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json");
    rows_written.Increment(static_cast<double>(files.front().metadata.rows_written));
    files.pop_front();
    return arrow::Status::OK();
  }

108 // Default options specify that the schemas are not unified, which is
109 // luckliy exactly what we want :)
110 std::shared_ptr<arrow::Table> merged_table;
111 ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables));
112
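  // Name the merged file after the current time; the %FT%T%Ez format
  // produces an ISO 8601 timestamp such as 2024-03-01T12:34:56+00:00,
  // matching the naming scheme of the input files.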
  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now());
  std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename));

  std::cerr << "Wrote merged table to " << filename << std::endl;

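  // Write the metadata to a .part file first and rename it into place, so
  // a concurrent reader can never observe a half-written .meta.json.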
  std::ofstream metaf(filename + ".meta.json.part", std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows },
  };
  metaf << meta;
  metaf.close();
  std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json");

  std::cerr << "Wrote merged table metadata" << std::endl;
  rows_written.Increment(static_cast<double>(rows));

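  // Only now that the merged file and its metadata are safely on disk are
  // the input files deleted.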
  for (const std::filesystem::path &filename : processed) {
    std::filesystem::remove(filename);
    std::filesystem::remove(std::string(filename) + ".meta.json");
  }

  std::cerr << "Successfully wrote merged table and metadata, and deleted old files" << std::endl;

  return arrow::Status::OK();
}

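// Merges until the deque is drained. Every call to processFirstTables
// removes at least one file from `files`, so this loop terminates.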
arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  while (!files.empty())
    ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written));
  return arrow::Status::OK();
}

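// The archiver: collects input Parquet files from the working directory,
// merges them into larger files under merged/, and pushes progress metrics
// to a Prometheus Pushgateway.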
int main(int argc, char *argv[]) {
  std::filesystem::path cwd = std::filesystem::current_path();
  std::filesystem::create_directory(cwd / "merged");

  const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL");
  if (!prom_push_url || strlen(prom_push_url) == 0) {
    std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl;
    return EXIT_FAILURE;
  }

  std::string split_err;
  auto split_prom_push_url = splitUrl(prom_push_url, &split_err);
  if (!split_prom_push_url) {
    std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: "
              << split_err << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":"
            << split_prom_push_url->portpath << std::endl;

  prometheus::Gateway gateway{split_prom_push_url->schemehost,
                              split_prom_push_url->portpath,
                              "oeuf-archiver"};

  auto registry = std::make_shared<prometheus::Registry>();
  prometheus::Gauge &rows_available = prometheus::BuildGauge()
    .Name("archiver_rows_available")
    .Help("Number of rows available to the archiver")
    .Register(*registry)
    .Add({});
  prometheus::Counter &rows_written = prometheus::BuildCounter()
    .Name("archiver_rows_written")
    .Help("Number of rows written by the archiver")
    .Register(*registry)
    .Add({});
  gateway.RegisterCollectable(registry);

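  // Collect input files; their names embed a fixed-width ISO 8601 UTC
  // timestamp (oeuf-<timestamp>+00:00.parquet), presumably produced by an
  // earlier stage of the pipeline together with the .meta.json sidecar.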
  std::deque<File> files;
  for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) {
    if (!dir_entry.is_regular_file()) continue;
    std::filesystem::path filename = dir_entry.path().filename();
    const std::string &filename_str = filename;
    if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) {
      try {
        FileMetadata meta = readMetadataOf(filename);
        File file = { .metadata = meta, .filename = filename };
        files.push_back(file);

        rows_available.Increment(static_cast<double>(meta.rows_written));
      } catch (const std::exception &e) {
        std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl;
        return EXIT_FAILURE;
      }
    }
  }

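  // Fixed-width ISO 8601 timestamps make lexicographic filename order
  // chronological, so the oldest data is merged first.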
  std::sort(files.begin(), files.end(),
            [](const File &f1, const File &f2) { return f1.filename < f2.filename; });
  arrow::Status st = processTables(files, rows_written);
  if (!st.ok()) {
    std::cerr << "Failed to process tables: " << st << std::endl;
    return EXIT_FAILURE;
  }

  // Gateway::Push() returns the HTTP status code of the push request;
  // report failures, but do not fail the run over metrics delivery.
  int push_status = gateway.Push();
  if (push_status != 200)
    std::cerr << "Failed to push metrics: HTTP status " << push_status << std::endl;
}