From 2277d2fe64387f69bb35b31078187e6f3f568599 Mon Sep 17 00:00:00 2001 From: Emmy D'Anello Date: Wed, 8 May 2024 10:30:25 +0200 Subject: [PATCH] Better management of GTFS realtime --- .../management/commands/update_sncf_gtfs.py | 8 +- .../commands/update_sncf_gtfs_rt.py | 286 ++++++++++-------- sncfgtfs/models.py | 2 +- 3 files changed, 171 insertions(+), 125 deletions(-) diff --git a/sncfgtfs/management/commands/update_sncf_gtfs.py b/sncfgtfs/management/commands/update_sncf_gtfs.py index dee97d7..7adee86 100644 --- a/sncfgtfs/management/commands/update_sncf_gtfs.py +++ b/sncfgtfs/management/commands/update_sncf_gtfs.py @@ -17,10 +17,10 @@ class Command(BaseCommand): "IC": "https://eu.ftp.opendatasoft.com/sncf/gtfs/export-intercites-gtfs-last.zip", "TER": "https://eu.ftp.opendatasoft.com/sncf/gtfs/export-ter-gtfs-last.zip", "TN": "https://eu.ftp.opendatasoft.com/sncf/gtfs/transilien-gtfs.zip", - # "ES": "https://www.data.gouv.fr/fr/datasets/r/9089b550-696e-4ae0-87b5-40ea55a14292", - # "TI": "https://www.data.gouv.fr/fr/datasets/r/4d1dd21a-b061-47ac-9514-57ffcc09b4a5", - # "RENFE": "https://ssl.renfe.com/gtransit/Fichero_AV_LD/google_transit.zip", - # "OBB": "https://static.oebb.at/open-data/soll-fahrplan-gtfs/GTFS_OP_2024_obb.zip", + "ES": "https://www.data.gouv.fr/fr/datasets/r/9089b550-696e-4ae0-87b5-40ea55a14292", + "TI": "https://thello.axelor.com/public/gtfs/gtfs.zip", + "RENFE": "https://ssl.renfe.com/gtransit/Fichero_AV_LD/google_transit.zip", + "OBB": "https://static.oebb.at/open-data/soll-fahrplan-gtfs/GTFS_OP_2024_obb.zip", } def add_arguments(self, parser): diff --git a/sncfgtfs/management/commands/update_sncf_gtfs_rt.py b/sncfgtfs/management/commands/update_sncf_gtfs_rt.py index 6eeda6e..24d2f79 100644 --- a/sncfgtfs/management/commands/update_sncf_gtfs_rt.py +++ b/sncfgtfs/management/commands/update_sncf_gtfs_rt.py @@ -3,10 +3,12 @@ from zoneinfo import ZoneInfo import requests from django.core.management import BaseCommand -from django.db.models import Q, Max +from django.db.models import Q from sncfgtfs.gtfs_realtime_pb2 import FeedMessage -from sncfgtfs.models import Calendar, CalendarDate, StopTime, StopTimeUpdate, Trip, TripUpdate, Stop +from sncfgtfs.models import Agency, Calendar, CalendarDate, ExceptionType, LocationType, PickupType, \ + Route, RouteType, Stop, StopScheduleRelationship, StopTime, StopTimeUpdate, \ + Trip, TripUpdate, TripScheduleRelationship class Command(BaseCommand): @@ -16,6 +18,7 @@ class Command(BaseCommand): "TGV": "https://proxy.transport.data.gouv.fr/resource/sncf-tgv-gtfs-rt-trip-updates", "IC": "https://proxy.transport.data.gouv.fr/resource/sncf-ic-gtfs-rt-trip-updates", "TER": "https://proxy.transport.data.gouv.fr/resource/sncf-ter-gtfs-rt-trip-updates", + "TI": "https://thello.axelor.com/public/gtfs/GTFS-RT.bin", } def add_arguments(self, parser): @@ -35,123 +38,21 @@ class Command(BaseCommand): trip_id = trip_update.trip.trip_id if feed_type in ["TGV", "IC", "TER"]: trip_id = trip_id.split(":", 1)[0] + start_date = date(year=int(trip_update.trip.start_date[:4]), month=int(trip_update.trip.start_date[4:6]), day=int(trip_update.trip.start_date[6:])) start_dt = datetime.combine(start_date, time(0), tzinfo=ZoneInfo("Europe/Paris")) - if trip_update.trip.schedule_relationship == 1: - headsign = trip_id[5:-1] - trip_qs = Trip.objects.all() - trip_ids = trip_qs.values_list('id', flat=True) - - first_stop_queryset = StopTime.objects.filter( - stop__parent_station_id=trip_update.stop_time_update[0].stop_id, - ).values('trip_id') - last_stop_queryset = StopTime.objects.filter( - stop__parent_station_id=trip_update.stop_time_update[-1].stop_id, - ).values('trip_id') - trip_ids = trip_ids.intersection(first_stop_queryset).intersection(last_stop_queryset) - for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update): - stop_id = stop_time_update.stop_id - st_queryset = StopTime.objects.filter(stop__parent_station_id=stop_id) - if stop_sequence == 0: - st_queryset = st_queryset.filter(stop_sequence=0) - trip_ids_restrict = trip_ids.intersection(st_queryset.values('trip_id')) - if trip_ids_restrict: - trip_ids = trip_ids_restrict - else: - stop = Stop.objects.get(id=stop_id) - self.stdout.write(self.style.WARNING(f"Warning: No trip is found passing by stop " - f"{stop.name} ({stop_id})")) - trip_ids = set(trip_ids) - route_ids = set(Trip.objects.filter(id__in=trip_ids).values_list('route_id', flat=True)) - self.stdout.write(f"{len(route_ids)} routes found on trip for new train {headsign}") - if not route_ids: - self.stdout.write(f"Route not found for trip {trip_id}.") - continue - elif len(route_ids) > 1: - self.stdout.write(f"Multiple routes found for trip {trip_id}.") - self.stdout.write(", ".join(route_ids)) - route_id = route_ids.pop() - - Calendar.objects.update_or_create( - id=f"{feed_type}-new-{headsign}", - defaults={ - "transport_type": feed_type, - "monday": False, - "tuesday": False, - "wednesday": False, - "thursday": False, - "friday": False, - "saturday": False, - "sunday": False, - "start_date": start_date, - "end_date": start_date, - } - ) - CalendarDate.objects.update_or_create( - id=f"{feed_type}-{headsign}-{trip_update.trip.start_date}", - defaults={ - "service_id": f"{feed_type}-new-{headsign}", - "date": trip_update.trip.start_date, - "exception_type": 1, - } - ) - Trip.objects.update_or_create( - id=trip_id, - defaults={ - "route_id": route_id, - "service_id": f"{feed_type}-new-{headsign}", - "headsign": headsign, - "direction_id": trip_update.trip.direction_id, - } - ) - - sample_trip = Trip.objects.filter(id__in=trip_ids, route_id=route_id).first() - for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update): - stop_id = stop_time_update.stop_id - stop = Stop.objects.get(id=stop_id) - if stop.location_type == 1: - if not StopTime.objects.filter(trip_id=trip_id).exists(): - print(trip_id, sample_trip.id) - stop = StopTime.objects.get(trip_id=sample_trip.id, - stop__parent_station_id=stop_id).stop - elif StopTime.objects.filter(trip_id=trip_id, stop__parent_station_id=stop_id).exists(): - stop = StopTime.objects.get(trip_id=trip_id, stop__parent_station_id=stop_id).stop - else: - stop = next(s for s in Stop.objects.filter(parent_station_id=stop_id).all() - for s2 in StopTime.objects.filter(trip_id=trip_id).all() - if s.stop_type in s2.stop.stop_type - or s2.stop.stop_type in s.stop_type) - stop_id = stop.id - - arr_time = datetime.fromtimestamp(stop_time_update.arrival.time, - tz=ZoneInfo("Europe/Paris")) - start_dt - dep_time = datetime.fromtimestamp(stop_time_update.departure.time, - tz=ZoneInfo("Europe/Paris")) - start_dt - - pickup_type = 0 if stop_time_update.departure.time and stop_sequence > 0 else 1 - drop_off_type = 0 if stop_time_update.arrival.time \ - and stop_sequence < len(trip_update.stop_time_update) - 1 else 1 - - StopTime.objects.update_or_create( - id=f"{trip_id}-{stop_id}", - trip_id=trip_id, - defaults={ - "stop_id": stop_id, - "stop_sequence": stop_sequence, - "arrival_time": arr_time, - "departure_time": dep_time, - "pickup_type": pickup_type, - "drop_off_type": drop_off_type, - } - ) + if trip_update.trip.schedule_relationship == TripScheduleRelationship.ADDED: + # C'est un trajet nouveau. On crée le trajet associé. + self.create_trip(trip_update, trip_id, start_dt, feed_type) if not Trip.objects.filter(id=trip_id).exists(): self.stdout.write(f"Trip {trip_id} does not exist in the GTFS feed.") continue + # Création du TripUpdate tu, _created = TripUpdate.objects.update_or_create( trip_id=trip_id, start_date=trip_update.trip.start_date, @@ -164,13 +65,16 @@ class Command(BaseCommand): for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update): stop_id = stop_time_update.stop_id if stop_id.startswith('StopArea:'): + # On est dans le cadre d'une gare. On cherche le quai associé. if StopTime.objects.filter(trip_id=trip_id, stop__parent_station_id=stop_id).exists(): + # U stop = StopTime.objects.get(trip_id=trip_id, stop__parent_station_id=stop_id).stop else: - stop = next(s for s in Stop.objects.filter(parent_station_id=stop_id).all() - for s2 in StopTime.objects.filter(trip_id=trip_id).all() - if s.stop_type in s2.stop.stop_type - or s2.stop.stop_type in s.stop_type) + stops = [s for s in Stop.objects.filter(parent_station_id=stop_id).all() + for s2 in StopTime.objects.filter(trip_id=trip_id).all() + if s.stop_type in s2.stop.stop_type + or s2.stop.stop_type in s.stop_type] + stop = stops[0] if stops else Stop.objects.get(id=stop_id) st, _created = StopTime.objects.update_or_create( id=f"{trip_id}-{stop.id}", @@ -182,16 +86,18 @@ class Command(BaseCommand): tz=ZoneInfo("Europe/Paris")) - start_dt, "departure_time": datetime.fromtimestamp(stop_time_update.departure.time, tz=ZoneInfo("Europe/Paris")) - start_dt, - "pickup_type": 0 if stop_time_update.departure.time else 1, - "drop_off_type": 0 if stop_time_update.arrival.time else 1, + "pickup_type": (PickupType.REGULAR if stop_time_update.departure.time + else PickupType.NONE), + "drop_off_type": (PickupType.REGULAR if stop_time_update.arrival.time + else PickupType.NONE), } ) - elif stop_time_update.schedule_relationship == 1: + elif stop_time_update.schedule_relationship == StopScheduleRelationship.SKIPPED: st = StopTime.objects.get(Q(stop=stop_id) | Q(stop__parent_station_id=stop_id), trip_id=trip_id) - if st.pickup_type != 1 or st.drop_off_type != 1: - st.pickup_type = 1 - st.drop_off_type = 1 + if st.pickup_type != PickupType.NONE or st.drop_off_type != PickupType.NONE: + st.pickup_type = PickupType.NONE + st.drop_off_type = PickupType.NONE st.save() else: qs = StopTime.objects.filter(Q(stop=stop_id) | Q(stop__parent_station_id=stop_id), @@ -213,7 +119,8 @@ class Command(BaseCommand): departure_delay=timedelta(seconds=stop_time_update.departure.delay), departure_time=datetime.fromtimestamp(stop_time_update.departure.time, tz=ZoneInfo("Europe/Paris")), - schedule_relationship=stop_time_update.schedule_relationship or 0, + schedule_relationship=stop_time_update.schedule_relationship + or StopScheduleRelationship.SCHEDULED, ) stop_times_updates.append(st_update) else: @@ -224,3 +131,142 @@ class Command(BaseCommand): update_fields=['arrival_delay', 'arrival_time', 'departure_delay', 'departure_time'], unique_fields=['trip_update', 'stop_time']) + + def create_trip(self, trip_update, trip_id, start_dt, feed_type): + headsign = trip_id[5:-1] + trip_qs = Trip.objects.all() + trip_ids = trip_qs.values_list('id', flat=True) + + first_stop_queryset = StopTime.objects.filter( + stop__parent_station_id=trip_update.stop_time_update[0].stop_id, + ).values('trip_id') + last_stop_queryset = StopTime.objects.filter( + stop__parent_station_id=trip_update.stop_time_update[-1].stop_id, + ).values('trip_id') + + trip_ids = trip_ids.intersection(first_stop_queryset).intersection(last_stop_queryset) + # print(trip_id, trip_ids) + for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update): + stop_id = stop_time_update.stop_id + st_queryset = StopTime.objects.filter(stop__parent_station_id=stop_id) + if stop_sequence == 0: + st_queryset = st_queryset.filter(stop_sequence=0) + # print(stop_sequence, Stop.objects.get(id=stop_id).name, stop_time_update) + # print(trip_ids) + # print(st_queryset.values('trip_id').all()) + trip_ids_restrict = trip_ids.intersection(st_queryset.values('trip_id')) + if trip_ids_restrict: + trip_ids = trip_ids_restrict + else: + stop = Stop.objects.get(id=stop_id) + self.stdout.write(self.style.WARNING(f"Warning: No trip is found passing by stop " + f"{stop.name} ({stop_id})")) + trip_ids = set(trip_ids) + route_ids = set(Trip.objects.filter(id__in=trip_ids).values_list('route_id', flat=True)) + self.stdout.write(f"{len(route_ids)} routes found on trip for new train {headsign}") + if not route_ids: + origin_id = trip_update.stop_time_update[0].stop_id + origin = Stop.objects.get(id=origin_id) + destination_id = trip_update.stop_time_update[-1].stop_id + destination = Stop.objects.get(id=destination_id) + trip_name = f"{origin.name} - {destination.name}" + trip_reverse_name = f"{destination.name} - {origin.name}" + route_qs = Route.objects.filter(long_name=trip_name, transport_type=feed_type) + route_reverse_qs = Route.objects.filter(long_name=trip_reverse_name, + transport_type=feed_type) + if route_qs.exists(): + route_ids = set(route_qs.values_list('id', flat=True)) + elif route_reverse_qs.exists(): + route_ids = set(route_reverse_qs.values_list('id', flat=True)) + else: + self.stdout.write(f"Route not found for trip {trip_id} ({trip_name}). Creating new one") + route = Route.objects.create( + id=f"CREATED-{trip_name}", + agency=Agency.objects.filter(routes__transport_type=feed_type).first(), + transport_type=feed_type, + type=RouteType.RAIL, + short_name=trip_name, + long_name=trip_name, + ) + route_ids = {route.id} + self.stdout.write(f"Route {route.id} created for trip {trip_id} ({trip_name})") + elif len(route_ids) > 1: + self.stdout.write(f"Multiple routes found for trip {trip_id}.") + self.stdout.write(", ".join(route_ids)) + route_id = route_ids.pop() + + Calendar.objects.update_or_create( + id=f"{feed_type}-new-{headsign}", + defaults={ + "transport_type": feed_type, + "monday": False, + "tuesday": False, + "wednesday": False, + "thursday": False, + "friday": False, + "saturday": False, + "sunday": False, + "start_date": start_dt.date(), + "end_date": start_dt.date(), + } + ) + CalendarDate.objects.update_or_create( + id=f"{feed_type}-{headsign}-{trip_update.trip.start_date}", + defaults={ + "service_id": f"{feed_type}-new-{headsign}", + "date": trip_update.trip.start_date, + "exception_type": ExceptionType.ADDED, + } + ) + Trip.objects.update_or_create( + id=trip_id, + defaults={ + "route_id": route_id, + "service_id": f"{feed_type}-new-{headsign}", + "headsign": headsign, + "direction_id": trip_update.trip.direction_id, + } + ) + + sample_trip = Trip.objects.filter(id__in=trip_ids, route_id=route_id) + sample_trip = sample_trip.first() if sample_trip.exists() else None + for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update): + stop_id = stop_time_update.stop_id + stop = Stop.objects.get(id=stop_id) + if stop.location_type == LocationType.STATION: + if not StopTime.objects.filter(trip_id=trip_id).exists(): + if sample_trip: + stop = StopTime.objects.get(trip_id=sample_trip.id, + stop__parent_station_id=stop_id).stop + elif StopTime.objects.filter(trip_id=trip_id, stop__parent_station_id=stop_id).exists(): + stop = StopTime.objects.get(trip_id=trip_id, stop__parent_station_id=stop_id).stop + else: + stops = [s for s in Stop.objects.filter(parent_station_id=stop_id).all() + for s2 in StopTime.objects.filter(trip_id=trip_id).all() + if s.stop_type in s2.stop.stop_type + or s2.stop.stop_type in s.stop_type] + stop = stops[0] if stops else stop + stop_id = stop.id + + arr_time = datetime.fromtimestamp(stop_time_update.arrival.time, + tz=ZoneInfo("Europe/Paris")) - start_dt + dep_time = datetime.fromtimestamp(stop_time_update.departure.time, + tz=ZoneInfo("Europe/Paris")) - start_dt + + pickup_type = PickupType.REGULAR if stop_time_update.departure.time and stop_sequence > 0 \ + else PickupType.NONE + drop_off_type = PickupType.REGULAR if stop_time_update.arrival.time \ + and stop_sequence < len(trip_update.stop_time_update) - 1 else PickupType.NONE + + StopTime.objects.update_or_create( + id=f"{trip_id}-{stop_id}", + trip_id=trip_id, + defaults={ + "stop_id": stop_id, + "stop_sequence": stop_sequence, + "arrival_time": arr_time, + "departure_time": dep_time, + "pickup_type": pickup_type, + "drop_off_type": drop_off_type, + } + ) diff --git a/sncfgtfs/models.py b/sncfgtfs/models.py index 5bfae6f..3ef0b94 100644 --- a/sncfgtfs/models.py +++ b/sncfgtfs/models.py @@ -217,7 +217,7 @@ class Stop(models.Model): @property def stop_type(self): - train_type = self.id.split('StopPoint:OCE')[1].split('-')[0] + train_type = self.id.split('StopPoint:OCE')[-1].split('StopArea:OCE')[-1].split('-')[0] return train_type def __str__(self):