From 17a3ea880402338420699e03bcb24181e4ff3924 Mon Sep 17 00:00:00 2001 From: Rutger Broekhoff Date: Thu, 2 May 2024 20:27:40 +0200 Subject: Initial commit Based on dc4ba6a --- src/recvkv6/.envrc | 2 + src/recvkv6/Makefile | 21 + src/recvkv6/main.cpp | 1300 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1323 insertions(+) create mode 100644 src/recvkv6/.envrc create mode 100644 src/recvkv6/Makefile create mode 100644 src/recvkv6/main.cpp (limited to 'src/recvkv6') diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/recvkv6/.envrc @@ -0,0 +1,2 @@ +source_env ../../ +export DEVMODE=1 diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile new file mode 100644 index 0000000..12ff7fb --- /dev/null +++ b/src/recvkv6/Makefile @@ -0,0 +1,21 @@ +# Taken from: +# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide +# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, +# 2023. [Online]. Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +recvkv6: main.cpp + $(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm recvkv6 diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp new file mode 100644 index 0000000..2ac3669 --- /dev/null +++ b/src/recvkv6/main.cpp @@ -0,0 +1,1300 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include + +#include + +#include + +#define CHUNK 16384 + +struct RawMessage { + public: + // Takes ownership of envelope and body + RawMessage(zmq_msg_t envelope, zmq_msg_t body) + : envelope(envelope), body(body) + {} + + // Prevent copying + RawMessage(const RawMessage &) = delete; + RawMessage &operator=(RawMessage const &) = delete; + + std::string_view getEnvelope() { + return static_cast(zmq_msg_data(&envelope)); + } + + char *getBody() { + return static_cast(zmq_msg_data(&body)); + } + + size_t getBodySize() { + return zmq_msg_size(&body); + } + + ~RawMessage() { + zmq_msg_close(&envelope); + zmq_msg_close(&body); + } + + private: + zmq_msg_t envelope; + zmq_msg_t body; +}; + +std::optional recvMsg(void *socket) { + while (true) { + zmq_msg_t envelope, body; + int rc = zmq_msg_init(&envelope); + assert(rc == 0); + rc = zmq_msg_init(&body); + assert(rc == 0); + + rc = zmq_msg_recv(&envelope, socket, 0); + if (rc == -1) return std::nullopt; + + int more; + size_t more_size = sizeof(more); + rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); + if (!more) { + zmq_msg_close(&envelope); + zmq_msg_close(&body); + continue; + } + + rc = zmq_msg_recv(&body, socket, 0); + if (rc == -1) return std::nullopt; + + rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); + assert(!more); + + return std::make_optional(envelope, body); + } +} + +// Ensures that [output_size] == 0 +char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) { + assert(input_size <= UINT32_MAX); + + z_stream strm; + strm.next_in = reinterpret_cast(raw); + strm.avail_in = input_size; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + int rc = inflateInit2(&strm, 32); + assert(rc == Z_OK); + + unsigned int buf_cap = CHUNK; + unsigned int buf_len = 0; + char *buf = static_cast(malloc(CHUNK)); + do { + if (buf_len + CHUNK > buf_cap) { + assert(buf_cap <= UINT32_MAX); + buf_cap *= 2; + buf = static_cast(realloc(buf, buf_cap)); + } + strm.avail_out = buf_cap - buf_len; + strm.next_out = reinterpret_cast(buf + buf_len); + + unsigned long old_total = strm.total_out; + rc = inflate(&strm, Z_FINISH); + unsigned progress = static_cast(strm.total_out - old_total); + buf_len += progress; + assert(progress != 0 || rc == Z_STREAM_END); + } while (strm.total_in < input_size); + + if (buf_len == buf_cap) { + buf = static_cast(realloc(buf, buf_len + 1)); + } + buf[buf_len] = 0; + output_size = buf_len; + + rc = inflateEnd(&strm); + assert(rc == Z_OK); + + return buf; +} + +struct Date { + int16_t year = 0; + uint8_t month = 0; + uint8_t day = 0; + + static bool parse(Date &dest, std::string_view src) { + dest.year = 0, dest.month = 0, dest.day = 0; + + int16_t y_mul_fac = 1; + bool extended = false; + + size_t plus = src.find('+'); + if (plus != std::string_view::npos) { + extended = true; + src = src.substr(1); // remove plus sign from the start + } + if (!extended) { + size_t min_or_dash = src.find('-'); + if (min_or_dash == std::string_view::npos) return false; + if (min_or_dash == 0) { + y_mul_fac = -1; // it's a minus sign + src = src.substr(1); // remove minus sign at the start + } + } + + int y_chars = 0; + while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') { + dest.year = static_cast(dest.year * 10 + src[0] - '0'); + src = src.substr(1); + y_chars++; + } + if (src.size() == 0) { dest.year = 0; return false; } + if (src[0] != '-') { dest.year = 0; return false; } + src = src.substr(1); // remove dash + if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; } + dest.year *= y_mul_fac; + + bool rest_correct = src.size() == 5 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9'; + if (!rest_correct) { dest.year = 0; return false; } + dest.month = static_cast((src[0] - '0') * 10 + src[1] - '0'); + dest.day = static_cast((src[3] - '0') * 10 + src[4] - '0'); + if (dest.month > 12 || dest.day > 31) { + dest.year = 0, dest.month = 0, dest.day = 0; + return false; + } + return true; + } + + std::string toString() const { + if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31) + throw std::invalid_argument("one or more date components (year, month, day) out of range"); + char data[11] = "XXXX-XX-XX"; + sprintf(data, "%04u-%02u-%02u", year, month, day); + return data; + } + + std::chrono::days toUnixDays() const { + std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)}; + // This is valid since C++20: as of C++20, the system clock is defined to measure the + // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds. + std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch(); + return since_epoch; + } +}; + +struct Time { + uint8_t hour = 0; + uint8_t minute = 0; + uint8_t second = 0; + + static bool parse(Time &dest, std::string_view src) { + bool okay = src.size() == 8 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[2] == ':' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9' + && src[5] == ':' + && src[6] >= '0' && src[6] <= '9' + && src[7] >= '0' && src[7] <= '9'; + if (!okay) return false; + dest.hour = static_cast((src[0] - '0') * 10 + src[1] - '0'); + dest.minute = static_cast((src[3] - '0') * 10 + src[4] - '0'); + dest.second = static_cast((src[6] - '0') * 10 + src[7] - '0'); + if (dest.hour > 23 || dest.minute > 59 || dest.second > 59) { + dest.hour = 0, dest.minute = 0, dest.second = 0; + return false; + } + return true; + } + + std::string toString() const { + if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 59) + throw std::invalid_argument("one or more time components (hour, minute, second) out of range"); + char data[9] = "XX:XX:XX"; + sprintf(data, "%02u:%02u:%02u", hour, minute, second); + return data; + } +}; + +// Time zone designator +struct Tzd { + int16_t minutes = 0; + + static bool parse(Tzd &dest, std::string_view src) { + dest.minutes = 0; + + if (src.size() == 0) return false; + if (src == "Z") return true; + + int16_t multiplier = 1; + if (src[0] == '-') multiplier = -1; + else if (src[0] != '+') return false; + src = src.substr(1); + + bool okay = src.size() == 5 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[2] == ':' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9'; + if (!okay) return false; + int16_t hours = static_cast((src[0] - '0') * 10 + src[1] - '0'); + int16_t minutes = static_cast((src[3] - '0') * 10 + src[4] - '0'); + if (hours > 23 || minutes > 59) return false; + dest.minutes = static_cast(multiplier * (60 * hours + minutes)); + return true; + } + + std::string toString() const { + if (minutes == 0) + return "Z"; + + bool negative = minutes < 0; + int hours_off = abs(minutes / 60); + int mins_off = abs(minutes) - hours_off*60; + if (hours_off > 23 || mins_off > 59) + throw std::invalid_argument("offset out of range"); + char data[7] = "+XX:XX"; + sprintf(data, "%c%02u:%02u", negative ? '-' : '+', hours_off, mins_off); + return data; + } +}; + +struct Timestamp { + Date date; + Tzd off; + Time time; + + static bool parse(Timestamp &dest, std::string_view src) { + size_t t = src.find('T'); + if (t == std::string_view::npos || t + 1 >= src.size()) return false; + + std::string_view date = src.substr(0, t); + std::string_view time_and_tzd = src.substr(t + 1); + if (time_and_tzd.size() < 9) return false; + if (!Date::parse(dest.date, date)) return false; + + std::string_view time = time_and_tzd.substr(0, 8); + std::string_view tzd = time_and_tzd.substr(8); + if (!Time::parse(dest.time, time)) return false; + return Tzd::parse(dest.off, tzd); + } + + std::string toString() const { + return date.toString() + "T" + time.toString() + off.toString(); + } + + std::chrono::seconds toUnixSeconds() const { + std::chrono::year_month_day ymd(std::chrono::year(date.year), + std::chrono::month(date.month), + std::chrono::day(date.day)); + std::chrono::sys_days sys_days(ymd); + std::chrono::time_point utc_days(sys_days.time_since_epoch()); + std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast(utc_days); + utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) + + std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes); + std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds); + std::chrono::seconds unix = sys_seconds.time_since_epoch(); + return unix; + } +}; + +static const std::string_view TMI8_XML_NS = "http://bison.connekt.nl/tmi8/kv6/msg"; + +enum Kv6RecordType { + KV6T_UNKNOWN = 0, + KV6T_DELAY = 1, + KV6T_INIT = 2, + KV6T_ARRIVAL = 3, + KV6T_ON_STOP = 4, + KV6T_DEPARTURE = 5, + KV6T_ON_ROUTE = 6, + KV6T_ON_PATH = 7, + KV6T_OFF_ROUTE = 8, + KV6T_END = 9, + // Always keep this updated to correspond to the + // first and last elements of the enumeration! + _KV6T_FIRST_TYPE = KV6T_UNKNOWN, + _KV6T_LAST_TYPE = KV6T_END, +}; + +enum Kv6Field { + KV6F_NONE = 0, + KV6F_DATA_OWNER_CODE = 1, + KV6F_LINE_PLANNING_NUMBER = 2, + KV6F_OPERATING_DAY = 4, + KV6F_JOURNEY_NUMBER = 8, + KV6F_REINFORCEMENT_NUMBER = 16, + KV6F_TIMESTAMP = 32, + KV6F_SOURCE = 64, + KV6F_PUNCTUALITY = 128, + KV6F_USER_STOP_CODE = 256, + KV6F_PASSAGE_SEQUENCE_NUMBER = 512, + KV6F_VEHICLE_NUMBER = 1024, + KV6F_BLOCK_CODE = 2048, + KV6F_WHEELCHAIR_ACCESSIBLE = 4096, + KV6F_NUMBER_OF_COACHES = 8192, + KV6F_RD_Y = 16384, + KV6F_RD_X = 32768, + KV6F_DISTANCE_SINCE_LAST_USER_STOP = 65536, +}; + +static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = { + // KV6T_UNKNOWN + KV6F_NONE, + // KV6T_DELAY + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_PUNCTUALITY), + // KV6T_INIT + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER + | KV6F_BLOCK_CODE + | KV6F_WHEELCHAIR_ACCESSIBLE + | KV6F_NUMBER_OF_COACHES), + // KV6T_ARRIVAL + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_ON_STOP + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_DEPARTURE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_ON_ROUTE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY + | KV6F_RD_X + | KV6F_RD_Y), + // KV6T_ON_PATH + KV6F_NONE, + // KV6T_OFF_ROUTE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER + | KV6F_RD_X + | KV6F_RD_Y), + // KV6T_END + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER), +}; + +static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = { + // KV6T_UNKNOWN + KV6F_NONE, + // KV6T_DELAY + KV6F_NONE, + // KV6T_INIT + KV6F_NONE, + // KV6T_ARRIVAL + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_ON_STOP + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_DEPARTURE + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_ON_ROUTE + KV6F_DISTANCE_SINCE_LAST_USER_STOP, + // KV6T_ON_PATH + KV6F_NONE, + // KV6T_OFF_ROUTE + KV6F_NONE, + // KV6T_END + KV6F_NONE, +}; + +struct Kv6Record { + Kv6RecordType type = KV6T_UNKNOWN; + Kv6Field presence = KV6F_NONE; + Kv6Field next = KV6F_NONE; + std::string data_owner_code; + std::string line_planning_number; + std::string source; + std::string user_stop_code; + std::string wheelchair_accessible; + Date operating_day; + Timestamp timestamp; + uint32_t block_code = 0; + uint32_t journey_number = 0; + uint32_t vehicle_number = 0; + int32_t rd_x = 0; + int32_t rd_y = 0; + // The TMI8 specification is unclear: this field + // might actually be called distancesincelaststop + uint32_t distance_since_last_user_stop = 0; + uint16_t passage_sequence_number = 0; + int16_t punctuality = 0; + uint8_t number_of_coaches = 0; + uint8_t reinforcement_number = 0; + + void markPresent(Kv6Field field) { + presence = static_cast(presence | field); + } + + void removeUnsupportedFields() { + Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; + Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; + Kv6Field supported_fields = static_cast(required_fields | optional_fields); + presence = static_cast(presence & supported_fields); + } + + bool valid() { + Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; + Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; + Kv6Field supported_fields = static_cast(required_fields | optional_fields); + + Kv6Field required_field_presence = static_cast(presence & required_fields); + Kv6Field unsupported_field_presence = static_cast(presence & ~supported_fields); + + return required_field_presence == required_fields && !unsupported_field_presence; + } +}; + +enum Tmi8VvTmPushInfoField { + TMI8F_NONE = 0, + TMI8F_SUBSCRIBER_ID = 1, + TMI8F_VERSION = 2, + TMI8F_DOSSIER_NAME = 4, + TMI8F_TIMESTAMP = 8, +}; + +struct Tmi8VvTmPushInfo { + Tmi8VvTmPushInfoField next = TMI8F_NONE; + Tmi8VvTmPushInfoField presence = TMI8F_NONE; + std::string subscriber_id; + std::string version; + std::string dossier_name; + Timestamp timestamp; + std::vector messages; + + void markPresent(Tmi8VvTmPushInfoField field) { + presence = static_cast(presence | field); + } + + bool valid() { + const Tmi8VvTmPushInfoField REQUIRED_FIELDS = + static_cast( + TMI8F_SUBSCRIBER_ID + | TMI8F_VERSION + | TMI8F_DOSSIER_NAME + | TMI8F_TIMESTAMP); + return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS; + } +}; + +static const std::array KV6_POS_INFO_RECORD_TYPES = { + "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END", +}; + +std::optional findKv6PosInfoRecordTypeName(Kv6RecordType type) { + if (type > _KV6T_LAST_TYPE) + return std::nullopt; + return KV6_POS_INFO_RECORD_TYPES[type]; +} + +const std::array, 17> KV6_POS_INFO_RECORD_FIELDS = {{ + { "dataownercode", KV6F_DATA_OWNER_CODE }, + { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER }, + { "operatingday", KV6F_OPERATING_DAY }, + { "journeynumber", KV6F_JOURNEY_NUMBER }, + { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER }, + { "timestamp", KV6F_TIMESTAMP }, + { "source", KV6F_SOURCE }, + { "punctuality", KV6F_PUNCTUALITY }, + { "userstopcode", KV6F_USER_STOP_CODE }, + { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER }, + { "vehiclenumber", KV6F_VEHICLE_NUMBER }, + { "blockcode", KV6F_BLOCK_CODE }, + { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE }, + { "numberofcoaches", KV6F_NUMBER_OF_COACHES }, + { "rd-y", KV6F_RD_Y }, + { "rd-x", KV6F_RD_X }, + { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP }, +}}; + +// Returns the maximum amount of digits such that it is guaranteed that +// a corresponding amount of repeated 9's can be represented by the type. +template +constexpr size_t maxDigits() { + size_t digits = 0; + for (T x = std::numeric_limits::max(); x != 0; x /= 10) digits++; + return digits - 1; +} + +template +constexpr bool parseUnsigned(T &out, std::string_view src) { + static_assert(MaxDigits <= maxDigits()); + if (src.size() > MaxDigits) return false; + T res = 0; + while (src.size() > 0) { + if (src[0] < '0' || src[0] > '9') return false; + res = static_cast(res * 10 + src[0] - '0'); + src = src.substr(1); + } + out = res; + return true; +} + +template +constexpr bool parseSigned(T &out, std::string_view src) { + static_assert(MaxDigits <= maxDigits()); + if (src.size() == 0) return false; + bool negative = src[0] == '-'; + if (negative) src = src.substr(1); + if (src.size() > MaxDigits) return false; + T res = 0; + while (src.size() > 0) { + if (src[0] < '0' || src[0] > '9') return false; + res = static_cast(res * 10 + src[0] - '0'); + src = src.substr(1); + } + out = negative ? -res : res; + return true; +} + +struct Xmlns { + const Xmlns *next; + std::string_view prefix; + std::string_view url; +}; + +std::optional resolve(std::string_view prefix, const Xmlns *nss) { + while (nss) + if (nss->prefix == prefix) + return nss->url; + else + nss = nss->next; + return std::nullopt; +} + +template +void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) { + while (attr) { + std::string_view name(attr->name(), attr->name_size()); + if (name.starts_with("xmlns")) { + if (name.size() == 5) { // just xmlns + Xmlns ns0 = { + .next = nss, + .url = std::string_view(attr->value(), attr->value_size()), + }; + withXmlnss(attr->next_attribute(), &ns0, fn); + return; + } else if (name.size() > 6 && name[5] == ':') { // xmlns: + Xmlns ns0 = { + .next = nss, + .prefix = name.substr(6), + .url = std::string_view(attr->value(), attr->value_size()), + }; + withXmlnss(attr->next_attribute(), &ns0, fn); + return; + } + } + attr = attr->next_attribute(); + } + fn(nss); +} + +template +void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { + std::string_view name(node.name(), node.name_size()); + std::string_view ns; + size_t colon = name.find(':'); + + if (colon != std::string_view::npos) { + if (colon >= name.size() - 1) // last character + return; + ns = name.substr(0, colon); + name = name.substr(colon + 1); + } + + withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) { + std::optional ns_url = resolve(ns, nss); + if (!ns_url && !ns.empty()) return; + if (!ns_url) fn(std::string_view(), name, nss); + else fn(*ns_url, name, nss); + }); +} + +template +void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { + ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) { + if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss); + }); +} + +bool onlyTextElement(const rapidxml::xml_node<> &node) { + return node.type() == rapidxml::node_element + && node.first_node() + && node.first_node() == node.last_node() + && node.first_node()->type() == rapidxml::node_data; +} + +std::string_view getValue(const rapidxml::xml_node<> &node) { + return std::string_view(node.value(), node.value_size()); +} + +bool parseStringValue(std::string &into, size_t max_len, std::string_view val) { + if (val.size() > max_len) + return false; + into = val; + return true; +} + +struct Kv6Parser { + std::stringstream &errs; + std::stringstream &warns; + + void error(std::string_view msg) { + errs << msg << '\n'; + } + + void warn(std::string_view msg) { + warns << msg << '\n'; + } + +#define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false) +#define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false) + + std::optional parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) { + Kv6Record fields = { .type = type }; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) { + if (field == KV6F_NONE) + continue; + if (fname == name) { + PWARNASSERT("Expected KV6 record field element to only contain data", + onlyTextElement(*child)); + std::string_view childval = getValue(*child); + switch (field) { + case KV6F_DATA_OWNER_CODE: + PWARNASSERT("Invalid value for dataownercode", + parseStringValue(fields.data_owner_code, 10, childval)); + break; + case KV6F_LINE_PLANNING_NUMBER: + PWARNASSERT("Invalid value for lineplanningnumber", + parseStringValue(fields.line_planning_number, 10, childval)); + break; + case KV6F_OPERATING_DAY: + PWARNASSERT("Invalid value for operatatingday: not a valid date", + Date::parse(fields.operating_day, childval)); + break; + case KV6F_JOURNEY_NUMBER: + PWARNASSERT("Invalid value for journeynumber:" + " not a valid unsigned number with at most six digits", + parseUnsigned<6>(fields.journey_number, childval)); + break; + case KV6F_REINFORCEMENT_NUMBER: + PWARNASSERT("Invalid value for reinforcementnumber:" + " not a valid unsigned number with at most two digits", + parseUnsigned<2>(fields.reinforcement_number, childval)); + break; + case KV6F_TIMESTAMP: + PWARNASSERT("Invalid value for timestamp: not a valid timestamp", + Timestamp::parse(fields.timestamp, childval)); + break; + case KV6F_SOURCE: + PWARNASSERT("Invalid value for source:" + " not a valid string of at most 10 bytes", + parseStringValue(fields.source, 10, childval)); + break; + case KV6F_PUNCTUALITY: + PWARNASSERT("Invalid value for punctuality:" + " not a valid signed number with at most four digits", + parseSigned<4>(fields.punctuality, childval)); + break; + case KV6F_USER_STOP_CODE: + PWARNASSERT("Invalid value for userstopcode:" + " not a valid string of at most 10 bytes", + parseStringValue(fields.user_stop_code, 10, childval)); + break; + case KV6F_PASSAGE_SEQUENCE_NUMBER: + PWARNASSERT("Invalid value for passagesequencenumber:" + " not a valid unsigned number with at most four digits", + parseUnsigned<4>(fields.passage_sequence_number, childval)); + break; + case KV6F_VEHICLE_NUMBER: + PWARNASSERT("Invalid value for vehiclenumber:" + " not a valid unsigned number with at most six digits", + parseUnsigned<6>(fields.vehicle_number, childval)); + break; + case KV6F_BLOCK_CODE: + PWARNASSERT("Invalid value for blockcode:" + " not a valid unsigned number with at most eight digits", + parseUnsigned<8>(fields.block_code, childval)); + break; + case KV6F_WHEELCHAIR_ACCESSIBLE: + PWARNASSERT("Invalid value for wheelchairaccessible:" + " not a valid value for wheelchair accessibility", + childval == "ACCESSIBLE" + || childval == "NOTACCESSIBLE" + || childval == "UNKNOWN"); + fields.wheelchair_accessible = childval; + break; + case KV6F_NUMBER_OF_COACHES: + PWARNASSERT("Invalid for numberofcoaches:" + " not a valid unsigned number with at most two digits", + parseUnsigned<2>(fields.number_of_coaches, childval)); + break; + case KV6F_RD_X: + PWARNASSERT("Invalid value for rd-x:" + " not a valid signed number with at most six digits", + parseSigned<6>(fields.rd_x, childval)); + break; + case KV6F_RD_Y: + PWARNASSERT("Invalid value for rd-y:" + " not a valid signed number with at most six digits", + parseSigned<6>(fields.rd_y, childval)); + break; + case KV6F_DISTANCE_SINCE_LAST_USER_STOP: + PWARNASSERT("Invalid value for distancesincelastuserstop:" + " not a valid unsigned number with at most five digits", + parseUnsigned<5>(fields.distance_since_last_user_stop, childval)); + break; + case KV6F_NONE: + error("NONE field type case should be unreachable in parseKv6PosInfoRecord"); + return; + } + fields.markPresent(field); + break; + } + } + }); + } + + fields.removeUnsupportedFields(); + + if (!fields.valid()) + return std::nullopt; + return fields; + } + + std::vector parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) { + std::vector records; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + for (auto type = _KV6T_FIRST_TYPE; + type != _KV6T_LAST_TYPE; + type = static_cast(type + 1)) { + if (type == KV6T_UNKNOWN) + continue; + if (KV6_POS_INFO_RECORD_TYPES[type] == name) { + auto record = parseKv6PosInfoRecord(type, *child, nss); + if (record) { + records.push_back(*record); + } + } + } + }); + } + return records; + } + + std::optional parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) { + Tmi8VvTmPushInfo info; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + if (name == "Timestamp") { + PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child)); + PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child))); + info.markPresent(TMI8F_TIMESTAMP); + } else if (name == "SubscriberID") { + PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child)); + info.subscriber_id = getValue(*child); + info.markPresent(TMI8F_SUBSCRIBER_ID); + } else if (name == "Version") { + PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child)); + info.version = getValue(*child); + info.markPresent(TMI8F_VERSION); + } else if (name == "DossierName") { + PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child)); + info.dossier_name = getValue(*child); + info.markPresent(TMI8F_DOSSIER_NAME); + } else if (name == "KV6posinfo") { + info.messages = parseKv6PosInfo(*child, nss); + } + }); + } + + if (!info.valid()) + return std::nullopt; + return info; + } + + std::optional parse(const rapidxml::xml_document<> &doc) { + std::optional msg; + for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) { + ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) { + if (name == "VV_TM_PUSH") { + if (msg) { + error("Duplicated VV_TM_PUSH"); + return; + } + msg = parseVvTmPush(*node, nss); + if (!msg) { + error("Invalid VV_TM_PUSH"); + } + } + }); + } + if (!msg) + error("Expected to find VV_TM_PUSH"); + return msg; + } +}; + +std::optional parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) { + Kv6Parser parser = { errs, warns }; + return parser.parse(doc); +} + +struct Metrics { + prometheus::Counter &messages_counter_ok; + prometheus::Counter &messages_counter_error; + prometheus::Counter &messages_counter_warning; + prometheus::Counter &rows_written_counter; + prometheus::Histogram &records_hist; + prometheus::Histogram &message_parse_hist; + prometheus::Histogram &payload_size_hist; + + using BucketBoundaries = prometheus::Histogram::BucketBoundaries; + + enum class ParseStatus { + OK, + WARNING, + ERROR, + }; + + Metrics(std::shared_ptr registry) : + Metrics(registry, prometheus::BuildCounter() + .Name("kv6_vv_tm_push_messages_total") + .Help("Number of KV6 VV_TM_PUSH messages received") + .Register(*registry)) + {} + + void addMeasurement(std::chrono::duration took_secs, size_t payload_size, size_t records, ParseStatus parsed) { + double millis = took_secs.count() * 1000.0; + + if (parsed == ParseStatus::OK) messages_counter_ok.Increment(); + else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment(); + else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment(); + records_hist.Observe(static_cast(records)); + message_parse_hist.Observe(millis); + payload_size_hist.Observe(static_cast(payload_size)); + } + + void rowsWritten(int64_t rows) { + rows_written_counter.Increment(static_cast(rows)); + } + + private: + Metrics(std::shared_ptr registry, + prometheus::Family &messages_counter) : + messages_counter_ok(messages_counter + .Add({{ "status", "ok" }})), + messages_counter_error(messages_counter + .Add({{ "status", "error" }})), + messages_counter_warning(messages_counter + .Add({{ "status", "warning" }})), + rows_written_counter(prometheus::BuildCounter() + .Name("kv6_vv_tm_push_records_written") + .Help("Numer of VV_TM_PUSH records written to disk") + .Register(*registry) + .Add({})), + records_hist(prometheus::BuildHistogram() + .Name("kv6_vv_tm_push_records_amount") + .Help("Number of KV6 VV_TM_PUSH records") + .Register(*registry) + .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })), + message_parse_hist(prometheus::BuildHistogram() + .Name("kv6_vv_tm_push_message_parse_millis") + .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages") + .Register(*registry) + .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })), + payload_size_hist(prometheus::BuildHistogram() + .Name("kv6_payload_size") + .Help("Sizes of KV6 ZeroMQ message payloads") + .Register(*registry) + .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 })) + {} +}; + +// Note: it *must* hold that decompressed[size] == 0 +std::optional parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) { + auto start = std::chrono::steady_clock::now(); + + std::optional info; + + if (decompressed[size] != 0) { + errs << "Not parsing: missing null terminator" << '\n'; + } else { + rapidxml::xml_document<> doc; + constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace + | rapidxml::parse_no_string_terminators + | rapidxml::parse_validate_closing_tags; + + try { + doc.parse(decompressed); + info = parseXml(doc, errs, warns); + } catch (const rapidxml::parse_error &err) { + errs << "XML parsing failed" << '\n'; + } + } + + auto end = std::chrono::steady_clock::now(); + std::chrono::duration took = end - start; + + if (info) + if (warns.view().empty()) + metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK); + else + metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING); + else + metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR); + + return info; +} + +bool terminate = false; + +void onSigIntOrTerm(int /* signum */) { + terminate = true; +} + +arrow::Result> getTable(const std::vector &messages, size_t &rows_written) { + ParquetBuilder builder; + + for (const auto &msg : messages) { + Kv6Field present = msg.presence; + Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type]; + Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type]; + if ((~msg.presence & required) != 0) { + std::cout << "Invalid message: not all required fields present; skipping" << std::endl; + continue; + } + Kv6Field used = static_cast(present & (required | optional)); + rows_written++; + + // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option + // of not providing these fields by setting them to -1. We want this normalized, where these + // fields are instead simply marked as not present. + if ((used & KV6F_RD_X) && msg.rd_x == -1) + used = static_cast(used & ~KV6F_RD_X); + if ((used & KV6F_RD_Y) && msg.rd_y == -1) + used = static_cast(used & ~KV6F_RD_Y); + + ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type))); + ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE + ? builder.data_owner_codes.Append(msg.data_owner_code) + : builder.data_owner_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER + ? builder.line_planning_numbers.Append(msg.line_planning_number) + : builder.line_planning_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY + ? builder.operating_days.Append(static_cast(msg.operating_day.toUnixDays().count())) + : builder.operating_days.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER + ? builder.journey_numbers.Append(msg.journey_number) + : builder.journey_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER + ? builder.reinforcement_numbers.Append(msg.reinforcement_number) + : builder.reinforcement_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP + ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count()) + : builder.timestamps.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_SOURCE + ? builder.sources.Append(msg.source) + : builder.sources.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY + ? builder.punctualities.Append(msg.punctuality) + : builder.punctualities.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE + ? builder.user_stop_codes.Append(msg.user_stop_code) + : builder.user_stop_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER + ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number) + : builder.passage_sequence_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER + ? builder.vehicle_numbers.Append(msg.vehicle_number) + : builder.vehicle_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE + ? builder.block_codes.Append(msg.block_code) + : builder.block_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE + ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible) + : builder.wheelchair_accessibles.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES + ? builder.number_of_coaches.Append(msg.number_of_coaches) + : builder.number_of_coaches.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_RD_Y + ? builder.rd_ys.Append(msg.rd_y) + : builder.rd_ys.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_RD_X + ? builder.rd_xs.Append(msg.rd_x) + : builder.rd_xs.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP + ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop) + : builder.distance_since_last_user_stops.AppendNull()); + } + + return builder.getTable(); +} + +std::tuple getMinMaxTimestamp(const std::vector &messages) { + if (messages.size() == 0) + return { 0, 0 }; + int64_t min = std::numeric_limits::max(); + int64_t max = 0; + for (const auto &message : messages) { + if (~message.presence & KV6F_TIMESTAMP) + continue; + int64_t seconds = message.timestamp.toUnixSeconds().count(); + if (seconds < min) + min = seconds; + if (seconds > max) + max = seconds; + } + if (min == std::numeric_limits::max()) + return { 0, 0 }; // this is stupid + return { min, max }; +} + +arrow::Status writeParquet(const std::vector &messages, Metrics &metrics) { + size_t rows_written = 0; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr table, getTable(messages, rows_written)); + + auto timestamp = std::chrono::round(std::chrono::utc_clock::now()); + std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp); + ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename)); + std::cout << "Wrote Parquet file " << filename << std::endl; + + auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages); + std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); + nlohmann::json meta{ + { "min_timestamp", min_timestamp }, + { "max_timestamp", max_timestamp }, + { "rows_written", rows_written }, + }; + metaf << meta; + metaf.close(); + std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); + + metrics.rowsWritten(rows_written); + + return arrow::Status::OK(); +} + +using SteadyTime = std::chrono::steady_clock::time_point; + +std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) { + auto timestamp = std::chrono::round(std::chrono::utc_clock::now()); + std::string filename = std::format("oeuf-error-{:%FT%T%Ez}.txt", timestamp); + std::ofstream dumpf(filename, std::ios::binary); + dumpf << "======= ERROR MESSAGES ========" << std::endl; + dumpf << errs; + dumpf << "======= WARNING MESSAGES ======" << std::endl; + dumpf << warns; + dumpf << "======= RECEIVED MESSAGE ======" << std::endl; + dumpf << txt << std::endl; + dumpf.close(); + return filename; +} + +void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector &msg_buf) { + unsigned int decompressed_size = 0; + if (msg.getBodySize() > std::numeric_limits::max()) + std::cout << "parseMsg failed due to too large message" << std::endl; + char *decompressed = decompress(msg.getBody(), static_cast(msg.getBodySize()), decompressed_size); + + std::stringstream errs; + std::stringstream warns; + // We know that decompressed[decompressed_size] == 0 because decompress() ensures this. + auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns); + if (parsed_msg) { + const Tmi8VvTmPushInfo &info = *parsed_msg; + auto new_msgs_it = info.messages.begin(); + while (new_msgs_it != info.messages.end()) { + size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size(); + size_t new_msgs_left = info.messages.end() - new_msgs_it; + auto new_msgs_start = new_msgs_it; + auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left); + new_msgs_it = new_msgs_end; + msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end); + + bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5); + if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) { + arrow::Status status = writeParquet(msg_buf, metrics); + if (!status.ok()) + std::cout << "Writing Parquet file failed: " << status << std::endl; + msg_buf.clear(); + last_output = std::chrono::steady_clock::now(); + } + } + if (!errs.view().empty() || !warns.view().empty()) { + std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); + std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl; + } + } else { + std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); + std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl; + } + free(decompressed); +} + +int main(int argc, char *argv[]) { + std::cout << "Working directory: " << std::filesystem::current_path() << std::endl; + + const char *metrics_addr = getenv("METRICS_ADDR"); + if (!metrics_addr || strlen(metrics_addr) == 0) { + std::cout << "Error: no METRICS_ADDR set!" << std::endl; + exit(EXIT_FAILURE); + } + prometheus::Exposer exposer{metrics_addr}; + + bool prod = false; + const char *prod_env = getenv("NDOV_PRODUCTION"); + if (prod_env && strcmp(prod_env, "true") == 0) prod = true; + + void *zmq_context = zmq_ctx_new(); + void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB); + int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658"); + assert(rc == 0); + + const char *topic = "/CXX/KV6posinfo"; + rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic)); + assert(rc == 0); + + signal(SIGINT, onSigIntOrTerm); + signal(SIGTERM, onSigIntOrTerm); + + SteadyTime last_output = std::chrono::steady_clock::now(); + + auto registry = std::make_shared(); + Metrics metrics(registry); + exposer.RegisterCollectable(registry); + + std::vector msg_buf; + while (!terminate) { + std::optional msg = recvMsg(zmq_subscriber); + if (!msg) { + if (!terminate) + perror("recvMsg"); + continue; + } + handleMsg(*msg, metrics, last_output, msg_buf); + } + + std::cout << "Terminating" << std::endl; + if (msg_buf.size() > 0) { + arrow::Status status = writeParquet(msg_buf, metrics); + if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl; + else std::cout << "Final data written" << std::endl; + msg_buf.clear(); + } + + if (zmq_close(zmq_subscriber)) + perror("zmq_close"); + if (zmq_ctx_destroy(zmq_context)) + perror("zmq_ctx_destroy"); + + std::cout << "Bye" << std::endl; + + return 0; +} -- cgit v1.2.3