trainvel/sncfgtfs/management/commands/update_sncf_gtfs_rt.py

159 lines
8.9 KiB
Python

from datetime import timedelta, datetime, date, time
from zoneinfo import ZoneInfo
import requests
from django.core.management import BaseCommand
from sncfgtfs.gtfs_realtime_pb2 import FeedMessage
from sncfgtfs.models import Calendar, CalendarDate, StopTime, StopTimeUpdate, Trip, TripUpdate, Stop
class Command(BaseCommand):
    """Import the SNCF GTFS-Realtime trip updates into the database.

    For every feed of ``GTFS_RT_FEEDS`` the protobuf ``FeedMessage`` is
    downloaded.  Trips the feed flags as added (``schedule_relationship == 1``)
    are first registered (calendar, calendar date, trip and stop times, with the
    route and platforms borrowed from existing trips calling at the same
    stations).  A ``TripUpdate`` is then recorded for each known trip and one
    ``StopTimeUpdate`` per known stop time is upserted in bulk.
    """

    help = "Update the SNCF GTFS Realtime database."

    GTFS_RT_FEEDS = {
        "TGV": "https://proxy.transport.data.gouv.fr/resource/sncf-tgv-gtfs-rt-trip-updates",
        "IC": "https://proxy.transport.data.gouv.fr/resource/sncf-ic-gtfs-rt-trip-updates",
        "TER": "https://proxy.transport.data.gouv.fr/resource/sncf-ter-gtfs-rt-trip-updates",
    }

    # Timezone used both for the trip start date and for the feed's POSIX timestamps.
    TIMEZONE = ZoneInfo("Europe/Paris")

    # Timeout (seconds) passed to requests.get so a stalled proxy cannot hang the command.
    HTTP_TIMEOUT = 30

    def add_arguments(self, parser):
        pass

    def handle(self, *args, **options):
        """Fetch each realtime feed and store its trip and stop time updates."""
        for feed_type, feed_url in self.GTFS_RT_FEEDS.items():
            self.stdout.write(f"Updating {feed_type} feed...")
            feed_message = self._fetch_feed(feed_type, feed_url)
            if feed_message is None:
                # One unreachable feed must not prevent the others from updating.
                continue

            stop_times_updates = []
            for entity in feed_message.entity:
                if not entity.HasField("trip_update"):
                    self.stdout.write(str(entity))
                    continue
                stop_times_updates.extend(self._process_trip_update(feed_type, entity.trip_update))

            StopTimeUpdate.objects.bulk_create(stop_times_updates,
                                               update_conflicts=True,
                                               update_fields=['arrival_delay', 'arrival_time',
                                                              'departure_delay', 'departure_time'],
                                               unique_fields=['trip_update', 'stop_time'])

    def _fetch_feed(self, feed_type, feed_url):
        """Download and parse one GTFS-RT feed.

        Returns the parsed ``FeedMessage``, or ``None`` (after reporting the
        error on stderr) when the HTTP request fails or times out.
        """
        try:
            response = requests.get(feed_url, timeout=self.HTTP_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as exc:
            self.stderr.write(f"Unable to fetch {feed_type} feed: {exc}")
            return None
        feed_message = FeedMessage()
        feed_message.ParseFromString(response.content)
        return feed_message

    @classmethod
    def _event_datetimes(cls, stop_time_update):
        """Return the ``(arrival, departure)`` aware datetimes of a stop time update.

        An absent event has a ``time`` of 0.  In that case the other event's time
        is reused, so the origin and terminus do not end up with 1970 dates.
        """
        arrival_ts = stop_time_update.arrival.time or stop_time_update.departure.time
        departure_ts = stop_time_update.departure.time or stop_time_update.arrival.time
        return (datetime.fromtimestamp(arrival_ts, tz=cls.TIMEZONE),
                datetime.fromtimestamp(departure_ts, tz=cls.TIMEZONE))

    def _process_trip_update(self, feed_type, trip_update):
        """Return the unsaved ``StopTimeUpdate`` objects for one GTFS-RT trip update.

        Added trips are registered first.  An empty list is returned when the
        trip cannot be registered or does not exist in the database.
        """
        trip_id = trip_update.trip.trip_id
        raw_start_date = trip_update.trip.start_date  # "YYYYMMDD"
        start_date = date(year=int(raw_start_date[:4]),
                          month=int(raw_start_date[4:6]),
                          day=int(raw_start_date[6:]))
        # Stop time offsets of added trips are measured from local midnight of the service day.
        start_dt = datetime.combine(start_date, time(0), tzinfo=self.TIMEZONE)

        # 1 is ADDED in the GTFS-RT TripDescriptor.ScheduleRelationship enum.
        if trip_update.trip.schedule_relationship == 1:
            if not self._register_added_trip(feed_type, trip_update, start_date, start_dt):
                return []

        if not Trip.objects.filter(id=trip_id).exists():
            self.stdout.write(f"Trip {trip_id} does not exist in the GTFS feed.")
            return []

        tu, _created = TripUpdate.objects.get_or_create(
            trip_id=trip_id,
            start_date=trip_update.trip.start_date,
            start_time=trip_update.trip.start_time,
            schedule_relationship=trip_update.trip.schedule_relationship,
        )

        updates = []
        for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update):
            try:
                st = StopTime.objects.get(trip_id=trip_id, stop_sequence=stop_sequence)
            except StopTime.DoesNotExist:
                self.stdout.write(f"Stop {stop_sequence} does not exist in GTFS feed for trip {trip_id}.")
                continue
            arrival_dt, departure_dt = self._event_datetimes(stop_time_update)
            updates.append(StopTimeUpdate(
                trip_update=tu,
                stop_time=st,
                arrival_delay=timedelta(seconds=stop_time_update.arrival.delay),
                arrival_time=arrival_dt,
                departure_delay=timedelta(seconds=stop_time_update.departure.delay),
                departure_time=departure_dt,
                schedule_relationship=stop_time_update.schedule_relationship or 0,
            ))
        return updates

    def _register_added_trip(self, feed_type, trip_update, start_date, start_dt):
        """Create the calendar, trip and stop times of a trip added by the feed.

        The route and the platforms are taken from existing trips that call at
        the same stations.  Nothing is written, and ``False`` is returned after
        logging the reason, when the route or a platform cannot be determined.
        """
        trip_id = trip_update.trip.trip_id
        # NOTE(review): assumes the train number sits between a 5-character prefix
        # and a 1-character suffix of the trip id — confirm against the feed.
        headsign = trip_id[5:-1]
        service_id = f"{feed_type}-new-{headsign}"

        # Candidate reference trips: every other trip (the added trip itself may
        # already exist from a previous run and must not serve as its own model).
        trip_ids = Trip.objects.exclude(id=trip_id).values_list('id', flat=True)
        for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update):
            st_queryset = StopTime.objects.filter(stop__parent_station_id=stop_time_update.stop_id)
            if stop_sequence == 0:
                st_queryset = st_queryset.filter(stop_sequence=0)
            trip_ids_restrict = trip_ids.intersection(st_queryset.values('trip_id'))
            # Only narrow the candidates when at least one trip calls at this station.
            if trip_ids_restrict:
                trip_ids = trip_ids_restrict

        route_ids = set(Trip.objects.filter(id__in=trip_ids).values_list('route_id', flat=True))
        self.stdout.write(f"{len(route_ids)} routes found on trip for train {headsign}")
        if not route_ids:
            self.stdout.write(f"Route not found for trip {trip_id}.")
            return False
        if len(route_ids) > 1:
            self.stdout.write(f"Multiple routes found for trip {trip_id}.")
            return False
        route_id = route_ids.pop()

        # Resolve every platform before writing anything, so a failure leaves no partial trip.
        sample_trip = Trip.objects.filter(id__in=trip_ids).first()
        stop_ids = []
        for stop_sequence, stop_time_update in enumerate(trip_update.stop_time_update):
            stop_id = self._resolve_stop_id(stop_time_update.stop_id, stop_sequence, sample_trip)
            if stop_id is None:
                self.stdout.write(f"Unable to find a platform for stop {stop_time_update.stop_id} "
                                  f"on trip {trip_id}.")
                return False
            stop_ids.append(stop_id)

        Calendar.objects.update_or_create(
            id=service_id,
            defaults={
                "transport_type": feed_type,
                "monday": False,
                "tuesday": False,
                "wednesday": False,
                "thursday": False,
                "friday": False,
                "saturday": False,
                "sunday": False,
                "start_date": start_date,
                "end_date": start_date,
            },
        )
        # NOTE(review): the raw "YYYYMMDD" string is stored here while Calendar
        # receives a date object — confirm the field type accepts it.
        CalendarDate.objects.update_or_create(
            id=f"{feed_type}-{headsign}-{trip_update.trip.start_date}",
            defaults={
                "service_id": service_id,
                "date": trip_update.trip.start_date,
                "exception_type": 1,
                "transport_type": feed_type,
            },
        )
        Trip.objects.update_or_create(
            id=trip_id,
            defaults={
                "route_id": route_id,
                "service_id": service_id,
                "headsign": headsign,
                "direction_id": trip_update.trip.direction_id,
            },
        )

        for stop_sequence, (stop_id, stop_time_update) in enumerate(
                zip(stop_ids, trip_update.stop_time_update)):
            arrival_dt, departure_dt = self._event_datetimes(stop_time_update)
            StopTime.objects.update_or_create(
                id=f"{trip_id}-{stop_sequence}",
                trip_id=trip_id,
                defaults={
                    "stop_id": stop_id,
                    "stop_sequence": stop_sequence,
                    "arrival_time": arrival_dt - start_dt,
                    "departure_time": departure_dt - start_dt,
                    # A missing departure/arrival event means no pickup/drop-off at this stop.
                    "pickup_type": 0 if stop_time_update.departure.time else 1,
                    "drop_off_type": 0 if stop_time_update.arrival.time else 1,
                },
            )
        return True

    @staticmethod
    def _resolve_stop_id(stop_id, stop_sequence, sample_trip):
        """Return the id of the stop to attach a stop time to, or ``None``.

        A stop whose ``location_type`` is not 1 is used as-is.  For a station
        (``location_type == 1``), the origin uses the sample trip's first stop.
        Other stations use the child with ``location_type == 0`` whose
        ``stop_type`` matches the sample trip's previous stop.  ``None`` is
        returned when the stop is unknown or no such platform can be found.
        """
        try:
            stop = Stop.objects.get(id=stop_id)
        except Stop.DoesNotExist:
            return None
        if stop.location_type != 1:
            return stop.id
        if sample_trip is None:
            return None

        if stop_sequence == 0:
            reference = sample_trip.stop_times.filter(stop_sequence=0).first()
            return None if reference is None else reference.stop.id

        previous = sample_trip.stop_times.filter(stop_sequence=stop_sequence - 1).first()
        if previous is None:
            return None
        previous_stop = previous.stop
        platform = next((s for s in stop.children.all()
                         if s.location_type == 0 and s.stop_type == previous_stop.stop_type), None)
        return None if platform is None else platform.id