// vim:set sw=2 ts=2 sts et: // // Copyright 2024 Rutger Broekhoff. Licensed under the EUPL. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define CHUNK 16384 struct RawMessage { public: // Takes ownership of envelope and body RawMessage(zmq_msg_t envelope, zmq_msg_t body) : envelope(envelope), body(body) {} // Prevent copying RawMessage(const RawMessage &) = delete; RawMessage &operator=(RawMessage const &) = delete; std::string_view getEnvelope() { return static_cast(zmq_msg_data(&envelope)); } char *getBody() { return static_cast(zmq_msg_data(&body)); } size_t getBodySize() { return zmq_msg_size(&body); } ~RawMessage() { zmq_msg_close(&envelope); zmq_msg_close(&body); } private: zmq_msg_t envelope; zmq_msg_t body; }; std::optional recvMsg(void *socket) { while (true) { zmq_msg_t envelope, body; int rc = zmq_msg_init(&envelope); assert(rc == 0); rc = zmq_msg_init(&body); assert(rc == 0); rc = zmq_msg_recv(&envelope, socket, 0); if (rc == -1) return std::nullopt; int more; size_t more_size = sizeof(more); rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); if (!more) { zmq_msg_close(&envelope); zmq_msg_close(&body); continue; } rc = zmq_msg_recv(&body, socket, 0); if (rc == -1) return std::nullopt; rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); assert(!more); return std::make_optional(envelope, body); } } // Ensures that [output_size] == 0 char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) { assert(input_size <= UINT32_MAX); z_stream strm; strm.next_in = reinterpret_cast(raw); strm.avail_in = input_size; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; int rc = inflateInit2(&strm, 32); assert(rc == Z_OK); unsigned int buf_cap = CHUNK; unsigned int buf_len = 0; char *buf = static_cast(malloc(CHUNK)); do { if (buf_len + CHUNK > buf_cap) { 
assert(buf_cap <= UINT32_MAX); buf_cap *= 2; buf = static_cast(realloc(buf, buf_cap)); } strm.avail_out = buf_cap - buf_len; strm.next_out = reinterpret_cast(buf + buf_len); unsigned long old_total = strm.total_out; rc = inflate(&strm, Z_FINISH); unsigned progress = static_cast(strm.total_out - old_total); buf_len += progress; assert(progress != 0 || rc == Z_STREAM_END); } while (strm.total_in < input_size); if (buf_len == buf_cap) { buf = static_cast(realloc(buf, buf_len + 1)); } buf[buf_len] = 0; output_size = buf_len; rc = inflateEnd(&strm); assert(rc == Z_OK); return buf; } struct Date { int16_t year = 0; uint8_t month = 0; uint8_t day = 0; static bool parse(Date &dest, std::string_view src) { dest.year = 0, dest.month = 0, dest.day = 0; int16_t y_mul_fac = 1; bool extended = false; size_t plus = src.find('+'); if (plus != std::string_view::npos) { extended = true; src = src.substr(1); // remove plus sign from the start } if (!extended) { size_t min_or_dash = src.find('-'); if (min_or_dash == std::string_view::npos) return false; if (min_or_dash == 0) { y_mul_fac = -1; // it's a minus sign src = src.substr(1); // remove minus sign at the start } } int y_chars = 0; while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') { dest.year = static_cast(dest.year * 10 + src[0] - '0'); src = src.substr(1); y_chars++; } if (src.size() == 0) { dest.year = 0; return false; } if (src[0] != '-') { dest.year = 0; return false; } src = src.substr(1); // remove dash if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; } dest.year *= y_mul_fac; bool rest_correct = src.size() == 5 && src[0] >= '0' && src[0] <= '9' && src[1] >= '0' && src[1] <= '9' && src[3] >= '0' && src[3] <= '9' && src[4] >= '0' && src[4] <= '9'; if (!rest_correct) { dest.year = 0; return false; } dest.month = static_cast((src[0] - '0') * 10 + src[1] - '0'); dest.day = static_cast((src[3] - '0') * 10 + src[4] - '0'); if (dest.month > 12 || dest.day > 31) { dest.year = 0, dest.month = 0, 
dest.day = 0; return false; } return true; } std::string toString() const { if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31) throw std::invalid_argument("one or more date components (year, month, day) out of range"); char data[11] = "XXXX-XX-XX"; sprintf(data, "%04u-%02u-%02u", year, month, day); return data; } std::chrono::days toUnixDays() const { std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)}; // This is valid since C++20: as of C++20, the system clock is defined to measure the // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds. std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch(); return since_epoch; } }; struct Time { uint8_t hour = 0; uint8_t minute = 0; uint8_t second = 0; static bool parse(Time &dest, std::string_view src) { bool okay = src.size() == 8 && src[0] >= '0' && src[0] <= '9' && src[1] >= '0' && src[1] <= '9' && src[2] == ':' && src[3] >= '0' && src[3] <= '9' && src[4] >= '0' && src[4] <= '9' && src[5] == ':' && src[6] >= '0' && src[6] <= '9' && src[7] >= '0' && src[7] <= '9'; if (!okay) return false; dest.hour = static_cast((src[0] - '0') * 10 + src[1] - '0'); dest.minute = static_cast((src[3] - '0') * 10 + src[4] - '0'); dest.second = static_cast((src[6] - '0') * 10 + src[7] - '0'); if (dest.hour > 23 || dest.minute > 59 || dest.second > 59) { dest.hour = 0, dest.minute = 0, dest.second = 0; return false; } return true; } std::string toString() const { if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 59) throw std::invalid_argument("one or more time components (hour, minute, second) out of range"); char data[9] = "XX:XX:XX"; sprintf(data, "%02u:%02u:%02u", hour, minute, second); return data; } }; // Time zone designator struct Tzd { int16_t minutes = 0; static bool parse(Tzd &dest, std::string_view src) { dest.minutes = 0; if (src.size() == 0) return false; if (src 
== "Z") return true; int16_t multiplier = 1; if (src[0] == '-') multiplier = -1; else if (src[0] != '+') return false; src = src.substr(1); bool okay = src.size() == 5 && src[0] >= '0' && src[0] <= '9' && src[1] >= '0' && src[1] <= '9' && src[2] == ':' && src[3] >= '0' && src[3] <= '9' && src[4] >= '0' && src[4] <= '9'; if (!okay) return false; int16_t hours = static_cast((src[0] - '0') * 10 + src[1] - '0'); int16_t minutes = static_cast((src[3] - '0') * 10 + src[4] - '0'); if (hours > 23 || minutes > 59) return false; dest.minutes = static_cast(multiplier * (60 * hours + minutes)); return true; } std::string toString() const { if (minutes == 0) return "Z"; bool negative = minutes < 0; int hours_off = abs(minutes / 60); int mins_off = abs(minutes) - hours_off*60; if (hours_off > 23 || mins_off > 59) throw std::invalid_argument("offset out of range"); char data[7] = "+XX:XX"; sprintf(data, "%c%02u:%02u", negative ? '-' : '+', hours_off, mins_off); return data; } }; struct Timestamp { Date date; Tzd off; Time time; static bool parse(Timestamp &dest, std::string_view src) { size_t t = src.find('T'); if (t == std::string_view::npos || t + 1 >= src.size()) return false; std::string_view date = src.substr(0, t); std::string_view time_and_tzd = src.substr(t + 1); if (time_and_tzd.size() < 9) return false; if (!Date::parse(dest.date, date)) return false; std::string_view time = time_and_tzd.substr(0, 8); std::string_view tzd = time_and_tzd.substr(8); if (!Time::parse(dest.time, time)) return false; return Tzd::parse(dest.off, tzd); } std::string toString() const { return date.toString() + "T" + time.toString() + off.toString(); } std::chrono::seconds toUnixSeconds() const { std::chrono::year_month_day ymd(std::chrono::year(date.year), std::chrono::month(date.month), std::chrono::day(date.day)); std::chrono::sys_days sys_days(ymd); std::chrono::time_point utc_days(sys_days.time_since_epoch()); std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast(utc_days); 
utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) + std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes); std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds); std::chrono::seconds unix = sys_seconds.time_since_epoch(); return unix; } }; static const std::string_view TMI8_XML_NS = "http://bison.connekt.nl/tmi8/kv6/msg"; enum Kv6RecordType { KV6T_UNKNOWN = 0, KV6T_DELAY = 1, KV6T_INIT = 2, KV6T_ARRIVAL = 3, KV6T_ON_STOP = 4, KV6T_DEPARTURE = 5, KV6T_ON_ROUTE = 6, KV6T_ON_PATH = 7, KV6T_OFF_ROUTE = 8, KV6T_END = 9, // Always keep this updated to correspond to the // first and last elements of the enumeration! _KV6T_FIRST_TYPE = KV6T_UNKNOWN, _KV6T_LAST_TYPE = KV6T_END, }; enum Kv6Field { KV6F_NONE = 0, KV6F_DATA_OWNER_CODE = 1, KV6F_LINE_PLANNING_NUMBER = 2, KV6F_OPERATING_DAY = 4, KV6F_JOURNEY_NUMBER = 8, KV6F_REINFORCEMENT_NUMBER = 16, KV6F_TIMESTAMP = 32, KV6F_SOURCE = 64, KV6F_PUNCTUALITY = 128, KV6F_USER_STOP_CODE = 256, KV6F_PASSAGE_SEQUENCE_NUMBER = 512, KV6F_VEHICLE_NUMBER = 1024, KV6F_BLOCK_CODE = 2048, KV6F_WHEELCHAIR_ACCESSIBLE = 4096, KV6F_NUMBER_OF_COACHES = 8192, KV6F_RD_Y = 16384, KV6F_RD_X = 32768, KV6F_DISTANCE_SINCE_LAST_USER_STOP = 65536, }; static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = { // KV6T_UNKNOWN KV6F_NONE, // KV6T_DELAY static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_PUNCTUALITY), // KV6T_INIT static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_VEHICLE_NUMBER | KV6F_BLOCK_CODE | KV6F_WHEELCHAIR_ACCESSIBLE | KV6F_NUMBER_OF_COACHES), // KV6T_ARRIVAL static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER 
| KV6F_REINFORCEMENT_NUMBER | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_VEHICLE_NUMBER | KV6F_PUNCTUALITY), // KV6T_ON_STOP static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_VEHICLE_NUMBER | KV6F_PUNCTUALITY), // KV6T_DEPARTURE static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_VEHICLE_NUMBER | KV6F_PUNCTUALITY), // KV6T_ON_ROUTE static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_VEHICLE_NUMBER | KV6F_PUNCTUALITY | KV6F_RD_X | KV6F_RD_Y), // KV6T_ON_PATH KV6F_NONE, // KV6T_OFF_ROUTE static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_VEHICLE_NUMBER | KV6F_RD_X | KV6F_RD_Y), // KV6T_END static_cast( KV6F_DATA_OWNER_CODE | KV6F_LINE_PLANNING_NUMBER | KV6F_OPERATING_DAY | KV6F_JOURNEY_NUMBER | KV6F_REINFORCEMENT_NUMBER | KV6F_TIMESTAMP | KV6F_SOURCE | KV6F_USER_STOP_CODE | KV6F_PASSAGE_SEQUENCE_NUMBER | KV6F_VEHICLE_NUMBER), }; static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = { // KV6T_UNKNOWN KV6F_NONE, // KV6T_DELAY KV6F_NONE, // KV6T_INIT KV6F_NONE, // KV6T_ARRIVAL static_cast(KV6F_RD_X | KV6F_RD_Y), // KV6T_ON_STOP static_cast(KV6F_RD_X | KV6F_RD_Y), // KV6T_DEPARTURE static_cast(KV6F_RD_X | KV6F_RD_Y), // KV6T_ON_ROUTE KV6F_DISTANCE_SINCE_LAST_USER_STOP, // KV6T_ON_PATH KV6F_NONE, // KV6T_OFF_ROUTE KV6F_NONE, // 
KV6T_END KV6F_NONE, }; struct Kv6Record { Kv6RecordType type = KV6T_UNKNOWN; Kv6Field presence = KV6F_NONE; Kv6Field next = KV6F_NONE; std::string data_owner_code; std::string line_planning_number; std::string source; std::string user_stop_code; std::string wheelchair_accessible; Date operating_day; Timestamp timestamp; uint32_t block_code = 0; uint32_t journey_number = 0; uint32_t vehicle_number = 0; int32_t rd_x = 0; int32_t rd_y = 0; // The TMI8 specification is unclear: this field // might actually be called distancesincelaststop uint32_t distance_since_last_user_stop = 0; uint16_t passage_sequence_number = 0; int16_t punctuality = 0; uint8_t number_of_coaches = 0; uint8_t reinforcement_number = 0; void markPresent(Kv6Field field) { presence = static_cast(presence | field); } void removeUnsupportedFields() { Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; Kv6Field supported_fields = static_cast(required_fields | optional_fields); presence = static_cast(presence & supported_fields); } bool valid() { Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; Kv6Field supported_fields = static_cast(required_fields | optional_fields); Kv6Field required_field_presence = static_cast(presence & required_fields); Kv6Field unsupported_field_presence = static_cast(presence & ~supported_fields); return required_field_presence == required_fields && !unsupported_field_presence; } }; enum Tmi8VvTmPushInfoField { TMI8F_NONE = 0, TMI8F_SUBSCRIBER_ID = 1, TMI8F_VERSION = 2, TMI8F_DOSSIER_NAME = 4, TMI8F_TIMESTAMP = 8, }; struct Tmi8VvTmPushInfo { Tmi8VvTmPushInfoField next = TMI8F_NONE; Tmi8VvTmPushInfoField presence = TMI8F_NONE; std::string subscriber_id; std::string version; std::string dossier_name; Timestamp timestamp; std::vector messages; void markPresent(Tmi8VvTmPushInfoField field) { presence = static_cast(presence | field); } bool valid() { const 
Tmi8VvTmPushInfoField REQUIRED_FIELDS = static_cast( TMI8F_SUBSCRIBER_ID | TMI8F_VERSION | TMI8F_DOSSIER_NAME | TMI8F_TIMESTAMP); return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS; } }; static const std::array KV6_POS_INFO_RECORD_TYPES = { "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END", }; std::optional findKv6PosInfoRecordTypeName(Kv6RecordType type) { if (type > _KV6T_LAST_TYPE) return std::nullopt; return KV6_POS_INFO_RECORD_TYPES[type]; } const std::array, 17> KV6_POS_INFO_RECORD_FIELDS = {{ { "dataownercode", KV6F_DATA_OWNER_CODE }, { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER }, { "operatingday", KV6F_OPERATING_DAY }, { "journeynumber", KV6F_JOURNEY_NUMBER }, { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER }, { "timestamp", KV6F_TIMESTAMP }, { "source", KV6F_SOURCE }, { "punctuality", KV6F_PUNCTUALITY }, { "userstopcode", KV6F_USER_STOP_CODE }, { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER }, { "vehiclenumber", KV6F_VEHICLE_NUMBER }, { "blockcode", KV6F_BLOCK_CODE }, { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE }, { "numberofcoaches", KV6F_NUMBER_OF_COACHES }, { "rd-y", KV6F_RD_Y }, { "rd-x", KV6F_RD_X }, { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP }, }}; // Returns the maximum amount of digits such that it is guaranteed that // a corresponding amount of repeated 9's can be represented by the type. 
template<typename T>
constexpr size_t maxDigits() {
  size_t digits = 0;
  for (T x = std::numeric_limits<T>::max(); x != 0; x /= 10)
    digits++;
  // The topmost digit position cannot hold a 9 in general, hence the - 1
  return digits - 1;
}

// Parses [src] as a decimal number of one up to and including [MaxDigits]
// digits into [out]. Returns false, leaving [out] untouched, when [src] is
// empty, too long or contains anything but digits.
template<size_t MaxDigits, typename T>
constexpr bool parseUnsigned(T &out, std::string_view src) {
  static_assert(MaxDigits <= maxDigits<T>());
  if (src.empty() || src.size() > MaxDigits)
    return false;
  T res = 0;
  for (char c : src) {
    if (c < '0' || c > '9')
      return false;
    res = static_cast<T>(res * 10 + (c - '0'));
  }
  out = res;
  return true;
}

// Parses [src] as a decimal number with an optional leading minus sign and one
// up to and including [MaxDigits] digits into [out]. Returns false, leaving
// [out] untouched, when [src] has no digits, too many digits, or contains
// anything else.
template<size_t MaxDigits, typename T>
constexpr bool parseSigned(T &out, std::string_view src) {
  static_assert(MaxDigits <= maxDigits<T>());
  bool negative = !src.empty() && src[0] == '-';
  if (negative)
    src = src.substr(1);
  if (src.empty() || src.size() > MaxDigits)
    return false;
  T res = 0;
  for (char c : src) {
    if (c < '0' || c > '9')
      return false;
    res = static_cast<T>(res * 10 + (c - '0'));
  }
  out = negative ? static_cast<T>(-res) : res;
  return true;
}

// Singly linked list of XML namespace declarations; entries closer to the
// head shadow later ones. An empty prefix denotes the default namespace.
struct Xmlns {
  const Xmlns *next;
  std::string_view prefix;
  std::string_view url;
};

// Returns the URL of the first declaration in [nss] whose prefix equals
// [prefix], or std::nullopt when there is none.
std::optional<std::string_view> resolve(std::string_view prefix, const Xmlns *nss) {
  for (; nss; nss = nss->next)
    if (nss->prefix == prefix)
      return nss->url;
  return std::nullopt;
}

// Calls [fn] with [nss] extended by every namespace declaration (xmlns="..."
// or xmlns:prefix="...") found in the attribute list starting at [attr].
// The added list nodes live on the stack of this function, so the list handed
// to [fn] must not be used after [fn] returns.
template<typename T>
void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) {
  while (attr) {
    std::string_view name(attr->name(), attr->name_size());
    if (name.starts_with("xmlns")) {
      if (name.size() == 5) {  // just xmlns
        Xmlns ns0 = {
          .next = nss,
          .url = std::string_view(attr->value(), attr->value_size()),
        };
        withXmlnss(attr->next_attribute(), &ns0, fn);
        return;
      } else if (name.size() > 6 && name[5] == ':') {  // xmlns:<prefix>
        Xmlns ns0 = {
          .next = nss,
          .prefix = name.substr(6),
          .url = std::string_view(attr->value(), attr->value_size()),
        };
        withXmlnss(attr->next_attribute(), &ns0, fn);
        return;
      }
    }
    attr = attr->next_attribute();
  }
  fn(nss);
}

// Splits the name of [node] into prefix and local name and calls
// fn(namespace URL, local name, namespaces in scope for the node). The URL is
// empty when the name has no prefix and no default namespace is declared.
// [fn] is not called when the prefix cannot be resolved or the local name is
// empty.
template<typename T>
void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
  std::string_view name(node.name(), node.name_size());
  std::string_view ns;
  size_t colon = name.find(':');

  if (colon != std::string_view::npos) {
    if (colon >= name.size() - 1)  // last character
      return;
    ns = name.substr(0, colon);
    name = name.substr(colon + 1);
  }

  withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) {
    std::optional<std::string_view> ns_url = resolve(ns, nss);
    if (!ns_url && !ns.empty())
      return;
    if (!ns_url)
      fn(std::string_view(), name, nss);
    else
      fn(*ns_url, name, nss);
  });
}

// Calls fn(local name, namespaces in scope) when [node] is an element that is
// either in the TMI8 namespace or in no namespace at all.
template<typename T>
void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
  ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) {
    if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS))
      fn(name, nss);
  });
}

// Returns whether [node] is an element with exactly one child, which is a
// data (text) node.
bool onlyTextElement(const rapidxml::xml_node<> &node) {
  return node.type() == rapidxml::node_element
      && node.first_node()
      && node.first_node() == node.last_node()
      && node.first_node()->type() == rapidxml::node_data;
}

// Returns the value of [node] as a view into the parsed document.
std::string_view getValue(const rapidxml::xml_node<> &node) {
  return std::string_view(node.value(), node.value_size());
}

// Copies [val] into [into] when it is at most [max_len] bytes long; returns
// false and leaves [into] untouched otherwise.
bool parseStringValue(std::string &into, size_t max_len, std::string_view val) {
  if (val.size() > max_len)
    return false;
  into = val;
  return true;
}

// Turns a parsed XML document into a Tmi8VvTmPushInfo. Problems that make the
// message unusable are written to [errs]; problems that only cause a single
// field to be ignored are written to [warns].
struct Kv6Parser {
  std::stringstream &errs;
  std::stringstream &warns;

  void error(std::string_view msg) {
    errs << msg << '\n';
  }

  void warn(std::string_view msg) {
    warns << msg << '\n';
  }

// Both macros report [msg] and return from the enclosing function (usually a
// lambda) when the condition does not hold.
#define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false)
#define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false)

  // Parses the children of [node] as the fields of a record of type [type].
  // Fields with an invalid value produce a warning and count as absent.
  // Returns std::nullopt when, after dropping the fields that [type] does not
  // support, a required field is missing.
  std::optional<Kv6Record> parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) {
    Kv6Record fields = { .type = type };
    for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
      ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *) {
        for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) {
          if (field == KV6F_NONE)
            continue;
          if (fname != name)
            continue;

          PWARNASSERT("Expected KV6 record field element to only contain data",
                      onlyTextElement(*child));
          std::string_view childval = getValue(*child);
          switch (field) {
          case KV6F_DATA_OWNER_CODE:
            PWARNASSERT("Invalid value for dataownercode",
                        parseStringValue(fields.data_owner_code, 10, childval));
            break;
          case KV6F_LINE_PLANNING_NUMBER:
            PWARNASSERT("Invalid value for lineplanningnumber",
                        parseStringValue(fields.line_planning_number, 10, childval));
            break;
          case KV6F_OPERATING_DAY:
            PWARNASSERT("Invalid value for operatingday: not a valid date",
                        Date::parse(fields.operating_day, childval));
            break;
          case KV6F_JOURNEY_NUMBER:
            PWARNASSERT("Invalid value for journeynumber:"
                        " not a valid unsigned number with at most six digits",
                        parseUnsigned<6>(fields.journey_number, childval));
            break;
          case KV6F_REINFORCEMENT_NUMBER:
            PWARNASSERT("Invalid value for reinforcementnumber:"
                        " not a valid unsigned number with at most two digits",
                        parseUnsigned<2>(fields.reinforcement_number, childval));
            break;
          case KV6F_TIMESTAMP:
            PWARNASSERT("Invalid value for timestamp: not a valid timestamp",
                        Timestamp::parse(fields.timestamp, childval));
            break;
          case KV6F_SOURCE:
            PWARNASSERT("Invalid value for source:"
                        " not a valid string of at most 10 bytes",
                        parseStringValue(fields.source, 10, childval));
            break;
          case KV6F_PUNCTUALITY:
            PWARNASSERT("Invalid value for punctuality:"
                        " not a valid signed number with at most four digits",
                        parseSigned<4>(fields.punctuality, childval));
            break;
          case KV6F_USER_STOP_CODE:
            PWARNASSERT("Invalid value for userstopcode:"
                        " not a valid string of at most 10 bytes",
                        parseStringValue(fields.user_stop_code, 10, childval));
            break;
          case KV6F_PASSAGE_SEQUENCE_NUMBER:
            PWARNASSERT("Invalid value for passagesequencenumber:"
                        " not a valid unsigned number with at most four digits",
                        parseUnsigned<4>(fields.passage_sequence_number, childval));
            break;
          case KV6F_VEHICLE_NUMBER:
            PWARNASSERT("Invalid value for vehiclenumber:"
                        " not a valid unsigned number with at most six digits",
                        parseUnsigned<6>(fields.vehicle_number, childval));
            break;
          case KV6F_BLOCK_CODE:
            PWARNASSERT("Invalid value for blockcode:"
                        " not a valid unsigned number with at most eight digits",
                        parseUnsigned<8>(fields.block_code, childval));
            break;
          case KV6F_WHEELCHAIR_ACCESSIBLE:
            PWARNASSERT("Invalid value for wheelchairaccessible:"
                        " not a valid value for wheelchair accessibility",
                        childval == "ACCESSIBLE"
                        || childval == "NOTACCESSIBLE"
                        || childval == "UNKNOWN");
            fields.wheelchair_accessible = childval;
            break;
          case KV6F_NUMBER_OF_COACHES:
            PWARNASSERT("Invalid for numberofcoaches:"
                        " not a valid unsigned number with at most two digits",
                        parseUnsigned<2>(fields.number_of_coaches, childval));
            break;
          case KV6F_RD_X:
            PWARNASSERT("Invalid value for rd-x:"
                        " not a valid signed number with at most six digits",
                        parseSigned<6>(fields.rd_x, childval));
            break;
          case KV6F_RD_Y:
            PWARNASSERT("Invalid value for rd-y:"
                        " not a valid signed number with at most six digits",
                        parseSigned<6>(fields.rd_y, childval));
            break;
          case KV6F_DISTANCE_SINCE_LAST_USER_STOP:
            PWARNASSERT("Invalid value for distancesincelastuserstop:"
                        " not a valid unsigned number with at most five digits",
                        parseUnsigned<5>(fields.distance_since_last_user_stop, childval));
            break;
          case KV6F_NONE:
            error("NONE field type case should be unreachable in parseKv6PosInfoRecord");
            return;
          }
          fields.markPresent(field);
          break;
        }
      });
    }

    fields.removeUnsupportedFields();

    if (!fields.valid())
      return std::nullopt;
    return fields;
  }

  // Parses every child of [node] whose name is that of a known record type
  // (DELAY up to and including END) and returns the records that are valid.
  std::vector<Kv6Record> parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) {
    std::vector<Kv6Record> records;
    for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
      ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *child_nss) {
        // The upper bound is inclusive: _KV6T_LAST_TYPE (END) is a record
        // type like any other.
        for (int t = _KV6T_FIRST_TYPE; t <= _KV6T_LAST_TYPE; t++) {
          auto type = static_cast<Kv6RecordType>(t);
          if (type == KV6T_UNKNOWN)
            continue;
          if (KV6_POS_INFO_RECORD_TYPES[type] != name)
            continue;
          auto record = parseKv6PosInfoRecord(type, *child, child_nss);
          if (record)
            records.push_back(std::move(*record));
          break;  // type names are unique
        }
      });
    }
    return records;
  }

  // Parses a VV_TM_PUSH element: the four header fields plus the records of
  // all KV6posinfo children. Returns std::nullopt when a header field is
  // missing or invalid.
  std::optional<Tmi8VvTmPushInfo> parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) {
    Tmi8VvTmPushInfo info;
    for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
      ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *child_nss) {
        if (name == "Timestamp") {
          PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child));
          PERRASSERT("Invalid value for Timestamp: Invalid timestamp",
                     Timestamp::parse(info.timestamp, getValue(*child)));
          info.markPresent(TMI8F_TIMESTAMP);
        } else if (name == "SubscriberID") {
          PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child));
          info.subscriber_id = getValue(*child);
          info.markPresent(TMI8F_SUBSCRIBER_ID);
        } else if (name == "Version") {
          PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child));
          info.version = getValue(*child);
          info.markPresent(TMI8F_VERSION);
        } else if (name == "DossierName") {
          PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child));
          info.dossier_name = getValue(*child);
          info.markPresent(TMI8F_DOSSIER_NAME);
        } else if (name == "KV6posinfo") {
          // Append rather than assign, so that a message with several
          // KV6posinfo elements keeps the records of all of them.
          std::vector<Kv6Record> parsed = parseKv6PosInfo(*child, child_nss);
          for (Kv6Record &record : parsed)
            info.messages.push_back(std::move(record));
        }
      });
    }

    if (!info.valid())
      return std::nullopt;
    return info;
  }

  // Finds and parses the VV_TM_PUSH element at the top level of [doc].
  // Returns std::nullopt, after reporting an error, when there is no valid
  // VV_TM_PUSH element.
  std::optional<Tmi8VvTmPushInfo> parse(const rapidxml::xml_document<> &doc) {
    std::optional<Tmi8VvTmPushInfo> msg;
    withXmlnss(doc.first_attribute(), nullptr /* nss */, [&](const Xmlns *nss) {
      for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) {
        ifTmi8Element(*node, nss, [&](std::string_view name, const Xmlns *node_nss) {
          if (name == "VV_TM_PUSH") {
            if (msg) {
              error("Duplicated VV_TM_PUSH");
              return;
            }
            msg = parseVvTmPush(*node, node_nss);
            if (!msg) {
              error("Invalid VV_TM_PUSH");
            }
          }
        });
      }
    });
    if (!msg)
      error("Expected to find VV_TM_PUSH");
    return msg;
  }
};

// Convenience wrapper around Kv6Parser::parse.
std::optional<Tmi8VvTmPushInfo> parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) {
  Kv6Parser parser = { errs, warns };
  return parser.parse(doc);
}

// Prometheus metrics of the receiver. All metrics are registered with the
// registry handed to the constructor.
struct Metrics {
  prometheus::Counter &messages_counter_ok;
  prometheus::Counter &messages_counter_error;
  prometheus::Counter &messages_counter_warning;
  prometheus::Counter &rows_written_counter;
  prometheus::Histogram &records_hist;
  prometheus::Histogram &message_parse_hist;
  prometheus::Histogram &payload_size_hist;

  using BucketBoundaries = prometheus::Histogram::BucketBoundaries;

  enum class ParseStatus {
    OK,
    WARNING,
    ERROR,
  };

  Metrics(std::shared_ptr<prometheus::Registry> registry) :
    Metrics(registry, prometheus::BuildCounter()
      .Name("kv6_vv_tm_push_messages_total")
      .Help("Number of KV6 VV_TM_PUSH messages received")
      .Register(*registry))
  {}

  // Records one processed message: how long parsing took, the size of the
  // payload that was parsed, the number of records it yielded and the
  // outcome.
  void addMeasurement(std::chrono::duration<double> took_secs, size_t payload_size, size_t records, ParseStatus parsed) {
    double millis = took_secs.count() * 1000.0;

    switch (parsed) {
    case ParseStatus::OK:      messages_counter_ok.Increment();      break;
    case ParseStatus::WARNING: messages_counter_warning.Increment(); break;
    case ParseStatus::ERROR:   messages_counter_error.Increment();   break;
    }
    records_hist.Observe(static_cast<double>(records));
    message_parse_hist.Observe(millis);
    payload_size_hist.Observe(static_cast<double>(payload_size));
  }

  // Adds [rows] to the counter of rows written.
  void rowsWritten(int64_t rows) {
    rows_written_counter.Increment(static_cast<double>(rows));
  }

private:
  // The three message counters share one family and differ in their "status"
  // label only, which is why the family is created by the public constructor
  // and passed in here.
  Metrics(std::shared_ptr<prometheus::Registry> registry,
          prometheus::Family<prometheus::Counter> &messages_counter) :
    messages_counter_ok(messages_counter
      .Add({{ "status", "ok" }})),
    messages_counter_error(messages_counter
      .Add({{ "status", "error" }})),
    messages_counter_warning(messages_counter
      .Add({{ "status", "warning" }})),
    rows_written_counter(prometheus::BuildCounter()
      .Name("kv6_vv_tm_push_records_written")
      .Help("Numer of VV_TM_PUSH records written to disk")
      .Register(*registry)
      .Add({})),
    records_hist(prometheus::BuildHistogram()
      .Name("kv6_vv_tm_push_records_amount")
      .Help("Number of KV6 VV_TM_PUSH records")
      .Register(*registry)
      .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })),
    message_parse_hist(prometheus::BuildHistogram()
      .Name("kv6_vv_tm_push_message_parse_millis")
      .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages")
      .Register(*registry)
      .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })),
    payload_size_hist(prometheus::BuildHistogram()
      .Name("kv6_payload_size")
      .Help("Sizes of KV6 ZeroMQ message payloads")
      .Register(*registry)
      .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 }))
  {}
};

// Parses the XML text of [size] bytes at [decompressed] and records the
// outcome in [metrics]. The buffer is modified by the XML parser, and the
// returned data must not be assumed to outlive it.
//
// Note: it *must* hold that decompressed[size] == 0
std::optional<Tmi8VvTmPushInfo> parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) {
  auto start = std::chrono::steady_clock::now();

  std::optional<Tmi8VvTmPushInfo> info;

  if (decompressed[size] != 0) {
    errs << "Not parsing: missing null terminator" << '\n';
  } else {
    rapidxml::xml_document<> doc;
    constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace
                              | rapidxml::parse_no_string_terminators
                              | rapidxml::parse_validate_closing_tags;

    try {
      doc.parse<PARSE_FLAGS>(decompressed);
      info = parseXml(doc, errs, warns);
    } catch (const rapidxml::parse_error &) {
      errs << "XML parsing failed" << '\n';
    }
  }

  auto end = std::chrono::steady_clock::now();
  std::chrono::duration<double> took = end - start;

  if (!info) {
    metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR);
  } else if (warns.view().empty()) {
    metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK);
  } else {
    metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING);
  }

  return info;
}

// Set from the signal handler below. volatile sig_atomic_t is the type the
// language guarantees can be written from a signal handler and read outside
// of it.
volatile sig_atomic_t terminate = 0;

void onSigIntOrTerm(int /* signum */) {
  terminate = 1;
}

// Builds an Arrow table with one row per message in [messages]. Messages that
// lack a required field are skipped (with a notice on standard output).
// [rows_written] is incremented for every message that is not skipped.
arrow::Result<std::shared_ptr<arrow::Table>> getTable(const std::vector<Kv6Record> &messages, size_t &rows_written) {
  ParquetBuilder builder;

  for (const auto &msg : messages) {
    Kv6Field present = msg.presence;
    Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type];
    Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type];
    if ((~msg.presence & required) != 0) {
      std::cout << "Invalid message: not all required fields present; skipping" << std::endl;
      continue;
    }
    Kv6Field used = static_cast<Kv6Field>(present & (required | optional));
    rows_written++;

    // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option
    // of not providing these fields by setting them to -1. We want this normalized, where these
    // fields are instead simply marked as not present.
    if ((used & KV6F_RD_X) && msg.rd_x == -1)
      used = static_cast<Kv6Field>(used & ~KV6F_RD_X);
    if ((used & KV6F_RD_Y) && msg.rd_y == -1)
      used = static_cast<Kv6Field>(used & ~KV6F_RD_Y);

    ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type)));
    ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE
                          ? builder.data_owner_codes.Append(msg.data_owner_code)
                          : builder.data_owner_codes.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER
                          ? builder.line_planning_numbers.Append(msg.line_planning_number)
                          : builder.line_planning_numbers.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY
                          ? builder.operating_days.Append(static_cast<int32_t>(msg.operating_day.toUnixDays().count()))
                          : builder.operating_days.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER
                          ? builder.journey_numbers.Append(msg.journey_number)
                          : builder.journey_numbers.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER
                          ? builder.reinforcement_numbers.Append(msg.reinforcement_number)
                          : builder.reinforcement_numbers.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP
                          ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count())
                          : builder.timestamps.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_SOURCE
                          ? builder.sources.Append(msg.source)
                          : builder.sources.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY
                          ? builder.punctualities.Append(msg.punctuality)
                          : builder.punctualities.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE
                          ? builder.user_stop_codes.Append(msg.user_stop_code)
                          : builder.user_stop_codes.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER
                          ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number)
                          : builder.passage_sequence_numbers.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER
                          ? builder.vehicle_numbers.Append(msg.vehicle_number)
                          : builder.vehicle_numbers.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE
                          ? builder.block_codes.Append(msg.block_code)
                          : builder.block_codes.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE
                          ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible)
                          : builder.wheelchair_accessibles.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES
                          ? builder.number_of_coaches.Append(msg.number_of_coaches)
                          : builder.number_of_coaches.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_RD_Y
                          ? builder.rd_ys.Append(msg.rd_y)
                          : builder.rd_ys.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_RD_X
                          ? builder.rd_xs.Append(msg.rd_x)
                          : builder.rd_xs.AppendNull());
    ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP
                          ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop)
                          : builder.distance_since_last_user_stops.AppendNull());
  }

  return builder.getTable();
}

// Returns the smallest and largest timestamp (in Unix seconds) among the
// messages that have one, or { 0, 0 } when no message has a timestamp.
std::tuple<int64_t, int64_t> getMinMaxTimestamp(const std::vector<Kv6Record> &messages) {
  bool any = false;
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();
  for (const auto &message : messages) {
    if (~message.presence & KV6F_TIMESTAMP)
      continue;
    int64_t seconds = message.timestamp.toUnixSeconds().count();
    if (seconds < min)
      min = seconds;
    if (seconds > max)
      max = seconds;
    any = true;
  }
  if (!any)
    return { 0, 0 };
  return { min, max };
}

// Writes [messages] to a new Parquet file named after the current time, next
// to a <name>.meta.json file holding the timestamp range and row count. The
// metadata is written to a .part file first and renamed into place once
// complete. Returns a non-OK status when any of these steps fails.
arrow::Status writeParquet(const std::vector<Kv6Record> &messages, Metrics &metrics) {
  size_t rows_written = 0;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, getTable(messages, rows_written));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename));
  std::cout << "Wrote Parquet file " << filename << std::endl;

  auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages);
  const std::string meta_part = filename + ".meta.json.part";
  const std::string meta_final = filename + ".meta.json";

  std::ofstream metaf(meta_part, std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows_written },
  };
  metaf << meta;
  metaf.close();
  if (!metaf)
    return arrow::Status::IOError("Failed to write metadata file ", meta_part);

  // Use the non-throwing overload: this function reports failure via Status.
  std::error_code ec;
  std::filesystem::rename(meta_part, meta_final, ec);
  if (ec)
    return arrow::Status::IOError("Failed to rename ", meta_part, " to ", meta_final, ": ", ec.message());

  metrics.rowsWritten(static_cast<int64_t>(rows_written));

  return arrow::Status::OK();
}

using SteadyTime = std::chrono::steady_clock::time_point;

std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) { auto timestamp = std::chrono::round(std::chrono::utc_clock::now()); std::string filename = std::format("oeuf-error-{:%FT%T%Ez}.txt", timestamp); std::ofstream dumpf(filename, std::ios::binary); dumpf << "======= ERROR MESSAGES ========" << std::endl; dumpf << errs; dumpf << "======= WARNING MESSAGES 
======" << std::endl; dumpf << warns; dumpf << "======= RECEIVED MESSAGE ======" << std::endl; dumpf << txt << std::endl; dumpf.close(); return filename; } void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector &msg_buf) { unsigned int decompressed_size = 0; if (msg.getBodySize() > std::numeric_limits::max()) std::cout << "parseMsg failed due to too large message" << std::endl; char *decompressed = decompress(msg.getBody(), static_cast(msg.getBodySize()), decompressed_size); std::stringstream errs; std::stringstream warns; // We know that decompressed[decompressed_size] == 0 because decompress() ensures this. auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns); if (parsed_msg) { const Tmi8VvTmPushInfo &info = *parsed_msg; auto new_msgs_it = info.messages.begin(); while (new_msgs_it != info.messages.end()) { size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size(); size_t new_msgs_left = info.messages.end() - new_msgs_it; auto new_msgs_start = new_msgs_it; auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left); new_msgs_it = new_msgs_end; msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end); bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5); if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) { arrow::Status status = writeParquet(msg_buf, metrics); if (!status.ok()) std::cout << "Writing Parquet file failed: " << status << std::endl; msg_buf.clear(); last_output = std::chrono::steady_clock::now(); } } if (!errs.view().empty() || !warns.view().empty()) { std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl; } } else { std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), 
errs.str(), warns.str()); std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl; } free(decompressed); } int main(int argc, char *argv[]) { std::cout << "Working directory: " << std::filesystem::current_path() << std::endl; const char *metrics_addr = getenv("METRICS_ADDR"); if (!metrics_addr || strlen(metrics_addr) == 0) { std::cout << "Error: no METRICS_ADDR set!" << std::endl; exit(EXIT_FAILURE); } prometheus::Exposer exposer{metrics_addr}; bool prod = false; const char *prod_env = getenv("NDOV_PRODUCTION"); if (prod_env && strcmp(prod_env, "true") == 0) prod = true; void *zmq_context = zmq_ctx_new(); void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB); int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658"); assert(rc == 0); const char *topic = "/CXX/KV6posinfo"; rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic)); assert(rc == 0); signal(SIGINT, onSigIntOrTerm); signal(SIGTERM, onSigIntOrTerm); SteadyTime last_output = std::chrono::steady_clock::now(); auto registry = std::make_shared(); Metrics metrics(registry); exposer.RegisterCollectable(registry); std::vector msg_buf; while (!terminate) { std::optional msg = recvMsg(zmq_subscriber); if (!msg) { if (!terminate) perror("recvMsg"); continue; } handleMsg(*msg, metrics, last_output, msg_buf); } std::cout << "Terminating" << std::endl; if (msg_buf.size() > 0) { arrow::Status status = writeParquet(msg_buf, metrics); if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl; else std::cout << "Final data written" << std::endl; msg_buf.clear(); } if (zmq_close(zmq_subscriber)) perror("zmq_close"); if (zmq_ctx_destroy(zmq_context)) perror("zmq_ctx_destroy"); std::cout << "Bye" << std::endl; return 0; }