aboutsummaryrefslogtreecommitdiffstats
path: root/src/recvkv6
diff options
context:
space:
mode:
authorLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
committerLibravatar Rutger Broekhoff2024-05-02 20:27:40 +0200
commit17a3ea880402338420699e03bcb24181e4ff3924 (patch)
treeda666ef91e0b60d20aa0b01529644c136fd1f4ab /src/recvkv6
downloadoeuf-17a3ea880402338420699e03bcb24181e4ff3924.tar.gz
oeuf-17a3ea880402338420699e03bcb24181e4ff3924.zip
Initial commit
Based on dc4ba6a
Diffstat (limited to 'src/recvkv6')
-rw-r--r--src/recvkv6/.envrc2
-rw-r--r--src/recvkv6/Makefile21
-rw-r--r--src/recvkv6/main.cpp1300
3 files changed, 1323 insertions, 0 deletions
diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/recvkv6/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile
new file mode 100644
index 0000000..12ff7fb
--- /dev/null
+++ b/src/recvkv6/Makefile
@@ -0,0 +1,21 @@
# Hardening flags taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set to a non-empty value.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

recvkv6: main.cpp
	$(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# -f: do not fail when the binary has not been built (yet).
	rm -f recvkv6
diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp
new file mode 100644
index 0000000..2ac3669
--- /dev/null
+++ b/src/recvkv6/main.cpp
@@ -0,0 +1,1300 @@
1// vim:set sw=2 ts=2 sts et:
2
#include <array>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <format>
#include <fstream>
#include <iostream>
#include <optional>
#include <sstream>
#include <stack>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
17
18#include <zlib.h>
19#include <zmq.h>
20
21#include <nlohmann/json.hpp>
22
23#include <prometheus/counter.h>
24#include <prometheus/exposer.h>
25#include <prometheus/histogram.h>
26#include <prometheus/registry.h>
27
28#include <rapidxml/rapidxml.hpp>
29
30#include <tmi8/kv6_parquet.hpp>
31
32#define CHUNK 16384
33
34struct RawMessage {
35 public:
36 // Takes ownership of envelope and body
37 RawMessage(zmq_msg_t envelope, zmq_msg_t body)
38 : envelope(envelope), body(body)
39 {}
40
41 // Prevent copying
42 RawMessage(const RawMessage &) = delete;
43 RawMessage &operator=(RawMessage const &) = delete;
44
45 std::string_view getEnvelope() {
46 return static_cast<const char *>(zmq_msg_data(&envelope));
47 }
48
49 char *getBody() {
50 return static_cast<char *>(zmq_msg_data(&body));
51 }
52
53 size_t getBodySize() {
54 return zmq_msg_size(&body);
55 }
56
57 ~RawMessage() {
58 zmq_msg_close(&envelope);
59 zmq_msg_close(&body);
60 }
61
62 private:
63 zmq_msg_t envelope;
64 zmq_msg_t body;
65};
66
67std::optional<RawMessage> recvMsg(void *socket) {
68 while (true) {
69 zmq_msg_t envelope, body;
70 int rc = zmq_msg_init(&envelope);
71 assert(rc == 0);
72 rc = zmq_msg_init(&body);
73 assert(rc == 0);
74
75 rc = zmq_msg_recv(&envelope, socket, 0);
76 if (rc == -1) return std::nullopt;
77
78 int more;
79 size_t more_size = sizeof(more);
80 rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
81 if (!more) {
82 zmq_msg_close(&envelope);
83 zmq_msg_close(&body);
84 continue;
85 }
86
87 rc = zmq_msg_recv(&body, socket, 0);
88 if (rc == -1) return std::nullopt;
89
90 rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
91 assert(!more);
92
93 return std::make_optional<RawMessage>(envelope, body);
94 }
95}
96
97// Ensures that <return value>[output_size] == 0
98char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) {
99 assert(input_size <= UINT32_MAX);
100
101 z_stream strm;
102 strm.next_in = reinterpret_cast<unsigned char *>(raw);
103 strm.avail_in = input_size;
104 strm.zalloc = Z_NULL;
105 strm.zfree = Z_NULL;
106 strm.opaque = Z_NULL;
107 int rc = inflateInit2(&strm, 32);
108 assert(rc == Z_OK);
109
110 unsigned int buf_cap = CHUNK;
111 unsigned int buf_len = 0;
112 char *buf = static_cast<char *>(malloc(CHUNK));
113 do {
114 if (buf_len + CHUNK > buf_cap) {
115 assert(buf_cap <= UINT32_MAX);
116 buf_cap *= 2;
117 buf = static_cast<char *>(realloc(buf, buf_cap));
118 }
119 strm.avail_out = buf_cap - buf_len;
120 strm.next_out = reinterpret_cast<unsigned char *>(buf + buf_len);
121
122 unsigned long old_total = strm.total_out;
123 rc = inflate(&strm, Z_FINISH);
124 unsigned progress = static_cast<unsigned int>(strm.total_out - old_total);
125 buf_len += progress;
126 assert(progress != 0 || rc == Z_STREAM_END);
127 } while (strm.total_in < input_size);
128
129 if (buf_len == buf_cap) {
130 buf = static_cast<char *>(realloc(buf, buf_len + 1));
131 }
132 buf[buf_len] = 0;
133 output_size = buf_len;
134
135 rc = inflateEnd(&strm);
136 assert(rc == Z_OK);
137
138 return buf;
139}
140
141struct Date {
142 int16_t year = 0;
143 uint8_t month = 0;
144 uint8_t day = 0;
145
146 static bool parse(Date &dest, std::string_view src) {
147 dest.year = 0, dest.month = 0, dest.day = 0;
148
149 int16_t y_mul_fac = 1;
150 bool extended = false;
151
152 size_t plus = src.find('+');
153 if (plus != std::string_view::npos) {
154 extended = true;
155 src = src.substr(1); // remove plus sign from the start
156 }
157 if (!extended) {
158 size_t min_or_dash = src.find('-');
159 if (min_or_dash == std::string_view::npos) return false;
160 if (min_or_dash == 0) {
161 y_mul_fac = -1; // it's a minus sign
162 src = src.substr(1); // remove minus sign at the start
163 }
164 }
165
166 int y_chars = 0;
167 while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') {
168 dest.year = static_cast<int16_t>(dest.year * 10 + src[0] - '0');
169 src = src.substr(1);
170 y_chars++;
171 }
172 if (src.size() == 0) { dest.year = 0; return false; }
173 if (src[0] != '-') { dest.year = 0; return false; }
174 src = src.substr(1); // remove dash
175 if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; }
176 dest.year *= y_mul_fac;
177
178 bool rest_correct = src.size() == 5
179 && src[0] >= '0' && src[0] <= '9'
180 && src[1] >= '0' && src[1] <= '9'
181 && src[3] >= '0' && src[3] <= '9'
182 && src[4] >= '0' && src[4] <= '9';
183 if (!rest_correct) { dest.year = 0; return false; }
184 dest.month = static_cast<uint8_t>((src[0] - '0') * 10 + src[1] - '0');
185 dest.day = static_cast<uint8_t>((src[3] - '0') * 10 + src[4] - '0');
186 if (dest.month > 12 || dest.day > 31) {
187 dest.year = 0, dest.month = 0, dest.day = 0;
188 return false;
189 }
190 return true;
191 }
192
193 std::string toString() const {
194 if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31)
195 throw std::invalid_argument("one or more date components (year, month, day) out of range");
196 char data[11] = "XXXX-XX-XX";
197 sprintf(data, "%04u-%02u-%02u", year, month, day);
198 return data;
199 }
200
201 std::chrono::days toUnixDays() const {
202 std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)};
203 // This is valid since C++20: as of C++20, the system clock is defined to measure the
204 // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds.
205 std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch();
206 return since_epoch;
207 }
208};
209
// A time of day with one-second resolution, textual form HH:MM:SS.
struct Time {
  uint8_t hour = 0;
  uint8_t minute = 0;
  uint8_t second = 0;

  // Parses `src` (exactly "HH:MM:SS") into `dest` and returns whether that
  // succeeded. `dest` is left untouched when the text is malformed and is
  // reset to 00:00:00 when a component is out of range (hour above 23,
  // minute or second above 59).
  static bool parse(Time &dest, std::string_view src) {
    const auto digitAt = [&src](size_t pos) {
      return src[pos] >= '0' && src[pos] <= '9';
    };

    if (src.size() != 8) return false;
    if (!digitAt(0) || !digitAt(1)) return false;
    if (src[2] != ':') return false;
    if (!digitAt(3) || !digitAt(4)) return false;
    if (src[5] != ':') return false;
    if (!digitAt(6) || !digitAt(7)) return false;

    const auto twoDigits = [&src](size_t pos) {
      return (src[pos] - '0') * 10 + src[pos + 1] - '0';
    };
    const int hh = twoDigits(0);
    const int mm = twoDigits(3);
    const int ss = twoDigits(6);

    if (hh > 23 || mm > 59 || ss > 59) {
      dest.hour = 0, dest.minute = 0, dest.second = 0;
      return false;
    }

    dest.hour = static_cast<uint8_t>(hh);
    dest.minute = static_cast<uint8_t>(mm);
    dest.second = static_cast<uint8_t>(ss);
    return true;
  }

  // Formats the time as HH:MM:SS.
  // Throws std::invalid_argument if a component is out of range.
  std::string toString() const {
    const bool in_range = hour <= 23 && minute <= 59 && second <= 59;
    if (!in_range)
      throw std::invalid_argument("one or more time components (hour, minute, second) out of range");
    char data[9] = "XX:XX:XX";
    sprintf(data, "%02u:%02u:%02u", hour, minute, second);
    return data;
  }
};
244
// Time zone designator: an offset in minutes, written either as "Z" (no
// offset) or as +HH:MM / -HH:MM.
struct Tzd {
  int16_t minutes = 0;

  // Parses `src` into `dest` and returns whether that succeeded.
  // `dest.minutes` is 0 whenever parsing fails.
  static bool parse(Tzd &dest, std::string_view src) {
    dest.minutes = 0;

    if (src.empty()) return false;
    if (src == "Z") return true;

    const char sign = src[0];
    if (sign != '-' && sign != '+') return false;

    const std::string_view hhmm = src.substr(1);
    const auto digitAt = [&hhmm](size_t pos) {
      return hhmm[pos] >= '0' && hhmm[pos] <= '9';
    };
    if (hhmm.size() != 5) return false;
    if (!digitAt(0) || !digitAt(1) || hhmm[2] != ':' || !digitAt(3) || !digitAt(4))
      return false;

    const int offset_hours = (hhmm[0] - '0') * 10 + hhmm[1] - '0';
    const int offset_minutes = (hhmm[3] - '0') * 10 + hhmm[4] - '0';
    if (offset_hours > 23 || offset_minutes > 59) return false;

    const int magnitude = 60 * offset_hours + offset_minutes;
    dest.minutes = static_cast<int16_t>(sign == '-' ? -magnitude : magnitude);
    return true;
  }

  // Formats the offset as "Z" when it is zero and as +HH:MM / -HH:MM
  // otherwise. Throws std::invalid_argument if the offset does not fit in
  // that format.
  std::string toString() const {
    if (minutes == 0)
      return "Z";

    const char sign = minutes < 0 ? '-' : '+';
    // Computed as int, so negating the most negative int16_t is fine.
    const int magnitude = minutes < 0 ? -minutes : minutes;
    const int hours_off = magnitude / 60;
    const int mins_off = magnitude % 60;
    if (hours_off > 23 || mins_off > 59)
      throw std::invalid_argument("offset out of range");
    char data[7] = "+XX:XX";
    sprintf(data, "%c%02u:%02u", sign, hours_off, mins_off);
    return data;
  }
};
288
289struct Timestamp {
290 Date date;
291 Tzd off;
292 Time time;
293
294 static bool parse(Timestamp &dest, std::string_view src) {
295 size_t t = src.find('T');
296 if (t == std::string_view::npos || t + 1 >= src.size()) return false;
297
298 std::string_view date = src.substr(0, t);
299 std::string_view time_and_tzd = src.substr(t + 1);
300 if (time_and_tzd.size() < 9) return false;
301 if (!Date::parse(dest.date, date)) return false;
302
303 std::string_view time = time_and_tzd.substr(0, 8);
304 std::string_view tzd = time_and_tzd.substr(8);
305 if (!Time::parse(dest.time, time)) return false;
306 return Tzd::parse(dest.off, tzd);
307 }
308
309 std::string toString() const {
310 return date.toString() + "T" + time.toString() + off.toString();
311 }
312
313 std::chrono::seconds toUnixSeconds() const {
314 std::chrono::year_month_day ymd(std::chrono::year(date.year),
315 std::chrono::month(date.month),
316 std::chrono::day(date.day));
317 std::chrono::sys_days sys_days(ymd);
318 std::chrono::time_point<std::chrono::utc_clock, std::chrono::days> utc_days(sys_days.time_since_epoch());
319 std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast<std::chrono::seconds>(utc_days);
320 utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) +
321 std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes);
322 std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds);
323 std::chrono::seconds unix = sys_seconds.time_since_epoch();
324 return unix;
325 }
326};
327
// XML namespace URL that TMI8 KV6 message elements are matched against.
static constexpr std::string_view TMI8_XML_NS{"http://bison.connekt.nl/tmi8/kv6/msg"};
329
// The kinds of KV6 position info records. The enumerators are consecutive,
// starting at zero, so they can be used to index per-type lookup tables.
enum Kv6RecordType {
  KV6T_UNKNOWN,    // 0
  KV6T_DELAY,      // 1
  KV6T_INIT,       // 2
  KV6T_ARRIVAL,    // 3
  KV6T_ON_STOP,    // 4
  KV6T_DEPARTURE,  // 5
  KV6T_ON_ROUTE,   // 6
  KV6T_ON_PATH,    // 7
  KV6T_OFF_ROUTE,  // 8
  KV6T_END,        // 9

  // Always keep these two updated to correspond to the
  // first and last elements of the enumeration!
  _KV6T_FIRST_TYPE = KV6T_UNKNOWN,
  _KV6T_LAST_TYPE = KV6T_END,
};
346
// Bit flags, one per field of a KV6 record. Values are combined with bitwise
// OR to describe sets of fields.
enum Kv6Field {
  KV6F_NONE                          = 0,
  KV6F_DATA_OWNER_CODE               = 1 << 0,
  KV6F_LINE_PLANNING_NUMBER          = 1 << 1,
  KV6F_OPERATING_DAY                 = 1 << 2,
  KV6F_JOURNEY_NUMBER                = 1 << 3,
  KV6F_REINFORCEMENT_NUMBER          = 1 << 4,
  KV6F_TIMESTAMP                     = 1 << 5,
  KV6F_SOURCE                        = 1 << 6,
  KV6F_PUNCTUALITY                   = 1 << 7,
  KV6F_USER_STOP_CODE                = 1 << 8,
  KV6F_PASSAGE_SEQUENCE_NUMBER       = 1 << 9,
  KV6F_VEHICLE_NUMBER                = 1 << 10,
  KV6F_BLOCK_CODE                    = 1 << 11,
  KV6F_WHEELCHAIR_ACCESSIBLE         = 1 << 12,
  KV6F_NUMBER_OF_COACHES             = 1 << 13,
  KV6F_RD_Y                          = 1 << 14,
  KV6F_RD_X                          = 1 << 15,
  KV6F_DISTANCE_SINCE_LAST_USER_STOP = 1 << 16,
};
367
368static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = {
369 // KV6T_UNKNOWN
370 KV6F_NONE,
371 // KV6T_DELAY
372 static_cast<Kv6Field>(
373 KV6F_DATA_OWNER_CODE
374 | KV6F_LINE_PLANNING_NUMBER
375 | KV6F_OPERATING_DAY
376 | KV6F_JOURNEY_NUMBER
377 | KV6F_REINFORCEMENT_NUMBER
378 | KV6F_TIMESTAMP
379 | KV6F_SOURCE
380 | KV6F_PUNCTUALITY),
381 // KV6T_INIT
382 static_cast<Kv6Field>(
383 KV6F_DATA_OWNER_CODE
384 | KV6F_LINE_PLANNING_NUMBER
385 | KV6F_OPERATING_DAY
386 | KV6F_JOURNEY_NUMBER
387 | KV6F_REINFORCEMENT_NUMBER
388 | KV6F_TIMESTAMP
389 | KV6F_SOURCE
390 | KV6F_USER_STOP_CODE
391 | KV6F_PASSAGE_SEQUENCE_NUMBER
392 | KV6F_VEHICLE_NUMBER
393 | KV6F_BLOCK_CODE
394 | KV6F_WHEELCHAIR_ACCESSIBLE
395 | KV6F_NUMBER_OF_COACHES),
396 // KV6T_ARRIVAL
397 static_cast<Kv6Field>(
398 KV6F_DATA_OWNER_CODE
399 | KV6F_LINE_PLANNING_NUMBER
400 | KV6F_OPERATING_DAY
401 | KV6F_JOURNEY_NUMBER
402 | KV6F_REINFORCEMENT_NUMBER
403 | KV6F_USER_STOP_CODE
404 | KV6F_PASSAGE_SEQUENCE_NUMBER
405 | KV6F_TIMESTAMP
406 | KV6F_SOURCE
407 | KV6F_VEHICLE_NUMBER
408 | KV6F_PUNCTUALITY),
409 // KV6T_ON_STOP
410 static_cast<Kv6Field>(
411 KV6F_DATA_OWNER_CODE
412 | KV6F_LINE_PLANNING_NUMBER
413 | KV6F_OPERATING_DAY
414 | KV6F_JOURNEY_NUMBER
415 | KV6F_REINFORCEMENT_NUMBER
416 | KV6F_USER_STOP_CODE
417 | KV6F_PASSAGE_SEQUENCE_NUMBER
418 | KV6F_TIMESTAMP
419 | KV6F_SOURCE
420 | KV6F_VEHICLE_NUMBER
421 | KV6F_PUNCTUALITY),
422 // KV6T_DEPARTURE
423 static_cast<Kv6Field>(
424 KV6F_DATA_OWNER_CODE
425 | KV6F_LINE_PLANNING_NUMBER
426 | KV6F_OPERATING_DAY
427 | KV6F_JOURNEY_NUMBER
428 | KV6F_REINFORCEMENT_NUMBER
429 | KV6F_USER_STOP_CODE
430 | KV6F_PASSAGE_SEQUENCE_NUMBER
431 | KV6F_TIMESTAMP
432 | KV6F_SOURCE
433 | KV6F_VEHICLE_NUMBER
434 | KV6F_PUNCTUALITY),
435 // KV6T_ON_ROUTE
436 static_cast<Kv6Field>(
437 KV6F_DATA_OWNER_CODE
438 | KV6F_LINE_PLANNING_NUMBER
439 | KV6F_OPERATING_DAY
440 | KV6F_JOURNEY_NUMBER
441 | KV6F_REINFORCEMENT_NUMBER
442 | KV6F_USER_STOP_CODE
443 | KV6F_PASSAGE_SEQUENCE_NUMBER
444 | KV6F_TIMESTAMP
445 | KV6F_SOURCE
446 | KV6F_VEHICLE_NUMBER
447 | KV6F_PUNCTUALITY
448 | KV6F_RD_X
449 | KV6F_RD_Y),
450 // KV6T_ON_PATH
451 KV6F_NONE,
452 // KV6T_OFF_ROUTE
453 static_cast<Kv6Field>(
454 KV6F_DATA_OWNER_CODE
455 | KV6F_LINE_PLANNING_NUMBER
456 | KV6F_OPERATING_DAY
457 | KV6F_JOURNEY_NUMBER
458 | KV6F_REINFORCEMENT_NUMBER
459 | KV6F_TIMESTAMP
460 | KV6F_SOURCE
461 | KV6F_USER_STOP_CODE
462 | KV6F_PASSAGE_SEQUENCE_NUMBER
463 | KV6F_VEHICLE_NUMBER
464 | KV6F_RD_X
465 | KV6F_RD_Y),
466 // KV6T_END
467 static_cast<Kv6Field>(
468 KV6F_DATA_OWNER_CODE
469 | KV6F_LINE_PLANNING_NUMBER
470 | KV6F_OPERATING_DAY
471 | KV6F_JOURNEY_NUMBER
472 | KV6F_REINFORCEMENT_NUMBER
473 | KV6F_TIMESTAMP
474 | KV6F_SOURCE
475 | KV6F_USER_STOP_CODE
476 | KV6F_PASSAGE_SEQUENCE_NUMBER
477 | KV6F_VEHICLE_NUMBER),
478};
479
480static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = {
481 // KV6T_UNKNOWN
482 KV6F_NONE,
483 // KV6T_DELAY
484 KV6F_NONE,
485 // KV6T_INIT
486 KV6F_NONE,
487 // KV6T_ARRIVAL
488 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
489 // KV6T_ON_STOP
490 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
491 // KV6T_DEPARTURE
492 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
493 // KV6T_ON_ROUTE
494 KV6F_DISTANCE_SINCE_LAST_USER_STOP,
495 // KV6T_ON_PATH
496 KV6F_NONE,
497 // KV6T_OFF_ROUTE
498 KV6F_NONE,
499 // KV6T_END
500 KV6F_NONE,
501};
502
503struct Kv6Record {
504 Kv6RecordType type = KV6T_UNKNOWN;
505 Kv6Field presence = KV6F_NONE;
506 Kv6Field next = KV6F_NONE;
507 std::string data_owner_code;
508 std::string line_planning_number;
509 std::string source;
510 std::string user_stop_code;
511 std::string wheelchair_accessible;
512 Date operating_day;
513 Timestamp timestamp;
514 uint32_t block_code = 0;
515 uint32_t journey_number = 0;
516 uint32_t vehicle_number = 0;
517 int32_t rd_x = 0;
518 int32_t rd_y = 0;
519 // The TMI8 specification is unclear: this field
520 // might actually be called distancesincelaststop
521 uint32_t distance_since_last_user_stop = 0;
522 uint16_t passage_sequence_number = 0;
523 int16_t punctuality = 0;
524 uint8_t number_of_coaches = 0;
525 uint8_t reinforcement_number = 0;
526
527 void markPresent(Kv6Field field) {
528 presence = static_cast<Kv6Field>(presence | field);
529 }
530
531 void removeUnsupportedFields() {
532 Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type];
533 Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type];
534 Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields);
535 presence = static_cast<Kv6Field>(presence & supported_fields);
536 }
537
538 bool valid() {
539 Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type];
540 Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type];
541 Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields);
542
543 Kv6Field required_field_presence = static_cast<Kv6Field>(presence & required_fields);
544 Kv6Field unsupported_field_presence = static_cast<Kv6Field>(presence & ~supported_fields);
545
546 return required_field_presence == required_fields && !unsupported_field_presence;
547 }
548};
549
// Bit flags, one per header field of a VV_TM_PUSH message.
enum Tmi8VvTmPushInfoField {
  TMI8F_NONE          = 0,
  TMI8F_SUBSCRIBER_ID = 1 << 0,
  TMI8F_VERSION       = 1 << 1,
  TMI8F_DOSSIER_NAME  = 1 << 2,
  TMI8F_TIMESTAMP     = 1 << 3,
};
557
558struct Tmi8VvTmPushInfo {
559 Tmi8VvTmPushInfoField next = TMI8F_NONE;
560 Tmi8VvTmPushInfoField presence = TMI8F_NONE;
561 std::string subscriber_id;
562 std::string version;
563 std::string dossier_name;
564 Timestamp timestamp;
565 std::vector<Kv6Record> messages;
566
567 void markPresent(Tmi8VvTmPushInfoField field) {
568 presence = static_cast<Tmi8VvTmPushInfoField>(presence | field);
569 }
570
571 bool valid() {
572 const Tmi8VvTmPushInfoField REQUIRED_FIELDS =
573 static_cast<Tmi8VvTmPushInfoField>(
574 TMI8F_SUBSCRIBER_ID
575 | TMI8F_VERSION
576 | TMI8F_DOSSIER_NAME
577 | TMI8F_TIMESTAMP);
578 return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS;
579 }
580};
581
582static const std::array<std::string_view, _KV6T_LAST_TYPE + 1> KV6_POS_INFO_RECORD_TYPES = {
583 "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END",
584};
585
586std::optional<std::string_view> findKv6PosInfoRecordTypeName(Kv6RecordType type) {
587 if (type > _KV6T_LAST_TYPE)
588 return std::nullopt;
589 return KV6_POS_INFO_RECORD_TYPES[type];
590}
591
592const std::array<std::tuple<std::string_view, Kv6Field>, 17> KV6_POS_INFO_RECORD_FIELDS = {{
593 { "dataownercode", KV6F_DATA_OWNER_CODE },
594 { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER },
595 { "operatingday", KV6F_OPERATING_DAY },
596 { "journeynumber", KV6F_JOURNEY_NUMBER },
597 { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER },
598 { "timestamp", KV6F_TIMESTAMP },
599 { "source", KV6F_SOURCE },
600 { "punctuality", KV6F_PUNCTUALITY },
601 { "userstopcode", KV6F_USER_STOP_CODE },
602 { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER },
603 { "vehiclenumber", KV6F_VEHICLE_NUMBER },
604 { "blockcode", KV6F_BLOCK_CODE },
605 { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE },
606 { "numberofcoaches", KV6F_NUMBER_OF_COACHES },
607 { "rd-y", KV6F_RD_Y },
608 { "rd-x", KV6F_RD_X },
609 { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP },
610}};
611
612// Returns the maximum amount of digits such that it is guaranteed that
613// a corresponding amount of repeated 9's can be represented by the type.
614template<std::integral T>
615constexpr size_t maxDigits() {
616 size_t digits = 0;
617 for (T x = std::numeric_limits<T>::max(); x != 0; x /= 10) digits++;
618 return digits - 1;
619}
620
621template<size_t MaxDigits, std::unsigned_integral T>
622constexpr bool parseUnsigned(T &out, std::string_view src) {
623 static_assert(MaxDigits <= maxDigits<T>());
624 if (src.size() > MaxDigits) return false;
625 T res = 0;
626 while (src.size() > 0) {
627 if (src[0] < '0' || src[0] > '9') return false;
628 res = static_cast<T>(res * 10 + src[0] - '0');
629 src = src.substr(1);
630 }
631 out = res;
632 return true;
633}
634
635template<size_t MaxDigits, std::signed_integral T>
636constexpr bool parseSigned(T &out, std::string_view src) {
637 static_assert(MaxDigits <= maxDigits<T>());
638 if (src.size() == 0) return false;
639 bool negative = src[0] == '-';
640 if (negative) src = src.substr(1);
641 if (src.size() > MaxDigits) return false;
642 T res = 0;
643 while (src.size() > 0) {
644 if (src[0] < '0' || src[0] > '9') return false;
645 res = static_cast<T>(res * 10 + src[0] - '0');
646 src = src.substr(1);
647 }
648 out = negative ? -res : res;
649 return true;
650}
651
// One XML namespace declaration, linked to the declarations that were
// already in scope when it was made.
struct Xmlns {
  // The declaration made before this one; a null pointer ends the chain.
  const Xmlns *next;
  // The declared prefix; empty for the default namespace (plain "xmlns").
  std::string_view prefix;
  // The namespace URL the prefix stands for.
  std::string_view url;
};
657
658std::optional<std::string_view> resolve(std::string_view prefix, const Xmlns *nss) {
659 while (nss)
660 if (nss->prefix == prefix)
661 return nss->url;
662 else
663 nss = nss->next;
664 return std::nullopt;
665}
666
667template<typename T>
668void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) {
669 while (attr) {
670 std::string_view name(attr->name(), attr->name_size());
671 if (name.starts_with("xmlns")) {
672 if (name.size() == 5) { // just xmlns
673 Xmlns ns0 = {
674 .next = nss,
675 .url = std::string_view(attr->value(), attr->value_size()),
676 };
677 withXmlnss(attr->next_attribute(), &ns0, fn);
678 return;
679 } else if (name.size() > 6 && name[5] == ':') { // xmlns:<something>
680 Xmlns ns0 = {
681 .next = nss,
682 .prefix = name.substr(6),
683 .url = std::string_view(attr->value(), attr->value_size()),
684 };
685 withXmlnss(attr->next_attribute(), &ns0, fn);
686 return;
687 }
688 }
689 attr = attr->next_attribute();
690 }
691 fn(nss);
692}
693
694template<typename T>
695void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
696 std::string_view name(node.name(), node.name_size());
697 std::string_view ns;
698 size_t colon = name.find(':');
699
700 if (colon != std::string_view::npos) {
701 if (colon >= name.size() - 1) // last character
702 return;
703 ns = name.substr(0, colon);
704 name = name.substr(colon + 1);
705 }
706
707 withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) {
708 std::optional<std::string_view> ns_url = resolve(ns, nss);
709 if (!ns_url && !ns.empty()) return;
710 if (!ns_url) fn(std::string_view(), name, nss);
711 else fn(*ns_url, name, nss);
712 });
713}
714
715template<typename T>
716void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
717 ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) {
718 if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss);
719 });
720}
721
722bool onlyTextElement(const rapidxml::xml_node<> &node) {
723 return node.type() == rapidxml::node_element
724 && node.first_node()
725 && node.first_node() == node.last_node()
726 && node.first_node()->type() == rapidxml::node_data;
727}
728
729std::string_view getValue(const rapidxml::xml_node<> &node) {
730 return std::string_view(node.value(), node.value_size());
731}
732
// Copies `val` into `into` if it is at most `max_len` bytes long.
// Returns whether the value was accepted; `into` is left untouched if not.
bool parseStringValue(std::string &into, size_t max_len, std::string_view val) {
  const bool fits = val.size() <= max_len;
  if (fits)
    into.assign(val.data(), val.size());
  return fits;
}
739
740struct Kv6Parser {
741 std::stringstream &errs;
742 std::stringstream &warns;
743
744 void error(std::string_view msg) {
745 errs << msg << '\n';
746 }
747
748 void warn(std::string_view msg) {
749 warns << msg << '\n';
750 }
751
752#define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false)
753#define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false)
754
755 std::optional<Kv6Record> parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) {
756 Kv6Record fields = { .type = type };
757 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
758 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
759 for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) {
760 if (field == KV6F_NONE)
761 continue;
762 if (fname == name) {
763 PWARNASSERT("Expected KV6 record field element to only contain data",
764 onlyTextElement(*child));
765 std::string_view childval = getValue(*child);
766 switch (field) {
767 case KV6F_DATA_OWNER_CODE:
768 PWARNASSERT("Invalid value for dataownercode",
769 parseStringValue(fields.data_owner_code, 10, childval));
770 break;
771 case KV6F_LINE_PLANNING_NUMBER:
772 PWARNASSERT("Invalid value for lineplanningnumber",
773 parseStringValue(fields.line_planning_number, 10, childval));
774 break;
775 case KV6F_OPERATING_DAY:
776 PWARNASSERT("Invalid value for operatatingday: not a valid date",
777 Date::parse(fields.operating_day, childval));
778 break;
779 case KV6F_JOURNEY_NUMBER:
780 PWARNASSERT("Invalid value for journeynumber:"
781 " not a valid unsigned number with at most six digits",
782 parseUnsigned<6>(fields.journey_number, childval));
783 break;
784 case KV6F_REINFORCEMENT_NUMBER:
785 PWARNASSERT("Invalid value for reinforcementnumber:"
786 " not a valid unsigned number with at most two digits",
787 parseUnsigned<2>(fields.reinforcement_number, childval));
788 break;
789 case KV6F_TIMESTAMP:
790 PWARNASSERT("Invalid value for timestamp: not a valid timestamp",
791 Timestamp::parse(fields.timestamp, childval));
792 break;
793 case KV6F_SOURCE:
794 PWARNASSERT("Invalid value for source:"
795 " not a valid string of at most 10 bytes",
796 parseStringValue(fields.source, 10, childval));
797 break;
798 case KV6F_PUNCTUALITY:
799 PWARNASSERT("Invalid value for punctuality:"
800 " not a valid signed number with at most four digits",
801 parseSigned<4>(fields.punctuality, childval));
802 break;
803 case KV6F_USER_STOP_CODE:
804 PWARNASSERT("Invalid value for userstopcode:"
805 " not a valid string of at most 10 bytes",
806 parseStringValue(fields.user_stop_code, 10, childval));
807 break;
808 case KV6F_PASSAGE_SEQUENCE_NUMBER:
809 PWARNASSERT("Invalid value for passagesequencenumber:"
810 " not a valid unsigned number with at most four digits",
811 parseUnsigned<4>(fields.passage_sequence_number, childval));
812 break;
813 case KV6F_VEHICLE_NUMBER:
814 PWARNASSERT("Invalid value for vehiclenumber:"
815 " not a valid unsigned number with at most six digits",
816 parseUnsigned<6>(fields.vehicle_number, childval));
817 break;
818 case KV6F_BLOCK_CODE:
819 PWARNASSERT("Invalid value for blockcode:"
820 " not a valid unsigned number with at most eight digits",
821 parseUnsigned<8>(fields.block_code, childval));
822 break;
823 case KV6F_WHEELCHAIR_ACCESSIBLE:
824 PWARNASSERT("Invalid value for wheelchairaccessible:"
825 " not a valid value for wheelchair accessibility",
826 childval == "ACCESSIBLE"
827 || childval == "NOTACCESSIBLE"
828 || childval == "UNKNOWN");
829 fields.wheelchair_accessible = childval;
830 break;
831 case KV6F_NUMBER_OF_COACHES:
832 PWARNASSERT("Invalid for numberofcoaches:"
833 " not a valid unsigned number with at most two digits",
834 parseUnsigned<2>(fields.number_of_coaches, childval));
835 break;
836 case KV6F_RD_X:
837 PWARNASSERT("Invalid value for rd-x:"
838 " not a valid signed number with at most six digits",
839 parseSigned<6>(fields.rd_x, childval));
840 break;
841 case KV6F_RD_Y:
842 PWARNASSERT("Invalid value for rd-y:"
843 " not a valid signed number with at most six digits",
844 parseSigned<6>(fields.rd_y, childval));
845 break;
846 case KV6F_DISTANCE_SINCE_LAST_USER_STOP:
847 PWARNASSERT("Invalid value for distancesincelastuserstop:"
848 " not a valid unsigned number with at most five digits",
849 parseUnsigned<5>(fields.distance_since_last_user_stop, childval));
850 break;
851 case KV6F_NONE:
852 error("NONE field type case should be unreachable in parseKv6PosInfoRecord");
853 return;
854 }
855 fields.markPresent(field);
856 break;
857 }
858 }
859 });
860 }
861
862 fields.removeUnsupportedFields();
863
864 if (!fields.valid())
865 return std::nullopt;
866 return fields;
867 }
868
869 std::vector<Kv6Record> parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) {
870 std::vector<Kv6Record> records;
871 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
872 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
873 for (auto type = _KV6T_FIRST_TYPE;
874 type != _KV6T_LAST_TYPE;
875 type = static_cast<Kv6RecordType>(type + 1)) {
876 if (type == KV6T_UNKNOWN)
877 continue;
878 if (KV6_POS_INFO_RECORD_TYPES[type] == name) {
879 auto record = parseKv6PosInfoRecord(type, *child, nss);
880 if (record) {
881 records.push_back(*record);
882 }
883 }
884 }
885 });
886 }
887 return records;
888 }
889
890 std::optional<Tmi8VvTmPushInfo> parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) {
891 Tmi8VvTmPushInfo info;
892 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
893 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
894 if (name == "Timestamp") {
895 PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child));
896 PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child)));
897 info.markPresent(TMI8F_TIMESTAMP);
898 } else if (name == "SubscriberID") {
899 PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child));
900 info.subscriber_id = getValue(*child);
901 info.markPresent(TMI8F_SUBSCRIBER_ID);
902 } else if (name == "Version") {
903 PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child));
904 info.version = getValue(*child);
905 info.markPresent(TMI8F_VERSION);
906 } else if (name == "DossierName") {
907 PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child));
908 info.dossier_name = getValue(*child);
909 info.markPresent(TMI8F_DOSSIER_NAME);
910 } else if (name == "KV6posinfo") {
911 info.messages = parseKv6PosInfo(*child, nss);
912 }
913 });
914 }
915
916 if (!info.valid())
917 return std::nullopt;
918 return info;
919 }
920
921 std::optional<Tmi8VvTmPushInfo> parse(const rapidxml::xml_document<> &doc) {
922 std::optional<Tmi8VvTmPushInfo> msg;
923 for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) {
924 ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) {
925 if (name == "VV_TM_PUSH") {
926 if (msg) {
927 error("Duplicated VV_TM_PUSH");
928 return;
929 }
930 msg = parseVvTmPush(*node, nss);
931 if (!msg) {
932 error("Invalid VV_TM_PUSH");
933 }
934 }
935 });
936 }
937 if (!msg)
938 error("Expected to find VV_TM_PUSH");
939 return msg;
940 }
941};
942
943std::optional<Tmi8VvTmPushInfo> parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) {
944 Kv6Parser parser = { errs, warns };
945 return parser.parse(doc);
946}
947
948struct Metrics {
949 prometheus::Counter &messages_counter_ok;
950 prometheus::Counter &messages_counter_error;
951 prometheus::Counter &messages_counter_warning;
952 prometheus::Counter &rows_written_counter;
953 prometheus::Histogram &records_hist;
954 prometheus::Histogram &message_parse_hist;
955 prometheus::Histogram &payload_size_hist;
956
957 using BucketBoundaries = prometheus::Histogram::BucketBoundaries;
958
959 enum class ParseStatus {
960 OK,
961 WARNING,
962 ERROR,
963 };
964
965 Metrics(std::shared_ptr<prometheus::Registry> registry) :
966 Metrics(registry, prometheus::BuildCounter()
967 .Name("kv6_vv_tm_push_messages_total")
968 .Help("Number of KV6 VV_TM_PUSH messages received")
969 .Register(*registry))
970 {}
971
972 void addMeasurement(std::chrono::duration<double> took_secs, size_t payload_size, size_t records, ParseStatus parsed) {
973 double millis = took_secs.count() * 1000.0;
974
975 if (parsed == ParseStatus::OK) messages_counter_ok.Increment();
976 else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment();
977 else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment();
978 records_hist.Observe(static_cast<double>(records));
979 message_parse_hist.Observe(millis);
980 payload_size_hist.Observe(static_cast<double>(payload_size));
981 }
982
983 void rowsWritten(int64_t rows) {
984 rows_written_counter.Increment(static_cast<double>(rows));
985 }
986
987 private:
988 Metrics(std::shared_ptr<prometheus::Registry> registry,
989 prometheus::Family<prometheus::Counter> &messages_counter) :
990 messages_counter_ok(messages_counter
991 .Add({{ "status", "ok" }})),
992 messages_counter_error(messages_counter
993 .Add({{ "status", "error" }})),
994 messages_counter_warning(messages_counter
995 .Add({{ "status", "warning" }})),
996 rows_written_counter(prometheus::BuildCounter()
997 .Name("kv6_vv_tm_push_records_written")
998 .Help("Numer of VV_TM_PUSH records written to disk")
999 .Register(*registry)
1000 .Add({})),
1001 records_hist(prometheus::BuildHistogram()
1002 .Name("kv6_vv_tm_push_records_amount")
1003 .Help("Number of KV6 VV_TM_PUSH records")
1004 .Register(*registry)
1005 .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })),
1006 message_parse_hist(prometheus::BuildHistogram()
1007 .Name("kv6_vv_tm_push_message_parse_millis")
1008 .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages")
1009 .Register(*registry)
1010 .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })),
1011 payload_size_hist(prometheus::BuildHistogram()
1012 .Name("kv6_payload_size")
1013 .Help("Sizes of KV6 ZeroMQ message payloads")
1014 .Register(*registry)
1015 .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 }))
1016 {}
1017};
1018
1019// Note: it *must* hold that decompressed[size] == 0
1020std::optional<Tmi8VvTmPushInfo> parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) {
1021 auto start = std::chrono::steady_clock::now();
1022
1023 std::optional<Tmi8VvTmPushInfo> info;
1024
1025 if (decompressed[size] != 0) {
1026 errs << "Not parsing: missing null terminator" << '\n';
1027 } else {
1028 rapidxml::xml_document<> doc;
1029 constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace
1030 | rapidxml::parse_no_string_terminators
1031 | rapidxml::parse_validate_closing_tags;
1032
1033 try {
1034 doc.parse<PARSE_FLAGS>(decompressed);
1035 info = parseXml(doc, errs, warns);
1036 } catch (const rapidxml::parse_error &err) {
1037 errs << "XML parsing failed" << '\n';
1038 }
1039 }
1040
1041 auto end = std::chrono::steady_clock::now();
1042 std::chrono::duration<double> took = end - start;
1043
1044 if (info)
1045 if (warns.view().empty())
1046 metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK);
1047 else
1048 metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING);
1049 else
1050 metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR);
1051
1052 return info;
1053}
1054
// Shutdown request flag: becomes non-zero once onSigIntOrTerm() has run.
//
// It is written from a signal handler, so it is a volatile sig_atomic_t
// rather than a plain bool: that is the object type the C and C++ standards
// allow a signal handler to assign to. Readers can keep testing it as a
// boolean (`!terminate`).
volatile sig_atomic_t terminate = 0;

// Signal handler: only sets the shutdown flag. The signal number is ignored.
void onSigIntOrTerm(int /* signum */) {
  terminate = 1;
}
1060
1061arrow::Result<std::shared_ptr<arrow::Table>> getTable(const std::vector<Kv6Record> &messages, size_t &rows_written) {
1062 ParquetBuilder builder;
1063
1064 for (const auto &msg : messages) {
1065 Kv6Field present = msg.presence;
1066 Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type];
1067 Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type];
1068 if ((~msg.presence & required) != 0) {
1069 std::cout << "Invalid message: not all required fields present; skipping" << std::endl;
1070 continue;
1071 }
1072 Kv6Field used = static_cast<Kv6Field>(present & (required | optional));
1073 rows_written++;
1074
1075 // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option
1076 // of not providing these fields by setting them to -1. We want this normalized, where these
1077 // fields are instead simply marked as not present.
1078 if ((used & KV6F_RD_X) && msg.rd_x == -1)
1079 used = static_cast<Kv6Field>(used & ~KV6F_RD_X);
1080 if ((used & KV6F_RD_Y) && msg.rd_y == -1)
1081 used = static_cast<Kv6Field>(used & ~KV6F_RD_Y);
1082
1083 ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type)));
1084 ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE
1085 ? builder.data_owner_codes.Append(msg.data_owner_code)
1086 : builder.data_owner_codes.AppendNull());
1087 ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER
1088 ? builder.line_planning_numbers.Append(msg.line_planning_number)
1089 : builder.line_planning_numbers.AppendNull());
1090 ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY
1091 ? builder.operating_days.Append(static_cast<int32_t>(msg.operating_day.toUnixDays().count()))
1092 : builder.operating_days.AppendNull());
1093 ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER
1094 ? builder.journey_numbers.Append(msg.journey_number)
1095 : builder.journey_numbers.AppendNull());
1096 ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER
1097 ? builder.reinforcement_numbers.Append(msg.reinforcement_number)
1098 : builder.reinforcement_numbers.AppendNull());
1099 ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP
1100 ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count())
1101 : builder.timestamps.AppendNull());
1102 ARROW_RETURN_NOT_OK(used & KV6F_SOURCE
1103 ? builder.sources.Append(msg.source)
1104 : builder.sources.AppendNull());
1105 ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY
1106 ? builder.punctualities.Append(msg.punctuality)
1107 : builder.punctualities.AppendNull());
1108 ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE
1109 ? builder.user_stop_codes.Append(msg.user_stop_code)
1110 : builder.user_stop_codes.AppendNull());
1111 ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER
1112 ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number)
1113 : builder.passage_sequence_numbers.AppendNull());
1114 ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER
1115 ? builder.vehicle_numbers.Append(msg.vehicle_number)
1116 : builder.vehicle_numbers.AppendNull());
1117 ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE
1118 ? builder.block_codes.Append(msg.block_code)
1119 : builder.block_codes.AppendNull());
1120 ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE
1121 ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible)
1122 : builder.wheelchair_accessibles.AppendNull());
1123 ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES
1124 ? builder.number_of_coaches.Append(msg.number_of_coaches)
1125 : builder.number_of_coaches.AppendNull());
1126 ARROW_RETURN_NOT_OK(used & KV6F_RD_Y
1127 ? builder.rd_ys.Append(msg.rd_y)
1128 : builder.rd_ys.AppendNull());
1129 ARROW_RETURN_NOT_OK(used & KV6F_RD_X
1130 ? builder.rd_xs.Append(msg.rd_x)
1131 : builder.rd_xs.AppendNull());
1132 ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP
1133 ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop)
1134 : builder.distance_since_last_user_stops.AppendNull());
1135 }
1136
1137 return builder.getTable();
1138}
1139
1140std::tuple<int64_t, int64_t> getMinMaxTimestamp(const std::vector<Kv6Record> &messages) {
1141 if (messages.size() == 0)
1142 return { 0, 0 };
1143 int64_t min = std::numeric_limits<int64_t>::max();
1144 int64_t max = 0;
1145 for (const auto &message : messages) {
1146 if (~message.presence & KV6F_TIMESTAMP)
1147 continue;
1148 int64_t seconds = message.timestamp.toUnixSeconds().count();
1149 if (seconds < min)
1150 min = seconds;
1151 if (seconds > max)
1152 max = seconds;
1153 }
1154 if (min == std::numeric_limits<decltype(min)>::max())
1155 return { 0, 0 }; // this is stupid
1156 return { min, max };
1157}
1158
// Writes `messages` to a new Parquet file in the working directory, followed
// by a JSON metadata file next to it.
//
// The Parquet file is named "oeuf-<UTC timestamp>.parquet". The metadata
// (min/max record timestamp and number of rows written) is first written to
// "<filename>.meta.json.part" and then renamed to "<filename>.meta.json".
//
// The rows-written metric is only updated once both files are in place.
//
// Returns OK on success. Failures while building or writing the table are
// passed through; failures while writing or renaming the metadata file are
// returned as an IOError instead of being ignored or thrown.
arrow::Status writeParquet(const std::vector<Kv6Record> &messages, Metrics &metrics) {
  size_t rows_written = 0;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, getTable(messages, rows_written));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename));
  std::cout << "Wrote Parquet file " << filename << std::endl;

  auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages);
  const std::string meta_part = filename + ".meta.json.part";
  const std::string meta_final = filename + ".meta.json";

  std::ofstream metaf(meta_part, std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows_written },
  };
  metaf << meta;
  metaf.close();
  // Covers a failed open, a failed write and a failed close.
  if (!metaf)
    return arrow::Status::IOError("Failed to write metadata file ", meta_part);

  // Non-throwing overload: this function reports failure through its Status.
  std::error_code rename_error;
  std::filesystem::rename(meta_part, meta_final, rename_error);
  if (rename_error)
    return arrow::Status::IOError("Failed to rename ", meta_part, " to ", meta_final, ": ", rename_error.message());

  metrics.rowsWritten(static_cast<int64_t>(rows_written));

  return arrow::Status::OK();
}
1183
// Shorthand for a point in time on the monotonic std::chrono::steady_clock.
using SteadyTime = std::chrono::steady_clock::time_point;
1185
// Writes a diagnostic dump for a message to a new text file in the working
// directory and returns the name of that file.
//
// The file is named "oeuf-error-<UTC timestamp rounded to seconds>.txt" and
// contains three sections, in this order: the error text, the warning text
// and the received message itself.
//
//   txt   - the message as it was received
//   errs  - error text collected for the message
//   warns - warning text collected for the message
std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) {
  const auto now_secs = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string dump_name = std::format("oeuf-error-{:%FT%T%Ez}.txt", now_secs);

  std::ofstream out(dump_name, std::ios::binary);
  out << "======= ERROR MESSAGES ========" << std::endl
      << errs
      << "======= WARNING MESSAGES ======" << std::endl
      << warns
      << "======= RECEIVED MESSAGE ======" << std::endl
      << txt << std::endl;
  out.close();

  return dump_name;
}
1199
1200void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector<Kv6Record> &msg_buf) {
1201 unsigned int decompressed_size = 0;
1202 if (msg.getBodySize() > std::numeric_limits<unsigned int>::max())
1203 std::cout << "parseMsg failed due to too large message" << std::endl;
1204 char *decompressed = decompress(msg.getBody(), static_cast<unsigned int>(msg.getBodySize()), decompressed_size);
1205
1206 std::stringstream errs;
1207 std::stringstream warns;
1208 // We know that decompressed[decompressed_size] == 0 because decompress() ensures this.
1209 auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns);
1210 if (parsed_msg) {
1211 const Tmi8VvTmPushInfo &info = *parsed_msg;
1212 auto new_msgs_it = info.messages.begin();
1213 while (new_msgs_it != info.messages.end()) {
1214 size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size();
1215 size_t new_msgs_left = info.messages.end() - new_msgs_it;
1216 auto new_msgs_start = new_msgs_it;
1217 auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left);
1218 new_msgs_it = new_msgs_end;
1219 msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end);
1220
1221 bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5);
1222 if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) {
1223 arrow::Status status = writeParquet(msg_buf, metrics);
1224 if (!status.ok())
1225 std::cout << "Writing Parquet file failed: " << status << std::endl;
1226 msg_buf.clear();
1227 last_output = std::chrono::steady_clock::now();
1228 }
1229 }
1230 if (!errs.view().empty() || !warns.view().empty()) {
1231 std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str());
1232 std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl;
1233 }
1234 } else {
1235 std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str());
1236 std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl;
1237 }
1238 free(decompressed);
1239}
1240
1241int main(int argc, char *argv[]) {
1242 std::cout << "Working directory: " << std::filesystem::current_path() << std::endl;
1243
1244 const char *metrics_addr = getenv("METRICS_ADDR");
1245 if (!metrics_addr || strlen(metrics_addr) == 0) {
1246 std::cout << "Error: no METRICS_ADDR set!" << std::endl;
1247 exit(EXIT_FAILURE);
1248 }
1249 prometheus::Exposer exposer{metrics_addr};
1250
1251 bool prod = false;
1252 const char *prod_env = getenv("NDOV_PRODUCTION");
1253 if (prod_env && strcmp(prod_env, "true") == 0) prod = true;
1254
1255 void *zmq_context = zmq_ctx_new();
1256 void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB);
1257 int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658");
1258 assert(rc == 0);
1259
1260 const char *topic = "/CXX/KV6posinfo";
1261 rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic));
1262 assert(rc == 0);
1263
1264 signal(SIGINT, onSigIntOrTerm);
1265 signal(SIGTERM, onSigIntOrTerm);
1266
1267 SteadyTime last_output = std::chrono::steady_clock::now();
1268
1269 auto registry = std::make_shared<prometheus::Registry>();
1270 Metrics metrics(registry);
1271 exposer.RegisterCollectable(registry);
1272
1273 std::vector<Kv6Record> msg_buf;
1274 while (!terminate) {
1275 std::optional<RawMessage> msg = recvMsg(zmq_subscriber);
1276 if (!msg) {
1277 if (!terminate)
1278 perror("recvMsg");
1279 continue;
1280 }
1281 handleMsg(*msg, metrics, last_output, msg_buf);
1282 }
1283
1284 std::cout << "Terminating" << std::endl;
1285 if (msg_buf.size() > 0) {
1286 arrow::Status status = writeParquet(msg_buf, metrics);
1287 if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl;
1288 else std::cout << "Final data written" << std::endl;
1289 msg_buf.clear();
1290 }
1291
1292 if (zmq_close(zmq_subscriber))
1293 perror("zmq_close");
1294 if (zmq_ctx_destroy(zmq_context))
1295 perror("zmq_ctx_destroy");
1296
1297 std::cout << "Bye" << std::endl;
1298
1299 return 0;
1300}