diff options
Diffstat (limited to 'src')
35 files changed, 3963 insertions, 0 deletions
diff --git a/src/augmentkv6/.envrc b/src/augmentkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/augmentkv6/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/augmentkv6/Makefile b/src/augmentkv6/Makefile new file mode 100644 index 0000000..cebb291 --- /dev/null +++ b/src/augmentkv6/Makefile | |||
@@ -0,0 +1,21 @@ | |||
# Taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set in the environment.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -larrow_acero -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

augmentkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
# -f keeps `make clean` from failing when the binary has not been built yet.
clean:
	rm -f augmentkv6
diff --git a/src/augmentkv6/main.cpp b/src/augmentkv6/main.cpp new file mode 100644 index 0000000..81a54d3 --- /dev/null +++ b/src/augmentkv6/main.cpp | |||
@@ -0,0 +1,510 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <cstdio> | ||
5 | #include <deque> | ||
6 | #include <filesystem> | ||
7 | #include <format> | ||
8 | #include <fstream> | ||
9 | #include <iostream> | ||
10 | #include <string> | ||
11 | #include <string_view> | ||
12 | #include <vector> | ||
13 | |||
14 | #include <arrow/acero/exec_plan.h> | ||
15 | #include <arrow/api.h> | ||
16 | #include <arrow/compute/api.h> | ||
17 | #include <arrow/dataset/api.h> | ||
18 | #include <arrow/filesystem/api.h> | ||
19 | #include <arrow/io/api.h> | ||
20 | #include <parquet/arrow/reader.h> | ||
21 | |||
22 | #include <tmi8/kv1_index.hpp> | ||
23 | #include <tmi8/kv1_lexer.hpp> | ||
24 | #include <tmi8/kv1_parser.hpp> | ||
25 | #include <tmi8/kv1_types.hpp> | ||
26 | #include <tmi8/kv6_parquet.hpp> | ||
27 | |||
28 | using namespace std::string_view_literals; | ||
29 | |||
30 | namespace ac = arrow::acero; | ||
31 | namespace ds = arrow::dataset; | ||
32 | namespace cp = arrow::compute; | ||
33 | using namespace arrow; | ||
34 | |||
// Clock used for timing measurements: the high resolution clock when that
// clock is steady, and the steady clock otherwise.
using TimingClock = std::conditional<
  std::chrono::high_resolution_clock::is_steady,
  std::chrono::high_resolution_clock,
  std::chrono::steady_clock>::type;
39 | |||
// Reads all of standard input into a string and returns it.
//
// Progress information is written to stderr. If reading from stdin fails,
// an error message is printed and the process exits with status 1.
std::string readKv1() {
  fputs("Reading KV1 from standard input\n", stderr);

  char buf[4096];
  std::string data;
  // fread returns a short count at end of file or on error; whatever was read
  // is appended before the stream state is checked again.
  while (!feof(stdin) && !ferror(stdin)) {
    size_t read = fread(buf, sizeof(char), sizeof(buf), stdin);
    data.append(buf, read);
  }
  if (ferror(stdin)) {
    fputs("Error when reading from stdin\n", stderr);
    exit(1);
  }
  // %zu is the conversion specifier for size_t.
  fprintf(stderr, "Read %zu bytes\n", data.size());

  return data;
}
57 | |||
58 | std::vector<Kv1Token> lex() { | ||
59 | std::string data = readKv1(); | ||
60 | |||
61 | auto start = TimingClock::now(); | ||
62 | Kv1Lexer lexer(data); | ||
63 | lexer.lex(); | ||
64 | auto end = TimingClock::now(); | ||
65 | |||
66 | std::chrono::duration<double> elapsed{end - start}; | ||
67 | double bytes = static_cast<double>(data.size()) / 1'000'000; | ||
68 | double speed = bytes / elapsed.count(); | ||
69 | |||
70 | if (!lexer.errors.empty()) { | ||
71 | fputs("Lexer reported errors:\n", stderr); | ||
72 | for (const auto &error : lexer.errors) | ||
73 | fprintf(stderr, "- %s\n", error.c_str()); | ||
74 | exit(1); | ||
75 | } | ||
76 | |||
77 | fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size()); | ||
78 | fprintf(stderr, "Duration: %f s\n", elapsed.count()); | ||
79 | fprintf(stderr, "Speed: %f MB/s\n", speed); | ||
80 | |||
81 | return std::move(lexer.tokens); | ||
82 | } | ||
83 | |||
84 | bool parse(Kv1Records &into) { | ||
85 | std::vector<Kv1Token> tokens = lex(); | ||
86 | |||
87 | Kv1Parser parser(tokens, into); | ||
88 | parser.parse(); | ||
89 | |||
90 | bool ok = true; | ||
91 | if (!parser.gerrors.empty()) { | ||
92 | ok = false; | ||
93 | fputs("Parser reported errors:\n", stderr); | ||
94 | for (const auto &error : parser.gerrors) | ||
95 | fprintf(stderr, "- %s\n", error.c_str()); | ||
96 | } | ||
97 | if (!parser.warns.empty()) { | ||
98 | fputs("Parser reported warnings:\n", stderr); | ||
99 | for (const auto &warn : parser.warns) | ||
100 | fprintf(stderr, "- %s\n", warn.c_str()); | ||
101 | } | ||
102 | |||
103 | fprintf(stderr, "Parsed %lu records\n", into.size()); | ||
104 | |||
105 | return ok; | ||
106 | } | ||
107 | |||
108 | void printParsedRecords(const Kv1Records &records) { | ||
109 | fputs("Parsed records:\n", stderr); | ||
110 | fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size()); | ||
111 | fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size()); | ||
112 | fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size()); | ||
113 | fprintf(stderr, " user_stop_areas: %lu\n", records.user_stop_areas.size()); | ||
114 | fprintf(stderr, " timing_links: %lu\n", records.timing_links.size()); | ||
115 | fprintf(stderr, " links: %lu\n", records.links.size()); | ||
116 | fprintf(stderr, " lines: %lu\n", records.lines.size()); | ||
117 | fprintf(stderr, " destinations: %lu\n", records.destinations.size()); | ||
118 | fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size()); | ||
119 | fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size()); | ||
120 | fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size()); | ||
121 | fprintf(stderr, " financers: %lu\n", records.financers.size()); | ||
122 | fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size()); | ||
123 | fprintf(stderr, " points: %lu\n", records.points.size()); | ||
124 | fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size()); | ||
125 | fprintf(stderr, " icons: %lu\n", records.icons.size()); | ||
126 | fprintf(stderr, " notices: %lu\n", records.notices.size()); | ||
127 | fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size()); | ||
128 | fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size()); | ||
129 | fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size()); | ||
130 | fprintf(stderr, " period_groups: %lu\n", records.period_groups.size()); | ||
131 | fprintf(stderr, " specific_days: %lu\n", records.specific_days.size()); | ||
132 | fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size()); | ||
133 | fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size()); | ||
134 | fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size()); | ||
135 | fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size()); | ||
136 | fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size()); | ||
137 | fprintf(stderr, " public_journey_passing_times: %lu\n", records.public_journey_passing_times.size()); | ||
138 | fprintf(stderr, " operating_days: %lu\n", records.operating_days.size()); | ||
139 | } | ||
140 | |||
141 | void printIndexSize(const Kv1Index &index) { | ||
142 | fputs("Index size:\n", stderr); | ||
143 | fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size()); | ||
144 | fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size()); | ||
145 | fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size()); | ||
146 | fprintf(stderr, " timing_links: %lu\n", index.timing_links.size()); | ||
147 | fprintf(stderr, " links: %lu\n", index.links.size()); | ||
148 | fprintf(stderr, " lines: %lu\n", index.lines.size()); | ||
149 | fprintf(stderr, " destinations: %lu\n", index.destinations.size()); | ||
150 | fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size()); | ||
151 | fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size()); | ||
152 | fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size()); | ||
153 | fprintf(stderr, " financers: %lu\n", index.financers.size()); | ||
154 | fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size()); | ||
155 | fprintf(stderr, " points: %lu\n", index.points.size()); | ||
156 | fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size()); | ||
157 | fprintf(stderr, " icons: %lu\n", index.icons.size()); | ||
158 | fprintf(stderr, " notices: %lu\n", index.notices.size()); | ||
159 | fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size()); | ||
160 | fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size()); | ||
161 | fprintf(stderr, " period_groups: %lu\n", index.period_groups.size()); | ||
162 | fprintf(stderr, " specific_days: %lu\n", index.specific_days.size()); | ||
163 | fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size()); | ||
164 | fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size()); | ||
165 | fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size()); | ||
166 | fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size()); | ||
167 | fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size()); | ||
168 | fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size()); | ||
169 | fprintf(stderr, " operating_days: %lu\n", index.operating_days.size()); | ||
170 | } | ||
171 | |||
172 | struct BasicJourneyKey { | ||
173 | std::string data_owner_code; | ||
174 | std::string line_planning_number; | ||
175 | int journey_number; | ||
176 | |||
177 | auto operator<=>(const BasicJourneyKey &) const = default; | ||
178 | }; | ||
179 | |||
180 | size_t hash_value(const BasicJourneyKey &k) { | ||
181 | size_t seed = 0; | ||
182 | |||
183 | boost::hash_combine(seed, k.data_owner_code); | ||
184 | boost::hash_combine(seed, k.line_planning_number); | ||
185 | boost::hash_combine(seed, k.journey_number); | ||
186 | |||
187 | return seed; | ||
188 | } | ||
189 | |||
190 | using BasicJourneyKeySet = std::unordered_set<BasicJourneyKey, boost::hash<BasicJourneyKey>>; | ||
191 | |||
192 | arrow::Result<BasicJourneyKeySet> basicJourneys(std::shared_ptr<arrow::Table> table) { | ||
193 | ac::TableSourceNodeOptions table_source_node_options(table); | ||
194 | ac::Declaration table_source("table_source", std::move(table_source_node_options)); | ||
195 | auto aggregate_options = ac::AggregateNodeOptions{ | ||
196 | /* .aggregates = */ {}, | ||
197 | /* .keys = */ { "data_owner_code", "line_planning_number", "journey_number" }, | ||
198 | }; | ||
199 | ac::Declaration aggregate("aggregate", { std::move(table_source) }, std::move(aggregate_options)); | ||
200 | |||
201 | std::shared_ptr<arrow::Table> result; | ||
202 | ARROW_ASSIGN_OR_RAISE(result, ac::DeclarationToTable(std::move(aggregate))); | ||
203 | |||
204 | std::shared_ptr<arrow::ChunkedArray> data_owner_codes = result->GetColumnByName("data_owner_code"); | ||
205 | std::shared_ptr<arrow::ChunkedArray> line_planning_numbers = result->GetColumnByName("line_planning_number"); | ||
206 | std::shared_ptr<arrow::ChunkedArray> journey_numbers = result->GetColumnByName("journey_number"); | ||
207 | |||
208 | int i_data_owner_codes_chunk = 0; | ||
209 | int i_journey_numbers_chunk = 0; | ||
210 | int i_line_planning_numbers_chunk = 0; | ||
211 | int i_in_data_owner_codes_chunk = 0; | ||
212 | int i_in_journey_numbers_chunk = 0; | ||
213 | int i_in_line_planning_numbers_chunk = 0; | ||
214 | |||
215 | BasicJourneyKeySet journeys; | ||
216 | |||
217 | for (int64_t i = 0; i < result->num_rows(); i++) { | ||
218 | auto data_owner_codes_chunk = std::static_pointer_cast<arrow::StringArray>(data_owner_codes->chunk(i_data_owner_codes_chunk)); | ||
219 | auto line_planning_numbers_chunk = std::static_pointer_cast<arrow::StringArray>(line_planning_numbers->chunk(i_line_planning_numbers_chunk)); | ||
220 | auto journey_numbers_chunk = std::static_pointer_cast<arrow::UInt32Array>(journey_numbers->chunk(i_journey_numbers_chunk)); | ||
221 | |||
222 | std::string_view data_owner_code = data_owner_codes_chunk->Value(i_in_data_owner_codes_chunk); | ||
223 | std::string_view line_planning_number = line_planning_numbers_chunk->Value(i_in_line_planning_numbers_chunk); | ||
224 | uint32_t journey_number = journey_numbers_chunk->Value(i_in_journey_numbers_chunk); | ||
225 | |||
226 | journeys.emplace( | ||
227 | std::string(data_owner_code), | ||
228 | std::string(line_planning_number), | ||
229 | journey_number | ||
230 | ); | ||
231 | |||
232 | i_in_data_owner_codes_chunk++; | ||
233 | i_in_line_planning_numbers_chunk++; | ||
234 | i_in_journey_numbers_chunk++; | ||
235 | if (i_in_data_owner_codes_chunk >= data_owner_codes_chunk->length()) { | ||
236 | i_data_owner_codes_chunk++; | ||
237 | i_in_data_owner_codes_chunk = 0; | ||
238 | } | ||
239 | if (i_in_line_planning_numbers_chunk >= line_planning_numbers_chunk->length()) { | ||
240 | i_line_planning_numbers_chunk++; | ||
241 | i_in_line_planning_numbers_chunk = 0; | ||
242 | } | ||
243 | if (i_in_journey_numbers_chunk >= journey_numbers_chunk->length()) { | ||
244 | i_journey_numbers_chunk++; | ||
245 | i_in_journey_numbers_chunk = 0; | ||
246 | } | ||
247 | } | ||
248 | |||
249 | return journeys; | ||
250 | } | ||
251 | |||
252 | struct DistanceKey { | ||
253 | BasicJourneyKey journey; | ||
254 | std::string last_passed_user_stop_code; | ||
255 | |||
256 | auto operator<=>(const DistanceKey &) const = default; | ||
257 | }; | ||
258 | |||
259 | size_t hash_value(const DistanceKey &k) { | ||
260 | size_t seed = 0; | ||
261 | |||
262 | boost::hash_combine(seed, k.journey); | ||
263 | boost::hash_combine(seed, k.last_passed_user_stop_code); | ||
264 | |||
265 | return seed; | ||
266 | } | ||
267 | |||
268 | struct DistanceTimingLink { | ||
269 | const Kv1JourneyPatternTimingLink *jopatili; | ||
270 | double distance_since_start_of_journey = 0; // at the start of the link | ||
271 | }; | ||
272 | |||
273 | using DistanceMap = std::unordered_map<DistanceKey, double, boost::hash<DistanceKey>>; | ||
274 | |||
275 | // Returns a map, where | ||
276 | // DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode -> | ||
277 | // Distance of Last User Stop | ||
278 | DistanceMap makeDistanceMap(Kv1Records &records, Kv1Index &index, BasicJourneyKeySet &journeys) { | ||
279 | std::unordered_map< | ||
280 | Kv1JourneyPattern::Key, | ||
281 | std::vector<DistanceTimingLink>, | ||
282 | boost::hash<Kv1JourneyPattern::Key>> jopatili_index; | ||
283 | std::unordered_map< | ||
284 | BasicJourneyKey, | ||
285 | const Kv1PublicJourney *, | ||
286 | boost::hash<BasicJourneyKey>> journey_index; | ||
287 | for (size_t i = 0; i < records.public_journeys.size(); i++) { | ||
288 | const Kv1PublicJourney *pujo = &records.public_journeys[i]; | ||
289 | |||
290 | BasicJourneyKey journey_key( | ||
291 | pujo->key.data_owner_code, | ||
292 | pujo->key.line_planning_number, | ||
293 | pujo->key.journey_number); | ||
294 | |||
295 | if (journeys.contains(journey_key)) { | ||
296 | journey_index[journey_key] = pujo; | ||
297 | |||
298 | Kv1JourneyPattern::Key jopa_key( | ||
299 | pujo->key.data_owner_code, | ||
300 | pujo->key.line_planning_number, | ||
301 | pujo->journey_pattern_code); | ||
302 | jopatili_index[jopa_key] = {}; | ||
303 | } | ||
304 | } | ||
305 | |||
306 | for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { | ||
307 | const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; | ||
308 | Kv1JourneyPattern::Key jopa_key( | ||
309 | jopatili->key.data_owner_code, | ||
310 | jopatili->key.line_planning_number, | ||
311 | jopatili->key.journey_pattern_code); | ||
312 | if (jopatili_index.contains(jopa_key)) { | ||
313 | jopatili_index[jopa_key].push_back(DistanceTimingLink(jopatili, 0)); | ||
314 | } | ||
315 | } | ||
316 | |||
317 | for (auto &[jopa_key, timing_links] : jopatili_index) { | ||
318 | std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) { | ||
319 | return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; | ||
320 | }); | ||
321 | |||
322 | const std::string transport_type = index.journey_patterns[jopa_key]->p_line->transport_type; | ||
323 | |||
324 | for (size_t i = 1; i < timing_links.size(); i++) { | ||
325 | DistanceTimingLink *timing_link = &timing_links[i]; | ||
326 | DistanceTimingLink *prev_timing_link = &timing_links[i - 1]; | ||
327 | |||
328 | const Kv1Link::Key link_key( | ||
329 | prev_timing_link->jopatili->key.data_owner_code, | ||
330 | prev_timing_link->jopatili->user_stop_code_begin, | ||
331 | prev_timing_link->jopatili->user_stop_code_end, | ||
332 | transport_type); | ||
333 | double link_distance = index.links[link_key]->distance; | ||
334 | timing_link->distance_since_start_of_journey = | ||
335 | prev_timing_link->distance_since_start_of_journey + link_distance; | ||
336 | } | ||
337 | } | ||
338 | |||
339 | // DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode -> | ||
340 | // Distance of Last User Stop | ||
341 | DistanceMap distance_map; | ||
342 | |||
343 | for (const auto &journey : journeys) { | ||
344 | const Kv1PublicJourney *pujo = journey_index[journey]; | ||
345 | if (pujo == nullptr) { | ||
346 | std::cerr << "Warning: No PUJO found for [" << journey.data_owner_code << "] " | ||
347 | << journey.line_planning_number << "/" << journey.journey_number << std::endl; | ||
348 | continue; | ||
349 | } | ||
350 | Kv1JourneyPattern::Key jopa_key( | ||
351 | pujo->key.data_owner_code, | ||
352 | pujo->key.line_planning_number, | ||
353 | pujo->journey_pattern_code); | ||
354 | for (const auto &timing_link : jopatili_index[jopa_key]) { | ||
355 | DistanceKey key(journey, timing_link.jopatili->user_stop_code_begin); | ||
356 | distance_map[key] = timing_link.distance_since_start_of_journey; | ||
357 | } | ||
358 | } | ||
359 | |||
360 | return distance_map; | ||
361 | } | ||
362 | |||
363 | arrow::Result<std::shared_ptr<arrow::Table>> augment( | ||
364 | std::shared_ptr<arrow::Table> table, | ||
365 | const DistanceMap &distance_map | ||
366 | ) { | ||
367 | for (int i = 0; i < table->num_columns(); i++) { | ||
368 | if (table->column(i)->num_chunks() > 1) { | ||
369 | std::stringstream ss; | ||
370 | ss << "Error: Expected column " << i | ||
371 | << " (" << table->ColumnNames()[i] << ") to have 1 chunk, got " | ||
372 | << table->column(i)->num_chunks(); | ||
373 | return arrow::Status::Invalid(ss.str()); | ||
374 | } | ||
375 | } | ||
376 | |||
377 | auto data_owner_codes = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("data_owner_code")->chunk(0)); | ||
378 | auto line_planning_numbers = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("line_planning_number")->chunk(0)); | ||
379 | auto journey_numbers = std::static_pointer_cast<arrow::UInt32Array>(table->GetColumnByName("journey_number")->chunk(0)); | ||
380 | auto user_stop_codes = std::static_pointer_cast<arrow::StringArray>(table->GetColumnByName("user_stop_code")->chunk(0)); | ||
381 | auto distance_since_last_user_stops = std::static_pointer_cast<arrow::UInt32Array>(table->GetColumnByName("distance_since_last_user_stop")->chunk(0)); | ||
382 | auto timestamps = std::static_pointer_cast<arrow::TimestampArray>(table->GetColumnByName("timestamp")->chunk(0)); | ||
383 | |||
384 | auto timestamps_type = table->schema()->GetFieldByName("timestamp")->type(); | ||
385 | if (timestamps_type->id() != arrow::Type::TIMESTAMP) | ||
386 | return arrow::Status::Invalid("Field 'timestamp' does not have expected type TIMESTAMP"); | ||
387 | if (std::static_pointer_cast<arrow::TimestampType>(timestamps_type)->unit() != arrow::TimeUnit::MILLI) | ||
388 | return arrow::Status::Invalid("Field 'timestamp' does not have unit MILLI"); | ||
389 | if (!std::static_pointer_cast<arrow::TimestampType>(timestamps_type)->timezone().empty()) | ||
390 | return arrow::Status::Invalid("Field 'timestamp' should have empty time zone name"); | ||
391 | |||
392 | std::shared_ptr<arrow::Field> field_distance_since_start_of_journey = | ||
393 | arrow::field("distance_since_start_of_journey", arrow::uint32()); | ||
394 | std::shared_ptr<arrow::Field> field_day_of_week = | ||
395 | arrow::field("timestamp_iso_day_of_week", arrow::int64()); | ||
396 | std::shared_ptr<arrow::Field> field_date = | ||
397 | arrow::field("timestamp_date", arrow::date32()); | ||
398 | std::shared_ptr<arrow::Field> field_local_time = | ||
399 | arrow::field("timestamp_local_time", arrow::time32(arrow::TimeUnit::SECOND)); | ||
400 | arrow::UInt32Builder distance_since_start_of_journey_builder; | ||
401 | arrow::Int64Builder day_of_week_builder; | ||
402 | arrow::Date32Builder date_builder; | ||
403 | arrow::Time32Builder local_time_builder(arrow::time32(arrow::TimeUnit::SECOND), arrow::default_memory_pool()); | ||
404 | |||
405 | const std::chrono::time_zone *amsterdam = std::chrono::locate_zone("Europe/Amsterdam"); | ||
406 | |||
407 | for (int64_t i = 0; i < table->num_rows(); i++) { | ||
408 | DistanceKey key( | ||
409 | BasicJourneyKey( | ||
410 | std::string(data_owner_codes->Value(i)), | ||
411 | std::string(line_planning_numbers->Value(i)), | ||
412 | journey_numbers->Value(i)), | ||
413 | std::string(user_stop_codes->Value(i))); | ||
414 | |||
415 | uint32_t distance_since_last_user_stop = distance_since_last_user_stops->Value(i); | ||
416 | if (distance_map.contains(key)) { | ||
417 | uint32_t total_distance = distance_since_last_user_stop + static_cast<uint32_t>(distance_map.at(key)); | ||
418 | ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.Append(total_distance)); | ||
419 | } else { | ||
420 | ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.AppendNull()); | ||
421 | } | ||
422 | |||
423 | // Welp, this has gotten a bit complicated! | ||
424 | std::chrono::sys_seconds timestamp(std::chrono::floor<std::chrono::seconds>(std::chrono::milliseconds(timestamps->Value(i)))); | ||
425 | std::chrono::zoned_seconds zoned_timestamp(amsterdam, timestamp); | ||
426 | std::chrono::local_seconds local_timestamp(zoned_timestamp); | ||
427 | std::chrono::local_days local_date = std::chrono::floor<std::chrono::days>(local_timestamp); | ||
428 | std::chrono::year_month_day date(local_date); | ||
429 | std::chrono::weekday day_of_week(local_date); | ||
430 | std::chrono::hh_mm_ss<std::chrono::seconds> time(local_timestamp - local_date); | ||
431 | std::chrono::sys_days unix_date(date); | ||
432 | |||
433 | int64_t iso_day_of_week = day_of_week.iso_encoding(); | ||
434 | int32_t unix_days = static_cast<int32_t>(unix_date.time_since_epoch().count()); | ||
435 | int32_t secs_since_midnight = static_cast<int32_t>(std::chrono::seconds(time).count()); | ||
436 | |||
437 | ARROW_RETURN_NOT_OK(day_of_week_builder.Append(iso_day_of_week)); | ||
438 | ARROW_RETURN_NOT_OK(date_builder.Append(unix_days)); | ||
439 | ARROW_RETURN_NOT_OK(local_time_builder.Append(secs_since_midnight)); | ||
440 | } | ||
441 | |||
442 | ARROW_ASSIGN_OR_RAISE(auto distance_since_start_of_journey_col_chunk, distance_since_start_of_journey_builder.Finish()); | ||
443 | ARROW_ASSIGN_OR_RAISE(auto day_of_week_col_chunk, day_of_week_builder.Finish()); | ||
444 | ARROW_ASSIGN_OR_RAISE(auto date_col_chunk, date_builder.Finish()); | ||
445 | ARROW_ASSIGN_OR_RAISE(auto local_time_col_chunk, local_time_builder.Finish()); | ||
446 | auto distance_since_start_of_journey_col = | ||
447 | std::make_shared<arrow::ChunkedArray>(distance_since_start_of_journey_col_chunk); | ||
448 | auto day_of_week_col = std::make_shared<arrow::ChunkedArray>(day_of_week_col_chunk); | ||
449 | auto date_col = std::make_shared<arrow::ChunkedArray>(date_col_chunk); | ||
450 | auto local_time_col = std::make_shared<arrow::ChunkedArray>(local_time_col_chunk); | ||
451 | |||
452 | ARROW_ASSIGN_OR_RAISE(table, table->AddColumn( | ||
453 | table->num_columns(), | ||
454 | field_distance_since_start_of_journey, | ||
455 | distance_since_start_of_journey_col)); | ||
456 | ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_day_of_week, day_of_week_col)); | ||
457 | ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_date, date_col)); | ||
458 | ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_local_time, local_time_col)); | ||
459 | |||
460 | return table; | ||
461 | } | ||
462 | |||
463 | arrow::Status processTables(Kv1Records &records, Kv1Index &index) { | ||
464 | std::shared_ptr<arrow::io::RandomAccessFile> input; | ||
465 | ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("oeuf-input.parquet")); | ||
466 | |||
467 | std::unique_ptr<parquet::arrow::FileReader> arrow_reader; | ||
468 | ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); | ||
469 | |||
470 | std::shared_ptr<arrow::Table> table; | ||
471 | ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); | ||
472 | |||
473 | std::cerr << "Input KV6 file has " << table->num_rows() << " rows" << std::endl; | ||
474 | ARROW_ASSIGN_OR_RAISE(BasicJourneyKeySet journeys, basicJourneys(table)); | ||
475 | std::cerr << "Found " << journeys.size() << " distinct journeys" << std::endl; | ||
476 | DistanceMap distance_map = makeDistanceMap(records, index, journeys); | ||
477 | std::cerr << "Distance map has " << distance_map.size() << " keys" << std::endl; | ||
478 | |||
479 | std::cerr << "Creating augmented table" << std::endl; | ||
480 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> augmented, augment(table, distance_map)); | ||
481 | |||
482 | std::cerr << "Writing augmented table" << std::endl; | ||
483 | return writeArrowTableAsParquetFile(*augmented, "oeuf-augmented.parquet"); | ||
484 | } | ||
485 | |||
486 | int main(int argc, char *argv[]) { | ||
487 | Kv1Records records; | ||
488 | if (!parse(records)) { | ||
489 | fputs("Error parsing records, exiting\n", stderr); | ||
490 | return EXIT_FAILURE; | ||
491 | } | ||
492 | printParsedRecords(records); | ||
493 | fputs("Indexing...\n", stderr); | ||
494 | Kv1Index index(&records); | ||
495 | fprintf(stderr, "Indexed %lu records\n", index.size()); | ||
496 | // Only notice assignments are not indexed. If this equality is not valid, | ||
497 | // then this means that we had duplicate keys or that something else went | ||
498 | // wrong. That would really not be great. | ||
499 | assert(index.size() == records.size() - records.notice_assignments.size()); | ||
500 | printIndexSize(index); | ||
501 | fputs("Linking records...\n", stderr); | ||
502 | kv1LinkRecords(index); | ||
503 | fputs("Done linking\n", stderr); | ||
504 | |||
505 | arrow::Status st = processTables(records, index); | ||
506 | if (!st.ok()) { | ||
507 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
508 | return EXIT_FAILURE; | ||
509 | } | ||
510 | } | ||
diff --git a/src/bundleparquet/.envrc b/src/bundleparquet/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/bundleparquet/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/bundleparquet/Makefile b/src/bundleparquet/Makefile new file mode 100644 index 0000000..170304d --- /dev/null +++ b/src/bundleparquet/Makefile | |||
@@ -0,0 +1,21 @@ | |||
# Taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
#
# -Werror is only added when DEVMODE is set in the environment.
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
	-O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
	-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
	-D_GLIBCXX_ASSERTIONS \
	-fstrict-flex-arrays=3 \
	-fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -lcurl -lparquet -lprometheus-cpp-push -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
	-Wl,-z,nodlopen -Wl,-z,noexecstack \
	-Wl,-z,relro -Wl,-z,now

bundleparquet: main.cpp spliturl.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
# -f keeps `make clean` from failing when the binary has not been built yet.
clean:
	rm -f bundleparquet
diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp new file mode 100644 index 0000000..05fd881 --- /dev/null +++ b/src/bundleparquet/main.cpp | |||
@@ -0,0 +1,213 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <deque> | ||
5 | #include <filesystem> | ||
6 | #include <format> | ||
7 | #include <fstream> | ||
8 | #include <iostream> | ||
9 | |||
10 | #include <arrow/api.h> | ||
11 | #include <arrow/io/api.h> | ||
12 | #include <parquet/arrow/reader.h> | ||
13 | |||
14 | #include <nlohmann/json.hpp> | ||
15 | |||
16 | #include <prometheus/counter.h> | ||
17 | #include <prometheus/gateway.h> | ||
18 | #include <prometheus/registry.h> | ||
19 | |||
20 | #include <tmi8/kv6_parquet.hpp> | ||
21 | |||
22 | #include "spliturl.hpp" | ||
23 | |||
// Row-count window for a merged file: a bundle is only written once at least
// MIN_COMBINED_ROWS rows are available, and files are no longer added to a
// bundle when doing so would push it past MAX_COMBINED_ROWS rows.
static constexpr int MIN_COMBINED_ROWS = 1000000; // one million
static constexpr int MAX_COMBINED_ROWS = 2000000; // two million
26 | |||
// Summary of a Parquet file as recorded in its '<file>.meta.json' companion:
// the smallest and largest timestamp it contains and its number of rows.
// All fields default to zero.
struct FileMetadata {
  int64_t min_timestamp{0};
  int64_t max_timestamp{0};
  int64_t rows_written{0};
};
32 | |||
33 | struct File { | ||
34 | FileMetadata metadata; | ||
35 | std::filesystem::path filename; | ||
36 | }; | ||
37 | |||
38 | FileMetadata readMetadataOf(std::filesystem::path filename) { | ||
39 | std::string meta_filename = std::string(filename) + ".meta.json"; | ||
40 | std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary); | ||
41 | nlohmann::json meta_json; | ||
42 | meta_file >> meta_json; | ||
43 | FileMetadata meta = { | ||
44 | .min_timestamp = meta_json["min_timestamp"], | ||
45 | .max_timestamp = meta_json["max_timestamp"], | ||
46 | .rows_written = meta_json["rows_written"], | ||
47 | }; | ||
48 | return meta; | ||
49 | } | ||
50 | |||
51 | arrow::Status processFirstTables(std::deque<File> &files, prometheus::Counter &rows_written) { | ||
52 | if (files.size() == 0) { | ||
53 | std::cerr << "Did not find any files" << std::endl; | ||
54 | return arrow::Status::OK(); | ||
55 | } | ||
56 | |||
57 | int64_t rows = 0; | ||
58 | |||
59 | std::vector<std::shared_ptr<arrow::Table>> tables; | ||
60 | std::vector<std::filesystem::path> processed; | ||
61 | int64_t min_timestamp = std::numeric_limits<int64_t>::max(); | ||
62 | int64_t max_timestamp = 0; | ||
63 | |||
64 | bool over_capacity_risk = false; | ||
65 | auto it = files.begin(); | ||
66 | while (it != files.end()) { | ||
67 | const std::filesystem::path &filename = it->filename; | ||
68 | const FileMetadata &metadata = it->metadata; | ||
69 | |||
70 | std::shared_ptr<arrow::io::RandomAccessFile> input; | ||
71 | ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename)); | ||
72 | |||
73 | std::unique_ptr<parquet::arrow::FileReader> arrow_reader; | ||
74 | ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); | ||
75 | |||
76 | if (metadata.min_timestamp < min_timestamp) | ||
77 | min_timestamp = metadata.min_timestamp; | ||
78 | if (metadata.max_timestamp > max_timestamp) | ||
79 | max_timestamp = metadata.max_timestamp; | ||
80 | |||
81 | if (rows + metadata.rows_written > MAX_COMBINED_ROWS) { | ||
82 | over_capacity_risk = true; | ||
83 | break; | ||
84 | } | ||
85 | |||
86 | std::shared_ptr<arrow::Table> table; | ||
87 | ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); | ||
88 | tables.push_back(table); | ||
89 | processed.push_back(filename); | ||
90 | rows += metadata.rows_written; | ||
91 | it = files.erase(it); | ||
92 | } | ||
93 | |||
94 | if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) { | ||
95 | std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl; | ||
96 | std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so " | ||
97 | << static_cast<float>(rows)/static_cast<float>(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl; | ||
98 | return arrow::Status::OK(); | ||
99 | } else if (rows == 0 && over_capacity_risk) { | ||
100 | const std::filesystem::path &filename = files.front().filename; | ||
101 | std::filesystem::rename(filename, "merged" / filename); | ||
102 | std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json"); | ||
103 | rows_written.Increment(static_cast<double>(files.front().metadata.rows_written)); | ||
104 | files.pop_front(); | ||
105 | return arrow::Status::OK(); | ||
106 | } | ||
107 | |||
108 | // Default options specify that the schemas are not unified, which is | ||
109 | // luckliy exactly what we want :) | ||
110 | std::shared_ptr<arrow::Table> merged_table; | ||
111 | ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables)); | ||
112 | |||
113 | auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::system_clock::now()); | ||
114 | std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp); | ||
115 | ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename)); | ||
116 | |||
117 | std::cerr << "Wrote merged table to " << filename << std::endl; | ||
118 | |||
119 | std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); | ||
120 | nlohmann::json meta{ | ||
121 | { "min_timestamp", min_timestamp }, | ||
122 | { "max_timestamp", max_timestamp }, | ||
123 | { "rows_written", rows }, | ||
124 | }; | ||
125 | metaf << meta; | ||
126 | metaf.close(); | ||
127 | std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); | ||
128 | |||
129 | std::cerr << "Wrote merged table metadata" << std::endl; | ||
130 | rows_written.Increment(static_cast<double>(rows)); | ||
131 | |||
132 | for (const std::filesystem::path &filename : processed) { | ||
133 | std::filesystem::remove(filename); | ||
134 | std::filesystem::remove(std::string(filename) + ".meta.json"); | ||
135 | } | ||
136 | |||
137 | std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl; | ||
138 | |||
139 | return arrow::Status::OK(); | ||
140 | } | ||
141 | |||
142 | arrow::Status processTables(std::deque<File> &files, prometheus::Counter &rows_written) { | ||
143 | while (!files.empty()) | ||
144 | ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written)); | ||
145 | return arrow::Status::OK(); | ||
146 | } | ||
147 | |||
148 | int main(int argc, char *argv[]) { | ||
149 | std::filesystem::path cwd = std::filesystem::current_path(); | ||
150 | std::filesystem::create_directory(cwd / "merged"); | ||
151 | |||
152 | const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL"); | ||
153 | if (!prom_push_url || strlen(prom_push_url) == 0) { | ||
154 | std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" << std::endl; | ||
155 | return EXIT_FAILURE; | ||
156 | } | ||
157 | |||
158 | std::string split_err; | ||
159 | auto split_prom_push_url = splitUrl(prom_push_url, &split_err); | ||
160 | if (!split_prom_push_url) { | ||
161 | std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: " | ||
162 | << split_err << std::endl; | ||
163 | return EXIT_FAILURE; | ||
164 | } | ||
165 | std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":" | ||
166 | << split_prom_push_url->portpath << std::endl; | ||
167 | |||
168 | prometheus::Gateway gateway{split_prom_push_url->schemehost, | ||
169 | split_prom_push_url->portpath, | ||
170 | "oeuf-archiver"}; | ||
171 | |||
172 | auto registry = std::make_shared<prometheus::Registry>(); | ||
173 | prometheus::Gauge &rows_available = prometheus::BuildGauge() | ||
174 | .Name("archiver_rows_available") | ||
175 | .Help("Number of rows available to the archiver") | ||
176 | .Register(*registry) | ||
177 | .Add({}); | ||
178 | prometheus::Counter &rows_written = prometheus::BuildCounter() | ||
179 | .Name("archiver_rows_written") | ||
180 | .Help("Number of rows written by the archiver") | ||
181 | .Register(*registry) | ||
182 | .Add({}); | ||
183 | gateway.RegisterCollectable(registry); | ||
184 | |||
185 | std::deque<File> files; | ||
186 | for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) { | ||
187 | if (!dir_entry.is_regular_file()) continue; | ||
188 | std::filesystem::path filename = dir_entry.path().filename(); | ||
189 | const std::string &filename_str = filename; | ||
190 | if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) { | ||
191 | try { | ||
192 | FileMetadata meta = readMetadataOf(filename); | ||
193 | File file = { .metadata = meta, .filename = filename }; | ||
194 | files.push_back(file); | ||
195 | |||
196 | rows_available.Increment(static_cast<double>(meta.rows_written)); | ||
197 | } catch (const std::exception &e) { | ||
198 | std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl; | ||
199 | return EXIT_FAILURE; | ||
200 | } | ||
201 | } | ||
202 | } | ||
203 | |||
204 | std::sort(files.begin(), files.end(), | ||
205 | [](const File &f1, const File &f2) { return f1.filename < f2.filename; }); | ||
206 | arrow::Status st = processTables(files, rows_written); | ||
207 | if (!st.ok()) { | ||
208 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
209 | return EXIT_FAILURE; | ||
210 | } | ||
211 | |||
212 | gateway.Push(); | ||
213 | } | ||
diff --git a/src/bundleparquet/spliturl.cpp b/src/bundleparquet/spliturl.cpp new file mode 100644 index 0000000..90fd821 --- /dev/null +++ b/src/bundleparquet/spliturl.cpp | |||
@@ -0,0 +1,203 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <cstring> | ||
4 | #include <iostream> | ||
5 | #include <optional> | ||
6 | #include <sstream> | ||
7 | #include <string> | ||
8 | |||
9 | #include <curl/curl.h> | ||
10 | |||
11 | #include "spliturl.hpp" | ||
12 | |||
13 | // splitUrl takes a URL of the shape '[http[s]://]HOST[:PORT][/PATH]', and | ||
14 | // splits it into two URLs: | ||
15 | // - scheme + host -> '[http[s]://]HOST' | ||
16 | // - port + path -> '[PORT][/PATH]' | ||
17 | // In case an IPv6 address is provided, the host must enclosed in square | ||
18 | // brackets. The zone ID may also be indicated. Note that in the resulting | ||
19 | // parts, the colon preceding the port number is omitted. This is on purpose. | ||
20 | std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error) { | ||
21 | std::stringstream errs; | ||
22 | std::optional<SplitUrl> result; | ||
23 | char *processed = nullptr; | ||
24 | char *scheme = nullptr; | ||
25 | char *user = nullptr; | ||
26 | char *password = nullptr; | ||
27 | char *zoneid = nullptr; | ||
28 | char *query = nullptr; | ||
29 | char *fragment = nullptr; | ||
30 | CURLU *schemehost = nullptr; | ||
31 | char *schemehost_url = nullptr; | ||
32 | char *portpath_url = nullptr; | ||
33 | |||
34 | // Parse the URL, allowing the user to omit the scheme. CURL will use 'https' | ||
35 | // by default if no scheme is specified. | ||
36 | |||
37 | CURLU *parsed = curl_url(); | ||
38 | CURLUcode rc = curl_url_set(parsed, CURLUPART_URL, url.c_str(), CURLU_DEFAULT_SCHEME); | ||
39 | if (rc != CURLUE_OK) { | ||
40 | errs << "Failed to parse URL: " << curl_url_strerror(rc); | ||
41 | goto Exit; | ||
42 | } | ||
43 | |||
44 | // As we parse the URL with the option CURLU_DEFAULT_SCHEME, the CURL API | ||
45 | // won't require the user to provide the scheme part of the URL. It will | ||
46 | // automatically default the scheme to https. However, we do not usually want | ||
47 | // it to default to HTTPS, but HTTP instead (as the use case, connecting to a | ||
48 | // PushGateway server, usually is served over a private network via HTTP). | ||
49 | // | ||
50 | // This is why we check if the scheme was put there by CURL and otherwise set | ||
51 | // it to HTTP. We also check for any other schemes that the user may have | ||
52 | // provided, and reject anything that is not http/https. | ||
53 | if (!url.starts_with("http://") && !url.starts_with("https://")) { | ||
54 | rc = curl_url_get(parsed, CURLUPART_SCHEME, &scheme, 0); | ||
55 | if (rc != CURLUE_OK) { | ||
56 | errs << "Could not get scheme from parsed URL: " << curl_url_strerror(rc); | ||
57 | goto Exit; | ||
58 | } | ||
59 | if (strcmp(scheme, "https")) { | ||
60 | errs << "Unexpected scheme" << scheme << "in provided URL (expected http or https)"; | ||
61 | goto Exit; | ||
62 | } | ||
63 | rc = curl_url_set(parsed, CURLUPART_SCHEME, "http", 0); | ||
64 | if (rc != CURLUE_OK) { | ||
65 | errs << "Could not set URL scheme to http: " << curl_url_strerror(rc); | ||
66 | goto Exit; | ||
67 | } | ||
68 | } | ||
69 | |||
70 | // Turn the parsed URL back into a string. | ||
71 | rc = curl_url_get(parsed, CURLUPART_URL, &processed, 0); | ||
72 | if (rc != CURLUE_OK) { | ||
73 | errs << "Failed to output parsed URL: " << curl_url_strerror(rc); | ||
74 | goto Exit; | ||
75 | } | ||
76 | |||
77 | // This part of the code checks if no prohibited parts are present in the URL | ||
78 | // (basic auth: (user, password), query, fragment). | ||
79 | |||
80 | rc = curl_url_get(parsed, CURLUPART_USER, &user, 0); | ||
81 | if (rc == CURLUE_OK && strlen(user) != 0) { | ||
82 | errs << "Provided URL should not contain a user part"; | ||
83 | goto Exit; | ||
84 | } else if (rc != CURLUE_NO_USER && rc != CURLUE_OK) { | ||
85 | errs << "Failed to get check user part existence in provided url: " << curl_url_strerror(rc); | ||
86 | goto Exit; | ||
87 | } | ||
88 | |||
89 | rc = curl_url_get(parsed, CURLUPART_PASSWORD, &password, 0); | ||
90 | if (rc == CURLUE_OK && strlen(password) != 0) { | ||
91 | errs << "Provided URL should not contain a password part"; | ||
92 | goto Exit; | ||
93 | } else if (rc != CURLUE_NO_PASSWORD && rc != CURLUE_OK) { | ||
94 | errs << "Failed to get check password part existence in provided url: " << curl_url_strerror(rc); | ||
95 | goto Exit; | ||
96 | } | ||
97 | |||
98 | rc = curl_url_get(parsed, CURLUPART_QUERY, &query, 0); | ||
99 | if (rc == CURLUE_OK && strlen(query) != 0) { | ||
100 | errs << "Provided URL should not contain a query part"; | ||
101 | goto Exit; | ||
102 | } else if (rc != CURLUE_NO_QUERY && rc != CURLUE_OK) { | ||
103 | errs << "Failed to get check query part existence in provided url: " << curl_url_strerror(rc); | ||
104 | goto Exit; | ||
105 | } | ||
106 | |||
107 | rc = curl_url_get(parsed, CURLUPART_FRAGMENT, &fragment, 0); | ||
108 | if (rc == CURLUE_OK && strlen(fragment) != 0) { | ||
109 | errs << "Provided URL should not contain a fragment part"; | ||
110 | goto Exit; | ||
111 | } else if (rc != CURLUE_NO_FRAGMENT && rc != CURLUE_OK) { | ||
112 | errs << "Failed to get check fragment part existence in provided url: " << curl_url_strerror(rc); | ||
113 | goto Exit; | ||
114 | } | ||
115 | |||
116 | // Now that we know that the provided URL makes sense, we can start doing | ||
117 | // some arts and crafts. We get started by copying the parsed URL into | ||
118 | // schemehost and simply delete all parts which are not scheme + host. | ||
119 | |||
120 | schemehost = curl_url_dup(parsed); | ||
121 | |||
122 | // CURL BUG WORKAROUND: CURLUPART_ZONEID is NOT copied by curl_url_dup! | ||
123 | // ^ fixed in CURL 8.3.0 after https://curl.se/mail/lib-2023-07/0047.html | ||
124 | rc = curl_url_get(parsed, CURLUPART_ZONEID, &zoneid, 0); | ||
125 | if (rc == CURLUE_OK) { | ||
126 | rc = curl_url_set(schemehost, CURLUPART_ZONEID, zoneid, 0); | ||
127 | if (rc != CURLUE_OK) { | ||
128 | errs << "Could not copy zone ID to duplicated URL: " << curl_url_strerror(rc); | ||
129 | goto Exit; | ||
130 | } | ||
131 | } | ||
132 | rc = curl_url_set(schemehost, CURLUPART_PORT, nullptr, 0); | ||
133 | if (rc != CURLUE_OK) { | ||
134 | errs << "Could not unset port in duplicated URL: " << curl_url_strerror(rc); | ||
135 | goto Exit; | ||
136 | } | ||
137 | rc = curl_url_set(schemehost, CURLUPART_PATH, nullptr, 0); | ||
138 | if (rc != CURLUE_OK) { | ||
139 | errs << "Could not unset path in duplicated URL: " << curl_url_strerror(rc); | ||
140 | goto Exit; | ||
141 | } | ||
142 | |||
143 | // Okay, now we have the schemehost CURLU all ready to go. Note that a URL | ||
144 | // only consisting of a scheme and host is considered valid, so CURL will be | ||
145 | // more than happy to actually turn it into a string for us. Which is exactly | ||
146 | // what we do here :) | ||
147 | |||
148 | rc = curl_url_get(schemehost, CURLUPART_URL, &schemehost_url, 0); | ||
149 | if (rc != CURLUE_OK) { | ||
150 | errs << "Could not get scheme + host URL: " << curl_url_strerror(rc); | ||
151 | goto Exit; | ||
152 | } | ||
153 | |||
154 | // Remove any trailing slash after the scheme + host URL that CURL might have | ||
155 | // put there -- we still want to get a valid URL if we paste the port + path | ||
156 | // part behind it. | ||
157 | |||
158 | if (strlen(schemehost_url) > 0) { | ||
159 | if (schemehost_url[strlen(schemehost_url) - 1] != '/') { | ||
160 | errs << "Scheme + host URL does not end with a slash"; | ||
161 | goto Exit; | ||
162 | } | ||
163 | schemehost_url[strlen(schemehost_url) - 1] = '\0'; | ||
164 | } | ||
165 | |||
166 | // Look, this is really gross. Because the port + path part of the URL is not | ||
167 | // a valid URL itself, but the scheme + host should be a prefix of the full | ||
168 | // URL containing the port + path, we can simply check if it is indeed a | ||
169 | // prefix, and then strip it from the full URL, giving us the port + path | ||
170 | // (after deleting the colon preceding the port). | ||
171 | |||
172 | if (!std::string_view(processed).starts_with(schemehost_url)) { | ||
173 | errs << "Scheme + host URL is not a prefix of the processed URL"; | ||
174 | goto Exit; | ||
175 | } | ||
176 | |||
177 | portpath_url = processed + strlen(schemehost_url); | ||
178 | // We should not have the colon before the port, prometheus-cpp inserts it | ||
179 | if (strlen(portpath_url) > 0 && portpath_url[0] == ':') portpath_url++; | ||
180 | // We do not need a trailing slash | ||
181 | if (strlen(portpath_url) > 0 && portpath_url[strlen(portpath_url)-1] == '/') | ||
182 | portpath_url[strlen(portpath_url)-1] = '\0'; | ||
183 | |||
184 | // It has been done. BLECH | ||
185 | result = std::make_optional<SplitUrl>(schemehost_url, portpath_url); | ||
186 | |||
187 | Exit: | ||
188 | curl_free(processed); | ||
189 | curl_free(scheme); | ||
190 | curl_free(user); | ||
191 | curl_free(password); | ||
192 | curl_free(query); | ||
193 | curl_free(fragment); | ||
194 | curl_free(zoneid); | ||
195 | curl_free(schemehost_url); | ||
196 | curl_url_cleanup(schemehost); | ||
197 | curl_url_cleanup(parsed); | ||
198 | |||
199 | if (!result && error) | ||
200 | *error = errs.str(); | ||
201 | |||
202 | return result; | ||
203 | } | ||
diff --git a/src/bundleparquet/spliturl.hpp b/src/bundleparquet/spliturl.hpp new file mode 100644 index 0000000..d8150e0 --- /dev/null +++ b/src/bundleparquet/spliturl.hpp | |||
@@ -0,0 +1,11 @@ | |||
// vim:set sw=2 ts=2 sts et:

// Include guard: without it, including this header twice in one translation
// unit redefines SplitUrl.
#ifndef BUNDLEPARQUET_SPLITURL_HPP
#define BUNDLEPARQUET_SPLITURL_HPP

#include <optional>
#include <string>

// The two halves of a URL as produced by splitUrl.
struct SplitUrl {
  std::string schemehost; // scheme and host part of the URL
  std::string portpath;   // port and path part of the URL
};

// Splits `url` into a scheme + host part and a port + path part. Returns an
// empty optional on failure. `error` may be null; it is passed through to the
// implementation for error reporting.
std::optional<SplitUrl> splitUrl(const std::string &url, std::string *error = nullptr);

#endif // BUNDLEPARQUET_SPLITURL_HPP
diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/filterkv6/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile new file mode 100644 index 0000000..13bb38e --- /dev/null +++ b/src/filterkv6/Makefile | |||
@@ -0,0 +1,21 @@ | |||
# Build rules for the filterkv6 tool.
#
# The compiler hardening flags are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

filterkv6: main.cpp
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# -f: do not fail when the binary has not been built (yet)
	rm -f filterkv6
diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp new file mode 100644 index 0000000..a32220a --- /dev/null +++ b/src/filterkv6/main.cpp | |||
@@ -0,0 +1,106 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <deque> | ||
5 | #include <filesystem> | ||
6 | #include <format> | ||
7 | #include <fstream> | ||
8 | #include <iostream> | ||
9 | |||
10 | #include <arrow/api.h> | ||
11 | #include <arrow/compute/api.h> | ||
12 | #include <arrow/filesystem/api.h> | ||
13 | #include <arrow/dataset/api.h> | ||
14 | #include <arrow/io/api.h> | ||
15 | |||
16 | #include <tmi8/kv6_parquet.hpp> | ||
17 | |||
18 | namespace ds = arrow::dataset; | ||
19 | namespace cp = arrow::compute; | ||
20 | using namespace arrow; | ||
21 | |||
22 | arrow::Status processTables(std::string lineno) { | ||
23 | auto filesystem = std::make_shared<fs::LocalFileSystem>(); | ||
24 | |||
25 | fs::FileSelector selector; | ||
26 | selector.base_dir = std::filesystem::current_path(); | ||
27 | selector.recursive = false; | ||
28 | |||
29 | auto format = std::static_pointer_cast<ds::FileFormat>(std::make_shared<ds::ParquetFileFormat>()); | ||
30 | |||
31 | ARROW_ASSIGN_OR_RAISE(auto factory, | ||
32 | ds::FileSystemDatasetFactory::Make(filesystem, selector, format, | ||
33 | ds::FileSystemFactoryOptions())); | ||
34 | |||
35 | ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); | ||
36 | |||
37 | printf("Scanning dataset for line %s...\n", lineno.c_str()); | ||
38 | // Read specified columns with a row filter | ||
39 | ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); | ||
40 | ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({ | ||
41 | cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)), | ||
42 | cp::is_valid(cp::field_ref("rd_x")), | ||
43 | cp::is_valid(cp::field_ref("rd_y")), | ||
44 | }))); | ||
45 | |||
46 | ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); | ||
47 | ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); | ||
48 | |||
49 | puts("Finished loading data, computing stable sort indices..."); | ||
50 | |||
51 | arrow::Datum sort_indices; | ||
52 | cp::SortOptions sort_options; | ||
53 | sort_options.sort_keys = { cp::SortKey("timestamp" /* ascending by default */) }; | ||
54 | ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options)); | ||
55 | puts("Finished computing stable sort indices, creating sorted table..."); | ||
56 | |||
57 | arrow::Datum sorted; | ||
58 | ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices })); | ||
59 | |||
60 | puts("Writing sorted table to disk..."); | ||
61 | ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet")); | ||
62 | puts("Syncing..."); | ||
63 | sync(); | ||
64 | puts("Done. Have a nice day."); | ||
65 | |||
66 | return arrow::Status::OK(); | ||
67 | } | ||
68 | |||
// Remark on how the working directory is treated. It is part of the usage
// text and is also printed separately at start-up, which is why it is a macro
// that can be pasted next to other string literals.
#define NOTICE \
  "Notice: This tool will fail if any non-Parquet files in are present in the\n" \
  " current working directory. It does not load files which are present in\n" \
  " any possible subdirectories."

// printf format of the usage text; the single %s is the program name.
const char help[] =
  "Usage: %s <LINENO>\n\n"
  " LINENO The LinePlanningNumber as in the KV1/KV6 data\n"
  "\n"
  NOTICE
  "\n";
78 | |||
79 | void exitHelp(const char *progname, int code = 1) { | ||
80 | printf(help, progname); | ||
81 | exit(code); | ||
82 | } | ||
83 | |||
84 | int main(int argc, char *argv[]) { | ||
85 | const char *progname = argv[0]; | ||
86 | if (argc != 2) { | ||
87 | puts("Error: incorrect number of arguments provided\n"); | ||
88 | exitHelp(progname); | ||
89 | } | ||
90 | char *lineno = argv[1]; | ||
91 | puts(NOTICE "\n"); | ||
92 | |||
93 | std::filesystem::path cwd = std::filesystem::current_path(); | ||
94 | std::filesystem::create_directory(cwd / "merged"); | ||
95 | |||
96 | puts("Running this program may take a while, especially on big datasets. If you're\n" | ||
97 | "processing the data of a single bus line over the course of multiple months,\n" | ||
98 | "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n" | ||
99 | "RAM available, to avoid overloading and subsequently freezing your system.\n"); | ||
100 | |||
101 | arrow::Status st = processTables(std::string(lineno)); | ||
102 | if (!st.ok()) { | ||
103 | std::cerr << "Failed to process tables: " << st << std::endl; | ||
104 | return EXIT_FAILURE; | ||
105 | } | ||
106 | } | ||
diff --git a/src/querykv1/.envrc b/src/querykv1/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/querykv1/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/querykv1/.gitignore b/src/querykv1/.gitignore new file mode 100644 index 0000000..5761abc --- /dev/null +++ b/src/querykv1/.gitignore | |||
@@ -0,0 +1 @@ | |||
*.o | |||
diff --git a/src/querykv1/Makefile b/src/querykv1/Makefile new file mode 100644 index 0000000..a8791f5 --- /dev/null +++ b/src/querykv1/Makefile | |||
@@ -0,0 +1,28 @@ | |||
# Build rules for the querykv1 tool.
#
# The compiler hardening flags are taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

HDRS=cliopts.hpp daterange.hpp joparoute.hpp journeyinfo.hpp journeyroute.hpp journeys.hpp schedule.hpp
SRCS=main.cpp cliopts.cpp daterange.cpp joparoute.cpp journeyinfo.cpp journeyroute.cpp journeys.cpp schedule.cpp
OBJS=$(patsubst %.cpp,%.o,$(SRCS))

# Every object is rebuilt when any header changes. The objects are linked into
# a position-independent executable below, so they have to be compiled as
# position-independent code as well.
%.o: %.cpp $(HDRS)
	$(CXX) -fPIE -c -o $@ $< $(CXXFLAGS)

querykv1: $(OBJS)
	$(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

.PHONY: clean
clean:
	# Also remove the intermediate objects; -f keeps this from failing when
	# nothing has been built (yet).
	rm -f querykv1 $(OBJS)
diff --git a/src/querykv1/cliopts.cpp b/src/querykv1/cliopts.cpp new file mode 100644 index 0000000..bef7a98 --- /dev/null +++ b/src/querykv1/cliopts.cpp | |||
@@ -0,0 +1,456 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <cstdlib> | ||
4 | #include <cstdio> | ||
5 | #include <string> | ||
6 | #include <string_view> | ||
7 | |||
8 | #include <getopt.h> | ||
9 | |||
10 | #include "cliopts.hpp" | ||
11 | |||
12 | using namespace std::string_view_literals; | ||
13 | |||
// Value stored in an Options field when its flag was given without an
// argument: parseOptions falls back to it whenever optarg is null.
const char *opt_set = "";
// Null counterpart of opt_set. Presumably meant to denote "flag not given"
// (Options fields default to nullptr); it is not referenced in this file.
const char *opt_unset = nullptr;
16 | |||
17 | const char help[] = R"(Usage: %1$s [OPTIONS] <COMMAND> | ||
18 | |||
19 | Global Options: | ||
20 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
21 | -h, --help Print this help | ||
22 | |||
23 | Commands: | ||
24 | joparoute Generate CSV for journey pattern route | ||
25 | journeyinfo Print some information on a journey | ||
26 | journeyroute Generate CSV for journey route | ||
27 | journeys List journeys of a specific line going from stop A to B | ||
28 | schedule Generate schedule | ||
29 | )"; | ||
30 | |||
31 | const char joparoute_help[] = R"(Usage: %1$s joparoute --line <NUMBER> --jopa <CODE> [OPTIONS] | ||
32 | |||
33 | Options: | ||
34 | --line <NUMBER> Line planning number as in schedule | ||
35 | --jopa <CODE> Journey pattern code as in KV1 data | ||
36 | -o <PATH> Path of file to write to, '-' for stdout | ||
37 | |||
38 | Global Options: | ||
39 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
40 | -h, --help Print this help | ||
41 | )"; | ||
42 | |||
43 | const char journeyroute_help[] = R"(Usage: %1$s journeyroute --line <NUMBER> [OPTIONS] | ||
44 | |||
45 | Options: | ||
46 | --line <NUMBER> Line planning number as in KV1 data | ||
47 | --journey <NUMBER> Journey number as in KV1 data | ||
48 | -o <PATH> Path of file to write to, '-' for stdout | ||
49 | |||
50 | Global Options: | ||
51 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
52 | -h, --help Print this help | ||
53 | )"; | ||
54 | |||
55 | const char journeys_help[] = R"(Usage: %1$s journeys --line <NUMBER> --begin <STOP> --end <STOP> [OPTIONS] | ||
56 | |||
57 | For the --begin and --end arguments, use the following format: | ||
58 | --begin/--end stop:<USRSTOP CODE> | ||
59 | --begin/--end star:<USRSTAR CODE> | ||
60 | |||
61 | Options: | ||
62 | --begin <STOP> User stop code/area of stop the journey should begin at | ||
63 | --end <STOP> User stop code/area of stop the journey should end at | ||
64 | --line <NUMBER> Line planning number to filter on | ||
65 | -o <PATH> Path of file to write to, '-' for stdout | ||
66 | |||
67 | Global Options: | ||
68 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
69 | -h, --help Print this help | ||
70 | )"; | ||
71 | |||
72 | const char journeyinfo_help[] = R"(Usage: %1$s journeyinfo --line <NUMBER> --journey <NUMBER> [OPTIONS] | ||
73 | |||
74 | Options: | ||
75 | --line <NUMBER> Line planning number to filter on | ||
76 | --journey <NUMBER> Journey number as in schedule | ||
77 | |||
78 | Global Options: | ||
79 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
80 | -h, --help Print this help | ||
81 | )"; | ||
82 | |||
83 | const char schedule_help[] = R"(Usage: %1$s schedule --line <NUMBER> [OPTIONS] | ||
84 | |||
85 | Options: | ||
86 | --line <NUMBER> Line planning number to generate schedule for | ||
87 | -o <PATH> Path of file to write to, '-' for stdout | ||
88 | |||
89 | Global Options: | ||
90 | --kv1 <PATH> Path to file containing all KV1 data, '-' for stdin | ||
91 | -h, --help Print this help | ||
92 | )"; | ||
93 | |||
94 | void journeyRouteValidateOptions(const char *progname, Options *options) { | ||
95 | #define X(name, argument, long_, short_) \ | ||
96 | if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ | ||
97 | && #name != "journey_number"sv && #name != "help"sv && #name != "output_file_path"sv) \ | ||
98 | if (options->name) { \ | ||
99 | if (long_) { \ | ||
100 | if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyroute subcommand\n\n", progname, static_cast<const char *>(long_), short_); \ | ||
101 | else fprintf(stderr, "%s: unexpected flag --%s for journeyroute subcommand\n\n", progname, static_cast<const char *>(long_)); \ | ||
102 | } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeyroute subcommand\n\n", progname, short_); \ | ||
103 | fprintf(stderr, journeyroute_help, progname); \ | ||
104 | exit(1); \ | ||
105 | } | ||
106 | LONG_OPTIONS | ||
107 | SHORT_OPTIONS | ||
108 | #undef X | ||
109 | |||
110 | if (options->positional.size() > 0) { | ||
111 | fprintf(stderr, "%s: unexpected positional argument(s) for journeyroute subcommand\n\n", progname); | ||
112 | for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); | ||
113 | fprintf(stderr, journeyroute_help, progname); | ||
114 | exit(1); | ||
115 | } | ||
116 | |||
117 | if (!options->kv1_file_path) | ||
118 | options->kv1_file_path = "-"; | ||
119 | if (!options->output_file_path) | ||
120 | options->output_file_path = "-"; | ||
121 | if (options->kv1_file_path == ""sv) { | ||
122 | fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); | ||
123 | fprintf(stderr, journeyroute_help, progname); | ||
124 | exit(1); | ||
125 | } | ||
126 | if (options->output_file_path == ""sv) { | ||
127 | fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); | ||
128 | fprintf(stderr, journeyroute_help, progname); | ||
129 | exit(1); | ||
130 | } | ||
131 | if (!options->journey_number || options->journey_number == ""sv) { | ||
132 | fprintf(stderr, "%s: journey number must be provided\n\n", progname); | ||
133 | fprintf(stderr, journeyroute_help, progname); | ||
134 | exit(1); | ||
135 | } | ||
136 | if (!options->line_planning_number || options->line_planning_number == ""sv) { | ||
137 | fprintf(stderr, "%s: line planning number must be provided\n\n", progname); | ||
138 | fprintf(stderr, journeyroute_help, progname); | ||
139 | exit(1); | ||
140 | } | ||
141 | } | ||
142 | |||
143 | void scheduleValidateOptions(const char *progname, Options *options) { | ||
144 | #define X(name, argument, long_, short_) \ | ||
145 | if (#name != "kv1_file_path"sv && #name != "help"sv \ | ||
146 | && #name != "line_planning_number"sv && #name != "output_file_path"sv) \ | ||
147 | if (options->name) { \ | ||
148 | if (long_) { \ | ||
149 | if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for schedule subcommand\n\n", progname, static_cast<const char *>(long_), short_); \ | ||
150 | else fprintf(stderr, "%s: unexpected flag --%s for schedule subcommand\n\n", progname, static_cast<const char *>(long_)); \ | ||
151 | } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for schedule subcommand\n\n", progname, short_); \ | ||
152 | fprintf(stderr, schedule_help, progname); \ | ||
153 | exit(1); \ | ||
154 | } | ||
155 | LONG_OPTIONS | ||
156 | SHORT_OPTIONS | ||
157 | #undef X | ||
158 | |||
159 | if (options->positional.size() > 0) { | ||
160 | fprintf(stderr, "%s: unexpected positional argument(s) for schedule subcommand\n\n", progname); | ||
161 | for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); | ||
162 | fprintf(stderr, schedule_help, progname); | ||
163 | exit(1); | ||
164 | } | ||
165 | |||
166 | if (!options->kv1_file_path) | ||
167 | options->kv1_file_path = "-"; | ||
168 | if (!options->output_file_path) | ||
169 | options->output_file_path = "-"; | ||
170 | if (options->kv1_file_path == ""sv) { | ||
171 | fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); | ||
172 | fprintf(stderr, schedule_help, progname); | ||
173 | exit(1); | ||
174 | } | ||
175 | if (options->output_file_path == ""sv) { | ||
176 | fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); | ||
177 | fprintf(stderr, schedule_help, progname); | ||
178 | exit(1); | ||
179 | } | ||
180 | if (!options->line_planning_number || options->line_planning_number == ""sv) { | ||
181 | fprintf(stderr, "%s: line planning number must be provided\n\n", progname); | ||
182 | fprintf(stderr, schedule_help, progname); | ||
183 | exit(1); | ||
184 | } | ||
185 | } | ||
186 | |||
187 | void journeysValidateOptions(const char *progname, Options *options) { | ||
188 | #define X(name, argument, long_, short_) \ | ||
189 | if (#name != "kv1_file_path"sv && #name != "help"sv \ | ||
190 | && #name != "line_planning_number"sv && #name != "output_file_path"sv \ | ||
191 | && #name != "begin_stop_code"sv && #name != "end_stop_code"sv) \ | ||
192 | if (options->name) { \ | ||
193 | if (long_) { \ | ||
194 | if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeys subcommand\n\n", progname, static_cast<const char *>(long_), short_); \ | ||
195 | else fprintf(stderr, "%s: unexpected flag --%s for journeys subcommand\n\n", progname, static_cast<const char *>(long_)); \ | ||
196 | } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeys subcommand\n\n", progname, short_); \ | ||
197 | fprintf(stderr, journeys_help, progname); \ | ||
198 | exit(1); \ | ||
199 | } | ||
200 | LONG_OPTIONS | ||
201 | SHORT_OPTIONS | ||
202 | #undef X | ||
203 | |||
204 | if (options->positional.size() > 0) { | ||
205 | fprintf(stderr, "%s: unexpected positional argument(s) for journeys subcommand\n\n", progname); | ||
206 | for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); | ||
207 | fprintf(stderr, journeys_help, progname); | ||
208 | exit(1); | ||
209 | } | ||
210 | |||
211 | if (!options->kv1_file_path) | ||
212 | options->kv1_file_path = "-"; | ||
213 | if (!options->output_file_path) | ||
214 | options->output_file_path = "-"; | ||
215 | if (options->kv1_file_path == ""sv) { | ||
216 | fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); | ||
217 | fprintf(stderr, journeys_help, progname); | ||
218 | exit(1); | ||
219 | } | ||
220 | if (options->output_file_path == ""sv) { | ||
221 | fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); | ||
222 | fprintf(stderr, journeys_help, progname); | ||
223 | exit(1); | ||
224 | } | ||
225 | if (!options->line_planning_number || options->line_planning_number == ""sv) { | ||
226 | fprintf(stderr, "%s: line planning number must be provided\n\n", progname); | ||
227 | fprintf(stderr, journeys_help, progname); | ||
228 | exit(1); | ||
229 | } | ||
230 | if (!options->begin_stop_code || options->begin_stop_code == ""sv) { | ||
231 | fprintf(stderr, "%s: start user stop code must be provided\n\n", progname); | ||
232 | fprintf(stderr, journeys_help, progname); | ||
233 | exit(1); | ||
234 | } | ||
235 | if (!options->end_stop_code || options->end_stop_code == ""sv) { | ||
236 | fprintf(stderr, "%s: end user stop code must be provided\n\n", progname); | ||
237 | fprintf(stderr, journeys_help, progname); | ||
238 | exit(1); | ||
239 | } | ||
240 | if (!std::string_view(options->begin_stop_code).starts_with("star:") | ||
241 | && !std::string_view(options->begin_stop_code).starts_with("stop:")) { | ||
242 | fprintf(stderr, "%s: begin user stop code must be prefixed with star:/stop:\n\n", progname); | ||
243 | fprintf(stderr, journeys_help, progname); | ||
244 | exit(1); | ||
245 | } | ||
246 | if (!std::string_view(options->end_stop_code).starts_with("star:") | ||
247 | && !std::string_view(options->end_stop_code).starts_with("stop:")) { | ||
248 | fprintf(stderr, "%s: end user stop code must be prefixed with star:/stop:\n\n", progname); | ||
249 | fprintf(stderr, journeys_help, progname); | ||
250 | exit(1); | ||
251 | } | ||
252 | } | ||
253 | |||
254 | void journeyInfoValidateOptions(const char *progname, Options *options) { | ||
255 | #define X(name, argument, long_, short_) \ | ||
256 | if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ | ||
257 | && #name != "journey_number"sv && #name != "help"sv) \ | ||
258 | if (options->name) { \ | ||
259 | if (long_) { \ | ||
260 | if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyinfo subcommand\n\n", progname, static_cast<const char *>(long_), short_); \ | ||
261 | else fprintf(stderr, "%s: unexpected flag --%s for journeyinfo subcommand\n\n", progname, static_cast<const char *>(long_)); \ | ||
262 | } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeyinfo subcommand\n\n", progname, short_); \ | ||
263 | fprintf(stderr, journeyinfo_help, progname); \ | ||
264 | exit(1); \ | ||
265 | } | ||
266 | LONG_OPTIONS | ||
267 | SHORT_OPTIONS | ||
268 | #undef X | ||
269 | |||
270 | if (options->positional.size() > 0) { | ||
271 | fprintf(stderr, "%s: unexpected positional argument(s) for journeyinfo subcommand\n\n", progname); | ||
272 | for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); | ||
273 | fprintf(stderr, journeyinfo_help, progname); | ||
274 | exit(1); | ||
275 | } | ||
276 | |||
277 | if (!options->kv1_file_path) | ||
278 | options->kv1_file_path = "-"; | ||
279 | if (options->kv1_file_path == ""sv) { | ||
280 | fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); | ||
281 | fprintf(stderr, journeyinfo_help, progname); | ||
282 | exit(1); | ||
283 | } | ||
284 | if (!options->journey_number || options->journey_number == ""sv) { | ||
285 | fprintf(stderr, "%s: journey number must be provided\n\n", progname); | ||
286 | fprintf(stderr, journeyinfo_help, progname); | ||
287 | exit(1); | ||
288 | } | ||
289 | if (!options->line_planning_number || options->line_planning_number == ""sv) { | ||
290 | fprintf(stderr, "%s: line planning number must be provided\n\n", progname); | ||
291 | fprintf(stderr, journeyinfo_help, progname); | ||
292 | exit(1); | ||
293 | } | ||
294 | } | ||
295 | |||
296 | void jopaRouteValidateOptions(const char *progname, Options *options) { | ||
297 | #define X(name, argument, long_, short_) \ | ||
298 | if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ | ||
299 | && #name != "journey_pattern_code"sv && #name != "help"sv && #name != "output_file_path"sv) \ | ||
300 | if (options->name) { \ | ||
301 | if (long_) { \ | ||
302 | if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for joparoute subcommand\n\n", progname, static_cast<const char *>(long_), short_); \ | ||
303 | else fprintf(stderr, "%s: unexpected flag --%s for joparoute subcommand\n\n", progname, static_cast<const char *>(long_)); \ | ||
304 | } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for joparoute subcommand\n\n", progname, short_); \ | ||
305 | fprintf(stderr, joparoute_help, progname); \ | ||
306 | exit(1); \ | ||
307 | } | ||
308 | LONG_OPTIONS | ||
309 | SHORT_OPTIONS | ||
310 | #undef X | ||
311 | |||
312 | if (options->positional.size() > 0) { | ||
313 | fprintf(stderr, "%s: unexpected positional argument(s) for joparoute subcommand\n\n", progname); | ||
314 | for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); | ||
315 | fprintf(stderr, joparoute_help, progname); | ||
316 | exit(1); | ||
317 | } | ||
318 | |||
319 | if (!options->kv1_file_path) | ||
320 | options->kv1_file_path = "-"; | ||
321 | if (!options->output_file_path) | ||
322 | options->output_file_path = "-"; | ||
323 | if (options->kv1_file_path == ""sv) { | ||
324 | fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); | ||
325 | fprintf(stderr, joparoute_help, progname); | ||
326 | exit(1); | ||
327 | } | ||
328 | if (options->output_file_path == ""sv) { | ||
329 | fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); | ||
330 | fprintf(stderr, joparoute_help, progname); | ||
331 | exit(1); | ||
332 | } | ||
333 | if (!options->journey_pattern_code || options->journey_pattern_code == ""sv) { | ||
334 | fprintf(stderr, "%s: journey pattern code must be provided\n\n", progname); | ||
335 | fprintf(stderr, joparoute_help, progname); | ||
336 | exit(1); | ||
337 | } | ||
338 | if (!options->line_planning_number || options->line_planning_number == ""sv) { | ||
339 | fprintf(stderr, "%s: line planning number must be provided\n\n", progname); | ||
340 | fprintf(stderr, joparoute_help, progname); | ||
341 | exit(1); | ||
342 | } | ||
343 | } | ||
344 | |||
// Description of one short option, used as a template argument of mkargarr.
struct ShortFlag {
  // One of getopt's no_argument / required_argument / optional_argument.
  int has_arg;
  // The option character, or 0 when the option has no short form.
  int c;
};
349 | |||
350 | template<ShortFlag ...flags> | ||
351 | const std::string mkargarr = | ||
352 | (std::string() | ||
353 | + ... | ||
354 | + (flags.c == 0 | ||
355 | ? "" | ||
356 | : std::string((const char[]){ flags.c, '\0' }) | ||
357 | + (flags.has_arg == required_argument | ||
358 | ? ":" | ||
359 | : flags.has_arg == optional_argument | ||
360 | ? "::" | ||
361 | : ""))); | ||
362 | |||
// Short-option string handed to getopt_long. X turns every entry of the
// option tables into a ShortFlag followed by a comma; the trailing
// ShortFlag(no_argument, 0) absorbs the last comma and, having c == 0, adds
// nothing to the resulting string.
#define X(name, has_arg, long_, short_) ShortFlag(has_arg, short_),
const std::string argarr = mkargarr<SHORT_OPTIONS LONG_OPTIONS ShortFlag(no_argument, 0)>;
#undef X
366 | |||
367 | Options parseOptions(int argc, char *argv[]) { | ||
368 | const char *progname = argv[0]; | ||
369 | |||
370 | // Struct with options for augmentkv6. | ||
371 | Options options; | ||
372 | |||
373 | static option long_options[] = { | ||
374 | #define X(name, argument, long_, short_) { long_, argument, nullptr, short_ }, | ||
375 | LONG_OPTIONS | ||
376 | #undef X | ||
377 | { 0 }, | ||
378 | }; | ||
379 | |||
380 | int c; | ||
381 | int option_index = 0; | ||
382 | bool error = false; | ||
383 | while ((c = getopt_long(argc, argv, argarr.c_str(), long_options, &option_index)) != -1) { | ||
384 | // If a long option was used, c corresponds with val. We have val = 0 for | ||
385 | // options which have no short alternative, so checking for c = 0 gives us | ||
386 | // whether a long option with no short alternative was used. | ||
387 | // Below, we check for c = 'h', which corresponds with the long option | ||
388 | // '--help', for which val = 'h'. | ||
389 | if (c == 0) { | ||
390 | const char *name = long_options[option_index].name; | ||
391 | #define X(opt_name, opt_has_arg, opt_long, opt_short) \ | ||
392 | if (name == opt_long ## sv) { options.opt_name = optarg; continue; } | ||
393 | LONG_OPTIONS | ||
394 | #undef X | ||
395 | error = true; | ||
396 | } | ||
397 | #define X(opt_name, opt_has_arg, opt_long, opt_short) \ | ||
398 | if (c == opt_short) { options.opt_name = optarg ? optarg : opt_set; continue; } | ||
399 | LONG_OPTIONS | ||
400 | SHORT_OPTIONS | ||
401 | #undef X | ||
402 | error = true; | ||
403 | } | ||
404 | |||
405 | if (optind < argc) | ||
406 | options.subcommand = argv[optind++]; | ||
407 | while (optind < argc) | ||
408 | options.positional.push_back(argv[optind++]); | ||
409 | |||
410 | if (options.subcommand | ||
411 | && options.subcommand != "schedule"sv | ||
412 | && options.subcommand != "joparoute"sv | ||
413 | && options.subcommand != "journeyinfo"sv | ||
414 | && options.subcommand != "journeyroute"sv | ||
415 | && options.subcommand != "journeys"sv) { | ||
416 | fprintf(stderr, "%s: unknown subcommand '%s'\n\n", progname, options.subcommand); | ||
417 | fprintf(stderr, help, progname); | ||
418 | exit(1); | ||
419 | } | ||
420 | if (options.subcommand && error) { | ||
421 | fputc('\n', stderr); | ||
422 | if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname); | ||
423 | if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname); | ||
424 | if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname); | ||
425 | if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname); | ||
426 | if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname); | ||
427 | exit(1); | ||
428 | } | ||
429 | if (error || !options.subcommand) { | ||
430 | if (!options.subcommand) fprintf(stderr, "%s: no subcommand provided\n", progname); | ||
431 | fputc('\n', stderr); | ||
432 | fprintf(stderr, help, progname); | ||
433 | exit(1); | ||
434 | } | ||
435 | if (options.help) { | ||
436 | if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname); | ||
437 | if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname); | ||
438 | if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname); | ||
439 | if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname); | ||
440 | if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname); | ||
441 | exit(0); | ||
442 | } | ||
443 | |||
444 | if (options.subcommand == "joparoute"sv) | ||
445 | jopaRouteValidateOptions(progname, &options); | ||
446 | if (options.subcommand == "journeyinfo"sv) | ||
447 | journeyInfoValidateOptions(progname, &options); | ||
448 | if (options.subcommand == "journeyroute"sv) | ||
449 | journeyRouteValidateOptions(progname, &options); | ||
450 | if (options.subcommand == "journeys"sv) | ||
451 | journeysValidateOptions(progname, &options); | ||
452 | if (options.subcommand == "schedule"sv) | ||
453 | scheduleValidateOptions(progname, &options); | ||
454 | |||
455 | return options; | ||
456 | } | ||
diff --git a/src/querykv1/cliopts.hpp b/src/querykv1/cliopts.hpp new file mode 100644 index 0000000..df8630e --- /dev/null +++ b/src/querykv1/cliopts.hpp | |||
@@ -0,0 +1,35 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_CLIOPTS_HPP | ||
4 | #define OEUF_QUERYKV1_CLIOPTS_HPP | ||
5 | |||
6 | #include <vector> | ||
7 | |||
// X-macro tables describing every command line option. Each entry reads
//   X(name, has_arg, long, short)
// where name is the Options field the value ends up in, has_arg is one of
// getopt's no_argument/required_argument/optional_argument, long is the long
// option name (nullptr for none) and short is the short option character
// (0 for none).
//
// The column legend lives above the #define: a comment line inside the macro
// body that lacks a trailing backslash would end the definition right there.
/* name                   req/opt/no arg     long       short */
#define LONG_OPTIONS \
  X(kv1_file_path,        required_argument, "kv1",     0  ) \
  X(line_planning_number, required_argument, "line",    0  ) \
  X(journey_number,       required_argument, "journey", 0  ) \
  X(journey_pattern_code, required_argument, "jopa",    0  ) \
  X(begin_stop_code,      required_argument, "begin",   0  ) \
  X(end_stop_code,        required_argument, "end",     0  ) \
  X(help,                 no_argument,       "help",    'h')

#define SHORT_OPTIONS \
  X(output_file_path,     required_argument, nullptr,   'o')
20 | |||
// Parsed command line state. Besides the subcommand and the positional
// arguments, it holds one `const char *` field per entry of LONG_OPTIONS and
// SHORT_OPTIONS, named after that entry; each is nullptr until set.
struct Options {
  const char *subcommand = nullptr;
  std::vector<const char *> positional;
  // Generates one pointer field per option table entry.
#define X(name, argument, long_, short_) const char *name = nullptr;
  LONG_OPTIONS
  SHORT_OPTIONS
#undef X
};
29 | |||
30 | extern const char *opt_set; | ||
31 | extern const char *opt_unset; | ||
32 | |||
33 | Options parseOptions(int argc, char *argv[]); | ||
34 | |||
35 | #endif // OEUF_QUERYKV1_CLIOPTS_HPP | ||
diff --git a/src/querykv1/daterange.cpp b/src/querykv1/daterange.cpp new file mode 100644 index 0000000..5ce42bf --- /dev/null +++ b/src/querykv1/daterange.cpp | |||
@@ -0,0 +1,91 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include "daterange.hpp" | ||
4 | |||
5 | static std::chrono::year_month_day nextDay(std::chrono::year_month_day ymd) { | ||
6 | return std::chrono::sys_days(ymd) + std::chrono::days(1); | ||
7 | } | ||
8 | |||
9 | // DateRange expresses the date range [from, thru]. | ||
10 | DateRange::Iterator &DateRange::Iterator::operator++() { | ||
11 | ymd_ = nextDay(ymd_); | ||
12 | return *this; | ||
13 | } | ||
14 | |||
15 | std::chrono::year_month_day DateRange::Iterator::operator*() const { | ||
16 | return ymd_; | ||
17 | } | ||
18 | |||
19 | std::chrono::year_month_day DateRange::Iterator::ymd() const { | ||
20 | return ymd_; | ||
21 | } | ||
22 | |||
23 | DateRange::Iterator::Iterator(std::chrono::year_month_day ymd) : ymd_(ymd) {} | ||
24 | |||
25 | DateRange::DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru) | ||
26 | : from_(from), thru_(thru) | ||
27 | {} | ||
28 | |||
29 | DateRange::Iterator DateRange::begin() const { | ||
30 | return DateRange::Iterator(from_); | ||
31 | } | ||
32 | |||
33 | DateRange::Iterator DateRange::end() const { | ||
34 | return DateRange::Iterator(nextDay(thru_)); | ||
35 | } | ||
36 | |||
37 | bool DateRange::valid() const { | ||
38 | return from_ <= thru_; | ||
39 | } | ||
40 | |||
41 | std::chrono::year_month_day DateRange::from() const { | ||
42 | return from_; | ||
43 | } | ||
44 | |||
45 | std::chrono::year_month_day DateRange::thru() const { | ||
46 | return thru_; | ||
47 | } | ||
48 | |||
49 | bool operator==(const DateRange::Iterator a, const DateRange::Iterator b) { | ||
50 | return *a == *b; | ||
51 | } | ||
52 | |||
53 | DateRangeSeq::DateRangeSeq(std::initializer_list<DateRange> ranges) | ||
54 | : DateRangeSeq(ranges.begin(), ranges.end()) | ||
55 | {} | ||
56 | |||
57 | DateRangeSeq DateRangeSeq::clampFrom(std::chrono::year_month_day from) const { | ||
58 | std::vector<DateRange> new_ranges; | ||
59 | new_ranges.reserve(ranges_.size()); | ||
60 | for (const DateRange range : ranges_) { | ||
61 | if (range.from() < from) { | ||
62 | if (range.thru() < from) | ||
63 | continue; | ||
64 | new_ranges.emplace_back(from, range.thru()); | ||
65 | } | ||
66 | new_ranges.push_back(range); | ||
67 | } | ||
68 | return DateRangeSeq(new_ranges.begin(), new_ranges.end()); | ||
69 | } | ||
70 | |||
71 | DateRangeSeq DateRangeSeq::clampThru(std::chrono::year_month_day thru) const { | ||
72 | std::vector<DateRange> new_ranges; | ||
73 | new_ranges.reserve(ranges_.size()); | ||
74 | for (const DateRange range : ranges_) { | ||
75 | if (range.thru() > thru) { | ||
76 | if (range.from() > thru) | ||
77 | continue; | ||
78 | new_ranges.emplace_back(range.from(), thru); | ||
79 | } | ||
80 | new_ranges.push_back(range); | ||
81 | } | ||
82 | return DateRangeSeq(new_ranges.begin(), new_ranges.end()); | ||
83 | } | ||
84 | |||
85 | std::vector<DateRange>::const_iterator DateRangeSeq::begin() const { | ||
86 | return ranges_.begin(); | ||
87 | } | ||
88 | |||
89 | std::vector<DateRange>::const_iterator DateRangeSeq::end() const { | ||
90 | return ranges_.end(); | ||
91 | } | ||
diff --git a/src/querykv1/daterange.hpp b/src/querykv1/daterange.hpp new file mode 100644 index 0000000..e34c39c --- /dev/null +++ b/src/querykv1/daterange.hpp | |||
@@ -0,0 +1,118 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_DATERANGE_HPP | ||
4 | #define OEUF_QUERYKV1_DATERANGE_HPP | ||
5 | |||
#include <algorithm>
#include <cassert>
#include <chrono>
#include <concepts>
#include <cstddef>
#include <initializer_list>
#include <iterator>
#include <utility>
#include <vector>
12 | |||
// DateRange expresses the date range [from, thru], both ends inclusive.
// Nothing is checked on construction; valid() reports whether from <= thru.
class DateRange {
 public:
  // Minimal forward iterator over the days of a range. It offers only
  // pre-increment and dereference, which is what a range-based for loop uses.
  class Iterator {
    friend class DateRange;

   public:
    // Advances to the next calendar day.
    Iterator &operator++();

    // Both return the day the iterator currently points at.
    std::chrono::year_month_day operator*() const;
    std::chrono::year_month_day ymd() const;

   private:
    // Only DateRange (a friend) creates iterators, through begin() and end().
    explicit Iterator(std::chrono::year_month_day ymd);

    std::chrono::year_month_day ymd_;
  };

  explicit DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru);

  // begin() points at from, end() at the day after thru.
  Iterator begin() const;
  Iterator end() const;
  // True when from <= thru.
  bool valid() const;
  std::chrono::year_month_day from() const;
  std::chrono::year_month_day thru() const;

 private:
  std::chrono::year_month_day from_;
  std::chrono::year_month_day thru_;
};
43 | |||
44 | bool operator==(const DateRange::Iterator a, const DateRange::Iterator b); | ||
45 | |||
46 | template<typename Tp, typename T> | ||
47 | concept DerefsTo = requires(Tp p) { | ||
48 | { *p } -> std::convertible_to<T>; | ||
49 | }; | ||
50 | |||
51 | class DateRangeSeq { | ||
52 | // The way LE and GE are ordered makes a difference for how the sorting | ||
53 | // (insertion based on lower_bound) works. Do not carelessly reorder this. | ||
54 | enum LeGe { | ||
55 | GE, // >= | ||
56 | LE, // <= | ||
57 | }; | ||
58 | |||
59 | std::vector<DateRange> ranges_; | ||
60 | |||
61 | public: | ||
62 | template<std::input_iterator InputIt> | ||
63 | requires DerefsTo<InputIt, DateRange> | ||
64 | explicit DateRangeSeq(InputIt begin, InputIt end) { | ||
65 | // We convert every inclusive date range [x, y] into (x, >=) and (y, <=) | ||
66 | // and put these into a list, using binary search to make sure that these | ||
67 | // stay ordered. We then reduce this list, removing tautological | ||
68 | // predicates, giving us a final list of ranges that do not overlap. | ||
69 | |||
70 | std::vector<std::pair<std::chrono::year_month_day, LeGe>> preds; | ||
71 | |||
72 | size_t n = 0; | ||
73 | for (auto it = begin; it != end; it++) { | ||
74 | auto &range = *it; | ||
75 | if (!range.valid()) continue; | ||
76 | |||
77 | auto a = std::make_pair(range.from(), GE); | ||
78 | auto b = std::make_pair(range.thru(), LE); | ||
79 | preds.insert(std::lower_bound(preds.begin(), preds.end(), a), a); | ||
80 | preds.insert(std::lower_bound(preds.begin(), preds.end(), b), b); | ||
81 | |||
82 | n++; | ||
83 | } | ||
84 | |||
85 | if (preds.empty()) | ||
86 | return; | ||
87 | |||
88 | assert(preds.size() >= 2); | ||
89 | assert(preds.front().second == GE); | ||
90 | assert(preds.back().second == LE); | ||
91 | |||
92 | std::chrono::year_month_day begin_ymd = preds[0].first; | ||
93 | for (size_t i = 1; i < preds.size(); i++) { | ||
94 | if (preds[i].second == LE && (i + 1 == preds.size() || preds[i + 1].second == GE)) { | ||
95 | std::chrono::year_month_day end_ymd = preds[i].first; | ||
96 | if (!ranges_.empty() && ranges_.back().thru() == begin_ymd) | ||
97 | ranges_.back() = DateRange(ranges_.back().from(), end_ymd); | ||
98 | else | ||
99 | ranges_.push_back(DateRange(begin_ymd, end_ymd)); | ||
100 | if (i + 1 != preds.size()) { | ||
101 | begin_ymd = preds[i + 1].first; | ||
102 | i++; | ||
103 | } | ||
104 | } | ||
105 | } | ||
106 | } | ||
107 | |||
108 | explicit DateRangeSeq(std::initializer_list<DateRange> ranges); | ||
109 | |||
110 | DateRangeSeq clampFrom(std::chrono::year_month_day from) const; | ||
111 | DateRangeSeq clampThru(std::chrono::year_month_day thru) const; | ||
112 | |||
113 | public: | ||
114 | std::vector<DateRange>::const_iterator begin() const; | ||
115 | std::vector<DateRange>::const_iterator end() const; | ||
116 | }; | ||
117 | |||
118 | #endif // OEUF_QUERYKV1_DATERANGE_HPP | ||
diff --git a/src/querykv1/grammar.abnf b/src/querykv1/grammar.abnf new file mode 100644 index 0000000..1c93760 --- /dev/null +++ b/src/querykv1/grammar.abnf | |||
@@ -0,0 +1,44 @@ | |||
; This grammar does *not* allow fields to contain LF, unless the field is
; quoted and the LF directly follows a CR (that is, it is part of a NEWLINE).
; The file is simply rejected otherwise.
3 | ; I took the liberty to take some inspiration from the somewhat similar IETF RFC 4180. | ||
4 | |||
5 | document = [header NEWLINE] (comment / record / empty-line) *(NEWLINE (comment / record / empty-line)) [NEWLINE] / header | ||
6 | |||
7 | header = OPENBRACK *NOTCRLF | ||
8 | comment = SEMICOLON *NOTCRLF | ||
9 | |||
10 | empty-line = *WHITESPACE | ||
11 | |||
12 | record = field *(PIPE field) | ||
13 | field = *WHITESPACE field-data *WHITESPACE | ||
14 | field-data = escaped / unescaped | ||
15 | |||
16 | ; Unescaped fields are also allowed to contain double quotes, | ||
17 | ; they are just not interpreted in any special way. | ||
18 | escaped = DQUOTE *(TEXTDATA / WHITESPACE / NEWLINE / PIPE / 2DQUOTE) DQUOTE | ||
19 | unescaped = [TEXTDATA *(*WHITESPACE (TEXTDATA / DQUOTE))] | ||
20 | |||
21 | HTAB = %x09 ; <horizontal tab, "\t"> | ||
22 | LF = %x0A ; <line feed, "\n"> | ||
23 | VTAB = %x0B ; <vertical tab, "\v"> | ||
24 | FF = %x0C ; <form feed, "\f"> | ||
25 | CR = %x0D ; <carriage return, "\r"> | ||
26 | SPACE = %x20 ; <space, " "> | ||
27 | DQUOTE = %x22 ; " | ||
28 | SEMICOLON = %x3B ; ; | ||
29 | OPENBRACK = %x5B ; [ | ||
30 | PIPE = %x7C ; | | ||
31 | |||
; All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE.
; Note that the ranges below leave out OPENBRACK (%x5B) as well.
33 | ; Semicolon is included, as comments are only defined as 'lines starting with a semicolon'. | ||
34 | ; So it should be fine if a semicolon is part of a field, the rest of the line would not | ||
35 | ; be interpreted as a comment in that case. | ||
36 | TEXTDATA = %x00-08 / %x0E-1F / %x21 / %x23-5A / %x5C-7B / %x7D-10FFFF | ||
37 | |||
38 | ; Not including LF here even though TMI8/KV1 does not officially consider it | ||
39 | ; a newline, as newlines are defined as 'CR optionally followed by LF' | ||
40 | WHITESPACE = SPACE / FF / HTAB / VTAB | ||
41 | |||
42 | ; All codepoints excluding CR and LF | ||
43 | NOTCRLF = %x00-09 / %x0B-0C / %x0E-10FFFF | ||
44 | NEWLINE = CR [LF] | ||
diff --git a/src/querykv1/grammar.ebnf b/src/querykv1/grammar.ebnf new file mode 100644 index 0000000..94f8cde --- /dev/null +++ b/src/querykv1/grammar.ebnf | |||
@@ -0,0 +1,47 @@ | |||
1 | /* This grammar does allow fields to contain stray LFs, not after any specific | ||
2 | * CR. I took the liberty to take some inspiration from the somewhat similar | ||
3 | * IETF RFC 4180. | ||
4 | */ | ||
5 | document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header | ||
6 | |||
7 | header ::= OPENBRACK NOTCR* | ||
8 | comment ::= SEMICOLON NOTCR* | ||
9 | |||
10 | empty-line ::= WHITESPACE* | ||
11 | |||
12 | record ::= field (PIPE field)* | ||
13 | field ::= WHITESPACE* field-data WHITESPACE* | ||
14 | field-data ::= DQUOTE escaped DQUOTE | unescaped | ||
15 | |||
16 | /* Unescaped fields are also allowed to contain double quotes, they are just | ||
17 | * not interpreted in any special way. | ||
18 | */ | ||
19 | escaped ::= (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)* | ||
20 | unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)? | ||
21 | |||
22 | HTAB ::= #x09 /* <horizontal tab, "\t"> */ | ||
23 | LF ::= #x0A /* <line feed, "\n"> */ | ||
24 | VTAB ::= #x0B /* <vertical tab, "\v"> */ | ||
25 | FF ::= #x0C /* <form feed, "\f"> */ | ||
26 | CR ::= #x0D /* <carriage return, "\r"> */ | ||
27 | SPACE ::= #x20 /* <space, " "> */ | ||
28 | DQUOTE ::= #x22 /* " */ | ||
29 | SEMICOLON ::= #x3B /* ; */ | ||
30 | OPENBRACK ::= #x5B /* [ */ | ||
31 | PIPE ::= #x7C /* | */ | ||
32 | |||
33 | /* All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE (note: the character class below also leaves out OPENBRACK, #x5B). | ||
34 | * Semicolon is included, as comments are only defined as 'lines starting with | ||
35 | * a semicolon'. So it should be fine if a semicolon is part of a field, the | ||
36 | * rest of the line would not be interpreted as a comment in that case. | ||
37 | */ | ||
38 | TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF] | ||
39 | |||
40 | /* Including LF here as TMI8/KV1 does not consider it a newline, | ||
41 | * as newlines are defined as 'CR optionally followed by LF' | ||
42 | */ | ||
43 | WHITESPACE ::= SPACE | LF | FF | HTAB | VTAB | ||
44 | |||
45 | /* All codepoints excluding CR (LF is deliberately allowed here, see WHITESPACE above) */ | ||
46 | NOTCR ::= [#x00-#x0C#x0E-#x10FFFF] | ||
47 | NEWLINE ::= CR LF? | ||
diff --git a/src/querykv1/grammar.ebnf.bak b/src/querykv1/grammar.ebnf.bak new file mode 100644 index 0000000..b5acbf5 --- /dev/null +++ b/src/querykv1/grammar.ebnf.bak | |||
@@ -0,0 +1,23 @@ | |||
1 | document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header | ||
2 | header ::= OPENBRACK NOTCRLF* | ||
3 | comment ::= SEMICOLON NOTCRLF* | ||
4 | empty-line ::= WHITESPACE* | ||
5 | record ::= field (PIPE field)* | ||
6 | field ::= WHITESPACE* field-data WHITESPACE* | ||
7 | field-data ::= escaped | unescaped | ||
8 | escaped ::= DQUOTE (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)* DQUOTE | ||
9 | unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)? | ||
10 | HTAB ::= #x09 | ||
11 | LF ::= #x0A | ||
12 | VTAB ::= #x0B | ||
13 | FF ::= #x0C | ||
14 | CR ::= #x0D | ||
15 | SPACE ::= #x20 | ||
16 | DQUOTE ::= #x22 | ||
17 | SEMICOLON ::= #x3B | ||
18 | OPENBRACK ::= #x5B | ||
19 | PIPE ::= #x7C | ||
20 | WHITESPACE ::= SPACE | FF | HTAB | VTAB | ||
21 | NOTCRLF ::= [#x00-#x09#x0B-#x0C#x0E-#x10FFFF] | ||
22 | TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF] | ||
23 | NEWLINE ::= CR LF? | ||
diff --git a/src/querykv1/joparoute.cpp b/src/querykv1/joparoute.cpp new file mode 100644 index 0000000..94ed359 --- /dev/null +++ b/src/querykv1/joparoute.cpp | |||
@@ -0,0 +1,102 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <cstdio> | ||
4 | #include <iostream> | ||
5 | #include <string_view> | ||
6 | |||
7 | #include "joparoute.hpp" | ||
8 | |||
9 | using namespace std::string_view_literals; | ||
10 | |||
11 | void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index) { | ||
12 | FILE *out = stdout; | ||
13 | if (options.output_file_path != "-"sv) | ||
14 | out = fopen(options.output_file_path, "wb"); | ||
15 | if (!out) { | ||
16 | fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); | ||
17 | exit(EXIT_FAILURE); | ||
18 | } | ||
19 | |||
20 | const std::string data_owner_code = "CXX"; | ||
21 | Kv1JourneyPattern::Key jopa_key( | ||
22 | // Of course it is bad to hardcode this, but we really have no time to make | ||
23 | // everything nice and dynamic. We're only working with CXX data anyway, | ||
24 | // and provide no support for the 'Schedules and Passing Times' KV1 | ||
25 | // variant. | ||
26 | data_owner_code, | ||
27 | options.line_planning_number, | ||
28 | options.journey_pattern_code); | ||
29 | |||
30 | const Kv1JourneyPattern *jopa = index.journey_patterns[jopa_key]; | ||
31 | if (!jopa) { | ||
32 | std::cerr << "Journey pattern not found" << std::endl; | ||
33 | return; | ||
34 | } | ||
35 | const Kv1Line *line = jopa->p_line; | ||
36 | |||
37 | struct Point { | ||
38 | bool is_stop = false; | ||
39 | const Kv1JourneyPatternTimingLink *jopatili = nullptr; | ||
40 | const Kv1Link *link = nullptr; | ||
41 | const Kv1Point *point = nullptr; | ||
42 | double distance_since_start_of_link = 0; | ||
43 | double distance_since_start_of_journey = 0; | ||
44 | }; | ||
45 | std::vector<Point> points; | ||
46 | |||
47 | for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { | ||
48 | const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; | ||
49 | if (jopatili->key.line_planning_number == jopa->key.line_planning_number | ||
50 | && jopatili->key.journey_pattern_code == jopa->key.journey_pattern_code) { | ||
51 | const Kv1Link::Key link_key(data_owner_code, jopatili->user_stop_code_begin, | ||
52 | jopatili->user_stop_code_end, line->transport_type); | ||
53 | const Kv1Link *link = index.links[link_key]; | ||
54 | const Kv1UserStopPoint::Key link_begin_key(data_owner_code, jopatili->user_stop_code_begin); | ||
55 | const Kv1UserStopPoint::Key link_end_key(data_owner_code, jopatili->user_stop_code_end); | ||
56 | const Kv1UserStopPoint *link_begin = index.user_stop_points[link_begin_key]; | ||
57 | const Kv1UserStopPoint *link_end = index.user_stop_points[link_end_key]; | ||
58 | |||
59 | points.emplace_back(true, jopatili, link, link_begin->p_point, 0); | ||
60 | |||
61 | for (size_t j = 0; j < records.point_on_links.size(); j++) { | ||
62 | Kv1PointOnLink *pool = &records.point_on_links[j]; | ||
63 | if (pool->key.user_stop_code_begin == jopatili->user_stop_code_begin | ||
64 | && pool->key.user_stop_code_end == jopatili->user_stop_code_end | ||
65 | && pool->key.transport_type == jopatili->p_line->transport_type) { | ||
66 | points.emplace_back(false, jopatili, link, pool->p_point, pool->distance_since_start_of_link); | ||
67 | } | ||
68 | } | ||
69 | |||
70 | points.emplace_back(true, jopatili, link, link_end->p_point, link->distance); | ||
71 | } | ||
72 | } | ||
73 | |||
74 | std::sort(points.begin(), points.end(), [](Point &a, Point &b) { | ||
75 | if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order) | ||
76 | return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; | ||
77 | return a.distance_since_start_of_link < b.distance_since_start_of_link; | ||
78 | }); | ||
79 | |||
80 | double distance_since_start_of_journey = 0; | ||
81 | for (size_t i = 0; i < points.size(); i++) { | ||
82 | Point *p = &points[i]; | ||
83 | if (i > 0) { | ||
84 | Point *prev = &points[i - 1]; | ||
85 | if (p->link != prev->link) { | ||
86 | distance_since_start_of_journey += prev->link->distance; | ||
87 | } | ||
88 | } | ||
89 | p->distance_since_start_of_journey = distance_since_start_of_journey + p->distance_since_start_of_link; | ||
90 | } | ||
91 | |||
92 | fputs("is_stop,link_usrstop_begin,link_usrstop_end,point_code,rd_x,rd_y,distance_since_start_of_link,distance_since_start_of_journey\n", out); | ||
93 | for (const auto &point : points) { | ||
94 | fprintf(out, "%s,%s,%s,%s,%f,%f,%f,%f\n", | ||
95 | point.is_stop ? "true" : "false", | ||
96 | point.jopatili->user_stop_code_begin.c_str(), point.jopatili->user_stop_code_end.c_str(), | ||
97 | point.point->key.point_code.c_str(), point.point->location_x_ew, point.point->location_y_ns, | ||
98 | point.distance_since_start_of_link, point.distance_since_start_of_journey); | ||
99 | } | ||
100 | |||
101 | if (options.output_file_path != "-"sv) fclose(out); | ||
102 | } | ||
diff --git a/src/querykv1/joparoute.hpp b/src/querykv1/joparoute.hpp new file mode 100644 index 0000000..ade94e8 --- /dev/null +++ b/src/querykv1/joparoute.hpp | |||
@@ -0,0 +1,13 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_JOPAROUTE_HPP | ||
4 | #define OEUF_QUERYKV1_JOPAROUTE_HPP | ||
5 | |||
6 | #include <tmi8/kv1_types.hpp> | ||
7 | #include <tmi8/kv1_index.hpp> | ||
8 | |||
9 | #include "cliopts.hpp" | ||
10 | |||
// Writes the points along one journey pattern (selected through
// options.line_planning_number and options.journey_pattern_code) as CSV to
// options.output_file_path, or to stdout when that path is "-".
11 | void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index); | ||
12 | |||
13 | #endif // OEUF_QUERYKV1_JOPAROUTE_HPP | ||
diff --git a/src/querykv1/journeyinfo.cpp b/src/querykv1/journeyinfo.cpp new file mode 100644 index 0000000..bd29490 --- /dev/null +++ b/src/querykv1/journeyinfo.cpp | |||
@@ -0,0 +1,64 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <iostream> | ||
4 | |||
5 | #include "journeyinfo.hpp" | ||
6 | |||
7 | void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index) { | ||
8 | std::cout << "Info for journey " << options.line_planning_number | ||
9 | << "/" << options.journey_number << std::endl; | ||
10 | |||
11 | std::unordered_map<std::string, const Kv1UserStopPoint *> usrstops; | ||
12 | for (size_t i = 0; i < records.user_stop_points.size(); i++) { | ||
13 | const Kv1UserStopPoint *usrstop = &records.user_stop_points[i]; | ||
14 | usrstops[usrstop->key.user_stop_code] = usrstop; | ||
15 | } | ||
16 | |||
17 | for (const auto &pujo : records.public_journeys) { | ||
18 | if (pujo.key.line_planning_number != options.line_planning_number | ||
19 | || std::to_string(pujo.key.journey_number) != options.journey_number) | ||
20 | continue; | ||
21 | |||
22 | std::vector<const Kv1JourneyPatternTimingLink *> timing_links; | ||
23 | for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { | ||
24 | const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; | ||
25 | if (jopatili->key.line_planning_number != options.line_planning_number | ||
26 | || jopatili->key.journey_pattern_code != pujo.journey_pattern_code) | ||
27 | continue; | ||
28 | timing_links.push_back(jopatili); | ||
29 | } | ||
30 | |||
31 | std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool { | ||
32 | return a->key.timing_link_order < b->key.timing_link_order; | ||
33 | }); | ||
34 | auto begin_stop = timing_links.front()->user_stop_code_begin; | ||
35 | auto end_stop = timing_links.back()->user_stop_code_end; | ||
36 | |||
37 | const auto *begin = usrstops[begin_stop]; | ||
38 | const auto *end = usrstops[end_stop]; | ||
39 | |||
40 | std::cout << " Journey pattern: " << pujo.key.line_planning_number | ||
41 | << "/" << pujo.journey_pattern_code << std::endl | ||
42 | << " Begin stop: " << begin_stop | ||
43 | << "; name: " << std::quoted(begin->name) | ||
44 | << "; town: " << std::quoted(begin->town) << std::endl | ||
45 | << " End stop: " << end_stop | ||
46 | << "; name: " << std::quoted(end->name) | ||
47 | << "; town: " << std::quoted(end->town) << std::endl; | ||
48 | |||
49 | const auto *begin_star = begin->p_user_stop_area; | ||
50 | const auto *end_star = end->p_user_stop_area; | ||
51 | if (begin_star) | ||
52 | std::cout << " Begin stop area: " << begin_star->key.user_stop_area_code | ||
53 | << "; name: " << std::quoted(begin_star->name) | ||
54 | << ", town: " << std::quoted(begin_star->town) | ||
55 | << std::endl; | ||
56 | if (end_star) | ||
57 | std::cout << " End stop area: " << end_star->key.user_stop_area_code | ||
58 | << "; name: " << std::quoted(end_star->name) | ||
59 | << ", town: " << std::quoted(end_star->town) | ||
60 | << std::endl; | ||
61 | |||
62 | break; | ||
63 | } | ||
64 | } | ||
diff --git a/src/querykv1/journeyinfo.hpp b/src/querykv1/journeyinfo.hpp new file mode 100644 index 0000000..2a2118d --- /dev/null +++ b/src/querykv1/journeyinfo.hpp | |||
@@ -0,0 +1,13 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_JOURNEYINFO_HPP | ||
4 | #define OEUF_QUERYKV1_JOURNEYINFO_HPP | ||
5 | |||
6 | #include <tmi8/kv1_types.hpp> | ||
7 | #include <tmi8/kv1_index.hpp> | ||
8 | |||
9 | #include "cliopts.hpp" | ||
10 | |||
// Prints the journey pattern plus begin/end stop details of the journey
// selected by options.line_planning_number and options.journey_number to
// stdout.
11 | void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index); | ||
12 | |||
13 | #endif // OEUF_QUERYKV1_JOURNEYINFO_HPP | ||
diff --git a/src/querykv1/journeyroute.cpp b/src/querykv1/journeyroute.cpp new file mode 100644 index 0000000..013ea1c --- /dev/null +++ b/src/querykv1/journeyroute.cpp | |||
@@ -0,0 +1,96 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <iostream> | ||
4 | #include <string_view> | ||
5 | |||
6 | #include "journeyroute.hpp" | ||
7 | |||
8 | using namespace std::string_view_literals; | ||
9 | |||
10 | void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index) { | ||
11 | FILE *out = stdout; | ||
12 | if (options.output_file_path != "-"sv) | ||
13 | out = fopen(options.output_file_path, "wb"); | ||
14 | if (!out) { | ||
15 | fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); | ||
16 | exit(EXIT_FAILURE); | ||
17 | } | ||
18 | |||
19 | for (auto &pujo : records.public_journeys) { | ||
20 | if (pujo.key.line_planning_number == options.line_planning_number && std::to_string(pujo.key.journey_number) == options.journey_number) { | ||
21 | fprintf(stderr, "Got PUJO %s/%s:\n", options.line_planning_number, options.journey_number); | ||
22 | fprintf(stderr, " Day type: %s\n", pujo.key.day_type.c_str()); | ||
23 | auto &pegr = *pujo.p_period_group; | ||
24 | fprintf(stderr, " PEGR Code: %s\n", pegr.key.period_group_code.c_str()); | ||
25 | fprintf(stderr, " PEGR Description: %s\n", pegr.description.c_str()); | ||
26 | fprintf(stderr, " SPECDAY Code: %s\n", pujo.key.specific_day_code.c_str()); | ||
27 | auto &timdemgrp = *pujo.p_time_demand_group; | ||
28 | |||
29 | for (auto &pegrval : records.period_group_validities) { | ||
30 | if (pegrval.key.period_group_code == pegr.key.period_group_code) { | ||
31 | fprintf(stderr, "Got PEGRVAL for PEGR %s\n", pegr.key.period_group_code.c_str()); | ||
32 | std::cerr << " Valid from: " << pegrval.key.valid_from << std::endl; | ||
33 | std::cerr << " Valid thru: " << pegrval.valid_thru << std::endl; | ||
34 | } | ||
35 | } | ||
36 | |||
37 | struct Point { | ||
38 | Kv1JourneyPatternTimingLink *jopatili = nullptr; | ||
39 | Kv1TimeDemandGroupRunTime *timdemrnt = nullptr; | ||
40 | double distance_since_start_of_link = 0; | ||
41 | double rd_x = 0; | ||
42 | double rd_y = 0; | ||
43 | double total_time_s = 0; | ||
44 | }; | ||
45 | std::vector<Point> points; | ||
46 | |||
47 | for (size_t i = 0; i < records.time_demand_group_run_times.size(); i++) { | ||
48 | Kv1TimeDemandGroupRunTime *timdemrnt = &records.time_demand_group_run_times[i]; | ||
49 | if (timdemrnt->key.line_planning_number == timdemgrp.key.line_planning_number | ||
50 | && timdemrnt->key.journey_pattern_code == timdemgrp.key.journey_pattern_code | ||
51 | && timdemrnt->key.time_demand_group_code == timdemgrp.key.time_demand_group_code) { | ||
52 | Kv1JourneyPatternTimingLink *jopatili = timdemrnt->p_journey_pattern_timing_link; | ||
53 | for (auto &pool : records.point_on_links) { | ||
54 | if (pool.key.user_stop_code_begin == timdemrnt->user_stop_code_begin | ||
55 | && pool.key.user_stop_code_end == timdemrnt->user_stop_code_end | ||
56 | && pool.key.transport_type == jopatili->p_line->transport_type) { | ||
57 | points.emplace_back( | ||
58 | jopatili, | ||
59 | timdemrnt, | ||
60 | pool.distance_since_start_of_link, | ||
61 | pool.p_point->location_x_ew, | ||
62 | pool.p_point->location_y_ns | ||
63 | ); | ||
64 | } | ||
65 | } | ||
66 | } | ||
67 | } | ||
68 | |||
69 | std::sort(points.begin(), points.end(), [](Point &a, Point &b) { | ||
70 | if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order) | ||
71 | return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; | ||
72 | return a.distance_since_start_of_link < b.distance_since_start_of_link; | ||
73 | }); | ||
74 | |||
75 | double total_time_s = 0; | ||
76 | for (size_t i = 0; i < points.size(); i++) { | ||
77 | Point *p = &points[i]; | ||
78 | p->total_time_s = total_time_s; | ||
79 | if (i > 0) { | ||
80 | Point *prev = &points[i - 1]; | ||
81 | if (p->timdemrnt != prev->timdemrnt) { | ||
82 | total_time_s += prev->timdemrnt->total_drive_time_s; | ||
83 | prev->total_time_s = total_time_s; | ||
84 | } | ||
85 | } | ||
86 | } | ||
87 | |||
88 | fputs("rd_x,rd_y,total_time_s,is_timing_stop\n", out); | ||
89 | for (const auto &point : points) { | ||
90 | fprintf(out, "%f,%f,%f,%d\n", point.rd_x, point.rd_y, point.total_time_s, point.jopatili->is_timing_stop); | ||
91 | } | ||
92 | } | ||
93 | } | ||
94 | |||
95 | if (options.output_file_path != "-"sv) fclose(out); | ||
96 | } | ||
diff --git a/src/querykv1/journeyroute.hpp b/src/querykv1/journeyroute.hpp new file mode 100644 index 0000000..ccd996c --- /dev/null +++ b/src/querykv1/journeyroute.hpp | |||
@@ -0,0 +1,13 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_JOURNEYROUTE_HPP | ||
4 | #define OEUF_QUERYKV1_JOURNEYROUTE_HPP | ||
5 | |||
6 | #include <tmi8/kv1_types.hpp> | ||
7 | #include <tmi8/kv1_index.hpp> | ||
8 | |||
9 | #include "cliopts.hpp" | ||
10 | |||
// Writes the points on the route of the journey selected by
// options.line_planning_number and options.journey_number, with cumulative
// drive times, as CSV to options.output_file_path (stdout when it is "-").
11 | void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index); | ||
12 | |||
13 | #endif // OEUF_QUERYKV1_JOURNEYROUTE_HPP | ||
diff --git a/src/querykv1/journeys.cpp b/src/querykv1/journeys.cpp new file mode 100644 index 0000000..96566b2 --- /dev/null +++ b/src/querykv1/journeys.cpp | |||
@@ -0,0 +1,95 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <iostream> | ||
4 | #include <map> | ||
5 | #include <string_view> | ||
6 | #include <unordered_set> | ||
7 | |||
8 | #include "journeys.hpp" | ||
9 | |||
10 | using namespace std::string_view_literals; | ||
11 | |||
12 | void journeys(const Options &options, Kv1Records &records, Kv1Index &index) { | ||
13 | const std::string_view want_begin_stop_code(options.begin_stop_code); | ||
14 | const std::string_view want_end_stop_code(options.end_stop_code); | ||
15 | |||
16 | FILE *out = stdout; | ||
17 | if (options.output_file_path != "-"sv) | ||
18 | out = fopen(options.output_file_path, "wb"); | ||
19 | if (!out) { | ||
20 | fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); | ||
21 | exit(EXIT_FAILURE); | ||
22 | } | ||
23 | |||
24 | std::cerr << "Generating journeys for " << options.line_planning_number << ", going from stop " | ||
25 | << options.begin_stop_code << " to " << options.end_stop_code << std::endl; | ||
26 | |||
27 | std::unordered_map<std::string, const Kv1UserStopPoint *> usrstops; | ||
28 | for (size_t i = 0; i < records.user_stop_points.size(); i++) { | ||
29 | const Kv1UserStopPoint *usrstop = &records.user_stop_points[i]; | ||
30 | usrstops[usrstop->key.user_stop_code] = usrstop; | ||
31 | } | ||
32 | |||
33 | std::unordered_set<std::string> journey_pattern_codes; | ||
34 | for (const auto &jopa : records.journey_patterns) { | ||
35 | if (jopa.key.line_planning_number != options.line_planning_number) | ||
36 | continue; | ||
37 | journey_pattern_codes.insert(jopa.key.journey_pattern_code); | ||
38 | } | ||
39 | |||
40 | std::unordered_map<std::string, std::vector<const Kv1JourneyPatternTimingLink *>> jopatilis; | ||
41 | for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { | ||
42 | const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; | ||
43 | if (jopatili->key.line_planning_number != options.line_planning_number | ||
44 | || !journey_pattern_codes.contains(jopatili->key.journey_pattern_code)) | ||
45 | continue; | ||
46 | jopatilis[jopatili->key.journey_pattern_code].push_back(jopatili); | ||
47 | } | ||
48 | |||
49 | std::unordered_set<std::string> valid_jopas; | ||
50 | for (auto &[journey_pattern_code, timing_links] : jopatilis) { | ||
51 | std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool { | ||
52 | return a->key.timing_link_order < b->key.timing_link_order; | ||
53 | }); | ||
54 | auto begin_stop = timing_links.front()->user_stop_code_begin; | ||
55 | auto end_stop = timing_links.back()->user_stop_code_end; | ||
56 | |||
57 | const auto *begin = usrstops[begin_stop]; | ||
58 | const auto *end = usrstops[end_stop]; | ||
59 | |||
60 | bool begin_stop_ok = false; | ||
61 | if (want_begin_stop_code.starts_with("stop:")) | ||
62 | begin_stop_ok = want_begin_stop_code.substr(5) == begin_stop; | ||
63 | else if (want_begin_stop_code.starts_with("star:")) | ||
64 | begin_stop_ok = want_begin_stop_code.substr(5) == begin->user_stop_area_code; | ||
65 | |||
66 | bool end_stop_ok = false; | ||
67 | if (want_end_stop_code.starts_with("stop:")) | ||
68 | end_stop_ok = want_end_stop_code.substr(5) == end_stop; | ||
69 | else if (want_end_stop_code.starts_with("star:")) | ||
70 | end_stop_ok = want_end_stop_code.substr(5) == end->user_stop_area_code; | ||
71 | |||
72 | if (begin_stop_ok && end_stop_ok) { | ||
73 | valid_jopas.insert(journey_pattern_code); | ||
74 | } | ||
75 | } | ||
76 | |||
77 | std::map<int, std::pair<std::string, std::string>> valid_journeys; | ||
78 | for (const auto &pujo : records.public_journeys) { | ||
79 | if (pujo.key.line_planning_number == options.line_planning_number | ||
80 | && valid_jopas.contains(pujo.journey_pattern_code)) { | ||
81 | valid_journeys[pujo.key.journey_number] = { | ||
82 | pujo.time_demand_group_code, | ||
83 | pujo.journey_pattern_code, | ||
84 | }; | ||
85 | } | ||
86 | } | ||
87 | |||
88 | fputs("journey_number,time_demand_group_code,journey_pattern_code\n", out); | ||
89 | for (const auto &[journey_number, timdemgrp_jopa] : valid_journeys) { | ||
90 | const auto &[time_demand_group_code, journey_pattern_code] = timdemgrp_jopa; | ||
91 | fprintf(out, "%d,%s,%s\n", journey_number, time_demand_group_code.c_str(), journey_pattern_code.c_str()); | ||
92 | } | ||
93 | |||
94 | if (options.output_file_path != "-"sv) fclose(out); | ||
95 | } | ||
diff --git a/src/querykv1/journeys.hpp b/src/querykv1/journeys.hpp new file mode 100644 index 0000000..cf615c7 --- /dev/null +++ b/src/querykv1/journeys.hpp | |||
@@ -0,0 +1,13 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_JOURNEYS_HPP | ||
4 | #define OEUF_QUERYKV1_JOURNEYS_HPP | ||
5 | |||
6 | #include <tmi8/kv1_types.hpp> | ||
7 | #include <tmi8/kv1_index.hpp> | ||
8 | |||
9 | #include "cliopts.hpp" | ||
10 | |||
// Writes all journeys of options.line_planning_number that run from
// options.begin_stop_code to options.end_stop_code as CSV to
// options.output_file_path (stdout when it is "-").
11 | void journeys(const Options &options, Kv1Records &records, Kv1Index &index); | ||
12 | |||
13 | #endif // OEUF_QUERYKV1_JOURNEYS_HPP | ||
diff --git a/src/querykv1/main.cpp b/src/querykv1/main.cpp new file mode 100644 index 0000000..6c606ba --- /dev/null +++ b/src/querykv1/main.cpp | |||
@@ -0,0 +1,198 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <chrono> | ||
4 | #include <cstdio> | ||
5 | #include <string> | ||
6 | #include <string_view> | ||
7 | #include <vector> | ||
8 | |||
9 | #include <tmi8/kv1_types.hpp> | ||
10 | #include <tmi8/kv1_index.hpp> | ||
11 | #include <tmi8/kv1_lexer.hpp> | ||
12 | #include <tmi8/kv1_parser.hpp> | ||
13 | |||
14 | #include "cliopts.hpp" | ||
15 | #include "joparoute.hpp" | ||
16 | #include "journeyinfo.hpp" | ||
17 | #include "journeyroute.hpp" | ||
18 | #include "journeys.hpp" | ||
19 | #include "schedule.hpp" | ||
20 | |||
21 | using namespace std::string_view_literals; | ||
22 | |||
// Clock used for timing the lexer: high_resolution_clock when it is steady,
// otherwise steady_clock.
23 | using TimingClock = std::conditional_t< | ||
24 | std::chrono::high_resolution_clock::is_steady, | ||
25 | std::chrono::high_resolution_clock, | ||
26 | std::chrono::steady_clock>; | ||
27 | |||
// Reads a complete KV1 file into memory.
//
// `path` is the file to read; the special value "-" reads from standard
// input instead. Returns the raw bytes of the input. Progress and errors are
// reported on stderr; the process exits with status 1 when the file cannot
// be opened or a read error occurs.
std::string readKv1(const char *path) {
  const bool from_stdin = std::string_view(path) == "-";

  FILE *in = stdin;
  if (from_stdin)
    fputs("Reading KV1 from standard input\n", stderr);
  else
    in = fopen(path, "rb");
  if (!in) {
    fprintf(stderr, "Open %s: %s\n", path, strerrordesc_np(errno));
    exit(1);
  }

  char buf[4096];
  std::string data;
  while (!feof(in) && !ferror(in)) {
    // A short read is fine: only the bytes that were actually read are kept.
    const size_t read = fread(buf, sizeof(char), sizeof(buf), in);
    data.append(buf, read);
  }
  if (ferror(in)) {
    if (from_stdin)
      fputs("Error when reading from stdin\n", stderr);
    else
      fprintf(stderr, "Error reading from file \"%s\"\n", path);
    exit(1);
  }
  // size_t must be printed with %zu; %lu is only correct where size_t
  // happens to be unsigned long.
  fprintf(stderr, "Read %zu bytes\n", data.size());

  if (!from_stdin)
    fclose(in);

  return data;
}
57 | |||
58 | std::vector<Kv1Token> lex(const char *path) { | ||
59 | std::string data = readKv1(path); | ||
60 | |||
61 | auto start = TimingClock::now(); | ||
62 | Kv1Lexer lexer(data); | ||
63 | lexer.lex(); | ||
64 | auto end = TimingClock::now(); | ||
65 | |||
66 | std::chrono::duration<double> elapsed{end - start}; | ||
67 | double bytes = static_cast<double>(data.size()) / 1'000'000; | ||
68 | double speed = bytes / elapsed.count(); | ||
69 | |||
70 | if (!lexer.errors.empty()) { | ||
71 | fputs("Lexer reported errors:\n", stderr); | ||
72 | for (const auto &error : lexer.errors) | ||
73 | fprintf(stderr, "- %s\n", error.c_str()); | ||
74 | exit(1); | ||
75 | } | ||
76 | |||
77 | fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size()); | ||
78 | fprintf(stderr, "Duration: %f s\n", elapsed.count()); | ||
79 | fprintf(stderr, "Speed: %f MB/s\n", speed); | ||
80 | |||
81 | return std::move(lexer.tokens); | ||
82 | } | ||
83 | |||
84 | bool parse(const char *path, Kv1Records &into) { | ||
85 | std::vector<Kv1Token> tokens = lex(path); | ||
86 | |||
87 | Kv1Parser parser(tokens, into); | ||
88 | parser.parse(); | ||
89 | |||
90 | bool ok = true; | ||
91 | if (!parser.gerrors.empty()) { | ||
92 | ok = false; | ||
93 | fputs("Parser reported errors:\n", stderr); | ||
94 | for (const auto &error : parser.gerrors) | ||
95 | fprintf(stderr, "- %s\n", error.c_str()); | ||
96 | } | ||
97 | if (!parser.warns.empty()) { | ||
98 | fputs("Parser reported warnings:\n", stderr); | ||
99 | for (const auto &warn : parser.warns) | ||
100 | fprintf(stderr, "- %s\n", warn.c_str()); | ||
101 | } | ||
102 | |||
103 | fprintf(stderr, "Parsed %lu records\n", into.size()); | ||
104 | |||
105 | return ok; | ||
106 | } | ||
107 | |||
108 | void printParsedRecords(const Kv1Records &records) { | ||
109 | fputs("Parsed records:\n", stderr); | ||
110 | fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size()); | ||
111 | fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size()); | ||
112 | fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size()); | ||
113 | fprintf(stderr, " user_stop_areas: %lu\n", records.user_stop_areas.size()); | ||
114 | fprintf(stderr, " timing_links: %lu\n", records.timing_links.size()); | ||
115 | fprintf(stderr, " links: %lu\n", records.links.size()); | ||
116 | fprintf(stderr, " lines: %lu\n", records.lines.size()); | ||
117 | fprintf(stderr, " destinations: %lu\n", records.destinations.size()); | ||
118 | fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size()); | ||
119 | fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size()); | ||
120 | fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size()); | ||
121 | fprintf(stderr, " financers: %lu\n", records.financers.size()); | ||
122 | fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size()); | ||
123 | fprintf(stderr, " points: %lu\n", records.points.size()); | ||
124 | fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size()); | ||
125 | fprintf(stderr, " icons: %lu\n", records.icons.size()); | ||
126 | fprintf(stderr, " notices: %lu\n", records.notices.size()); | ||
127 | fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size()); | ||
128 | fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size()); | ||
129 | fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size()); | ||
130 | fprintf(stderr, " period_groups: %lu\n", records.period_groups.size()); | ||
131 | fprintf(stderr, " specific_days: %lu\n", records.specific_days.size()); | ||
132 | fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size()); | ||
133 | fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size()); | ||
134 | fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size()); | ||
135 | fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size()); | ||
136 | fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size()); | ||
137 | fprintf(stderr, " public_journey_passing_times: %lu\n", records.public_journey_passing_times.size()); | ||
138 | fprintf(stderr, " operating_days: %lu\n", records.operating_days.size()); | ||
139 | } | ||
140 | |||
141 | void printIndexSize(const Kv1Index &index) { | ||
142 | fputs("Index size:\n", stderr); | ||
143 | fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size()); | ||
144 | fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size()); | ||
145 | fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size()); | ||
146 | fprintf(stderr, " timing_links: %lu\n", index.timing_links.size()); | ||
147 | fprintf(stderr, " links: %lu\n", index.links.size()); | ||
148 | fprintf(stderr, " lines: %lu\n", index.lines.size()); | ||
149 | fprintf(stderr, " destinations: %lu\n", index.destinations.size()); | ||
150 | fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size()); | ||
151 | fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size()); | ||
152 | fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size()); | ||
153 | fprintf(stderr, " financers: %lu\n", index.financers.size()); | ||
154 | fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size()); | ||
155 | fprintf(stderr, " points: %lu\n", index.points.size()); | ||
156 | fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size()); | ||
157 | fprintf(stderr, " icons: %lu\n", index.icons.size()); | ||
158 | fprintf(stderr, " notices: %lu\n", index.notices.size()); | ||
159 | fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size()); | ||
160 | fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size()); | ||
161 | fprintf(stderr, " period_groups: %lu\n", index.period_groups.size()); | ||
162 | fprintf(stderr, " specific_days: %lu\n", index.specific_days.size()); | ||
163 | fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size()); | ||
164 | fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size()); | ||
165 | fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size()); | ||
166 | fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size()); | ||
167 | fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size()); | ||
168 | fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size()); | ||
169 | fprintf(stderr, " operating_days: %lu\n", index.operating_days.size()); | ||
170 | } | ||
171 | |||
172 | int main(int argc, char *argv[]) { | ||
173 | Options options = parseOptions(argc, argv); | ||
174 | |||
175 | Kv1Records records; | ||
176 | if (!parse(options.kv1_file_path, records)) { | ||
177 | fputs("Error parsing records, exiting\n", stderr); | ||
178 | return EXIT_FAILURE; | ||
179 | } | ||
180 | printParsedRecords(records); | ||
181 | fputs("Indexing...\n", stderr); | ||
182 | Kv1Index index(&records); | ||
183 | fprintf(stderr, "Indexed %lu records\n", index.size()); | ||
184 | // Only notice assignments are not indexed. If this equality is not valid, | ||
185 | // then this means that we had duplicate keys or that something else went | ||
186 | // wrong. That would really not be great. | ||
187 | assert(index.size() == records.size() - records.notice_assignments.size()); | ||
188 | printIndexSize(index); | ||
189 | fputs("Linking records...\n", stderr); | ||
190 | kv1LinkRecords(index); | ||
191 | fputs("Done linking\n", stderr); | ||
192 | |||
193 | if (options.subcommand == "joparoute"sv) jopaRoute(options, records, index); | ||
194 | if (options.subcommand == "journeyroute"sv) journeyRoute(options, records, index); | ||
195 | if (options.subcommand == "journeys"sv) journeys(options, records, index); | ||
196 | if (options.subcommand == "journeyinfo"sv) journeyInfo(options, records, index); | ||
197 | if (options.subcommand == "schedule"sv) schedule(options, records, index); | ||
198 | } | ||
diff --git a/src/querykv1/schedule.cpp b/src/querykv1/schedule.cpp new file mode 100644 index 0000000..2bcfe0a --- /dev/null +++ b/src/querykv1/schedule.cpp | |||
@@ -0,0 +1,63 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <iostream> | ||
4 | #include <string_view> | ||
5 | #include <unordered_map> | ||
6 | #include <vector> | ||
7 | |||
8 | #include "daterange.hpp" | ||
9 | #include "schedule.hpp" | ||
10 | |||
11 | using namespace std::string_view_literals; | ||
12 | |||
13 | void schedule(const Options &options, Kv1Records &records, Kv1Index &index) { | ||
14 | FILE *out = stdout; | ||
15 | if (options.output_file_path != "-"sv) | ||
16 | out = fopen(options.output_file_path, "wb"); | ||
17 | if (!out) { | ||
18 | fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); | ||
19 | exit(EXIT_FAILURE); | ||
20 | } | ||
21 | |||
22 | std::cerr << "Generating schedule for " << options.line_planning_number << std::endl; | ||
23 | |||
24 | std::unordered_multimap<std::string, Kv1PeriodGroupValidity> period_group_validities; | ||
25 | for (const auto &pegr : records.period_group_validities) | ||
26 | period_group_validities.insert({ pegr.key.period_group_code, pegr }); | ||
27 | std::unordered_multimap<std::string, Kv1PublicJourney> public_journeys; | ||
28 | for (const auto &pujo : records.public_journeys) | ||
29 | public_journeys.insert({ pujo.key.timetable_version_code, pujo }); | ||
30 | |||
31 | std::cout << "line_planning_number,journey_number,date,departure_time" << std::endl; | ||
32 | for (const auto &tive : records.timetable_versions) { | ||
33 | std::vector<DateRange> tive_pegrval_ranges; | ||
34 | |||
35 | auto pegrval_range = period_group_validities.equal_range(tive.key.period_group_code); | ||
36 | for (auto it = pegrval_range.first; it != pegrval_range.second; it++) { | ||
37 | const auto &[_, pegrval] = *it; | ||
38 | tive_pegrval_ranges.emplace_back(pegrval.key.valid_from, pegrval.valid_thru); | ||
39 | } | ||
40 | |||
41 | DateRangeSeq seq(tive_pegrval_ranges.begin(), tive_pegrval_ranges.end()); | ||
42 | seq = seq.clampFrom(tive.valid_from); | ||
43 | if (tive.valid_thru) | ||
44 | seq = seq.clampThru(*tive.valid_thru); | ||
45 | |||
46 | for (const auto &range : seq) for (auto date : range) { | ||
47 | auto weekday = std::chrono::year_month_weekday(std::chrono::sys_days(date)).weekday(); | ||
48 | |||
49 | auto pujo_range = public_journeys.equal_range(tive.key.timetable_version_code); | ||
50 | for (auto itt = pujo_range.first; itt != pujo_range.second; itt++) { | ||
51 | const auto &[_, pujo] = *itt; | ||
52 | |||
53 | if (pujo.key.line_planning_number == options.line_planning_number && pujo.key.day_type.size() == 7 | ||
54 | && pujo.key.day_type[weekday.iso_encoding() - 1] == static_cast<char>('0' + weekday.iso_encoding())) { | ||
55 | std::cout << pujo.key.line_planning_number << "," << pujo.key.journey_number << "," | ||
56 | << date << "," << pujo.departure_time << std::endl; | ||
57 | } | ||
58 | } | ||
59 | } | ||
60 | } | ||
61 | |||
62 | if (options.output_file_path != "-"sv) fclose(out); | ||
63 | } | ||
diff --git a/src/querykv1/schedule.hpp b/src/querykv1/schedule.hpp new file mode 100644 index 0000000..100bd4c --- /dev/null +++ b/src/querykv1/schedule.hpp | |||
@@ -0,0 +1,13 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #ifndef OEUF_QUERYKV1_SCHEDULE_HPP | ||
4 | #define OEUF_QUERYKV1_SCHEDULE_HPP | ||
5 | |||
6 | #include <tmi8/kv1_types.hpp> | ||
7 | #include <tmi8/kv1_index.hpp> | ||
8 | |||
9 | #include "cliopts.hpp" | ||
10 | |||
11 | void schedule(const Options &options, Kv1Records &records, Kv1Index &index); | ||
12 | |||
13 | #endif // OEUF_QUERYKV1_SCHEDULE_HPP | ||
diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/recvkv6/.envrc | |||
@@ -0,0 +1,2 @@ | |||
1 | source_env ../../ | ||
2 | export DEVMODE=1 | ||
diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile new file mode 100644 index 0000000..12ff7fb --- /dev/null +++ b/src/recvkv6/Makefile | |||
@@ -0,0 +1,21 @@ | |||
# Taken from:
# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01,
# 2023. [Online]. Available:
# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html
CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\
  -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \
  -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \
  -D_GLIBCXX_ASSERTIONS \
  -fstrict-flex-arrays=3 \
  -fstack-clash-protection -fstack-protector-strong
LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \
  -Wl,-z,nodlopen -Wl,-z,noexecstack \
  -Wl,-z,relro -Wl,-z,now

recvkv6: main.cpp
	$(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS)

# -f: cleaning a tree in which nothing has been built yet is not an error.
.PHONY: clean
clean:
	rm -f recvkv6
diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp new file mode 100644 index 0000000..2ac3669 --- /dev/null +++ b/src/recvkv6/main.cpp | |||
@@ -0,0 +1,1300 @@ | |||
1 | // vim:set sw=2 ts=2 sts et: | ||
2 | |||
3 | #include <array> | ||
4 | #include <cassert> | ||
5 | #include <chrono> | ||
6 | #include <csignal> | ||
7 | #include <cstring> | ||
8 | #include <filesystem> | ||
9 | #include <format> | ||
10 | #include <fstream> | ||
11 | #include <iostream> | ||
12 | #include <optional> | ||
13 | #include <stack> | ||
14 | #include <string> | ||
15 | #include <sstream> | ||
16 | #include <vector> | ||
17 | |||
18 | #include <zlib.h> | ||
19 | #include <zmq.h> | ||
20 | |||
21 | #include <nlohmann/json.hpp> | ||
22 | |||
23 | #include <prometheus/counter.h> | ||
24 | #include <prometheus/exposer.h> | ||
25 | #include <prometheus/histogram.h> | ||
26 | #include <prometheus/registry.h> | ||
27 | |||
28 | #include <rapidxml/rapidxml.hpp> | ||
29 | |||
30 | #include <tmi8/kv6_parquet.hpp> | ||
31 | |||
32 | #define CHUNK 16384 | ||
33 | |||
34 | struct RawMessage { | ||
35 | public: | ||
36 | // Takes ownership of envelope and body | ||
37 | RawMessage(zmq_msg_t envelope, zmq_msg_t body) | ||
38 | : envelope(envelope), body(body) | ||
39 | {} | ||
40 | |||
41 | // Prevent copying | ||
42 | RawMessage(const RawMessage &) = delete; | ||
43 | RawMessage &operator=(RawMessage const &) = delete; | ||
44 | |||
45 | std::string_view getEnvelope() { | ||
46 | return static_cast<const char *>(zmq_msg_data(&envelope)); | ||
47 | } | ||
48 | |||
49 | char *getBody() { | ||
50 | return static_cast<char *>(zmq_msg_data(&body)); | ||
51 | } | ||
52 | |||
53 | size_t getBodySize() { | ||
54 | return zmq_msg_size(&body); | ||
55 | } | ||
56 | |||
57 | ~RawMessage() { | ||
58 | zmq_msg_close(&envelope); | ||
59 | zmq_msg_close(&body); | ||
60 | } | ||
61 | |||
62 | private: | ||
63 | zmq_msg_t envelope; | ||
64 | zmq_msg_t body; | ||
65 | }; | ||
66 | |||
67 | std::optional<RawMessage> recvMsg(void *socket) { | ||
68 | while (true) { | ||
69 | zmq_msg_t envelope, body; | ||
70 | int rc = zmq_msg_init(&envelope); | ||
71 | assert(rc == 0); | ||
72 | rc = zmq_msg_init(&body); | ||
73 | assert(rc == 0); | ||
74 | |||
75 | rc = zmq_msg_recv(&envelope, socket, 0); | ||
76 | if (rc == -1) return std::nullopt; | ||
77 | |||
78 | int more; | ||
79 | size_t more_size = sizeof(more); | ||
80 | rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); | ||
81 | if (!more) { | ||
82 | zmq_msg_close(&envelope); | ||
83 | zmq_msg_close(&body); | ||
84 | continue; | ||
85 | } | ||
86 | |||
87 | rc = zmq_msg_recv(&body, socket, 0); | ||
88 | if (rc == -1) return std::nullopt; | ||
89 | |||
90 | rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); | ||
91 | assert(!more); | ||
92 | |||
93 | return std::make_optional<RawMessage>(envelope, body); | ||
94 | } | ||
95 | } | ||
96 | |||
97 | // Ensures that <return value>[output_size] == 0 | ||
98 | char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) { | ||
99 | assert(input_size <= UINT32_MAX); | ||
100 | |||
101 | z_stream strm; | ||
102 | strm.next_in = reinterpret_cast<unsigned char *>(raw); | ||
103 | strm.avail_in = input_size; | ||
104 | strm.zalloc = Z_NULL; | ||
105 | strm.zfree = Z_NULL; | ||
106 | strm.opaque = Z_NULL; | ||
107 | int rc = inflateInit2(&strm, 32); | ||
108 | assert(rc == Z_OK); | ||
109 | |||
110 | unsigned int buf_cap = CHUNK; | ||
111 | unsigned int buf_len = 0; | ||
112 | char *buf = static_cast<char *>(malloc(CHUNK)); | ||
113 | do { | ||
114 | if (buf_len + CHUNK > buf_cap) { | ||
115 | assert(buf_cap <= UINT32_MAX); | ||
116 | buf_cap *= 2; | ||
117 | buf = static_cast<char *>(realloc(buf, buf_cap)); | ||
118 | } | ||
119 | strm.avail_out = buf_cap - buf_len; | ||
120 | strm.next_out = reinterpret_cast<unsigned char *>(buf + buf_len); | ||
121 | |||
122 | unsigned long old_total = strm.total_out; | ||
123 | rc = inflate(&strm, Z_FINISH); | ||
124 | unsigned progress = static_cast<unsigned int>(strm.total_out - old_total); | ||
125 | buf_len += progress; | ||
126 | assert(progress != 0 || rc == Z_STREAM_END); | ||
127 | } while (strm.total_in < input_size); | ||
128 | |||
129 | if (buf_len == buf_cap) { | ||
130 | buf = static_cast<char *>(realloc(buf, buf_len + 1)); | ||
131 | } | ||
132 | buf[buf_len] = 0; | ||
133 | output_size = buf_len; | ||
134 | |||
135 | rc = inflateEnd(&strm); | ||
136 | assert(rc == Z_OK); | ||
137 | |||
138 | return buf; | ||
139 | } | ||
140 | |||
141 | struct Date { | ||
142 | int16_t year = 0; | ||
143 | uint8_t month = 0; | ||
144 | uint8_t day = 0; | ||
145 | |||
146 | static bool parse(Date &dest, std::string_view src) { | ||
147 | dest.year = 0, dest.month = 0, dest.day = 0; | ||
148 | |||
149 | int16_t y_mul_fac = 1; | ||
150 | bool extended = false; | ||
151 | |||
152 | size_t plus = src.find('+'); | ||
153 | if (plus != std::string_view::npos) { | ||
154 | extended = true; | ||
155 | src = src.substr(1); // remove plus sign from the start | ||
156 | } | ||
157 | if (!extended) { | ||
158 | size_t min_or_dash = src.find('-'); | ||
159 | if (min_or_dash == std::string_view::npos) return false; | ||
160 | if (min_or_dash == 0) { | ||
161 | y_mul_fac = -1; // it's a minus sign | ||
162 | src = src.substr(1); // remove minus sign at the start | ||
163 | } | ||
164 | } | ||
165 | |||
166 | int y_chars = 0; | ||
167 | while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') { | ||
168 | dest.year = static_cast<int16_t>(dest.year * 10 + src[0] - '0'); | ||
169 | src = src.substr(1); | ||
170 | y_chars++; | ||
171 | } | ||
172 | if (src.size() == 0) { dest.year = 0; return false; } | ||
173 | if (src[0] != '-') { dest.year = 0; return false; } | ||
174 | src = src.substr(1); // remove dash | ||
175 | if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; } | ||
176 | dest.year *= y_mul_fac; | ||
177 | |||
178 | bool rest_correct = src.size() == 5 | ||
179 | && src[0] >= '0' && src[0] <= '9' | ||
180 | && src[1] >= '0' && src[1] <= '9' | ||
181 | && src[3] >= '0' && src[3] <= '9' | ||
182 | && src[4] >= '0' && src[4] <= '9'; | ||
183 | if (!rest_correct) { dest.year = 0; return false; } | ||
184 | dest.month = static_cast<uint8_t>((src[0] - '0') * 10 + src[1] - '0'); | ||
185 | dest.day = static_cast<uint8_t>((src[3] - '0') * 10 + src[4] - '0'); | ||
186 | if (dest.month > 12 || dest.day > 31) { | ||
187 | dest.year = 0, dest.month = 0, dest.day = 0; | ||
188 | return false; | ||
189 | } | ||
190 | return true; | ||
191 | } | ||
192 | |||
193 | std::string toString() const { | ||
194 | if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31) | ||
195 | throw std::invalid_argument("one or more date components (year, month, day) out of range"); | ||
196 | char data[11] = "XXXX-XX-XX"; | ||
197 | sprintf(data, "%04u-%02u-%02u", year, month, day); | ||
198 | return data; | ||
199 | } | ||
200 | |||
201 | std::chrono::days toUnixDays() const { | ||
202 | std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)}; | ||
203 | // This is valid since C++20: as of C++20, the system clock is defined to measure the | ||
204 | // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds. | ||
205 | std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch(); | ||
206 | return since_epoch; | ||
207 | } | ||
208 | }; | ||
209 | |||
// A time of day with a resolution of one second, without time zone.
struct Time {
  uint8_t hour = 0;
  uint8_t minute = 0;
  uint8_t second = 0;

  // Parses a time of the exact form hh:mm:ss (hh in 00..23, mm and ss in
  // 00..59) into dest and returns whether that succeeded.
  //
  // If src does not have the right shape, dest is left untouched. If it has
  // the right shape but a component is out of range, dest is reset to 00:00:00.
  static bool parse(Time &dest, std::string_view src) {
    const auto is_digit = [](char c) { return c >= '0' && c <= '9'; };

    if (src.size() != 8) return false;
    if (src[2] != ':' || src[5] != ':') return false;
    if (!is_digit(src[0]) || !is_digit(src[1])) return false;
    if (!is_digit(src[3]) || !is_digit(src[4])) return false;
    if (!is_digit(src[6]) || !is_digit(src[7])) return false;

    // Reads the two-digit number that starts at the given position.
    const auto two_digits = [&src](size_t at) {
      return static_cast<uint8_t>((src[at] - '0') * 10 + src[at + 1] - '0');
    };
    dest.hour = two_digits(0);
    dest.minute = two_digits(3);
    dest.second = two_digits(6);

    const bool in_range = dest.hour <= 23 && dest.minute <= 59 && dest.second <= 59;
    if (!in_range) {
      dest.hour = 0;
      dest.minute = 0;
      dest.second = 0;
    }
    return in_range;
  }

  // Formats the time as hh:mm:ss.
  // Throws std::invalid_argument if a component is out of range.
  std::string toString() const {
    const bool in_range = hour <= 23 && minute <= 59 && second <= 59;
    if (!in_range)
      throw std::invalid_argument("one or more time components (hour, minute, second) out of range");
    char text[9] = "XX:XX:XX";
    sprintf(text, "%02u:%02u:%02u", static_cast<unsigned int>(hour),
            static_cast<unsigned int>(minute), static_cast<unsigned int>(second));
    return std::string(text);
  }
};
244 | |||
// Time zone designator: the offset from UTC in minutes.
struct Tzd {
  int16_t minutes = 0;

  // Parses a time zone designator, which is either "Z" (no offset) or of the
  // form +hh:mm / -hh:mm with hh in 00..23 and mm in 00..59.
  // Returns whether parsing succeeded; on failure dest.minutes is zero.
  static bool parse(Tzd &dest, std::string_view src) {
    dest.minutes = 0;

    if (src.empty()) return false;
    if (src == "Z") return true;

    const char sign = src[0];
    if (sign != '+' && sign != '-') return false;
    const std::string_view offset = src.substr(1);

    const auto is_digit = [](char c) { return c >= '0' && c <= '9'; };
    if (offset.size() != 5 || offset[2] != ':') return false;
    if (!is_digit(offset[0]) || !is_digit(offset[1])) return false;
    if (!is_digit(offset[3]) || !is_digit(offset[4])) return false;

    const int hh = (offset[0] - '0') * 10 + offset[1] - '0';
    const int mm = (offset[3] - '0') * 10 + offset[4] - '0';
    if (hh > 23 || mm > 59) return false;

    const int total = 60 * hh + mm;
    dest.minutes = static_cast<int16_t>(sign == '-' ? -total : total);
    return true;
  }

  // Formats the offset as "Z" when it is zero and as +hh:mm or -hh:mm
  // otherwise. Throws std::invalid_argument if the offset is 24 hours or more.
  std::string toString() const {
    if (minutes == 0)
      return "Z";

    const int magnitude = minutes < 0 ? -minutes : minutes;
    const int hours_off = magnitude / 60;
    const int mins_off = magnitude % 60;
    if (hours_off > 23 || mins_off > 59)
      throw std::invalid_argument("offset out of range");

    char text[7] = "+XX:XX";
    sprintf(text, "%c%02u:%02u", minutes < 0 ? '-' : '+',
            static_cast<unsigned int>(hours_off), static_cast<unsigned int>(mins_off));
    return std::string(text);
  }
};
288 | |||
289 | struct Timestamp { | ||
290 | Date date; | ||
291 | Tzd off; | ||
292 | Time time; | ||
293 | |||
294 | static bool parse(Timestamp &dest, std::string_view src) { | ||
295 | size_t t = src.find('T'); | ||
296 | if (t == std::string_view::npos || t + 1 >= src.size()) return false; | ||
297 | |||
298 | std::string_view date = src.substr(0, t); | ||
299 | std::string_view time_and_tzd = src.substr(t + 1); | ||
300 | if (time_and_tzd.size() < 9) return false; | ||
301 | if (!Date::parse(dest.date, date)) return false; | ||
302 | |||
303 | std::string_view time = time_and_tzd.substr(0, 8); | ||
304 | std::string_view tzd = time_and_tzd.substr(8); | ||
305 | if (!Time::parse(dest.time, time)) return false; | ||
306 | return Tzd::parse(dest.off, tzd); | ||
307 | } | ||
308 | |||
309 | std::string toString() const { | ||
310 | return date.toString() + "T" + time.toString() + off.toString(); | ||
311 | } | ||
312 | |||
313 | std::chrono::seconds toUnixSeconds() const { | ||
314 | std::chrono::year_month_day ymd(std::chrono::year(date.year), | ||
315 | std::chrono::month(date.month), | ||
316 | std::chrono::day(date.day)); | ||
317 | std::chrono::sys_days sys_days(ymd); | ||
318 | std::chrono::time_point<std::chrono::utc_clock, std::chrono::days> utc_days(sys_days.time_since_epoch()); | ||
319 | std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast<std::chrono::seconds>(utc_days); | ||
320 | utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) + | ||
321 | std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes); | ||
322 | std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds); | ||
323 | std::chrono::seconds unix = sys_seconds.time_since_epoch(); | ||
324 | return unix; | ||
325 | } | ||
326 | }; | ||
327 | |||
328 | static const std::string_view TMI8_XML_NS = "http://bison.connekt.nl/tmi8/kv6/msg"; | ||
329 | |||
// The kinds of KV6 position information records. The numeric values double
// as indices into the per-type lookup tables, so they must stay contiguous.
enum Kv6RecordType {
  KV6T_UNKNOWN   = 0,  // no (known) record type
  KV6T_DELAY     = 1,  // DELAY
  KV6T_INIT      = 2,  // INIT
  KV6T_ARRIVAL   = 3,  // ARRIVAL
  KV6T_ON_STOP   = 4,  // ONSTOP
  KV6T_DEPARTURE = 5,  // DEPARTURE
  KV6T_ON_ROUTE  = 6,  // ONROUTE
  KV6T_ON_PATH   = 7,  // ONPATH
  KV6T_OFF_ROUTE = 8,  // OFFROUTE
  KV6T_END       = 9,  // END

  // Aliases for the first and last enumerator. These must be updated whenever
  // an enumerator is added before KV6T_UNKNOWN or after KV6T_END!
  _KV6T_FIRST_TYPE = KV6T_UNKNOWN,
  _KV6T_LAST_TYPE  = KV6T_END,
};
346 | |||
// The fields of a KV6 record. Every field occupies its own bit, so that a set
// of fields can be represented by OR-ing enumerators together.
enum Kv6Field {
  KV6F_NONE                          = 0,
  KV6F_DATA_OWNER_CODE               = 1 << 0,
  KV6F_LINE_PLANNING_NUMBER          = 1 << 1,
  KV6F_OPERATING_DAY                 = 1 << 2,
  KV6F_JOURNEY_NUMBER                = 1 << 3,
  KV6F_REINFORCEMENT_NUMBER          = 1 << 4,
  KV6F_TIMESTAMP                     = 1 << 5,
  KV6F_SOURCE                        = 1 << 6,
  KV6F_PUNCTUALITY                   = 1 << 7,
  KV6F_USER_STOP_CODE                = 1 << 8,
  KV6F_PASSAGE_SEQUENCE_NUMBER       = 1 << 9,
  KV6F_VEHICLE_NUMBER                = 1 << 10,
  KV6F_BLOCK_CODE                    = 1 << 11,
  KV6F_WHEELCHAIR_ACCESSIBLE         = 1 << 12,
  KV6F_NUMBER_OF_COACHES             = 1 << 13,
  KV6F_RD_Y                          = 1 << 14,
  KV6F_RD_X                          = 1 << 15,
  KV6F_DISTANCE_SINCE_LAST_USER_STOP = 1 << 16,
};
367 | |||
368 | static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = { | ||
369 | // KV6T_UNKNOWN | ||
370 | KV6F_NONE, | ||
371 | // KV6T_DELAY | ||
372 | static_cast<Kv6Field>( | ||
373 | KV6F_DATA_OWNER_CODE | ||
374 | | KV6F_LINE_PLANNING_NUMBER | ||
375 | | KV6F_OPERATING_DAY | ||
376 | | KV6F_JOURNEY_NUMBER | ||
377 | | KV6F_REINFORCEMENT_NUMBER | ||
378 | | KV6F_TIMESTAMP | ||
379 | | KV6F_SOURCE | ||
380 | | KV6F_PUNCTUALITY), | ||
381 | // KV6T_INIT | ||
382 | static_cast<Kv6Field>( | ||
383 | KV6F_DATA_OWNER_CODE | ||
384 | | KV6F_LINE_PLANNING_NUMBER | ||
385 | | KV6F_OPERATING_DAY | ||
386 | | KV6F_JOURNEY_NUMBER | ||
387 | | KV6F_REINFORCEMENT_NUMBER | ||
388 | | KV6F_TIMESTAMP | ||
389 | | KV6F_SOURCE | ||
390 | | KV6F_USER_STOP_CODE | ||
391 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
392 | | KV6F_VEHICLE_NUMBER | ||
393 | | KV6F_BLOCK_CODE | ||
394 | | KV6F_WHEELCHAIR_ACCESSIBLE | ||
395 | | KV6F_NUMBER_OF_COACHES), | ||
396 | // KV6T_ARRIVAL | ||
397 | static_cast<Kv6Field>( | ||
398 | KV6F_DATA_OWNER_CODE | ||
399 | | KV6F_LINE_PLANNING_NUMBER | ||
400 | | KV6F_OPERATING_DAY | ||
401 | | KV6F_JOURNEY_NUMBER | ||
402 | | KV6F_REINFORCEMENT_NUMBER | ||
403 | | KV6F_USER_STOP_CODE | ||
404 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
405 | | KV6F_TIMESTAMP | ||
406 | | KV6F_SOURCE | ||
407 | | KV6F_VEHICLE_NUMBER | ||
408 | | KV6F_PUNCTUALITY), | ||
409 | // KV6T_ON_STOP | ||
410 | static_cast<Kv6Field>( | ||
411 | KV6F_DATA_OWNER_CODE | ||
412 | | KV6F_LINE_PLANNING_NUMBER | ||
413 | | KV6F_OPERATING_DAY | ||
414 | | KV6F_JOURNEY_NUMBER | ||
415 | | KV6F_REINFORCEMENT_NUMBER | ||
416 | | KV6F_USER_STOP_CODE | ||
417 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
418 | | KV6F_TIMESTAMP | ||
419 | | KV6F_SOURCE | ||
420 | | KV6F_VEHICLE_NUMBER | ||
421 | | KV6F_PUNCTUALITY), | ||
422 | // KV6T_DEPARTURE | ||
423 | static_cast<Kv6Field>( | ||
424 | KV6F_DATA_OWNER_CODE | ||
425 | | KV6F_LINE_PLANNING_NUMBER | ||
426 | | KV6F_OPERATING_DAY | ||
427 | | KV6F_JOURNEY_NUMBER | ||
428 | | KV6F_REINFORCEMENT_NUMBER | ||
429 | | KV6F_USER_STOP_CODE | ||
430 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
431 | | KV6F_TIMESTAMP | ||
432 | | KV6F_SOURCE | ||
433 | | KV6F_VEHICLE_NUMBER | ||
434 | | KV6F_PUNCTUALITY), | ||
435 | // KV6T_ON_ROUTE | ||
436 | static_cast<Kv6Field>( | ||
437 | KV6F_DATA_OWNER_CODE | ||
438 | | KV6F_LINE_PLANNING_NUMBER | ||
439 | | KV6F_OPERATING_DAY | ||
440 | | KV6F_JOURNEY_NUMBER | ||
441 | | KV6F_REINFORCEMENT_NUMBER | ||
442 | | KV6F_USER_STOP_CODE | ||
443 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
444 | | KV6F_TIMESTAMP | ||
445 | | KV6F_SOURCE | ||
446 | | KV6F_VEHICLE_NUMBER | ||
447 | | KV6F_PUNCTUALITY | ||
448 | | KV6F_RD_X | ||
449 | | KV6F_RD_Y), | ||
450 | // KV6T_ON_PATH | ||
451 | KV6F_NONE, | ||
452 | // KV6T_OFF_ROUTE | ||
453 | static_cast<Kv6Field>( | ||
454 | KV6F_DATA_OWNER_CODE | ||
455 | | KV6F_LINE_PLANNING_NUMBER | ||
456 | | KV6F_OPERATING_DAY | ||
457 | | KV6F_JOURNEY_NUMBER | ||
458 | | KV6F_REINFORCEMENT_NUMBER | ||
459 | | KV6F_TIMESTAMP | ||
460 | | KV6F_SOURCE | ||
461 | | KV6F_USER_STOP_CODE | ||
462 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
463 | | KV6F_VEHICLE_NUMBER | ||
464 | | KV6F_RD_X | ||
465 | | KV6F_RD_Y), | ||
466 | // KV6T_END | ||
467 | static_cast<Kv6Field>( | ||
468 | KV6F_DATA_OWNER_CODE | ||
469 | | KV6F_LINE_PLANNING_NUMBER | ||
470 | | KV6F_OPERATING_DAY | ||
471 | | KV6F_JOURNEY_NUMBER | ||
472 | | KV6F_REINFORCEMENT_NUMBER | ||
473 | | KV6F_TIMESTAMP | ||
474 | | KV6F_SOURCE | ||
475 | | KV6F_USER_STOP_CODE | ||
476 | | KV6F_PASSAGE_SEQUENCE_NUMBER | ||
477 | | KV6F_VEHICLE_NUMBER), | ||
478 | }; | ||
479 | |||
480 | static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = { | ||
481 | // KV6T_UNKNOWN | ||
482 | KV6F_NONE, | ||
483 | // KV6T_DELAY | ||
484 | KV6F_NONE, | ||
485 | // KV6T_INIT | ||
486 | KV6F_NONE, | ||
487 | // KV6T_ARRIVAL | ||
488 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
489 | // KV6T_ON_STOP | ||
490 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
491 | // KV6T_DEPARTURE | ||
492 | static_cast<Kv6Field>(KV6F_RD_X | KV6F_RD_Y), | ||
493 | // KV6T_ON_ROUTE | ||
494 | KV6F_DISTANCE_SINCE_LAST_USER_STOP, | ||
495 | // KV6T_ON_PATH | ||
496 | KV6F_NONE, | ||
497 | // KV6T_OFF_ROUTE | ||
498 | KV6F_NONE, | ||
499 | // KV6T_END | ||
500 | KV6F_NONE, | ||
501 | }; | ||
502 | |||
503 | struct Kv6Record { | ||
504 | Kv6RecordType type = KV6T_UNKNOWN; | ||
505 | Kv6Field presence = KV6F_NONE; | ||
506 | Kv6Field next = KV6F_NONE; | ||
507 | std::string data_owner_code; | ||
508 | std::string line_planning_number; | ||
509 | std::string source; | ||
510 | std::string user_stop_code; | ||
511 | std::string wheelchair_accessible; | ||
512 | Date operating_day; | ||
513 | Timestamp timestamp; | ||
514 | uint32_t block_code = 0; | ||
515 | uint32_t journey_number = 0; | ||
516 | uint32_t vehicle_number = 0; | ||
517 | int32_t rd_x = 0; | ||
518 | int32_t rd_y = 0; | ||
519 | // The TMI8 specification is unclear: this field | ||
520 | // might actually be called distancesincelaststop | ||
521 | uint32_t distance_since_last_user_stop = 0; | ||
522 | uint16_t passage_sequence_number = 0; | ||
523 | int16_t punctuality = 0; | ||
524 | uint8_t number_of_coaches = 0; | ||
525 | uint8_t reinforcement_number = 0; | ||
526 | |||
527 | void markPresent(Kv6Field field) { | ||
528 | presence = static_cast<Kv6Field>(presence | field); | ||
529 | } | ||
530 | |||
531 | void removeUnsupportedFields() { | ||
532 | Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; | ||
533 | Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; | ||
534 | Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields); | ||
535 | presence = static_cast<Kv6Field>(presence & supported_fields); | ||
536 | } | ||
537 | |||
538 | bool valid() { | ||
539 | Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; | ||
540 | Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; | ||
541 | Kv6Field supported_fields = static_cast<Kv6Field>(required_fields | optional_fields); | ||
542 | |||
543 | Kv6Field required_field_presence = static_cast<Kv6Field>(presence & required_fields); | ||
544 | Kv6Field unsupported_field_presence = static_cast<Kv6Field>(presence & ~supported_fields); | ||
545 | |||
546 | return required_field_presence == required_fields && !unsupported_field_presence; | ||
547 | } | ||
548 | }; | ||
549 | |||
// The fields of a Tmi8VvTmPushInfo. Every field occupies its own bit, so that
// a set of fields can be represented by OR-ing enumerators together.
enum Tmi8VvTmPushInfoField {
  TMI8F_NONE          = 0,
  TMI8F_SUBSCRIBER_ID = 1 << 0,
  TMI8F_VERSION       = 1 << 1,
  TMI8F_DOSSIER_NAME  = 1 << 2,
  TMI8F_TIMESTAMP     = 1 << 3,
};
557 | |||
558 | struct Tmi8VvTmPushInfo { | ||
559 | Tmi8VvTmPushInfoField next = TMI8F_NONE; | ||
560 | Tmi8VvTmPushInfoField presence = TMI8F_NONE; | ||
561 | std::string subscriber_id; | ||
562 | std::string version; | ||
563 | std::string dossier_name; | ||
564 | Timestamp timestamp; | ||
565 | std::vector<Kv6Record> messages; | ||
566 | |||
567 | void markPresent(Tmi8VvTmPushInfoField field) { | ||
568 | presence = static_cast<Tmi8VvTmPushInfoField>(presence | field); | ||
569 | } | ||
570 | |||
571 | bool valid() { | ||
572 | const Tmi8VvTmPushInfoField REQUIRED_FIELDS = | ||
573 | static_cast<Tmi8VvTmPushInfoField>( | ||
574 | TMI8F_SUBSCRIBER_ID | ||
575 | | TMI8F_VERSION | ||
576 | | TMI8F_DOSSIER_NAME | ||
577 | | TMI8F_TIMESTAMP); | ||
578 | return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS; | ||
579 | } | ||
580 | }; | ||
581 | |||
582 | static const std::array<std::string_view, _KV6T_LAST_TYPE + 1> KV6_POS_INFO_RECORD_TYPES = { | ||
583 | "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END", | ||
584 | }; | ||
585 | |||
586 | std::optional<std::string_view> findKv6PosInfoRecordTypeName(Kv6RecordType type) { | ||
587 | if (type > _KV6T_LAST_TYPE) | ||
588 | return std::nullopt; | ||
589 | return KV6_POS_INFO_RECORD_TYPES[type]; | ||
590 | } | ||
591 | |||
592 | const std::array<std::tuple<std::string_view, Kv6Field>, 17> KV6_POS_INFO_RECORD_FIELDS = {{ | ||
593 | { "dataownercode", KV6F_DATA_OWNER_CODE }, | ||
594 | { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER }, | ||
595 | { "operatingday", KV6F_OPERATING_DAY }, | ||
596 | { "journeynumber", KV6F_JOURNEY_NUMBER }, | ||
597 | { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER }, | ||
598 | { "timestamp", KV6F_TIMESTAMP }, | ||
599 | { "source", KV6F_SOURCE }, | ||
600 | { "punctuality", KV6F_PUNCTUALITY }, | ||
601 | { "userstopcode", KV6F_USER_STOP_CODE }, | ||
602 | { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER }, | ||
603 | { "vehiclenumber", KV6F_VEHICLE_NUMBER }, | ||
604 | { "blockcode", KV6F_BLOCK_CODE }, | ||
605 | { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE }, | ||
606 | { "numberofcoaches", KV6F_NUMBER_OF_COACHES }, | ||
607 | { "rd-y", KV6F_RD_Y }, | ||
608 | { "rd-x", KV6F_RD_X }, | ||
609 | { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP }, | ||
610 | }}; | ||
611 | |||
612 | // Returns the maximum amount of digits such that it is guaranteed that | ||
613 | // a corresponding amount of repeated 9's can be represented by the type. | ||
614 | template<std::integral T> | ||
615 | constexpr size_t maxDigits() { | ||
616 | size_t digits = 0; | ||
617 | for (T x = std::numeric_limits<T>::max(); x != 0; x /= 10) digits++; | ||
618 | return digits - 1; | ||
619 | } | ||
620 | |||
621 | template<size_t MaxDigits, std::unsigned_integral T> | ||
622 | constexpr bool parseUnsigned(T &out, std::string_view src) { | ||
623 | static_assert(MaxDigits <= maxDigits<T>()); | ||
624 | if (src.size() > MaxDigits) return false; | ||
625 | T res = 0; | ||
626 | while (src.size() > 0) { | ||
627 | if (src[0] < '0' || src[0] > '9') return false; | ||
628 | res = static_cast<T>(res * 10 + src[0] - '0'); | ||
629 | src = src.substr(1); | ||
630 | } | ||
631 | out = res; | ||
632 | return true; | ||
633 | } | ||
634 | |||
635 | template<size_t MaxDigits, std::signed_integral T> | ||
636 | constexpr bool parseSigned(T &out, std::string_view src) { | ||
637 | static_assert(MaxDigits <= maxDigits<T>()); | ||
638 | if (src.size() == 0) return false; | ||
639 | bool negative = src[0] == '-'; | ||
640 | if (negative) src = src.substr(1); | ||
641 | if (src.size() > MaxDigits) return false; | ||
642 | T res = 0; | ||
643 | while (src.size() > 0) { | ||
644 | if (src[0] < '0' || src[0] > '9') return false; | ||
645 | res = static_cast<T>(res * 10 + src[0] - '0'); | ||
646 | src = src.substr(1); | ||
647 | } | ||
648 | out = negative ? -res : res; | ||
649 | return true; | ||
650 | } | ||
651 | |||
// A single XML namespace declaration: one node of a singly linked list of the
// declarations that are in scope, with the innermost declaration first.
struct Xmlns {
  // The enclosing declaration, or nullptr for the outermost one. Initialised
  // so that a default-constructed Xmlns is a valid end of the list.
  const Xmlns *next = nullptr;
  // The declared prefix; empty for the default namespace (plain xmlns)
  std::string_view prefix = {};
  // The namespace URL that the prefix stands for
  std::string_view url = {};
};
657 | |||
658 | std::optional<std::string_view> resolve(std::string_view prefix, const Xmlns *nss) { | ||
659 | while (nss) | ||
660 | if (nss->prefix == prefix) | ||
661 | return nss->url; | ||
662 | else | ||
663 | nss = nss->next; | ||
664 | return std::nullopt; | ||
665 | } | ||
666 | |||
667 | template<typename T> | ||
668 | void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) { | ||
669 | while (attr) { | ||
670 | std::string_view name(attr->name(), attr->name_size()); | ||
671 | if (name.starts_with("xmlns")) { | ||
672 | if (name.size() == 5) { // just xmlns | ||
673 | Xmlns ns0 = { | ||
674 | .next = nss, | ||
675 | .url = std::string_view(attr->value(), attr->value_size()), | ||
676 | }; | ||
677 | withXmlnss(attr->next_attribute(), &ns0, fn); | ||
678 | return; | ||
679 | } else if (name.size() > 6 && name[5] == ':') { // xmlns:<something> | ||
680 | Xmlns ns0 = { | ||
681 | .next = nss, | ||
682 | .prefix = name.substr(6), | ||
683 | .url = std::string_view(attr->value(), attr->value_size()), | ||
684 | }; | ||
685 | withXmlnss(attr->next_attribute(), &ns0, fn); | ||
686 | return; | ||
687 | } | ||
688 | } | ||
689 | attr = attr->next_attribute(); | ||
690 | } | ||
691 | fn(nss); | ||
692 | } | ||
693 | |||
694 | template<typename T> | ||
695 | void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { | ||
696 | std::string_view name(node.name(), node.name_size()); | ||
697 | std::string_view ns; | ||
698 | size_t colon = name.find(':'); | ||
699 | |||
700 | if (colon != std::string_view::npos) { | ||
701 | if (colon >= name.size() - 1) // last character | ||
702 | return; | ||
703 | ns = name.substr(0, colon); | ||
704 | name = name.substr(colon + 1); | ||
705 | } | ||
706 | |||
707 | withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) { | ||
708 | std::optional<std::string_view> ns_url = resolve(ns, nss); | ||
709 | if (!ns_url && !ns.empty()) return; | ||
710 | if (!ns_url) fn(std::string_view(), name, nss); | ||
711 | else fn(*ns_url, name, nss); | ||
712 | }); | ||
713 | } | ||
714 | |||
715 | template<typename T> | ||
716 | void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { | ||
717 | ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) { | ||
718 | if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss); | ||
719 | }); | ||
720 | } | ||
721 | |||
722 | bool onlyTextElement(const rapidxml::xml_node<> &node) { | ||
723 | return node.type() == rapidxml::node_element | ||
724 | && node.first_node() | ||
725 | && node.first_node() == node.last_node() | ||
726 | && node.first_node()->type() == rapidxml::node_data; | ||
727 | } | ||
728 | |||
729 | std::string_view getValue(const rapidxml::xml_node<> &node) { | ||
730 | return std::string_view(node.value(), node.value_size()); | ||
731 | } | ||
732 | |||
// Copies `val` into `into` when it is at most `max_len` bytes long.
// Returns false and leaves `into` untouched otherwise.
bool parseStringValue(std::string &into, size_t max_len, std::string_view val) {
  const bool fits = val.size() <= max_len;
  if (fits)
    into.assign(val);
  return fits;
}
739 | |||
740 | struct Kv6Parser { | ||
741 | std::stringstream &errs; | ||
742 | std::stringstream &warns; | ||
743 | |||
744 | void error(std::string_view msg) { | ||
745 | errs << msg << '\n'; | ||
746 | } | ||
747 | |||
748 | void warn(std::string_view msg) { | ||
749 | warns << msg << '\n'; | ||
750 | } | ||
751 | |||
752 | #define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false) | ||
753 | #define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false) | ||
754 | |||
755 | std::optional<Kv6Record> parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
756 | Kv6Record fields = { .type = type }; | ||
757 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
758 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
759 | for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) { | ||
760 | if (field == KV6F_NONE) | ||
761 | continue; | ||
762 | if (fname == name) { | ||
763 | PWARNASSERT("Expected KV6 record field element to only contain data", | ||
764 | onlyTextElement(*child)); | ||
765 | std::string_view childval = getValue(*child); | ||
766 | switch (field) { | ||
767 | case KV6F_DATA_OWNER_CODE: | ||
768 | PWARNASSERT("Invalid value for dataownercode", | ||
769 | parseStringValue(fields.data_owner_code, 10, childval)); | ||
770 | break; | ||
771 | case KV6F_LINE_PLANNING_NUMBER: | ||
772 | PWARNASSERT("Invalid value for lineplanningnumber", | ||
773 | parseStringValue(fields.line_planning_number, 10, childval)); | ||
774 | break; | ||
775 | case KV6F_OPERATING_DAY: | ||
776 | PWARNASSERT("Invalid value for operatatingday: not a valid date", | ||
777 | Date::parse(fields.operating_day, childval)); | ||
778 | break; | ||
779 | case KV6F_JOURNEY_NUMBER: | ||
780 | PWARNASSERT("Invalid value for journeynumber:" | ||
781 | " not a valid unsigned number with at most six digits", | ||
782 | parseUnsigned<6>(fields.journey_number, childval)); | ||
783 | break; | ||
784 | case KV6F_REINFORCEMENT_NUMBER: | ||
785 | PWARNASSERT("Invalid value for reinforcementnumber:" | ||
786 | " not a valid unsigned number with at most two digits", | ||
787 | parseUnsigned<2>(fields.reinforcement_number, childval)); | ||
788 | break; | ||
789 | case KV6F_TIMESTAMP: | ||
790 | PWARNASSERT("Invalid value for timestamp: not a valid timestamp", | ||
791 | Timestamp::parse(fields.timestamp, childval)); | ||
792 | break; | ||
793 | case KV6F_SOURCE: | ||
794 | PWARNASSERT("Invalid value for source:" | ||
795 | " not a valid string of at most 10 bytes", | ||
796 | parseStringValue(fields.source, 10, childval)); | ||
797 | break; | ||
798 | case KV6F_PUNCTUALITY: | ||
799 | PWARNASSERT("Invalid value for punctuality:" | ||
800 | " not a valid signed number with at most four digits", | ||
801 | parseSigned<4>(fields.punctuality, childval)); | ||
802 | break; | ||
803 | case KV6F_USER_STOP_CODE: | ||
804 | PWARNASSERT("Invalid value for userstopcode:" | ||
805 | " not a valid string of at most 10 bytes", | ||
806 | parseStringValue(fields.user_stop_code, 10, childval)); | ||
807 | break; | ||
808 | case KV6F_PASSAGE_SEQUENCE_NUMBER: | ||
809 | PWARNASSERT("Invalid value for passagesequencenumber:" | ||
810 | " not a valid unsigned number with at most four digits", | ||
811 | parseUnsigned<4>(fields.passage_sequence_number, childval)); | ||
812 | break; | ||
813 | case KV6F_VEHICLE_NUMBER: | ||
814 | PWARNASSERT("Invalid value for vehiclenumber:" | ||
815 | " not a valid unsigned number with at most six digits", | ||
816 | parseUnsigned<6>(fields.vehicle_number, childval)); | ||
817 | break; | ||
818 | case KV6F_BLOCK_CODE: | ||
819 | PWARNASSERT("Invalid value for blockcode:" | ||
820 | " not a valid unsigned number with at most eight digits", | ||
821 | parseUnsigned<8>(fields.block_code, childval)); | ||
822 | break; | ||
823 | case KV6F_WHEELCHAIR_ACCESSIBLE: | ||
824 | PWARNASSERT("Invalid value for wheelchairaccessible:" | ||
825 | " not a valid value for wheelchair accessibility", | ||
826 | childval == "ACCESSIBLE" | ||
827 | || childval == "NOTACCESSIBLE" | ||
828 | || childval == "UNKNOWN"); | ||
829 | fields.wheelchair_accessible = childval; | ||
830 | break; | ||
831 | case KV6F_NUMBER_OF_COACHES: | ||
832 | PWARNASSERT("Invalid for numberofcoaches:" | ||
833 | " not a valid unsigned number with at most two digits", | ||
834 | parseUnsigned<2>(fields.number_of_coaches, childval)); | ||
835 | break; | ||
836 | case KV6F_RD_X: | ||
837 | PWARNASSERT("Invalid value for rd-x:" | ||
838 | " not a valid signed number with at most six digits", | ||
839 | parseSigned<6>(fields.rd_x, childval)); | ||
840 | break; | ||
841 | case KV6F_RD_Y: | ||
842 | PWARNASSERT("Invalid value for rd-y:" | ||
843 | " not a valid signed number with at most six digits", | ||
844 | parseSigned<6>(fields.rd_y, childval)); | ||
845 | break; | ||
846 | case KV6F_DISTANCE_SINCE_LAST_USER_STOP: | ||
847 | PWARNASSERT("Invalid value for distancesincelastuserstop:" | ||
848 | " not a valid unsigned number with at most five digits", | ||
849 | parseUnsigned<5>(fields.distance_since_last_user_stop, childval)); | ||
850 | break; | ||
851 | case KV6F_NONE: | ||
852 | error("NONE field type case should be unreachable in parseKv6PosInfoRecord"); | ||
853 | return; | ||
854 | } | ||
855 | fields.markPresent(field); | ||
856 | break; | ||
857 | } | ||
858 | } | ||
859 | }); | ||
860 | } | ||
861 | |||
862 | fields.removeUnsupportedFields(); | ||
863 | |||
864 | if (!fields.valid()) | ||
865 | return std::nullopt; | ||
866 | return fields; | ||
867 | } | ||
868 | |||
869 | std::vector<Kv6Record> parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
870 | std::vector<Kv6Record> records; | ||
871 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
872 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
873 | for (auto type = _KV6T_FIRST_TYPE; | ||
874 | type != _KV6T_LAST_TYPE; | ||
875 | type = static_cast<Kv6RecordType>(type + 1)) { | ||
876 | if (type == KV6T_UNKNOWN) | ||
877 | continue; | ||
878 | if (KV6_POS_INFO_RECORD_TYPES[type] == name) { | ||
879 | auto record = parseKv6PosInfoRecord(type, *child, nss); | ||
880 | if (record) { | ||
881 | records.push_back(*record); | ||
882 | } | ||
883 | } | ||
884 | } | ||
885 | }); | ||
886 | } | ||
887 | return records; | ||
888 | } | ||
889 | |||
890 | std::optional<Tmi8VvTmPushInfo> parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) { | ||
891 | Tmi8VvTmPushInfo info; | ||
892 | for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { | ||
893 | ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { | ||
894 | if (name == "Timestamp") { | ||
895 | PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child)); | ||
896 | PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child))); | ||
897 | info.markPresent(TMI8F_TIMESTAMP); | ||
898 | } else if (name == "SubscriberID") { | ||
899 | PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child)); | ||
900 | info.subscriber_id = getValue(*child); | ||
901 | info.markPresent(TMI8F_SUBSCRIBER_ID); | ||
902 | } else if (name == "Version") { | ||
903 | PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child)); | ||
904 | info.version = getValue(*child); | ||
905 | info.markPresent(TMI8F_VERSION); | ||
906 | } else if (name == "DossierName") { | ||
907 | PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child)); | ||
908 | info.dossier_name = getValue(*child); | ||
909 | info.markPresent(TMI8F_DOSSIER_NAME); | ||
910 | } else if (name == "KV6posinfo") { | ||
911 | info.messages = parseKv6PosInfo(*child, nss); | ||
912 | } | ||
913 | }); | ||
914 | } | ||
915 | |||
916 | if (!info.valid()) | ||
917 | return std::nullopt; | ||
918 | return info; | ||
919 | } | ||
920 | |||
921 | std::optional<Tmi8VvTmPushInfo> parse(const rapidxml::xml_document<> &doc) { | ||
922 | std::optional<Tmi8VvTmPushInfo> msg; | ||
923 | for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) { | ||
924 | ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) { | ||
925 | if (name == "VV_TM_PUSH") { | ||
926 | if (msg) { | ||
927 | error("Duplicated VV_TM_PUSH"); | ||
928 | return; | ||
929 | } | ||
930 | msg = parseVvTmPush(*node, nss); | ||
931 | if (!msg) { | ||
932 | error("Invalid VV_TM_PUSH"); | ||
933 | } | ||
934 | } | ||
935 | }); | ||
936 | } | ||
937 | if (!msg) | ||
938 | error("Expected to find VV_TM_PUSH"); | ||
939 | return msg; | ||
940 | } | ||
941 | }; | ||
942 | |||
943 | std::optional<Tmi8VvTmPushInfo> parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, std::stringstream &warns) { | ||
944 | Kv6Parser parser = { errs, warns }; | ||
945 | return parser.parse(doc); | ||
946 | } | ||
947 | |||
948 | struct Metrics { | ||
949 | prometheus::Counter &messages_counter_ok; | ||
950 | prometheus::Counter &messages_counter_error; | ||
951 | prometheus::Counter &messages_counter_warning; | ||
952 | prometheus::Counter &rows_written_counter; | ||
953 | prometheus::Histogram &records_hist; | ||
954 | prometheus::Histogram &message_parse_hist; | ||
955 | prometheus::Histogram &payload_size_hist; | ||
956 | |||
957 | using BucketBoundaries = prometheus::Histogram::BucketBoundaries; | ||
958 | |||
959 | enum class ParseStatus { | ||
960 | OK, | ||
961 | WARNING, | ||
962 | ERROR, | ||
963 | }; | ||
964 | |||
965 | Metrics(std::shared_ptr<prometheus::Registry> registry) : | ||
966 | Metrics(registry, prometheus::BuildCounter() | ||
967 | .Name("kv6_vv_tm_push_messages_total") | ||
968 | .Help("Number of KV6 VV_TM_PUSH messages received") | ||
969 | .Register(*registry)) | ||
970 | {} | ||
971 | |||
972 | void addMeasurement(std::chrono::duration<double> took_secs, size_t payload_size, size_t records, ParseStatus parsed) { | ||
973 | double millis = took_secs.count() * 1000.0; | ||
974 | |||
975 | if (parsed == ParseStatus::OK) messages_counter_ok.Increment(); | ||
976 | else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment(); | ||
977 | else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment(); | ||
978 | records_hist.Observe(static_cast<double>(records)); | ||
979 | message_parse_hist.Observe(millis); | ||
980 | payload_size_hist.Observe(static_cast<double>(payload_size)); | ||
981 | } | ||
982 | |||
983 | void rowsWritten(int64_t rows) { | ||
984 | rows_written_counter.Increment(static_cast<double>(rows)); | ||
985 | } | ||
986 | |||
987 | private: | ||
988 | Metrics(std::shared_ptr<prometheus::Registry> registry, | ||
989 | prometheus::Family<prometheus::Counter> &messages_counter) : | ||
990 | messages_counter_ok(messages_counter | ||
991 | .Add({{ "status", "ok" }})), | ||
992 | messages_counter_error(messages_counter | ||
993 | .Add({{ "status", "error" }})), | ||
994 | messages_counter_warning(messages_counter | ||
995 | .Add({{ "status", "warning" }})), | ||
996 | rows_written_counter(prometheus::BuildCounter() | ||
997 | .Name("kv6_vv_tm_push_records_written") | ||
998 | .Help("Numer of VV_TM_PUSH records written to disk") | ||
999 | .Register(*registry) | ||
1000 | .Add({})), | ||
1001 | records_hist(prometheus::BuildHistogram() | ||
1002 | .Name("kv6_vv_tm_push_records_amount") | ||
1003 | .Help("Number of KV6 VV_TM_PUSH records") | ||
1004 | .Register(*registry) | ||
1005 | .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })), | ||
1006 | message_parse_hist(prometheus::BuildHistogram() | ||
1007 | .Name("kv6_vv_tm_push_message_parse_millis") | ||
1008 | .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages") | ||
1009 | .Register(*registry) | ||
1010 | .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })), | ||
1011 | payload_size_hist(prometheus::BuildHistogram() | ||
1012 | .Name("kv6_payload_size") | ||
1013 | .Help("Sizes of KV6 ZeroMQ message payloads") | ||
1014 | .Register(*registry) | ||
1015 | .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 })) | ||
1016 | {} | ||
1017 | }; | ||
1018 | |||
1019 | // Note: it *must* hold that decompressed[size] == 0 | ||
1020 | std::optional<Tmi8VvTmPushInfo> parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) { | ||
1021 | auto start = std::chrono::steady_clock::now(); | ||
1022 | |||
1023 | std::optional<Tmi8VvTmPushInfo> info; | ||
1024 | |||
1025 | if (decompressed[size] != 0) { | ||
1026 | errs << "Not parsing: missing null terminator" << '\n'; | ||
1027 | } else { | ||
1028 | rapidxml::xml_document<> doc; | ||
1029 | constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace | ||
1030 | | rapidxml::parse_no_string_terminators | ||
1031 | | rapidxml::parse_validate_closing_tags; | ||
1032 | |||
1033 | try { | ||
1034 | doc.parse<PARSE_FLAGS>(decompressed); | ||
1035 | info = parseXml(doc, errs, warns); | ||
1036 | } catch (const rapidxml::parse_error &err) { | ||
1037 | errs << "XML parsing failed" << '\n'; | ||
1038 | } | ||
1039 | } | ||
1040 | |||
1041 | auto end = std::chrono::steady_clock::now(); | ||
1042 | std::chrono::duration<double> took = end - start; | ||
1043 | |||
1044 | if (info) | ||
1045 | if (warns.view().empty()) | ||
1046 | metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::OK); | ||
1047 | else | ||
1048 | metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING); | ||
1049 | else | ||
1050 | metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR); | ||
1051 | |||
1052 | return info; | ||
1053 | } | ||
1054 | |||
// Set to a non-zero value by the signal handler to request shutdown.
// volatile sig_atomic_t is the type a signal handler may portably write to;
// a plain bool gives no such guarantee.
volatile sig_atomic_t terminate = 0;

// Signal handler for SIGINT/SIGTERM: only raises the shutdown flag.
void onSigIntOrTerm(int /* signum */) {
  terminate = 1;
}
1060 | |||
1061 | arrow::Result<std::shared_ptr<arrow::Table>> getTable(const std::vector<Kv6Record> &messages, size_t &rows_written) { | ||
1062 | ParquetBuilder builder; | ||
1063 | |||
1064 | for (const auto &msg : messages) { | ||
1065 | Kv6Field present = msg.presence; | ||
1066 | Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type]; | ||
1067 | Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type]; | ||
1068 | if ((~msg.presence & required) != 0) { | ||
1069 | std::cout << "Invalid message: not all required fields present; skipping" << std::endl; | ||
1070 | continue; | ||
1071 | } | ||
1072 | Kv6Field used = static_cast<Kv6Field>(present & (required | optional)); | ||
1073 | rows_written++; | ||
1074 | |||
1075 | // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option | ||
1076 | // of not providing these fields by setting them to -1. We want this normalized, where these | ||
1077 | // fields are instead simply marked as not present. | ||
1078 | if ((used & KV6F_RD_X) && msg.rd_x == -1) | ||
1079 | used = static_cast<Kv6Field>(used & ~KV6F_RD_X); | ||
1080 | if ((used & KV6F_RD_Y) && msg.rd_y == -1) | ||
1081 | used = static_cast<Kv6Field>(used & ~KV6F_RD_Y); | ||
1082 | |||
1083 | ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type))); | ||
1084 | ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE | ||
1085 | ? builder.data_owner_codes.Append(msg.data_owner_code) | ||
1086 | : builder.data_owner_codes.AppendNull()); | ||
1087 | ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER | ||
1088 | ? builder.line_planning_numbers.Append(msg.line_planning_number) | ||
1089 | : builder.line_planning_numbers.AppendNull()); | ||
1090 | ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY | ||
1091 | ? builder.operating_days.Append(static_cast<int32_t>(msg.operating_day.toUnixDays().count())) | ||
1092 | : builder.operating_days.AppendNull()); | ||
1093 | ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER | ||
1094 | ? builder.journey_numbers.Append(msg.journey_number) | ||
1095 | : builder.journey_numbers.AppendNull()); | ||
1096 | ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER | ||
1097 | ? builder.reinforcement_numbers.Append(msg.reinforcement_number) | ||
1098 | : builder.reinforcement_numbers.AppendNull()); | ||
1099 | ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP | ||
1100 | ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count()) | ||
1101 | : builder.timestamps.AppendNull()); | ||
1102 | ARROW_RETURN_NOT_OK(used & KV6F_SOURCE | ||
1103 | ? builder.sources.Append(msg.source) | ||
1104 | : builder.sources.AppendNull()); | ||
1105 | ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY | ||
1106 | ? builder.punctualities.Append(msg.punctuality) | ||
1107 | : builder.punctualities.AppendNull()); | ||
1108 | ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE | ||
1109 | ? builder.user_stop_codes.Append(msg.user_stop_code) | ||
1110 | : builder.user_stop_codes.AppendNull()); | ||
1111 | ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER | ||
1112 | ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number) | ||
1113 | : builder.passage_sequence_numbers.AppendNull()); | ||
1114 | ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER | ||
1115 | ? builder.vehicle_numbers.Append(msg.vehicle_number) | ||
1116 | : builder.vehicle_numbers.AppendNull()); | ||
1117 | ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE | ||
1118 | ? builder.block_codes.Append(msg.block_code) | ||
1119 | : builder.block_codes.AppendNull()); | ||
1120 | ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE | ||
1121 | ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible) | ||
1122 | : builder.wheelchair_accessibles.AppendNull()); | ||
1123 | ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES | ||
1124 | ? builder.number_of_coaches.Append(msg.number_of_coaches) | ||
1125 | : builder.number_of_coaches.AppendNull()); | ||
1126 | ARROW_RETURN_NOT_OK(used & KV6F_RD_Y | ||
1127 | ? builder.rd_ys.Append(msg.rd_y) | ||
1128 | : builder.rd_ys.AppendNull()); | ||
1129 | ARROW_RETURN_NOT_OK(used & KV6F_RD_X | ||
1130 | ? builder.rd_xs.Append(msg.rd_x) | ||
1131 | : builder.rd_xs.AppendNull()); | ||
1132 | ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP | ||
1133 | ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop) | ||
1134 | : builder.distance_since_last_user_stops.AppendNull()); | ||
1135 | } | ||
1136 | |||
1137 | return builder.getTable(); | ||
1138 | } | ||
1139 | |||
1140 | std::tuple<int64_t, int64_t> getMinMaxTimestamp(const std::vector<Kv6Record> &messages) { | ||
1141 | if (messages.size() == 0) | ||
1142 | return { 0, 0 }; | ||
1143 | int64_t min = std::numeric_limits<int64_t>::max(); | ||
1144 | int64_t max = 0; | ||
1145 | for (const auto &message : messages) { | ||
1146 | if (~message.presence & KV6F_TIMESTAMP) | ||
1147 | continue; | ||
1148 | int64_t seconds = message.timestamp.toUnixSeconds().count(); | ||
1149 | if (seconds < min) | ||
1150 | min = seconds; | ||
1151 | if (seconds > max) | ||
1152 | max = seconds; | ||
1153 | } | ||
1154 | if (min == std::numeric_limits<decltype(min)>::max()) | ||
1155 | return { 0, 0 }; // this is stupid | ||
1156 | return { min, max }; | ||
1157 | } | ||
1158 | |||
// Writes `messages` to a new Parquet file named after the current UTC time,
// followed by a "<file>.meta.json" sidecar holding the min/max record
// timestamp and the number of rows written.
//
// The sidecar is first written as "<file>.meta.json.part" and then renamed, so
// a "<file>.meta.json" only ever appears with its full contents.
//
// Returns a non-OK status when building the table, writing the Parquet file,
// or writing/renaming the sidecar fails. The rows-written metric is only
// updated when everything succeeded.
arrow::Status writeParquet(const std::vector<Kv6Record> &messages, Metrics &metrics) {
  size_t rows_written = 0;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, getTable(messages, rows_written));

  auto timestamp = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp);
  ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename));
  std::cout << "Wrote Parquet file " << filename << std::endl;

  auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages);
  const std::string meta_part_name = filename + ".meta.json.part";
  const std::string meta_name = filename + ".meta.json";

  std::ofstream metaf(meta_part_name, std::ios::binary);
  nlohmann::json meta{
    { "min_timestamp", min_timestamp },
    { "max_timestamp", max_timestamp },
    { "rows_written", rows_written },
  };
  metaf << meta;
  metaf.close();
  // Covers a failed open, a failed write and a failed flush on close.
  if (metaf.fail())
    return arrow::Status::IOError("Could not write metadata file ", meta_part_name);

  // Use the non-throwing overload: this function reports failure by status,
  // and its callers do not catch exceptions.
  std::error_code rename_error;
  std::filesystem::rename(meta_part_name, meta_name, rename_error);
  if (rename_error)
    return arrow::Status::IOError("Could not rename ", meta_part_name, " to ", meta_name,
                                  ": ", rename_error.message());

  metrics.rowsWritten(static_cast<int64_t>(rows_written));

  return arrow::Status::OK();
}
1183 | |||
// Point in time on the monotonic clock; used to track when output was last written.
using SteadyTime = std::chrono::steady_clock::time_point;
1185 | |||
// Writes the error text, the warning text and the received message to a new
// "oeuf-error-<UTC time>.txt" file, each under its own banner line, and
// returns the name of that file. The time in the name is rounded to seconds.
std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) {
  const auto now = std::chrono::round<std::chrono::seconds>(std::chrono::utc_clock::now());
  std::string dump_name = std::format("oeuf-error-{:%FT%T%Ez}.txt", now);

  std::ofstream out(dump_name, std::ios::binary);
  out << "======= ERROR MESSAGES ========" << std::endl << errs;
  out << "======= WARNING MESSAGES ======" << std::endl << warns;
  out << "======= RECEIVED MESSAGE ======" << std::endl << txt << std::endl;
  out.close();

  return dump_name;
}
1199 | |||
1200 | void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector<Kv6Record> &msg_buf) { | ||
1201 | unsigned int decompressed_size = 0; | ||
1202 | if (msg.getBodySize() > std::numeric_limits<unsigned int>::max()) | ||
1203 | std::cout << "parseMsg failed due to too large message" << std::endl; | ||
1204 | char *decompressed = decompress(msg.getBody(), static_cast<unsigned int>(msg.getBodySize()), decompressed_size); | ||
1205 | |||
1206 | std::stringstream errs; | ||
1207 | std::stringstream warns; | ||
1208 | // We know that decompressed[decompressed_size] == 0 because decompress() ensures this. | ||
1209 | auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns); | ||
1210 | if (parsed_msg) { | ||
1211 | const Tmi8VvTmPushInfo &info = *parsed_msg; | ||
1212 | auto new_msgs_it = info.messages.begin(); | ||
1213 | while (new_msgs_it != info.messages.end()) { | ||
1214 | size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size(); | ||
1215 | size_t new_msgs_left = info.messages.end() - new_msgs_it; | ||
1216 | auto new_msgs_start = new_msgs_it; | ||
1217 | auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left); | ||
1218 | new_msgs_it = new_msgs_end; | ||
1219 | msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end); | ||
1220 | |||
1221 | bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5); | ||
1222 | if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) { | ||
1223 | arrow::Status status = writeParquet(msg_buf, metrics); | ||
1224 | if (!status.ok()) | ||
1225 | std::cout << "Writing Parquet file failed: " << status << std::endl; | ||
1226 | msg_buf.clear(); | ||
1227 | last_output = std::chrono::steady_clock::now(); | ||
1228 | } | ||
1229 | } | ||
1230 | if (!errs.view().empty() || !warns.view().empty()) { | ||
1231 | std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); | ||
1232 | std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl; | ||
1233 | } | ||
1234 | } else { | ||
1235 | std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); | ||
1236 | std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl; | ||
1237 | } | ||
1238 | free(decompressed); | ||
1239 | } | ||
1240 | |||
1241 | int main(int argc, char *argv[]) { | ||
1242 | std::cout << "Working directory: " << std::filesystem::current_path() << std::endl; | ||
1243 | |||
1244 | const char *metrics_addr = getenv("METRICS_ADDR"); | ||
1245 | if (!metrics_addr || strlen(metrics_addr) == 0) { | ||
1246 | std::cout << "Error: no METRICS_ADDR set!" << std::endl; | ||
1247 | exit(EXIT_FAILURE); | ||
1248 | } | ||
1249 | prometheus::Exposer exposer{metrics_addr}; | ||
1250 | |||
1251 | bool prod = false; | ||
1252 | const char *prod_env = getenv("NDOV_PRODUCTION"); | ||
1253 | if (prod_env && strcmp(prod_env, "true") == 0) prod = true; | ||
1254 | |||
1255 | void *zmq_context = zmq_ctx_new(); | ||
1256 | void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB); | ||
1257 | int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658"); | ||
1258 | assert(rc == 0); | ||
1259 | |||
1260 | const char *topic = "/CXX/KV6posinfo"; | ||
1261 | rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic)); | ||
1262 | assert(rc == 0); | ||
1263 | |||
1264 | signal(SIGINT, onSigIntOrTerm); | ||
1265 | signal(SIGTERM, onSigIntOrTerm); | ||
1266 | |||
1267 | SteadyTime last_output = std::chrono::steady_clock::now(); | ||
1268 | |||
1269 | auto registry = std::make_shared<prometheus::Registry>(); | ||
1270 | Metrics metrics(registry); | ||
1271 | exposer.RegisterCollectable(registry); | ||
1272 | |||
1273 | std::vector<Kv6Record> msg_buf; | ||
1274 | while (!terminate) { | ||
1275 | std::optional<RawMessage> msg = recvMsg(zmq_subscriber); | ||
1276 | if (!msg) { | ||
1277 | if (!terminate) | ||
1278 | perror("recvMsg"); | ||
1279 | continue; | ||
1280 | } | ||
1281 | handleMsg(*msg, metrics, last_output, msg_buf); | ||
1282 | } | ||
1283 | |||
1284 | std::cout << "Terminating" << std::endl; | ||
1285 | if (msg_buf.size() > 0) { | ||
1286 | arrow::Status status = writeParquet(msg_buf, metrics); | ||
1287 | if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl; | ||
1288 | else std::cout << "Final data written" << std::endl; | ||
1289 | msg_buf.clear(); | ||
1290 | } | ||
1291 | |||
1292 | if (zmq_close(zmq_subscriber)) | ||
1293 | perror("zmq_close"); | ||
1294 | if (zmq_ctx_destroy(zmq_context)) | ||
1295 | perror("zmq_ctx_destroy"); | ||
1296 | |||
1297 | std::cout << "Bye" << std::endl; | ||
1298 | |||
1299 | return 0; | ||
1300 | } | ||