aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/augmentkv6/.envrc2
-rw-r--r--src/augmentkv6/Makefile21
-rw-r--r--src/augmentkv6/main.cpp510
-rw-r--r--src/bundleparquet/.envrc2
-rw-r--r--src/bundleparquet/Makefile21
-rw-r--r--src/bundleparquet/main.cpp213
-rw-r--r--src/bundleparquet/spliturl.cpp203
-rw-r--r--src/bundleparquet/spliturl.hpp11
-rw-r--r--src/filterkv6/.envrc2
-rw-r--r--src/filterkv6/Makefile21
-rw-r--r--src/filterkv6/main.cpp106
-rw-r--r--src/querykv1/.envrc2
-rw-r--r--src/querykv1/.gitignore1
-rw-r--r--src/querykv1/Makefile28
-rw-r--r--src/querykv1/cliopts.cpp456
-rw-r--r--src/querykv1/cliopts.hpp35
-rw-r--r--src/querykv1/daterange.cpp91
-rw-r--r--src/querykv1/daterange.hpp118
-rw-r--r--src/querykv1/grammar.abnf44
-rw-r--r--src/querykv1/grammar.ebnf47
-rw-r--r--src/querykv1/grammar.ebnf.bak23
-rw-r--r--src/querykv1/joparoute.cpp102
-rw-r--r--src/querykv1/joparoute.hpp13
-rw-r--r--src/querykv1/journeyinfo.cpp64
-rw-r--r--src/querykv1/journeyinfo.hpp13
-rw-r--r--src/querykv1/journeyroute.cpp96
-rw-r--r--src/querykv1/journeyroute.hpp13
-rw-r--r--src/querykv1/journeys.cpp95
-rw-r--r--src/querykv1/journeys.hpp13
-rw-r--r--src/querykv1/main.cpp198
-rw-r--r--src/querykv1/schedule.cpp63
-rw-r--r--src/querykv1/schedule.hpp13
-rw-r--r--src/recvkv6/.envrc2
-rw-r--r--src/recvkv6/Makefile21
-rw-r--r--src/recvkv6/main.cpp1300
35 files changed, 3963 insertions, 0 deletions
diff --git a/src/augmentkv6/.envrc b/src/augmentkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/augmentkv6/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/augmentkv6/Makefile b/src/augmentkv6/Makefile
new file mode 100644
index 0000000..cebb291
--- /dev/null
+++ b/src/augmentkv6/Makefile
@@ -0,0 +1,21 @@
# Build rules for the augmentkv6 tool.
#
# The hardening flags below are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set in the environment.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -larrow_acero -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

augmentkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# -f: do not fail when the binary has not been built (yet).
	rm -f augmentkv6
diff --git a/src/augmentkv6/main.cpp b/src/augmentkv6/main.cpp
new file mode 100644
index 0000000..81a54d3
--- /dev/null
+++ b/src/augmentkv6/main.cpp
@@ -0,0 +1,510 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <cstdio>
5#include <deque>
6#include <filesystem>
7#include <format>
8#include <fstream>
9#include <iostream>
10#include <string>
11#include <string_view>
12#include <vector>
13
14#include <arrow/acero/exec_plan.h>
15#include <arrow/api.h>
16#include <arrow/compute/api.h>
17#include <arrow/dataset/api.h>
18#include <arrow/filesystem/api.h>
19#include <arrow/io/api.h>
20#include <parquet/arrow/reader.h>
21
22#include <tmi8/kv1_index.hpp>
23#include <tmi8/kv1_lexer.hpp>
24#include <tmi8/kv1_parser.hpp>
25#include <tmi8/kv1_types.hpp>
26#include <tmi8/kv6_parquet.hpp>
27
28using namespace std::string_view_literals;
29
30namespace ac = arrow::acero;
31namespace ds = arrow::dataset;
32namespace cp = arrow::compute;
33using namespace arrow;
34
// Clock used for the timing measurements in this file: the high resolution
// clock when it is steady, and the steady clock otherwise.
using TimingClock = std::conditional<
  std::chrono::high_resolution_clock::is_steady,
  std::chrono::high_resolution_clock,
  std::chrono::steady_clock>::type;
39
// Reads standard input until end-of-file and returns everything that was read
// as a single string. A start message and the number of bytes read are printed
// to stderr. When stdin ends up in an error state, a message is printed and
// the process exits with status 1.
std::string readKv1() {
  fputs("Reading KV1 from standard input\n", stderr);

  char buf[4096];
  std::string data;
  while (!feof(stdin) && !ferror(stdin)) {
    // fread may return fewer bytes than requested (or zero at EOF); only the
    // bytes that were actually read are appended.
    size_t read = fread(buf, sizeof(char), sizeof(buf), stdin);
    data.append(buf, read);
  }
  if (ferror(stdin)) {
    fputs("Error when reading from stdin\n", stderr);
    exit(1);
  }
  // %zu is the conversion specifier for size_t; %lu is only correct on
  // platforms where size_t happens to be unsigned long.
  fprintf(stderr, "Read %zu bytes\n", data.size());

  return data;
}
57
58std::vector<Kv1Token> lex() {
59 std::string data = readKv1();
60
61 auto start = TimingClock::now();
62 Kv1Lexer lexer(data);
63 lexer.lex();
64 auto end = TimingClock::now();
65
66 std::chrono::duration<double> elapsed{end - start};
67 double bytes = static_cast<double>(data.size()) / 1'000'000;
68 double speed = bytes / elapsed.count();
69
70 if (!lexer.errors.empty()) {
71 fputs("Lexer reported errors:\n", stderr);
72 for (const auto &error : lexer.errors)
73 fprintf(stderr, "- %s\n", error.c_str());
74 exit(1);
75 }
76
77 fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size());
78 fprintf(stderr, "Duration: %f s\n", elapsed.count());
79 fprintf(stderr, "Speed: %f MB/s\n", speed);
80
81 return std::move(lexer.tokens);
82}
83
84bool parse(Kv1Records &into) {
85 std::vector<Kv1Token> tokens = lex();
86
87 Kv1Parser parser(tokens, into);
88 parser.parse();
89
90 bool ok = true;
91 if (!parser.gerrors.empty()) {
92 ok = false;
93 fputs("Parser reported errors:\n", stderr);
94 for (const auto &error : parser.gerrors)
95 fprintf(stderr, "- %s\n", error.c_str());
96 }
97 if (!parser.warns.empty()) {
98 fputs("Parser reported warnings:\n", stderr);
99 for (const auto &warn : parser.warns)
100 fprintf(stderr, "- %s\n", warn.c_str());
101 }
102
103 fprintf(stderr, "Parsed %lu records\n", into.size());
104
105 return ok;
106}
107
108void printParsedRecords(const Kv1Records &records) {
109 fputs("Parsed records:\n", stderr);
110 fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size());
111 fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size());
112 fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size());
113 fprintf(stderr, " user_stop_areas: %lu\n", records.user_stop_areas.size());
114 fprintf(stderr, " timing_links: %lu\n", records.timing_links.size());
115 fprintf(stderr, " links: %lu\n", records.links.size());
116 fprintf(stderr, " lines: %lu\n", records.lines.size());
117 fprintf(stderr, " destinations: %lu\n", records.destinations.size());
118 fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size());
119 fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size());
120 fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size());
121 fprintf(stderr, " financers: %lu\n", records.financers.size());
122 fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size());
123 fprintf(stderr, " points: %lu\n", records.points.size());
124 fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size());
125 fprintf(stderr, " icons: %lu\n", records.icons.size());
126 fprintf(stderr, " notices: %lu\n", records.notices.size());
127 fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size());
128 fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size());
129 fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size());
130 fprintf(stderr, " period_groups: %lu\n", records.period_groups.size());
131 fprintf(stderr, " specific_days: %lu\n", records.specific_days.size());
132 fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size());
133 fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size());
134 fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size());
135 fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size());
136 fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size());
137 fprintf(stderr, " public_journey_passing_times: %lu\n", records.public_journey_passing_times.size());
138 fprintf(stderr, " operating_days: %lu\n", records.operating_days.size());
139}
140
141void printIndexSize(const Kv1Index &index) {
142 fputs("Index size:\n", stderr);
143 fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size());
144 fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size());
145 fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size());
146 fprintf(stderr, " timing_links: %lu\n", index.timing_links.size());
147 fprintf(stderr, " links: %lu\n", index.links.size());
148 fprintf(stderr, " lines: %lu\n", index.lines.size());
149 fprintf(stderr, " destinations: %lu\n", index.destinations.size());
150 fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size());
151 fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size());
152 fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size());
153 fprintf(stderr, " financers: %lu\n", index.financers.size());
154 fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size());
155 fprintf(stderr, " points: %lu\n", index.points.size());
156 fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size());
157 fprintf(stderr, " icons: %lu\n", index.icons.size());
158 fprintf(stderr, " notices: %lu\n", index.notices.size());
159 fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size());
160 fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size());
161 fprintf(stderr, " period_groups: %lu\n", index.period_groups.size());
162 fprintf(stderr, " specific_days: %lu\n", index.specific_days.size());
163 fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size());
164 fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size());
165 fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size());
166 fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size());
167 fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size());
168 fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size());
169 fprintf(stderr, " operating_days: %lu\n", index.operating_days.size());
170}
171
172struct BasicJourneyKey {
173 std::string data_owner_code;
174 std::string line_planning_number;
175 int journey_number;
176
177 auto operator<=>(const BasicJourneyKey &) const = default;
178};
179
180size_t hash_value(const BasicJourneyKey &k) {
181 size_t seed = 0;
182
183 boost::hash_combine(seed, k.data_owner_code);
184 boost::hash_combine(seed, k.line_planning_number);
185 boost::hash_combine(seed, k.journey_number);
186
187 return seed;
188}
189
190using BasicJourneyKeySet = std::unordered_set<BasicJourneyKey, boost::hash<BasicJourneyKey>>;
191
192arrow::Result<BasicJourneyKeySet> basicJourneys(std::shared_ptr<arrow::Table> table) {
193 ac::TableSourceNodeOptions table_source_node_options(table);
194 ac::Declaration table_source("table_source", std::move(table_source_node_options));
195 auto aggregate_options = ac::AggregateNodeOptions{
196 /* .aggregates = */ {},
197 /* .keys = */ { "data_owner_code", "line_planning_number", "journey_number" },
198 };
199 ac::Declaration aggregate("aggregate", { std::move(table_source) }, std::move(aggregate_options));
200
201 std::shared_ptr<arrow::Table> result;
202 ARROW_ASSIGN_OR_RAISE(result, ac::DeclarationToTable(std::move(aggregate)));
203
204 std::shared_ptr<arrow::ChunkedArray> data_owner_codes = result->GetColumnByName("data_owner_code");
205 std::shared_ptr<arrow::ChunkedArray> line_planning_numbers = result->GetColumnByName("line_planning_number");
206 std::shared_ptr<arrow::ChunkedArray> journey_numbers = result->GetColumnByName("journey_number");
207
208 int i_data_owner_codes_chunk = 0;
209 int i_journey_numbers_chunk = 0;
210 int i_line_planning_numbers_chunk = 0;
211 int i_in_data_owner_codes_chunk = 0;
212 int i_in_journey_numbers_chunk = 0;
213 int i_in_line_planning_numbers_chunk = 0;
214
215 BasicJourneyKeySet journeys;
216
217 for (int64_t i = 0; i < result->num_rows(); i++) {
218 auto data_owner_codes_chunk = std::static_pointer_cast<arrow::StringArray>(data_owner_codes->chunk(i_data_owner_codes_chunk));
219 auto line_planning_numbers_chunk = std::static_pointer_cast<arrow::StringArray>(line_planning_numbers->chunk(i_line_planning_numbers_chunk));
220 auto journey_numbers_chunk = std::static_pointer_cast<arrow::UInt32Array>(journey_numbers->chunk(i_journey_numbers_chunk));
221
222 std::string_view data_owner_code = data_owner_codes_chunk->Value(i_in_data_owner_codes_chunk);
223 std::string_view line_planning_number = line_planning_numbers_chunk->Value(i_in_line_planning_numbers_chunk);
224 uint32_t journey_number = journey_numbers_chunk->Value(i_in_journey_numbers_chunk);
225
226 journeys.emplace(
227 std::string(data_owner_code),
228 std::string(line_planning_number),
229 journey_number
230 );
231
232 i_in_data_owner_codes_chunk++;
233 i_in_line_planning_numbers_chunk++;
234 i_in_journey_numbers_chunk++;
235 if (i_in_data_owner_codes_chunk >= data_owner_codes_chunk->length()) {
236 i_data_owner_codes_chunk++;
237 i_in_data_owner_codes_chunk = 0;
238 }
239 if (i_in_line_planning_numbers_chunk >= line_planning_numbers_chunk->length()) {
240 i_line_planning_numbers_chunk++;
241 i_in_line_planning_numbers_chunk = 0;
242 }
243 if (i_in_journey_numbers_chunk >= journey_numbers_chunk->length()) {
244 i_journey_numbers_chunk++;
245 i_in_journey_numbers_chunk = 0;
246 }
247 }
248
249 return journeys;
250}
251
252struct DistanceKey {
253 BasicJourneyKey journey;
254 std::string last_passed_user_stop_code;
255
256 auto operator<=>(const DistanceKey &) const = default;
257};
258
259size_t hash_value(const DistanceKey &k) {
260 size_t seed = 0;
261
262 boost::hash_combine(seed, k.journey);
263 boost::hash_combine(seed, k.last_passed_user_stop_code);
264
265 return seed;
266}
267
268struct DistanceTimingLink {
269 const Kv1JourneyPatternTimingLink *jopatili;
270 double distance_since_start_of_journey = 0; // at the start of the link
271};
272
273using DistanceMap = std::unordered_map<DistanceKey, double, boost::hash<DistanceKey>>;
274
275// Returns a map, where
276// DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode ->
277// Distance of Last User Stop
278DistanceMap makeDistanceMap(Kv1Records &records, Kv1Index &index, BasicJourneyKeySet &journeys) {
279 std::unordered_map<
280 Kv1JourneyPattern::Key,
281 std::vector<DistanceTimingLink>,
282 boost::hash<Kv1JourneyPattern::Key>> jopatili_index;
283 std::unordered_map<
284 BasicJourneyKey,
285 const Kv1PublicJourney *,
286 boost::hash<BasicJourneyKey>> journey_index;
287 for (size_t i = 0; i < records.public_journeys.size(); i++) {
288 const Kv1PublicJourney *pujo = &records.public_journeys[i];
289
290 BasicJourneyKey journey_key(
291 pujo->key.data_owner_code,
292 pujo->key.line_planning_number,
293 pujo->key.journey_number);
294
295 if (journeys.contains(journey_key)) {
296 journey_index[journey_key] = pujo;
297
298 Kv1JourneyPattern::Key jopa_key(
299 pujo->key.data_owner_code,
300 pujo->key.line_planning_number,
301 pujo->journey_pattern_code);
302 jopatili_index[jopa_key] = {};
303 }
304 }
305
306 for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) {
307 const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i];
308 Kv1JourneyPattern::Key jopa_key(
309 jopatili->key.data_owner_code,
310 jopatili->key.line_planning_number,
311 jopatili->key.journey_pattern_code);
312 if (jopatili_index.contains(jopa_key)) {
313 jopatili_index[jopa_key].push_back(DistanceTimingLink(jopatili, 0));
314 }
315 }
316
317 for (auto &[jopa_key, timing_links] : jopatili_index) {
318 std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) {
319 return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order;
320 });
321
322 const std::string transport_type = index.journey_patterns[jopa_key]->p_line->transport_type;
323
324 for (size_t i = 1; i < timing_links.size(); i++) {
325 DistanceTimingLink *timing_link = &timing_links[i];
326 DistanceTimingLink *prev_timing_link = &timing_links[i - 1];
327
328 const Kv1Link::Key link_key(
329 prev_timing_link->jopatili->key.data_owner_code,
330 prev_timing_link->jopatili->user_stop_code_begin,
331 prev_timing_link->jopatili->user_stop_code_end,
332 transport_type);
333 double link_distance = index.links[link_key]->distance;
334 timing_link->distance_since_start_of_journey =
335 prev_timing_link->distance_since_start_of_journey + link_distance;
336 }
337 }
338
339 // DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode ->
340 // Distance of Last User Stop
341 DistanceMap distance_map;
342
343 for (const auto &journey : journeys) {
344 const Kv1PublicJourney *pujo = journey_index[journey];
345 if (pujo == nullptr) {
346 std::cerr << "Warning: No PUJO found for [" << journey.data_owner_code << "] "
347 << journey.line_planning_number << "/" << journey.journey_number << std::endl;
348 continue;
349 }
350 Kv1JourneyPattern::Key jopa_key(
351 pujo->key.data_owner_code,
352 pujo->key.line_planning_number,
353 pujo->journey_pattern_code);
354 for (const auto &timing_link : jopatili_index[jopa_key]) {
355 DistanceKey key(journey, timing_link.jopatili->user_stop_code_begin);
356 distance_map[key] = timing_link.distance_since_start_of_journey;
357 }
358 }
359
360 return distance_map;
361}
362
363arrow::Result<std::shared_ptr<arrow::Table>> augment(
364 std::shared_ptr<arrow::Table> table,
365 const DistanceMap &distance_map
366) {
367 for (int i = 0; i < table->num_columns(); i++) {
368 if (table->column(i)->num_chunks() > 1) {
369 std::stringstream ss;
370 ss << "Error: Expected column " << i
371 << " (" << table->ColumnNames()[i] << ") to have 1 chunk, got "
372 << table->column(i)->num_chunks();
373 return arrow::Status::Invalid(ss.str());
374 }
375 }
376
377 auto data_owner_codes = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("data_owner_code")->chunk(0));
378 auto line_planning_numbers = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("line_planning_number")->chunk(0));
379 auto journey_numbers = std::static_pointer_cast<arrow::UInt32Array>(table->GetColumnByName("journey_number")->chunk(0));
380 auto user_stop_codes = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("user_stop_code")->chunk(0));
381 auto distance_since_last_user_stops = std::static_pointer_cast<arrow::UInt32Array>(table->GetColumnByName("distance_since_last_user_stop")->chunk(0));
382 auto timestamps = std::static_pointer_cast<arrow::TimestampArray>(table->GetColumnByName("timestamp")->chunk(0));
383
384 auto timestamps_type = table->schema()->GetFieldByName("timestamp")->type();
385 if (timestamps_type->id() != arrow::Type::TIMESTAMP)
386 return arrow::Status::Invalid("Field 'timestamp' does not have expected type TIMESTAMP");
387 if (std::static_pointer_cast<arrow::TimestampType>(timestamps_type)->unit() != arrow::TimeUnit::MILLI)
388 return arrow::Status::Invalid("Field 'timestamp' does not have unit MILLI");
389 if (!std::static_pointer_cast<arrow::TimestampType>(timestamps_type)->timezone().empty())
390 return arrow::Status::Invalid("Field 'timestamp' should have empty time zone name");
391
392 std::shared_ptr<arrow::Field> field_distance_since_start_of_journey =
393 arrow::field("distance_since_start_of_journey", arrow::uint32());
394 std::shared_ptr<arrow::Field> field_day_of_week =
395 arrow::field("timestamp_iso_day_of_week", arrow::int64());
396 std::shared_ptr<arrow::Field> field_date =
397 arrow::field("timestamp_date", arrow::date32());
398 std::shared_ptr<arrow::Field> field_local_time =
399 arrow::field("timestamp_local_time", arrow::time32(arrow::TimeUnit::SECOND));
400 arrow::UInt32Builder distance_since_start_of_journey_builder;
401 arrow::Int64Builder day_of_week_builder;
402 arrow::Date32Builder date_builder;
403 arrow::Time32Builder local_time_builder(arrow::time32(arrow::TimeUnit::SECOND), arrow::default_memory_pool());
404
405 const std::chrono::time_zone *amsterdam = std::chrono::locate_zone("Europe/Amsterdam");
406
407 for (int64_t i = 0; i < table->num_rows(); i++) {
408 DistanceKey key(
409 BasicJourneyKey(
410 std::string(data_owner_codes->Value(i)),
411 std::string(line_planning_numbers->Value(i)),
412 journey_numbers->Value(i)),
413 std::string(user_stop_codes->Value(i)));
414
415 uint32_t distance_since_last_user_stop = distance_since_last_user_stops->Value(i);
416 if (distance_map.contains(key)) {
417 uint32_t total_distance = distance_since_last_user_stop + static_cast<uint32_t>(distance_map.at(key));
418 ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.Append(total_distance));
419 } else {
420 ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.AppendNull());
421 }
422
423 // Welp, this has gotten a bit complicated!
424 std::chrono::sys_seconds timestamp(std::chrono::floor<std::chrono::seconds>(std::chrono::milliseconds(timestamps->Value(i))));
425 std::chrono::zoned_seconds zoned_timestamp(amsterdam, timestamp);
426 std::chrono::local_seconds local_timestamp(zoned_timestamp);
427 std::chrono::local_days local_date = std::chrono::floor<std::chrono::days>(local_timestamp);
428 std::chrono::year_month_day date(local_date);
429 std::chrono::weekday day_of_week(local_date);
430 std::chrono::hh_mm_ss<std::chrono::seconds> time(local_timestamp - local_date);
431 std::chrono::sys_days unix_date(date);
432
433 int64_t iso_day_of_week = day_of_week.iso_encoding();
434 int32_t unix_days = static_cast<int32_t>(unix_date.time_since_epoch().count());
435 int32_t secs_since_midnight = static_cast<int32_t>(std::chrono::seconds(time).count());
436
437 ARROW_RETURN_NOT_OK(day_of_week_builder.Append(iso_day_of_week));
438 ARROW_RETURN_NOT_OK(date_builder.Append(unix_days));
439 ARROW_RETURN_NOT_OK(local_time_builder.Append(secs_since_midnight));
440 }
441
442 ARROW_ASSIGN_OR_RAISE(auto distance_since_start_of_journey_col_chunk, distance_since_start_of_journey_builder.Finish());
443 ARROW_ASSIGN_OR_RAISE(auto day_of_week_col_chunk, day_of_week_builder.Finish());
444 ARROW_ASSIGN_OR_RAISE(auto date_col_chunk, date_builder.Finish());
445 ARROW_ASSIGN_OR_RAISE(auto local_time_col_chunk, local_time_builder.Finish());
446 auto distance_since_start_of_journey_col =
447 std::make_shared<arrow::ChunkedArray>(distance_since_start_of_journey_col_chunk);
448 auto day_of_week_col = std::make_shared<arrow::ChunkedArray>(day_of_week_col_chunk);
449 auto date_col = std::make_shared<arrow::ChunkedArray>(date_col_chunk);
450 auto local_time_col = std::make_shared<arrow::ChunkedArray>(local_time_col_chunk);
451
452 ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(
453 table->num_columns(),
454 field_distance_since_start_of_journey,
455 distance_since_start_of_journey_col));
456 ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_day_of_week, day_of_week_col));
457 ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_date, date_col));
458 ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_local_time, local_time_col));
459
460 return table;
461}
462
463arrow::Status processTables(Kv1Records &records, Kv1Index &index) {
464 std::shared_ptr<arrow::io::RandomAccessFile> input;
465 ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("oeuf-input.parquet"));
466
467 std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
468 ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
469
470 std::shared_ptr<arrow::Table> table;
471 ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
472
473 std::cerr << "Input KV6 file has " << table->num_rows() << " rows" << std::endl;
474 ARROW_ASSIGN_OR_RAISE(BasicJourneyKeySet journeys, basicJourneys(table));
475 std::cerr << "Found " << journeys.size() << " distinct journeys" << std::endl;
476 DistanceMap distance_map = makeDistanceMap(records, index, journeys);
477 std::cerr << "Distance map has " << distance_map.size() << " keys" << std::endl;
478
479 std::cerr << "Creating augmented table" << std::endl;
480 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> augmented, augment(table, distance_map));
481
482 std::cerr << "Writing augmented table" << std::endl;
483 return writeArrowTableAsParquetFile(*augmented, "oeuf-augmented.parquet");
484}
485
486int main(int argc, char *argv[]) {
487 Kv1Records records;
488 if (!parse(records)) {
489 fputs("Error parsing records, exiting\n", stderr);
490 return EXIT_FAILURE;
491 }
492 printParsedRecords(records);
493 fputs("Indexing...\n", stderr);
494 Kv1Index index(&records);
495 fprintf(stderr, "Indexed %lu records\n", index.size());
496 // Only notice assignments are not indexed. If this equality is not valid,
497 // then this means that we had duplicate keys or that something else went
498 // wrong. That would really not be great.
499 assert(index.size() == records.size() - records.notice_assignments.size());
500 printIndexSize(index);
501 fputs("Linking records...\n", stderr);
502 kv1LinkRecords(index);
503 fputs("Done linking\n", stderr);
504
505 arrow::Status st = processTables(records, index);
506 if (!st.ok()) {
507 std::cerr << "Failed to process tables: " << st << std::endl;
508 return EXIT_FAILURE;
509 }
510}
diff --git a/src/bundleparquet/.envrc b/src/bundleparquet/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/bundleparquet/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/bundleparquet/Makefile b/src/bundleparquet/Makefile
new file mode 100644
index 0000000..170304d
--- /dev/null
+++ b/src/bundleparquet/Makefile
@@ -0,0 +1,21 @@
# Build rules for the bundleparquet tool.
#
# The hardening flags below are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set in the environment.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -lcurl -lparquet -lprometheus-cpp-push -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

bundleparquet: main.cpp spliturl.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# -f: do not fail when the binary has not been built (yet).
	rm -f bundleparquet
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp
new file mode 100644
index 0000000..05fd881
--- /dev/null
+++ b/src/bundleparquet/main.cpp
@@ -0,0 +1,213 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <deque>
5#include <filesystem>
6#include <format>
7#include <fstream>
8#include <iostream>
9
10#include <arrow/api.h>
11#include <arrow/io/api.h>
12#include <parquet/arrow/reader.h>
13
14#include <nlohmann/json.hpp>
15
16#include <prometheus/counter.h>
17#include <prometheus/gateway.h>
18#include <prometheus/registry.h>
19
20#include <tmi8/kv6_parquet.hpp>
21
22#include "spliturl.hpp"
23
// Bounds on the number of rows in one combined (merged) Parquet file.
static constexpr int MIN_COMBINED_ROWS = 1'000'000; // one million
static constexpr int MAX_COMBINED_ROWS = 2'000'000; // two million
26
// Contents of a Parquet file's ".meta.json" companion file.
struct FileMetadata {
  int64_t min_timestamp{0};
  int64_t max_timestamp{0};
  int64_t rows_written{0};
};

// A Parquet file that is a candidate for merging, with its metadata.
struct File {
  FileMetadata metadata;
  std::filesystem::path filename;
};
37
38FileMetadata readMetadataOf(std::filesystem::path filename) {
39 std::string meta_filename = std::string(filename) + ".meta.json";
40 std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary);
41 nlohmann::json meta_json;
42 meta_file >> meta_json;
43 FileMetadata meta = {
44 .min_timestamp = meta_json["min_timestamp"],
45 .max_timestamp = meta_json["max_timestamp"],
46 .rows_written = meta_json["rows_written"],
47 };
48 return meta;
49}
50
// Merges a leading run of `files` into one combined Parquet file in the
// "merged" directory.
//
// Files are taken from the front of `files` for as long as the combined row
// count stays at or below MAX_COMBINED_ROWS. Every file that is taken is
// removed from `files`. After that, one of three things happens:
//   - Not enough rows (fewer than MIN_COMBINED_ROWS) while all files were
//     taken: nothing is written and nothing is deleted from disk.
//   - The very first file alone exceeds MAX_COMBINED_ROWS: that file and its
//     ".meta.json" are moved into "merged" as they are, and the file is
//     removed from `files`.
//   - Otherwise: the tables are concatenated and written to
//     "merged/oeuf-<current time>.parquet" together with a ".meta.json" file,
//     after which the source files and their metadata files are deleted.
//
// `rows_written` is incremented by the number of rows that ended up in
// "merged". Returns an error status when reading, concatenating or writing
// fails.
//
// NOTE(review): the merged filename is derived from the current time rounded
// to seconds, so two merges finishing within the same second would produce
// the same filename — confirm that this cannot happen in practice.
arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) {
  if (files.empty()) {
    std::cerr << "Did not find any files" << std::endl;
    return arrow::Status::OK();
  }

  int64_t rows = 0;

  std::vector<std::shared_ptr<arrow::Table>> tables;
  std::vector<std::filesystem::path> processed;
  int64_t min_timestamp = std::numeric_limits<int64_t>::max();
  int64_t max_timestamp = 0;

  bool over_capacity_risk = false;
  auto it = files.begin();
  while (it != files.end()) {
    const std::filesystem::path &filename = it->filename;
    const FileMetadata &metadata = it->metadata;

    // The file is opened before the capacity check, so a file that cannot be
    // opened is reported even when it would not be merged in this round.
    std::shared_ptr<arrow::io::RandomAccessFile> input;
    ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename));

    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));

    if (rows + metadata.rows_written > MAX_COMBINED_ROWS) {
      over_capacity_risk = true;
      break;
    }

    // Only files that are really part of the merge may contribute to the
    // timestamp range; the file that is left out above must not widen it.
    if (metadata.min_timestamp < min_timestamp)
      min_timestamp = metadata.min_timestamp;
    if (metadata.max_timestamp > max_timestamp)
      max_timestamp = metadata.max_timestamp;

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
    tables.push_back(table);
    processed.push_back(filename);
    rows += metadata.rows_written;
    it = files.erase(it);
  }

  if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) {
    std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl;
    std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so "
              << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl;
    return arrow::Status::OK();
  } else if (rows == 0 && over_capacity_risk) {
    // The first file is too large on its own: move it to "merged" unchanged.
    const std::filesystem::path &oversized = files.front().filename;
    std::filesystem::rename(oversized, "merged" / oversized);
    std::filesystem::rename(std::string(oversized) + ".meta.json", std::string("merged" / oversized) + ".meta.json");
    rows_written.Increment(static_cast<double>(files.front().metadata.rows_written));
    files.pop_front();
    return arrow::Status::OK();
  }

  // Default options specify that the schemas are not unified, which is
  // luckily exactly what we want :)
  std::shared_ptr<arrow::Table> merged_table;
  ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now());
  std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename));

  std::cerr << "Wrote merged table to " << filename << std::endl;

  // The metadata is written to a ".part" file first and renamed afterwards,
  // so the final ".meta.json" name only ever refers to a completely written
  // file.
  std::ofstream metaf(filename + ".meta.json.part", std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows },
  };
  metaf << meta;
  metaf.close();
  std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json");

  std::cerr << "Wrote merged table metadata" << std::endl;
  rows_written.Increment(static_cast<double>(rows));

  // The source files are only deleted once the merged file and its metadata
  // are in place.
  for (const std::filesystem::path &source : processed) {
    std::filesystem::remove(source);
    std::filesystem::remove(std::string(source) + ".meta.json");
  }

  std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl;

  return arrow::Status::OK();
}
141
142arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) {
143 while (!files.empty())
144 ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written));
145 return arrow::Status::OK();
146}
147
148int main(int argc, char *argv[]) {
149 std::filesystem::path cwd = std::filesystem::current_path();
150 std::filesystem::create_directory(cwd / "merged");
151
152 const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL");
153 if (!prom_push_url || strlen(prom_push_url) == 0) {
154 std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl;
155 return EXIT_FAILURE;
156 }
157
158 std::string split_err;
159 auto split_prom_push_url = splitUrl(prom_push_url, &split_err);
160 if (!split_prom_push_url) {
161 std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: "
162 << split_err << std::endl;
163 return EXIT_FAILURE;
164 }
165 std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":"
166 << split_prom_push_url->portpath << std::endl;
167
168 prometheus::Gateway gateway{split_prom_push_url->schemehost,
169 split_prom_push_url->portpath,
170 "oeuf-archiver"};
171
172 auto registry = std::make_shared<prometheus::Registry>();
173 prometheus::Gauge &rows_available = prometheus::BuildGauge()
174 .Name("archiver_rows_available")
175 .Help("Number of rows available to the archiver")
176 .Register(*registry)
177 .Add({});
178 prometheus::Counter &rows_written = prometheus::BuildCounter()
179 .Name("archiver_rows_written")
180 .Help("Number of rows written by the archiver")
181 .Register(*registry)
182 .Add({});
183 gateway.RegisterCollectable(registry);
184
185 std::deque<File> files;
186 for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) {
187 if (!dir_entry.is_regular_file()) continue;
188 std::filesystem::path filename = dir_entry.path().filename();
189 const std::string &filename_str = filename;
190 if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) {
191 try {
192 FileMetadata meta = readMetadataOf(filename);
193 File file = { .metadata = meta, .filename = filename };
194 files.push_back(file);
195
196 rows_available.Increment(static_cast<double>(meta.rows_written));
197 } catch (const std::exception &e) {
198 std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl;
199 return EXIT_FAILURE;
200 }
201 }
202 }
203
204 std::sort(files.begin(), files.end(),
205 [](const File &f1, const File &f2) { return f1.filename < f2.filename; });
206 arrow::Status st = processTables(files, rows_written);
207 if (!st.ok()) {
208 std::cerr << "Failed to process tables: " << st << std::endl;
209 return EXIT_FAILURE;
210 }
211
212 gateway.Push();
213}
diff --git a/src/bundleparquet/spliturl.cpp b/src/bundleparquet/spliturl.cpp
new file mode 100644
index 0000000..90fd821
--- /dev/null
+++ b/src/bundleparquet/spliturl.cpp
@@ -0,0 +1,203 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <cstring>
4#include <iostream>
5#include <optional>
6#include <sstream>
7#include <string>
8
9#include <curl/curl.h>
10
11#include "spliturl.hpp"
12
13// splitUrl takes a URL of the shape '[http[s]://]HOST[:PORT][/PATH]', and
14// splits it into two URLs:
15// - scheme + host -> '[http[s]://]HOST'
16// - port + path -> '[PORT][/PATH]'
17// In case an IPv6 address is provided, the host must enclosed in square
18// brackets. The zone ID may also be indicated. Note that in the resulting
19// parts, the colon preceding the port number is omitted. This is on purpose.
20std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error) {
21 std::stringstream errs;
22 std::optional<SplitUrl> result;
23 char *processed = nullptr;
24 char *scheme = nullptr;
25 char *user = nullptr;
26 char *password = nullptr;
27 char *zoneid = nullptr;
28 char *query = nullptr;
29 char *fragment = nullptr;
30 CURLU *schemehost = nullptr;
31 char *schemehost_url = nullptr;
32 char *portpath_url = nullptr;
33
34 // Parse the URL, allowing the user to omit the scheme. CURL will use 'https'
35 // by default if no scheme is specified.
36
37 CURLU *parsed = curl_url();
38 CURLUcode rc = curl_url_set(parsed, CURLUPART_URL, url.c_str(), CURLU_DEFAULT_SCHEME);
39 if (rc != CURLUE_OK) {
40 errs << "Failed to parse URL: " << curl_url_strerror(rc);
41 goto Exit;
42 }
43
44 // As we parse the URL with the option CURLU_DEFAULT_SCHEME, the CURL API
45 // won't require the user to provide the scheme part of the URL. It will
46 // automatically default the scheme to https. However, we do not usually want
47 // it to default to HTTPS, but HTTP instead (as the use case, connecting to a
48 // PushGateway server, usually is served over a private network via HTTP).
49 //
50 // This is why we check if the scheme was put there by CURL and otherwise set
51 // it to HTTP. We also check for any other schemes that the user may have
52 // provided, and reject anything that is not http/https.
53 if (!url.starts_with("http://") && !url.starts_with("https://")) {
54 rc = curl_url_get(parsed, CURLUPART_SCHEME, &scheme, 0);
55 if (rc != CURLUE_OK) {
56 errs << "Could not get scheme from parsed URL: " << curl_url_strerror(rc);
57 goto Exit;
58 }
59 if (strcmp(scheme, "https")) {
60 errs << "Unexpected scheme" << scheme << "in provided URL (expected http or https)";
61 goto Exit;
62 }
63 rc = curl_url_set(parsed, CURLUPART_SCHEME, "http", 0);
64 if (rc != CURLUE_OK) {
65 errs << "Could not set URL scheme to http: " << curl_url_strerror(rc);
66 goto Exit;
67 }
68 }
69
70 // Turn the parsed URL back into a string.
71 rc = curl_url_get(parsed, CURLUPART_URL, &processed, 0);
72 if (rc != CURLUE_OK) {
73 errs << "Failed to output parsed URL: " << curl_url_strerror(rc);
74 goto Exit;
75 }
76
77 // This part of the code checks if no prohibited parts are present in the URL
78 // (basic auth: (user, password), query, fragment).
79
80 rc = curl_url_get(parsed, CURLUPART_USER, &user, 0);
81 if (rc == CURLUE_OK && strlen(user) != 0) {
82 errs << "Provided URL should not contain a user part";
83 goto Exit;
84 } else if (rc != CURLUE_NO_USER && rc != CURLUE_OK) {
85 errs << "Failed to get check user part existence in provided url: " << curl_url_strerror(rc);
86 goto Exit;
87 }
88
89 rc = curl_url_get(parsed, CURLUPART_PASSWORD, &password, 0);
90 if (rc == CURLUE_OK && strlen(password) != 0) {
91 errs << "Provided URL should not contain a password part";
92 goto Exit;
93 } else if (rc != CURLUE_NO_PASSWORD && rc != CURLUE_OK) {
94 errs << "Failed to get check password part existence in provided url: " << curl_url_strerror(rc);
95 goto Exit;
96 }
97
98 rc = curl_url_get(parsed, CURLUPART_QUERY, &query, 0);
99 if (rc == CURLUE_OK && strlen(query) != 0) {
100 errs << "Provided URL should not contain a query part";
101 goto Exit;
102 } else if (rc != CURLUE_NO_QUERY && rc != CURLUE_OK) {
103 errs << "Failed to get check query part existence in provided url: " << curl_url_strerror(rc);
104 goto Exit;
105 }
106
107 rc = curl_url_get(parsed, CURLUPART_FRAGMENT, &fragment, 0);
108 if (rc == CURLUE_OK && strlen(fragment) != 0) {
109 errs << "Provided URL should not contain a fragment part";
110 goto Exit;
111 } else if (rc != CURLUE_NO_FRAGMENT && rc != CURLUE_OK) {
112 errs << "Failed to get check fragment part existence in provided url: " << curl_url_strerror(rc);
113 goto Exit;
114 }
115
116 // Now that we know that the provided URL makes sense, we can start doing
117 // some arts and crafts. We get started by copying the parsed URL into
118 // schemehost and simply delete all parts which are not scheme + host.
119
120 schemehost = curl_url_dup(parsed);
121
122 // CURL BUG WORKAROUND: CURLUPART_ZONEID is NOT copied by curl_url_dup!
123 // ^ fixed in CURL 8.3.0 after https://curl.se/mail/lib-2023-07/0047.html
124 rc = curl_url_get(parsed, CURLUPART_ZONEID, &zoneid, 0);
125 if (rc == CURLUE_OK) {
126 rc = curl_url_set(schemehost, CURLUPART_ZONEID, zoneid, 0);
127 if (rc != CURLUE_OK) {
128 errs << "Could not copy zone ID to duplicated URL: " << curl_url_strerror(rc);
129 goto Exit;
130 }
131 }
132 rc = curl_url_set(schemehost, CURLUPART_PORT, nullptr, 0);
133 if (rc != CURLUE_OK) {
134 errs << "Could not unset port in duplicated URL: " << curl_url_strerror(rc);
135 goto Exit;
136 }
137 rc = curl_url_set(schemehost, CURLUPART_PATH, nullptr, 0);
138 if (rc != CURLUE_OK) {
139 errs << "Could not unset path in duplicated URL: " << curl_url_strerror(rc);
140 goto Exit;
141 }
142
143 // Okay, now we have the schemehost CURLU all ready to go. Note that a URL
144 // only consisting of a scheme and host is considered valid, so CURL will be
145 // more than happy to actually turn it into a string for us. Which is exactly
146 // what we do here :)
147
148 rc = curl_url_get(schemehost, CURLUPART_URL, &schemehost_url, 0);
149 if (rc != CURLUE_OK) {
150 errs << "Could not get scheme + host URL: " << curl_url_strerror(rc);
151 goto Exit;
152 }
153
154 // Remove any trailing slash after the scheme + host URL that CURL might have
155 // put there -- we still want to get a valid URL if we paste the port + path
156 // part behind it.
157
158 if (strlen(schemehost_url) > 0) {
159 if (schemehost_url[strlen(schemehost_url) - 1] != '/') {
160 errs << "Scheme + host URL does not end with a slash";
161 goto Exit;
162 }
163 schemehost_url[strlen(schemehost_url) - 1] = '\0';
164 }
165
166 // Look, this is really gross. Because the port + path part of the URL is not
167 // a valid URL itself, but the scheme + host should be a prefix of the full
168 // URL containing the port + path, we can simply check if it is indeed a
169 // prefix, and then strip it from the full URL, giving us the port + path
170 // (after deleting the colon preceding the port).
171
172 if (!std::string_view(processed).starts_with(schemehost_url)) {
173 errs << "Scheme + host URL is not a prefix of the processed URL";
174 goto Exit;
175 }
176
177 portpath_url = processed + strlen(schemehost_url);
178 // We should not have the colon before the port, prometheus-cpp inserts it
179 if (strlen(portpath_url) > 0 && portpath_url[0] == ':') portpath_url++;
180 // We do not need a trailing slash
181 if (strlen(portpath_url) > 0 && portpath_url[strlen(portpath_url)-1] == '/')
182 portpath_url[strlen(portpath_url)-1] = '\0';
183
184 // It has been done. BLECH
185 result = std::make_optional<SplitUrl>(schemehost_url, portpath_url);
186
187Exit:
188 curl_free(processed);
189 curl_free(scheme);
190 curl_free(user);
191 curl_free(password);
192 curl_free(query);
193 curl_free(fragment);
194 curl_free(zoneid);
195 curl_free(schemehost_url);
196 curl_url_cleanup(schemehost);
197 curl_url_cleanup(parsed);
198
199 if (!result && error)
200 *error = errs.str();
201
202 return result;
203}
diff --git a/src/bundleparquet/spliturl.hpp b/src/bundleparquet/spliturl.hpp
new file mode 100644
index 0000000..d8150e0
--- /dev/null
+++ b/src/bundleparquet/spliturl.hpp
@@ -0,0 +1,11 @@
// vim:set sw=2 ts=2 sts et:

#ifndef OEUF_BUNDLEPARQUET_SPLITURL_HPP
#define OEUF_BUNDLEPARQUET_SPLITURL_HPP

#include <optional>
#include <string>

// The two halves of a URL as produced by splitUrl.
struct SplitUrl {
  // Scheme + host part, '[http[s]://]HOST'.
  std::string schemehost;
  // Port + path part, '[PORT][/PATH]', without the colon preceding the port.
  std::string portpath;
};

// Splits a URL of the shape '[http[s]://]HOST[:PORT][/PATH]' into its
// scheme + host and port + path parts. Returns std::nullopt on failure, in
// which case a description of the problem is stored in *error when error is
// non-null.
std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error = nullptr);

#endif // OEUF_BUNDLEPARQUET_SPLITURL_HPP
diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/filterkv6/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile
new file mode 100644
index 0000000..13bb38e
--- /dev/null
+++ b/src/filterkv6/Makefile
@@ -0,0 +1,21 @@
# Hardening flags taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# Warnings are only turned into errors when DEVMODE is set.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

filterkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

# -f: cleaning a tree that was never built must not fail.
.PHONY: clean
clean:
	rm -f filterkv6
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp
new file mode 100644
index 0000000..a32220a
--- /dev/null
+++ b/src/filterkv6/main.cpp
@@ -0,0 +1,106 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <deque>
5#include <filesystem>
6#include <format>
7#include <fstream>
8#include <iostream>
9
10#include <arrow/api.h>
11#include <arrow/compute/api.h>
12#include <arrow/filesystem/api.h>
13#include <arrow/dataset/api.h>
14#include <arrow/io/api.h>
15
16#include <tmi8/kv6_parquet.hpp>
17
18namespace ds = arrow::dataset;
19namespace cp = arrow::compute;
20using namespace arrow;
21
22arrow::Status processTables(std::string lineno) {
23 auto filesystem = std::make_shared<fs::LocalFileSystem>();
24
25 fs::FileSelector selector;
26 selector.base_dir = std::filesystem::current_path();
27 selector.recursive = false;
28
29 auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>());
30
31 ARROW_ASSIGN_OR_RAISE(auto factory,
32 ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
33 ds::FileSystemFactoryOptions()));
34
35 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
36
37 printf("Scanning dataset for line %s...\n", lineno.c_str());
38 // Read specified columns with a row filter
39 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
40 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({
41 cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)),
42 cp::is_valid(cp::field_ref("rd_x")),
43 cp::is_valid(cp::field_ref("rd_y")),
44 })));
45
46 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
47 ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
48
49 puts("Finished loading data, computing stable sort indices...");
50
51 arrow::Datum sort_indices;
52 cp::SortOptions sort_options;
53 sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) };
54 ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options));
55 puts("Finished computing stable sort indices, creating sorted table...");
56
57 arrow::Datum sorted;
58 ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices }));
59
60 puts("Writing sorted table to disk...");
61 ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet"));
62 puts("Syncing...");
63 sync();
64 puts("Done. Have a nice day.");
65
66 return arrow::Status::OK();
67}
68
// Caveat about which files get loaded; shown at start-up and appended to the
// usage text.
#define NOTICE "Notice: This tool will fail if any non-Parquet files are present in the\n" \
               " current working directory. It does not load files which are present in\n" \
               " any possible subdirectories."

// printf-style usage text; the single %s is replaced by the program name.
const char help[] =
  "Usage: %s <LINENO>\n"
  "\n"
  " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n"
  NOTICE "\n";
78
79void exitHelp(const char *progname, int code = 1) {
80 printf(help, progname);
81 exit(code);
82}
83
84int main(int argc, char *argv[]) {
85 const char *progname = argv[0];
86 if (argc != 2) {
87 puts("Error: incorrect number of arguments provided\n");
88 exitHelp(progname);
89 }
90 char *lineno = argv[1];
91 puts(NOTICE "\n");
92
93 std::filesystem::path cwd = std::filesystem::current_path();
94 std::filesystem::create_directory(cwd / "merged");
95
96 puts("Running this program may take a while, especially on big datasets. If you're\n"
97 "processing the data of a single bus line over the course of multiple months,\n"
98 "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n"
99 "RAM available, to avoid overloading and subsequently freezing your system.\n");
100
101 arrow::Status st = processTables(std::string(lineno));
102 if (!st.ok()) {
103 std::cerr << "Failed to process tables: " << st << std::endl;
104 return EXIT_FAILURE;
105 }
106}
diff --git a/src/querykv1/.envrc b/src/querykv1/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/querykv1/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/querykv1/.gitignore b/src/querykv1/.gitignore
new file mode 100644
index 0000000..5761abc
--- /dev/null
+++ b/src/querykv1/.gitignore
@@ -0,0 +1 @@
*.o
diff --git a/src/querykv1/Makefile b/src/querykv1/Makefile
new file mode 100644
index 0000000..a8791f5
--- /dev/null
+++ b/src/querykv1/Makefile
@@ -0,0 +1,28 @@
# Hardening flags taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# Warnings are only turned into errors when DEVMODE is set.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

HDRS=cliopts.hpp daterange.hpp joparoute.hpp journeyinfo.hpp journeyroute.hpp journeys.hpp schedule.hpp
SRCS=main.cpp cliopts.cpp daterange.cpp joparoute.cpp journeyinfo.cpp journeyroute.cpp journeys.cpp schedule.cpp
OBJS=$(patsubst %.cpp,%.o,$(SRCS))

# The objects are linked into a position-independent executable (-pie below),
# so they have to be compiled as position-independent code as well.
%.o: %.cpp $(HDRS)
	$(CXX) -fPIE -c -o $@ $< $(CXXFLAGS)

querykv1: $(OBJS)
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

# Removes the binary and the intermediate objects. -f: cleaning a tree that
# was never (fully) built must not fail.
.PHONY: clean
clean:
	rm -f querykv1 $(OBJS)
diff --git a/src/querykv1/cliopts.cpp b/src/querykv1/cliopts.cpp
new file mode 100644
index 0000000..bef7a98
--- /dev/null
+++ b/src/querykv1/cliopts.cpp
@@ -0,0 +1,456 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <cstdlib>
4#include <cstdio>
5#include <string>
6#include <string_view>
7
8#include <getopt.h>
9
10#include "cliopts.hpp"
11
12using namespace std::string_view_literals;
13
// Value stored for an option that was given on the command line without an
// argument (see parseOptions).
const char *opt_set = "";
// Null counterpart of opt_set; presumably denotes an option that was not
// given (Options fields default to nullptr) -- confirm against callers.
const char *opt_unset = nullptr;

// The help texts below are printf-style format strings. '%1$s' is the
// positional form of '%s' and is replaced by the program name.

// Top-level help, listing the global options and all subcommands.
const char help[] = R"(Usage: %1$s [OPTIONS] <COMMAND>

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help

Commands:
 joparoute Generate CSV for journey pattern route
 journeyinfo Print some information on a journey
 journeyroute Generate CSV for journey route
 journeys List journeys of a specific line going from stop A to B
 schedule Generate schedule
)";

// Help for the joparoute subcommand.
const char joparoute_help[] = R"(Usage: %1$s joparoute --line <NUMBER> --jopa <CODE> [OPTIONS]

Options:
 --line <NUMBER> Line planning number as in schedule
 --jopa <CODE> Journey pattern code as in KV1 data
 -o <PATH> Path of file to write to, '-' for stdout

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help
)";

// Help for the journeyroute subcommand. Both --line and --journey are
// mandatory (journeyRouteValidateOptions rejects the command line otherwise),
// so both appear in the usage line.
const char journeyroute_help[] = R"(Usage: %1$s journeyroute --line <NUMBER> --journey <NUMBER> [OPTIONS]

Options:
 --line <NUMBER> Line planning number as in KV1 data
 --journey <NUMBER> Journey number as in KV1 data
 -o <PATH> Path of file to write to, '-' for stdout

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help
)";

// Help for the journeys subcommand.
const char journeys_help[] = R"(Usage: %1$s journeys --line <NUMBER> --begin <STOP> --end <STOP> [OPTIONS]

For the --begin and --end arguments, use the following format:
 --begin/--end stop:<USRSTOP CODE>
 --begin/--end star:<USRSTAR CODE>

Options:
 --begin <STOP> User stop code/area of stop the journey should begin at
 --end <STOP> User stop code/area of stop the journey should end at
 --line <NUMBER> Line planning number to filter on
 -o <PATH> Path of file to write to, '-' for stdout

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help
)";

// Help for the journeyinfo subcommand.
const char journeyinfo_help[] = R"(Usage: %1$s journeyinfo --line <NUMBER> --journey <NUMBER> [OPTIONS]

Options:
 --line <NUMBER> Line planning number to filter on
 --journey <NUMBER> Journey number as in schedule

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help
)";

// Help for the schedule subcommand.
const char schedule_help[] = R"(Usage: %1$s schedule --line <NUMBER> [OPTIONS]

Options:
 --line <NUMBER> Line planning number to generate schedule for
 -o <PATH> Path of file to write to, '-' for stdout

Global Options:
 --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin
 -h, --help Print this help
)";
93
94void journeyRouteValidateOptions(const char *progname, Options *options) {
95#define X(name, argument, long_, short_) \
96 if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \
97 && #name != "journey_number"sv && #name != "help"sv && #name != "output_file_path"sv) \
98 if (options->name) { \
99 if (long_) { \
100 if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyroute subcommand\n\n", progname, static_cast<const char *>(long_), short_); \
101 else fprintf(stderr, "%s: unexpected flag --%s for journeyroute subcommand\n\n", progname, static_cast<const char *>(long_)); \
102 } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeyroute subcommand\n\n", progname, short_); \
103 fprintf(stderr, journeyroute_help, progname); \
104 exit(1); \
105 }
106 LONG_OPTIONS
107 SHORT_OPTIONS
108#undef X
109
110 if (options->positional.size() > 0) {
111 fprintf(stderr, "%s: unexpected positional argument(s) for journeyroute subcommand\n\n", progname);
112 for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos);
113 fprintf(stderr, journeyroute_help, progname);
114 exit(1);
115 }
116
117 if (!options->kv1_file_path)
118 options->kv1_file_path = "-";
119 if (!options->output_file_path)
120 options->output_file_path = "-";
121 if (options->kv1_file_path == ""sv) {
122 fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname);
123 fprintf(stderr, journeyroute_help, progname);
124 exit(1);
125 }
126 if (options->output_file_path == ""sv) {
127 fprintf(stderr, "%s: output file path cannot be empty\n\n", progname);
128 fprintf(stderr, journeyroute_help, progname);
129 exit(1);
130 }
131 if (!options->journey_number || options->journey_number == ""sv) {
132 fprintf(stderr, "%s: journey number must be provided\n\n", progname);
133 fprintf(stderr, journeyroute_help, progname);
134 exit(1);
135 }
136 if (!options->line_planning_number || options->line_planning_number == ""sv) {
137 fprintf(stderr, "%s: line planning number must be provided\n\n", progname);
138 fprintf(stderr, journeyroute_help, progname);
139 exit(1);
140 }
141}
142
143void scheduleValidateOptions(const char *progname, Options *options) {
144#define X(name, argument, long_, short_) \
145 if (#name != "kv1_file_path"sv && #name != "help"sv \
146 && #name != "line_planning_number"sv && #name != "output_file_path"sv) \
147 if (options->name) { \
148 if (long_) { \
149 if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for schedule subcommand\n\n", progname, static_cast<const char *>(long_), short_); \
150 else fprintf(stderr, "%s: unexpected flag --%s for schedule subcommand\n\n", progname, static_cast<const char *>(long_)); \
151 } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for schedule subcommand\n\n", progname, short_); \
152 fprintf(stderr, schedule_help, progname); \
153 exit(1); \
154 }
155 LONG_OPTIONS
156 SHORT_OPTIONS
157#undef X
158
159 if (options->positional.size() > 0) {
160 fprintf(stderr, "%s: unexpected positional argument(s) for schedule subcommand\n\n", progname);
161 for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos);
162 fprintf(stderr, schedule_help, progname);
163 exit(1);
164 }
165
166 if (!options->kv1_file_path)
167 options->kv1_file_path = "-";
168 if (!options->output_file_path)
169 options->output_file_path = "-";
170 if (options->kv1_file_path == ""sv) {
171 fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname);
172 fprintf(stderr, schedule_help, progname);
173 exit(1);
174 }
175 if (options->output_file_path == ""sv) {
176 fprintf(stderr, "%s: output file path cannot be empty\n\n", progname);
177 fprintf(stderr, schedule_help, progname);
178 exit(1);
179 }
180 if (!options->line_planning_number || options->line_planning_number == ""sv) {
181 fprintf(stderr, "%s: line planning number must be provided\n\n", progname);
182 fprintf(stderr, schedule_help, progname);
183 exit(1);
184 }
185}
186
187void journeysValidateOptions(const char *progname, Options *options) {
188#define X(name, argument, long_, short_) \
189 if (#name != "kv1_file_path"sv && #name != "help"sv \
190 && #name != "line_planning_number"sv && #name != "output_file_path"sv \
191 && #name != "begin_stop_code"sv && #name != "end_stop_code"sv) \
192 if (options->name) { \
193 if (long_) { \
194 if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeys subcommand\n\n", progname, static_cast<const char *>(long_), short_); \
195 else fprintf(stderr, "%s: unexpected flag --%s for journeys subcommand\n\n", progname, static_cast<const char *>(long_)); \
196 } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeys subcommand\n\n", progname, short_); \
197 fprintf(stderr, journeys_help, progname); \
198 exit(1); \
199 }
200 LONG_OPTIONS
201 SHORT_OPTIONS
202#undef X
203
204 if (options->positional.size() > 0) {
205 fprintf(stderr, "%s: unexpected positional argument(s) for journeys subcommand\n\n", progname);
206 for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos);
207 fprintf(stderr, journeys_help, progname);
208 exit(1);
209 }
210
211 if (!options->kv1_file_path)
212 options->kv1_file_path = "-";
213 if (!options->output_file_path)
214 options->output_file_path = "-";
215 if (options->kv1_file_path == ""sv) {
216 fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname);
217 fprintf(stderr, journeys_help, progname);
218 exit(1);
219 }
220 if (options->output_file_path == ""sv) {
221 fprintf(stderr, "%s: output file path cannot be empty\n\n", progname);
222 fprintf(stderr, journeys_help, progname);
223 exit(1);
224 }
225 if (!options->line_planning_number || options->line_planning_number == ""sv) {
226 fprintf(stderr, "%s: line planning number must be provided\n\n", progname);
227 fprintf(stderr, journeys_help, progname);
228 exit(1);
229 }
230 if (!options->begin_stop_code || options->begin_stop_code == ""sv) {
231 fprintf(stderr, "%s: start user stop code must be provided\n\n", progname);
232 fprintf(stderr, journeys_help, progname);
233 exit(1);
234 }
235 if (!options->end_stop_code || options->end_stop_code == ""sv) {
236 fprintf(stderr, "%s: end user stop code must be provided\n\n", progname);
237 fprintf(stderr, journeys_help, progname);
238 exit(1);
239 }
240 if (!std::string_view(options->begin_stop_code).starts_with("star:")
241 && !std::string_view(options->begin_stop_code).starts_with("stop:")) {
242 fprintf(stderr, "%s: begin user stop code must be prefixed with star:/stop:\n\n", progname);
243 fprintf(stderr, journeys_help, progname);
244 exit(1);
245 }
246 if (!std::string_view(options->end_stop_code).starts_with("star:")
247 && !std::string_view(options->end_stop_code).starts_with("stop:")) {
248 fprintf(stderr, "%s: end user stop code must be prefixed with star:/stop:\n\n", progname);
249 fprintf(stderr, journeys_help, progname);
250 exit(1);
251 }
252}
253
254void journeyInfoValidateOptions(const char *progname, Options *options) {
255#define X(name, argument, long_, short_) \
256 if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \
257 && #name != "journey_number"sv && #name != "help"sv) \
258 if (options->name) { \
259 if (long_) { \
260 if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyinfo subcommand\n\n", progname, static_cast<const char *>(long_), short_); \
261 else fprintf(stderr, "%s: unexpected flag --%s for journeyinfo subcommand\n\n", progname, static_cast<const char *>(long_)); \
262 } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeyinfo subcommand\n\n", progname, short_); \
263 fprintf(stderr, journeyinfo_help, progname); \
264 exit(1); \
265 }
266 LONG_OPTIONS
267 SHORT_OPTIONS
268#undef X
269
270 if (options->positional.size() > 0) {
271 fprintf(stderr, "%s: unexpected positional argument(s) for journeyinfo subcommand\n\n", progname);
272 for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos);
273 fprintf(stderr, journeyinfo_help, progname);
274 exit(1);
275 }
276
277 if (!options->kv1_file_path)
278 options->kv1_file_path = "-";
279 if (options->kv1_file_path == ""sv) {
280 fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname);
281 fprintf(stderr, journeyinfo_help, progname);
282 exit(1);
283 }
284 if (!options->journey_number || options->journey_number == ""sv) {
285 fprintf(stderr, "%s: journey number must be provided\n\n", progname);
286 fprintf(stderr, journeyinfo_help, progname);
287 exit(1);
288 }
289 if (!options->line_planning_number || options->line_planning_number == ""sv) {
290 fprintf(stderr, "%s: line planning number must be provided\n\n", progname);
291 fprintf(stderr, journeyinfo_help, progname);
292 exit(1);
293 }
294}
295
296void jopaRouteValidateOptions(const char *progname, Options *options) {
297#define X(name, argument, long_, short_) \
298 if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \
299 && #name != "journey_pattern_code"sv && #name != "help"sv && #name != "output_file_path"sv) \
300 if (options->name) { \
301 if (long_) { \
302 if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for joparoute subcommand\n\n", progname, static_cast<const char *>(long_), short_); \
303 else fprintf(stderr, "%s: unexpected flag --%s for joparoute subcommand\n\n", progname, static_cast<const char *>(long_)); \
304 } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for joparoute subcommand\n\n", progname, short_); \
305 fprintf(stderr, joparoute_help, progname); \
306 exit(1); \
307 }
308 LONG_OPTIONS
309 SHORT_OPTIONS
310#undef X
311
312 if (options->positional.size() > 0) {
313 fprintf(stderr, "%s: unexpected positional argument(s) for joparoute subcommand\n\n", progname);
314 for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos);
315 fprintf(stderr, joparoute_help, progname);
316 exit(1);
317 }
318
319 if (!options->kv1_file_path)
320 options->kv1_file_path = "-";
321 if (!options->output_file_path)
322 options->output_file_path = "-";
323 if (options->kv1_file_path == ""sv) {
324 fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname);
325 fprintf(stderr, joparoute_help, progname);
326 exit(1);
327 }
328 if (options->output_file_path == ""sv) {
329 fprintf(stderr, "%s: output file path cannot be empty\n\n", progname);
330 fprintf(stderr, joparoute_help, progname);
331 exit(1);
332 }
333 if (!options->journey_pattern_code || options->journey_pattern_code == ""sv) {
334 fprintf(stderr, "%s: journey pattern code must be provided\n\n", progname);
335 fprintf(stderr, joparoute_help, progname);
336 exit(1);
337 }
338 if (!options->line_planning_number || options->line_planning_number == ""sv) {
339 fprintf(stderr, "%s: line planning number must be provided\n\n", progname);
340 fprintf(stderr, joparoute_help, progname);
341 exit(1);
342 }
343}
344
345struct ShortFlag {
346 int has_arg;
347 int c;
348};
349
350template<ShortFlag ...flags>
351const std::string mkargarr =
352 (std::string()
353 + ...
354 + (flags.c == 0
355 ? ""
356 : std::string((const char[]){ flags.c, '\0' })
357 + (flags.has_arg == required_argument
358 ? ":"
359 : flags.has_arg == optional_argument
360 ? "::"
361 : "")));
362
363#define X(name, has_arg, long_, short_) ShortFlag(has_arg, short_),
364const std::string argarr = mkargarr<SHORT_OPTIONS LONG_OPTIONS ShortFlag(no_argument, 0)>;
365#undef X
366
367Options parseOptions(int argc, char *argv[]) {
368 const char *progname = argv[0];
369
370 // Struct with options for augmentkv6.
371 Options options;
372
373 static option long_options[] = {
374#define X(name, argument, long_, short_) { long_, argument, nullptr, short_ },
375 LONG_OPTIONS
376#undef X
377 { 0 },
378 };
379
380 int c;
381 int option_index = 0;
382 bool error = false;
383 while ((c = getopt_long(argc, argv, argarr.c_str(), long_options, &option_index)) != -1) {
384 // If a long option was used, c corresponds with val. We have val = 0 for
385 // options which have no short alternative, so checking for c = 0 gives us
386 // whether a long option with no short alternative was used.
387 // Below, we check for c = 'h', which corresponds with the long option
388 // '--help', for which val = 'h'.
389 if (c == 0) {
390 const char *name = long_options[option_index].name;
391#define X(opt_name, opt_has_arg, opt_long, opt_short) \
392 if (name == opt_long ## sv) { options.opt_name = optarg; continue; }
393 LONG_OPTIONS
394#undef X
395 error = true;
396 }
397#define X(opt_name, opt_has_arg, opt_long, opt_short) \
398 if (c == opt_short) { options.opt_name = optarg ? optarg : opt_set; continue; }
399 LONG_OPTIONS
400 SHORT_OPTIONS
401#undef X
402 error = true;
403 }
404
405 if (optind < argc)
406 options.subcommand = argv[optind++];
407 while (optind < argc)
408 options.positional.push_back(argv[optind++]);
409
410 if (options.subcommand
411 && options.subcommand != "schedule"sv
412 && options.subcommand != "joparoute"sv
413 && options.subcommand != "journeyinfo"sv
414 && options.subcommand != "journeyroute"sv
415 && options.subcommand != "journeys"sv) {
416 fprintf(stderr, "%s: unknown subcommand '%s'\n\n", progname, options.subcommand);
417 fprintf(stderr, help, progname);
418 exit(1);
419 }
420 if (options.subcommand && error) {
421 fputc('\n', stderr);
422 if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname);
423 if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname);
424 if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname);
425 if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname);
426 if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname);
427 exit(1);
428 }
429 if (error || !options.subcommand) {
430 if (!options.subcommand) fprintf(stderr, "%s: no subcommand provided\n", progname);
431 fputc('\n', stderr);
432 fprintf(stderr, help, progname);
433 exit(1);
434 }
435 if (options.help) {
436 if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname);
437 if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname);
438 if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname);
439 if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname);
440 if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname);
441 exit(0);
442 }
443
444 if (options.subcommand == "joparoute"sv)
445 jopaRouteValidateOptions(progname, &options);
446 if (options.subcommand == "journeyinfo"sv)
447 journeyInfoValidateOptions(progname, &options);
448 if (options.subcommand == "journeyroute"sv)
449 journeyRouteValidateOptions(progname, &options);
450 if (options.subcommand == "journeys"sv)
451 journeysValidateOptions(progname, &options);
452 if (options.subcommand == "schedule"sv)
453 scheduleValidateOptions(progname, &options);
454
455 return options;
456}
diff --git a/src/querykv1/cliopts.hpp b/src/querykv1/cliopts.hpp
new file mode 100644
index 0000000..df8630e
--- /dev/null
+++ b/src/querykv1/cliopts.hpp
@@ -0,0 +1,35 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_CLIOPTS_HPP
4#define OEUF_QUERYKV1_CLIOPTS_HPP
5
6#include <vector>
7
// Option tables, written as X-macros so that the same rows can be expanded in
// more than one place (below they are expanded into the fields of Options).
//
// Column legend for every X(...) row:
//   name, req/opt/no arg, long option name, short option character
//
// The legend deliberately lives outside the #define: a comment line inside the
// macro needs its own trailing backslash, otherwise it silently terminates the
// macro definition.
#define LONG_OPTIONS \
  X(kv1_file_path,        required_argument, "kv1",     0  ) \
  X(line_planning_number, required_argument, "line",    0  ) \
  X(journey_number,       required_argument, "journey", 0  ) \
  X(journey_pattern_code, required_argument, "jopa",    0  ) \
  X(begin_stop_code,      required_argument, "begin",   0  ) \
  X(end_stop_code,        required_argument, "end",     0  ) \
  X(help,                 no_argument,       "help",    'h')

#define SHORT_OPTIONS \
  X(output_file_path,     required_argument, nullptr,   'o')

// Parsed command line. Every option from the tables above becomes a
// `const char *` field named after the row's first column, initialised to
// nullptr.
struct Options {
  const char *subcommand = nullptr;
  std::vector<const char *> positional;
#define X(name, argument, long_name, short_name) const char *name = nullptr;
  LONG_OPTIONS
  SHORT_OPTIONS
#undef X
};
29
30extern const char *opt_set;
31extern const char *opt_unset;
32
33Options parseOptions(int argc, char *argv[]);
34
35#endif // OEUF_QUERYKV1_CLIOPTS_HPP
diff --git a/src/querykv1/daterange.cpp b/src/querykv1/daterange.cpp
new file mode 100644
index 0000000..5ce42bf
--- /dev/null
+++ b/src/querykv1/daterange.cpp
@@ -0,0 +1,91 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include "daterange.hpp"
4
5static std::chrono::year_month_day nextDay(std::chrono::year_month_day ymd) {
6 return std::chrono::sys_days(ymd) + std::chrono::days(1);
7}
8
9// DateRange expresses the date range [from, thru].
10DateRange::Iterator &DateRange::Iterator::operator++() {
11 ymd_ = nextDay(ymd_);
12 return *this;
13}
14
15std::chrono::year_month_day DateRange::Iterator::operator*() const {
16 return ymd_;
17}
18
19std::chrono::year_month_day DateRange::Iterator::ymd() const {
20 return ymd_;
21}
22
23DateRange::Iterator::Iterator(std::chrono::year_month_day ymd) : ymd_(ymd) {}
24
25DateRange::DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru)
26 : from_(from), thru_(thru)
27{}
28
29DateRange::Iterator DateRange::begin() const {
30 return DateRange::Iterator(from_);
31}
32
33DateRange::Iterator DateRange::end() const {
34 return DateRange::Iterator(nextDay(thru_));
35}
36
37bool DateRange::valid() const {
38 return from_ <= thru_;
39}
40
41std::chrono::year_month_day DateRange::from() const {
42 return from_;
43}
44
45std::chrono::year_month_day DateRange::thru() const {
46 return thru_;
47}
48
49bool operator==(const DateRange::Iterator a, const DateRange::Iterator b) {
50 return *a == *b;
51}
52
53DateRangeSeq::DateRangeSeq(std::initializer_list<DateRange> ranges)
54 : DateRangeSeq(ranges.begin(), ranges.end())
55{}
56
57DateRangeSeq DateRangeSeq::clampFrom(std::chrono::year_month_day from) const {
58 std::vector<DateRange> new_ranges;
59 new_ranges.reserve(ranges_.size());
60 for (const DateRange range : ranges_) {
61 if (range.from() < from) {
62 if (range.thru() < from)
63 continue;
64 new_ranges.emplace_back(from, range.thru());
65 }
66 new_ranges.push_back(range);
67 }
68 return DateRangeSeq(new_ranges.begin(), new_ranges.end());
69}
70
71DateRangeSeq DateRangeSeq::clampThru(std::chrono::year_month_day thru) const {
72 std::vector<DateRange> new_ranges;
73 new_ranges.reserve(ranges_.size());
74 for (const DateRange range : ranges_) {
75 if (range.thru() > thru) {
76 if (range.from() > thru)
77 continue;
78 new_ranges.emplace_back(range.from(), thru);
79 }
80 new_ranges.push_back(range);
81 }
82 return DateRangeSeq(new_ranges.begin(), new_ranges.end());
83}
84
85std::vector<DateRange>::const_iterator DateRangeSeq::begin() const {
86 return ranges_.begin();
87}
88
89std::vector<DateRange>::const_iterator DateRangeSeq::end() const {
90 return ranges_.end();
91}
diff --git a/src/querykv1/daterange.hpp b/src/querykv1/daterange.hpp
new file mode 100644
index 0000000..e34c39c
--- /dev/null
+++ b/src/querykv1/daterange.hpp
@@ -0,0 +1,118 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_DATERANGE_HPP
4#define OEUF_QUERYKV1_DATERANGE_HPP
5
6#include <cassert>
7#include <chrono>
8#include <concepts>
9#include <iterator>
10#include <utility>
11#include <vector>
12
13// DateRange expresses the date range [from, thru].
14class DateRange {
15 public:
16 class Iterator {
17 friend class DateRange;
18
19 public:
20 Iterator &operator++();
21
22 std::chrono::year_month_day operator*() const;
23 std::chrono::year_month_day ymd() const;
24
25 private:
26 explicit Iterator(std::chrono::year_month_day ymd);
27
28 std::chrono::year_month_day ymd_;
29 };
30
31 explicit DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru);
32
33 Iterator begin() const;
34 Iterator end() const;
35 bool valid() const;
36 std::chrono::year_month_day from() const;
37 std::chrono::year_month_day thru() const;
38
39 private:
40 std::chrono::year_month_day from_;
41 std::chrono::year_month_day thru_;
42};
43
44bool operator==(const DateRange::Iterator a, const DateRange::Iterator b);
45
46template<typename Tp, typename T>
47concept DerefsTo = requires(Tp p) {
48 { *p } -> std::convertible_to<T>;
49};
50
51class DateRangeSeq {
52 // The way LE and GE are ordered makes a difference for how the sorting
53 // (insertion based on lower_bound) works. Do not carelessly reorder this.
54 enum LeGe {
55 GE, // >=
56 LE, // <=
57 };
58
59 std::vector<DateRange> ranges_;
60
61 public:
62 template<std::input_iterator InputIt>
63 requires DerefsTo<InputIt, DateRange>
64 explicit DateRangeSeq(InputIt begin, InputIt end) {
65 // We convert every inclusive date range [x, y] into (x, >=) and (y, <=)
66 // and put these into a list, using binary search to make sure that these
67 // stay ordered. We then reduce this list, removing tautological
68 // predicates, giving us a final list of ranges that do not overlap.
69
70 std::vector<std::pair<std::chrono::year_month_day, LeGe>> preds;
71
72 size_t n = 0;
73 for (auto it = begin; it != end; it++) {
74 auto &range = *it;
75 if (!range.valid()) continue;
76
77 auto a = std::make_pair(range.from(), GE);
78 auto b = std::make_pair(range.thru(), LE);
79 preds.insert(std::lower_bound(preds.begin(), preds.end(), a), a);
80 preds.insert(std::lower_bound(preds.begin(), preds.end(), b), b);
81
82 n++;
83 }
84
85 if (preds.empty())
86 return;
87
88 assert(preds.size() >= 2);
89 assert(preds.front().second == GE);
90 assert(preds.back().second == LE);
91
92 std::chrono::year_month_day begin_ymd = preds[0].first;
93 for (size_t i = 1; i < preds.size(); i++) {
94 if (preds[i].second == LE && (i + 1 == preds.size() || preds[i + 1].second == GE)) {
95 std::chrono::year_month_day end_ymd = preds[i].first;
96 if (!ranges_.empty() && ranges_.back().thru() == begin_ymd)
97 ranges_.back() = DateRange(ranges_.back().from(), end_ymd);
98 else
99 ranges_.push_back(DateRange(begin_ymd, end_ymd));
100 if (i + 1 != preds.size()) {
101 begin_ymd = preds[i + 1].first;
102 i++;
103 }
104 }
105 }
106 }
107
108 explicit DateRangeSeq(std::initializer_list<DateRange> ranges);
109
110 DateRangeSeq clampFrom(std::chrono::year_month_day from) const;
111 DateRangeSeq clampThru(std::chrono::year_month_day thru) const;
112
113 public:
114 std::vector<DateRange>::const_iterator begin() const;
115 std::vector<DateRange>::const_iterator end() const;
116};
117
118#endif // OEUF_QUERYKV1_DATERANGE_HPP
diff --git a/src/querykv1/grammar.abnf b/src/querykv1/grammar.abnf
new file mode 100644
index 0000000..1c93760
--- /dev/null
+++ b/src/querykv1/grammar.abnf
@@ -0,0 +1,44 @@
1; This grammar does *not* allow fields to contain LF, unless the entire content
2; of the field is quoted. The file is simply rejected otherwise.
3; I took the liberty to take some inspiration from the somewhat similar IETF RFC 4180.
4
5document = [header NEWLINE] (comment / record / empty-line) *(NEWLINE (comment / record / empty-line)) [NEWLINE] / header
6
7header = OPENBRACK *NOTCRLF
8comment = SEMICOLON *NOTCRLF
9
10empty-line = *WHITESPACE
11
12record = field *(PIPE field)
13field = *WHITESPACE field-data *WHITESPACE
14field-data = escaped / unescaped
15
16; Unescaped fields are also allowed to contain double quotes,
17; they are just not interpreted in any special way.
18escaped = DQUOTE *(TEXTDATA / WHITESPACE / NEWLINE / PIPE / 2DQUOTE) DQUOTE
19unescaped = [TEXTDATA *(*WHITESPACE (TEXTDATA / DQUOTE))]
20
21HTAB = %x09 ; <horizontal tab, "\t">
22LF = %x0A ; <line feed, "\n">
23VTAB = %x0B ; <vertical tab, "\v">
24FF = %x0C ; <form feed, "\f">
25CR = %x0D ; <carriage return, "\r">
26SPACE = %x20 ; <space, " ">
27DQUOTE = %x22 ; "
28SEMICOLON = %x3B ; ;
29OPENBRACK = %x5B ; [
30PIPE = %x7C ; |
31
32; All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE
33; Semicolon is included, as comments are only defined as 'lines starting with a semicolon'.
34; So it should be fine if a semicolon is part of a field, the rest of the line would not
35; be interpreted as a comment in that case.
36TEXTDATA = %x00-08 / %x0E-1F / %x21 / %x23-5A / %x5C-7B / %x7D-10FFFF
37
38; Not including LF here even though TMI8/KV1 does not officially consider it
39; a newline, as newlines are defined as 'CR optionally followed by LF'
40WHITESPACE = SPACE / FF / HTAB / VTAB
41
42; All codepoints excluding CR and LF
43NOTCRLF = %x00-09 / %x0B-0C / %x0E-10FFFF
44NEWLINE = CR [LF]
diff --git a/src/querykv1/grammar.ebnf b/src/querykv1/grammar.ebnf
new file mode 100644
index 0000000..94f8cde
--- /dev/null
+++ b/src/querykv1/grammar.ebnf
@@ -0,0 +1,47 @@
1/* This grammar does allow fields to contain stray LFs, not after any specific
2 * CR. I took the liberty to take some inspiration from the somewhat similar
3 * IETF RFC 4180.
4 */
5document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header
6
7header ::= OPENBRACK NOTCR*
8comment ::= SEMICOLON NOTCR*
9
10empty-line ::= WHITESPACE*
11
12record ::= field (PIPE field)*
13field ::= WHITESPACE* field-data WHITESPACE*
14field-data ::= DQUOTE escaped DQUOTE | unescaped
15
16/* Unescaped fields are also allowed to contain double quotes, they are just
17 * not interpreted in any special way.
18 */
19escaped ::= (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)*
20unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)?
21
22HTAB ::= #x09 /* <horizontal tab, "\t"> */
23LF ::= #x0A /* <line feed, "\n"> */
24VTAB ::= #x0B /* <vertical tab, "\v"> */
25FF ::= #x0C /* <form feed, "\f"> */
26CR ::= #x0D /* <carriage return, "\r"> */
27SPACE ::= #x20 /* <space, " "> */
28DQUOTE ::= #x22 /* " */
29SEMICOLON ::= #x3B /* ; */
30OPENBRACK ::= #x5B /* [ */
31PIPE ::= #x7C /* | */
32
33/* All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE.
34 * Semicolon is included, as comments are only defined as 'lines starting with
35 * a semicolon'. So it should be fine if a semicolon is part of a field, the
36 * rest of the line would not be interpreted as a comment in that case.
37 */
38TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF]
39
40/* Including LF here as TMI8/KV1 does not consider it a newline,
41 * as newlines are defined as 'CR optionally followed by LF'
42 */
43WHITESPACE ::= SPACE | LF | FF | HTAB | VTAB
44
45/* All codepoints excluding CR (LF is allowed here, unlike in the ABNF grammar) */
46NOTCR ::= [#x00-#x0C#x0E-#x10FFFF]
47NEWLINE ::= CR LF?
diff --git a/src/querykv1/grammar.ebnf.bak b/src/querykv1/grammar.ebnf.bak
new file mode 100644
index 0000000..b5acbf5
--- /dev/null
+++ b/src/querykv1/grammar.ebnf.bak
@@ -0,0 +1,23 @@
1document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header
2header ::= OPENBRACK NOTCRLF*
3comment ::= SEMICOLON NOTCRLF*
4empty-line ::= WHITESPACE*
5record ::= field (PIPE field)*
6field ::= WHITESPACE* field-data WHITESPACE*
7field-data ::= escaped | unescaped
8escaped ::= DQUOTE (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)* DQUOTE
9unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)?
10HTAB ::= #x09
11LF ::= #x0A
12VTAB ::= #x0B
13FF ::= #x0C
14CR ::= #x0D
15SPACE ::= #x20
16DQUOTE ::= #x22
17SEMICOLON ::= #x3B
18OPENBRACK ::= #x5B
19PIPE ::= #x7C
20WHITESPACE ::= SPACE | FF | HTAB | VTAB
21NOTCRLF ::= [#x00-#x09#x0B-#x0C#x0E-#x10FFFF]
22TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF]
23NEWLINE ::= CR LF?
diff --git a/src/querykv1/joparoute.cpp b/src/querykv1/joparoute.cpp
new file mode 100644
index 0000000..94ed359
--- /dev/null
+++ b/src/querykv1/joparoute.cpp
@@ -0,0 +1,102 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <cstdio>
4#include <iostream>
5#include <string_view>
6
7#include "joparoute.hpp"
8
9using namespace std::string_view_literals;
10
11void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index) {
12 FILE *out = stdout;
13 if (options.output_file_path != "-"sv)
14 out = fopen(options.output_file_path, "wb");
15 if (!out) {
16 fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno));
17 exit(EXIT_FAILURE);
18 }
19
20 const std::string data_owner_code = "CXX";
21 Kv1JourneyPattern::Key jopa_key(
22 // Of course it is bad to hardcode this, but we really have no time to make
23 // everything nice and dynamic. We're only working with CXX data anyway,
24 // and provide no support for the 'Schedules and Passing Times' KV1
25 // variant.
26 data_owner_code,
27 options.line_planning_number,
28 options.journey_pattern_code);
29
30 const Kv1JourneyPattern *jopa = index.journey_patterns[jopa_key];
31 if (!jopa) {
32 std::cerr << "Journey pattern not found" << std::endl;
33 return;
34 }
35 const Kv1Line *line = jopa->p_line;
36
37 struct Point {
38 bool is_stop = false;
39 const Kv1JourneyPatternTimingLink *jopatili = nullptr;
40 const Kv1Link *link = nullptr;
41 const Kv1Point *point = nullptr;
42 double distance_since_start_of_link = 0;
43 double distance_since_start_of_journey = 0;
44 };
45 std::vector<Point> points;
46
47 for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) {
48 const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i];
49 if (jopatili->key.line_planning_number == jopa->key.line_planning_number
50 && jopatili->key.journey_pattern_code == jopa->key.journey_pattern_code) {
51 const Kv1Link::Key link_key(data_owner_code, jopatili->user_stop_code_begin,
52 jopatili->user_stop_code_end, line->transport_type);
53 const Kv1Link *link = index.links[link_key];
54 const Kv1UserStopPoint::Key link_begin_key(data_owner_code, jopatili->user_stop_code_begin);
55 const Kv1UserStopPoint::Key link_end_key(data_owner_code, jopatili->user_stop_code_end);
56 const Kv1UserStopPoint *link_begin = index.user_stop_points[link_begin_key];
57 const Kv1UserStopPoint *link_end = index.user_stop_points[link_end_key];
58
59 points.emplace_back(true, jopatili, link, link_begin->p_point, 0);
60
61 for (size_t j = 0; j < records.point_on_links.size(); j++) {
62 Kv1PointOnLink *pool = &records.point_on_links[j];
63 if (pool->key.user_stop_code_begin == jopatili->user_stop_code_begin
64 && pool->key.user_stop_code_end == jopatili->user_stop_code_end
65 && pool->key.transport_type == jopatili->p_line->transport_type) {
66 points.emplace_back(false, jopatili, link, pool->p_point, pool->distance_since_start_of_link);
67 }
68 }
69
70 points.emplace_back(true, jopatili, link, link_end->p_point, link->distance);
71 }
72 }
73
74 std::sort(points.begin(), points.end(), [](Point &a, Point &b) {
75 if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order)
76 return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order;
77 return a.distance_since_start_of_link < b.distance_since_start_of_link;
78 });
79
80 double distance_since_start_of_journey = 0;
81 for (size_t i = 0; i < points.size(); i++) {
82 Point *p = &points[i];
83 if (i > 0) {
84 Point *prev = &points[i - 1];
85 if (p->link != prev->link) {
86 distance_since_start_of_journey += prev->link->distance;
87 }
88 }
89 p->distance_since_start_of_journey = distance_since_start_of_journey + p->distance_since_start_of_link;
90 }
91
92 fputs("is_stop,link_usrstop_begin,link_usrstop_end,point_code,rd_x,rd_y,distance_since_start_of_link,distance_since_start_of_journey\n", out);
93 for (const auto &point : points) {
94 fprintf(out, "%s,%s,%s,%s,%f,%f,%f,%f\n",
95 point.is_stop ? "true" : "false",
96 point.jopatili->user_stop_code_begin.c_str(), point.jopatili->user_stop_code_end.c_str(),
97 point.point->key.point_code.c_str(), point.point->location_x_ew, point.point->location_y_ns,
98 point.distance_since_start_of_link, point.distance_since_start_of_journey);
99 }
100
101 if (options.output_file_path != "-"sv) fclose(out);
102}
diff --git a/src/querykv1/joparoute.hpp b/src/querykv1/joparoute.hpp
new file mode 100644
index 0000000..ade94e8
--- /dev/null
+++ b/src/querykv1/joparoute.hpp
@@ -0,0 +1,13 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_JOPAROUTE_HPP
4#define OEUF_QUERYKV1_JOPAROUTE_HPP
5
6#include <tmi8/kv1_types.hpp>
7#include <tmi8/kv1_index.hpp>
8
9#include "cliopts.hpp"
10
11void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index);
12
13#endif // OEUF_QUERYKV1_JOPAROUTE_HPP
diff --git a/src/querykv1/journeyinfo.cpp b/src/querykv1/journeyinfo.cpp
new file mode 100644
index 0000000..bd29490
--- /dev/null
+++ b/src/querykv1/journeyinfo.cpp
@@ -0,0 +1,64 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <iostream>
4
5#include "journeyinfo.hpp"
6
7void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index) {
8 std::cout << "Info for journey " << options.line_planning_number
9 << "/" << options.journey_number << std::endl;
10
11 std::unordered_map<std::string, const Kv1UserStopPoint *> usrstops;
12 for (size_t i = 0; i < records.user_stop_points.size(); i++) {
13 const Kv1UserStopPoint *usrstop = &records.user_stop_points[i];
14 usrstops[usrstop->key.user_stop_code] = usrstop;
15 }
16
17 for (const auto &pujo : records.public_journeys) {
18 if (pujo.key.line_planning_number != options.line_planning_number
19 || std::to_string(pujo.key.journey_number) != options.journey_number)
20 continue;
21
22 std::vector<const Kv1JourneyPatternTimingLink *> timing_links;
23 for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) {
24 const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i];
25 if (jopatili->key.line_planning_number != options.line_planning_number
26 || jopatili->key.journey_pattern_code != pujo.journey_pattern_code)
27 continue;
28 timing_links.push_back(jopatili);
29 }
30
31 std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool {
32 return a->key.timing_link_order < b->key.timing_link_order;
33 });
34 auto begin_stop = timing_links.front()->user_stop_code_begin;
35 auto end_stop = timing_links.back()->user_stop_code_end;
36
37 const auto *begin = usrstops[begin_stop];
38 const auto *end = usrstops[end_stop];
39
40 std::cout << " Journey pattern: " << pujo.key.line_planning_number
41 << "/" << pujo.journey_pattern_code << std::endl
42 << " Begin stop: " << begin_stop
43 << "; name: " << std::quoted(begin->name)
44 << "; town: " << std::quoted(begin->town) << std::endl
45 << " End stop: " << end_stop
46 << "; name: " << std::quoted(end->name)
47 << "; town: " << std::quoted(end->town) << std::endl;
48
49 const auto *begin_star = begin->p_user_stop_area;
50 const auto *end_star = end->p_user_stop_area;
51 if (begin_star)
52 std::cout << " Begin stop area: " << begin_star->key.user_stop_area_code
53 << "; name: " << std::quoted(begin_star->name)
54 << ", town: " << std::quoted(begin_star->town)
55 << std::endl;
56 if (end_star)
57 std::cout << " End stop area: " << end_star->key.user_stop_area_code
58 << "; name: " << std::quoted(end_star->name)
59 << ", town: " << std::quoted(end_star->town)
60 << std::endl;
61
62 break;
63 }
64}
diff --git a/src/querykv1/journeyinfo.hpp b/src/querykv1/journeyinfo.hpp
new file mode 100644
index 0000000..2a2118d
--- /dev/null
+++ b/src/querykv1/journeyinfo.hpp
@@ -0,0 +1,13 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_JOURNEYINFO_HPP
4#define OEUF_QUERYKV1_JOURNEYINFO_HPP
5
6#include <tmi8/kv1_types.hpp>
7#include <tmi8/kv1_index.hpp>
8
9#include "cliopts.hpp"
10
11void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index);
12
13#endif // OEUF_QUERYKV1_JOURNEYINFO_HPP
diff --git a/src/querykv1/journeyroute.cpp b/src/querykv1/journeyroute.cpp
new file mode 100644
index 0000000..013ea1c
--- /dev/null
+++ b/src/querykv1/journeyroute.cpp
@@ -0,0 +1,96 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <iostream>
4#include <string_view>
5
6#include "journeyroute.hpp"
7
8using namespace std::string_view_literals;
9
10void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index) {
11 FILE *out = stdout;
12 if (options.output_file_path != "-"sv)
13 out = fopen(options.output_file_path, "wb");
14 if (!out) {
15 fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno));
16 exit(EXIT_FAILURE);
17 }
18
19 for (auto &pujo : records.public_journeys) {
20 if (pujo.key.line_planning_number == options.line_planning_number && std::to_string(pujo.key.journey_number) == options.journey_number) {
21 fprintf(stderr, "Got PUJO %s/%s:\n", options.line_planning_number, options.journey_number);
22 fprintf(stderr, " Day type: %s\n", pujo.key.day_type.c_str());
23 auto &pegr = *pujo.p_period_group;
24 fprintf(stderr, " PEGR Code: %s\n", pegr.key.period_group_code.c_str());
25 fprintf(stderr, " PEGR Description: %s\n", pegr.description.c_str());
26 fprintf(stderr, " SPECDAY Code: %s\n", pujo.key.specific_day_code.c_str());
27 auto &timdemgrp = *pujo.p_time_demand_group;
28
29 for (auto &pegrval : records.period_group_validities) {
30 if (pegrval.key.period_group_code == pegr.key.period_group_code) {
31 fprintf(stderr, "Got PEGRVAL for PEGR %s\n", pegr.key.period_group_code.c_str());
32 std::cerr << " Valid from: " << pegrval.key.valid_from << std::endl;
33 std::cerr << " Valid thru: " << pegrval.valid_thru << std::endl;
34 }
35 }
36
37 struct Point {
38 Kv1JourneyPatternTimingLink *jopatili = nullptr;
39 Kv1TimeDemandGroupRunTime *timdemrnt = nullptr;
40 double distance_since_start_of_link = 0;
41 double rd_x = 0;
42 double rd_y = 0;
43 double total_time_s = 0;
44 };
45 std::vector<Point> points;
46
47 for (size_t i = 0; i < records.time_demand_group_run_times.size(); i++) {
48 Kv1TimeDemandGroupRunTime *timdemrnt = &records.time_demand_group_run_times[i];
49 if (timdemrnt->key.line_planning_number == timdemgrp.key.line_planning_number
50 && timdemrnt->key.journey_pattern_code == timdemgrp.key.journey_pattern_code
51 && timdemrnt->key.time_demand_group_code == timdemgrp.key.time_demand_group_code) {
52 Kv1JourneyPatternTimingLink *jopatili = timdemrnt->p_journey_pattern_timing_link;
53 for (auto &pool : records.point_on_links) {
54 if (pool.key.user_stop_code_begin == timdemrnt->user_stop_code_begin
55 && pool.key.user_stop_code_end == timdemrnt->user_stop_code_end
56 && pool.key.transport_type == jopatili->p_line->transport_type) {
57 points.emplace_back(
58 jopatili,
59 timdemrnt,
60 pool.distance_since_start_of_link,
61 pool.p_point->location_x_ew,
62 pool.p_point->location_y_ns
63 );
64 }
65 }
66 }
67 }
68
69 std::sort(points.begin(), points.end(), [](Point &a, Point &b) {
70 if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order)
71 return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order;
72 return a.distance_since_start_of_link < b.distance_since_start_of_link;
73 });
74
75 double total_time_s = 0;
76 for (size_t i = 0; i < points.size(); i++) {
77 Point *p = &points[i];
78 p->total_time_s = total_time_s;
79 if (i > 0) {
80 Point *prev = &points[i - 1];
81 if (p->timdemrnt != prev->timdemrnt) {
82 total_time_s += prev->timdemrnt->total_drive_time_s;
83 prev->total_time_s = total_time_s;
84 }
85 }
86 }
87
88 fputs("rd_x,rd_y,total_time_s,is_timing_stop\n", out);
89 for (const auto &point : points) {
90 fprintf(out, "%f,%f,%f,%d\n", point.rd_x, point.rd_y, point.total_time_s, point.jopatili->is_timing_stop);
91 }
92 }
93 }
94
95 if (options.output_file_path != "-"sv) fclose(out);
96}
diff --git a/src/querykv1/journeyroute.hpp b/src/querykv1/journeyroute.hpp
new file mode 100644
index 0000000..ccd996c
--- /dev/null
+++ b/src/querykv1/journeyroute.hpp
@@ -0,0 +1,13 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_JOURNEYROUTE_HPP
4#define OEUF_QUERYKV1_JOURNEYROUTE_HPP
5
6#include <tmi8/kv1_types.hpp>
7#include <tmi8/kv1_index.hpp>
8
9#include "cliopts.hpp"
10
11void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index);
12
13#endif // OEUF_QUERYKV1_JOURNEYROUTE_HPP
diff --git a/src/querykv1/journeys.cpp b/src/querykv1/journeys.cpp
new file mode 100644
index 0000000..96566b2
--- /dev/null
+++ b/src/querykv1/journeys.cpp
@@ -0,0 +1,95 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <iostream>
4#include <map>
5#include <string_view>
6#include <unordered_set>
7
8#include "journeys.hpp"
9
10using namespace std::string_view_literals;
11
12void journeys(const Options &options, Kv1Records &records, Kv1Index &index) {
13 const std::string_view want_begin_stop_code(options.begin_stop_code);
14 const std::string_view want_end_stop_code(options.end_stop_code);
15
16 FILE *out = stdout;
17 if (options.output_file_path != "-"sv)
18 out = fopen(options.output_file_path, "wb");
19 if (!out) {
20 fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno));
21 exit(EXIT_FAILURE);
22 }
23
24 std::cerr << "Generating journeys for " << options.line_planning_number << ", going from stop "
25 << options.begin_stop_code << " to " << options.end_stop_code << std::endl;
26
27 std::unordered_map<std::string, const Kv1UserStopPoint *> usrstops;
28 for (size_t i = 0; i < records.user_stop_points.size(); i++) {
29 const Kv1UserStopPoint *usrstop = &records.user_stop_points[i];
30 usrstops[usrstop->key.user_stop_code] = usrstop;
31 }
32
33 std::unordered_set<std::string> journey_pattern_codes;
34 for (const auto &jopa : records.journey_patterns) {
35 if (jopa.key.line_planning_number != options.line_planning_number)
36 continue;
37 journey_pattern_codes.insert(jopa.key.journey_pattern_code);
38 }
39
40 std::unordered_map<std::string, std::vector<const Kv1JourneyPatternTimingLink *>> jopatilis;
41 for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) {
42 const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i];
43 if (jopatili->key.line_planning_number != options.line_planning_number
44 || !journey_pattern_codes.contains(jopatili->key.journey_pattern_code))
45 continue;
46 jopatilis[jopatili->key.journey_pattern_code].push_back(jopatili);
47 }
48
49 std::unordered_set<std::string> valid_jopas;
50 for (auto &[journey_pattern_code, timing_links] : jopatilis) {
51 std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool {
52 return a->key.timing_link_order < b->key.timing_link_order;
53 });
54 auto begin_stop = timing_links.front()->user_stop_code_begin;
55 auto end_stop = timing_links.back()->user_stop_code_end;
56
57 const auto *begin = usrstops[begin_stop];
58 const auto *end = usrstops[end_stop];
59
60 bool begin_stop_ok = false;
61 if (want_begin_stop_code.starts_with("stop:"))
62 begin_stop_ok = want_begin_stop_code.substr(5) == begin_stop;
63 else if (want_begin_stop_code.starts_with("star:"))
64 begin_stop_ok = want_begin_stop_code.substr(5) == begin->user_stop_area_code;
65
66 bool end_stop_ok = false;
67 if (want_end_stop_code.starts_with("stop:"))
68 end_stop_ok = want_end_stop_code.substr(5) == end_stop;
69 else if (want_end_stop_code.starts_with("star:"))
70 end_stop_ok = want_end_stop_code.substr(5) == end->user_stop_area_code;
71
72 if (begin_stop_ok && end_stop_ok) {
73 valid_jopas.insert(journey_pattern_code);
74 }
75 }
76
77 std::map<int, std::pair<std::string, std::string>> valid_journeys;
78 for (const auto &pujo : records.public_journeys) {
79 if (pujo.key.line_planning_number == options.line_planning_number
80 && valid_jopas.contains(pujo.journey_pattern_code)) {
81 valid_journeys[pujo.key.journey_number] = {
82 pujo.time_demand_group_code,
83 pujo.journey_pattern_code,
84 };
85 }
86 }
87
88 fputs("journey_number,time_demand_group_code,journey_pattern_code\n", out);
89 for (const auto &[journey_number, timdemgrp_jopa] : valid_journeys) {
90 const auto &[time_demand_group_code, journey_pattern_code] = timdemgrp_jopa;
91 fprintf(out, "%d,%s,%s\n", journey_number, time_demand_group_code.c_str(), journey_pattern_code.c_str());
92 }
93
94 if (options.output_file_path != "-"sv) fclose(out);
95}
diff --git a/src/querykv1/journeys.hpp b/src/querykv1/journeys.hpp
new file mode 100644
index 0000000..cf615c7
--- /dev/null
+++ b/src/querykv1/journeys.hpp
@@ -0,0 +1,13 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_JOURNEYS_HPP
4#define OEUF_QUERYKV1_JOURNEYS_HPP
5
6#include <tmi8/kv1_types.hpp>
7#include <tmi8/kv1_index.hpp>
8
9#include "cliopts.hpp"
10
11void journeys(const Options &options, Kv1Records &records, Kv1Index &index);
12
13#endif // OEUF_QUERYKV1_JOURNEYS_HPP
diff --git a/src/querykv1/main.cpp b/src/querykv1/main.cpp
new file mode 100644
index 0000000..6c606ba
--- /dev/null
+++ b/src/querykv1/main.cpp
@@ -0,0 +1,198 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <chrono>
4#include <cstdio>
5#include <string>
6#include <string_view>
7#include <vector>
8
9#include <tmi8/kv1_types.hpp>
10#include <tmi8/kv1_index.hpp>
11#include <tmi8/kv1_lexer.hpp>
12#include <tmi8/kv1_parser.hpp>
13
14#include "cliopts.hpp"
15#include "joparoute.hpp"
16#include "journeyinfo.hpp"
17#include "journeyroute.hpp"
18#include "journeys.hpp"
19#include "schedule.hpp"
20
21using namespace std::string_view_literals;
22
// Clock used for measuring durations: the high resolution clock when it is
// steady, the steady clock otherwise. Either way the chosen clock is steady.
using TimingClock = std::conditional<
  std::chrono::high_resolution_clock::is_steady,
  std::chrono::high_resolution_clock,
  std::chrono::steady_clock>::type;
27
// Reads the complete contents of the file at `path` into a string; the path
// "-" means standard input. Progress is reported on stderr. If the file cannot
// be opened or a read error occurs, a message is printed and the process exits
// with status 1.
std::string readKv1(const char *path) {
  const bool from_stdin = std::string_view(path) == "-";

  FILE *in = stdin;
  if (from_stdin)
    fputs("Reading KV1 from standard input\n", stderr);
  else
    in = fopen(path, "rb");
  if (!in) {
    fprintf(stderr, "Open %s: %s\n", path, strerrordesc_np(errno));
    exit(1);
  }

  char buf[4096];
  std::string data;
  // fread returns 0 once end-of-file or an error has been reached; which of
  // the two it was is checked right after the loop.
  size_t got = 0;
  while ((got = fread(buf, sizeof(char), sizeof(buf), in)) > 0)
    data.append(buf, got);
  if (ferror(in)) {
    if (from_stdin)
      fputs("Error when reading from stdin\n", stderr);
    else
      fprintf(stderr, "Error reading from file \"%s\"\n", path);
    exit(1);
  }
  // %zu is the conversion that matches size_t.
  fprintf(stderr, "Read %zu bytes\n", data.size());

  if (!from_stdin)
    fclose(in);

  return data;
}
57
58std::vector<Kv1Token> lex(const char *path) {
59 std::string data = readKv1(path);
60
61 auto start = TimingClock::now();
62 Kv1Lexer lexer(data);
63 lexer.lex();
64 auto end = TimingClock::now();
65
66 std::chrono::duration<double> elapsed{end - start};
67 double bytes = static_cast<double>(data.size()) / 1'000'000;
68 double speed = bytes / elapsed.count();
69
70 if (!lexer.errors.empty()) {
71 fputs("Lexer reported errors:\n", stderr);
72 for (const auto &error : lexer.errors)
73 fprintf(stderr, "- %s\n", error.c_str());
74 exit(1);
75 }
76
77 fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size());
78 fprintf(stderr, "Duration: %f s\n", elapsed.count());
79 fprintf(stderr, "Speed: %f MB/s\n", speed);
80
81 return std::move(lexer.tokens);
82}
83
84bool parse(const char *path, Kv1Records &into) {
85 std::vector<Kv1Token> tokens = lex(path);
86
87 Kv1Parser parser(tokens, into);
88 parser.parse();
89
90 bool ok = true;
91 if (!parser.gerrors.empty()) {
92 ok = false;
93 fputs("Parser reported errors:\n", stderr);
94 for (const auto &error : parser.gerrors)
95 fprintf(stderr, "- %s\n", error.c_str());
96 }
97 if (!parser.warns.empty()) {
98 fputs("Parser reported warnings:\n", stderr);
99 for (const auto &warn : parser.warns)
100 fprintf(stderr, "- %s\n", warn.c_str());
101 }
102
103 fprintf(stderr, "Parsed %lu records\n", into.size());
104
105 return ok;
106}
107
108void printParsedRecords(const Kv1Records &records) {
109 fputs("Parsed records:\n", stderr);
110 fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size());
111 fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size());
112 fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size());
113 fprintf(stderr, " user_stop_areas: %lu\n", records.user_stop_areas.size());
114 fprintf(stderr, " timing_links: %lu\n", records.timing_links.size());
115 fprintf(stderr, " links: %lu\n", records.links.size());
116 fprintf(stderr, " lines: %lu\n", records.lines.size());
117 fprintf(stderr, " destinations: %lu\n", records.destinations.size());
118 fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size());
119 fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size());
120 fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size());
121 fprintf(stderr, " financers: %lu\n", records.financers.size());
122 fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size());
123 fprintf(stderr, " points: %lu\n", records.points.size());
124 fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size());
125 fprintf(stderr, " icons: %lu\n", records.icons.size());
126 fprintf(stderr, " notices: %lu\n", records.notices.size());
127 fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size());
128 fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size());
129 fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size());
130 fprintf(stderr, " period_groups: %lu\n", records.period_groups.size());
131 fprintf(stderr, " specific_days: %lu\n", records.specific_days.size());
132 fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size());
133 fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size());
134 fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size());
135 fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size());
136 fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size());
137 fprintf(stderr, " public_journey_passing_times: %lu\n", records.public_journey_passing_times.size());
138 fprintf(stderr, " operating_days: %lu\n", records.operating_days.size());
139}
140
141void printIndexSize(const Kv1Index &index) {
142 fputs("Index size:\n", stderr);
143 fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size());
144 fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size());
145 fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size());
146 fprintf(stderr, " timing_links: %lu\n", index.timing_links.size());
147 fprintf(stderr, " links: %lu\n", index.links.size());
148 fprintf(stderr, " lines: %lu\n", index.lines.size());
149 fprintf(stderr, " destinations: %lu\n", index.destinations.size());
150 fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size());
151 fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size());
152 fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size());
153 fprintf(stderr, " financers: %lu\n", index.financers.size());
154 fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size());
155 fprintf(stderr, " points: %lu\n", index.points.size());
156 fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size());
157 fprintf(stderr, " icons: %lu\n", index.icons.size());
158 fprintf(stderr, " notices: %lu\n", index.notices.size());
159 fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size());
160 fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size());
161 fprintf(stderr, " period_groups: %lu\n", index.period_groups.size());
162 fprintf(stderr, " specific_days: %lu\n", index.specific_days.size());
163 fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size());
164 fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size());
165 fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size());
166 fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size());
167 fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size());
168 fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size());
169 fprintf(stderr, " operating_days: %lu\n", index.operating_days.size());
170}
171
172int main(int argc, char *argv[]) {
173 Options options = parseOptions(argc, argv);
174
175 Kv1Records records;
176 if (!parse(options.kv1_file_path, records)) {
177 fputs("Error parsing records, exiting\n", stderr);
178 return EXIT_FAILURE;
179 }
180 printParsedRecords(records);
181 fputs("Indexing...\n", stderr);
182 Kv1Index index(&records);
183 fprintf(stderr, "Indexed %lu records\n", index.size());
184 // Only notice assignments are not indexed. If this equality is not valid,
185 // then this means that we had duplicate keys or that something else went
186 // wrong. That would really not be great.
187 assert(index.size() == records.size() - records.notice_assignments.size());
188 printIndexSize(index);
189 fputs("Linking records...\n", stderr);
190 kv1LinkRecords(index);
191 fputs("Done linking\n", stderr);
192
193 if (options.subcommand == "joparoute"sv) jopaRoute(options, records, index);
194 if (options.subcommand == "journeyroute"sv) journeyRoute(options, records, index);
195 if (options.subcommand == "journeys"sv) journeys(options, records, index);
196 if (options.subcommand == "journeyinfo"sv) journeyInfo(options, records, index);
197 if (options.subcommand == "schedule"sv) schedule(options, records, index);
198}
diff --git a/src/querykv1/schedule.cpp b/src/querykv1/schedule.cpp
new file mode 100644
index 0000000..2bcfe0a
--- /dev/null
+++ b/src/querykv1/schedule.cpp
@@ -0,0 +1,63 @@
1// vim:set sw=2 ts=2 sts et:
2
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
7
8#include "daterange.hpp"
9#include "schedule.hpp"
10
11using namespace std::string_view_literals;
12
13void schedule(const Options &options, Kv1Records &records, Kv1Index &index) {
14 FILE *out = stdout;
15 if (options.output_file_path != "-"sv)
16 out = fopen(options.output_file_path, "wb");
17 if (!out) {
18 fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno));
19 exit(EXIT_FAILURE);
20 }
21
22 std::cerr << "Generating schedule for " << options.line_planning_number << std::endl;
23
24 std::unordered_multimap<std::string, Kv1PeriodGroupValidity> period_group_validities;
25 for (const auto &pegr : records.period_group_validities)
26 period_group_validities.insert({ pegr.key.period_group_code, pegr });
27 std::unordered_multimap<std::string, Kv1PublicJourney> public_journeys;
28 for (const auto &pujo : records.public_journeys)
29 public_journeys.insert({ pujo.key.timetable_version_code, pujo });
30
31 std::cout << "line_planning_number,journey_number,date,departure_time" << std::endl;
32 for (const auto &tive : records.timetable_versions) {
33 std::vector<DateRange> tive_pegrval_ranges;
34
35 auto pegrval_range = period_group_validities.equal_range(tive.key.period_group_code);
36 for (auto it = pegrval_range.first; it != pegrval_range.second; it++) {
37 const auto &[_, pegrval] = *it;
38 tive_pegrval_ranges.emplace_back(pegrval.key.valid_from, pegrval.valid_thru);
39 }
40
41 DateRangeSeq seq(tive_pegrval_ranges.begin(), tive_pegrval_ranges.end());
42 seq = seq.clampFrom(tive.valid_from);
43 if (tive.valid_thru)
44 seq = seq.clampThru(*tive.valid_thru);
45
46 for (const auto &range : seq) for (auto date : range) {
47 auto weekday = std::chrono::year_month_weekday(std::chrono::sys_days(date)).weekday();
48
49 auto pujo_range = public_journeys.equal_range(tive.key.timetable_version_code);
50 for (auto itt = pujo_range.first; itt != pujo_range.second; itt++) {
51 const auto &[_, pujo] = *itt;
52
53 if (pujo.key.line_planning_number == options.line_planning_number && pujo.key.day_type.size() == 7
54 && pujo.key.day_type[weekday.iso_encoding() - 1] == static_cast<char>('0' + weekday.iso_encoding())) {
55 std::cout << pujo.key.line_planning_number << "," << pujo.key.journey_number << ","
56 << date << "," << pujo.departure_time << std::endl;
57 }
58 }
59 }
60 }
61
62 if (options.output_file_path != "-"sv) fclose(out);
63}
diff --git a/src/querykv1/schedule.hpp b/src/querykv1/schedule.hpp
new file mode 100644
index 0000000..100bd4c
--- /dev/null
+++ b/src/querykv1/schedule.hpp
@@ -0,0 +1,13 @@
1// vim:set sw=2 ts=2 sts et:
2
3#ifndef OEUF_QUERYKV1_SCHEDULE_HPP
4#define OEUF_QUERYKV1_SCHEDULE_HPP
5
6#include <tmi8/kv1_types.hpp>
7#include <tmi8/kv1_index.hpp>
8
9#include "cliopts.hpp"
10
// Generates the schedule of the line options.line_planning_number from
// `records` as CSV rows with the columns line_planning_number,
// journey_number, date and departure_time.
void schedule(const Options &options, Kv1Records &records, Kv1Index &index);
12
13#endif // OEUF_QUERYKV1_SCHEDULE_HPP
diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/recvkv6/.envrc
@@ -0,0 +1,2 @@
1source_env ../../
2export DEVMODE=1
diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile
new file mode 100644
index 0000000..12ff7fb
--- /dev/null
+++ b/src/recvkv6/Makefile
@@ -0,0 +1,21 @@
1# Taken from:
2# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
3# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
4# 2023. [Online]. Available:
5# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
6CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
7 -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
8 -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
9 -D_GLIBCXX_ASSERTIONS \
10 -fstrict-flex-arrays=3 \
11 -fstack-clash-protection -fstack-protector-strong
12LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
13 -Wl,-z,nodlopen -Wl,-z,noexecstack \
14 -Wl,-z,relro -Wl,-z,now
15
16recvkv6: main.cpp
17 $(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS)
18
# Removes the built binary. -f keeps `make clean` from failing when the
# binary does not exist (e.g. on a fresh checkout or when run twice).
.PHONY: clean
clean:
	rm -f recvkv6
diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp
new file mode 100644
index 0000000..2ac3669
--- /dev/null
+++ b/src/recvkv6/main.cpp
@@ -0,0 +1,1300 @@
1// vim:set sw=2 ts=2 sts et:
2
3#include <array>
4#include <cassert>
5#include <chrono>
6#include <csignal>
7#include <cstring>
8#include <filesystem>
9#include <format>
10#include <fstream>
11#include <iostream>
12#include <optional>
13#include <stack>
14#include <string>
15#include <sstream>
16#include <vector>
17
18#include <zlib.h>
19#include <zmq.h>
20
21#include <nlohmann/json.hpp>
22
23#include <prometheus/counter.h>
24#include <prometheus/exposer.h>
25#include <prometheus/histogram.h>
26#include <prometheus/registry.h>
27
28#include <rapidxml/rapidxml.hpp>
29
30#include <tmi8/kv6_parquet.hpp>
31
32#define CHUNK 16384
33
34struct RawMessage {
35 public:
36 // Takes ownership of envelope and body
37 RawMessage(zmq_msg_t envelope, zmq_msg_t body)
38 : envelope(envelope), body(body)
39 {}
40
41 // Prevent copying
42 RawMessage(const RawMessage &) = delete;
43 RawMessage &operator=(RawMessage const &) = delete;
44
45 std::string_view getEnvelope() {
46 return static_cast<const char *>(zmq_msg_data(&envelope));
47 }
48
49 char *getBody() {
50 return static_cast<char *>(zmq_msg_data(&body));
51 }
52
53 size_t getBodySize() {
54 return zmq_msg_size(&body);
55 }
56
57 ~RawMessage() {
58 zmq_msg_close(&envelope);
59 zmq_msg_close(&body);
60 }
61
62 private:
63 zmq_msg_t envelope;
64 zmq_msg_t body;
65};
66
67std::optional<RawMessage> recvMsg(void *socket) {
68 while (true) {
69 zmq_msg_t envelope, body;
70 int rc = zmq_msg_init(&envelope);
71 assert(rc == 0);
72 rc = zmq_msg_init(&body);
73 assert(rc == 0);
74
75 rc = zmq_msg_recv(&envelope, socket, 0);
76 if (rc == -1) return std::nullopt;
77
78 int more;
79 size_t more_size = sizeof(more);
80 rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
81 if (!more) {
82 zmq_msg_close(&envelope);
83 zmq_msg_close(&body);
84 continue;
85 }
86
87 rc = zmq_msg_recv(&body, socket, 0);
88 if (rc == -1) return std::nullopt;
89
90 rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
91 assert(!more);
92
93 return std::make_optional<RawMessage>(envelope, body);
94 }
95}
96
97// Ensures that <return value>[output_size] == 0
98char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) {
99 assert(input_size <= UINT32_MAX);
100
101 z_stream strm;
102 strm.next_in = reinterpret_cast<unsigned char *>(raw);
103 strm.avail_in = input_size;
104 strm.zalloc = Z_NULL;
105 strm.zfree = Z_NULL;
106 strm.opaque = Z_NULL;
107 int rc = inflateInit2(&strm, 32);
108 assert(rc == Z_OK);
109
110 unsigned int buf_cap = CHUNK;
111 unsigned int buf_len = 0;
112 char *buf = static_cast<char *>(malloc(CHUNK));
113 do {
114 if (buf_len + CHUNK > buf_cap) {
115 assert(buf_cap <= UINT32_MAX);
116 buf_cap *= 2;
117 buf = static_cast<char *>(realloc(buf, buf_cap));
118 }
119 strm.avail_out = buf_cap - buf_len;
120 strm.next_out = reinterpret_cast<unsigned char *>(buf + buf_len);
121
122 unsigned long old_total = strm.total_out;
123 rc = inflate(&strm, Z_FINISH);
124 unsigned progress = static_cast<unsigned int>(strm.total_out - old_total);
125 buf_len += progress;
126 assert(progress != 0 || rc == Z_STREAM_END);
127 } while (strm.total_in < input_size);
128
129 if (buf_len == buf_cap) {
130 buf = static_cast<char *>(realloc(buf, buf_len + 1));
131 }
132 buf[buf_len] = 0;
133 output_size = buf_len;
134
135 rc = inflateEnd(&strm);
136 assert(rc == Z_OK);
137
138 return buf;
139}
140
141struct Date {
142 int16_t year = 0;
143 uint8_t month = 0;
144 uint8_t day = 0;
145
146 static bool parse(Date &dest, std::string_view src) {
147 dest.year = 0, dest.month = 0, dest.day = 0;
148
149 int16_t y_mul_fac = 1;
150 bool extended = false;
151
152 size_t plus = src.find('+');
153 if (plus != std::string_view::npos) {
154 extended = true;
155 src = src.substr(1); // remove plus sign from the start
156 }
157 if (!extended) {
158 size_t min_or_dash = src.find('-');
159 if (min_or_dash == std::string_view::npos) return false;
160 if (min_or_dash == 0) {
161 y_mul_fac = -1; // it's a minus sign
162 src = src.substr(1); // remove minus sign at the start
163 }
164 }
165
166 int y_chars = 0;
167 while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') {
168 dest.year = static_cast<int16_t>(dest.year * 10 + src[0] - '0');
169 src = src.substr(1);
170 y_chars++;
171 }
172 if (src.size() == 0) { dest.year = 0; return false; }
173 if (src[0] != '-') { dest.year = 0; return false; }
174 src = src.substr(1); // remove dash
175 if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; }
176 dest.year *= y_mul_fac;
177
178 bool rest_correct = src.size() == 5
179 && src[0] >= '0' && src[0] <= '9'
180 && src[1] >= '0' && src[1] <= '9'
181 && src[3] >= '0' && src[3] <= '9'
182 && src[4] >= '0' && src[4] <= '9';
183 if (!rest_correct) { dest.year = 0; return false; }
184 dest.month = static_cast<uint8_t>((src[0] - '0') * 10 + src[1] - '0');
185 dest.day = static_cast<uint8_t>((src[3] - '0') * 10 + src[4] - '0');
186 if (dest.month > 12 || dest.day > 31) {
187 dest.year = 0, dest.month = 0, dest.day = 0;
188 return false;
189 }
190 return true;
191 }
192
193 std::string toString() const {
194 if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31)
195 throw std::invalid_argument("one or more date components (year, month, day) out of range");
196 char data[11] = "XXXX-XX-XX";
197 sprintf(data, "%04u-%02u-%02u", year, month, day);
198 return data;
199 }
200
201 std::chrono::days toUnixDays() const {
202 std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)};
203 // This is valid since C++20: as of C++20, the system clock is defined to measure the
204 // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds.
205 std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch();
206 return since_epoch;
207 }
208};
209
// A time of day as hour, minute and second.
struct Time {
  uint8_t hour = 0;
  uint8_t minute = 0;
  uint8_t second = 0;

  // Parses a time of the form HH:MM:SS into dest. Returns false if src does
  // not have that form (dest is then left untouched) or if a component is out
  // of range (dest is then reset to 00:00:00).
  static bool parse(Time &dest, std::string_view src) {
    if (src.size() != 8) return false;
    for (size_t i = 0; i < src.size(); i++) {
      const bool separator = i == 2 || i == 5;
      const bool acceptable = separator ? src[i] == ':'
                                        : (src[i] >= '0' && src[i] <= '9');
      if (!acceptable) return false;
    }

    // Reads the two-digit number starting at position `at`.
    const auto twoDigits = [&](size_t at) {
      return static_cast<uint8_t>((src[at] - '0') * 10 + src[at + 1] - '0');
    };
    dest.hour = twoDigits(0);
    dest.minute = twoDigits(3);
    dest.second = twoDigits(6);

    const bool in_range = dest.hour <= 23 && dest.minute <= 59 && dest.second <= 59;
    if (!in_range) {
      dest.hour = 0;
      dest.minute = 0;
      dest.second = 0;
    }
    return in_range;
  }

  // Formats the time as HH:MM:SS. Throws std::invalid_argument if a
  // component is out of range.
  std::string toString() const {
    const bool in_range = hour <= 23 && minute <= 59 && second <= 59;
    if (!in_range)
      throw std::invalid_argument("one or more time components (hour, minute, second) out of range");
    char data[9] = "XX:XX:XX";
    sprintf(data, "%02u:%02u:%02u", hour, minute, second);
    return data;
  }
};
244
// Time zone designator: an offset from UTC in minutes.
struct Tzd {
  int16_t minutes = 0;

  // Parses either "Z" (zero offset) or an offset of the form +HH:MM / -HH:MM
  // into dest. On failure, false is returned and dest.minutes is zero.
  static bool parse(Tzd &dest, std::string_view src) {
    dest.minutes = 0;

    if (src.empty()) return false;
    if (src == "Z") return true;

    const char sign = src[0];
    if (sign != '+' && sign != '-') return false;
    const std::string_view hhmm = src.substr(1);

    const auto isDigit = [](char c) { return c >= '0' && c <= '9'; };
    if (hhmm.size() != 5 || hhmm[2] != ':') return false;
    if (!isDigit(hhmm[0]) || !isDigit(hhmm[1]) || !isDigit(hhmm[3]) || !isDigit(hhmm[4]))
      return false;

    const int hours = (hhmm[0] - '0') * 10 + (hhmm[1] - '0');
    const int mins = (hhmm[3] - '0') * 10 + (hhmm[4] - '0');
    if (hours > 23 || mins > 59) return false;

    const int total = 60 * hours + mins;
    dest.minutes = static_cast<int16_t>(sign == '-' ? -total : total);
    return true;
  }

  // Formats the offset as "Z" when it is zero and as +HH:MM / -HH:MM
  // otherwise. Throws std::invalid_argument if the offset is 24 hours or more.
  std::string toString() const {
    if (minutes == 0)
      return "Z";

    const int magnitude = minutes < 0 ? -minutes : minutes;
    const int hours_off = magnitude / 60;
    const int mins_off = magnitude % 60;
    if (hours_off > 23 || mins_off > 59)
      throw std::invalid_argument("offset out of range");

    const char sign = minutes < 0 ? '-' : '+';
    char data[7] = "+XX:XX";
    sprintf(data, "%c%02u:%02u", sign, hours_off, mins_off);
    return data;
  }
};
288
289struct Timestamp {
290 Date date;
291 Tzd off;
292 Time time;
293
294 static bool parse(Timestamp &dest, std::string_view src) {
295 size_t t = src.find('T');
296 if (t == std::string_view::npos || t + 1 >= src.size()) return false;
297
298 std::string_view date = src.substr(0, t);
299 std::string_view time_and_tzd = src.substr(t + 1);
300 if (time_and_tzd.size() < 9) return false;
301 if (!Date::parse(dest.date, date)) return false;
302
303 std::string_view time = time_and_tzd.substr(0, 8);
304 std::string_view tzd = time_and_tzd.substr(8);
305 if (!Time::parse(dest.time, time)) return false;
306 return Tzd::parse(dest.off, tzd);
307 }
308
309 std::string toString() const {
310 return date.toString() + "T" + time.toString() + off.toString();
311 }
312
313 std::chrono::seconds toUnixSeconds() const {
314 std::chrono::year_month_day ymd(std::chrono::year(date.year),
315 std::chrono::month(date.month),
316 std::chrono::day(date.day));
317 std::chrono::sys_days sys_days(ymd);
318 std::chrono::time_point<std::chrono::utc_clock, std::chrono::days> utc_days(sys_days.time_since_epoch());
319 std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast<std::chrono::seconds>(utc_days);
320 utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) +
321 std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes);
322 std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds);
323 std::chrono::seconds unix = sys_seconds.time_since_epoch();
324 return unix;
325 }
326};
327
// XML namespace URL that TMI8 KV6 message elements are matched against.
static constexpr std::string_view TMI8_XML_NS{"http://bison.connekt.nl/tmi8/kv6/msg"};
329
// The kinds of KV6 records. The values are consecutive, starting at zero, so
// that they can be used as array indices.
enum Kv6RecordType {
  KV6T_UNKNOWN = 0,
  KV6T_DELAY,      // 1
  KV6T_INIT,       // 2
  KV6T_ARRIVAL,    // 3
  KV6T_ON_STOP,    // 4
  KV6T_DEPARTURE,  // 5
  KV6T_ON_ROUTE,   // 6
  KV6T_ON_PATH,    // 7
  KV6T_OFF_ROUTE,  // 8
  KV6T_END,        // 9
  // Always keep these two updated to correspond to the
  // first and last elements of the enumeration!
  _KV6T_FIRST_TYPE = KV6T_UNKNOWN,
  _KV6T_LAST_TYPE = KV6T_END,
};
346
// Bit flags for the fields of a KV6 record. Every field has its own bit, so
// that sets of fields can be combined with bitwise operators.
enum Kv6Field {
  KV6F_NONE                          = 0,
  KV6F_DATA_OWNER_CODE               = 1 << 0,
  KV6F_LINE_PLANNING_NUMBER          = 1 << 1,
  KV6F_OPERATING_DAY                 = 1 << 2,
  KV6F_JOURNEY_NUMBER                = 1 << 3,
  KV6F_REINFORCEMENT_NUMBER          = 1 << 4,
  KV6F_TIMESTAMP                     = 1 << 5,
  KV6F_SOURCE                        = 1 << 6,
  KV6F_PUNCTUALITY                   = 1 << 7,
  KV6F_USER_STOP_CODE                = 1 << 8,
  KV6F_PASSAGE_SEQUENCE_NUMBER       = 1 << 9,
  KV6F_VEHICLE_NUMBER                = 1 << 10,
  KV6F_BLOCK_CODE                    = 1 << 11,
  KV6F_WHEELCHAIR_ACCESSIBLE         = 1 << 12,
  KV6F_NUMBER_OF_COACHES             = 1 << 13,
  KV6F_RD_Y                          = 1 << 14,
  KV6F_RD_X                          = 1 << 15,
  KV6F_DISTANCE_SINCE_LAST_USER_STOP = 1 << 16,
};
367
368static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = {
369 // KV6T_UNKNOWN
370 KV6F_NONE,
371 // KV6T_DELAY
372 static_cast<Kv6Field>(
373 KV6F_DATA_OWNER_CODE
374 | KV6F_LINE_PLANNING_NUMBER
375 | KV6F_OPERATING_DAY
376 | KV6F_JOURNEY_NUMBER
377 | KV6F_REINFORCEMENT_NUMBER
378 | KV6F_TIMESTAMP
379 | KV6F_SOURCE
380 | KV6F_PUNCTUALITY),
381 // KV6T_INIT
382 static_cast<Kv6Field>(
383 KV6F_DATA_OWNER_CODE
384 | KV6F_LINE_PLANNING_NUMBER
385 | KV6F_OPERATING_DAY
386 | KV6F_JOURNEY_NUMBER
387 | KV6F_REINFORCEMENT_NUMBER
388 | KV6F_TIMESTAMP
389 | KV6F_SOURCE
390 | KV6F_USER_STOP_CODE
391 | KV6F_PASSAGE_SEQUENCE_NUMBER
392 | KV6F_VEHICLE_NUMBER
393 | KV6F_BLOCK_CODE
394 | KV6F_WHEELCHAIR_ACCESSIBLE
395 | KV6F_NUMBER_OF_COACHES),
396 // KV6T_ARRIVAL
397 static_cast<Kv6Field>(
398 KV6F_DATA_OWNER_CODE
399 | KV6F_LINE_PLANNING_NUMBER
400 | KV6F_OPERATING_DAY
401 | KV6F_JOURNEY_NUMBER
402 | KV6F_REINFORCEMENT_NUMBER
403 | KV6F_USER_STOP_CODE
404 | KV6F_PASSAGE_SEQUENCE_NUMBER
405 | KV6F_TIMESTAMP
406 | KV6F_SOURCE
407 | KV6F_VEHICLE_NUMBER
408 | KV6F_PUNCTUALITY),
409 // KV6T_ON_STOP
410 static_cast<Kv6Field>(
411 KV6F_DATA_OWNER_CODE
412 | KV6F_LINE_PLANNING_NUMBER
413 | KV6F_OPERATING_DAY
414 | KV6F_JOURNEY_NUMBER
415 | KV6F_REINFORCEMENT_NUMBER
416 | KV6F_USER_STOP_CODE
417 | KV6F_PASSAGE_SEQUENCE_NUMBER
418 | KV6F_TIMESTAMP
419 | KV6F_SOURCE
420 | KV6F_VEHICLE_NUMBER
421 | KV6F_PUNCTUALITY),
422 // KV6T_DEPARTURE
423 static_cast<Kv6Field>(
424 KV6F_DATA_OWNER_CODE
425 | KV6F_LINE_PLANNING_NUMBER
426 | KV6F_OPERATING_DAY
427 | KV6F_JOURNEY_NUMBER
428 | KV6F_REINFORCEMENT_NUMBER
429 | KV6F_USER_STOP_CODE
430 | KV6F_PASSAGE_SEQUENCE_NUMBER
431 | KV6F_TIMESTAMP
432 | KV6F_SOURCE
433 | KV6F_VEHICLE_NUMBER
434 | KV6F_PUNCTUALITY),
435 // KV6T_ON_ROUTE
436 static_cast<Kv6Field>(
437 KV6F_DATA_OWNER_CODE
438 | KV6F_LINE_PLANNING_NUMBER
439 | KV6F_OPERATING_DAY
440 | KV6F_JOURNEY_NUMBER
441 | KV6F_REINFORCEMENT_NUMBER
442 | KV6F_USER_STOP_CODE
443 | KV6F_PASSAGE_SEQUENCE_NUMBER
444 | KV6F_TIMESTAMP
445 | KV6F_SOURCE
446 | KV6F_VEHICLE_NUMBER
447 | KV6F_PUNCTUALITY
448 | KV6F_RD_X
449 | KV6F_RD_Y),
450 // KV6T_ON_PATH
451 KV6F_NONE,
452 // KV6T_OFF_ROUTE
453 static_cast<Kv6Field>(
454 KV6F_DATA_OWNER_CODE
455 | KV6F_LINE_PLANNING_NUMBER
456 | KV6F_OPERATING_DAY
457 | KV6F_JOURNEY_NUMBER
458 | KV6F_REINFORCEMENT_NUMBER
459 | KV6F_TIMESTAMP
460 | KV6F_SOURCE
461 | KV6F_USER_STOP_CODE
462 | KV6F_PASSAGE_SEQUENCE_NUMBER
463 | KV6F_VEHICLE_NUMBER
464 | KV6F_RD_X
465 | KV6F_RD_Y),
466 // KV6T_END
467 static_cast<Kv6Field>(
468 KV6F_DATA_OWNER_CODE
469 | KV6F_LINE_PLANNING_NUMBER
470 | KV6F_OPERATING_DAY
471 | KV6F_JOURNEY_NUMBER
472 | KV6F_REINFORCEMENT_NUMBER
473 | KV6F_TIMESTAMP
474 | KV6F_SOURCE
475 | KV6F_USER_STOP_CODE
476 | KV6F_PASSAGE_SEQUENCE_NUMBER
477 | KV6F_VEHICLE_NUMBER),
478};
479
480static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = {
481 // KV6T_UNKNOWN
482 KV6F_NONE,
483 // KV6T_DELAY
484 KV6F_NONE,
485 // KV6T_INIT
486 KV6F_NONE,
487 // KV6T_ARRIVAL
488 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
489 // KV6T_ON_STOP
490 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
491 // KV6T_DEPARTURE
492 static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y),
493 // KV6T_ON_ROUTE
494 KV6F_DISTANCE_SINCE_LAST_USER_STOP,
495 // KV6T_ON_PATH
496 KV6F_NONE,
497 // KV6T_OFF_ROUTE
498 KV6F_NONE,
499 // KV6T_END
500 KV6F_NONE,
501};
502
503struct Kv6Record {
504 Kv6RecordType type = KV6T_UNKNOWN;
505 Kv6Field presence = KV6F_NONE;
506 Kv6Field next = KV6F_NONE;
507 std::string data_owner_code;
508 std::string line_planning_number;
509 std::string source;
510 std::string user_stop_code;
511 std::string wheelchair_accessible;
512 Date operating_day;
513 Timestamp timestamp;
514 uint32_t block_code = 0;
515 uint32_t journey_number = 0;
516 uint32_t vehicle_number = 0;
517 int32_t rd_x = 0;
518 int32_t rd_y = 0;
519 // The TMI8 specification is unclear: this field
520 // might actually be called distancesincelaststop
521 uint32_t distance_since_last_user_stop = 0;
522 uint16_t passage_sequence_number = 0;
523 int16_t punctuality = 0;
524 uint8_t number_of_coaches = 0;
525 uint8_t reinforcement_number = 0;
526
527 void markPresent(Kv6Field field) {
528 presence = static_cast<Kv6Field>(presence | field);
529 }
530
531 void removeUnsupportedFields() {
532 Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type];
533 Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type];
534 Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields);
535 presence = static_cast<Kv6Field>(presence & supported_fields);
536 }
537
538 bool valid() {
539 Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type];
540 Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type];
541 Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields);
542
543 Kv6Field required_field_presence = static_cast<Kv6Field>(presence & required_fields);
544 Kv6Field unsupported_field_presence = static_cast<Kv6Field>(presence & ~supported_fields);
545
546 return required_field_presence == required_fields && !unsupported_field_presence;
547 }
548};
549
// Bit flags for the header fields of a VV_TM_PUSH message.
enum Tmi8VvTmPushInfoField {
  TMI8F_NONE          = 0,
  TMI8F_SUBSCRIBER_ID = 1 << 0,
  TMI8F_VERSION       = 1 << 1,
  TMI8F_DOSSIER_NAME  = 1 << 2,
  TMI8F_TIMESTAMP     = 1 << 3,
};
557
558struct Tmi8VvTmPushInfo {
559 Tmi8VvTmPushInfoField next = TMI8F_NONE;
560 Tmi8VvTmPushInfoField presence = TMI8F_NONE;
561 std::string subscriber_id;
562 std::string version;
563 std::string dossier_name;
564 Timestamp timestamp;
565 std::vector<Kv6Record> messages;
566
567 void markPresent(Tmi8VvTmPushInfoField field) {
568 presence = static_cast<Tmi8VvTmPushInfoField>(presence | field);
569 }
570
571 bool valid() {
572 const Tmi8VvTmPushInfoField REQUIRED_FIELDS =
573 static_cast<Tmi8VvTmPushInfoField>(
574 TMI8F_SUBSCRIBER_ID
575 | TMI8F_VERSION
576 | TMI8F_DOSSIER_NAME
577 | TMI8F_TIMESTAMP);
578 return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS;
579 }
580};
581
582static const std::array<std::string_view, _KV6T_LAST_TYPE + 1> KV6_POS_INFO_RECORD_TYPES = {
583 "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END",
584};
585
586std::optional<std::string_view> findKv6PosInfoRecordTypeName(Kv6RecordType type) {
587 if (type > _KV6T_LAST_TYPE)
588 return std::nullopt;
589 return KV6_POS_INFO_RECORD_TYPES[type];
590}
591
592const std::array<std::tuple<std::string_view, Kv6Field>, 17> KV6_POS_INFO_RECORD_FIELDS = {{
593 { "dataownercode", KV6F_DATA_OWNER_CODE },
594 { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER },
595 { "operatingday", KV6F_OPERATING_DAY },
596 { "journeynumber", KV6F_JOURNEY_NUMBER },
597 { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER },
598 { "timestamp", KV6F_TIMESTAMP },
599 { "source", KV6F_SOURCE },
600 { "punctuality", KV6F_PUNCTUALITY },
601 { "userstopcode", KV6F_USER_STOP_CODE },
602 { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER },
603 { "vehiclenumber", KV6F_VEHICLE_NUMBER },
604 { "blockcode", KV6F_BLOCK_CODE },
605 { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE },
606 { "numberofcoaches", KV6F_NUMBER_OF_COACHES },
607 { "rd-y", KV6F_RD_Y },
608 { "rd-x", KV6F_RD_X },
609 { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP },
610}};
611
612// Returns the maximum amount of digits such that it is guaranteed that
613// a corresponding amount of repeated 9's can be represented by the type.
614template<std::integral T>
615constexpr size_t maxDigits() {
616 size_t digits = 0;
617 for (T x = std::numeric_limits<T>::max(); x != 0; x /= 10) digits++;
618 return digits - 1;
619}
620
621template<size_t MaxDigits, std::unsigned_integral T>
622constexpr bool parseUnsigned(T &out, std::string_view src) {
623 static_assert(MaxDigits <= maxDigits<T>());
624 if (src.size() > MaxDigits) return false;
625 T res = 0;
626 while (src.size() > 0) {
627 if (src[0] < '0' || src[0] > '9') return false;
628 res = static_cast<T>(res * 10 + src[0] - '0');
629 src = src.substr(1);
630 }
631 out = res;
632 return true;
633}
634
635template<size_t MaxDigits, std::signed_integral T>
636constexpr bool parseSigned(T &out, std::string_view src) {
637 static_assert(MaxDigits <= maxDigits<T>());
638 if (src.size() == 0) return false;
639 bool negative = src[0] == '-';
640 if (negative) src = src.substr(1);
641 if (src.size() > MaxDigits) return false;
642 T res = 0;
643 while (src.size() > 0) {
644 if (src[0] < '0' || src[0] > '9') return false;
645 res = static_cast<T>(res * 10 + src[0] - '0');
646 src = src.substr(1);
647 }
648 out = negative ? -res : res;
649 return true;
650}
651
// One XML namespace declaration in a singly linked list of declarations.
struct Xmlns {
  // The next declaration in the list, or a null pointer at its end.
  const Xmlns *next = nullptr;
  // The declared prefix; empty for the default namespace.
  std::string_view prefix{};
  // The namespace URL that the prefix is bound to.
  std::string_view url{};
};
657
658std::optional<std::string_view> resolve(std::string_view prefix, const Xmlns *nss) {
659 while (nss)
660 if (nss->prefix == prefix)
661 return nss->url;
662 else
663 nss = nss->next;
664 return std::nullopt;
665}
666
667template<typename T>
668void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) {
669 while (attr) {
670 std::string_view name(attr->name(), attr->name_size());
671 if (name.starts_with("xmlns")) {
672 if (name.size() == 5) { // just xmlns
673 Xmlns ns0 = {
674 .next = nss,
675 .url = std::string_view(attr->value(), attr->value_size()),
676 };
677 withXmlnss(attr->next_attribute(), &ns0, fn);
678 return;
679 } else if (name.size() > 6 && name[5] == ':') { // xmlns:<something>
680 Xmlns ns0 = {
681 .next = nss,
682 .prefix = name.substr(6),
683 .url = std::string_view(attr->value(), attr->value_size()),
684 };
685 withXmlnss(attr->next_attribute(), &ns0, fn);
686 return;
687 }
688 }
689 attr = attr->next_attribute();
690 }
691 fn(nss);
692}
693
694template<typename T>
695void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
696 std::string_view name(node.name(), node.name_size());
697 std::string_view ns;
698 size_t colon = name.find(':');
699
700 if (colon != std::string_view::npos) {
701 if (colon >= name.size() - 1) // last character
702 return;
703 ns = name.substr(0, colon);
704 name = name.substr(colon + 1);
705 }
706
707 withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) {
708 std::optional<std::string_view> ns_url = resolve(ns, nss);
709 if (!ns_url && !ns.empty()) return;
710 if (!ns_url) fn(std::string_view(), name, nss);
711 else fn(*ns_url, name, nss);
712 });
713}
714
715template<typename T>
716void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) {
717 ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) {
718 if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss);
719 });
720}
721
722bool onlyTextElement(const rapidxml::xml_node<> &node) {
723 return node.type() == rapidxml::node_element
724 && node.first_node()
725 && node.first_node() == node.last_node()
726 && node.first_node()->type() == rapidxml::node_data;
727}
728
729std::string_view getValue(const rapidxml::xml_node<> &node) {
730 return std::string_view(node.value(), node.value_size());
731}
732
// Copies `val` into `into` when it is at most `max_len` bytes long.
//
// Returns true on success. On failure (value too long) returns false and
// leaves `into` untouched.
bool parseStringValue(std::string &into, size_t max_len, std::string_view val) {
  const bool fits = val.size() <= max_len;
  if (fits)
    into.assign(val);
  return fits;
}
739
740struct Kv6Parser {
741 std::stringstream &errs;
742 std::stringstream &warns;
743
744 void error(std::string_view msg) {
745 errs << msg << '\n';
746 }
747
748 void warn(std::string_view msg) {
749 warns << msg << '\n';
750 }
751
752#define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false)
753#define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false)
754
755 std::optional<Kv6Record> parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) {
756 Kv6Record fields = { .type = type };
757 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
758 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
759 for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) {
760 if (field == KV6F_NONE)
761 continue;
762 if (fname == name) {
763 PWARNASSERT("Expected KV6 record field element to only contain data",
764 onlyTextElement(*child));
765 std::string_view childval = getValue(*child);
766 switch (field) {
767 case KV6F_DATA_OWNER_CODE:
768 PWARNASSERT("Invalid value for dataownercode",
769 parseStringValue(fields.data_owner_code, 10, childval));
770 break;
771 case KV6F_LINE_PLANNING_NUMBER:
772 PWARNASSERT("Invalid value for lineplanningnumber",
773 parseStringValue(fields.line_planning_number, 10, childval));
774 break;
775 case KV6F_OPERATING_DAY:
776 PWARNASSERT("Invalid value for operatatingday: not a valid date",
777 Date::parse(fields.operating_day, childval));
778 break;
779 case KV6F_JOURNEY_NUMBER:
780 PWARNASSERT("Invalid value for journeynumber:"
781 " not a valid unsigned number with at most six digits",
782 parseUnsigned<6>(fields.journey_number, childval));
783 break;
784 case KV6F_REINFORCEMENT_NUMBER:
785 PWARNASSERT("Invalid value for reinforcementnumber:"
786 " not a valid unsigned number with at most two digits",
787 parseUnsigned<2>(fields.reinforcement_number, childval));
788 break;
789 case KV6F_TIMESTAMP:
790 PWARNASSERT("Invalid value for timestamp: not a valid timestamp",
791 Timestamp::parse(fields.timestamp, childval));
792 break;
793 case KV6F_SOURCE:
794 PWARNASSERT("Invalid value for source:"
795 " not a valid string of at most 10 bytes",
796 parseStringValue(fields.source, 10, childval));
797 break;
798 case KV6F_PUNCTUALITY:
799 PWARNASSERT("Invalid value for punctuality:"
800 " not a valid signed number with at most four digits",
801 parseSigned<4>(fields.punctuality, childval));
802 break;
803 case KV6F_USER_STOP_CODE:
804 PWARNASSERT("Invalid value for userstopcode:"
805 " not a valid string of at most 10 bytes",
806 parseStringValue(fields.user_stop_code, 10, childval));
807 break;
808 case KV6F_PASSAGE_SEQUENCE_NUMBER:
809 PWARNASSERT("Invalid value for passagesequencenumber:"
810 " not a valid unsigned number with at most four digits",
811 parseUnsigned<4>(fields.passage_sequence_number, childval));
812 break;
813 case KV6F_VEHICLE_NUMBER:
814 PWARNASSERT("Invalid value for vehiclenumber:"
815 " not a valid unsigned number with at most six digits",
816 parseUnsigned<6>(fields.vehicle_number, childval));
817 break;
818 case KV6F_BLOCK_CODE:
819 PWARNASSERT("Invalid value for blockcode:"
820 " not a valid unsigned number with at most eight digits",
821 parseUnsigned<8>(fields.block_code, childval));
822 break;
823 case KV6F_WHEELCHAIR_ACCESSIBLE:
824 PWARNASSERT("Invalid value for wheelchairaccessible:"
825 " not a valid value for wheelchair accessibility",
826 childval == "ACCESSIBLE"
827 || childval == "NOTACCESSIBLE"
828 || childval == "UNKNOWN");
829 fields.wheelchair_accessible = childval;
830 break;
831 case KV6F_NUMBER_OF_COACHES:
832 PWARNASSERT("Invalid for numberofcoaches:"
833 " not a valid unsigned number with at most two digits",
834 parseUnsigned<2>(fields.number_of_coaches, childval));
835 break;
836 case KV6F_RD_X:
837 PWARNASSERT("Invalid value for rd-x:"
838 " not a valid signed number with at most six digits",
839 parseSigned<6>(fields.rd_x, childval));
840 break;
841 case KV6F_RD_Y:
842 PWARNASSERT("Invalid value for rd-y:"
843 " not a valid signed number with at most six digits",
844 parseSigned<6>(fields.rd_y, childval));
845 break;
846 case KV6F_DISTANCE_SINCE_LAST_USER_STOP:
847 PWARNASSERT("Invalid value for distancesincelastuserstop:"
848 " not a valid unsigned number with at most five digits",
849 parseUnsigned<5>(fields.distance_since_last_user_stop, childval));
850 break;
851 case KV6F_NONE:
852 error("NONE field type case should be unreachable in parseKv6PosInfoRecord");
853 return;
854 }
855 fields.markPresent(field);
856 break;
857 }
858 }
859 });
860 }
861
862 fields.removeUnsupportedFields();
863
864 if (!fields.valid())
865 return std::nullopt;
866 return fields;
867 }
868
869 std::vector<Kv6Record> parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) {
870 std::vector<Kv6Record> records;
871 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
872 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
873 for (auto type = _KV6T_FIRST_TYPE;
874 type != _KV6T_LAST_TYPE;
875 type = static_cast<Kv6RecordType>(type + 1)) {
876 if (type == KV6T_UNKNOWN)
877 continue;
878 if (KV6_POS_INFO_RECORD_TYPES[type] == name) {
879 auto record = parseKv6PosInfoRecord(type, *child, nss);
880 if (record) {
881 records.push_back(*record);
882 }
883 }
884 }
885 });
886 }
887 return records;
888 }
889
890 std::optional<Tmi8VvTmPushInfo> parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) {
891 Tmi8VvTmPushInfo info;
892 for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) {
893 ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) {
894 if (name == "Timestamp") {
895 PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child));
896 PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child)));
897 info.markPresent(TMI8F_TIMESTAMP);
898 } else if (name == "SubscriberID") {
899 PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child));
900 info.subscriber_id = getValue(*child);
901 info.markPresent(TMI8F_SUBSCRIBER_ID);
902 } else if (name == "Version") {
903 PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child));
904 info.version = getValue(*child);
905 info.markPresent(TMI8F_VERSION);
906 } else if (name == "DossierName") {
907 PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child));
908 info.dossier_name = getValue(*child);
909 info.markPresent(TMI8F_DOSSIER_NAME);
910 } else if (name == "KV6posinfo") {
911 info.messages = parseKv6PosInfo(*child, nss);
912 }
913 });
914 }
915
916 if (!info.valid())
917 return std::nullopt;
918 return info;
919 }
920
921 std::optional<Tmi8VvTmPushInfo> parse(const rapidxml::xml_document<> &doc) {
922 std::optional<Tmi8VvTmPushInfo> msg;
923 for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) {
924 ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) {
925 if (name == "VV_TM_PUSH") {
926 if (msg) {
927 error("Duplicated VV_TM_PUSH");
928 return;
929 }
930 msg = parseVvTmPush(*node, nss);
931 if (!msg) {
932 error("Invalid VV_TM_PUSH");
933 }
934 }
935 });
936 }
937 if (!msg)
938 error("Expected to find VV_TM_PUSH");
939 return msg;
940 }
941};
942
943std::optional<Tmi8VvTmPushInfo> parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) {
944 Kv6Parser parser = { errs, warns };
945 return parser.parse(doc);
946}
947
948struct Metrics {
949 prometheus::Counter &messages_counter_ok;
950 prometheus::Counter &messages_counter_error;
951 prometheus::Counter &messages_counter_warning;
952 prometheus::Counter &rows_written_counter;
953 prometheus::Histogram &records_hist;
954 prometheus::Histogram &message_parse_hist;
955 prometheus::Histogram &payload_size_hist;
956
957 using BucketBoundaries = prometheus::Histogram::BucketBoundaries;
958
959 enum class ParseStatus {
960 OK,
961 WARNING,
962 ERROR,
963 };
964
965 Metrics(std::shared_ptr<prometheus::Registry> registry) :
966 Metrics(registry, prometheus::BuildCounter()
967 .Name("kv6_vv_tm_push_messages_total")
968 .Help("Number of KV6 VV_TM_PUSH messages received")
969 .Register(*registry))
970 {}
971
972 void addMeasurement(std::chrono::duration<double> took_secs, size_t payload_size, size_t records, ParseStatus parsed) {
973 double millis = took_secs.count() * 1000.0;
974
975 if (parsed == ParseStatus::OK) messages_counter_ok.Increment();
976 else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment();
977 else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment();
978 records_hist.Observe(static_cast<double>(records));
979 message_parse_hist.Observe(millis);
980 payload_size_hist.Observe(static_cast<double>(payload_size));
981 }
982
983 void rowsWritten(int64_t rows) {
984 rows_written_counter.Increment(static_cast<double>(rows));
985 }
986
987 private:
988 Metrics(std::shared_ptr<prometheus::Registry> registry,
989 prometheus::Family<prometheus::Counter> &messages_counter) :
990 messages_counter_ok(messages_counter
991 .Add({{ "status", "ok" }})),
992 messages_counter_error(messages_counter
993 .Add({{ "status", "error" }})),
994 messages_counter_warning(messages_counter
995 .Add({{ "status", "warning" }})),
996 rows_written_counter(prometheus::BuildCounter()
997 .Name("kv6_vv_tm_push_records_written")
998 .Help("Numer of VV_TM_PUSH records written to disk")
999 .Register(*registry)
1000 .Add({})),
1001 records_hist(prometheus::BuildHistogram()
1002 .Name("kv6_vv_tm_push_records_amount")
1003 .Help("Number of KV6 VV_TM_PUSH records")
1004 .Register(*registry)
1005 .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })),
1006 message_parse_hist(prometheus::BuildHistogram()
1007 .Name("kv6_vv_tm_push_message_parse_millis")
1008 .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages")
1009 .Register(*registry)
1010 .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })),
1011 payload_size_hist(prometheus::BuildHistogram()
1012 .Name("kv6_payload_size")
1013 .Help("Sizes of KV6 ZeroMQ message payloads")
1014 .Register(*registry)
1015 .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 }))
1016 {}
1017};
1018
1019// Note: it *must* hold that decompressed[size] == 0
1020std::optional<Tmi8VvTmPushInfo> parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) {
1021 auto start = std::chrono::steady_clock::now();
1022
1023 std::optional<Tmi8VvTmPushInfo> info;
1024
1025 if (decompressed[size] != 0) {
1026 errs << "Not parsing: missing null terminator" << '\n';
1027 } else {
1028 rapidxml::xml_document<> doc;
1029 constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace
1030 | rapidxml::parse_no_string_terminators
1031 | rapidxml::parse_validate_closing_tags;
1032
1033 try {
1034 doc.parse<PARSE_FLAGS>(decompressed);
1035 info = parseXml(doc, errs, warns);
1036 } catch (const rapidxml::parse_error &err) {
1037 errs << "XML parsing failed" << '\n';
1038 }
1039 }
1040
1041 auto end = std::chrono::steady_clock::now();
1042 std::chrono::duration<double> took = end - start;
1043
1044 if (info)
1045 if (warns.view().empty())
1046 metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK);
1047 else
1048 metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING);
1049 else
1050 metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR);
1051
1052 return info;
1053}
1054
// Set to a non-zero value once SIGINT or SIGTERM has been received.
//
// The flag is written from a signal handler and polled by the main loop, so
// it must be a `volatile sig_atomic_t`: that is the only object type a signal
// handler may portably write to, and `volatile` keeps the compiler from
// caching the value across loop iterations.
volatile sig_atomic_t terminate = 0;

// Signal handler for SIGINT and SIGTERM: requests termination of the main
// loop. It only sets the flag, which is async-signal-safe.
void onSigIntOrTerm(int /* signum */) {
  terminate = 1;
}
1060
1061arrow::Result<std::shared_ptr<arrow::Table>> getTable(const std::vector<Kv6Record> &messages, size_t &rows_written) {
1062 ParquetBuilder builder;
1063
1064 for (const auto &msg : messages) {
1065 Kv6Field present = msg.presence;
1066 Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type];
1067 Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type];
1068 if ((~msg.presence & required) != 0) {
1069 std::cout << "Invalid message: not all required fields present; skipping" << std::endl;
1070 continue;
1071 }
1072 Kv6Field used = static_cast<Kv6Field>(present & (required | optional));
1073 rows_written++;
1074
1075 // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option
1076 // of not providing these fields by setting them to -1. We want this normalized, where these
1077 // fields are instead simply marked as not present.
1078 if ((used & KV6F_RD_X) && msg.rd_x == -1)
1079 used = static_cast<Kv6Field>(used & ~KV6F_RD_X);
1080 if ((used & KV6F_RD_Y) && msg.rd_y == -1)
1081 used = static_cast<Kv6Field>(used & ~KV6F_RD_Y);
1082
1083 ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type)));
1084 ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE
1085 ? builder.data_owner_codes.Append(msg.data_owner_code)
1086 : builder.data_owner_codes.AppendNull());
1087 ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER
1088 ? builder.line_planning_numbers.Append(msg.line_planning_number)
1089 : builder.line_planning_numbers.AppendNull());
1090 ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY
1091 ? builder.operating_days.Append(static_cast<int32_t>(msg.operating_day.toUnixDays().count()))
1092 : builder.operating_days.AppendNull());
1093 ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER
1094 ? builder.journey_numbers.Append(msg.journey_number)
1095 : builder.journey_numbers.AppendNull());
1096 ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER
1097 ? builder.reinforcement_numbers.Append(msg.reinforcement_number)
1098 : builder.reinforcement_numbers.AppendNull());
1099 ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP
1100 ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count())
1101 : builder.timestamps.AppendNull());
1102 ARROW_RETURN_NOT_OK(used & KV6F_SOURCE
1103 ? builder.sources.Append(msg.source)
1104 : builder.sources.AppendNull());
1105 ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY
1106 ? builder.punctualities.Append(msg.punctuality)
1107 : builder.punctualities.AppendNull());
1108 ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE
1109 ? builder.user_stop_codes.Append(msg.user_stop_code)
1110 : builder.user_stop_codes.AppendNull());
1111 ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER
1112 ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number)
1113 : builder.passage_sequence_numbers.AppendNull());
1114 ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER
1115 ? builder.vehicle_numbers.Append(msg.vehicle_number)
1116 : builder.vehicle_numbers.AppendNull());
1117 ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE
1118 ? builder.block_codes.Append(msg.block_code)
1119 : builder.block_codes.AppendNull());
1120 ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE
1121 ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible)
1122 : builder.wheelchair_accessibles.AppendNull());
1123 ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES
1124 ? builder.number_of_coaches.Append(msg.number_of_coaches)
1125 : builder.number_of_coaches.AppendNull());
1126 ARROW_RETURN_NOT_OK(used & KV6F_RD_Y
1127 ? builder.rd_ys.Append(msg.rd_y)
1128 : builder.rd_ys.AppendNull());
1129 ARROW_RETURN_NOT_OK(used & KV6F_RD_X
1130 ? builder.rd_xs.Append(msg.rd_x)
1131 : builder.rd_xs.AppendNull());
1132 ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP
1133 ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop)
1134 : builder.distance_since_last_user_stops.AppendNull());
1135 }
1136
1137 return builder.getTable();
1138}
1139
1140std::tuple<int64_t, int64_t> getMinMaxTimestamp(const std::vector<Kv6Record> &messages) {
1141 if (messages.size() == 0)
1142 return { 0, 0 };
1143 int64_t min = std::numeric_limits<int64_t>::max();
1144 int64_t max = 0;
1145 for (const auto &message : messages) {
1146 if (~message.presence & KV6F_TIMESTAMP)
1147 continue;
1148 int64_t seconds = message.timestamp.toUnixSeconds().count();
1149 if (seconds < min)
1150 min = seconds;
1151 if (seconds > max)
1152 max = seconds;
1153 }
1154 if (min == std::numeric_limits<decltype(min)>::max())
1155 return { 0, 0 }; // this is stupid
1156 return { min, max };
1157}
1158
// Writes `messages` to a new, timestamp-named Parquet file in the working
// directory, followed by a `<file>.meta.json` sidecar holding the timestamp
// range and the number of rows written.
//
// The sidecar is first written as `<file>.meta.json.part` and then renamed,
// so a `.meta.json` file is only ever visible in complete form.
//
// Returns a non-OK status when building the table, writing the Parquet file,
// or writing/renaming the sidecar fails. File system failures are reported as
// IOError statuses rather than thrown.
arrow::Status writeParquet(const std::vector<Kv6Record> &messages, Metrics &metrics) {
  size_t rows_written = 0;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, getTable(messages, rows_written));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename));
  std::cout << "Wrote Parquet file " << filename << std::endl;

  // The rows are on disk at this point, whatever happens to the sidecar.
  metrics.rowsWritten(static_cast<int64_t>(rows_written));

  auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages);
  const std::string meta_filename = filename + ".meta.json";
  const std::string meta_part_filename = meta_filename + ".part";

  std::ofstream metaf(meta_part_filename, std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows_written },
  };
  metaf << meta;
  metaf.close();
  // Covers both a failed open and a failed write/flush.
  if (!metaf)
    return arrow::Status::IOError("Could not write metadata file ", meta_part_filename);

  std::error_code ec;
  std::filesystem::rename(meta_part_filename, meta_filename, ec);
  if (ec)
    return arrow::Status::IOError("Could not rename ", meta_part_filename, " to ", meta_filename, ": ", ec.message());

  return arrow::Status::OK();
}
1183
// Point in time on the monotonic clock.
using SteadyTime = std::chrono::steady_clock::time_point;

// Writes a diagnostic dump to a new, timestamp-named text file in the working
// directory. The dump contains the error lines, the warning lines and the
// received message text, each under its own header.
//
// Returns the name of the file that was written.
std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) {
  const auto now = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string dump_name = std::format("oeuf-error-{:%FT%T%Ez}.txt", now);

  std::ofstream out(dump_name, std::ios::binary);
  out << "======= ERROR MESSAGES ========" << std::endl << errs;
  out << "======= WARNING MESSAGES ======" << std::endl << warns;
  out << "======= RECEIVED MESSAGE ======" << std::endl << txt << std::endl;
  out.close();

  return dump_name;
}
1199
1200void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector<Kv6Record> &msg_buf) {
1201 unsigned int decompressed_size = 0;
1202 if (msg.getBodySize() > std::numeric_limits<unsigned int>::max())
1203 std::cout << "parseMsg failed due to too large message" << std::endl;
1204 char *decompressed = decompress(msg.getBody(), static_cast<unsigned int>(msg.getBodySize()), decompressed_size);
1205
1206 std::stringstream errs;
1207 std::stringstream warns;
1208 // We know that decompressed[decompressed_size] == 0 because decompress() ensures this.
1209 auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns);
1210 if (parsed_msg) {
1211 const Tmi8VvTmPushInfo &info = *parsed_msg;
1212 auto new_msgs_it = info.messages.begin();
1213 while (new_msgs_it != info.messages.end()) {
1214 size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size();
1215 size_t new_msgs_left = info.messages.end() - new_msgs_it;
1216 auto new_msgs_start = new_msgs_it;
1217 auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left);
1218 new_msgs_it = new_msgs_end;
1219 msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end);
1220
1221 bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5);
1222 if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) {
1223 arrow::Status status = writeParquet(msg_buf, metrics);
1224 if (!status.ok())
1225 std::cout << "Writing Parquet file failed: " << status << std::endl;
1226 msg_buf.clear();
1227 last_output = std::chrono::steady_clock::now();
1228 }
1229 }
1230 if (!errs.view().empty() || !warns.view().empty()) {
1231 std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str());
1232 std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl;
1233 }
1234 } else {
1235 std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str());
1236 std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl;
1237 }
1238 free(decompressed);
1239}
1240
1241int main(int argc, char *argv[]) {
1242 std::cout << "Working directory: " << std::filesystem::current_path() << std::endl;
1243
1244 const char *metrics_addr = getenv("METRICS_ADDR");
1245 if (!metrics_addr || strlen(metrics_addr) == 0) {
1246 std::cout << "Error: no METRICS_ADDR set!" << std::endl;
1247 exit(EXIT_FAILURE);
1248 }
1249 prometheus::Exposer exposer{metrics_addr};
1250
1251 bool prod = false;
1252 const char *prod_env = getenv("NDOV_PRODUCTION");
1253 if (prod_env && strcmp(prod_env, "true") == 0) prod = true;
1254
1255 void *zmq_context = zmq_ctx_new();
1256 void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB);
1257 int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658");
1258 assert(rc == 0);
1259
1260 const char *topic = "/CXX/KV6posinfo";
1261 rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic));
1262 assert(rc == 0);
1263
1264 signal(SIGINT, onSigIntOrTerm);
1265 signal(SIGTERM, onSigIntOrTerm);
1266
1267 SteadyTime last_output = std::chrono::steady_clock::now();
1268
1269 auto registry = std::make_shared<prometheus::Registry>();
1270 Metrics metrics(registry);
1271 exposer.RegisterCollectable(registry);
1272
1273 std::vector<Kv6Record> msg_buf;
1274 while (!terminate) {
1275 std::optional<RawMessage> msg = recvMsg(zmq_subscriber);
1276 if (!msg) {
1277 if (!terminate)
1278 perror("recvMsg");
1279 continue;
1280 }
1281 handleMsg(*msg, metrics, last_output, msg_buf);
1282 }
1283
1284 std::cout << "Terminating" << std::endl;
1285 if (msg_buf.size() > 0) {
1286 arrow::Status status = writeParquet(msg_buf, metrics);
1287 if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl;
1288 else std::cout << "Final data written" << std::endl;
1289 msg_buf.clear();
1290 }
1291
1292 if (zmq_close(zmq_subscriber))
1293 perror("zmq_close");
1294 if (zmq_ctx_destroy(zmq_context))
1295 perror("zmq_ctx_destroy");
1296
1297 std::cout << "Bye" << std::endl;
1298
1299 return 0;
1300}