From 17a3ea880402338420699e03bcb24181e4ff3924 Mon Sep 17 00:00:00 2001
From: Rutger Broekhoff
Date: Thu, 2 May 2024 20:27:40 +0200
Subject: Initial commit

Based on dc4ba6a
---
 src/bundleparquet/.envrc       |   2 +
 src/bundleparquet/Makefile     |  21 ++++
 src/bundleparquet/main.cpp     | 213 +++++++++++++++++++++++++++++++++++++++++
 src/bundleparquet/spliturl.cpp | 203 +++++++++++++++++++++++++++++++++++++++
 src/bundleparquet/spliturl.hpp |  11 +++
 5 files changed, 450 insertions(+)
 create mode 100644 src/bundleparquet/.envrc
 create mode 100644 src/bundleparquet/Makefile
 create mode 100644 src/bundleparquet/main.cpp
 create mode 100644 src/bundleparquet/spliturl.cpp
 create mode 100644 src/bundleparquet/spliturl.hpp
(limited to 'src/bundleparquet')

diff --git a/src/bundleparquet/.envrc b/src/bundleparquet/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/bundleparquet/.envrc
@@ -0,0 +1,2 @@
+source_env ../../
+export DEVMODE=1
diff --git a/src/bundleparquet/Makefile b/src/bundleparquet/Makefile
new file mode 100644
index 0000000..170304d
--- /dev/null
+++ b/src/bundleparquet/Makefile
@@ -0,0 +1,21 @@
+# Taken from:
+# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
+# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
+# 2023. [Online].
Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-larrow -lcurl -lparquet -lprometheus-cpp-push -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +bundleparquet: main.cpp spliturl.cpp + $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm bundleparquet diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp new file mode 100644 index 0000000..05fd881 --- /dev/null +++ b/src/bundleparquet/main.cpp @@ -0,0 +1,213 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include + +#include + +#include "spliturl.hpp" + +static const int MIN_COMBINED_ROWS = 1000000; // one million +static const int MAX_COMBINED_ROWS = 2000000; // two million + +struct FileMetadata { + int64_t min_timestamp = 0; + int64_t max_timestamp = 0; + int64_t rows_written = 0; +}; + +struct File { + FileMetadata metadata; + std::filesystem::path filename; +}; + +FileMetadata readMetadataOf(std::filesystem::path filename) { + std::string meta_filename = std::string(filename) + ".meta.json"; + std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary); + nlohmann::json meta_json; + meta_file >> meta_json; + FileMetadata meta = { + .min_timestamp = meta_json["min_timestamp"], + .max_timestamp = meta_json["max_timestamp"], + .rows_written = meta_json["rows_written"], + }; + return meta; +} + +arrow::Status processFirstTables(std::deque &files, prometheus::Counter &rows_written) { + if 
(files.size() == 0) { + std::cerr << "Did not find any files" << std::endl; + return arrow::Status::OK(); + } + + int64_t rows = 0; + + std::vector> tables; + std::vector processed; + int64_t min_timestamp = std::numeric_limits::max(); + int64_t max_timestamp = 0; + + bool over_capacity_risk = false; + auto it = files.begin(); + while (it != files.end()) { + const std::filesystem::path &filename = it->filename; + const FileMetadata &metadata = it->metadata; + + std::shared_ptr input; + ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename)); + + std::unique_ptr arrow_reader; + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); + + if (metadata.min_timestamp < min_timestamp) + min_timestamp = metadata.min_timestamp; + if (metadata.max_timestamp > max_timestamp) + max_timestamp = metadata.max_timestamp; + + if (rows + metadata.rows_written > MAX_COMBINED_ROWS) { + over_capacity_risk = true; + break; + } + + std::shared_ptr table; + ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); + tables.push_back(table); + processed.push_back(filename); + rows += metadata.rows_written; + it = files.erase(it); + } + + if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) { + std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl; + std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so " + << static_cast(rows)/static_cast(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl; + return arrow::Status::OK(); + } else if (rows == 0 && over_capacity_risk) { + const std::filesystem::path &filename = files.front().filename; + std::filesystem::rename(filename, "merged" / filename); + std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json"); + rows_written.Increment(static_cast(files.front().metadata.rows_written)); + files.pop_front(); + return arrow::Status::OK(); + } + + // Default 
options specify that the schemas are not unified, which is + // luckliy exactly what we want :) + std::shared_ptr merged_table; + ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables)); + + auto timestamp = std::chrono::round(std::chrono::system_clock::now()); + std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp); + ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename)); + + std::cerr << "Wrote merged table to " << filename << std::endl; + + std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); + nlohmann::json meta{ + { "min_timestamp", min_timestamp }, + { "max_timestamp", max_timestamp }, + { "rows_written", rows }, + }; + metaf << meta; + metaf.close(); + std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); + + std::cerr << "Wrote merged table metadata" << std::endl; + rows_written.Increment(static_cast(rows)); + + for (const std::filesystem::path &filename : processed) { + std::filesystem::remove(filename); + std::filesystem::remove(std::string(filename) + ".meta.json"); + } + + std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl; + + return arrow::Status::OK(); +} + +arrow::Status processTables(std::deque &files, prometheus::Counter &rows_written) { + while (!files.empty()) + ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written)); + return arrow::Status::OK(); +} + +int main(int argc, char *argv[]) { + std::filesystem::path cwd = std::filesystem::current_path(); + std::filesystem::create_directory(cwd / "merged"); + + const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL"); + if (!prom_push_url || strlen(prom_push_url) == 0) { + std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" 
<< std::endl; + return EXIT_FAILURE; + } + + std::string split_err; + auto split_prom_push_url = splitUrl(prom_push_url, &split_err); + if (!split_prom_push_url) { + std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: " + << split_err << std::endl; + return EXIT_FAILURE; + } + std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":" + << split_prom_push_url->portpath << std::endl; + + prometheus::Gateway gateway{split_prom_push_url->schemehost, + split_prom_push_url->portpath, + "oeuf-archiver"}; + + auto registry = std::make_shared(); + prometheus::Gauge &rows_available = prometheus::BuildGauge() + .Name("archiver_rows_available") + .Help("Number of rows available to the archiver") + .Register(*registry) + .Add({}); + prometheus::Counter &rows_written = prometheus::BuildCounter() + .Name("archiver_rows_written") + .Help("Number of rows written by the archiver") + .Register(*registry) + .Add({}); + gateway.RegisterCollectable(registry); + + std::deque files; + for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) { + if (!dir_entry.is_regular_file()) continue; + std::filesystem::path filename = dir_entry.path().filename(); + const std::string &filename_str = filename; + if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) { + try { + FileMetadata meta = readMetadataOf(filename); + File file = { .metadata = meta, .filename = filename }; + files.push_back(file); + + rows_available.Increment(static_cast(meta.rows_written)); + } catch (const std::exception &e) { + std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl; + return EXIT_FAILURE; + } + } + } + + std::sort(files.begin(), files.end(), + [](const File &f1, const File &f2) { return f1.filename < f2.filename; }); + arrow::Status st = processTables(files, rows_written); + if (!st.ok()) { + std::cerr << "Failed to process tables: " << st << std::endl; + return EXIT_FAILURE; + } + 
+ gateway.Push(); +} diff --git a/src/bundleparquet/spliturl.cpp b/src/bundleparquet/spliturl.cpp new file mode 100644 index 0000000..90fd821 --- /dev/null +++ b/src/bundleparquet/spliturl.cpp @@ -0,0 +1,203 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include + +#include + +#include "spliturl.hpp" + +// splitUrl takes a URL of the shape '[http[s]://]HOST[:PORT][/PATH]', and +// splits it into two URLs: +// - scheme + host -> '[http[s]://]HOST' +// - port + path -> '[PORT][/PATH]' +// In case an IPv6 address is provided, the host must enclosed in square +// brackets. The zone ID may also be indicated. Note that in the resulting +// parts, the colon preceding the port number is omitted. This is on purpose. +std::optional splitUrl(const std::string &url, std::string *error) { + std::stringstream errs; + std::optional result; + char *processed = nullptr; + char *scheme = nullptr; + char *user = nullptr; + char *password = nullptr; + char *zoneid = nullptr; + char *query = nullptr; + char *fragment = nullptr; + CURLU *schemehost = nullptr; + char *schemehost_url = nullptr; + char *portpath_url = nullptr; + + // Parse the URL, allowing the user to omit the scheme. CURL will use 'https' + // by default if no scheme is specified. + + CURLU *parsed = curl_url(); + CURLUcode rc = curl_url_set(parsed, CURLUPART_URL, url.c_str(), CURLU_DEFAULT_SCHEME); + if (rc != CURLUE_OK) { + errs << "Failed to parse URL: " << curl_url_strerror(rc); + goto Exit; + } + + // As we parse the URL with the option CURLU_DEFAULT_SCHEME, the CURL API + // won't require the user to provide the scheme part of the URL. It will + // automatically default the scheme to https. However, we do not usually want + // it to default to HTTPS, but HTTP instead (as the use case, connecting to a + // PushGateway server, usually is served over a private network via HTTP). + // + // This is why we check if the scheme was put there by CURL and otherwise set + // it to HTTP. 
We also check for any other schemes that the user may have + // provided, and reject anything that is not http/https. + if (!url.starts_with("http://") && !url.starts_with("https://")) { + rc = curl_url_get(parsed, CURLUPART_SCHEME, &scheme, 0); + if (rc != CURLUE_OK) { + errs << "Could not get scheme from parsed URL: " << curl_url_strerror(rc); + goto Exit; + } + if (strcmp(scheme, "https")) { + errs << "Unexpected scheme" << scheme << "in provided URL (expected http or https)"; + goto Exit; + } + rc = curl_url_set(parsed, CURLUPART_SCHEME, "http", 0); + if (rc != CURLUE_OK) { + errs << "Could not set URL scheme to http: " << curl_url_strerror(rc); + goto Exit; + } + } + + // Turn the parsed URL back into a string. + rc = curl_url_get(parsed, CURLUPART_URL, &processed, 0); + if (rc != CURLUE_OK) { + errs << "Failed to output parsed URL: " << curl_url_strerror(rc); + goto Exit; + } + + // This part of the code checks if no prohibited parts are present in the URL + // (basic auth: (user, password), query, fragment). 
+ + rc = curl_url_get(parsed, CURLUPART_USER, &user, 0); + if (rc == CURLUE_OK && strlen(user) != 0) { + errs << "Provided URL should not contain a user part"; + goto Exit; + } else if (rc != CURLUE_NO_USER && rc != CURLUE_OK) { + errs << "Failed to get check user part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_PASSWORD, &password, 0); + if (rc == CURLUE_OK && strlen(password) != 0) { + errs << "Provided URL should not contain a password part"; + goto Exit; + } else if (rc != CURLUE_NO_PASSWORD && rc != CURLUE_OK) { + errs << "Failed to get check password part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_QUERY, &query, 0); + if (rc == CURLUE_OK && strlen(query) != 0) { + errs << "Provided URL should not contain a query part"; + goto Exit; + } else if (rc != CURLUE_NO_QUERY && rc != CURLUE_OK) { + errs << "Failed to get check query part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_FRAGMENT, &fragment, 0); + if (rc == CURLUE_OK && strlen(fragment) != 0) { + errs << "Provided URL should not contain a fragment part"; + goto Exit; + } else if (rc != CURLUE_NO_FRAGMENT && rc != CURLUE_OK) { + errs << "Failed to get check fragment part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + // Now that we know that the provided URL makes sense, we can start doing + // some arts and crafts. We get started by copying the parsed URL into + // schemehost and simply delete all parts which are not scheme + host. + + schemehost = curl_url_dup(parsed); + + // CURL BUG WORKAROUND: CURLUPART_ZONEID is NOT copied by curl_url_dup! 
+ // ^ fixed in CURL 8.3.0 after https://curl.se/mail/lib-2023-07/0047.html + rc = curl_url_get(parsed, CURLUPART_ZONEID, &zoneid, 0); + if (rc == CURLUE_OK) { + rc = curl_url_set(schemehost, CURLUPART_ZONEID, zoneid, 0); + if (rc != CURLUE_OK) { + errs << "Could not copy zone ID to duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + } + rc = curl_url_set(schemehost, CURLUPART_PORT, nullptr, 0); + if (rc != CURLUE_OK) { + errs << "Could not unset port in duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + rc = curl_url_set(schemehost, CURLUPART_PATH, nullptr, 0); + if (rc != CURLUE_OK) { + errs << "Could not unset path in duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + + // Okay, now we have the schemehost CURLU all ready to go. Note that a URL + // only consisting of a scheme and host is considered valid, so CURL will be + // more than happy to actually turn it into a string for us. Which is exactly + // what we do here :) + + rc = curl_url_get(schemehost, CURLUPART_URL, &schemehost_url, 0); + if (rc != CURLUE_OK) { + errs << "Could not get scheme + host URL: " << curl_url_strerror(rc); + goto Exit; + } + + // Remove any trailing slash after the scheme + host URL that CURL might have + // put there -- we still want to get a valid URL if we paste the port + path + // part behind it. + + if (strlen(schemehost_url) > 0) { + if (schemehost_url[strlen(schemehost_url) - 1] != '/') { + errs << "Scheme + host URL does not end with a slash"; + goto Exit; + } + schemehost_url[strlen(schemehost_url) - 1] = '\0'; + } + + // Look, this is really gross. Because the port + path part of the URL is not + // a valid URL itself, but the scheme + host should be a prefix of the full + // URL containing the port + path, we can simply check if it is indeed a + // prefix, and then strip it from the full URL, giving us the port + path + // (after deleting the colon preceding the port). 
+ + if (!std::string_view(processed).starts_with(schemehost_url)) { + errs << "Scheme + host URL is not a prefix of the processed URL"; + goto Exit; + } + + portpath_url = processed + strlen(schemehost_url); + // We should not have the colon before the port, prometheus-cpp inserts it + if (strlen(portpath_url) > 0 && portpath_url[0] == ':') portpath_url++; + // We do not need a trailing slash + if (strlen(portpath_url) > 0 && portpath_url[strlen(portpath_url)-1] == '/') + portpath_url[strlen(portpath_url)-1] = '\0'; + + // It has been done. BLECH + result = std::make_optional(schemehost_url, portpath_url); + +Exit: + curl_free(processed); + curl_free(scheme); + curl_free(user); + curl_free(password); + curl_free(query); + curl_free(fragment); + curl_free(zoneid); + curl_free(schemehost_url); + curl_url_cleanup(schemehost); + curl_url_cleanup(parsed); + + if (!result && error) + *error = errs.str(); + + return result; +} diff --git a/src/bundleparquet/spliturl.hpp b/src/bundleparquet/spliturl.hpp new file mode 100644 index 0000000..d8150e0 --- /dev/null +++ b/src/bundleparquet/spliturl.hpp @@ -0,0 +1,11 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include + +struct SplitUrl { + std::string schemehost; + std::string portpath; +}; + +std::optional splitUrl(const std::string &url, std::string *error = nullptr); -- cgit v1.2.3