aboutsummaryrefslogtreecommitdiffstats
path: root/src/bundleparquet
diff options
context:
space:
mode:
authorLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
committerLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
commit17a3ea880402338420699e03bcb24181e4ff3924 (patch)
treeda666ef91e0b60d20aa0b01529644c136fd1f4ab /src/bundleparquet
downloadoeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz
oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/bundleparquet')
-rw-r--r--src/bundleparquet/.envrc2
-rw-r--r--src/bundleparquet/Makefile21
-rw-r--r--src/bundleparquet/main.cpp213
-rw-r--r--src/bundleparquet/spliturl.cpp203
-rw-r--r--src/bundleparquet/spliturl.hpp11
5 files changed, 450 insertions, 0 deletions
diff --git a/src/bundleparquet/.envrc b/src/bundleparquet/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/bundleparquet/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/bundleparquet/Makefile b/src/bundleparquet/Makefile
new file mode 100644
index 0000000..170304d
--- /dev/null
+++ b/src/bundleparquet/Makefile
@@ -0,0 +1,21 @@
# Taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -lcurl -lparquet -lprometheus-cpp-push -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

# spliturl.hpp is listed as a prerequisite so that editing the header triggers
# a rebuild; the source files are spelled out in the recipe (instead of $^)
# so the header itself is not passed to the compiler.
bundleparquet: main.cpp spliturl.cpp spliturl.hpp
	$(CXX) -fPIE -pie -o $@ main.cpp spliturl.cpp $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	rm -f bundleparquet
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp
new file mode 100644
index 0000000..05fd881
--- /dev/null
+++ b/src/bundleparquet/main.cpp
@@ -0,0 +1,213 @@
1// vim:set sw=2 ts=2 sts et:
2
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <limits>
#include <stdexcept>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>

#include <nlohmann/json.hpp>

#include <prometheus/counter.h>
#include <prometheus/gateway.h>
#include <prometheus/registry.h>

#include <tmi8/kv6_parquet.hpp>

#include "spliturl.hpp"
23
24static const int MIN_COMBINED_ROWS = 1000000; // one million
25static const int MAX_COMBINED_ROWS = 2000000; // two million
26
// Summary statistics for one parquet file, read from the ".meta.json"
// sidecar that accompanies it (see readMetadataOf).
struct FileMetadata {
  int64_t min_timestamp = 0; // smallest row timestamp in the file (epoch-based; units not visible here — confirm against the writer)
  int64_t max_timestamp = 0; // largest row timestamp in the file
  int64_t rows_written = 0;  // number of rows the file contains
};
32
// A parquet file queued for merging: its sidecar metadata plus its path
// (relative to the working directory).
struct File {
  FileMetadata metadata;
  std::filesystem::path filename;
};
37
38FileMetadata readMetadataOf(std::filesystem::path filename) {
39 std::string meta_filename = std::string(filename) + ".meta.json";
40 std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary);
41 nlohmann::json meta_json;
42 meta_file >> meta_json;
43 FileMetadata meta = {
44 .min_timestamp = meta_json["min_timestamp"],
45 .max_timestamp = meta_json["max_timestamp"],
46 .rows_written = meta_json["rows_written"],
47 };
48 return meta;
49}
50
// Merges parquet files from the front of `files` into one file under merged/,
// deleting the merged inputs and their ".meta.json" sidecars on success.
//
// Files are consumed in order until adding the next one would push the
// combined row count over MAX_COMBINED_ROWS. Outcomes:
//  - fewer than MIN_COMBINED_ROWS rows collected and no file had to be
//    excluded: do nothing and wait for more data;
//  - the very first file alone exceeds the cap: move it (and its sidecar)
//    into merged/ as-is instead of rewriting it;
//  - otherwise: concatenate the collected tables, write the merged parquet
//    file plus a sidecar, and delete the inputs.
// Consumed files are erased from `files`; `rows_written` is incremented by
// the number of rows that ended up under merged/.
arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  if (files.size() == 0) {
    std::cerr << "Did not find any files" << std::endl;
    return arrow::Status::OK();
  }

  int64_t rows = 0;

  std::vector<std::shared_ptr<arrow::Table>> tables;
  std::vector<std::filesystem::path> processed;
  int64_t min_timestamp = std::numeric_limits<int64_t>::max();
  int64_t max_timestamp = 0;

  bool over_capacity_risk = false;
  auto it = files.begin();
  while (it != files.end()) {
    const std::filesystem::path &filename = it->filename;
    const FileMetadata &metadata = it->metadata;

    // Check the cap BEFORE opening the file or touching min/max: a file that
    // is excluded from this merge must not contribute to the merged file's
    // metadata (previously its timestamps leaked into the sidecar), and
    // there is no point building a reader for it.
    if (rows + metadata.rows_written > MAX_COMBINED_ROWS) {
      over_capacity_risk = true;
      break;
    }

    std::shared_ptr<arrow::io::RandomAccessFile> input;
    ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename));

    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));

    if (metadata.min_timestamp < min_timestamp)
      min_timestamp = metadata.min_timestamp;
    if (metadata.max_timestamp > max_timestamp)
      max_timestamp = metadata.max_timestamp;

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    tables.push_back(table);
    processed.push_back(filename);
    rows += metadata.rows_written;
    it = files.erase(it);
  }

  if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) {
    std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl;
    std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so "
              << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl;
    return arrow::Status::OK();
  } else if (rows == 0 && over_capacity_risk) {
    // The front file alone is over the cap: move it and its sidecar into
    // merged/ unchanged rather than rewriting it.
    const std::filesystem::path &filename = files.front().filename;
    std::filesystem::rename(filename, "merged" / filename);
    std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json");
    rows_written.Increment(static_cast<double>(files.front().metadata.rows_written));
    files.pop_front();
    return arrow::Status::OK();
  }

  // Default options specify that the schemas are not unified, which is
  // luckily exactly what we want :)
  std::shared_ptr<arrow::Table> merged_table;
  ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now());
  std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename));

  std::cerr << "Wrote merged table to " << filename << std::endl;

  // Write the sidecar to a ".part" file first and rename it into place, so a
  // crash mid-write never leaves a truncated .meta.json behind.
  std::ofstream metaf(filename + ".meta.json.part", std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows },
  };
  metaf << meta;
  metaf.close();
  std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json");

  std::cerr << "Wrote merged table metadata" << std::endl;
  rows_written.Increment(static_cast<double>(rows));

  for (const std::filesystem::path &filename : processed) {
    std::filesystem::remove(filename);
    std::filesystem::remove(std::string(filename) + ".meta.json");
  }

  std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl;

  return arrow::Status::OK();
}
141
142arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) {
143 while (!files.empty())
144 ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written));
145 return arrow::Status::OK();
146}
147
148int main(int argc, char *argv[]) {
149 std::filesystem::path cwd = std::filesystem::current_path();
150 std::filesystem::create_directory(cwd / "merged");
151
152 const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL");
153 if (!prom_push_url || strlen(prom_push_url) == 0) {
154 std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl;
155 return EXIT_FAILURE;
156 }
157
158 std::string split_err;
159 auto split_prom_push_url = splitUrl(prom_push_url, &split_err);
160 if (!split_prom_push_url) {
161 std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: "
162 << split_err << std::endl;
163 return EXIT_FAILURE;
164 }
165 std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":"
166 << split_prom_push_url->portpath << std::endl;
167
168 prometheus::Gateway gateway{split_prom_push_url->schemehost,
169 split_prom_push_url->portpath,
170 "oeuf-archiver"};
171
172 auto registry = std::make_shared<prometheus::Registry>();
173 prometheus::Gauge &rows_available = prometheus::BuildGauge()
174 .Name("archiver_rows_available")
175 .Help("Number of rows available to the archiver")
176 .Register(*registry)
177 .Add({});
178 prometheus::Counter &rows_written = prometheus::BuildCounter()
179 .Name("archiver_rows_written")
180 .Help("Number of rows written by the archiver")
181 .Register(*registry)
182 .Add({});
183 gateway.RegisterCollectable(registry);
184
185 std::deque<File> files;
186 for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) {
187 if (!dir_entry.is_regular_file()) continue;
188 std::filesystem::path filename = dir_entry.path().filename();
189 const std::string &filename_str = filename;
190 if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) {
191 try {
192 FileMetadata meta = readMetadataOf(filename);
193 File file = { .metadata = meta, .filename = filename };
194 files.push_back(file);
195
196 rows_available.Increment(static_cast<double>(meta.rows_written));
197 } catch (const std::exception &e) {
198 std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl;
199 return EXIT_FAILURE;
200 }
201 }
202 }
203
204 std::sort(files.begin(), files.end(),
205 [](const File &f1, const File &f2) { return f1.filename < f2.filename; });
206 arrow::Status st = processTables(files, rows_written);
207 if (!st.ok()) {
208 std::cerr << "Failed to process tables: " << st << std::endl;
209 return EXIT_FAILURE;
210 }
211
212 gateway.Push();
213}
diff --git a/src/bundleparquet/spliturl.cpp b/src/bundleparquet/spliturl.cpp
new file mode 100644
index 0000000..90fd821
--- /dev/null
+++ b/src/bundleparquet/spliturl.cpp
@@ -0,0 +1,203 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <cstring>
4#include <iostream>
5#include <optional>
6#include <sstream>
7#include <string>
8
9#include <curl/curl.h>
10
11#include "spliturl.hpp"
12
13// splitUrl takes a URL of the shape '[http[s]://]HOST[:PORT][/PATH]', and
14// splits it into two URLs:
15// - scheme + host -> '[http[s]://]HOST'
16// - port + path -> '[PORT][/PATH]'
// In case an IPv6 address is provided, the host must be enclosed in square
18// brackets. The zone ID may also be indicated. Note that in the resulting
19// parts, the colon preceding the port number is omitted. This is on purpose.
20std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error) {
21 std::stringstream errs;
22 std::optional<SplitUrl> result;
23 char *processed = nullptr;
24 char *scheme = nullptr;
25 char *user = nullptr;
26 char *password = nullptr;
27 char *zoneid = nullptr;
28 char *query = nullptr;
29 char *fragment = nullptr;
30 CURLU *schemehost = nullptr;
31 char *schemehost_url = nullptr;
32 char *portpath_url = nullptr;
33
34 // Parse the URL, allowing the user to omit the scheme. CURL will use 'https'
35 // by default if no scheme is specified.
36
37 CURLU *parsed = curl_url();
38 CURLUcode rc = curl_url_set(parsed, CURLUPART_URL, url.c_str(), CURLU_DEFAULT_SCHEME);
39 if (rc != CURLUE_OK) {
40 errs << "Failed to parse URL: " << curl_url_strerror(rc);
41 goto Exit;
42 }
43
44 // As we parse the URL with the option CURLU_DEFAULT_SCHEME, the CURL API
45 // won't require the user to provide the scheme part of the URL. It will
46 // automatically default the scheme to https. However, we do not usually want
47 // it to default to HTTPS, but HTTP instead (as the use case, connecting to a
48 // PushGateway server, usually is served over a private network via HTTP).
49 //
50 // This is why we check if the scheme was put there by CURL and otherwise set
51 // it to HTTP. We also check for any other schemes that the user may have
52 // provided, and reject anything that is not http/https.
53 if (!url.starts_with("http://") && !url.starts_with("https://")) {
54 rc = curl_url_get(parsed, CURLUPART_SCHEME, &scheme, 0);
55 if (rc != CURLUE_OK) {
56 errs << "Could not get scheme from parsed URL: " << curl_url_strerror(rc);
57 goto Exit;
58 }
59 if (strcmp(scheme, "https")) {
60 errs << "Unexpected scheme" << scheme << "in provided URL (expected http or https)";
61 goto Exit;
62 }
63 rc = curl_url_set(parsed, CURLUPART_SCHEME, "http", 0);
64 if (rc != CURLUE_OK) {
65 errs << "Could not set URL scheme to http: " << curl_url_strerror(rc);
66 goto Exit;
67 }
68 }
69
70 // Turn the parsed URL back into a string.
71 rc = curl_url_get(parsed, CURLUPART_URL, &processed, 0);
72 if (rc != CURLUE_OK) {
73 errs << "Failed to output parsed URL: " << curl_url_strerror(rc);
74 goto Exit;
75 }
76
77 // This part of the code checks if no prohibited parts are present in the URL
78 // (basic auth: (user, password), query, fragment).
79
80 rc = curl_url_get(parsed, CURLUPART_USER, &user, 0);
81 if (rc == CURLUE_OK && strlen(user) != 0) {
82 errs << "Provided URL should not contain a user part";
83 goto Exit;
84 } else if (rc != CURLUE_NO_USER && rc != CURLUE_OK) {
85 errs << "Failed to get check user part existence in provided url: " << curl_url_strerror(rc);
86 goto Exit;
87 }
88
89 rc = curl_url_get(parsed, CURLUPART_PASSWORD, &password, 0);
90 if (rc == CURLUE_OK && strlen(password) != 0) {
91 errs << "Provided URL should not contain a password part";
92 goto Exit;
93 } else if (rc != CURLUE_NO_PASSWORD && rc != CURLUE_OK) {
94 errs << "Failed to get check password part existence in provided url: " << curl_url_strerror(rc);
95 goto Exit;
96 }
97
98 rc = curl_url_get(parsed, CURLUPART_QUERY, &query, 0);
99 if (rc == CURLUE_OK && strlen(query) != 0) {
100 errs << "Provided URL should not contain a query part";
101 goto Exit;
102 } else if (rc != CURLUE_NO_QUERY && rc != CURLUE_OK) {
103 errs << "Failed to get check query part existence in provided url: " << curl_url_strerror(rc);
104 goto Exit;
105 }
106
107 rc = curl_url_get(parsed, CURLUPART_FRAGMENT, &fragment, 0);
108 if (rc == CURLUE_OK && strlen(fragment) != 0) {
109 errs << "Provided URL should not contain a fragment part";
110 goto Exit;
111 } else if (rc != CURLUE_NO_FRAGMENT && rc != CURLUE_OK) {
112 errs << "Failed to get check fragment part existence in provided url: " << curl_url_strerror(rc);
113 goto Exit;
114 }
115
116 // Now that we know that the provided URL makes sense, we can start doing
117 // some arts and crafts. We get started by copying the parsed URL into
118 // schemehost and simply delete all parts which are not scheme + host.
119
120 schemehost = curl_url_dup(parsed);
121
122 // CURL BUG WORKAROUND: CURLUPART_ZONEID is NOT copied by curl_url_dup!
123 // ^ fixed in CURL 8.3.0 after https://curl.se/mail/lib-2023-07/0047.html
124 rc = curl_url_get(parsed, CURLUPART_ZONEID, &zoneid, 0);
125 if (rc == CURLUE_OK) {
126 rc = curl_url_set(schemehost, CURLUPART_ZONEID, zoneid, 0);
127 if (rc != CURLUE_OK) {
128 errs << "Could not copy zone ID to duplicated URL: " << curl_url_strerror(rc);
129 goto Exit;
130 }
131 }
132 rc = curl_url_set(schemehost, CURLUPART_PORT, nullptr, 0);
133 if (rc != CURLUE_OK) {
134 errs << "Could not unset port in duplicated URL: " << curl_url_strerror(rc);
135 goto Exit;
136 }
137 rc = curl_url_set(schemehost, CURLUPART_PATH, nullptr, 0);
138 if (rc != CURLUE_OK) {
139 errs << "Could not unset path in duplicated URL: " << curl_url_strerror(rc);
140 goto Exit;
141 }
142
143 // Okay, now we have the schemehost CURLU all ready to go. Note that a URL
144 // only consisting of a scheme and host is considered valid, so CURL will be
145 // more than happy to actually turn it into a string for us. Which is exactly
146 // what we do here :)
147
148 rc = curl_url_get(schemehost, CURLUPART_URL, &schemehost_url, 0);
149 if (rc != CURLUE_OK) {
150 errs << "Could not get scheme + host URL: " << curl_url_strerror(rc);
151 goto Exit;
152 }
153
154 // Remove any trailing slash after the scheme + host URL that CURL might have
155 // put there -- we still want to get a valid URL if we paste the port + path
156 // part behind it.
157
158 if (strlen(schemehost_url) > 0) {
159 if (schemehost_url[strlen(schemehost_url) - 1] != '/') {
160 errs << "Scheme + host URL does not end with a slash";
161 goto Exit;
162 }
163 schemehost_url[strlen(schemehost_url) - 1] = '\0';
164 }
165
166 // Look, this is really gross. Because the port + path part of the URL is not
167 // a valid URL itself, but the scheme + host should be a prefix of the full
168 // URL containing the port + path, we can simply check if it is indeed a
169 // prefix, and then strip it from the full URL, giving us the port + path
170 // (after deleting the colon preceding the port).
171
172 if (!std::string_view(processed).starts_with(schemehost_url)) {
173 errs << "Scheme + host URL is not a prefix of the processed URL";
174 goto Exit;
175 }
176
177 portpath_url = processed + strlen(schemehost_url);
178 // We should not have the colon before the port, prometheus-cpp inserts it
179 if (strlen(portpath_url) > 0 && portpath_url[0] == ':') portpath_url++;
180 // We do not need a trailing slash
181 if (strlen(portpath_url) > 0 && portpath_url[strlen(portpath_url)-1] == '/')
182 portpath_url[strlen(portpath_url)-1] = '\0';
183
184 // It has been done. BLECH
185 result = std::make_optional<SplitUrl>(schemehost_url, portpath_url);
186
187Exit:
188 curl_free(processed);
189 curl_free(scheme);
190 curl_free(user);
191 curl_free(password);
192 curl_free(query);
193 curl_free(fragment);
194 curl_free(zoneid);
195 curl_free(schemehost_url);
196 curl_url_cleanup(schemehost);
197 curl_url_cleanup(parsed);
198
199 if (!result && error)
200 *error = errs.str();
201
202 return result;
203}
diff --git a/src/bundleparquet/spliturl.hpp b/src/bundleparquet/spliturl.hpp
new file mode 100644
index 0000000..d8150e0
--- /dev/null
+++ b/src/bundleparquet/spliturl.hpp
@@ -0,0 +1,11 @@
// vim:set sw=2 ts=2 sts et:
// Header guard was missing: a double inclusion would redefine SplitUrl.
#pragma once

#include <optional>
#include <string>

// A URL split into the two pieces prometheus-cpp's Gateway constructor
// expects: the scheme + host, and the port + path (without the colon that
// would precede the port -- prometheus-cpp inserts that itself).
struct SplitUrl {
  std::string schemehost; // e.g. "http://example.com"
  std::string portpath;   // e.g. "9091/some/path" (may be empty)
};

// Splits a URL of the shape '[http[s]://]HOST[:PORT][/PATH]'. Returns
// std::nullopt on failure, in which case *error (if non-null) is set to a
// description of the problem.
std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error = nullptr);