diff options
Diffstat (limited to 'src/recvkv6')
| -rw-r--r-- | src/recvkv6/.envrc | 2 | ||||
| -rw-r--r-- | src/recvkv6/Makefile | 21 | ||||
| -rw-r--r-- | src/recvkv6/main.cpp | 1300 |
3 files changed, 1323 insertions, 0 deletions
diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/recvkv6/.envrc | |||
| @@ -0,0 +1,2 @@ | |||
| 1 | source_env ../../ | ||
| 2 | export DEVMODE=1 | ||
diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile new file mode 100644 index 0000000..12ff7fb --- /dev/null +++ b/src/recvkv6/Makefile | |||
| @@ -0,0 +1,21 @@ | |||
# Build rules for the recvkv6 binary.
#
# Compiler and linker hardening flags taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set (non-empty).
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

recvkv6: main.cpp
	$(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
# -f: do not fail when the binary has not been built (or was already removed).
clean:
	rm -f recvkv6
diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp new file mode 100644 index 0000000..2ac3669 --- /dev/null +++ b/src/recvkv6/main.cpp | |||
| @@ -0,0 +1,1300 @@ | |||
| 1 | // vim:set sw=2 ts=2 sts et: | ||
| 2 | |||
#include <array>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <limits>
#include <optional>
#include <stack>
#include <string>
#include <sstream>
#include <vector>

#include <zlib.h>
#include <zmq.h>

#include <nlohmann/json.hpp>

#include <prometheus/counter.h>
#include <prometheus/exposer.h>
#include <prometheus/histogram.h>
#include <prometheus/registry.h>

#include <rapidxml/rapidxml.hpp>

#include <tmi8/kv6_parquet.hpp>
| 31 | |||
| 32 | #define CHUNK 16384 | ||
| 33 | |||
| 34 | struct RawMessage { | ||
| 35 | public: | ||
| 36 | // Takes ownership of envelope and body | ||
| 37 | RawMessage(zmq_msg_t envelope, zmq_msg_t body) | ||
| 38 | : envelope(envelope), body(body) | ||
| 39 | {} | ||
| 40 | |||
| 41 | // Prevent copying | ||
| 42 | RawMessage(const RawMessage &) = delete; | ||
| 43 | RawMessage &operator=(RawMessage const &) = delete; | ||
| 44 | |||
| 45 | std::string_view getEnvelope() { | ||
| 46 | return static_cast<const char *>(zmq_msg_data(&envelope)); | ||
| 47 | } | ||
| 48 | |||
| 49 | char *getBody() { | ||
| 50 | return static_cast<char *>(zmq_msg_data(&body)); | ||
| 51 | } | ||
| 52 | |||
| 53 | size_t getBodySize() { | ||
| 54 | return zmq_msg_size(&body); | ||
| 55 | } | ||
| 56 | |||
| 57 | ~RawMessage() { | ||
| 58 | zmq_msg_close(&envelope); | ||
| 59 | zmq_msg_close(&body); | ||
| 60 | } | ||
| 61 | |||
| 62 | private: | ||
| 63 | zmq_msg_t envelope; | ||
| 64 | zmq_msg_t body; | ||
| 65 | }; | ||
| 66 | |||
| 67 | std::optional<RawMessage> recvMsg(void *socket) { | ||
| 68 | while (true) { | ||
| 69 | zmq_msg_t envelope, body; | ||
| 70 | int rc = zmq_msg_init(&envelope); | ||
| 71 | assert(rc == 0); | ||
| 72 | rc = zmq_msg_init(&body); | ||
| 73 | assert(rc == 0); | ||
| 74 | |||
| 75 | rc = zmq_msg_recv(&envelope, socket, 0); | ||
| 76 | if (rc == -1) return std::nullopt; | ||
| 77 | |||
| 78 | int more; | ||
| 79 | size_t more_size = sizeof(more); | ||
| 80 | rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); | ||
| 81 | if (!more) { | ||
| 82 | zmq_msg_close(&envelope); | ||
| 83 | zmq_msg_close(&body); | ||
| 84 | continue; | ||
| 85 | } | ||
| 86 | |||
| 87 | rc = zmq_msg_recv(&body, socket, 0); | ||
| 88 | if (rc == -1) return std::nullopt; | ||
| 89 | |||
| 90 | rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); | ||
| 91 | assert(!more); | ||
| 92 | |||
| 93 | return std::make_optional<RawMessage>(envelope, body); | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | // Ensures that <return value>[output_size] == 0 | ||
| 98 | char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) { | ||
| 99 | assert(input_size <= UINT32_MAX); | ||
| 100 | |||
| 101 | z_stream strm; | ||
| 102 | strm.next_in = reinterpret_cast<unsigned char *>(raw); | ||
| 103 | strm.avail_in = input_size; | ||
| 104 | strm.zalloc = Z_NULL; | ||
| 105 | strm.zfree = Z_NULL; | ||
| 106 | strm.opaque = Z_NULL; | ||
| 107 | int rc = inflateInit2(&strm, 32); | ||
| 108 | assert(rc == Z_OK); | ||
| 109 | |||
| 110 | unsigned int buf_cap = CHUNK; | ||
| 111 | unsigned int buf_len = 0; | ||
| 112 | char *buf = static_cast<char *>(malloc(CHUNK)); | ||
| 113 | do { | ||
| 114 | if (buf_len + CHUNK > buf_cap) { | ||
| 115 | assert(buf_cap <= UINT32_MAX); | ||
| 116 | buf_cap *= 2; | ||
| 117 | buf = static_cast<char *>(realloc(buf, buf_cap)); | ||
| 118 | } | ||
| 119 | strm.avail_out = buf_cap - buf_len; | ||
| 120 | strm.next_out = reinterpret_cast<unsigned char *>(buf + buf_len); | ||
| 121 | |||
| 122 | unsigned long old_total = strm.total_out; | ||
| 123 | rc = inflate(&strm, Z_FINISH); | ||
| 124 | unsigned progress = static_cast<unsigned int>(strm.total_out - old_total); | ||
| 125 | buf_len += progress; | ||
| 126 | assert(progress != 0 || rc == Z_STREAM_END); | ||
| 127 | } while (strm.total_in < input_size); | ||
| 128 | |||
| 129 | if (buf_len == buf_cap) { | ||
| 130 | buf = static_cast<char *>(realloc(buf, buf_len + 1)); | ||
| 131 | } | ||
| 132 | buf[buf_len] = 0; | ||
| 133 | output_size = buf_len; | ||
| 134 | |||
| 135 | rc = inflateEnd(&strm); | ||
| 136 | assert(rc == Z_OK); | ||
| 137 | |||
| 138 | return buf; | ||
| 139 | } | ||
| 140 | |||
| 141 | struct Date { | ||
| 142 | int16_t year = 0; | ||
| 143 | uint8_t month = 0; | ||
| 144 | uint8_t day = 0; | ||
| 145 | |||
| 146 | static bool parse(Date &dest, std::string_view src) { | ||
| 147 | dest.year = 0, dest.month = 0, dest.day = 0; | ||
| 148 | |||
| 149 | int16_t y_mul_fac = 1; | ||
| 150 | bool extended = false; | ||
| 151 | |||
| 152 | size_t plus = src.find('+'); | ||
| 153 | if (plus != std::string_view::npos) { | ||
| 154 | extended = true; | ||
| 155 | src = src.substr(1); // remove plus sign from the start | ||
| 156 | } | ||
| 157 | if (!extended) { | ||
| 158 | size_t min_or_dash = src.find('-'); | ||
| 159 | if (min_or_dash == std::string_view::npos) return false; | ||
| 160 | if (min_or_dash == 0) { | ||
| 161 | y_mul_fac = -1; // it's a minus sign | ||
| 162 | src = src.substr(1); // remove minus sign at the start | ||
| 163 | } | ||
| 164 | } | ||
| 165 | |||
| 166 | int y_chars = 0; | ||
| 167 | while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') { | ||
| 168 | dest.year = static_cast<int16_t>(dest.year * 10 + src[0] - '0'); | ||
| 169 | src = src.substr(1); | ||
| 170 | y_chars++; | ||
| 171 | } | ||
| 172 | if (src.size() == 0) { dest.year = 0; return false; } | ||
| 173 | if (src[0] != '-') { dest.year = 0; return false; } | ||
| 174 | src = src.substr(1); // remove dash | ||
| 175 | if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; } | ||
| 176 | dest.year *= y_mul_fac; | ||
| 177 | |||
| 178 | bool rest_correct = src.size() == 5 | ||
| 179 | && src[0] >= '0' && src[0] <= '9' | ||
| 180 | && src[1] >= '0' && src[1] <= '9' | ||
| 181 | && src[3] >= '0' && src[3] <= '9' | ||
| 182 | && src[4] >= '0' && src[4] <= '9'; | ||
| 183 | if (!rest_correct) { dest.year = 0; return false; } | ||
| 184 | dest.month = static_cast<uint8_t>((src[0] - '0') * 10 + src[1] - '0'); | ||
| 185 | dest.day = static_cast<uint8_t>((src[3] - '0') * 10 + src[4] - '0'); | ||
| 186 | if (dest.month > 12 || dest.day > 31) { | ||
| 187 | dest.year = 0, dest.month = 0, dest.day = 0; | ||
| 188 | return false; | ||
| 189 | } | ||
| 190 | return true; | ||
| 191 | } | ||
| 192 | |||
| 193 | std::string toString() const { | ||
| 194 | if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31) | ||
| 195 | throw std::invalid_argument("one or more date components (year, month, day) out of range"); | ||
| 196 | char data[11] = "XXXX-XX-XX"; | ||
| 197 | sprintf(data, "%04u-%02u-%02u", year, month, day); | ||
| 198 | return data; | ||
| 199 | } | ||
| 200 | |||
| 201 | std::chrono::days toUnixDays() const { | ||
| 202 | std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)}; | ||
| 203 | // This is valid since C++20: as of C++20, the system clock is defined to measure the | ||
| 204 | // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds. | ||
| 205 | std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch(); | ||
| 206 | return since_epoch; | ||
| 207 | } | ||
| 208 | }; | ||
| 209 | |||
// A time of day with one-second resolution.
struct Time {
  uint8_t hour = 0;
  uint8_t minute = 0;
  uint8_t second = 0;

  // Parses "HH:MM:SS" (exactly eight characters) into dest.
  // Returns false, leaving dest at 00:00:00, when src does not have this form
  // or when a component is out of range (hour > 23, minute > 59, second > 59).
  static bool parse(Time &dest, std::string_view src) {
    // Reset up front so that every failure path leaves dest in the same state.
    dest.hour = 0, dest.minute = 0, dest.second = 0;

    const auto is_digit = [](char c) { return c >= '0' && c <= '9'; };
    const bool okay = src.size() == 8
      && is_digit(src[0]) && is_digit(src[1])
      && src[2] == ':'
      && is_digit(src[3]) && is_digit(src[4])
      && src[5] == ':'
      && is_digit(src[6]) && is_digit(src[7]);
    if (!okay) return false;

    const int hour = (src[0] - '0') * 10 + (src[1] - '0');
    const int minute = (src[3] - '0') * 10 + (src[4] - '0');
    const int second = (src[6] - '0') * 10 + (src[7] - '0');
    if (hour > 23 || minute > 59 || second > 59) return false;

    dest.hour = static_cast<uint8_t>(hour);
    dest.minute = static_cast<uint8_t>(minute);
    dest.second = static_cast<uint8_t>(second);
    return true;
  }

  // Formats the time as "HH:MM:SS".
  // Throws std::invalid_argument when a component is out of range.
  std::string toString() const {
    if (hour > 23 || minute > 59 || second > 59)
      throw std::invalid_argument("one or more time components (hour, minute, second) out of range");
    char data[9];
    std::snprintf(data, sizeof(data), "%02u:%02u:%02u",
                  static_cast<unsigned int>(hour),
                  static_cast<unsigned int>(minute),
                  static_cast<unsigned int>(second));
    return data;
  }
};
| 244 | |||
// Time zone designator: an offset in whole minutes, positive for "+HH:MM"
// and negative for "-HH:MM".
struct Tzd {
  int16_t minutes = 0;

  // Parses "Z", "+HH:MM" or "-HH:MM" into dest.
  // Returns false, leaving dest at offset 0, when src has another form or
  // when HH > 23 or MM > 59.
  static bool parse(Tzd &dest, std::string_view src) {
    dest.minutes = 0;

    if (src.size() == 0) return false;
    if (src == "Z") return true;

    int16_t multiplier = 1;
    if (src[0] == '-') multiplier = -1;
    else if (src[0] != '+') return false;
    src = src.substr(1);

    const auto is_digit = [](char c) { return c >= '0' && c <= '9'; };
    const bool okay = src.size() == 5
      && is_digit(src[0]) && is_digit(src[1])
      && src[2] == ':'
      && is_digit(src[3]) && is_digit(src[4]);
    if (!okay) return false;

    const int hours = (src[0] - '0') * 10 + (src[1] - '0');
    const int mins = (src[3] - '0') * 10 + (src[4] - '0');
    if (hours > 23 || mins > 59) return false;
    dest.minutes = static_cast<int16_t>(multiplier * (60 * hours + mins));
    return true;
  }

  // Formats the offset as "Z" (offset 0), "+HH:MM" or "-HH:MM".
  // Throws std::invalid_argument when the offset is 24 hours or more.
  std::string toString() const {
    if (minutes == 0)
      return "Z";

    const bool negative = minutes < 0;
    const int hours_off = std::abs(minutes / 60);
    const int mins_off = std::abs(static_cast<int>(minutes)) - hours_off*60;
    if (hours_off > 23 || mins_off > 59)
      throw std::invalid_argument("offset out of range");
    char data[7];
    std::snprintf(data, sizeof(data), "%c%02d:%02d", negative ? '-' : '+', hours_off, mins_off);
    return data;
  }
};
| 288 | |||
| 289 | struct Timestamp { | ||
| 290 | Date date; | ||
| 291 | Tzd off; | ||
| 292 | Time time; | ||
| 293 | |||
| 294 | static bool parse(Timestamp &dest, std::string_view src) { | ||
| 295 | size_t t = src.find('T'); | ||
| 296 | if (t == std::string_view::npos || t + 1 >= src.size()) return false; | ||
| 297 | |||
| 298 | std::string_view date = src.substr(0, t); | ||
| 299 | std::string_view time_and_tzd = src.substr(t + 1); | ||
| 300 | if (time_and_tzd.size() < 9) return false; | ||
| 301 | if (!Date::parse(dest.date, date)) return false; | ||
| 302 | |||
| 303 | std::string_view time = time_and_tzd.substr(0, 8); | ||
| 304 | std::string_view tzd = time_and_tzd.substr(8); | ||
| 305 | if (!Time::parse(dest.time, time)) return false; | ||
| 306 | return Tzd::parse(dest.off, tzd); | ||
| 307 | } | ||
| 308 | |||
| 309 | std::string toString() const { | ||
| 310 | return date.toString() + "T" + time.toString() + off.toString(); | ||
| 311 | } | ||
| 312 | |||
| 313 | std::chrono::seconds toUnixSeconds() const { | ||
| 314 | std::chrono::year_month_day ymd(std::chrono::year(date.year), | ||
| 315 | std::chrono::month(date.month), | ||
| 316 | std::chrono::day(date.day)); | ||
| 317 | std::chrono::sys_days sys_days(ymd); | ||
| 318 | std::chrono::time_point<std::chrono::utc_clock, std::chrono::days> utc_days(sys_days.time_since_epoch()); | ||
| 319 | std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast<std::chrono::seconds>(utc_days); | ||
| 320 | utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) + | ||
| 321 | std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes); | ||
| 322 | std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds); | ||
| 323 | std::chrono::seconds unix = sys_seconds.time_since_epoch(); | ||
| 324 | return unix; | ||
| 325 | } | ||
| 326 | }; | ||
| 327 | |||
// XML namespace URL of TMI8 KV6 messages.
static constexpr std::string_view TMI8_XML_NS{"http://bison.connekt.nl/tmi8/kv6/msg"};
| 329 | |||
// Kind of a KV6 record. The numeric values are used as indices into the
// per-type tables in this file, so they must stay contiguous.
enum Kv6RecordType {
  KV6T_UNKNOWN,    // 0
  KV6T_DELAY,      // 1
  KV6T_INIT,       // 2
  KV6T_ARRIVAL,    // 3
  KV6T_ON_STOP,    // 4
  KV6T_DEPARTURE,  // 5
  KV6T_ON_ROUTE,   // 6
  KV6T_ON_PATH,    // 7
  KV6T_OFF_ROUTE,  // 8
  KV6T_END,        // 9
  // Always keep these two aliases pointing at the first
  // and the last enumerator listed above!
  _KV6T_FIRST_TYPE = KV6T_UNKNOWN,
  _KV6T_LAST_TYPE  = KV6T_END,
};
static_assert(KV6T_UNKNOWN == 0 && KV6T_END == 9);
| 346 | |||
// Bit flags, one per KV6 record field. Flags are OR-ed together to form
// field sets.
enum Kv6Field {
  KV6F_NONE                          = 0,
  KV6F_DATA_OWNER_CODE               = 1 << 0,
  KV6F_LINE_PLANNING_NUMBER          = 1 << 1,
  KV6F_OPERATING_DAY                 = 1 << 2,
  KV6F_JOURNEY_NUMBER                = 1 << 3,
  KV6F_REINFORCEMENT_NUMBER          = 1 << 4,
  KV6F_TIMESTAMP                     = 1 << 5,
  KV6F_SOURCE                        = 1 << 6,
  KV6F_PUNCTUALITY                   = 1 << 7,
  KV6F_USER_STOP_CODE                = 1 << 8,
  KV6F_PASSAGE_SEQUENCE_NUMBER       = 1 << 9,
  KV6F_VEHICLE_NUMBER                = 1 << 10,
  KV6F_BLOCK_CODE                    = 1 << 11,
  KV6F_WHEELCHAIR_ACCESSIBLE         = 1 << 12,
  KV6F_NUMBER_OF_COACHES             = 1 << 13,
  KV6F_RD_Y                          = 1 << 14,
  KV6F_RD_X                          = 1 << 15,
  KV6F_DISTANCE_SINCE_LAST_USER_STOP = 1 << 16,
};
| 367 | |||
| 368 | static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = { | ||
| 369 | // KV6T_UNKNOWN | ||
| 370 | KV6F_NONE, | ||
| 371 | // KV6T_DELAY | ||
| 372 | static_cast<Kv6Field>( | ||
| 373 | KV6F_DATA_OWNER_CODE | ||
| 374 | | KV6F_LINE_PLANNING_NUMBER | ||
| 375 | | KV6F_OPERATING_DAY | ||
| 376 | | KV6F_JOURNEY_NUMBER | ||
| 377 | | KV6F_REINFORCEMENT_NUMBER | ||
| 378 | | KV6F_TIMESTAMP | ||
| 379 | | KV6F_SOURCE | ||
| 380 | | KV6F_PUNCTUALITY), | ||
| 381 | // KV6T_INIT | ||
| 382 | static_cast<Kv6Field>( | ||
| 383 | KV6F_DATA_OWNER_CODE | ||
| 384 | | KV6F_LINE_PLANNING_NUMBER | ||
| 385 | | KV6F_OPERATING_DAY | ||
| 386 | | KV6F_JOURNEY_NUMBER | ||
| 387 | | KV6F_REINFORCEMENT_NUMBER | ||
| 388 | | KV6F_TIMESTAMP | ||
| 389 | | KV6F_SOURCE | ||
| 390 | | KV6F_USER_STOP_CODE | ||
| 391 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 392 | | KV6F_VEHICLE_NUMBER | ||
| 393 | | KV6F_BLOCK_CODE | ||
| 394 | | KV6F_WHEELCHAIR_ACCESSIBLE | ||
| 395 | | KV6F_NUMBER_OF_COACHES), | ||
| 396 | // KV6T_ARRIVAL | ||
| 397 | static_cast<Kv6Field>( | ||
| 398 | KV6F_DATA_OWNER_CODE | ||
| 399 | | KV6F_LINE_PLANNING_NUMBER | ||
| 400 | | KV6F_OPERATING_DAY | ||
| 401 | | KV6F_JOURNEY_NUMBER | ||
| 402 | | KV6F_REINFORCEMENT_NUMBER | ||
| 403 | | KV6F_USER_STOP_CODE | ||
| 404 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 405 | | KV6F_TIMESTAMP | ||
| 406 | | KV6F_SOURCE | ||
| 407 | | KV6F_VEHICLE_NUMBER | ||
| 408 | | KV6F_PUNCTUALITY), | ||
| 409 | // KV6T_ON_STOP | ||
| 410 | static_cast<Kv6Field>( | ||
| 411 | KV6F_DATA_OWNER_CODE | ||
| 412 | | KV6F_LINE_PLANNING_NUMBER | ||
| 413 | | KV6F_OPERATING_DAY | ||
| 414 | | KV6F_JOURNEY_NUMBER | ||
| 415 | | KV6F_REINFORCEMENT_NUMBER | ||
| 416 | | KV6F_USER_STOP_CODE | ||
| 417 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 418 | | KV6F_TIMESTAMP | ||
| 419 | | KV6F_SOURCE | ||
| 420 | | KV6F_VEHICLE_NUMBER | ||
| 421 | | KV6F_PUNCTUALITY), | ||
| 422 | // KV6T_DEPARTURE | ||
| 423 | static_cast<Kv6Field>( | ||
| 424 | KV6F_DATA_OWNER_CODE | ||
| 425 | | KV6F_LINE_PLANNING_NUMBER | ||
| 426 | | KV6F_OPERATING_DAY | ||
| 427 | | KV6F_JOURNEY_NUMBER | ||
| 428 | | KV6F_REINFORCEMENT_NUMBER | ||
| 429 | | KV6F_USER_STOP_CODE | ||
| 430 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 431 | | KV6F_TIMESTAMP | ||
| 432 | | KV6F_SOURCE | ||
| 433 | | KV6F_VEHICLE_NUMBER | ||
| 434 | | KV6F_PUNCTUALITY), | ||
| 435 | // KV6T_ON_ROUTE | ||
| 436 | static_cast<Kv6Field>( | ||
| 437 | KV6F_DATA_OWNER_CODE | ||
| 438 | | KV6F_LINE_PLANNING_NUMBER | ||
| 439 | | KV6F_OPERATING_DAY | ||
| 440 | | KV6F_JOURNEY_NUMBER | ||
| 441 | | KV6F_REINFORCEMENT_NUMBER | ||
| 442 | | KV6F_USER_STOP_CODE | ||
| 443 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 444 | | KV6F_TIMESTAMP | ||
| 445 | | KV6F_SOURCE | ||
| 446 | | KV6F_VEHICLE_NUMBER | ||
| 447 | | KV6F_PUNCTUALITY | ||
| 448 | | KV6F_RD_X | ||
| 449 | | KV6F_RD_Y), | ||
| 450 | // KV6T_ON_PATH | ||
| 451 | KV6F_NONE, | ||
| 452 | // KV6T_OFF_ROUTE | ||
| 453 | static_cast<Kv6Field>( | ||
| 454 | KV6F_DATA_OWNER_CODE | ||
| 455 | | KV6F_LINE_PLANNING_NUMBER | ||
| 456 | | KV6F_OPERATING_DAY | ||
| 457 | | KV6F_JOURNEY_NUMBER | ||
| 458 | | KV6F_REINFORCEMENT_NUMBER | ||
| 459 | | KV6F_TIMESTAMP | ||
| 460 | | KV6F_SOURCE | ||
| 461 | | KV6F_USER_STOP_CODE | ||
| 462 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 463 | | KV6F_VEHICLE_NUMBER | ||
| 464 | | KV6F_RD_X | ||
| 465 | | KV6F_RD_Y), | ||
| 466 | // KV6T_END | ||
| 467 | static_cast<Kv6Field>( | ||
| 468 | KV6F_DATA_OWNER_CODE | ||
| 469 | | KV6F_LINE_PLANNING_NUMBER | ||
| 470 | | KV6F_OPERATING_DAY | ||
| 471 | | KV6F_JOURNEY_NUMBER | ||
| 472 | | KV6F_REINFORCEMENT_NUMBER | ||
| 473 | | KV6F_TIMESTAMP | ||
| 474 | | KV6F_SOURCE | ||
| 475 | | KV6F_USER_STOP_CODE | ||
| 476 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 477 | | KV6F_VEHICLE_NUMBER), | ||
| 478 | }; | ||
| 479 | |||
| 480 | static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = { | ||
| 481 | // KV6T_UNKNOWN | ||
| 482 | KV6F_NONE, | ||
| 483 | // KV6T_DELAY | ||
| 484 | KV6F_NONE, | ||
| 485 | // KV6T_INIT | ||
| 486 | KV6F_NONE, | ||
| 487 | // KV6T_ARRIVAL | ||
| 488 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
| 489 | // KV6T_ON_STOP | ||
| 490 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
| 491 | // KV6T_DEPARTURE | ||
| 492 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
| 493 | // KV6T_ON_ROUTE | ||
| 494 | KV6F_DISTANCE_SINCE_LAST_USER_STOP, | ||
| 495 | // KV6T_ON_PATH | ||
| 496 | KV6F_NONE, | ||
| 497 | // KV6T_OFF_ROUTE | ||
| 498 | KV6F_NONE, | ||
| 499 | // KV6T_END | ||
| 500 | KV6F_NONE, | ||
| 501 | }; | ||
| 502 | |||
| 503 | struct Kv6Record { | ||
| 504 | Kv6RecordType type = KV6T_UNKNOWN; | ||
| 505 | Kv6Field presence = KV6F_NONE; | ||
| 506 | Kv6Field next = KV6F_NONE; | ||
| 507 | std::string data_owner_code; | ||
| 508 | std::string line_planning_number; | ||
| 509 | std::string source; | ||
| 510 | std::string user_stop_code; | ||
| 511 | std::string wheelchair_accessible; | ||
| 512 | Date operating_day; | ||
| 513 | Timestamp timestamp; | ||
| 514 | uint32_t block_code = 0; | ||
| 515 | uint32_t journey_number = 0; | ||
| 516 | uint32_t vehicle_number = 0; | ||
| 517 | int32_t rd_x = 0; | ||
| 518 | int32_t rd_y = 0; | ||
| 519 | // The TMI8 specification is unclear: this field | ||
| 520 | // might actually be called distancesincelaststop | ||
| 521 | uint32_t distance_since_last_user_stop = 0; | ||
| 522 | uint16_t passage_sequence_number = 0; | ||
| 523 | int16_t punctuality = 0; | ||
| 524 | uint8_t number_of_coaches = 0; | ||
| 525 | uint8_t reinforcement_number = 0; | ||
| 526 | |||
| 527 | void markPresent(Kv6Field field) { | ||
| 528 | presence = static_cast<Kv6Field>(presence | field); | ||
| 529 | } | ||
| 530 | |||
| 531 | void removeUnsupportedFields() { | ||
| 532 | Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; | ||
| 533 | Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; | ||
| 534 | Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields); | ||
| 535 | presence = static_cast<Kv6Field>(presence & supported_fields); | ||
| 536 | } | ||
| 537 | |||
| 538 | bool valid() { | ||
| 539 | Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; | ||
| 540 | Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; | ||
| 541 | Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields); | ||
| 542 | |||
| 543 | Kv6Field required_field_presence = static_cast<Kv6Field>(presence & required_fields); | ||
| 544 | Kv6Field unsupported_field_presence = static_cast<Kv6Field>(presence & ~supported_fields); | ||
| 545 | |||
| 546 | return required_field_presence == required_fields && !unsupported_field_presence; | ||
| 547 | } | ||
| 548 | }; | ||
| 549 | |||
// Bit flags, one per field of Tmi8VvTmPushInfo.
enum Tmi8VvTmPushInfoField {
  TMI8F_NONE          = 0,
  TMI8F_SUBSCRIBER_ID = 1 << 0,
  TMI8F_VERSION       = 1 << 1,
  TMI8F_DOSSIER_NAME  = 1 << 2,
  TMI8F_TIMESTAMP     = 1 << 3,
};
| 557 | |||
| 558 | struct Tmi8VvTmPushInfo { | ||
| 559 | Tmi8VvTmPushInfoField next = TMI8F_NONE; | ||
| 560 | Tmi8VvTmPushInfoField presence = TMI8F_NONE; | ||
| 561 | std::string subscriber_id; | ||
| 562 | std::string version; | ||
| 563 | std::string dossier_name; | ||
| 564 | Timestamp timestamp; | ||
| 565 | std::vector<Kv6Record> messages; | ||
| 566 | |||
| 567 | void markPresent(Tmi8VvTmPushInfoField field) { | ||
| 568 | presence = static_cast<Tmi8VvTmPushInfoField>(presence | field); | ||
| 569 | } | ||
| 570 | |||
| 571 | bool valid() { | ||
| 572 | const Tmi8VvTmPushInfoField REQUIRED_FIELDS = | ||
| 573 | static_cast<Tmi8VvTmPushInfoField>( | ||
| 574 | TMI8F_SUBSCRIBER_ID | ||
| 575 | | TMI8F_VERSION | ||
| 576 | | TMI8F_DOSSIER_NAME | ||
| 577 | | TMI8F_TIMESTAMP); | ||
| 578 | return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS; | ||
| 579 | } | ||
| 580 | }; | ||
| 581 | |||
| 582 | static const std::array<std::string_view, _KV6T_LAST_TYPE + 1> KV6_POS_INFO_RECORD_TYPES = { | ||
| 583 | "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END", | ||
| 584 | }; | ||
| 585 | |||
| 586 | std::optional<std::string_view> findKv6PosInfoRecordTypeName(Kv6RecordType type) { | ||
| 587 | if (type > _KV6T_LAST_TYPE) | ||
| 588 | return std::nullopt; | ||
| 589 | return KV6_POS_INFO_RECORD_TYPES[type]; | ||
| 590 | } | ||
| 591 | |||
| 592 | const std::array<std::tuple<std::string_view, Kv6Field>, 17> KV6_POS_INFO_RECORD_FIELDS = {{ | ||
| 593 | { "dataownercode", KV6F_DATA_OWNER_CODE }, | ||
| 594 | { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER }, | ||
| 595 | { "operatingday", KV6F_OPERATING_DAY }, | ||
| 596 | { "journeynumber", KV6F_JOURNEY_NUMBER }, | ||
| 597 | { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER }, | ||
| 598 | { "timestamp", KV6F_TIMESTAMP }, | ||
| 599 | { "source", KV6F_SOURCE }, | ||
| 600 | { "punctuality", KV6F_PUNCTUALITY }, | ||
| 601 | { "userstopcode", KV6F_USER_STOP_CODE }, | ||
| 602 | { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER }, | ||
| 603 | { "vehiclenumber", KV6F_VEHICLE_NUMBER }, | ||
| 604 | { "blockcode", KV6F_BLOCK_CODE }, | ||
| 605 | { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE }, | ||
| 606 | { "numberofcoaches", KV6F_NUMBER_OF_COACHES }, | ||
| 607 | { "rd-y", KV6F_RD_Y }, | ||
| 608 | { "rd-x", KV6F_RD_X }, | ||
| 609 | { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP }, | ||
| 610 | }}; | ||
| 611 | |||
| 612 | // Returns the maximum amount of digits such that it is guaranteed that | ||
| 613 | // a corresponding amount of repeated 9's can be represented by the type. | ||
| 614 | template<std::integral T> | ||
| 615 | constexpr size_t maxDigits() { | ||
| 616 | size_t digits = 0; | ||
| 617 | for (T x = std::numeric_limits<T>::max(); x != 0; x /= 10) digits++; | ||
| 618 | return digits - 1; | ||
| 619 | } | ||
| 620 | |||
| 621 | template<size_t MaxDigits, std::unsigned_integral T> | ||
| 622 | constexpr bool parseUnsigned(T &out, std::string_view src) { | ||
| 623 | static_assert(MaxDigits <= maxDigits<T>()); | ||
| 624 | if (src.size() > MaxDigits) return false; | ||
| 625 | T res = 0; | ||
| 626 | while (src.size() > 0) { | ||
| 627 | if (src[0] < '0' || src[0] > '9') return false; | ||
| 628 | res = static_cast<T>(res * 10 + src[0] - '0'); | ||
| 629 | src = src.substr(1); | ||
| 630 | } | ||
| 631 | out = res; | ||
| 632 | return true; | ||
| 633 | } | ||
| 634 | |||
| 635 | template<size_t MaxDigits, std::signed_integral T> | ||
| 636 | constexpr bool parseSigned(T &out, std::string_view src) { | ||
| 637 | static_assert(MaxDigits <= maxDigits<T>()); | ||
| 638 | if (src.size() == 0) return false; | ||
| 639 | bool negative = src[0] == '-'; | ||
| 640 | if (negative) src = src.substr(1); | ||
| 641 | if (src.size() > MaxDigits) return false; | ||
| 642 | T res = 0; | ||
| 643 | while (src.size() > 0) { | ||
| 644 | if (src[0] < '0' || src[0] > '9') return false; | ||
| 645 | res = static_cast<T>(res * 10 + src[0] - '0'); | ||
| 646 | src = src.substr(1); | ||
| 647 | } | ||
| 648 | out = negative ? -res : res; | ||
| 649 | return true; | ||
| 650 | } | ||
| 651 | |||
// One XML namespace binding, linked to the bindings that were already in
// scope. `prefix` is empty for a default (plain "xmlns") declaration.
struct Xmlns {
  // Next binding to consult when this one does not match; nullptr ends the
  // list. Initialised so that a default-constructed node is a valid
  // one-element list.
  const Xmlns *next = nullptr;
  std::string_view prefix;
  std::string_view url;
};
| 657 | |||
| 658 | std::optional<std::string_view> resolve(std::string_view prefix, const Xmlns *nss) { | ||
| 659 | while (nss) | ||
| 660 | if (nss->prefix == prefix) | ||
| 661 | return nss->url; | ||
| 662 | else | ||
| 663 | nss = nss->next; | ||
| 664 | return std::nullopt; | ||
| 665 | } | ||
| 666 | |||
| 667 | template<typename T> | ||
| 668 | void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) { | ||
| 669 | while (attr) { | ||
| 670 | std::string_view name(attr->name(), attr->name_size()); | ||
| 671 | if (name.starts_with("xmlns")) { | ||
| 672 | if (name.size() == 5) { // just xmlns | ||
| 673 | Xmlns ns0 = { | ||
| 674 | .next = nss, | ||
| 675 | .url = std::string_view(attr->value(), attr->value_size()), | ||
| 676 | }; | ||
| 677 | withXmlnss(attr->next_attribute(), &ns0, fn); | ||
| 678 | return; | ||
| 679 | } else if (name.size() > 6 && name[5] == ':') { // xmlns:<something> | ||
| 680 | Xmlns ns0 = { | ||
| 681 | .next = nss, | ||
| 682 | .prefix = name.substr(6), | ||
| 683 | .url = std::string_view(attr->value(), attr->value_size()), | ||
| 684 | }; | ||
| 685 | withXmlnss(attr->next_attribute(), &ns0, fn); | ||
| 686 | return; | ||
| 687 | } | ||
| 688 | } | ||
| 689 | attr = attr->next_attribute(); | ||
| 690 | } | ||
| 691 | fn(nss); | ||
| 692 | } | ||
| 693 | |||
| 694 | template<typename T> | ||
| 695 | void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { | ||
| 696 | std::string_view name(node.name(), node.name_size()); | ||
| 697 | std::string_view ns; | ||
| 698 | size_t colon = name.find(':'); | ||
| 699 | |||
| 700 | if (colon != std::string_view::npos) { | ||
| 701 | if (colon >= name.size() - 1) // last character | ||
| 702 | return; | ||
| 703 | ns = name.substr(0, colon); | ||
| 704 | name = name.substr(colon + 1); | ||
| 705 | } | ||
| 706 | |||
| 707 | withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) { | ||
| 708 | std::optional<std::string_view> ns_url = resolve(ns, nss); | ||
| 709 | if (!ns_url && !ns.empty()) return; | ||
| 710 | if (!ns_url) fn(std::string_view(), name, nss); | ||
| 711 | else fn(*ns_url, name, nss); | ||
| 712 | }); | ||
| 713 | } | ||
| 714 | |||
| 715 | template<typename T> | ||
| 716 | void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { | ||
| 717 | ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) { | ||
| 718 | if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss); | ||
| 719 | }); | ||
| 720 | } | ||
| 721 | |||
| 722 | bool onlyTextElement(const rapidxml::xml_node<> &node) { | ||
| 723 | return node.type() == rapidxml::node_element | ||
| 724 | && node.first_node() | ||
| 725 | && node.first_node() == node.last_node() | ||
| 726 | && node.first_node()->type() == rapidxml::node_data; | ||
| 727 | } | ||
| 728 | |||
| 729 | std::string_view getValue(const rapidxml::xml_node<> &node) { | ||
| 730 | return std::string_view(node.value(), node.value_size()); | ||
| 731 | } | ||
| 732 | |||
// Copies `val` into `into` when it is at most `max_len` bytes long.
// Returns false, leaving `into` untouched, when it is longer.
bool parseStringValue(std::string &into, size_t max_len, std::string_view val) {
  const bool fits = val.size() <= max_len;
  if (fits)
    into.assign(val);
  return fits;
}
| 739 | |||
| 740 | struct Kv6Parser { | ||
| 741 | std::stringstream &errs; | ||
| 742 | std::stringstream &warns; | ||
| 743 | |||
| 744 | void error(std::string_view msg) { | ||
| 745 | errs << msg << '\n'; | ||
| 746 | } | ||
| 747 | |||
| 748 | void warn(std::string_view msg) { | ||
| 749 | warns << msg << '\n'; | ||
| 750 | } | ||
| 751 | |||
| 752 | #define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false) | ||
| 753 | #define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false) | ||
| 754 | |||
| 755 | std::optional<Kv6Record> parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
| 756 | Kv6Record fields = { .type = type }; | ||
| 757 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
| 758 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
| 759 | for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) { | ||
| 760 | if (field == KV6F_NONE) | ||
| 761 | continue; | ||
| 762 | if (fname == name) { | ||
| 763 | PWARNASSERT("Expected KV6 record field element to only contain data", | ||
| 764 | onlyTextElement(*child)); | ||
| 765 | std::string_view childval = getValue(*child); | ||
| 766 | switch (field) { | ||
| 767 | case KV6F_DATA_OWNER_CODE: | ||
| 768 | PWARNASSERT("Invalid value for dataownercode", | ||
| 769 | parseStringValue(fields.data_owner_code, 10, childval)); | ||
| 770 | break; | ||
| 771 | case KV6F_LINE_PLANNING_NUMBER: | ||
| 772 | PWARNASSERT("Invalid value for lineplanningnumber", | ||
| 773 | parseStringValue(fields.line_planning_number, 10, childval)); | ||
| 774 | break; | ||
| 775 | case KV6F_OPERATING_DAY: | ||
| 776 | PWARNASSERT("Invalid value for operatatingday: not a valid date", | ||
| 777 | Date::parse(fields.operating_day, childval)); | ||
| 778 | break; | ||
| 779 | case KV6F_JOURNEY_NUMBER: | ||
| 780 | PWARNASSERT("Invalid value for journeynumber:" | ||
| 781 | " not a valid unsigned number with at most six digits", | ||
| 782 | parseUnsigned<6>(fields.journey_number, childval)); | ||
| 783 | break; | ||
| 784 | case KV6F_REINFORCEMENT_NUMBER: | ||
| 785 | PWARNASSERT("Invalid value for reinforcementnumber:" | ||
| 786 | " not a valid unsigned number with at most two digits", | ||
| 787 | parseUnsigned<2>(fields.reinforcement_number, childval)); | ||
| 788 | break; | ||
| 789 | case KV6F_TIMESTAMP: | ||
| 790 | PWARNASSERT("Invalid value for timestamp: not a valid timestamp", | ||
| 791 | Timestamp::parse(fields.timestamp, childval)); | ||
| 792 | break; | ||
| 793 | case KV6F_SOURCE: | ||
| 794 | PWARNASSERT("Invalid value for source:" | ||
| 795 | " not a valid string of at most 10 bytes", | ||
| 796 | parseStringValue(fields.source, 10, childval)); | ||
| 797 | break; | ||
| 798 | case KV6F_PUNCTUALITY: | ||
| 799 | PWARNASSERT("Invalid value for punctuality:" | ||
| 800 | " not a valid signed number with at most four digits", | ||
| 801 | parseSigned<4>(fields.punctuality, childval)); | ||
| 802 | break; | ||
| 803 | case KV6F_USER_STOP_CODE: | ||
| 804 | PWARNASSERT("Invalid value for userstopcode:" | ||
| 805 | " not a valid string of at most 10 bytes", | ||
| 806 | parseStringValue(fields.user_stop_code, 10, childval)); | ||
| 807 | break; | ||
| 808 | case KV6F_PASSAGE_SEQUENCE_NUMBER: | ||
| 809 | PWARNASSERT("Invalid value for passagesequencenumber:" | ||
| 810 | " not a valid unsigned number with at most four digits", | ||
| 811 | parseUnsigned<4>(fields.passage_sequence_number, childval)); | ||
| 812 | break; | ||
| 813 | case KV6F_VEHICLE_NUMBER: | ||
| 814 | PWARNASSERT("Invalid value for vehiclenumber:" | ||
| 815 | " not a valid unsigned number with at most six digits", | ||
| 816 | parseUnsigned<6>(fields.vehicle_number, childval)); | ||
| 817 | break; | ||
| 818 | case KV6F_BLOCK_CODE: | ||
| 819 | PWARNASSERT("Invalid value for blockcode:" | ||
| 820 | " not a valid unsigned number with at most eight digits", | ||
| 821 | parseUnsigned<8>(fields.block_code, childval)); | ||
| 822 | break; | ||
| 823 | case KV6F_WHEELCHAIR_ACCESSIBLE: | ||
| 824 | PWARNASSERT("Invalid value for wheelchairaccessible:" | ||
| 825 | " not a valid value for wheelchair accessibility", | ||
| 826 | childval == "ACCESSIBLE" | ||
| 827 | || childval == "NOTACCESSIBLE" | ||
| 828 | || childval == "UNKNOWN"); | ||
| 829 | fields.wheelchair_accessible = childval; | ||
| 830 | break; | ||
| 831 | case KV6F_NUMBER_OF_COACHES: | ||
| 832 | PWARNASSERT("Invalid for numberofcoaches:" | ||
| 833 | " not a valid unsigned number with at most two digits", | ||
| 834 | parseUnsigned<2>(fields.number_of_coaches, childval)); | ||
| 835 | break; | ||
| 836 | case KV6F_RD_X: | ||
| 837 | PWARNASSERT("Invalid value for rd-x:" | ||
| 838 | " not a valid signed number with at most six digits", | ||
| 839 | parseSigned<6>(fields.rd_x, childval)); | ||
| 840 | break; | ||
| 841 | case KV6F_RD_Y: | ||
| 842 | PWARNASSERT("Invalid value for rd-y:" | ||
| 843 | " not a valid signed number with at most six digits", | ||
| 844 | parseSigned<6>(fields.rd_y, childval)); | ||
| 845 | break; | ||
| 846 | case KV6F_DISTANCE_SINCE_LAST_USER_STOP: | ||
| 847 | PWARNASSERT("Invalid value for distancesincelastuserstop:" | ||
| 848 | " not a valid unsigned number with at most five digits", | ||
| 849 | parseUnsigned<5>(fields.distance_since_last_user_stop, childval)); | ||
| 850 | break; | ||
| 851 | case KV6F_NONE: | ||
| 852 | error("NONE field type case should be unreachable in parseKv6PosInfoRecord"); | ||
| 853 | return; | ||
| 854 | } | ||
| 855 | fields.markPresent(field); | ||
| 856 | break; | ||
| 857 | } | ||
| 858 | } | ||
| 859 | }); | ||
| 860 | } | ||
| 861 | |||
| 862 | fields.removeUnsupportedFields(); | ||
| 863 | |||
| 864 | if (!fields.valid()) | ||
| 865 | return std::nullopt; | ||
| 866 | return fields; | ||
| 867 | } | ||
| 868 | |||
| 869 | std::vector<Kv6Record> parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
| 870 | std::vector<Kv6Record> records; | ||
| 871 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
| 872 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
| 873 | for (auto type = _KV6T_FIRST_TYPE; | ||
| 874 | type != _KV6T_LAST_TYPE; | ||
| 875 | type = static_cast<Kv6RecordType>(type + 1)) { | ||
| 876 | if (type == KV6T_UNKNOWN) | ||
| 877 | continue; | ||
| 878 | if (KV6_POS_INFO_RECORD_TYPES[type] == name) { | ||
| 879 | auto record = parseKv6PosInfoRecord(type, *child, nss); | ||
| 880 | if (record) { | ||
| 881 | records.push_back(*record); | ||
| 882 | } | ||
| 883 | } | ||
| 884 | } | ||
| 885 | }); | ||
| 886 | } | ||
| 887 | return records; | ||
| 888 | } | ||
| 889 | |||
| 890 | std::optional<Tmi8VvTmPushInfo> parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
| 891 | Tmi8VvTmPushInfo info; | ||
| 892 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
| 893 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
| 894 | if (name == "Timestamp") { | ||
| 895 | PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child)); | ||
| 896 | PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child))); | ||
| 897 | info.markPresent(TMI8F_TIMESTAMP); | ||
| 898 | } else if (name == "SubscriberID") { | ||
| 899 | PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child)); | ||
| 900 | info.subscriber_id = getValue(*child); | ||
| 901 | info.markPresent(TMI8F_SUBSCRIBER_ID); | ||
| 902 | } else if (name == "Version") { | ||
| 903 | PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child)); | ||
| 904 | info.version = getValue(*child); | ||
| 905 | info.markPresent(TMI8F_VERSION); | ||
| 906 | } else if (name == "DossierName") { | ||
| 907 | PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child)); | ||
| 908 | info.dossier_name = getValue(*child); | ||
| 909 | info.markPresent(TMI8F_DOSSIER_NAME); | ||
| 910 | } else if (name == "KV6posinfo") { | ||
| 911 | info.messages = parseKv6PosInfo(*child, nss); | ||
| 912 | } | ||
| 913 | }); | ||
| 914 | } | ||
| 915 | |||
| 916 | if (!info.valid()) | ||
| 917 | return std::nullopt; | ||
| 918 | return info; | ||
| 919 | } | ||
| 920 | |||
| 921 | std::optional<Tmi8VvTmPushInfo> parse(const rapidxml::xml_document<> &doc) { | ||
| 922 | std::optional<Tmi8VvTmPushInfo> msg; | ||
| 923 | for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) { | ||
| 924 | ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) { | ||
| 925 | if (name == "VV_TM_PUSH") { | ||
| 926 | if (msg) { | ||
| 927 | error("Duplicated VV_TM_PUSH"); | ||
| 928 | return; | ||
| 929 | } | ||
| 930 | msg = parseVvTmPush(*node, nss); | ||
| 931 | if (!msg) { | ||
| 932 | error("Invalid VV_TM_PUSH"); | ||
| 933 | } | ||
| 934 | } | ||
| 935 | }); | ||
| 936 | } | ||
| 937 | if (!msg) | ||
| 938 | error("Expected to find VV_TM_PUSH"); | ||
| 939 | return msg; | ||
| 940 | } | ||
| 941 | }; | ||
| 942 | |||
| 943 | std::optional<Tmi8VvTmPushInfo> parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) { | ||
| 944 | Kv6Parser parser = { errs, warns }; | ||
| 945 | return parser.parse(doc); | ||
| 946 | } | ||
| 947 | |||
| 948 | struct Metrics { | ||
| 949 | prometheus::Counter &messages_counter_ok; | ||
| 950 | prometheus::Counter &messages_counter_error; | ||
| 951 | prometheus::Counter &messages_counter_warning; | ||
| 952 | prometheus::Counter &rows_written_counter; | ||
| 953 | prometheus::Histogram &records_hist; | ||
| 954 | prometheus::Histogram &message_parse_hist; | ||
| 955 | prometheus::Histogram &payload_size_hist; | ||
| 956 | |||
| 957 | using BucketBoundaries = prometheus::Histogram::BucketBoundaries; | ||
| 958 | |||
| 959 | enum class ParseStatus { | ||
| 960 | OK, | ||
| 961 | WARNING, | ||
| 962 | ERROR, | ||
| 963 | }; | ||
| 964 | |||
| 965 | Metrics(std::shared_ptr<prometheus::Registry> registry) : | ||
| 966 | Metrics(registry, prometheus::BuildCounter() | ||
| 967 | .Name("kv6_vv_tm_push_messages_total") | ||
| 968 | .Help("Number of KV6 VV_TM_PUSH messages received") | ||
| 969 | .Register(*registry)) | ||
| 970 | {} | ||
| 971 | |||
| 972 | void addMeasurement(std::chrono::duration<double> took_secs, size_t payload_size, size_t records, ParseStatus parsed) { | ||
| 973 | double millis = took_secs.count() * 1000.0; | ||
| 974 | |||
| 975 | if (parsed == ParseStatus::OK) messages_counter_ok.Increment(); | ||
| 976 | else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment(); | ||
| 977 | else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment(); | ||
| 978 | records_hist.Observe(static_cast<double>(records)); | ||
| 979 | message_parse_hist.Observe(millis); | ||
| 980 | payload_size_hist.Observe(static_cast<double>(payload_size)); | ||
| 981 | } | ||
| 982 | |||
| 983 | void rowsWritten(int64_t rows) { | ||
| 984 | rows_written_counter.Increment(static_cast<double>(rows)); | ||
| 985 | } | ||
| 986 | |||
| 987 | private: | ||
| 988 | Metrics(std::shared_ptr<prometheus::Registry> registry, | ||
| 989 | prometheus::Family<prometheus::Counter> &messages_counter) : | ||
| 990 | messages_counter_ok(messages_counter | ||
| 991 | .Add({{ "status", "ok" }})), | ||
| 992 | messages_counter_error(messages_counter | ||
| 993 | .Add({{ "status", "error" }})), | ||
| 994 | messages_counter_warning(messages_counter | ||
| 995 | .Add({{ "status", "warning" }})), | ||
| 996 | rows_written_counter(prometheus::BuildCounter() | ||
| 997 | .Name("kv6_vv_tm_push_records_written") | ||
| 998 | .Help("Numer of VV_TM_PUSH records written to disk") | ||
| 999 | .Register(*registry) | ||
| 1000 | .Add({})), | ||
| 1001 | records_hist(prometheus::BuildHistogram() | ||
| 1002 | .Name("kv6_vv_tm_push_records_amount") | ||
| 1003 | .Help("Number of KV6 VV_TM_PUSH records") | ||
| 1004 | .Register(*registry) | ||
| 1005 | .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })), | ||
| 1006 | message_parse_hist(prometheus::BuildHistogram() | ||
| 1007 | .Name("kv6_vv_tm_push_message_parse_millis") | ||
| 1008 | .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages") | ||
| 1009 | .Register(*registry) | ||
| 1010 | .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })), | ||
| 1011 | payload_size_hist(prometheus::BuildHistogram() | ||
| 1012 | .Name("kv6_payload_size") | ||
| 1013 | .Help("Sizes of KV6 ZeroMQ message payloads") | ||
| 1014 | .Register(*registry) | ||
| 1015 | .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 })) | ||
| 1016 | {} | ||
| 1017 | }; | ||
| 1018 | |||
| 1019 | // Note: it *must* hold that decompressed[size] == 0 | ||
| 1020 | std::optional<Tmi8VvTmPushInfo> parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) { | ||
| 1021 | auto start = std::chrono::steady_clock::now(); | ||
| 1022 | |||
| 1023 | std::optional<Tmi8VvTmPushInfo> info; | ||
| 1024 | |||
| 1025 | if (decompressed[size] != 0) { | ||
| 1026 | errs << "Not parsing: missing null terminator" << '\n'; | ||
| 1027 | } else { | ||
| 1028 | rapidxml::xml_document<> doc; | ||
| 1029 | constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace | ||
| 1030 | | rapidxml::parse_no_string_terminators | ||
| 1031 | | rapidxml::parse_validate_closing_tags; | ||
| 1032 | |||
| 1033 | try { | ||
| 1034 | doc.parse<PARSE_FLAGS>(decompressed); | ||
| 1035 | info = parseXml(doc, errs, warns); | ||
| 1036 | } catch (const rapidxml::parse_error &err) { | ||
| 1037 | errs << "XML parsing failed" << '\n'; | ||
| 1038 | } | ||
| 1039 | } | ||
| 1040 | |||
| 1041 | auto end = std::chrono::steady_clock::now(); | ||
| 1042 | std::chrono::duration<double> took = end - start; | ||
| 1043 | |||
| 1044 | if (info) | ||
| 1045 | if (warns.view().empty()) | ||
| 1046 | metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK); | ||
| 1047 | else | ||
| 1048 | metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING); | ||
| 1049 | else | ||
| 1050 | metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR); | ||
| 1051 | |||
| 1052 | return info; | ||
| 1053 | } | ||
| 1054 | |||
// Termination request flag: set by the signal handler below and polled by the receive loop in
// main(). It is a `volatile sig_atomic_t` because that is the object type that may portably be
// written from an asynchronous signal handler; a plain bool gives no such guarantee and the
// polling loop's reads could be optimised away.
volatile sig_atomic_t terminate = 0;

// Handler installed for SIGINT and SIGTERM: only raises the termination flag.
void onSigIntOrTerm(int /* signum */) {
  terminate = 1;
}
| 1060 | |||
| 1061 | arrow::Result<std::shared_ptr<arrow::Table>> getTable(const std::vector<Kv6Record> &messages, size_t &rows_written) { | ||
| 1062 | ParquetBuilder builder; | ||
| 1063 | |||
| 1064 | for (const auto &msg : messages) { | ||
| 1065 | Kv6Field present = msg.presence; | ||
| 1066 | Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type]; | ||
| 1067 | Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type]; | ||
| 1068 | if ((~msg.presence & required) != 0) { | ||
| 1069 | std::cout << "Invalid message: not all required fields present; skipping" << std::endl; | ||
| 1070 | continue; | ||
| 1071 | } | ||
| 1072 | Kv6Field used = static_cast<Kv6Field>(present & (required | optional)); | ||
| 1073 | rows_written++; | ||
| 1074 | |||
| 1075 | // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option | ||
| 1076 | // of not providing these fields by setting them to -1. We want this normalized, where these | ||
| 1077 | // fields are instead simply marked as not present. | ||
| 1078 | if ((used & KV6F_RD_X) && msg.rd_x == -1) | ||
| 1079 | used = static_cast<Kv6Field>(used & ~KV6F_RD_X); | ||
| 1080 | if ((used & KV6F_RD_Y) && msg.rd_y == -1) | ||
| 1081 | used = static_cast<Kv6Field>(used & ~KV6F_RD_Y); | ||
| 1082 | |||
| 1083 | ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type))); | ||
| 1084 | ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE | ||
| 1085 | ? builder.data_owner_codes.Append(msg.data_owner_code) | ||
| 1086 | : builder.data_owner_codes.AppendNull()); | ||
| 1087 | ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER | ||
| 1088 | ? builder.line_planning_numbers.Append(msg.line_planning_number) | ||
| 1089 | : builder.line_planning_numbers.AppendNull()); | ||
| 1090 | ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY | ||
| 1091 | ? builder.operating_days.Append(static_cast<int32_t>(msg.operating_day.toUnixDays().count())) | ||
| 1092 | : builder.operating_days.AppendNull()); | ||
| 1093 | ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER | ||
| 1094 | ? builder.journey_numbers.Append(msg.journey_number) | ||
| 1095 | : builder.journey_numbers.AppendNull()); | ||
| 1096 | ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER | ||
| 1097 | ? builder.reinforcement_numbers.Append(msg.reinforcement_number) | ||
| 1098 | : builder.reinforcement_numbers.AppendNull()); | ||
| 1099 | ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP | ||
| 1100 | ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count()) | ||
| 1101 | : builder.timestamps.AppendNull()); | ||
| 1102 | ARROW_RETURN_NOT_OK(used & KV6F_SOURCE | ||
| 1103 | ? builder.sources.Append(msg.source) | ||
| 1104 | : builder.sources.AppendNull()); | ||
| 1105 | ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY | ||
| 1106 | ? builder.punctualities.Append(msg.punctuality) | ||
| 1107 | : builder.punctualities.AppendNull()); | ||
| 1108 | ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE | ||
| 1109 | ? builder.user_stop_codes.Append(msg.user_stop_code) | ||
| 1110 | : builder.user_stop_codes.AppendNull()); | ||
| 1111 | ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER | ||
| 1112 | ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number) | ||
| 1113 | : builder.passage_sequence_numbers.AppendNull()); | ||
| 1114 | ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER | ||
| 1115 | ? builder.vehicle_numbers.Append(msg.vehicle_number) | ||
| 1116 | : builder.vehicle_numbers.AppendNull()); | ||
| 1117 | ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE | ||
| 1118 | ? builder.block_codes.Append(msg.block_code) | ||
| 1119 | : builder.block_codes.AppendNull()); | ||
| 1120 | ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE | ||
| 1121 | ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible) | ||
| 1122 | : builder.wheelchair_accessibles.AppendNull()); | ||
| 1123 | ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES | ||
| 1124 | ? builder.number_of_coaches.Append(msg.number_of_coaches) | ||
| 1125 | : builder.number_of_coaches.AppendNull()); | ||
| 1126 | ARROW_RETURN_NOT_OK(used & KV6F_RD_Y | ||
| 1127 | ? builder.rd_ys.Append(msg.rd_y) | ||
| 1128 | : builder.rd_ys.AppendNull()); | ||
| 1129 | ARROW_RETURN_NOT_OK(used & KV6F_RD_X | ||
| 1130 | ? builder.rd_xs.Append(msg.rd_x) | ||
| 1131 | : builder.rd_xs.AppendNull()); | ||
| 1132 | ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP | ||
| 1133 | ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop) | ||
| 1134 | : builder.distance_since_last_user_stops.AppendNull()); | ||
| 1135 | } | ||
| 1136 | |||
| 1137 | return builder.getTable(); | ||
| 1138 | } | ||
| 1139 | |||
| 1140 | std::tuple<int64_t, int64_t> getMinMaxTimestamp(const std::vector<Kv6Record> &messages) { | ||
| 1141 | if (messages.size() == 0) | ||
| 1142 | return { 0, 0 }; | ||
| 1143 | int64_t min = std::numeric_limits<int64_t>::max(); | ||
| 1144 | int64_t max = 0; | ||
| 1145 | for (const auto &message : messages) { | ||
| 1146 | if (~message.presence & KV6F_TIMESTAMP) | ||
| 1147 | continue; | ||
| 1148 | int64_t seconds = message.timestamp.toUnixSeconds().count(); | ||
| 1149 | if (seconds < min) | ||
| 1150 | min = seconds; | ||
| 1151 | if (seconds > max) | ||
| 1152 | max = seconds; | ||
| 1153 | } | ||
| 1154 | if (min == std::numeric_limits<decltype(min)>::max()) | ||
| 1155 | return { 0, 0 }; // this is stupid | ||
| 1156 | return { min, max }; | ||
| 1157 | } | ||
| 1158 | |||
// Writes `messages` to a new Parquet file in the current working directory, followed by a JSON
// metadata file next to it, and adds the number of rows written to `metrics`.
//
// The Parquet file is named after the current UTC time (rounded to seconds). The metadata
// (minimum/maximum record timestamp and row count) is first written to "<file>.meta.json.part" and
// then renamed to "<file>.meta.json".
//
// Returns a non-OK status when building the table, writing the Parquet file, or writing/renaming
// the metadata file fails. The row counter is only updated when everything succeeded.
arrow::Status writeParquet(const std::vector<Kv6Record> &messages, Metrics &metrics) {
  size_t rows_written = 0;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, getTable(messages, rows_written));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename));
  std::cout << "Wrote Parquet file " << filename << std::endl;

  const std::string meta_part_name = filename + ".meta.json.part";
  const std::string meta_name = filename + ".meta.json";

  auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows_written },
  };

  std::ofstream metaf(meta_part_name, std::ios::binary);
  metaf << meta;
  metaf.close();
  // The stream is in a failed state if opening, writing or closing went wrong.
  if (!metaf)
    return arrow::Status::IOError("Writing metadata file ", meta_part_name, " failed");

  // Use the non-throwing overload: this function reports failures through its arrow::Status.
  std::error_code rename_error;
  std::filesystem::rename(meta_part_name, meta_name, rename_error);
  if (rename_error)
    return arrow::Status::IOError("Renaming ", meta_part_name, " to ", meta_name, " failed: ",
                                  rename_error.message());

  metrics.rowsWritten(static_cast<int64_t>(rows_written));

  return arrow::Status::OK();
}
| 1183 | |||
// Time point of the monotonic clock.
using SteadyTime = std::chrono::steady_clock::time_point;

// Writes a diagnostic dump to a new text file in the current working directory and returns the
// file's name. The file is named after the current UTC time (rounded to seconds) and contains, in
// this order, the error messages `errs`, the warning messages `warns` and the received message
// `txt`, each preceded by a header line.
std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) {
  const auto now = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string dump_name = std::format("oeuf-error-{:%FT%T%Ez}.txt", now);

  std::ofstream out(dump_name, std::ios::binary);
  out << "======= ERROR MESSAGES ========" << std::endl
      << errs
      << "======= WARNING MESSAGES ======" << std::endl
      << warns
      << "======= RECEIVED MESSAGE ======" << std::endl
      << txt << std::endl;
  out.close();

  return dump_name;
}
| 1199 | |||
| 1200 | void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector<Kv6Record> &msg_buf) { | ||
| 1201 | unsigned int decompressed_size = 0; | ||
| 1202 | if (msg.getBodySize() > std::numeric_limits<unsigned int>::max()) | ||
| 1203 | std::cout << "parseMsg failed due to too large message" << std::endl; | ||
| 1204 | char *decompressed = decompress(msg.getBody(), static_cast<unsigned int>(msg.getBodySize()), decompressed_size); | ||
| 1205 | |||
| 1206 | std::stringstream errs; | ||
| 1207 | std::stringstream warns; | ||
| 1208 | // We know that decompressed[decompressed_size] == 0 because decompress() ensures this. | ||
| 1209 | auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns); | ||
| 1210 | if (parsed_msg) { | ||
| 1211 | const Tmi8VvTmPushInfo &info = *parsed_msg; | ||
| 1212 | auto new_msgs_it = info.messages.begin(); | ||
| 1213 | while (new_msgs_it != info.messages.end()) { | ||
| 1214 | size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size(); | ||
| 1215 | size_t new_msgs_left = info.messages.end() - new_msgs_it; | ||
| 1216 | auto new_msgs_start = new_msgs_it; | ||
| 1217 | auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left); | ||
| 1218 | new_msgs_it = new_msgs_end; | ||
| 1219 | msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end); | ||
| 1220 | |||
| 1221 | bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5); | ||
| 1222 | if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) { | ||
| 1223 | arrow::Status status = writeParquet(msg_buf, metrics); | ||
| 1224 | if (!status.ok()) | ||
| 1225 | std::cout << "Writing Parquet file failed: " << status << std::endl; | ||
| 1226 | msg_buf.clear(); | ||
| 1227 | last_output = std::chrono::steady_clock::now(); | ||
| 1228 | } | ||
| 1229 | } | ||
| 1230 | if (!errs.view().empty() || !warns.view().empty()) { | ||
| 1231 | std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); | ||
| 1232 | std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl; | ||
| 1233 | } | ||
| 1234 | } else { | ||
| 1235 | std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); | ||
| 1236 | std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl; | ||
| 1237 | } | ||
| 1238 | free(decompressed); | ||
| 1239 | } | ||
| 1240 | |||
| 1241 | int main(int argc, char *argv[]) { | ||
| 1242 | std::cout << "Working directory: " << std::filesystem::current_path() << std::endl; | ||
| 1243 | |||
| 1244 | const char *metrics_addr = getenv("METRICS_ADDR"); | ||
| 1245 | if (!metrics_addr || strlen(metrics_addr) == 0) { | ||
| 1246 | std::cout << "Error: no METRICS_ADDR set!" << std::endl; | ||
| 1247 | exit(EXIT_FAILURE); | ||
| 1248 | } | ||
| 1249 | prometheus::Exposer exposer{metrics_addr}; | ||
| 1250 | |||
| 1251 | bool prod = false; | ||
| 1252 | const char *prod_env = getenv("NDOV_PRODUCTION"); | ||
| 1253 | if (prod_env && strcmp(prod_env, "true") == 0) prod = true; | ||
| 1254 | |||
| 1255 | void *zmq_context = zmq_ctx_new(); | ||
| 1256 | void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB); | ||
| 1257 | int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658"); | ||
| 1258 | assert(rc == 0); | ||
| 1259 | |||
| 1260 | const char *topic = "/CXX/KV6posinfo"; | ||
| 1261 | rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic)); | ||
| 1262 | assert(rc == 0); | ||
| 1263 | |||
| 1264 | signal(SIGINT, onSigIntOrTerm); | ||
| 1265 | signal(SIGTERM, onSigIntOrTerm); | ||
| 1266 | |||
| 1267 | SteadyTime last_output = std::chrono::steady_clock::now(); | ||
| 1268 | |||
| 1269 | auto registry = std::make_shared<prometheus::Registry>(); | ||
| 1270 | Metrics metrics(registry); | ||
| 1271 | exposer.RegisterCollectable(registry); | ||
| 1272 | |||
| 1273 | std::vector<Kv6Record> msg_buf; | ||
| 1274 | while (!terminate) { | ||
| 1275 | std::optional<RawMessage> msg = recvMsg(zmq_subscriber); | ||
| 1276 | if (!msg) { | ||
| 1277 | if (!terminate) | ||
| 1278 | perror("recvMsg"); | ||
| 1279 | continue; | ||
| 1280 | } | ||
| 1281 | handleMsg(*msg, metrics, last_output, msg_buf); | ||
| 1282 | } | ||
| 1283 | |||
| 1284 | std::cout << "Terminating" << std::endl; | ||
| 1285 | if (msg_buf.size() > 0) { | ||
| 1286 | arrow::Status status = writeParquet(msg_buf, metrics); | ||
| 1287 | if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl; | ||
| 1288 | else std::cout << "Final data written" << std::endl; | ||
| 1289 | msg_buf.clear(); | ||
| 1290 | } | ||
| 1291 | |||
| 1292 | if (zmq_close(zmq_subscriber)) | ||
| 1293 | perror("zmq_close"); | ||
| 1294 | if (zmq_ctx_destroy(zmq_context)) | ||
| 1295 | perror("zmq_ctx_destroy"); | ||
| 1296 | |||
| 1297 | std::cout << "Bye" << std::endl; | ||
| 1298 | |||
| 1299 | return 0; | ||
| 1300 | } | ||