From 17a3ea880402338420699e03bcb24181e4ff3924 Mon Sep 17 00:00:00 2001
From: Rutger Broekhoff
Date: Thu, 2 May 2024 20:27:40 +0200
Subject: Initial commit

Based on dc4ba6a
---
 src/augmentkv6/.envrc          |    2 +
 src/augmentkv6/Makefile        |   21 +
 src/augmentkv6/main.cpp        |  510 ++++
 src/bundleparquet/.envrc       |    2 +
 src/bundleparquet/Makefile     |   21 +
 src/bundleparquet/main.cpp     |  213 +++
 src/bundleparquet/spliturl.cpp |  203 +++
 src/bundleparquet/spliturl.hpp |   11 +
 src/filterkv6/.envrc           |    2 +
 src/filterkv6/Makefile         |   21 +
 src/filterkv6/main.cpp         |  106 ++++
 src/querykv1/.envrc            |    2 +
 src/querykv1/.gitignore        |    1 +
 src/querykv1/Makefile          |   28 +
 src/querykv1/cliopts.cpp       |  456 ++++
 src/querykv1/cliopts.hpp       |   35 ++
 src/querykv1/daterange.cpp     |   91 +++
 src/querykv1/daterange.hpp     |  118 ++++
 src/querykv1/grammar.abnf      |   44 ++
 src/querykv1/grammar.ebnf      |   47 ++
 src/querykv1/grammar.ebnf.bak  |   23 +
 src/querykv1/joparoute.cpp     |  102 ++++
 src/querykv1/joparoute.hpp     |   13 +
 src/querykv1/journeyinfo.cpp   |   64 ++
 src/querykv1/journeyinfo.hpp   |   13 +
 src/querykv1/journeyroute.cpp  |   96 +++
 src/querykv1/journeyroute.hpp  |   13 +
 src/querykv1/journeys.cpp      |   95 +++
 src/querykv1/journeys.hpp      |   13 +
 src/querykv1/main.cpp          |  198 ++++
 src/querykv1/schedule.cpp      |   63 ++
 src/querykv1/schedule.hpp      |   13 +
 src/recvkv6/.envrc             |    2 +
 src/recvkv6/Makefile           |   21 +
 src/recvkv6/main.cpp           | 1300 ++++++++++++++++++++++++++++++++++++++++
 35 files changed, 3963 insertions(+)
 create mode 100644 src/augmentkv6/.envrc
 create mode 100644 src/augmentkv6/Makefile
 create mode 100644 src/augmentkv6/main.cpp
 create mode 100644 src/bundleparquet/.envrc
 create mode 100644 src/bundleparquet/Makefile
 create mode 100644 src/bundleparquet/main.cpp
 create mode 100644 src/bundleparquet/spliturl.cpp
 create mode 100644 src/bundleparquet/spliturl.hpp
 create mode 100644 src/filterkv6/.envrc
 create mode 100644 src/filterkv6/Makefile
 create mode 100644 src/filterkv6/main.cpp
 create mode 100644 src/querykv1/.envrc
 create mode 100644 src/querykv1/.gitignore
 create mode 100644 src/querykv1/Makefile
 create mode 100644 src/querykv1/cliopts.cpp
 create mode 100644 src/querykv1/cliopts.hpp
 create mode 100644 src/querykv1/daterange.cpp
 create mode 100644 src/querykv1/daterange.hpp
 create mode 100644 src/querykv1/grammar.abnf
 create mode 100644 src/querykv1/grammar.ebnf
 create mode 100644 src/querykv1/grammar.ebnf.bak
 create mode 100644 src/querykv1/joparoute.cpp
 create mode 100644 src/querykv1/joparoute.hpp
 create mode 100644 src/querykv1/journeyinfo.cpp
 create mode 100644 src/querykv1/journeyinfo.hpp
 create mode 100644 src/querykv1/journeyroute.cpp
 create mode 100644 src/querykv1/journeyroute.hpp
 create mode 100644 src/querykv1/journeys.cpp
 create mode 100644 src/querykv1/journeys.hpp
 create mode 100644 src/querykv1/main.cpp
 create mode 100644 src/querykv1/schedule.cpp
 create mode 100644 src/querykv1/schedule.hpp
 create mode 100644 src/recvkv6/.envrc
 create mode 100644 src/recvkv6/Makefile
 create mode 100644 src/recvkv6/main.cpp

diff --git a/src/augmentkv6/.envrc b/src/augmentkv6/.envrc
new file mode 100644
index 0000000..694e74f
--- /dev/null
+++ b/src/augmentkv6/.envrc
@@ -0,0 +1,2 @@
+source_env ../../
+export DEVMODE=1
diff --git a/src/augmentkv6/Makefile b/src/augmentkv6/Makefile
new file mode 100644
index 0000000..cebb291
--- /dev/null
+++ b/src/augmentkv6/Makefile
@@ -0,0 +1,21 @@
+# Taken from:
+# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide
+# for C and C++,” OpenSSF Best Practices Working Group.
Accessed: Dec. 01, +# 2023. [Online]. Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-larrow -larrow_acero -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +augmentkv6: main.cpp + $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm augmentkv6 diff --git a/src/augmentkv6/main.cpp b/src/augmentkv6/main.cpp new file mode 100644 index 0000000..81a54d3 --- /dev/null +++ b/src/augmentkv6/main.cpp @@ -0,0 +1,510 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace std::string_view_literals; + +namespace ac = arrow::acero; +namespace ds = arrow::dataset; +namespace cp = arrow::compute; +using namespace arrow; + +using TimingClock = std::conditional_t< + std::chrono::high_resolution_clock::is_steady, + std::chrono::high_resolution_clock, + std::chrono::steady_clock>; + +std::string readKv1() { + fputs("Reading KV1 from standard input\n", stderr); + + char buf[4096]; + std::string data; + while (!feof(stdin) && !ferror(stdin)) { + size_t read = fread(buf, sizeof(char), 4096, stdin); + data.append(buf, read); + } + if (ferror(stdin)) { + fputs("Error when reading from stdin\n", stderr); + exit(1); + } + fprintf(stderr, "Read %lu bytes\n", data.size()); + + return data; +} + +std::vector lex() { + std::string data = readKv1(); + + auto start = TimingClock::now(); + Kv1Lexer lexer(data); + lexer.lex(); + auto end = TimingClock::now(); + + std::chrono::duration elapsed{end - start}; + double bytes = static_cast(data.size()) / 1'000'000; + double speed = bytes / elapsed.count(); + + if (!lexer.errors.empty()) { + fputs("Lexer reported errors:\n", stderr); + for (const auto &error : lexer.errors) + fprintf(stderr, "- %s\n", error.c_str()); + exit(1); + } + + fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size()); + fprintf(stderr, "Duration: %f s\n", elapsed.count()); + fprintf(stderr, "Speed: %f MB/s\n", speed); + + return std::move(lexer.tokens); +} + +bool parse(Kv1Records &into) { + std::vector tokens = lex(); + + Kv1Parser parser(tokens, into); + parser.parse(); + + bool ok = true; + if (!parser.gerrors.empty()) { + ok = false; + fputs("Parser reported errors:\n", stderr); + for (const auto &error : parser.gerrors) + fprintf(stderr, "- %s\n", error.c_str()); + } + if (!parser.warns.empty()) { + fputs("Parser reported warnings:\n", stderr); + for (const auto &warn : parser.warns) + fprintf(stderr, "- %s\n", warn.c_str()); + } + + fprintf(stderr, "Parsed %lu records\n", into.size()); + + return ok; +} + +void printParsedRecords(const Kv1Records &records) { + fputs("Parsed records:\n", stderr); + fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size()); + fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size()); + fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size()); + fprintf(stderr, " user_stop_areas: %lu\n", 
records.user_stop_areas.size()); + fprintf(stderr, " timing_links: %lu\n", records.timing_links.size()); + fprintf(stderr, " links: %lu\n", records.links.size()); + fprintf(stderr, " lines: %lu\n", records.lines.size()); + fprintf(stderr, " destinations: %lu\n", records.destinations.size()); + fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size()); + fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size()); + fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size()); + fprintf(stderr, " financers: %lu\n", records.financers.size()); + fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size()); + fprintf(stderr, " points: %lu\n", records.points.size()); + fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size()); + fprintf(stderr, " icons: %lu\n", records.icons.size()); + fprintf(stderr, " notices: %lu\n", records.notices.size()); + fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size()); + fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size()); + fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size()); + fprintf(stderr, " period_groups: %lu\n", records.period_groups.size()); + fprintf(stderr, " specific_days: %lu\n", records.specific_days.size()); + fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size()); + fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size()); + fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size()); + fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size()); + fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size()); + fprintf(stderr, " public_journey_passing_times: %lu\n", records.public_journey_passing_times.size()); + fprintf(stderr, " operating_days: %lu\n", records.operating_days.size()); +} + +void printIndexSize(const Kv1Index &index) { + fputs("Index size:\n", stderr); + fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size()); + fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size()); + fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size()); + fprintf(stderr, " timing_links: %lu\n", index.timing_links.size()); + fprintf(stderr, " links: %lu\n", index.links.size()); + fprintf(stderr, " lines: %lu\n", index.lines.size()); + fprintf(stderr, " destinations: %lu\n", index.destinations.size()); + fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size()); + fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size()); + fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size()); + fprintf(stderr, " financers: %lu\n", index.financers.size()); + fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size()); + fprintf(stderr, " points: %lu\n", index.points.size()); + fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size()); + fprintf(stderr, " icons: %lu\n", index.icons.size()); + fprintf(stderr, " notices: %lu\n", index.notices.size()); + fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size()); + fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size()); + fprintf(stderr, " period_groups: %lu\n", index.period_groups.size()); + fprintf(stderr, " 
specific_days: %lu\n", index.specific_days.size()); + fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size()); + fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size()); + fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size()); + fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size()); + fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size()); + fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size()); + fprintf(stderr, " operating_days: %lu\n", index.operating_days.size()); +} + +struct BasicJourneyKey { + std::string data_owner_code; + std::string line_planning_number; + int journey_number; + + auto operator<=>(const BasicJourneyKey &) const = default; +}; + +size_t hash_value(const BasicJourneyKey &k) { + size_t seed = 0; + + boost::hash_combine(seed, k.data_owner_code); + boost::hash_combine(seed, k.line_planning_number); + boost::hash_combine(seed, k.journey_number); + + return seed; +} + +using BasicJourneyKeySet = std::unordered_set>; + +arrow::Result basicJourneys(std::shared_ptr table) { + ac::TableSourceNodeOptions table_source_node_options(table); + ac::Declaration table_source("table_source", std::move(table_source_node_options)); + auto aggregate_options = ac::AggregateNodeOptions{ + /* .aggregates = */ {}, + /* .keys = */ { "data_owner_code", "line_planning_number", "journey_number" }, + }; + ac::Declaration aggregate("aggregate", { std::move(table_source) }, std::move(aggregate_options)); + + std::shared_ptr result; + ARROW_ASSIGN_OR_RAISE(result, ac::DeclarationToTable(std::move(aggregate))); + + std::shared_ptr data_owner_codes = result->GetColumnByName("data_owner_code"); + std::shared_ptr line_planning_numbers = result->GetColumnByName("line_planning_number"); + std::shared_ptr journey_numbers = result->GetColumnByName("journey_number"); + + int i_data_owner_codes_chunk = 0; + int i_journey_numbers_chunk = 0; + int i_line_planning_numbers_chunk = 0; + int i_in_data_owner_codes_chunk = 0; + int i_in_journey_numbers_chunk = 0; + int i_in_line_planning_numbers_chunk = 0; + + BasicJourneyKeySet journeys; + + for (int64_t i = 0; i < result->num_rows(); i++) { + auto data_owner_codes_chunk = std::static_pointer_cast(data_owner_codes->chunk(i_data_owner_codes_chunk)); + auto line_planning_numbers_chunk = std::static_pointer_cast(line_planning_numbers->chunk(i_line_planning_numbers_chunk)); + auto journey_numbers_chunk = std::static_pointer_cast(journey_numbers->chunk(i_journey_numbers_chunk)); + + std::string_view data_owner_code = data_owner_codes_chunk->Value(i_in_data_owner_codes_chunk); + std::string_view line_planning_number = line_planning_numbers_chunk->Value(i_in_line_planning_numbers_chunk); + uint32_t journey_number = journey_numbers_chunk->Value(i_in_journey_numbers_chunk); + + journeys.emplace( + std::string(data_owner_code), + std::string(line_planning_number), + journey_number + ); + + i_in_data_owner_codes_chunk++; + i_in_line_planning_numbers_chunk++; + i_in_journey_numbers_chunk++; + if (i_in_data_owner_codes_chunk >= data_owner_codes_chunk->length()) { + i_data_owner_codes_chunk++; + i_in_data_owner_codes_chunk = 0; + } + if (i_in_line_planning_numbers_chunk >= line_planning_numbers_chunk->length()) { + i_line_planning_numbers_chunk++; + i_in_line_planning_numbers_chunk = 0; + } + if (i_in_journey_numbers_chunk >= journey_numbers_chunk->length()) { + i_journey_numbers_chunk++; + 
i_in_journey_numbers_chunk = 0; + } + } + + return journeys; +} + +struct DistanceKey { + BasicJourneyKey journey; + std::string last_passed_user_stop_code; + + auto operator<=>(const DistanceKey &) const = default; +}; + +size_t hash_value(const DistanceKey &k) { + size_t seed = 0; + + boost::hash_combine(seed, k.journey); + boost::hash_combine(seed, k.last_passed_user_stop_code); + + return seed; +} + +struct DistanceTimingLink { + const Kv1JourneyPatternTimingLink *jopatili; + double distance_since_start_of_journey = 0; // at the start of the link +}; + +using DistanceMap = std::unordered_map>; + +// Returns a map, where +// DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode -> +// Distance of Last User Stop +DistanceMap makeDistanceMap(Kv1Records &records, Kv1Index &index, BasicJourneyKeySet &journeys) { + std::unordered_map< + Kv1JourneyPattern::Key, + std::vector, + boost::hash> jopatili_index; + std::unordered_map< + BasicJourneyKey, + const Kv1PublicJourney *, + boost::hash> journey_index; + for (size_t i = 0; i < records.public_journeys.size(); i++) { + const Kv1PublicJourney *pujo = &records.public_journeys[i]; + + BasicJourneyKey journey_key( + pujo->key.data_owner_code, + pujo->key.line_planning_number, + pujo->key.journey_number); + + if (journeys.contains(journey_key)) { + journey_index[journey_key] = pujo; + + Kv1JourneyPattern::Key jopa_key( + pujo->key.data_owner_code, + pujo->key.line_planning_number, + pujo->journey_pattern_code); + jopatili_index[jopa_key] = {}; + } + } + + for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { + const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; + Kv1JourneyPattern::Key jopa_key( + jopatili->key.data_owner_code, + jopatili->key.line_planning_number, + jopatili->key.journey_pattern_code); + if (jopatili_index.contains(jopa_key)) { + jopatili_index[jopa_key].push_back(DistanceTimingLink(jopatili, 0)); + } + } + + for (auto &[jopa_key, timing_links] : jopatili_index) { + std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) { + return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; + }); + + const std::string transport_type = index.journey_patterns[jopa_key]->p_line->transport_type; + + for (size_t i = 1; i < timing_links.size(); i++) { + DistanceTimingLink *timing_link = &timing_links[i]; + DistanceTimingLink *prev_timing_link = &timing_links[i - 1]; + + const Kv1Link::Key link_key( + prev_timing_link->jopatili->key.data_owner_code, + prev_timing_link->jopatili->user_stop_code_begin, + prev_timing_link->jopatili->user_stop_code_end, + transport_type); + double link_distance = index.links[link_key]->distance; + timing_link->distance_since_start_of_journey = + prev_timing_link->distance_since_start_of_journey + link_distance; + } + } + + // DataOwnerCode + LinePlanningNumber + JourneyNumber + UserStopCode -> + // Distance of Last User Stop + DistanceMap distance_map; + + for (const auto &journey : journeys) { + const Kv1PublicJourney *pujo = journey_index[journey]; + if (pujo == nullptr) { + std::cerr << "Warning: No PUJO found for [" << journey.data_owner_code << "] " + << journey.line_planning_number << "/" << journey.journey_number << std::endl; + continue; + } + Kv1JourneyPattern::Key jopa_key( + pujo->key.data_owner_code, + pujo->key.line_planning_number, + pujo->journey_pattern_code); + for (const auto &timing_link : jopatili_index[jopa_key]) { + DistanceKey key(journey, timing_link.jopatili->user_stop_code_begin); + 
distance_map[key] = timing_link.distance_since_start_of_journey; + } + } + + return distance_map; +} + +arrow::Result> augment( + std::shared_ptr table, + const DistanceMap &distance_map +) { + for (int i = 0; i < table->num_columns(); i++) { + if (table->column(i)->num_chunks() > 1) { + std::stringstream ss; + ss << "Error: Expected column " << i + << " (" << table->ColumnNames()[i] << ") to have 1 chunk, got " + << table->column(i)->num_chunks(); + return arrow::Status::Invalid(ss.str()); + } + } + + auto data_owner_codes = std::static_pointer_cast(table->GetColumnByName("data_owner_code")->chunk(0)); + auto line_planning_numbers = std::static_pointer_cast(table->GetColumnByName("line_planning_number")->chunk(0)); + auto journey_numbers = std::static_pointer_cast(table->GetColumnByName("journey_number")->chunk(0)); + auto user_stop_codes = std::static_pointer_cast(table->GetColumnByName("user_stop_code")->chunk(0)); + auto distance_since_last_user_stops = std::static_pointer_cast(table->GetColumnByName("distance_since_last_user_stop")->chunk(0)); + auto timestamps = std::static_pointer_cast(table->GetColumnByName("timestamp")->chunk(0)); + + auto timestamps_type = table->schema()->GetFieldByName("timestamp")->type(); + if (timestamps_type->id() != arrow::Type::TIMESTAMP) + return arrow::Status::Invalid("Field 'timestamp' does not have expected type TIMESTAMP"); + if (std::static_pointer_cast(timestamps_type)->unit() != arrow::TimeUnit::MILLI) + return arrow::Status::Invalid("Field 'timestamp' does not have unit MILLI"); + if (!std::static_pointer_cast(timestamps_type)->timezone().empty()) + return arrow::Status::Invalid("Field 'timestamp' should have empty time zone name"); + + std::shared_ptr field_distance_since_start_of_journey = + arrow::field("distance_since_start_of_journey", arrow::uint32()); + std::shared_ptr field_day_of_week = + arrow::field("timestamp_iso_day_of_week", arrow::int64()); + std::shared_ptr field_date = + arrow::field("timestamp_date", arrow::date32()); + std::shared_ptr field_local_time = + arrow::field("timestamp_local_time", arrow::time32(arrow::TimeUnit::SECOND)); + arrow::UInt32Builder distance_since_start_of_journey_builder; + arrow::Int64Builder day_of_week_builder; + arrow::Date32Builder date_builder; + arrow::Time32Builder local_time_builder(arrow::time32(arrow::TimeUnit::SECOND), arrow::default_memory_pool()); + + const std::chrono::time_zone *amsterdam = std::chrono::locate_zone("Europe/Amsterdam"); + + for (int64_t i = 0; i < table->num_rows(); i++) { + DistanceKey key( + BasicJourneyKey( + std::string(data_owner_codes->Value(i)), + std::string(line_planning_numbers->Value(i)), + journey_numbers->Value(i)), + std::string(user_stop_codes->Value(i))); + + uint32_t distance_since_last_user_stop = distance_since_last_user_stops->Value(i); + if (distance_map.contains(key)) { + uint32_t total_distance = distance_since_last_user_stop + static_cast(distance_map.at(key)); + ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.Append(total_distance)); + } else { + ARROW_RETURN_NOT_OK(distance_since_start_of_journey_builder.AppendNull()); + } + + // Welp, this has gotten a bit complicated! 
+ std::chrono::sys_seconds timestamp(std::chrono::floor(std::chrono::milliseconds(timestamps->Value(i)))); + std::chrono::zoned_seconds zoned_timestamp(amsterdam, timestamp); + std::chrono::local_seconds local_timestamp(zoned_timestamp); + std::chrono::local_days local_date = std::chrono::floor(local_timestamp); + std::chrono::year_month_day date(local_date); + std::chrono::weekday day_of_week(local_date); + std::chrono::hh_mm_ss time(local_timestamp - local_date); + std::chrono::sys_days unix_date(date); + + int64_t iso_day_of_week = day_of_week.iso_encoding(); + int32_t unix_days = static_cast(unix_date.time_since_epoch().count()); + int32_t secs_since_midnight = static_cast(std::chrono::seconds(time).count()); + + ARROW_RETURN_NOT_OK(day_of_week_builder.Append(iso_day_of_week)); + ARROW_RETURN_NOT_OK(date_builder.Append(unix_days)); + ARROW_RETURN_NOT_OK(local_time_builder.Append(secs_since_midnight)); + } + + ARROW_ASSIGN_OR_RAISE(auto distance_since_start_of_journey_col_chunk, distance_since_start_of_journey_builder.Finish()); + ARROW_ASSIGN_OR_RAISE(auto day_of_week_col_chunk, day_of_week_builder.Finish()); + ARROW_ASSIGN_OR_RAISE(auto date_col_chunk, date_builder.Finish()); + ARROW_ASSIGN_OR_RAISE(auto local_time_col_chunk, local_time_builder.Finish()); + auto distance_since_start_of_journey_col = + std::make_shared(distance_since_start_of_journey_col_chunk); + auto day_of_week_col = std::make_shared(day_of_week_col_chunk); + auto date_col = std::make_shared(date_col_chunk); + auto local_time_col = std::make_shared(local_time_col_chunk); + + ARROW_ASSIGN_OR_RAISE(table, table->AddColumn( + table->num_columns(), + field_distance_since_start_of_journey, + distance_since_start_of_journey_col)); + ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_day_of_week, day_of_week_col)); + ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_date, date_col)); + ARROW_ASSIGN_OR_RAISE(table, table->AddColumn(table->num_columns(), field_local_time, local_time_col)); + + return table; +} + +arrow::Status processTables(Kv1Records &records, Kv1Index &index) { + std::shared_ptr input; + ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("oeuf-input.parquet")); + + std::unique_ptr arrow_reader; + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); + + std::shared_ptr table; + ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); + + std::cerr << "Input KV6 file has " << table->num_rows() << " rows" << std::endl; + ARROW_ASSIGN_OR_RAISE(BasicJourneyKeySet journeys, basicJourneys(table)); + std::cerr << "Found " << journeys.size() << " distinct journeys" << std::endl; + DistanceMap distance_map = makeDistanceMap(records, index, journeys); + std::cerr << "Distance map has " << distance_map.size() << " keys" << std::endl; + + std::cerr << "Creating augmented table" << std::endl; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr augmented, augment(table, distance_map)); + + std::cerr << "Writing augmented table" << std::endl; + return writeArrowTableAsParquetFile(*augmented, "oeuf-augmented.parquet"); +} + +int main(int argc, char *argv[]) { + Kv1Records records; + if (!parse(records)) { + fputs("Error parsing records, exiting\n", stderr); + return EXIT_FAILURE; + } + printParsedRecords(records); + fputs("Indexing...\n", stderr); + Kv1Index index(&records); + fprintf(stderr, "Indexed %lu records\n", index.size()); + // Only notice assignments are not indexed. 
If this equality is not valid, + // then this means that we had duplicate keys or that something else went + // wrong. That would really not be great. + assert(index.size() == records.size() - records.notice_assignments.size()); + printIndexSize(index); + fputs("Linking records...\n", stderr); + kv1LinkRecords(index); + fputs("Done linking\n", stderr); + + arrow::Status st = processTables(records, index); + if (!st.ok()) { + std::cerr << "Failed to process tables: " << st << std::endl; + return EXIT_FAILURE; + } +} diff --git a/src/bundleparquet/.envrc b/src/bundleparquet/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/bundleparquet/.envrc @@ -0,0 +1,2 @@ +source_env ../../ +export DEVMODE=1 diff --git a/src/bundleparquet/Makefile b/src/bundleparquet/Makefile new file mode 100644 index 0000000..170304d --- /dev/null +++ b/src/bundleparquet/Makefile @@ -0,0 +1,21 @@ +# Taken from: +# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide +# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, +# 2023. [Online]. Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-larrow -lcurl -lparquet -lprometheus-cpp-push -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +bundleparquet: main.cpp spliturl.cpp + $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm bundleparquet diff --git a/src/bundleparquet/main.cpp b/src/bundleparquet/main.cpp new file mode 100644 index 0000000..05fd881 --- /dev/null +++ b/src/bundleparquet/main.cpp @@ -0,0 +1,213 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include + +#include + +#include "spliturl.hpp" + +static const int MIN_COMBINED_ROWS = 1000000; // one million +static const int MAX_COMBINED_ROWS = 2000000; // two million + +struct FileMetadata { + int64_t min_timestamp = 0; + int64_t max_timestamp = 0; + int64_t rows_written = 0; +}; + +struct File { + FileMetadata metadata; + std::filesystem::path filename; +}; + +FileMetadata readMetadataOf(std::filesystem::path filename) { + std::string meta_filename = std::string(filename) + ".meta.json"; + std::ifstream meta_file = std::ifstream(meta_filename, std::ifstream::in|std::ifstream::binary); + nlohmann::json meta_json; + meta_file >> meta_json; + FileMetadata meta = { + .min_timestamp = meta_json["min_timestamp"], + .max_timestamp = meta_json["max_timestamp"], + .rows_written = meta_json["rows_written"], + }; + return meta; +} + +arrow::Status processFirstTables(std::deque &files, prometheus::Counter &rows_written) { + if (files.size() == 0) { + std::cerr << "Did not find any files" << std::endl; + return arrow::Status::OK(); + } + + int64_t rows = 0; + + std::vector> tables; + std::vector processed; + int64_t min_timestamp = std::numeric_limits::max(); + int64_t max_timestamp = 0; + + bool over_capacity_risk = false; + auto it = files.begin(); + while (it != files.end()) { + const std::filesystem::path &filename = it->filename; + const FileMetadata &metadata = it->metadata; + + 
std::shared_ptr input; + ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(filename)); + + std::unique_ptr arrow_reader; + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader)); + + if (metadata.min_timestamp < min_timestamp) + min_timestamp = metadata.min_timestamp; + if (metadata.max_timestamp > max_timestamp) + max_timestamp = metadata.max_timestamp; + + if (rows + metadata.rows_written > MAX_COMBINED_ROWS) { + over_capacity_risk = true; + break; + } + + std::shared_ptr table; + ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table)); + tables.push_back(table); + processed.push_back(filename); + rows += metadata.rows_written; + it = files.erase(it); + } + + if (rows < MIN_COMBINED_ROWS && !over_capacity_risk) { + std::cerr << "Found files, but not enough to satisfy the minimum amount of rows for the combined file" << std::endl; + std::cerr << "(We have " << rows << "/" << MIN_COMBINED_ROWS << " rows at the moment, so " + << static_cast(rows)/static_cast(MIN_COMBINED_ROWS)*100.f << "%)" << std::endl; + return arrow::Status::OK(); + } else if (rows == 0 && over_capacity_risk) { + const std::filesystem::path &filename = files.front().filename; + std::filesystem::rename(filename, "merged" / filename); + std::filesystem::rename(std::string(filename) + ".meta.json", std::string("merged" / filename) + ".meta.json"); + rows_written.Increment(static_cast(files.front().metadata.rows_written)); + files.pop_front(); + return arrow::Status::OK(); + } + + // Default options specify that the schemas are not unified, which is + // luckliy exactly what we want :) + std::shared_ptr merged_table; + ARROW_ASSIGN_OR_RAISE(merged_table, arrow::ConcatenateTables(tables)); + + auto timestamp = std::chrono::round(std::chrono::system_clock::now()); + std::string filename = std::format("merged/oeuf-{:%FT%T%Ez}.parquet", timestamp); + ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*merged_table, filename)); + + std::cerr << "Wrote merged table to " << filename << std::endl; + + std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); + nlohmann::json meta{ + { "min_timestamp", min_timestamp }, + { "max_timestamp", max_timestamp }, + { "rows_written", rows }, + }; + metaf << meta; + metaf.close(); + std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); + + std::cerr << "Wrote merged table metadata" << std::endl; + rows_written.Increment(static_cast(rows)); + + for (const std::filesystem::path &filename : processed) { + std::filesystem::remove(filename); + std::filesystem::remove(std::string(filename) + ".meta.json"); + } + + std::cerr << "Successfully wrote merged table, metadata and deleted old files" << std::endl; + + return arrow::Status::OK(); +} + +arrow::Status processTables(std::deque &files, prometheus::Counter &rows_written) { + while (!files.empty()) + ARROW_RETURN_NOT_OK(processFirstTables(files, rows_written)); + return arrow::Status::OK(); +} + +int main(int argc, char *argv[]) { + std::filesystem::path cwd = std::filesystem::current_path(); + std::filesystem::create_directory(cwd / "merged"); + + const char *prom_push_url = getenv("PROMETHEUS_PUSH_URL"); + if (!prom_push_url || strlen(prom_push_url) == 0) { + std::cerr << "Error: no PROMETHEUS_PUSH_URL set!" 
<< std::endl; + return EXIT_FAILURE; + } + + std::string split_err; + auto split_prom_push_url = splitUrl(prom_push_url, &split_err); + if (!split_prom_push_url) { + std::cerr << "Could not process URL in environment variable PROMETHEUS_PUSH_URL: " + << split_err << std::endl; + return EXIT_FAILURE; + } + std::cout << "Prometheus Push URL: " << split_prom_push_url->schemehost << ":" + << split_prom_push_url->portpath << std::endl; + + prometheus::Gateway gateway{split_prom_push_url->schemehost, + split_prom_push_url->portpath, + "oeuf-archiver"}; + + auto registry = std::make_shared(); + prometheus::Gauge &rows_available = prometheus::BuildGauge() + .Name("archiver_rows_available") + .Help("Number of rows available to the archiver") + .Register(*registry) + .Add({}); + prometheus::Counter &rows_written = prometheus::BuildCounter() + .Name("archiver_rows_written") + .Help("Number of rows written by the archiver") + .Register(*registry) + .Add({}); + gateway.RegisterCollectable(registry); + + std::deque files; + for (auto const &dir_entry : std::filesystem::directory_iterator{cwd}) { + if (!dir_entry.is_regular_file()) continue; + std::filesystem::path filename = dir_entry.path().filename(); + const std::string &filename_str = filename; + if (filename_str.starts_with("oeuf-") && filename_str.ends_with("+00:00.parquet")) { + try { + FileMetadata meta = readMetadataOf(filename); + File file = { .metadata = meta, .filename = filename }; + files.push_back(file); + + rows_available.Increment(static_cast(meta.rows_written)); + } catch (const std::exception &e) { + std::cerr << "Failed to read metadata of file " << filename << ": " << e.what() << std::endl; + return EXIT_FAILURE; + } + } + } + + std::sort(files.begin(), files.end(), + [](const File &f1, const File &f2) { return f1.filename < f2.filename; }); + arrow::Status st = processTables(files, rows_written); + if (!st.ok()) { + std::cerr << "Failed to process tables: " << st << std::endl; + return EXIT_FAILURE; + } + + gateway.Push(); +} diff --git a/src/bundleparquet/spliturl.cpp b/src/bundleparquet/spliturl.cpp new file mode 100644 index 0000000..90fd821 --- /dev/null +++ b/src/bundleparquet/spliturl.cpp @@ -0,0 +1,203 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include + +#include + +#include "spliturl.hpp" + +// splitUrl takes a URL of the shape '[http[s]://]HOST[:PORT][/PATH]', and +// splits it into two URLs: +// - scheme + host -> '[http[s]://]HOST' +// - port + path -> '[PORT][/PATH]' +// In case an IPv6 address is provided, the host must enclosed in square +// brackets. The zone ID may also be indicated. Note that in the resulting +// parts, the colon preceding the port number is omitted. This is on purpose. +std::optional splitUrl(const std::string &url, std::string *error) { + std::stringstream errs; + std::optional result; + char *processed = nullptr; + char *scheme = nullptr; + char *user = nullptr; + char *password = nullptr; + char *zoneid = nullptr; + char *query = nullptr; + char *fragment = nullptr; + CURLU *schemehost = nullptr; + char *schemehost_url = nullptr; + char *portpath_url = nullptr; + + // Parse the URL, allowing the user to omit the scheme. CURL will use 'https' + // by default if no scheme is specified. 
+ + CURLU *parsed = curl_url(); + CURLUcode rc = curl_url_set(parsed, CURLUPART_URL, url.c_str(), CURLU_DEFAULT_SCHEME); + if (rc != CURLUE_OK) { + errs << "Failed to parse URL: " << curl_url_strerror(rc); + goto Exit; + } + + // As we parse the URL with the option CURLU_DEFAULT_SCHEME, the CURL API + // won't require the user to provide the scheme part of the URL. It will + // automatically default the scheme to https. However, we do not usually want + // it to default to HTTPS, but HTTP instead (as the use case, connecting to a + // PushGateway server, usually is served over a private network via HTTP). + // + // This is why we check if the scheme was put there by CURL and otherwise set + // it to HTTP. We also check for any other schemes that the user may have + // provided, and reject anything that is not http/https. + if (!url.starts_with("http://") && !url.starts_with("https://")) { + rc = curl_url_get(parsed, CURLUPART_SCHEME, &scheme, 0); + if (rc != CURLUE_OK) { + errs << "Could not get scheme from parsed URL: " << curl_url_strerror(rc); + goto Exit; + } + if (strcmp(scheme, "https")) { + errs << "Unexpected scheme" << scheme << "in provided URL (expected http or https)"; + goto Exit; + } + rc = curl_url_set(parsed, CURLUPART_SCHEME, "http", 0); + if (rc != CURLUE_OK) { + errs << "Could not set URL scheme to http: " << curl_url_strerror(rc); + goto Exit; + } + } + + // Turn the parsed URL back into a string. + rc = curl_url_get(parsed, CURLUPART_URL, &processed, 0); + if (rc != CURLUE_OK) { + errs << "Failed to output parsed URL: " << curl_url_strerror(rc); + goto Exit; + } + + // This part of the code checks if no prohibited parts are present in the URL + // (basic auth: (user, password), query, fragment). + + rc = curl_url_get(parsed, CURLUPART_USER, &user, 0); + if (rc == CURLUE_OK && strlen(user) != 0) { + errs << "Provided URL should not contain a user part"; + goto Exit; + } else if (rc != CURLUE_NO_USER && rc != CURLUE_OK) { + errs << "Failed to get check user part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_PASSWORD, &password, 0); + if (rc == CURLUE_OK && strlen(password) != 0) { + errs << "Provided URL should not contain a password part"; + goto Exit; + } else if (rc != CURLUE_NO_PASSWORD && rc != CURLUE_OK) { + errs << "Failed to get check password part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_QUERY, &query, 0); + if (rc == CURLUE_OK && strlen(query) != 0) { + errs << "Provided URL should not contain a query part"; + goto Exit; + } else if (rc != CURLUE_NO_QUERY && rc != CURLUE_OK) { + errs << "Failed to get check query part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + rc = curl_url_get(parsed, CURLUPART_FRAGMENT, &fragment, 0); + if (rc == CURLUE_OK && strlen(fragment) != 0) { + errs << "Provided URL should not contain a fragment part"; + goto Exit; + } else if (rc != CURLUE_NO_FRAGMENT && rc != CURLUE_OK) { + errs << "Failed to get check fragment part existence in provided url: " << curl_url_strerror(rc); + goto Exit; + } + + // Now that we know that the provided URL makes sense, we can start doing + // some arts and crafts. We get started by copying the parsed URL into + // schemehost and simply delete all parts which are not scheme + host. + + schemehost = curl_url_dup(parsed); + + // CURL BUG WORKAROUND: CURLUPART_ZONEID is NOT copied by curl_url_dup! 
+ // ^ fixed in CURL 8.3.0 after https://curl.se/mail/lib-2023-07/0047.html + rc = curl_url_get(parsed, CURLUPART_ZONEID, &zoneid, 0); + if (rc == CURLUE_OK) { + rc = curl_url_set(schemehost, CURLUPART_ZONEID, zoneid, 0); + if (rc != CURLUE_OK) { + errs << "Could not copy zone ID to duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + } + rc = curl_url_set(schemehost, CURLUPART_PORT, nullptr, 0); + if (rc != CURLUE_OK) { + errs << "Could not unset port in duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + rc = curl_url_set(schemehost, CURLUPART_PATH, nullptr, 0); + if (rc != CURLUE_OK) { + errs << "Could not unset path in duplicated URL: " << curl_url_strerror(rc); + goto Exit; + } + + // Okay, now we have the schemehost CURLU all ready to go. Note that a URL + // only consisting of a scheme and host is considered valid, so CURL will be + // more than happy to actually turn it into a string for us. Which is exactly + // what we do here :) + + rc = curl_url_get(schemehost, CURLUPART_URL, &schemehost_url, 0); + if (rc != CURLUE_OK) { + errs << "Could not get scheme + host URL: " << curl_url_strerror(rc); + goto Exit; + } + + // Remove any trailing slash after the scheme + host URL that CURL might have + // put there -- we still want to get a valid URL if we paste the port + path + // part behind it. + + if (strlen(schemehost_url) > 0) { + if (schemehost_url[strlen(schemehost_url) - 1] != '/') { + errs << "Scheme + host URL does not end with a slash"; + goto Exit; + } + schemehost_url[strlen(schemehost_url) - 1] = '\0'; + } + + // Look, this is really gross. Because the port + path part of the URL is not + // a valid URL itself, but the scheme + host should be a prefix of the full + // URL containing the port + path, we can simply check if it is indeed a + // prefix, and then strip it from the full URL, giving us the port + path + // (after deleting the colon preceding the port). + + if (!std::string_view(processed).starts_with(schemehost_url)) { + errs << "Scheme + host URL is not a prefix of the processed URL"; + goto Exit; + } + + portpath_url = processed + strlen(schemehost_url); + // We should not have the colon before the port, prometheus-cpp inserts it + if (strlen(portpath_url) > 0 && portpath_url[0] == ':') portpath_url++; + // We do not need a trailing slash + if (strlen(portpath_url) > 0 && portpath_url[strlen(portpath_url)-1] == '/') + portpath_url[strlen(portpath_url)-1] = '\0'; + + // It has been done. 
BLECH + result = std::make_optional(schemehost_url, portpath_url); + +Exit: + curl_free(processed); + curl_free(scheme); + curl_free(user); + curl_free(password); + curl_free(query); + curl_free(fragment); + curl_free(zoneid); + curl_free(schemehost_url); + curl_url_cleanup(schemehost); + curl_url_cleanup(parsed); + + if (!result && error) + *error = errs.str(); + + return result; +} diff --git a/src/bundleparquet/spliturl.hpp b/src/bundleparquet/spliturl.hpp new file mode 100644 index 0000000..d8150e0 --- /dev/null +++ b/src/bundleparquet/spliturl.hpp @@ -0,0 +1,11 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include + +struct SplitUrl { + std::string schemehost; + std::string portpath; +}; + +std::optional splitUrl(const std::string &url, std::string *error = nullptr); diff --git a/src/filterkv6/.envrc b/src/filterkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/filterkv6/.envrc @@ -0,0 +1,2 @@ +source_env ../../ +export DEVMODE=1 diff --git a/src/filterkv6/Makefile b/src/filterkv6/Makefile new file mode 100644 index 0000000..13bb38e --- /dev/null +++ b/src/filterkv6/Makefile @@ -0,0 +1,21 @@ +# Taken from: +# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide +# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, +# 2023. [Online]. Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-larrow -larrow_dataset -lparquet -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +filterkv6: main.cpp + $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm filterkv6 diff --git a/src/filterkv6/main.cpp b/src/filterkv6/main.cpp new file mode 100644 index 0000000..a32220a --- /dev/null +++ b/src/filterkv6/main.cpp @@ -0,0 +1,106 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace ds = arrow::dataset; +namespace cp = arrow::compute; +using namespace arrow; + +arrow::Status processTables(std::string lineno) { + auto filesystem = std::make_shared(); + + fs::FileSelector selector; + selector.base_dir = std::filesystem::current_path(); + selector.recursive = false; + + auto format = std::static_pointer_cast(std::make_shared()); + + ARROW_ASSIGN_OR_RAISE(auto factory, + ds::FileSystemDatasetFactory::Make(filesystem, selector, format, + ds::FileSystemFactoryOptions())); + + ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish()); + + printf("Scanning dataset for line %s...\n", lineno.c_str()); + // Read specified columns with a row filter + ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); + ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::and_({ + cp::equal(cp::field_ref("line_planning_number"), cp::literal(lineno)), + cp::is_valid(cp::field_ref("rd_x")), + cp::is_valid(cp::field_ref("rd_y")), + }))); + + ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + puts("Finished loading data, computing stable sort indices..."); + + arrow::Datum sort_indices; + cp::SortOptions sort_options; + sort_options.sort_keys = { 
cp::SortKey("timestamp" /* ascending by default */) }; + ARROW_ASSIGN_OR_RAISE(sort_indices, cp::CallFunction("sort_indices", { table }, &sort_options)); + puts("Finished computing stable sort indices, creating sorted table..."); + + arrow::Datum sorted; + ARROW_ASSIGN_OR_RAISE(sorted, cp::CallFunction("take", { table, sort_indices })); + + puts("Writing sorted table to disk..."); + ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*sorted.table(), "merged/oeuf-merged.parquet")); + puts("Syncing..."); + sync(); + puts("Done. Have a nice day."); + + return arrow::Status::OK(); +} + +#define NOTICE "Notice: This tool will fail if any non-Parquet files in are present in the\n" \ + " current working directory. It does not load files which are present in\n" \ + " any possible subdirectories." + +const char help[] = + "Usage: %s \n" + "\n" + " LINENO The LinePlanningNumber as in the KV1/KV6 data\n\n" + NOTICE "\n"; + +void exitHelp(const char *progname, int code = 1) { + printf(help, progname); + exit(code); +} + +int main(int argc, char *argv[]) { + const char *progname = argv[0]; + if (argc != 2) { + puts("Error: incorrect number of arguments provided\n"); + exitHelp(progname); + } + char *lineno = argv[1]; + puts(NOTICE "\n"); + + std::filesystem::path cwd = std::filesystem::current_path(); + std::filesystem::create_directory(cwd / "merged"); + + puts("Running this program may take a while, especially on big datasets. If you're\n" + "processing the data of a single bus line over the course of multiple months,\n" + "you may see memory usage of up to 10 GiB. Make sure that you have sufficient\n" + "RAM available, to avoid overloading and subsequently freezing your system.\n"); + + arrow::Status st = processTables(std::string(lineno)); + if (!st.ok()) { + std::cerr << "Failed to process tables: " << st << std::endl; + return EXIT_FAILURE; + } +} diff --git a/src/querykv1/.envrc b/src/querykv1/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/querykv1/.envrc @@ -0,0 +1,2 @@ +source_env ../../ +export DEVMODE=1 diff --git a/src/querykv1/.gitignore b/src/querykv1/.gitignore new file mode 100644 index 0000000..5761abc --- /dev/null +++ b/src/querykv1/.gitignore @@ -0,0 +1 @@ +*.o diff --git a/src/querykv1/Makefile b/src/querykv1/Makefile new file mode 100644 index 0000000..a8791f5 --- /dev/null +++ b/src/querykv1/Makefile @@ -0,0 +1,28 @@ +# Taken from: +# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide +# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, +# 2023. [Online]. 
Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +HDRS=cliopts.hpp daterange.hpp joparoute.hpp journeyinfo.hpp journeyroute.hpp journeys.hpp schedule.hpp +SRCS=main.cpp cliopts.cpp daterange.cpp joparoute.cpp journeyinfo.cpp journeyroute.cpp journeys.cpp schedule.cpp +OBJS=$(patsubst %.cpp,%.o,$(SRCS)) + +%.o: %.cpp $(HDRS) + $(CXX) -c -o $@ $< $(CXXFLAGS) + +querykv1: $(OBJS) + $(CXX) -fPIE -pie -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm querykv1 diff --git a/src/querykv1/cliopts.cpp b/src/querykv1/cliopts.cpp new file mode 100644 index 0000000..bef7a98 --- /dev/null +++ b/src/querykv1/cliopts.cpp @@ -0,0 +1,456 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include + +#include + +#include "cliopts.hpp" + +using namespace std::string_view_literals; + +const char *opt_set = ""; +const char *opt_unset = nullptr; + +const char help[] = R"(Usage: %1$s [OPTIONS] + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help + +Commands: + joparoute Generate CSV for journey pattern route + journeyinfo Print some information on a journey + journeyroute Generate CSV for journey route + journeys List journeys of a specific line going from stop A to B + schedule Generate schedule +)"; + +const char joparoute_help[] = R"(Usage: %1$s joparoute --line --jopa [OPTIONS] + +Options: + --line Line planning number as in schedule + --jopa Journey pattern code as in KV1 data + -o Path of file to write to, '-' for stdout + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help +)"; + +const char journeyroute_help[] = R"(Usage: %1$s journeyroute --line [OPTIONS] + +Options: + --line Line planning number as in KV1 data + --journey Journey number as in KV1 data + -o Path of file to write to, '-' for stdout + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help +)"; + +const char journeys_help[] = R"(Usage: %1$s journeys --line --begin --end [OPTIONS] + +For the --begin and --end arguments, use the following format: + --begin/--end stop: + --begin/--end star: + +Options: + --begin User stop code/area of stop the journey should begin at + --end User stop code/area of stop the journey should end at + --line Line planning number to filter on + -o Path of file to write to, '-' for stdout + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help +)"; + +const char journeyinfo_help[] = R"(Usage: %1$s journeyinfo --line --journey [OPTIONS] + +Options: + --line Line planning number to filter on + --journey Journey number as in schedule + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help +)"; + +const char schedule_help[] = R"(Usage: %1$s schedule --line [OPTIONS] + +Options: + --line Line planning number to generate schedule for + -o Path of file to write to, '-' for stdout + +Global Options: + --kv1 Path to file containing all KV1 data, '-' for stdin + -h, --help Print this help 
+)"; + +void journeyRouteValidateOptions(const char *progname, Options *options) { +#define X(name, argument, long_, short_) \ + if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ + && #name != "journey_number"sv && #name != "help"sv && #name != "output_file_path"sv) \ + if (options->name) { \ + if (long_) { \ + if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyroute subcommand\n\n", progname, static_cast(long_), short_); \ + else fprintf(stderr, "%s: unexpected flag --%s for journeyroute subcommand\n\n", progname, static_cast(long_)); \ + } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeyroute subcommand\n\n", progname, short_); \ + fprintf(stderr, journeyroute_help, progname); \ + exit(1); \ + } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + + if (options->positional.size() > 0) { + fprintf(stderr, "%s: unexpected positional argument(s) for journeyroute subcommand\n\n", progname); + for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); + fprintf(stderr, journeyroute_help, progname); + exit(1); + } + + if (!options->kv1_file_path) + options->kv1_file_path = "-"; + if (!options->output_file_path) + options->output_file_path = "-"; + if (options->kv1_file_path == ""sv) { + fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); + fprintf(stderr, journeyroute_help, progname); + exit(1); + } + if (options->output_file_path == ""sv) { + fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); + fprintf(stderr, journeyroute_help, progname); + exit(1); + } + if (!options->journey_number || options->journey_number == ""sv) { + fprintf(stderr, "%s: journey number must be provided\n\n", progname); + fprintf(stderr, journeyroute_help, progname); + exit(1); + } + if (!options->line_planning_number || options->line_planning_number == ""sv) { + fprintf(stderr, "%s: line planning number must be provided\n\n", progname); + fprintf(stderr, journeyroute_help, progname); + exit(1); + } +} + +void scheduleValidateOptions(const char *progname, Options *options) { +#define X(name, argument, long_, short_) \ + if (#name != "kv1_file_path"sv && #name != "help"sv \ + && #name != "line_planning_number"sv && #name != "output_file_path"sv) \ + if (options->name) { \ + if (long_) { \ + if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for schedule subcommand\n\n", progname, static_cast(long_), short_); \ + else fprintf(stderr, "%s: unexpected flag --%s for schedule subcommand\n\n", progname, static_cast(long_)); \ + } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for schedule subcommand\n\n", progname, short_); \ + fprintf(stderr, schedule_help, progname); \ + exit(1); \ + } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + + if (options->positional.size() > 0) { + fprintf(stderr, "%s: unexpected positional argument(s) for schedule subcommand\n\n", progname); + for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); + fprintf(stderr, schedule_help, progname); + exit(1); + } + + if (!options->kv1_file_path) + options->kv1_file_path = "-"; + if (!options->output_file_path) + options->output_file_path = "-"; + if (options->kv1_file_path == ""sv) { + fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); + fprintf(stderr, schedule_help, progname); + exit(1); + } + if (options->output_file_path == ""sv) { + fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); + fprintf(stderr, schedule_help, progname); + exit(1); + } + if (!options->line_planning_number || 
options->line_planning_number == ""sv) { + fprintf(stderr, "%s: line planning number must be provided\n\n", progname); + fprintf(stderr, schedule_help, progname); + exit(1); + } +} + +void journeysValidateOptions(const char *progname, Options *options) { +#define X(name, argument, long_, short_) \ + if (#name != "kv1_file_path"sv && #name != "help"sv \ + && #name != "line_planning_number"sv && #name != "output_file_path"sv \ + && #name != "begin_stop_code"sv && #name != "end_stop_code"sv) \ + if (options->name) { \ + if (long_) { \ + if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeys subcommand\n\n", progname, static_cast(long_), short_); \ + else fprintf(stderr, "%s: unexpected flag --%s for journeys subcommand\n\n", progname, static_cast(long_)); \ + } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for journeys subcommand\n\n", progname, short_); \ + fprintf(stderr, journeys_help, progname); \ + exit(1); \ + } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + + if (options->positional.size() > 0) { + fprintf(stderr, "%s: unexpected positional argument(s) for journeys subcommand\n\n", progname); + for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); + fprintf(stderr, journeys_help, progname); + exit(1); + } + + if (!options->kv1_file_path) + options->kv1_file_path = "-"; + if (!options->output_file_path) + options->output_file_path = "-"; + if (options->kv1_file_path == ""sv) { + fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (options->output_file_path == ""sv) { + fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (!options->line_planning_number || options->line_planning_number == ""sv) { + fprintf(stderr, "%s: line planning number must be provided\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (!options->begin_stop_code || options->begin_stop_code == ""sv) { + fprintf(stderr, "%s: start user stop code must be provided\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (!options->end_stop_code || options->end_stop_code == ""sv) { + fprintf(stderr, "%s: end user stop code must be provided\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (!std::string_view(options->begin_stop_code).starts_with("star:") + && !std::string_view(options->begin_stop_code).starts_with("stop:")) { + fprintf(stderr, "%s: begin user stop code must be prefixed with star:/stop:\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } + if (!std::string_view(options->end_stop_code).starts_with("star:") + && !std::string_view(options->end_stop_code).starts_with("stop:")) { + fprintf(stderr, "%s: end user stop code must be prefixed with star:/stop:\n\n", progname); + fprintf(stderr, journeys_help, progname); + exit(1); + } +} + +void journeyInfoValidateOptions(const char *progname, Options *options) { +#define X(name, argument, long_, short_) \ + if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ + && #name != "journey_number"sv && #name != "help"sv) \ + if (options->name) { \ + if (long_) { \ + if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for journeyinfo subcommand\n\n", progname, static_cast(long_), short_); \ + else fprintf(stderr, "%s: unexpected flag --%s for journeyinfo subcommand\n\n", progname, static_cast(long_)); \ + } else if (short_) fprintf(stderr, 
"%s: unexpected flag -%c for journeyinfo subcommand\n\n", progname, short_); \ + fprintf(stderr, journeyinfo_help, progname); \ + exit(1); \ + } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + + if (options->positional.size() > 0) { + fprintf(stderr, "%s: unexpected positional argument(s) for journeyinfo subcommand\n\n", progname); + for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); + fprintf(stderr, journeyinfo_help, progname); + exit(1); + } + + if (!options->kv1_file_path) + options->kv1_file_path = "-"; + if (options->kv1_file_path == ""sv) { + fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); + fprintf(stderr, journeyinfo_help, progname); + exit(1); + } + if (!options->journey_number || options->journey_number == ""sv) { + fprintf(stderr, "%s: journey number must be provided\n\n", progname); + fprintf(stderr, journeyinfo_help, progname); + exit(1); + } + if (!options->line_planning_number || options->line_planning_number == ""sv) { + fprintf(stderr, "%s: line planning number must be provided\n\n", progname); + fprintf(stderr, journeyinfo_help, progname); + exit(1); + } +} + +void jopaRouteValidateOptions(const char *progname, Options *options) { +#define X(name, argument, long_, short_) \ + if (#name != "kv1_file_path"sv && #name != "line_planning_number"sv \ + && #name != "journey_pattern_code"sv && #name != "help"sv && #name != "output_file_path"sv) \ + if (options->name) { \ + if (long_) { \ + if (short_) fprintf(stderr, "%s: unexpected flag --%s (-%c) for joparoute subcommand\n\n", progname, static_cast(long_), short_); \ + else fprintf(stderr, "%s: unexpected flag --%s for joparoute subcommand\n\n", progname, static_cast(long_)); \ + } else if (short_) fprintf(stderr, "%s: unexpected flag -%c for joparoute subcommand\n\n", progname, short_); \ + fprintf(stderr, joparoute_help, progname); \ + exit(1); \ + } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + + if (options->positional.size() > 0) { + fprintf(stderr, "%s: unexpected positional argument(s) for joparoute subcommand\n\n", progname); + for (auto pos : options->positional) fprintf(stderr, "opt: %s\n", pos); + fprintf(stderr, joparoute_help, progname); + exit(1); + } + + if (!options->kv1_file_path) + options->kv1_file_path = "-"; + if (!options->output_file_path) + options->output_file_path = "-"; + if (options->kv1_file_path == ""sv) { + fprintf(stderr, "%s: KV1 file path cannot be empty\n\n", progname); + fprintf(stderr, joparoute_help, progname); + exit(1); + } + if (options->output_file_path == ""sv) { + fprintf(stderr, "%s: output file path cannot be empty\n\n", progname); + fprintf(stderr, joparoute_help, progname); + exit(1); + } + if (!options->journey_pattern_code || options->journey_pattern_code == ""sv) { + fprintf(stderr, "%s: journey pattern code must be provided\n\n", progname); + fprintf(stderr, joparoute_help, progname); + exit(1); + } + if (!options->line_planning_number || options->line_planning_number == ""sv) { + fprintf(stderr, "%s: line planning number must be provided\n\n", progname); + fprintf(stderr, joparoute_help, progname); + exit(1); + } +} + +struct ShortFlag { + int has_arg; + int c; +}; + +template +const std::string mkargarr = + (std::string() + + ... + + (flags.c == 0 + ? "" + : std::string((const char[]){ flags.c, '\0' }) + + (flags.has_arg == required_argument + ? ":" + : flags.has_arg == optional_argument + ? 
"::" + : ""))); + +#define X(name, has_arg, long_, short_) ShortFlag(has_arg, short_), +const std::string argarr = mkargarr; +#undef X + +Options parseOptions(int argc, char *argv[]) { + const char *progname = argv[0]; + + // Struct with options for augmentkv6. + Options options; + + static option long_options[] = { +#define X(name, argument, long_, short_) { long_, argument, nullptr, short_ }, + LONG_OPTIONS +#undef X + { 0 }, + }; + + int c; + int option_index = 0; + bool error = false; + while ((c = getopt_long(argc, argv, argarr.c_str(), long_options, &option_index)) != -1) { + // If a long option was used, c corresponds with val. We have val = 0 for + // options which have no short alternative, so checking for c = 0 gives us + // whether a long option with no short alternative was used. + // Below, we check for c = 'h', which corresponds with the long option + // '--help', for which val = 'h'. + if (c == 0) { + const char *name = long_options[option_index].name; +#define X(opt_name, opt_has_arg, opt_long, opt_short) \ + if (name == opt_long ## sv) { options.opt_name = optarg; continue; } + LONG_OPTIONS +#undef X + error = true; + } +#define X(opt_name, opt_has_arg, opt_long, opt_short) \ + if (c == opt_short) { options.opt_name = optarg ? optarg : opt_set; continue; } + LONG_OPTIONS + SHORT_OPTIONS +#undef X + error = true; + } + + if (optind < argc) + options.subcommand = argv[optind++]; + while (optind < argc) + options.positional.push_back(argv[optind++]); + + if (options.subcommand + && options.subcommand != "schedule"sv + && options.subcommand != "joparoute"sv + && options.subcommand != "journeyinfo"sv + && options.subcommand != "journeyroute"sv + && options.subcommand != "journeys"sv) { + fprintf(stderr, "%s: unknown subcommand '%s'\n\n", progname, options.subcommand); + fprintf(stderr, help, progname); + exit(1); + } + if (options.subcommand && error) { + fputc('\n', stderr); + if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname); + if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname); + if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname); + if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname); + if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname); + exit(1); + } + if (error || !options.subcommand) { + if (!options.subcommand) fprintf(stderr, "%s: no subcommand provided\n", progname); + fputc('\n', stderr); + fprintf(stderr, help, progname); + exit(1); + } + if (options.help) { + if (options.subcommand == "joparoute"sv) fprintf(stderr, joparoute_help, progname); + if (options.subcommand == "journeyinfo"sv) fprintf(stderr, journeyinfo_help, progname); + if (options.subcommand == "journeyroute"sv) fprintf(stderr, journeyroute_help, progname); + if (options.subcommand == "journeys"sv) fprintf(stderr, journeys_help, progname); + if (options.subcommand == "schedule"sv) fprintf(stderr, schedule_help, progname); + exit(0); + } + + if (options.subcommand == "joparoute"sv) + jopaRouteValidateOptions(progname, &options); + if (options.subcommand == "journeyinfo"sv) + journeyInfoValidateOptions(progname, &options); + if (options.subcommand == "journeyroute"sv) + journeyRouteValidateOptions(progname, &options); + if (options.subcommand == "journeys"sv) + journeysValidateOptions(progname, &options); + if (options.subcommand == "schedule"sv) + scheduleValidateOptions(progname, &options); + + return options; +} diff --git 
a/src/querykv1/cliopts.hpp b/src/querykv1/cliopts.hpp new file mode 100644 index 0000000..df8630e --- /dev/null +++ b/src/querykv1/cliopts.hpp @@ -0,0 +1,35 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_CLIOPTS_HPP +#define OEUF_QUERYKV1_CLIOPTS_HPP + +#include + +#define LONG_OPTIONS \ +/* name req/opt/no arg long short */ + X(kv1_file_path, required_argument, "kv1", 0 ) \ + X(line_planning_number, required_argument, "line", 0 ) \ + X(journey_number, required_argument, "journey", 0 ) \ + X(journey_pattern_code, required_argument, "jopa", 0 ) \ + X(begin_stop_code, required_argument, "begin", 0 ) \ + X(end_stop_code, required_argument, "end", 0 ) \ + X(help, no_argument, "help", 'h') + +#define SHORT_OPTIONS \ + X(output_file_path, required_argument, nullptr, 'o') + +struct Options { + const char *subcommand = nullptr; + std::vector positional; +#define X(name, argument, long_, short_) const char *name = nullptr; + LONG_OPTIONS + SHORT_OPTIONS +#undef X +}; + +extern const char *opt_set; +extern const char *opt_unset; + +Options parseOptions(int argc, char *argv[]); + +#endif // OEUF_QUERYKV1_CLIOPTS_HPP diff --git a/src/querykv1/daterange.cpp b/src/querykv1/daterange.cpp new file mode 100644 index 0000000..5ce42bf --- /dev/null +++ b/src/querykv1/daterange.cpp @@ -0,0 +1,91 @@ +// vim:set sw=2 ts=2 sts et: + +#include "daterange.hpp" + +static std::chrono::year_month_day nextDay(std::chrono::year_month_day ymd) { + return std::chrono::sys_days(ymd) + std::chrono::days(1); +} + +// DateRange expresses the date range [from, thru]. +DateRange::Iterator &DateRange::Iterator::operator++() { + ymd_ = nextDay(ymd_); + return *this; +} + +std::chrono::year_month_day DateRange::Iterator::operator*() const { + return ymd_; +} + +std::chrono::year_month_day DateRange::Iterator::ymd() const { + return ymd_; +} + +DateRange::Iterator::Iterator(std::chrono::year_month_day ymd) : ymd_(ymd) {} + +DateRange::DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru) + : from_(from), thru_(thru) +{} + +DateRange::Iterator DateRange::begin() const { + return DateRange::Iterator(from_); +} + +DateRange::Iterator DateRange::end() const { + return DateRange::Iterator(nextDay(thru_)); +} + +bool DateRange::valid() const { + return from_ <= thru_; +} + +std::chrono::year_month_day DateRange::from() const { + return from_; +} + +std::chrono::year_month_day DateRange::thru() const { + return thru_; +} + +bool operator==(const DateRange::Iterator a, const DateRange::Iterator b) { + return *a == *b; +} + +DateRangeSeq::DateRangeSeq(std::initializer_list ranges) + : DateRangeSeq(ranges.begin(), ranges.end()) +{} + +DateRangeSeq DateRangeSeq::clampFrom(std::chrono::year_month_day from) const { + std::vector new_ranges; + new_ranges.reserve(ranges_.size()); + for (const DateRange range : ranges_) { + if (range.from() < from) { + if (range.thru() < from) + continue; + new_ranges.emplace_back(from, range.thru()); + } + new_ranges.push_back(range); + } + return DateRangeSeq(new_ranges.begin(), new_ranges.end()); +} + +DateRangeSeq DateRangeSeq::clampThru(std::chrono::year_month_day thru) const { + std::vector new_ranges; + new_ranges.reserve(ranges_.size()); + for (const DateRange range : ranges_) { + if (range.thru() > thru) { + if (range.from() > thru) + continue; + new_ranges.emplace_back(range.from(), thru); + } + new_ranges.push_back(range); + } + return DateRangeSeq(new_ranges.begin(), new_ranges.end()); +} + +std::vector::const_iterator DateRangeSeq::begin() const { + return 
ranges_.begin(); +} + +std::vector::const_iterator DateRangeSeq::end() const { + return ranges_.end(); +} diff --git a/src/querykv1/daterange.hpp b/src/querykv1/daterange.hpp new file mode 100644 index 0000000..e34c39c --- /dev/null +++ b/src/querykv1/daterange.hpp @@ -0,0 +1,118 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_DATERANGE_HPP +#define OEUF_QUERYKV1_DATERANGE_HPP + +#include +#include +#include +#include +#include +#include + +// DateRange expresses the date range [from, thru]. +class DateRange { + public: + class Iterator { + friend class DateRange; + + public: + Iterator &operator++(); + + std::chrono::year_month_day operator*() const; + std::chrono::year_month_day ymd() const; + + private: + explicit Iterator(std::chrono::year_month_day ymd); + + std::chrono::year_month_day ymd_; + }; + + explicit DateRange(std::chrono::year_month_day from, std::chrono::year_month_day thru); + + Iterator begin() const; + Iterator end() const; + bool valid() const; + std::chrono::year_month_day from() const; + std::chrono::year_month_day thru() const; + + private: + std::chrono::year_month_day from_; + std::chrono::year_month_day thru_; +}; + +bool operator==(const DateRange::Iterator a, const DateRange::Iterator b); + +template +concept DerefsTo = requires(Tp p) { + { *p } -> std::convertible_to; +}; + +class DateRangeSeq { + // The way LE and GE are ordered makes a difference for how the sorting + // (insertion based on lower_bound) works. Do not carelessly reorder this. + enum LeGe { + GE, // >= + LE, // <= + }; + + std::vector ranges_; + + public: + template + requires DerefsTo + explicit DateRangeSeq(InputIt begin, InputIt end) { + // We convert every inclusive date range [x, y] into (x, >=) and (y, <=) + // and put these into a list, using binary search to make sure that these + // stay ordered. We then reduce this list, removing tautological + // predicates, giving us a final list of ranges that do not overlap. 
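    // Worked example: the ranges [2024-01-01, 2024-01-10] and
    // [2024-01-05, 2024-01-20] become the sorted predicate list
    // (01-01, >=), (01-05, >=), (01-10, <=), (01-20, <=). Only the final <=
    // is followed by no further >=, so the reduction below emits the single
    // merged range [2024-01-01, 2024-01-20].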
+ + std::vector> preds; + + size_t n = 0; + for (auto it = begin; it != end; it++) { + auto &range = *it; + if (!range.valid()) continue; + + auto a = std::make_pair(range.from(), GE); + auto b = std::make_pair(range.thru(), LE); + preds.insert(std::lower_bound(preds.begin(), preds.end(), a), a); + preds.insert(std::lower_bound(preds.begin(), preds.end(), b), b); + + n++; + } + + if (preds.empty()) + return; + + assert(preds.size() >= 2); + assert(preds.front().second == GE); + assert(preds.back().second == LE); + + std::chrono::year_month_day begin_ymd = preds[0].first; + for (size_t i = 1; i < preds.size(); i++) { + if (preds[i].second == LE && (i + 1 == preds.size() || preds[i + 1].second == GE)) { + std::chrono::year_month_day end_ymd = preds[i].first; + if (!ranges_.empty() && ranges_.back().thru() == begin_ymd) + ranges_.back() = DateRange(ranges_.back().from(), end_ymd); + else + ranges_.push_back(DateRange(begin_ymd, end_ymd)); + if (i + 1 != preds.size()) { + begin_ymd = preds[i + 1].first; + i++; + } + } + } + } + + explicit DateRangeSeq(std::initializer_list ranges); + + DateRangeSeq clampFrom(std::chrono::year_month_day from) const; + DateRangeSeq clampThru(std::chrono::year_month_day thru) const; + + public: + std::vector::const_iterator begin() const; + std::vector::const_iterator end() const; +}; + +#endif // OEUF_QUERYKV1_DATERANGE_HPP diff --git a/src/querykv1/grammar.abnf b/src/querykv1/grammar.abnf new file mode 100644 index 0000000..1c93760 --- /dev/null +++ b/src/querykv1/grammar.abnf @@ -0,0 +1,44 @@ +; This grammar does *not* allow fields to contain LF, unless the entire content +; of the field is quoted. The file is simply rejected otherwise. +; I took the liberty to take some inspiration from the somewhat similar IETF RFC 4180. + +document = [header NEWLINE] (comment / record / empty-line) *(NEWLINE (comment / record / empty-line)) [NEWLINE] / header + +header = OPENBRACK *NOTCRLF +comment = SEMICOLON *NOTCRLF + +empty-line = *WHITESPACE + +record = field *(PIPE field) +field = *WHITESPACE field-data *WHITESPACE +field-data = escaped / unescaped + +; Unescaped fields are also allowed to contain double quotes, +; they are just not interpreted in any special way. +escaped = DQUOTE *(TEXTDATA / WHITESPACE / NEWLINE / PIPE / 2DQUOTE) DQUOTE +unescaped = [TEXTDATA *(*WHITESPACE (TEXTDATA / DQUOTE))] + +HTAB = %x09 ; +LF = %x0A ; +VTAB = %x0B ; +FF = %x0C ;
+CR = %x0D ; +SPACE = %x20 ; +DQUOTE = %x22 ; " +SEMICOLON = %x3B ; ; +OPENBRACK = %x5B ; [ +PIPE = %x7C ; | + +; All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE +; Semicolon is included, as comments are only defined as 'lines starting with a semicolon'. +; So it should be fine if a semicolon is part of a field, the rest of the line would not +; be interpreted as a comment in that case. +TEXTDATA = %x00-08 / %x0E-1F / %x21 / %x23-5A / %x5C-7B / %x7D-10FFFF + +; Not including LF here even though TMI8/KV1 does not officially consider it +; a newline, as newlines are defined as 'CR optionally followed by LF' +WHITESPACE = SPACE / FF / HTAB / VTAB + +; All codepoints excluding CR and LF +NOTCRLF = %x00-09 / %x0B-0C / %x0E-10FFFF +NEWLINE = CR [LF] diff --git a/src/querykv1/grammar.ebnf b/src/querykv1/grammar.ebnf new file mode 100644 index 0000000..94f8cde --- /dev/null +++ b/src/querykv1/grammar.ebnf @@ -0,0 +1,47 @@ +/* This grammar does allow fields to contain stray LFs, not after any specific + * CR. I took the liberty to take some inspiration from the somewhat similar + * IETF RFC 4180. + */ +document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header + +header ::= OPENBRACK NOTCR* +comment ::= SEMICOLON NOTCR* + +empty-line ::= WHITESPACE* + +record ::= field (PIPE field)* +field ::= WHITESPACE* field-data WHITESPACE* +field-data ::= DQUOTE escaped DQUOTE | unescaped + +/* Unescaped fields are also allowed to contain double quotes, they are just + * not interpreted in any special way. + */ +escaped ::= (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)* +unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)? + +HTAB ::= #x09 /* */ +LF ::= #x0A /* */ +VTAB ::= #x0B /* */ +FF ::= #x0C /* */ +CR ::= #x0D /* */ +SPACE ::= #x20 /* */ +DQUOTE ::= #x22 /* " */ +SEMICOLON ::= #x3B /* ; */ +OPENBRACK ::= #x5B /* [ */ +PIPE ::= #x7C /* | */ + +/* All codepoints, except CR, LF, SPACE, FF, HTAB, VTAB, PIPE, DQUOTE. + * Semicolon is included, as comments are only defined as 'lines starting with + * a semicolon'. So it should be fine if a semicolon is part of a field, the + * rest of the line would not be interpreted as a comment in that case. + */ +TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF] + +/* Including LF here as TMI8/KV1 does not consider it a newline, + * as newlines are defined as 'CR optionally followed by LF' + */ +WHITESPACE ::= SPACE | LF | FF | HTAB | VTAB + +/* All codepoints excluding CR and LF */ +NOTCR ::= [#x00-#x0C#x0E-#x10FFFF] +NEWLINE ::= CR LF? diff --git a/src/querykv1/grammar.ebnf.bak b/src/querykv1/grammar.ebnf.bak new file mode 100644 index 0000000..b5acbf5 --- /dev/null +++ b/src/querykv1/grammar.ebnf.bak @@ -0,0 +1,23 @@ +document ::= (header NEWLINE)? (comment | record | empty-line) (NEWLINE (comment | record | empty-line))* NEWLINE? | header +header ::= OPENBRACK NOTCRLF* +comment ::= SEMICOLON NOTCRLF* +empty-line ::= WHITESPACE* +record ::= field (PIPE field)* +field ::= WHITESPACE* field-data WHITESPACE* +field-data ::= escaped | unescaped +escaped ::= DQUOTE (TEXTDATA | WHITESPACE | NEWLINE | PIPE | DQUOTE DQUOTE)* DQUOTE +unescaped ::= (TEXTDATA (WHITESPACE* (TEXTDATA | DQUOTE))*)? 
+HTAB ::= #x09 +LF ::= #x0A +VTAB ::= #x0B +FF ::= #x0C +CR ::= #x0D +SPACE ::= #x20 +DQUOTE ::= #x22 +SEMICOLON ::= #x3B +OPENBRACK ::= #x5B +PIPE ::= #x7C +WHITESPACE ::= SPACE | FF | HTAB | VTAB +NOTCRLF ::= [#x00-#x09#x0B-#x0C#x0E-#x10FFFF] +TEXTDATA ::= [#x00-#x08#x0E-#x1F#x21#x23-#x5A#x5C-#x7B#x7D-#x10FFFF] +NEWLINE ::= CR LF? diff --git a/src/querykv1/joparoute.cpp b/src/querykv1/joparoute.cpp new file mode 100644 index 0000000..94ed359 --- /dev/null +++ b/src/querykv1/joparoute.cpp @@ -0,0 +1,102 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include + +#include "joparoute.hpp" + +using namespace std::string_view_literals; + +void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index) { + FILE *out = stdout; + if (options.output_file_path != "-"sv) + out = fopen(options.output_file_path, "wb"); + if (!out) { + fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); + exit(EXIT_FAILURE); + } + + const std::string data_owner_code = "CXX"; + Kv1JourneyPattern::Key jopa_key( + // Of course it is bad to hardcode this, but we really have no time to make + // everything nice and dynamic. We're only working with CXX data anyway, + // and provide no support for the 'Schedules and Passing Times' KV1 + // variant. + data_owner_code, + options.line_planning_number, + options.journey_pattern_code); + + const Kv1JourneyPattern *jopa = index.journey_patterns[jopa_key]; + if (!jopa) { + std::cerr << "Journey pattern not found" << std::endl; + return; + } + const Kv1Line *line = jopa->p_line; + + struct Point { + bool is_stop = false; + const Kv1JourneyPatternTimingLink *jopatili = nullptr; + const Kv1Link *link = nullptr; + const Kv1Point *point = nullptr; + double distance_since_start_of_link = 0; + double distance_since_start_of_journey = 0; + }; + std::vector points; + + for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { + const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; + if (jopatili->key.line_planning_number == jopa->key.line_planning_number + && jopatili->key.journey_pattern_code == jopa->key.journey_pattern_code) { + const Kv1Link::Key link_key(data_owner_code, jopatili->user_stop_code_begin, + jopatili->user_stop_code_end, line->transport_type); + const Kv1Link *link = index.links[link_key]; + const Kv1UserStopPoint::Key link_begin_key(data_owner_code, jopatili->user_stop_code_begin); + const Kv1UserStopPoint::Key link_end_key(data_owner_code, jopatili->user_stop_code_end); + const Kv1UserStopPoint *link_begin = index.user_stop_points[link_begin_key]; + const Kv1UserStopPoint *link_end = index.user_stop_points[link_end_key]; + + points.emplace_back(true, jopatili, link, link_begin->p_point, 0); + + for (size_t j = 0; j < records.point_on_links.size(); j++) { + Kv1PointOnLink *pool = &records.point_on_links[j]; + if (pool->key.user_stop_code_begin == jopatili->user_stop_code_begin + && pool->key.user_stop_code_end == jopatili->user_stop_code_end + && pool->key.transport_type == jopatili->p_line->transport_type) { + points.emplace_back(false, jopatili, link, pool->p_point, pool->distance_since_start_of_link); + } + } + + points.emplace_back(true, jopatili, link, link_end->p_point, link->distance); + } + } + + std::sort(points.begin(), points.end(), [](Point &a, Point &b) { + if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order) + return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; + return a.distance_since_start_of_link < 
b.distance_since_start_of_link; + }); + + double distance_since_start_of_journey = 0; + for (size_t i = 0; i < points.size(); i++) { + Point *p = &points[i]; + if (i > 0) { + Point *prev = &points[i - 1]; + if (p->link != prev->link) { + distance_since_start_of_journey += prev->link->distance; + } + } + p->distance_since_start_of_journey = distance_since_start_of_journey + p->distance_since_start_of_link; + } + + fputs("is_stop,link_usrstop_begin,link_usrstop_end,point_code,rd_x,rd_y,distance_since_start_of_link,distance_since_start_of_journey\n", out); + for (const auto &point : points) { + fprintf(out, "%s,%s,%s,%s,%f,%f,%f,%f\n", + point.is_stop ? "true" : "false", + point.jopatili->user_stop_code_begin.c_str(), point.jopatili->user_stop_code_end.c_str(), + point.point->key.point_code.c_str(), point.point->location_x_ew, point.point->location_y_ns, + point.distance_since_start_of_link, point.distance_since_start_of_journey); + } + + if (options.output_file_path != "-"sv) fclose(out); +} diff --git a/src/querykv1/joparoute.hpp b/src/querykv1/joparoute.hpp new file mode 100644 index 0000000..ade94e8 --- /dev/null +++ b/src/querykv1/joparoute.hpp @@ -0,0 +1,13 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_JOPAROUTE_HPP +#define OEUF_QUERYKV1_JOPAROUTE_HPP + +#include +#include + +#include "cliopts.hpp" + +void jopaRoute(const Options &options, Kv1Records &records, Kv1Index &index); + +#endif // OEUF_QUERYKV1_JOPAROUTE_HPP diff --git a/src/querykv1/journeyinfo.cpp b/src/querykv1/journeyinfo.cpp new file mode 100644 index 0000000..bd29490 --- /dev/null +++ b/src/querykv1/journeyinfo.cpp @@ -0,0 +1,64 @@ +// vim:set sw=2 ts=2 sts et: + +#include + +#include "journeyinfo.hpp" + +void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index) { + std::cout << "Info for journey " << options.line_planning_number + << "/" << options.journey_number << std::endl; + + std::unordered_map usrstops; + for (size_t i = 0; i < records.user_stop_points.size(); i++) { + const Kv1UserStopPoint *usrstop = &records.user_stop_points[i]; + usrstops[usrstop->key.user_stop_code] = usrstop; + } + + for (const auto &pujo : records.public_journeys) { + if (pujo.key.line_planning_number != options.line_planning_number + || std::to_string(pujo.key.journey_number) != options.journey_number) + continue; + + std::vector timing_links; + for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { + const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; + if (jopatili->key.line_planning_number != options.line_planning_number + || jopatili->key.journey_pattern_code != pujo.journey_pattern_code) + continue; + timing_links.push_back(jopatili); + } + + std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool { + return a->key.timing_link_order < b->key.timing_link_order; + }); + auto begin_stop = timing_links.front()->user_stop_code_begin; + auto end_stop = timing_links.back()->user_stop_code_end; + + const auto *begin = usrstops[begin_stop]; + const auto *end = usrstops[end_stop]; + + std::cout << " Journey pattern: " << pujo.key.line_planning_number + << "/" << pujo.journey_pattern_code << std::endl + << " Begin stop: " << begin_stop + << "; name: " << std::quoted(begin->name) + << "; town: " << std::quoted(begin->town) << std::endl + << " End stop: " << end_stop + << "; name: " << std::quoted(end->name) + << "; town: " << std::quoted(end->town) << std::endl; + + const auto *begin_star = begin->p_user_stop_area; + const auto 
*end_star = end->p_user_stop_area; + if (begin_star) + std::cout << " Begin stop area: " << begin_star->key.user_stop_area_code + << "; name: " << std::quoted(begin_star->name) + << ", town: " << std::quoted(begin_star->town) + << std::endl; + if (end_star) + std::cout << " End stop area: " << end_star->key.user_stop_area_code + << "; name: " << std::quoted(end_star->name) + << ", town: " << std::quoted(end_star->town) + << std::endl; + + break; + } +} diff --git a/src/querykv1/journeyinfo.hpp b/src/querykv1/journeyinfo.hpp new file mode 100644 index 0000000..2a2118d --- /dev/null +++ b/src/querykv1/journeyinfo.hpp @@ -0,0 +1,13 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_JOURNEYINFO_HPP +#define OEUF_QUERYKV1_JOURNEYINFO_HPP + +#include +#include + +#include "cliopts.hpp" + +void journeyInfo(const Options &options, Kv1Records &records, Kv1Index &index); + +#endif // OEUF_QUERYKV1_JOURNEYINFO_HPP diff --git a/src/querykv1/journeyroute.cpp b/src/querykv1/journeyroute.cpp new file mode 100644 index 0000000..013ea1c --- /dev/null +++ b/src/querykv1/journeyroute.cpp @@ -0,0 +1,96 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include + +#include "journeyroute.hpp" + +using namespace std::string_view_literals; + +void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index) { + FILE *out = stdout; + if (options.output_file_path != "-"sv) + out = fopen(options.output_file_path, "wb"); + if (!out) { + fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); + exit(EXIT_FAILURE); + } + + for (auto &pujo : records.public_journeys) { + if (pujo.key.line_planning_number == options.line_planning_number && std::to_string(pujo.key.journey_number) == options.journey_number) { + fprintf(stderr, "Got PUJO %s/%s:\n", options.line_planning_number, options.journey_number); + fprintf(stderr, " Day type: %s\n", pujo.key.day_type.c_str()); + auto &pegr = *pujo.p_period_group; + fprintf(stderr, " PEGR Code: %s\n", pegr.key.period_group_code.c_str()); + fprintf(stderr, " PEGR Description: %s\n", pegr.description.c_str()); + fprintf(stderr, " SPECDAY Code: %s\n", pujo.key.specific_day_code.c_str()); + auto &timdemgrp = *pujo.p_time_demand_group; + + for (auto &pegrval : records.period_group_validities) { + if (pegrval.key.period_group_code == pegr.key.period_group_code) { + fprintf(stderr, "Got PEGRVAL for PEGR %s\n", pegr.key.period_group_code.c_str()); + std::cerr << " Valid from: " << pegrval.key.valid_from << std::endl; + std::cerr << " Valid thru: " << pegrval.valid_thru << std::endl; + } + } + + struct Point { + Kv1JourneyPatternTimingLink *jopatili = nullptr; + Kv1TimeDemandGroupRunTime *timdemrnt = nullptr; + double distance_since_start_of_link = 0; + double rd_x = 0; + double rd_y = 0; + double total_time_s = 0; + }; + std::vector points; + + for (size_t i = 0; i < records.time_demand_group_run_times.size(); i++) { + Kv1TimeDemandGroupRunTime *timdemrnt = &records.time_demand_group_run_times[i]; + if (timdemrnt->key.line_planning_number == timdemgrp.key.line_planning_number + && timdemrnt->key.journey_pattern_code == timdemgrp.key.journey_pattern_code + && timdemrnt->key.time_demand_group_code == timdemgrp.key.time_demand_group_code) { + Kv1JourneyPatternTimingLink *jopatili = timdemrnt->p_journey_pattern_timing_link; + for (auto &pool : records.point_on_links) { + if (pool.key.user_stop_code_begin == timdemrnt->user_stop_code_begin + && pool.key.user_stop_code_end == timdemrnt->user_stop_code_end + && pool.key.transport_type == 
jopatili->p_line->transport_type) { + points.emplace_back( + jopatili, + timdemrnt, + pool.distance_since_start_of_link, + pool.p_point->location_x_ew, + pool.p_point->location_y_ns + ); + } + } + } + } + + std::sort(points.begin(), points.end(), [](Point &a, Point &b) { + if (a.jopatili->key.timing_link_order != b.jopatili->key.timing_link_order) + return a.jopatili->key.timing_link_order < b.jopatili->key.timing_link_order; + return a.distance_since_start_of_link < b.distance_since_start_of_link; + }); + + double total_time_s = 0; + for (size_t i = 0; i < points.size(); i++) { + Point *p = &points[i]; + p->total_time_s = total_time_s; + if (i > 0) { + Point *prev = &points[i - 1]; + if (p->timdemrnt != prev->timdemrnt) { + total_time_s += prev->timdemrnt->total_drive_time_s; + prev->total_time_s = total_time_s; + } + } + } + + fputs("rd_x,rd_y,total_time_s,is_timing_stop\n", out); + for (const auto &point : points) { + fprintf(out, "%f,%f,%f,%d\n", point.rd_x, point.rd_y, point.total_time_s, point.jopatili->is_timing_stop); + } + } + } + + if (options.output_file_path != "-"sv) fclose(out); +} diff --git a/src/querykv1/journeyroute.hpp b/src/querykv1/journeyroute.hpp new file mode 100644 index 0000000..ccd996c --- /dev/null +++ b/src/querykv1/journeyroute.hpp @@ -0,0 +1,13 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_JOURNEYROUTE_HPP +#define OEUF_QUERYKV1_JOURNEYROUTE_HPP + +#include +#include + +#include "cliopts.hpp" + +void journeyRoute(const Options &options, Kv1Records &records, Kv1Index &index); + +#endif // OEUF_QUERYKV1_JOURNEYROUTE_HPP diff --git a/src/querykv1/journeys.cpp b/src/querykv1/journeys.cpp new file mode 100644 index 0000000..96566b2 --- /dev/null +++ b/src/querykv1/journeys.cpp @@ -0,0 +1,95 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include + +#include "journeys.hpp" + +using namespace std::string_view_literals; + +void journeys(const Options &options, Kv1Records &records, Kv1Index &index) { + const std::string_view want_begin_stop_code(options.begin_stop_code); + const std::string_view want_end_stop_code(options.end_stop_code); + + FILE *out = stdout; + if (options.output_file_path != "-"sv) + out = fopen(options.output_file_path, "wb"); + if (!out) { + fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); + exit(EXIT_FAILURE); + } + + std::cerr << "Generating journeys for " << options.line_planning_number << ", going from stop " + << options.begin_stop_code << " to " << options.end_stop_code << std::endl; + + std::unordered_map usrstops; + for (size_t i = 0; i < records.user_stop_points.size(); i++) { + const Kv1UserStopPoint *usrstop = &records.user_stop_points[i]; + usrstops[usrstop->key.user_stop_code] = usrstop; + } + + std::unordered_set journey_pattern_codes; + for (const auto &jopa : records.journey_patterns) { + if (jopa.key.line_planning_number != options.line_planning_number) + continue; + journey_pattern_codes.insert(jopa.key.journey_pattern_code); + } + + std::unordered_map> jopatilis; + for (size_t i = 0; i < records.journey_pattern_timing_links.size(); i++) { + const Kv1JourneyPatternTimingLink *jopatili = &records.journey_pattern_timing_links[i]; + if (jopatili->key.line_planning_number != options.line_planning_number + || !journey_pattern_codes.contains(jopatili->key.journey_pattern_code)) + continue; + jopatilis[jopatili->key.journey_pattern_code].push_back(jopatili); + } + + std::unordered_set valid_jopas; + for (auto &[journey_pattern_code, timing_links] : jopatilis) { + 
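      // For each journey pattern of the requested line: order its timing
      // links, take the first link's begin stop and the last link's end stop
      // as the pattern's endpoints, and keep the pattern only if both
      // endpoints match the requested "stop:" (user stop code) or "star:"
      // (user stop area code) selector.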
std::sort(timing_links.begin(), timing_links.end(), [](auto a, auto b) -> bool { + return a->key.timing_link_order < b->key.timing_link_order; + }); + auto begin_stop = timing_links.front()->user_stop_code_begin; + auto end_stop = timing_links.back()->user_stop_code_end; + + const auto *begin = usrstops[begin_stop]; + const auto *end = usrstops[end_stop]; + + bool begin_stop_ok = false; + if (want_begin_stop_code.starts_with("stop:")) + begin_stop_ok = want_begin_stop_code.substr(5) == begin_stop; + else if (want_begin_stop_code.starts_with("star:")) + begin_stop_ok = want_begin_stop_code.substr(5) == begin->user_stop_area_code; + + bool end_stop_ok = false; + if (want_end_stop_code.starts_with("stop:")) + end_stop_ok = want_end_stop_code.substr(5) == end_stop; + else if (want_end_stop_code.starts_with("star:")) + end_stop_ok = want_end_stop_code.substr(5) == end->user_stop_area_code; + + if (begin_stop_ok && end_stop_ok) { + valid_jopas.insert(journey_pattern_code); + } + } + + std::map> valid_journeys; + for (const auto &pujo : records.public_journeys) { + if (pujo.key.line_planning_number == options.line_planning_number + && valid_jopas.contains(pujo.journey_pattern_code)) { + valid_journeys[pujo.key.journey_number] = { + pujo.time_demand_group_code, + pujo.journey_pattern_code, + }; + } + } + + fputs("journey_number,time_demand_group_code,journey_pattern_code\n", out); + for (const auto &[journey_number, timdemgrp_jopa] : valid_journeys) { + const auto &[time_demand_group_code, journey_pattern_code] = timdemgrp_jopa; + fprintf(out, "%d,%s,%s\n", journey_number, time_demand_group_code.c_str(), journey_pattern_code.c_str()); + } + + if (options.output_file_path != "-"sv) fclose(out); +} diff --git a/src/querykv1/journeys.hpp b/src/querykv1/journeys.hpp new file mode 100644 index 0000000..cf615c7 --- /dev/null +++ b/src/querykv1/journeys.hpp @@ -0,0 +1,13 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_JOURNEYS_HPP +#define OEUF_QUERYKV1_JOURNEYS_HPP + +#include +#include + +#include "cliopts.hpp" + +void journeys(const Options &options, Kv1Records &records, Kv1Index &index); + +#endif // OEUF_QUERYKV1_JOURNEYS_HPP diff --git a/src/querykv1/main.cpp b/src/querykv1/main.cpp new file mode 100644 index 0000000..6c606ba --- /dev/null +++ b/src/querykv1/main.cpp @@ -0,0 +1,198 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "cliopts.hpp" +#include "joparoute.hpp" +#include "journeyinfo.hpp" +#include "journeyroute.hpp" +#include "journeys.hpp" +#include "schedule.hpp" + +using namespace std::string_view_literals; + +using TimingClock = std::conditional_t< + std::chrono::high_resolution_clock::is_steady, + std::chrono::high_resolution_clock, + std::chrono::steady_clock>; + +std::string readKv1(const char *path) { + FILE *in = stdin; + if (path != "-"sv) in = fopen(path, "rb"); + else fputs("Reading KV1 from standard input\n", stderr); + if (!in) { + fprintf(stderr, "Open %s: %s\n", path, strerrordesc_np(errno)); + exit(1); + } + + char buf[4096]; + std::string data; + while (!feof(in) && !ferror(in)) { + size_t read = fread(buf, sizeof(char), 4096, in); + data.append(buf, read); + } + if (ferror(in)) { + if (path == "-"sv) + fputs("Error when reading from stdin\n", stderr); + else + fprintf(stderr, "Error reading from file \"%s\"\n", path); + exit(1); + } + fprintf(stderr, "Read %lu bytes\n", data.size()); + + if (path != "-"sv) + fclose(in); + + return data; +} + +std::vector lex(const char 
*path) { + std::string data = readKv1(path); + + auto start = TimingClock::now(); + Kv1Lexer lexer(data); + lexer.lex(); + auto end = TimingClock::now(); + + std::chrono::duration elapsed{end - start}; + double bytes = static_cast(data.size()) / 1'000'000; + double speed = bytes / elapsed.count(); + + if (!lexer.errors.empty()) { + fputs("Lexer reported errors:\n", stderr); + for (const auto &error : lexer.errors) + fprintf(stderr, "- %s\n", error.c_str()); + exit(1); + } + + fprintf(stderr, "Got %lu tokens\n", lexer.tokens.size()); + fprintf(stderr, "Duration: %f s\n", elapsed.count()); + fprintf(stderr, "Speed: %f MB/s\n", speed); + + return std::move(lexer.tokens); +} + +bool parse(const char *path, Kv1Records &into) { + std::vector tokens = lex(path); + + Kv1Parser parser(tokens, into); + parser.parse(); + + bool ok = true; + if (!parser.gerrors.empty()) { + ok = false; + fputs("Parser reported errors:\n", stderr); + for (const auto &error : parser.gerrors) + fprintf(stderr, "- %s\n", error.c_str()); + } + if (!parser.warns.empty()) { + fputs("Parser reported warnings:\n", stderr); + for (const auto &warn : parser.warns) + fprintf(stderr, "- %s\n", warn.c_str()); + } + + fprintf(stderr, "Parsed %lu records\n", into.size()); + + return ok; +} + +void printParsedRecords(const Kv1Records &records) { + fputs("Parsed records:\n", stderr); + fprintf(stderr, " organizational_units: %lu\n", records.organizational_units.size()); + fprintf(stderr, " higher_organizational_units: %lu\n", records.higher_organizational_units.size()); + fprintf(stderr, " user_stop_points: %lu\n", records.user_stop_points.size()); + fprintf(stderr, " user_stop_areas: %lu\n", records.user_stop_areas.size()); + fprintf(stderr, " timing_links: %lu\n", records.timing_links.size()); + fprintf(stderr, " links: %lu\n", records.links.size()); + fprintf(stderr, " lines: %lu\n", records.lines.size()); + fprintf(stderr, " destinations: %lu\n", records.destinations.size()); + fprintf(stderr, " journey_patterns: %lu\n", records.journey_patterns.size()); + fprintf(stderr, " concession_financer_relations: %lu\n", records.concession_financer_relations.size()); + fprintf(stderr, " concession_areas: %lu\n", records.concession_areas.size()); + fprintf(stderr, " financers: %lu\n", records.financers.size()); + fprintf(stderr, " journey_pattern_timing_links: %lu\n", records.journey_pattern_timing_links.size()); + fprintf(stderr, " points: %lu\n", records.points.size()); + fprintf(stderr, " point_on_links: %lu\n", records.point_on_links.size()); + fprintf(stderr, " icons: %lu\n", records.icons.size()); + fprintf(stderr, " notices: %lu\n", records.notices.size()); + fprintf(stderr, " notice_assignments: %lu\n", records.notice_assignments.size()); + fprintf(stderr, " time_demand_groups: %lu\n", records.time_demand_groups.size()); + fprintf(stderr, " time_demand_group_run_times: %lu\n", records.time_demand_group_run_times.size()); + fprintf(stderr, " period_groups: %lu\n", records.period_groups.size()); + fprintf(stderr, " specific_days: %lu\n", records.specific_days.size()); + fprintf(stderr, " timetable_versions: %lu\n", records.timetable_versions.size()); + fprintf(stderr, " public_journeys: %lu\n", records.public_journeys.size()); + fprintf(stderr, " period_group_validities: %lu\n", records.period_group_validities.size()); + fprintf(stderr, " exceptional_operating_days: %lu\n", records.exceptional_operating_days.size()); + fprintf(stderr, " schedule_versions: %lu\n", records.schedule_versions.size()); + fprintf(stderr, " 
public_journey_passing_times: %lu\n", records.public_journey_passing_times.size()); + fprintf(stderr, " operating_days: %lu\n", records.operating_days.size()); +} + +void printIndexSize(const Kv1Index &index) { + fputs("Index size:\n", stderr); + fprintf(stderr, " organizational_units: %lu\n", index.organizational_units.size()); + fprintf(stderr, " user_stop_points: %lu\n", index.user_stop_points.size()); + fprintf(stderr, " user_stop_areas: %lu\n", index.user_stop_areas.size()); + fprintf(stderr, " timing_links: %lu\n", index.timing_links.size()); + fprintf(stderr, " links: %lu\n", index.links.size()); + fprintf(stderr, " lines: %lu\n", index.lines.size()); + fprintf(stderr, " destinations: %lu\n", index.destinations.size()); + fprintf(stderr, " journey_patterns: %lu\n", index.journey_patterns.size()); + fprintf(stderr, " concession_financer_relations: %lu\n", index.concession_financer_relations.size()); + fprintf(stderr, " concession_areas: %lu\n", index.concession_areas.size()); + fprintf(stderr, " financers: %lu\n", index.financers.size()); + fprintf(stderr, " journey_pattern_timing_links: %lu\n", index.journey_pattern_timing_links.size()); + fprintf(stderr, " points: %lu\n", index.points.size()); + fprintf(stderr, " point_on_links: %lu\n", index.point_on_links.size()); + fprintf(stderr, " icons: %lu\n", index.icons.size()); + fprintf(stderr, " notices: %lu\n", index.notices.size()); + fprintf(stderr, " time_demand_groups: %lu\n", index.time_demand_groups.size()); + fprintf(stderr, " time_demand_group_run_times: %lu\n", index.time_demand_group_run_times.size()); + fprintf(stderr, " period_groups: %lu\n", index.period_groups.size()); + fprintf(stderr, " specific_days: %lu\n", index.specific_days.size()); + fprintf(stderr, " timetable_versions: %lu\n", index.timetable_versions.size()); + fprintf(stderr, " public_journeys: %lu\n", index.public_journeys.size()); + fprintf(stderr, " period_group_validities: %lu\n", index.period_group_validities.size()); + fprintf(stderr, " exceptional_operating_days: %lu\n", index.exceptional_operating_days.size()); + fprintf(stderr, " schedule_versions: %lu\n", index.schedule_versions.size()); + fprintf(stderr, " public_journey_passing_times: %lu\n", index.public_journey_passing_times.size()); + fprintf(stderr, " operating_days: %lu\n", index.operating_days.size()); +} + +int main(int argc, char *argv[]) { + Options options = parseOptions(argc, argv); + + Kv1Records records; + if (!parse(options.kv1_file_path, records)) { + fputs("Error parsing records, exiting\n", stderr); + return EXIT_FAILURE; + } + printParsedRecords(records); + fputs("Indexing...\n", stderr); + Kv1Index index(&records); + fprintf(stderr, "Indexed %lu records\n", index.size()); + // Only notice assignments are not indexed. If this equality is not valid, + // then this means that we had duplicate keys or that something else went + // wrong. That would really not be great. 
+ assert(index.size() == records.size() - records.notice_assignments.size()); + printIndexSize(index); + fputs("Linking records...\n", stderr); + kv1LinkRecords(index); + fputs("Done linking\n", stderr); + + if (options.subcommand == "joparoute"sv) jopaRoute(options, records, index); + if (options.subcommand == "journeyroute"sv) journeyRoute(options, records, index); + if (options.subcommand == "journeys"sv) journeys(options, records, index); + if (options.subcommand == "journeyinfo"sv) journeyInfo(options, records, index); + if (options.subcommand == "schedule"sv) schedule(options, records, index); +} diff --git a/src/querykv1/schedule.cpp b/src/querykv1/schedule.cpp new file mode 100644 index 0000000..2bcfe0a --- /dev/null +++ b/src/querykv1/schedule.cpp @@ -0,0 +1,63 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include + +#include "daterange.hpp" +#include "schedule.hpp" + +using namespace std::string_view_literals; + +void schedule(const Options &options, Kv1Records &records, Kv1Index &index) { + FILE *out = stdout; + if (options.output_file_path != "-"sv) + out = fopen(options.output_file_path, "wb"); + if (!out) { + fprintf(stderr, "Open %s: %s\n", options.output_file_path, strerrordesc_np(errno)); + exit(EXIT_FAILURE); + } + + std::cerr << "Generating schedule for " << options.line_planning_number << std::endl; + + std::unordered_multimap period_group_validities; + for (const auto &pegr : records.period_group_validities) + period_group_validities.insert({ pegr.key.period_group_code, pegr }); + std::unordered_multimap public_journeys; + for (const auto &pujo : records.public_journeys) + public_journeys.insert({ pujo.key.timetable_version_code, pujo }); + + std::cout << "line_planning_number,journey_number,date,departure_time" << std::endl; + for (const auto &tive : records.timetable_versions) { + std::vector tive_pegrval_ranges; + + auto pegrval_range = period_group_validities.equal_range(tive.key.period_group_code); + for (auto it = pegrval_range.first; it != pegrval_range.second; it++) { + const auto &[_, pegrval] = *it; + tive_pegrval_ranges.emplace_back(pegrval.key.valid_from, pegrval.valid_thru); + } + + DateRangeSeq seq(tive_pegrval_ranges.begin(), tive_pegrval_ranges.end()); + seq = seq.clampFrom(tive.valid_from); + if (tive.valid_thru) + seq = seq.clampThru(*tive.valid_thru); + + for (const auto &range : seq) for (auto date : range) { + auto weekday = std::chrono::year_month_weekday(std::chrono::sys_days(date)).weekday(); + + auto pujo_range = public_journeys.equal_range(tive.key.timetable_version_code); + for (auto itt = pujo_range.first; itt != pujo_range.second; itt++) { + const auto &[_, pujo] = *itt; + + if (pujo.key.line_planning_number == options.line_planning_number && pujo.key.day_type.size() == 7 + && pujo.key.day_type[weekday.iso_encoding() - 1] == static_cast('0' + weekday.iso_encoding())) { + std::cout << pujo.key.line_planning_number << "," << pujo.key.journey_number << "," + << date << "," << pujo.departure_time << std::endl; + } + } + } + } + + if (options.output_file_path != "-"sv) fclose(out); +} diff --git a/src/querykv1/schedule.hpp b/src/querykv1/schedule.hpp new file mode 100644 index 0000000..100bd4c --- /dev/null +++ b/src/querykv1/schedule.hpp @@ -0,0 +1,13 @@ +// vim:set sw=2 ts=2 sts et: + +#ifndef OEUF_QUERYKV1_SCHEDULE_HPP +#define OEUF_QUERYKV1_SCHEDULE_HPP + +#include +#include + +#include "cliopts.hpp" + +void schedule(const Options &options, Kv1Records &records, Kv1Index &index); + +#endif // OEUF_QUERYKV1_SCHEDULE_HPP 
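The day-type filter in schedule.cpp above is compact, so here is a minimal standalone sketch of the same test. The helper name runsOnDate is invented for illustration; the reading of day_type as a seven-character string carrying each ISO weekday's digit at its own position is taken from the comparison in the code above, not from the TMI8 specification itself.

#include <chrono>
#include <string>

// Sketch: the schedule subcommand treats position i (0-based, Monday first)
// of the seven-character day_type string as holding the digit '1'..'7' when
// the journey operates on that ISO weekday.
static bool runsOnDate(const std::string &day_type, std::chrono::year_month_day date) {
  if (day_type.size() != 7) return false;
  auto weekday = std::chrono::year_month_weekday(std::chrono::sys_days(date)).weekday();
  unsigned iso = weekday.iso_encoding();  // 1 = Monday ... 7 = Sunday
  return day_type[iso - 1] == static_cast<char>('0' + iso);
}

Under this reading, a hypothetical weekday-only day_type such as "1234500" passes for Monday through Friday dates and fails for weekends.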
diff --git a/src/recvkv6/.envrc b/src/recvkv6/.envrc new file mode 100644 index 0000000..694e74f --- /dev/null +++ b/src/recvkv6/.envrc @@ -0,0 +1,2 @@ +source_env ../../ +export DEVMODE=1 diff --git a/src/recvkv6/Makefile b/src/recvkv6/Makefile new file mode 100644 index 0000000..12ff7fb --- /dev/null +++ b/src/recvkv6/Makefile @@ -0,0 +1,21 @@ +# Taken from: +# Open Source Security Foundation (OpenSSF), “Compiler Options Hardening Guide +# for C and C++,” OpenSSF Best Practices Working Group. Accessed: Dec. 01, +# 2023. [Online]. Available: +# https://best.openssf.org/Compiler-Hardening-Guides/Compiler-Options-Hardening-Guide-for-C-and-C++.html +CXXFLAGS=-std=c++2b -g -fno-omit-frame-pointer $(if $(DEVMODE),-Werror,)\ + -O2 -Wall -Wformat=2 -Wconversion -Wtrampolines -Wimplicit-fallthrough \ + -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3 \ + -D_GLIBCXX_ASSERTIONS \ + -fstrict-flex-arrays=3 \ + -fstack-clash-protection -fstack-protector-strong +LDFLAGS=-lzmq -larrow -lparquet -lprometheus-cpp-pull -lprometheus-cpp-core -lz -ltmi8 -Wl,-z,defs \ + -Wl,-z,nodlopen -Wl,-z,noexecstack \ + -Wl,-z,relro -Wl,-z,now + +recvkv6: main.cpp + $(CXX) -o $@ $^ $(CXXFLAGS) $(LDFLAGS) + +.PHONY: clean +clean: + rm recvkv6 diff --git a/src/recvkv6/main.cpp b/src/recvkv6/main.cpp new file mode 100644 index 0000000..2ac3669 --- /dev/null +++ b/src/recvkv6/main.cpp @@ -0,0 +1,1300 @@ +// vim:set sw=2 ts=2 sts et: + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include + +#include + +#include + +#define CHUNK 16384 + +struct RawMessage { + public: + // Takes ownership of envelope and body + RawMessage(zmq_msg_t envelope, zmq_msg_t body) + : envelope(envelope), body(body) + {} + + // Prevent copying + RawMessage(const RawMessage &) = delete; + RawMessage &operator=(RawMessage const &) = delete; + + std::string_view getEnvelope() { + return static_cast(zmq_msg_data(&envelope)); + } + + char *getBody() { + return static_cast(zmq_msg_data(&body)); + } + + size_t getBodySize() { + return zmq_msg_size(&body); + } + + ~RawMessage() { + zmq_msg_close(&envelope); + zmq_msg_close(&body); + } + + private: + zmq_msg_t envelope; + zmq_msg_t body; +}; + +std::optional recvMsg(void *socket) { + while (true) { + zmq_msg_t envelope, body; + int rc = zmq_msg_init(&envelope); + assert(rc == 0); + rc = zmq_msg_init(&body); + assert(rc == 0); + + rc = zmq_msg_recv(&envelope, socket, 0); + if (rc == -1) return std::nullopt; + + int more; + size_t more_size = sizeof(more); + rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); + if (!more) { + zmq_msg_close(&envelope); + zmq_msg_close(&body); + continue; + } + + rc = zmq_msg_recv(&body, socket, 0); + if (rc == -1) return std::nullopt; + + rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); + assert(!more); + + return std::make_optional(envelope, body); + } +} + +// Ensures that [output_size] == 0 +char *decompress(char *raw, unsigned int input_size, unsigned int &output_size) { + assert(input_size <= UINT32_MAX); + + z_stream strm; + strm.next_in = reinterpret_cast(raw); + strm.avail_in = input_size; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + int rc = inflateInit2(&strm, 32); + assert(rc == Z_OK); + + unsigned int buf_cap = CHUNK; + unsigned int buf_len = 0; + char *buf = static_cast(malloc(CHUNK)); + do { + if (buf_len + CHUNK > buf_cap) { + assert(buf_cap <= UINT32_MAX); + buf_cap 
*= 2; + buf = static_cast(realloc(buf, buf_cap)); + } + strm.avail_out = buf_cap - buf_len; + strm.next_out = reinterpret_cast(buf + buf_len); + + unsigned long old_total = strm.total_out; + rc = inflate(&strm, Z_FINISH); + unsigned progress = static_cast(strm.total_out - old_total); + buf_len += progress; + assert(progress != 0 || rc == Z_STREAM_END); + } while (strm.total_in < input_size); + + if (buf_len == buf_cap) { + buf = static_cast(realloc(buf, buf_len + 1)); + } + buf[buf_len] = 0; + output_size = buf_len; + + rc = inflateEnd(&strm); + assert(rc == Z_OK); + + return buf; +} + +struct Date { + int16_t year = 0; + uint8_t month = 0; + uint8_t day = 0; + + static bool parse(Date &dest, std::string_view src) { + dest.year = 0, dest.month = 0, dest.day = 0; + + int16_t y_mul_fac = 1; + bool extended = false; + + size_t plus = src.find('+'); + if (plus != std::string_view::npos) { + extended = true; + src = src.substr(1); // remove plus sign from the start + } + if (!extended) { + size_t min_or_dash = src.find('-'); + if (min_or_dash == std::string_view::npos) return false; + if (min_or_dash == 0) { + y_mul_fac = -1; // it's a minus sign + src = src.substr(1); // remove minus sign at the start + } + } + + int y_chars = 0; + while (src.size() > 0 && src[0] >= '0' && src[0] <= '9') { + dest.year = static_cast(dest.year * 10 + src[0] - '0'); + src = src.substr(1); + y_chars++; + } + if (src.size() == 0) { dest.year = 0; return false; } + if (src[0] != '-') { dest.year = 0; return false; } + src = src.substr(1); // remove dash + if (y_chars < 4 || (y_chars > 4 && !extended)) { dest.year = 0; return false; } + dest.year *= y_mul_fac; + + bool rest_correct = src.size() == 5 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9'; + if (!rest_correct) { dest.year = 0; return false; } + dest.month = static_cast((src[0] - '0') * 10 + src[1] - '0'); + dest.day = static_cast((src[3] - '0') * 10 + src[4] - '0'); + if (dest.month > 12 || dest.day > 31) { + dest.year = 0, dest.month = 0, dest.day = 0; + return false; + } + return true; + } + + std::string toString() const { + if (year < 0 || year > 9999 || month < 0 || month > 12 || day < 0 || day > 31) + throw std::invalid_argument("one or more date components (year, month, day) out of range"); + char data[11] = "XXXX-XX-XX"; + sprintf(data, "%04u-%02u-%02u", year, month, day); + return data; + } + + std::chrono::days toUnixDays() const { + std::chrono::year_month_day ymd{std::chrono::year(year), std::chrono::month(month), std::chrono::day(day)}; + // This is valid since C++20: as of C++20, the system clock is defined to measure the + // Unix Time, the amount of seconds since Thursday 1 January 1970, without leap seconds. 
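    // For example, Date{1970, 1, 2}.toUnixDays() yields std::chrono::days{1},
    // and the epoch itself, Date{1970, 1, 1}, yields std::chrono::days{0}.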
+ std::chrono::days since_epoch = std::chrono::sys_days(ymd).time_since_epoch(); + return since_epoch; + } +}; + +struct Time { + uint8_t hour = 0; + uint8_t minute = 0; + uint8_t second = 0; + + static bool parse(Time &dest, std::string_view src) { + bool okay = src.size() == 8 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[2] == ':' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9' + && src[5] == ':' + && src[6] >= '0' && src[6] <= '9' + && src[7] >= '0' && src[7] <= '9'; + if (!okay) return false; + dest.hour = static_cast((src[0] - '0') * 10 + src[1] - '0'); + dest.minute = static_cast((src[3] - '0') * 10 + src[4] - '0'); + dest.second = static_cast((src[6] - '0') * 10 + src[7] - '0'); + if (dest.hour > 23 || dest.minute > 59 || dest.second > 59) { + dest.hour = 0, dest.minute = 0, dest.second = 0; + return false; + } + return true; + } + + std::string toString() const { + if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 59) + throw std::invalid_argument("one or more time components (hour, minute, second) out of range"); + char data[9] = "XX:XX:XX"; + sprintf(data, "%02u:%02u:%02u", hour, minute, second); + return data; + } +}; + +// Time zone designator +struct Tzd { + int16_t minutes = 0; + + static bool parse(Tzd &dest, std::string_view src) { + dest.minutes = 0; + + if (src.size() == 0) return false; + if (src == "Z") return true; + + int16_t multiplier = 1; + if (src[0] == '-') multiplier = -1; + else if (src[0] != '+') return false; + src = src.substr(1); + + bool okay = src.size() == 5 + && src[0] >= '0' && src[0] <= '9' + && src[1] >= '0' && src[1] <= '9' + && src[2] == ':' + && src[3] >= '0' && src[3] <= '9' + && src[4] >= '0' && src[4] <= '9'; + if (!okay) return false; + int16_t hours = static_cast((src[0] - '0') * 10 + src[1] - '0'); + int16_t minutes = static_cast((src[3] - '0') * 10 + src[4] - '0'); + if (hours > 23 || minutes > 59) return false; + dest.minutes = static_cast(multiplier * (60 * hours + minutes)); + return true; + } + + std::string toString() const { + if (minutes == 0) + return "Z"; + + bool negative = minutes < 0; + int hours_off = abs(minutes / 60); + int mins_off = abs(minutes) - hours_off*60; + if (hours_off > 23 || mins_off > 59) + throw std::invalid_argument("offset out of range"); + char data[7] = "+XX:XX"; + sprintf(data, "%c%02u:%02u", negative ? 
'-' : '+', hours_off, mins_off); + return data; + } +}; + +struct Timestamp { + Date date; + Tzd off; + Time time; + + static bool parse(Timestamp &dest, std::string_view src) { + size_t t = src.find('T'); + if (t == std::string_view::npos || t + 1 >= src.size()) return false; + + std::string_view date = src.substr(0, t); + std::string_view time_and_tzd = src.substr(t + 1); + if (time_and_tzd.size() < 9) return false; + if (!Date::parse(dest.date, date)) return false; + + std::string_view time = time_and_tzd.substr(0, 8); + std::string_view tzd = time_and_tzd.substr(8); + if (!Time::parse(dest.time, time)) return false; + return Tzd::parse(dest.off, tzd); + } + + std::string toString() const { + return date.toString() + "T" + time.toString() + off.toString(); + } + + std::chrono::seconds toUnixSeconds() const { + std::chrono::year_month_day ymd(std::chrono::year(date.year), + std::chrono::month(date.month), + std::chrono::day(date.day)); + std::chrono::sys_days sys_days(ymd); + std::chrono::time_point utc_days(sys_days.time_since_epoch()); + std::chrono::utc_seconds utc_seconds = std::chrono::time_point_cast(utc_days); + utc_seconds += std::chrono::hours(time.hour) + std::chrono::minutes(time.minute) + + std::chrono::seconds(time.second) - std::chrono::minutes(off.minutes); + std::chrono::sys_seconds sys_seconds = std::chrono::utc_clock::to_sys(utc_seconds); + std::chrono::seconds unix = sys_seconds.time_since_epoch(); + return unix; + } +}; + +static const std::string_view TMI8_XML_NS = "http://bison.connekt.nl/tmi8/kv6/msg"; + +enum Kv6RecordType { + KV6T_UNKNOWN = 0, + KV6T_DELAY = 1, + KV6T_INIT = 2, + KV6T_ARRIVAL = 3, + KV6T_ON_STOP = 4, + KV6T_DEPARTURE = 5, + KV6T_ON_ROUTE = 6, + KV6T_ON_PATH = 7, + KV6T_OFF_ROUTE = 8, + KV6T_END = 9, + // Always keep this updated to correspond to the + // first and last elements of the enumeration! 
+ _KV6T_FIRST_TYPE = KV6T_UNKNOWN, + _KV6T_LAST_TYPE = KV6T_END, +}; + +enum Kv6Field { + KV6F_NONE = 0, + KV6F_DATA_OWNER_CODE = 1, + KV6F_LINE_PLANNING_NUMBER = 2, + KV6F_OPERATING_DAY = 4, + KV6F_JOURNEY_NUMBER = 8, + KV6F_REINFORCEMENT_NUMBER = 16, + KV6F_TIMESTAMP = 32, + KV6F_SOURCE = 64, + KV6F_PUNCTUALITY = 128, + KV6F_USER_STOP_CODE = 256, + KV6F_PASSAGE_SEQUENCE_NUMBER = 512, + KV6F_VEHICLE_NUMBER = 1024, + KV6F_BLOCK_CODE = 2048, + KV6F_WHEELCHAIR_ACCESSIBLE = 4096, + KV6F_NUMBER_OF_COACHES = 8192, + KV6F_RD_Y = 16384, + KV6F_RD_X = 32768, + KV6F_DISTANCE_SINCE_LAST_USER_STOP = 65536, +}; + +static constexpr Kv6Field KV6T_REQUIRED_FIELDS[_KV6T_LAST_TYPE + 1] = { + // KV6T_UNKNOWN + KV6F_NONE, + // KV6T_DELAY + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_PUNCTUALITY), + // KV6T_INIT + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER + | KV6F_BLOCK_CODE + | KV6F_WHEELCHAIR_ACCESSIBLE + | KV6F_NUMBER_OF_COACHES), + // KV6T_ARRIVAL + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_ON_STOP + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_DEPARTURE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY), + // KV6T_ON_ROUTE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_VEHICLE_NUMBER + | KV6F_PUNCTUALITY + | KV6F_RD_X + | KV6F_RD_Y), + // KV6T_ON_PATH + KV6F_NONE, + // KV6T_OFF_ROUTE + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER + | KV6F_RD_X + | KV6F_RD_Y), + // KV6T_END + static_cast( + KV6F_DATA_OWNER_CODE + | KV6F_LINE_PLANNING_NUMBER + | KV6F_OPERATING_DAY + | KV6F_JOURNEY_NUMBER + | KV6F_REINFORCEMENT_NUMBER + | KV6F_TIMESTAMP + | KV6F_SOURCE + | KV6F_USER_STOP_CODE + | KV6F_PASSAGE_SEQUENCE_NUMBER + | KV6F_VEHICLE_NUMBER), +}; + +static constexpr Kv6Field KV6T_OPTIONAL_FIELDS[_KV6T_LAST_TYPE + 1] = { + // KV6T_UNKNOWN + KV6F_NONE, + // KV6T_DELAY + KV6F_NONE, + // KV6T_INIT + KV6F_NONE, + // KV6T_ARRIVAL + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_ON_STOP + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_DEPARTURE + static_cast(KV6F_RD_X | KV6F_RD_Y), + // KV6T_ON_ROUTE + KV6F_DISTANCE_SINCE_LAST_USER_STOP, + // 
KV6T_ON_PATH + KV6F_NONE, + // KV6T_OFF_ROUTE + KV6F_NONE, + // KV6T_END + KV6F_NONE, +}; + +struct Kv6Record { + Kv6RecordType type = KV6T_UNKNOWN; + Kv6Field presence = KV6F_NONE; + Kv6Field next = KV6F_NONE; + std::string data_owner_code; + std::string line_planning_number; + std::string source; + std::string user_stop_code; + std::string wheelchair_accessible; + Date operating_day; + Timestamp timestamp; + uint32_t block_code = 0; + uint32_t journey_number = 0; + uint32_t vehicle_number = 0; + int32_t rd_x = 0; + int32_t rd_y = 0; + // The TMI8 specification is unclear: this field + // might actually be called distancesincelaststop + uint32_t distance_since_last_user_stop = 0; + uint16_t passage_sequence_number = 0; + int16_t punctuality = 0; + uint8_t number_of_coaches = 0; + uint8_t reinforcement_number = 0; + + void markPresent(Kv6Field field) { + presence = static_cast(presence | field); + } + + void removeUnsupportedFields() { + Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; + Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; + Kv6Field supported_fields = static_cast(required_fields | optional_fields); + presence = static_cast(presence & supported_fields); + } + + bool valid() { + Kv6Field required_fields = KV6T_REQUIRED_FIELDS[type]; + Kv6Field optional_fields = KV6T_OPTIONAL_FIELDS[type]; + Kv6Field supported_fields = static_cast(required_fields | optional_fields); + + Kv6Field required_field_presence = static_cast(presence & required_fields); + Kv6Field unsupported_field_presence = static_cast(presence & ~supported_fields); + + return required_field_presence == required_fields && !unsupported_field_presence; + } +}; + +enum Tmi8VvTmPushInfoField { + TMI8F_NONE = 0, + TMI8F_SUBSCRIBER_ID = 1, + TMI8F_VERSION = 2, + TMI8F_DOSSIER_NAME = 4, + TMI8F_TIMESTAMP = 8, +}; + +struct Tmi8VvTmPushInfo { + Tmi8VvTmPushInfoField next = TMI8F_NONE; + Tmi8VvTmPushInfoField presence = TMI8F_NONE; + std::string subscriber_id; + std::string version; + std::string dossier_name; + Timestamp timestamp; + std::vector messages; + + void markPresent(Tmi8VvTmPushInfoField field) { + presence = static_cast(presence | field); + } + + bool valid() { + const Tmi8VvTmPushInfoField REQUIRED_FIELDS = + static_cast( + TMI8F_SUBSCRIBER_ID + | TMI8F_VERSION + | TMI8F_DOSSIER_NAME + | TMI8F_TIMESTAMP); + return (presence & REQUIRED_FIELDS) == REQUIRED_FIELDS; + } +}; + +static const std::array KV6_POS_INFO_RECORD_TYPES = { + "UNKNOWN", "DELAY", "INIT", "ARRIVAL", "ONSTOP", "DEPARTURE", "ONROUTE", "ONPATH", "OFFROUTE", "END", +}; + +std::optional findKv6PosInfoRecordTypeName(Kv6RecordType type) { + if (type > _KV6T_LAST_TYPE) + return std::nullopt; + return KV6_POS_INFO_RECORD_TYPES[type]; +} + +const std::array, 17> KV6_POS_INFO_RECORD_FIELDS = {{ + { "dataownercode", KV6F_DATA_OWNER_CODE }, + { "lineplanningnumber", KV6F_LINE_PLANNING_NUMBER }, + { "operatingday", KV6F_OPERATING_DAY }, + { "journeynumber", KV6F_JOURNEY_NUMBER }, + { "reinforcementnumber", KV6F_REINFORCEMENT_NUMBER }, + { "timestamp", KV6F_TIMESTAMP }, + { "source", KV6F_SOURCE }, + { "punctuality", KV6F_PUNCTUALITY }, + { "userstopcode", KV6F_USER_STOP_CODE }, + { "passagesequencenumber", KV6F_PASSAGE_SEQUENCE_NUMBER }, + { "vehiclenumber", KV6F_VEHICLE_NUMBER }, + { "blockcode", KV6F_BLOCK_CODE }, + { "wheelchairaccessible", KV6F_WHEELCHAIR_ACCESSIBLE }, + { "numberofcoaches", KV6F_NUMBER_OF_COACHES }, + { "rd-y", KV6F_RD_Y }, + { "rd-x", KV6F_RD_X }, + { "distancesincelastuserstop", KV6F_DISTANCE_SINCE_LAST_USER_STOP }, +}}; + +// 
Returns the maximum amount of digits such that it is guaranteed that +// a corresponding amount of repeated 9's can be represented by the type. +template +constexpr size_t maxDigits() { + size_t digits = 0; + for (T x = std::numeric_limits::max(); x != 0; x /= 10) digits++; + return digits - 1; +} + +template +constexpr bool parseUnsigned(T &out, std::string_view src) { + static_assert(MaxDigits <= maxDigits()); + if (src.size() > MaxDigits) return false; + T res = 0; + while (src.size() > 0) { + if (src[0] < '0' || src[0] > '9') return false; + res = static_cast(res * 10 + src[0] - '0'); + src = src.substr(1); + } + out = res; + return true; +} + +template +constexpr bool parseSigned(T &out, std::string_view src) { + static_assert(MaxDigits <= maxDigits()); + if (src.size() == 0) return false; + bool negative = src[0] == '-'; + if (negative) src = src.substr(1); + if (src.size() > MaxDigits) return false; + T res = 0; + while (src.size() > 0) { + if (src[0] < '0' || src[0] > '9') return false; + res = static_cast(res * 10 + src[0] - '0'); + src = src.substr(1); + } + out = negative ? -res : res; + return true; +} + +struct Xmlns { + const Xmlns *next; + std::string_view prefix; + std::string_view url; +}; + +std::optional resolve(std::string_view prefix, const Xmlns *nss) { + while (nss) + if (nss->prefix == prefix) + return nss->url; + else + nss = nss->next; + return std::nullopt; +} + +template +void withXmlnss(const rapidxml::xml_attribute<> *attr, const Xmlns *nss, const T &fn) { + while (attr) { + std::string_view name(attr->name(), attr->name_size()); + if (name.starts_with("xmlns")) { + if (name.size() == 5) { // just xmlns + Xmlns ns0 = { + .next = nss, + .url = std::string_view(attr->value(), attr->value_size()), + }; + withXmlnss(attr->next_attribute(), &ns0, fn); + return; + } else if (name.size() > 6 && name[5] == ':') { // xmlns: + Xmlns ns0 = { + .next = nss, + .prefix = name.substr(6), + .url = std::string_view(attr->value(), attr->value_size()), + }; + withXmlnss(attr->next_attribute(), &ns0, fn); + return; + } + } + attr = attr->next_attribute(); + } + fn(nss); +} + +template +void ifResolvable(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { + std::string_view name(node.name(), node.name_size()); + std::string_view ns; + size_t colon = name.find(':'); + + if (colon != std::string_view::npos) { + if (colon >= name.size() - 1) // last character + return; + ns = name.substr(0, colon); + name = name.substr(colon + 1); + } + + withXmlnss(node.first_attribute(), nss, [&](const Xmlns *nss) { + std::optional ns_url = resolve(ns, nss); + if (!ns_url && !ns.empty()) return; + if (!ns_url) fn(std::string_view(), name, nss); + else fn(*ns_url, name, nss); + }); +} + +template +void ifTmi8Element(const rapidxml::xml_node<> &node, const Xmlns *nss, const T &fn) { + ifResolvable(node, nss, [&](std::string_view ns_url, std::string_view name, const Xmlns *nss) { + if (node.type() == rapidxml::node_element && (ns_url.empty() || ns_url == TMI8_XML_NS)) fn(name, nss); + }); +} + +bool onlyTextElement(const rapidxml::xml_node<> &node) { + return node.type() == rapidxml::node_element + && node.first_node() + && node.first_node() == node.last_node() + && node.first_node()->type() == rapidxml::node_data; +} + +std::string_view getValue(const rapidxml::xml_node<> &node) { + return std::string_view(node.value(), node.value_size()); +} + +bool parseStringValue(std::string &into, size_t max_len, std::string_view val) { + if (val.size() > max_len) + return false; + into = val; + 
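+  // val is usually a view into the rapidxml parse buffer; assigning it to a
+  // std::string copies the bytes, so the parsed record does not depend on
+  // the lifetime of the XML document.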
return true; +} + +struct Kv6Parser { + std::stringstream &errs; + std::stringstream &warns; + + void error(std::string_view msg) { + errs << msg << '\n'; + } + + void warn(std::string_view msg) { + warns << msg << '\n'; + } + +#define PERRASSERT(msg, ...) do { if (!(__VA_ARGS__)) { error(msg); return; } } while (false) +#define PWARNASSERT(msg, ...) do { if (!(__VA_ARGS__)) { warn(msg); return; } } while (false) + + std::optional parseKv6PosInfoRecord(Kv6RecordType type, const rapidxml::xml_node<> &node, const Xmlns *nss) { + Kv6Record fields = { .type = type }; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + for (const auto &[fname, field] : KV6_POS_INFO_RECORD_FIELDS) { + if (field == KV6F_NONE) + continue; + if (fname == name) { + PWARNASSERT("Expected KV6 record field element to only contain data", + onlyTextElement(*child)); + std::string_view childval = getValue(*child); + switch (field) { + case KV6F_DATA_OWNER_CODE: + PWARNASSERT("Invalid value for dataownercode", + parseStringValue(fields.data_owner_code, 10, childval)); + break; + case KV6F_LINE_PLANNING_NUMBER: + PWARNASSERT("Invalid value for lineplanningnumber", + parseStringValue(fields.line_planning_number, 10, childval)); + break; + case KV6F_OPERATING_DAY: + PWARNASSERT("Invalid value for operatatingday: not a valid date", + Date::parse(fields.operating_day, childval)); + break; + case KV6F_JOURNEY_NUMBER: + PWARNASSERT("Invalid value for journeynumber:" + " not a valid unsigned number with at most six digits", + parseUnsigned<6>(fields.journey_number, childval)); + break; + case KV6F_REINFORCEMENT_NUMBER: + PWARNASSERT("Invalid value for reinforcementnumber:" + " not a valid unsigned number with at most two digits", + parseUnsigned<2>(fields.reinforcement_number, childval)); + break; + case KV6F_TIMESTAMP: + PWARNASSERT("Invalid value for timestamp: not a valid timestamp", + Timestamp::parse(fields.timestamp, childval)); + break; + case KV6F_SOURCE: + PWARNASSERT("Invalid value for source:" + " not a valid string of at most 10 bytes", + parseStringValue(fields.source, 10, childval)); + break; + case KV6F_PUNCTUALITY: + PWARNASSERT("Invalid value for punctuality:" + " not a valid signed number with at most four digits", + parseSigned<4>(fields.punctuality, childval)); + break; + case KV6F_USER_STOP_CODE: + PWARNASSERT("Invalid value for userstopcode:" + " not a valid string of at most 10 bytes", + parseStringValue(fields.user_stop_code, 10, childval)); + break; + case KV6F_PASSAGE_SEQUENCE_NUMBER: + PWARNASSERT("Invalid value for passagesequencenumber:" + " not a valid unsigned number with at most four digits", + parseUnsigned<4>(fields.passage_sequence_number, childval)); + break; + case KV6F_VEHICLE_NUMBER: + PWARNASSERT("Invalid value for vehiclenumber:" + " not a valid unsigned number with at most six digits", + parseUnsigned<6>(fields.vehicle_number, childval)); + break; + case KV6F_BLOCK_CODE: + PWARNASSERT("Invalid value for blockcode:" + " not a valid unsigned number with at most eight digits", + parseUnsigned<8>(fields.block_code, childval)); + break; + case KV6F_WHEELCHAIR_ACCESSIBLE: + PWARNASSERT("Invalid value for wheelchairaccessible:" + " not a valid value for wheelchair accessibility", + childval == "ACCESSIBLE" + || childval == "NOTACCESSIBLE" + || childval == "UNKNOWN"); + fields.wheelchair_accessible = childval; + break; + case KV6F_NUMBER_OF_COACHES: + PWARNASSERT("Invalid 
for numberofcoaches:" + " not a valid unsigned number with at most two digits", + parseUnsigned<2>(fields.number_of_coaches, childval)); + break; + case KV6F_RD_X: + PWARNASSERT("Invalid value for rd-x:" + " not a valid signed number with at most six digits", + parseSigned<6>(fields.rd_x, childval)); + break; + case KV6F_RD_Y: + PWARNASSERT("Invalid value for rd-y:" + " not a valid signed number with at most six digits", + parseSigned<6>(fields.rd_y, childval)); + break; + case KV6F_DISTANCE_SINCE_LAST_USER_STOP: + PWARNASSERT("Invalid value for distancesincelastuserstop:" + " not a valid unsigned number with at most five digits", + parseUnsigned<5>(fields.distance_since_last_user_stop, childval)); + break; + case KV6F_NONE: + error("NONE field type case should be unreachable in parseKv6PosInfoRecord"); + return; + } + fields.markPresent(field); + break; + } + } + }); + } + + fields.removeUnsupportedFields(); + + if (!fields.valid()) + return std::nullopt; + return fields; + } + + std::vector parseKv6PosInfo(const rapidxml::xml_node<> &node, const Xmlns *nss) { + std::vector records; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + for (auto type = _KV6T_FIRST_TYPE; + type != _KV6T_LAST_TYPE; + type = static_cast(type + 1)) { + if (type == KV6T_UNKNOWN) + continue; + if (KV6_POS_INFO_RECORD_TYPES[type] == name) { + auto record = parseKv6PosInfoRecord(type, *child, nss); + if (record) { + records.push_back(*record); + } + } + } + }); + } + return records; + } + + std::optional parseVvTmPush(const rapidxml::xml_node<> &node, const Xmlns *nss) { + Tmi8VvTmPushInfo info; + for (const rapidxml::xml_node<> *child = node.first_node(); child; child = child->next_sibling()) { + ifTmi8Element(*child, nss, [&](std::string_view name, const Xmlns *nss) { + if (name == "Timestamp") { + PERRASSERT("Invalid value for Timestamp: Bad format", onlyTextElement(*child)); + PERRASSERT("Invalid value for Timestamp: Invalid timestamp", Timestamp::parse(info.timestamp, getValue(*child))); + info.markPresent(TMI8F_TIMESTAMP); + } else if (name == "SubscriberID") { + PERRASSERT("Invalid value for SubscriberID: Bad format", onlyTextElement(*child)); + info.subscriber_id = getValue(*child); + info.markPresent(TMI8F_SUBSCRIBER_ID); + } else if (name == "Version") { + PERRASSERT("Invalid value for Version: Bad format", onlyTextElement(*child)); + info.version = getValue(*child); + info.markPresent(TMI8F_VERSION); + } else if (name == "DossierName") { + PERRASSERT("Invalid value for DossierName: Bad format", onlyTextElement(*child)); + info.dossier_name = getValue(*child); + info.markPresent(TMI8F_DOSSIER_NAME); + } else if (name == "KV6posinfo") { + info.messages = parseKv6PosInfo(*child, nss); + } + }); + } + + if (!info.valid()) + return std::nullopt; + return info; + } + + std::optional parse(const rapidxml::xml_document<> &doc) { + std::optional msg; + for (const rapidxml::xml_node<> *node = doc.first_node(); node; node = node->next_sibling()) { + ifTmi8Element(*node, nullptr /* nss */, [&](std::string_view name, const Xmlns *nss) { + if (name == "VV_TM_PUSH") { + if (msg) { + error("Duplicated VV_TM_PUSH"); + return; + } + msg = parseVvTmPush(*node, nss); + if (!msg) { + error("Invalid VV_TM_PUSH"); + } + } + }); + } + if (!msg) + error("Expected to find VV_TM_PUSH"); + return msg; + } +}; + +std::optional parseXml(const rapidxml::xml_document<> &doc, std::stringstream &errs, 
std::stringstream &warns) { + Kv6Parser parser = { errs, warns }; + return parser.parse(doc); +} + +struct Metrics { + prometheus::Counter &messages_counter_ok; + prometheus::Counter &messages_counter_error; + prometheus::Counter &messages_counter_warning; + prometheus::Counter &rows_written_counter; + prometheus::Histogram &records_hist; + prometheus::Histogram &message_parse_hist; + prometheus::Histogram &payload_size_hist; + + using BucketBoundaries = prometheus::Histogram::BucketBoundaries; + + enum class ParseStatus { + OK, + WARNING, + ERROR, + }; + + Metrics(std::shared_ptr registry) : + Metrics(registry, prometheus::BuildCounter() + .Name("kv6_vv_tm_push_messages_total") + .Help("Number of KV6 VV_TM_PUSH messages received") + .Register(*registry)) + {} + + void addMeasurement(std::chrono::duration took_secs, size_t payload_size, size_t records, ParseStatus parsed) { + double millis = took_secs.count() * 1000.0; + + if (parsed == ParseStatus::OK) messages_counter_ok.Increment(); + else if (parsed == ParseStatus::WARNING) messages_counter_warning.Increment(); + else if (parsed == ParseStatus::ERROR) messages_counter_error.Increment(); + records_hist.Observe(static_cast(records)); + message_parse_hist.Observe(millis); + payload_size_hist.Observe(static_cast(payload_size)); + } + + void rowsWritten(int64_t rows) { + rows_written_counter.Increment(static_cast(rows)); + } + + private: + Metrics(std::shared_ptr registry, + prometheus::Family &messages_counter) : + messages_counter_ok(messages_counter + .Add({{ "status", "ok" }})), + messages_counter_error(messages_counter + .Add({{ "status", "error" }})), + messages_counter_warning(messages_counter + .Add({{ "status", "warning" }})), + rows_written_counter(prometheus::BuildCounter() + .Name("kv6_vv_tm_push_records_written") + .Help("Numer of VV_TM_PUSH records written to disk") + .Register(*registry) + .Add({})), + records_hist(prometheus::BuildHistogram() + .Name("kv6_vv_tm_push_records_amount") + .Help("Number of KV6 VV_TM_PUSH records") + .Register(*registry) + .Add({}, BucketBoundaries{ 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0 })), + message_parse_hist(prometheus::BuildHistogram() + .Name("kv6_vv_tm_push_message_parse_millis") + .Help("Milliseconds taken to parse KV6 VV_TM_PUSH messages") + .Register(*registry) + .Add({}, BucketBoundaries{ 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 100.0, 1000.0, 2000.0 })), + payload_size_hist(prometheus::BuildHistogram() + .Name("kv6_payload_size") + .Help("Sizes of KV6 ZeroMQ message payloads") + .Register(*registry) + .Add({}, BucketBoundaries{ 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 50000.0 })) + {} +}; + +// Note: it *must* hold that decompressed[size] == 0 +std::optional parseMsg(char *decompressed, size_t size, Metrics &metrics, std::stringstream &errs, std::stringstream &warns) { + auto start = std::chrono::steady_clock::now(); + + std::optional info; + + if (decompressed[size] != 0) { + errs << "Not parsing: missing null terminator" << '\n'; + } else { + rapidxml::xml_document<> doc; + constexpr int PARSE_FLAGS = rapidxml::parse_trim_whitespace + | rapidxml::parse_no_string_terminators + | rapidxml::parse_validate_closing_tags; + + try { + doc.parse(decompressed); + info = parseXml(doc, errs, warns); + } catch (const rapidxml::parse_error &err) { + errs << "XML parsing failed" << '\n'; + } + } + + auto end = std::chrono::steady_clock::now(); + std::chrono::duration took = end - start; + + if (info) + if (warns.view().empty()) + metrics.addMeasurement(took, size, info->messages.size(), 
Metrics::ParseStatus::OK); + else + metrics.addMeasurement(took, size, info->messages.size(), Metrics::ParseStatus::WARNING); + else + metrics.addMeasurement(took, size, 0, Metrics::ParseStatus::ERROR); + + return info; +} + +bool terminate = false; + +void onSigIntOrTerm(int /* signum */) { + terminate = true; +} + +arrow::Result> getTable(const std::vector &messages, size_t &rows_written) { + ParquetBuilder builder; + + for (const auto &msg : messages) { + Kv6Field present = msg.presence; + Kv6Field required = KV6T_REQUIRED_FIELDS[msg.type]; + Kv6Field optional = KV6T_OPTIONAL_FIELDS[msg.type]; + if ((~msg.presence & required) != 0) { + std::cout << "Invalid message: not all required fields present; skipping" << std::endl; + continue; + } + Kv6Field used = static_cast(present & (required | optional)); + rows_written++; + + // RD-X and RD-Y fix: some datatypes have these fields marked as required, but still give option + // of not providing these fields by setting them to -1. We want this normalized, where these + // fields are instead simply marked as not present. + if ((used & KV6F_RD_X) && msg.rd_x == -1) + used = static_cast(used & ~KV6F_RD_X); + if ((used & KV6F_RD_Y) && msg.rd_y == -1) + used = static_cast(used & ~KV6F_RD_Y); + + ARROW_RETURN_NOT_OK(builder.types.Append(*findKv6PosInfoRecordTypeName(msg.type))); + ARROW_RETURN_NOT_OK(used & KV6F_DATA_OWNER_CODE + ? builder.data_owner_codes.Append(msg.data_owner_code) + : builder.data_owner_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_LINE_PLANNING_NUMBER + ? builder.line_planning_numbers.Append(msg.line_planning_number) + : builder.line_planning_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_OPERATING_DAY + ? builder.operating_days.Append(static_cast(msg.operating_day.toUnixDays().count())) + : builder.operating_days.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_JOURNEY_NUMBER + ? builder.journey_numbers.Append(msg.journey_number) + : builder.journey_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_REINFORCEMENT_NUMBER + ? builder.reinforcement_numbers.Append(msg.reinforcement_number) + : builder.reinforcement_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_TIMESTAMP + ? builder.timestamps.Append(msg.timestamp.toUnixSeconds().count()) + : builder.timestamps.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_SOURCE + ? builder.sources.Append(msg.source) + : builder.sources.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_PUNCTUALITY + ? builder.punctualities.Append(msg.punctuality) + : builder.punctualities.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_USER_STOP_CODE + ? builder.user_stop_codes.Append(msg.user_stop_code) + : builder.user_stop_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_PASSAGE_SEQUENCE_NUMBER + ? builder.passage_sequence_numbers.Append(msg.passage_sequence_number) + : builder.passage_sequence_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_VEHICLE_NUMBER + ? builder.vehicle_numbers.Append(msg.vehicle_number) + : builder.vehicle_numbers.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_BLOCK_CODE + ? builder.block_codes.Append(msg.block_code) + : builder.block_codes.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_WHEELCHAIR_ACCESSIBLE + ? builder.wheelchair_accessibles.Append(msg.wheelchair_accessible) + : builder.wheelchair_accessibles.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_NUMBER_OF_COACHES + ? builder.number_of_coaches.Append(msg.number_of_coaches) + : builder.number_of_coaches.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_RD_Y + ? 
builder.rd_ys.Append(msg.rd_y) + : builder.rd_ys.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_RD_X + ? builder.rd_xs.Append(msg.rd_x) + : builder.rd_xs.AppendNull()); + ARROW_RETURN_NOT_OK(used & KV6F_DISTANCE_SINCE_LAST_USER_STOP + ? builder.distance_since_last_user_stops.Append(msg.distance_since_last_user_stop) + : builder.distance_since_last_user_stops.AppendNull()); + } + + return builder.getTable(); +} + +std::tuple getMinMaxTimestamp(const std::vector &messages) { + if (messages.size() == 0) + return { 0, 0 }; + int64_t min = std::numeric_limits::max(); + int64_t max = 0; + for (const auto &message : messages) { + if (~message.presence & KV6F_TIMESTAMP) + continue; + int64_t seconds = message.timestamp.toUnixSeconds().count(); + if (seconds < min) + min = seconds; + if (seconds > max) + max = seconds; + } + if (min == std::numeric_limits::max()) + return { 0, 0 }; // this is stupid + return { min, max }; +} + +arrow::Status writeParquet(const std::vector &messages, Metrics &metrics) { + size_t rows_written = 0; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr table, getTable(messages, rows_written)); + + auto timestamp = std::chrono::round(std::chrono::utc_clock::now()); + std::string filename = std::format("oeuf-{:%FT%T%Ez}.parquet", timestamp); + ARROW_RETURN_NOT_OK(writeArrowTableAsParquetFile(*table, filename)); + std::cout << "Wrote Parquet file " << filename << std::endl; + + auto [min_timestamp, max_timestamp] = getMinMaxTimestamp(messages); + std::ofstream metaf(filename + ".meta.json.part", std::ios::binary); + nlohmann::json meta{ + { "min_timestamp", min_timestamp }, + { "max_timestamp", max_timestamp }, + { "rows_written", rows_written }, + }; + metaf << meta; + metaf.close(); + std::filesystem::rename(filename + ".meta.json.part", filename + ".meta.json"); + + metrics.rowsWritten(rows_written); + + return arrow::Status::OK(); +} + +using SteadyTime = std::chrono::steady_clock::time_point; + +std::string dumpFailedMsg(std::string_view txt, std::string_view errs, std::string_view warns) { + auto timestamp = std::chrono::round(std::chrono::utc_clock::now()); + std::string filename = std::format("oeuf-error-{:%FT%T%Ez}.txt", timestamp); + std::ofstream dumpf(filename, std::ios::binary); + dumpf << "======= ERROR MESSAGES ========" << std::endl; + dumpf << errs; + dumpf << "======= WARNING MESSAGES ======" << std::endl; + dumpf << warns; + dumpf << "======= RECEIVED MESSAGE ======" << std::endl; + dumpf << txt << std::endl; + dumpf.close(); + return filename; +} + +void handleMsg(RawMessage &msg, Metrics &metrics, SteadyTime &last_output, std::vector &msg_buf) { + unsigned int decompressed_size = 0; + if (msg.getBodySize() > std::numeric_limits::max()) + std::cout << "parseMsg failed due to too large message" << std::endl; + char *decompressed = decompress(msg.getBody(), static_cast(msg.getBodySize()), decompressed_size); + + std::stringstream errs; + std::stringstream warns; + // We know that decompressed[decompressed_size] == 0 because decompress() ensures this. 
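+  // parseMsg re-checks the terminator anyway and records a parse error
+  // rather than handing rapidxml an unterminated buffer.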
+ auto parsed_msg = parseMsg(decompressed, decompressed_size, metrics, errs, warns); + if (parsed_msg) { + const Tmi8VvTmPushInfo &info = *parsed_msg; + auto new_msgs_it = info.messages.begin(); + while (new_msgs_it != info.messages.end()) { + size_t remaining_space = MAX_PARQUET_CHUNK - msg_buf.size(); + size_t new_msgs_left = info.messages.end() - new_msgs_it; + auto new_msgs_start = new_msgs_it; + auto new_msgs_end = new_msgs_start + std::min(remaining_space, new_msgs_left); + new_msgs_it = new_msgs_end; + msg_buf.insert(msg_buf.end(), new_msgs_start, new_msgs_end); + + bool time_expired = std::chrono::steady_clock::now() - last_output > std::chrono::minutes(5); + if (msg_buf.size() >= MAX_PARQUET_CHUNK || (new_msgs_it == info.messages.end() && time_expired)) { + arrow::Status status = writeParquet(msg_buf, metrics); + if (!status.ok()) + std::cout << "Writing Parquet file failed: " << status << std::endl; + msg_buf.clear(); + last_output = std::chrono::steady_clock::now(); + } + } + if (!errs.view().empty() || !warns.view().empty()) { + std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); + std::cout << "parseMsg finished with warnings: details dumped to " << dump_file << std::endl; + } + } else { + std::filesystem::path dump_file = dumpFailedMsg(std::string_view(decompressed, decompressed_size), errs.str(), warns.str()); + std::cout << "parseMsg failed: error details dumped to " << dump_file << std::endl; + } + free(decompressed); +} + +int main(int argc, char *argv[]) { + std::cout << "Working directory: " << std::filesystem::current_path() << std::endl; + + const char *metrics_addr = getenv("METRICS_ADDR"); + if (!metrics_addr || strlen(metrics_addr) == 0) { + std::cout << "Error: no METRICS_ADDR set!" << std::endl; + exit(EXIT_FAILURE); + } + prometheus::Exposer exposer{metrics_addr}; + + bool prod = false; + const char *prod_env = getenv("NDOV_PRODUCTION"); + if (prod_env && strcmp(prod_env, "true") == 0) prod = true; + + void *zmq_context = zmq_ctx_new(); + void *zmq_subscriber = zmq_socket(zmq_context, ZMQ_SUB); + int rc = zmq_connect(zmq_subscriber, prod ? "tcp://pubsub.ndovloket.nl:7658" : "tcp://pubsub.besteffort.ndovloket.nl:7658"); + assert(rc == 0); + + const char *topic = "/CXX/KV6posinfo"; + rc = zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic)); + assert(rc == 0); + + signal(SIGINT, onSigIntOrTerm); + signal(SIGTERM, onSigIntOrTerm); + + SteadyTime last_output = std::chrono::steady_clock::now(); + + auto registry = std::make_shared(); + Metrics metrics(registry); + exposer.RegisterCollectable(registry); + + std::vector msg_buf; + while (!terminate) { + std::optional msg = recvMsg(zmq_subscriber); + if (!msg) { + if (!terminate) + perror("recvMsg"); + continue; + } + handleMsg(*msg, metrics, last_output, msg_buf); + } + + std::cout << "Terminating" << std::endl; + if (msg_buf.size() > 0) { + arrow::Status status = writeParquet(msg_buf, metrics); + if (!status.ok()) std::cout << "Writing final Parquet file failed: " << status << std::endl; + else std::cout << "Final data written" << std::endl; + msg_buf.clear(); + } + + if (zmq_close(zmq_subscriber)) + perror("zmq_close"); + if (zmq_ctx_destroy(zmq_context)) + perror("zmq_ctx_destroy"); + + std::cout << "Bye" << std::endl; + + return 0; +} -- cgit v1.2.3