Diffstat (limited to 'src/bundleparquet/main.cpp')
-rw-r--r--  src/bundleparquet/main.cpp  213
1 file changed, 213 insertions, 0 deletions
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp
new file mode 100644
index 0000000..05fd881
--- /dev/null
+++ b/src/bundleparquet/main.cpp
@@ -0,0 +1,213 @@
// vim:set sw=2 ts=2 sts et:

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>

#include <nlohmann/json.hpp>

#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/gateway.h>
#include <prometheus/registry.h>

#include <tmi8/kv6_parquet.hpp>

#include "spliturl.hpp"
static const int MIN_COMBINED_ROWS = 1000000; // one million
static const int MAX_COMBINED_ROWS = 2000000; // two million

struct FileMetadata {
  int64_t min_timestamp = 0;
  int64_t max_timestamp = 0;
  int64_t rows_written = 0;
};

struct File {
  FileMetadata metadata;
  std::filesystem::path filename;
};

FileMetadata readMetadataOf(std::filesystem::path filename) {
  std::string meta_filename = std::string(filename) + ".meta.json";
  std::ifstream meta_file(meta_filename, std::ifstream::in | std::ifstream::binary);
  nlohmann::json meta_json;
  meta_file >> meta_json;
  FileMetadata meta = {
    .min_timestamp = meta_json["min_timestamp"],
    .max_timestamp = meta_json["max_timestamp"],
    .rows_written = meta_json["rows_written"],
  };
  return meta;
}

arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  if (files.empty()) {
    std::cerr << "Did not find any files" << std::endl;
    return arrow::Status::OK();
  }

  int64_t rows = 0;

  std::vector<std::shared_ptr<arrow::Table>> tables;
  std::vector<std::filesystem::path> processed;
  int64_t min_timestamp = std::numeric_limits<int64_t>::max();
  int64_t max_timestamp = 0;

  bool over_capacity_risk = false;
  auto it = files.begin();
  while (it != files.end()) {
    const std::filesystem::path &filename = it->filename;
    const FileMetadata &metadata = it->metadata;

    // Check the capacity limit before opening the file, so that a file we
    // end up skipping cannot contribute its timestamps to the merged
    // metadata either.
    if (rows + metadata.rows_written > MAX_COMBINED_ROWS) {
      over_capacity_risk = true;
      break;
    }

    std::shared_ptr<arrow::io::RandomAccessFile> input;
    ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename));

    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));

    if (metadata.min_timestamp < min_timestamp)
      min_timestamp = metadata.min_timestamp;
    if (metadata.max_timestamp > max_timestamp)
      max_timestamp = metadata.max_timestamp;

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    tables.push_back(table);
    processed.push_back(filename);
    rows += metadata.rows_written;
    it = files.erase(it);
  }

  if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) {
    std::cerr << "Found files, but not enough to satisfy the minimum number of rows for the combined file" << std::endl;
    std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so "
              << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl;
    return arrow::Status::OK();
  } else if (rows == 0 && over_capacity_risk) {
    const std::filesystem::path &filename = files.front().filename;
    std::filesystem::rename(filename, "merged" / filename);
    std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json");
    rows_written.Increment(static_cast<double>(files.front().metadata.rows_written));
    files.pop_front();
    return arrow::Status::OK();
  }

  // Default options specify that the schemas are not unified, which is
  // luckily exactly what we want :)
  std::shared_ptr<arrow::Table> merged_table;
  ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now());
  std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename));

  std::cerr << "Wrote merged table to " << filename << std::endl;

  std::ofstream metaf(filename + ".meta.json.part", std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows },
  };
  metaf << meta;
  metaf.close();
  std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json");

  std::cerr << "Wrote merged table metadata" << std::endl;
  rows_written.Increment(static_cast<double>(rows));

  for (const std::filesystem::path &filename : processed) {
    std::filesystem::remove(filename);
    std::filesystem::remove(std::string(filename) + ".meta.json");
  }

  std::cerr << "Successfully wrote merged table and metadata, and deleted the old files" << std::endl;

  return arrow::Status::OK();
}

arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  while (!files.empty())
    ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written));
  return arrow::Status::OK();
}

int main(int argc, char *argv[]) {
  std::filesystem::path cwd = std::filesystem::current_path();
  std::filesystem::create_directory(cwd / "merged");

  const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL");
  if (!prom_push_url || strlen(prom_push_url) == 0) {
    std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl;
    return EXIT_FAILURE;
  }

  std::string split_err;
  auto split_prom_push_url = splitUrl(prom_push_url, &split_err);
  if (!split_prom_push_url) {
    std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: "
              << split_err << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":"
            << split_prom_push_url->portpath << std::endl;

  prometheus::Gateway gateway{split_prom_push_url->schemehost,
                              split_prom_push_url->portpath,
                              "oeuf-archiver"};

  auto registry = std::make_shared<prometheus::Registry>();
  prometheus::Gauge &rows_available = prometheus::BuildGauge()
    .Name("archiver_rows_available")
    .Help("Number of rows available to the archiver")
    .Register(*registry)
    .Add({});
  prometheus::Counter &rows_written = prometheus::BuildCounter()
    .Name("archiver_rows_written")
    .Help("Number of rows written by the archiver")
    .Register(*registry)
    .Add({});
  gateway.RegisterCollectable(registry);

  std::deque<File> files;
  for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) {
    if (!dir_entry.is_regular_file()) continue;
    std::filesystem::path filename = dir_entry.path().filename();
    std::string filename_str = filename.string();
    if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) {
      try {
        FileMetadata meta = readMetadataOf(filename);
        File file = { .metadata = meta, .filename = filename };
        files.push_back(file);

        rows_available.Increment(static_cast<double>(meta.rows_written));
      } catch (const std::exception &e) {
        std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl;
        return EXIT_FAILURE;
      }
    }
  }

  std::sort(files.begin(), files.end(),
            [](const File &f1, const File &f2) { return f1.filename < f2.filename; });
  arrow::Status st = processTables(files, rows_written);
  if (!st.ok()) {
    std::cerr << "Failed to process tables: " << st << std::endl;
    return EXIT_FAILURE;
  }

  int status = gateway.Push();
  if (status != 200) {
    std::cerr << "Failed to push metrics: HTTP status " << status << std::endl;
    return EXIT_FAILURE;
  }
}