// vim:set sw=2 ts=2 sts et: // // Copyright 2024 Rutger Broekhoff. Licensed under the EUPL. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "spliturl.hpp" static const int MIN_COMBINED_ROWS = 1000000; // one million static const int MAX_COMBINED_ROWS = 2000000; // two million struct FileMetadata { int64_t min_timestamp = 0; int64_t max_timestamp = 0; int64_t rows_written = 0; }; struct File { FileMetadata metadata; std::filesystem::path filename; }; FileMetadata readMetadataOf(std::filesystem::path filename) { std::string meta_filename = std::string(filename) + ".meta.json"; std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary); nlohmann::json meta_json; meta_file >> meta_json; FileMetadata meta = { .min_timestamp = meta_json["min_timestamp"], .max_timestamp = meta_json["max_timestamp"], .rows_written = meta_json["rows_written"], }; return meta; } arrow::Status processFirstTables(std::deque &files, prometheus::Counter &rows_written) { if (files.size() == 0) { std::cerr << "Did not find any files" << std::endl; return arrow::Status::OK(); } int64_t rows = 0; std::vector> tables; std::vector processed; int64_t min_timestamp = std::numeric_limits::max(); int64_t max_timestamp = 0; bool over_capacity_risk = false; auto it = files.begin(); while (it != files.end()) { const std::filesystem::path &filename = it->filename; const FileMetadata &metadata = it->metadata; std::shared_ptr input; ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename)); std::unique_ptr arrow_reader; ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); if (metadata.min_timestamp < min_timestamp) min_timestamp = metadata.min_timestamp; if (metadata.max_timestamp > max_timestamp) max_timestamp = metadata.max_timestamp; if (rows + metadata.rows_written > MAX_COMBINED_ROWS) { over_capacity_risk = true; break; } std::shared_ptr table; ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); tables.push_back(table); processed.push_back(filename); rows += metadata.rows_written; it = files.erase(it); } if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) { std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl; std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so " << static_cast(rows)/static_cast(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl; return arrow::Status::OK(); } else if (rows == 0 && over_capacity_risk) { const std::filesystem::path &filename = files.front().filename; std::filesystem::rename(filename, "merged" / filename); std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json"); rows_written.Increment(static_cast(files.front().metadata.rows_written)); files.pop_front(); return arrow::Status::OK(); } // Default options specify that the schemas are not unified, which is // luckliy exactly what we want :) std::shared_ptr merged_table; ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables)); auto timestamp = std::chrono::round(std::chrono::system_clock::now()); std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp); ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename)); std::cerr << "Wrote merged table to " << filename << std::endl; std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); nlohmann::json meta{ { "min_timestamp", min_timestamp }, { "max_timestamp", max_timestamp }, { "rows_written", rows }, }; metaf << meta; metaf.close(); std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); std::cerr << "Wrote merged table metadata" << std::endl; rows_written.Increment(static_cast(rows)); for (const std::filesystem::path &filename : processed) { std::filesystem::remove(filename); std::filesystem::remove(std::string(filename) + ".meta.json"); } std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl; return arrow::Status::OK(); } arrow::Status processTables(std::deque &files, prometheus::Counter &rows_written) { while (!files.empty()) ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written)); return arrow::Status::OK(); } int main(int argc, char *argv[]) { std::filesystem::path cwd = std::filesystem::current_path(); std::filesystem::create_directory(cwd / "merged"); const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL"); if (!prom_push_url || strlen(prom_push_url) == 0) { std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl; return EXIT_FAILURE; } std::string split_err; auto split_prom_push_url = splitUrl(prom_push_url, &split_err); if (!split_prom_push_url) { std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: " << split_err << std::endl; return EXIT_FAILURE; } std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":" << split_prom_push_url->portpath << std::endl; prometheus::Gateway gateway{split_prom_push_url->schemehost, split_prom_push_url->portpath, "oeuf-archiver"}; auto registry = std::make_shared(); prometheus::Gauge &rows_available = prometheus::BuildGauge() .Name("archiver_rows_available") .Help("Number of rows available to the archiver") .Register(*registry) .Add({}); prometheus::Counter &rows_written = prometheus::BuildCounter() .Name("archiver_rows_written") .Help("Number of rows written by the archiver") .Register(*registry) .Add({}); gateway.RegisterCollectable(registry); std::deque files; for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) { if (!dir_entry.is_regular_file()) continue; std::filesystem::path filename = dir_entry.path().filename(); const std::string &filename_str = filename; if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) { try { FileMetadata meta = readMetadataOf(filename); File file = { .metadata = meta, .filename = filename }; files.push_back(file); rows_available.Increment(static_cast(meta.rows_written)); } catch (const std::exception &e) { std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl; return EXIT_FAILURE; } } } std::sort(files.begin(), files.end(), [](const File &f1, const File &f2) { return f1.filename < f2.filename; }); arrow::Status st = processTables(files, rows_written); if (!st.ok()) { std::cerr << "Failed to process tables: " << st << std::endl; return EXIT_FAILURE; } gateway.Push(); }