# Copyright (C) 2018-2025 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later

import time
import json
from functools import lru_cache
from random import Random

from django import forms
from django.db import transaction
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from django.utils.safestring import mark_safe

from .base import WEISurvey, WEISurveyInformation, WEISurveyAlgorithm, WEIBusInformation
from ...models import WEIMembership, Bus

# Survey content.
#   'list'      -> the words proposed at step 0 of the survey.
#   'questions' -> ordered mapping: question key -> [question text, {bus pk: answer text}].
# The insertion order of 'questions' defines the order of the survey steps.
WORDS = {
    'list': [
        'Fiesta', 'Graillance', 'Move it move it', 'Calme', 'Nert et geek', 'Jeux de rôles et danse rock',
        'Strass et paillettes', 'Spectaculaire', 'Splendide', 'Flow inégalable', 'Rap', 'Battles légendaires',
        'Techno', 'Alcool', 'Kiffeur·euse', 'Rugby', 'Médiéval', 'Festif',
        'Stylé', 'Chipie', 'Rétro', 'Vache', 'Farfadet', 'Fanfare',
    ],
    'questions': {
        "alcool": [
            """Sur une échelle allant de 0 (= 0 alcool ou très peu) à 5 (= la fontaine de jouvence alcoolique), quel niveau de consommation d’alcool souhaiterais-tu ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "voie_post_bac": [
            """Si la DA du bus de ton choix correspondait à une voie post-bac, laquelle serait-elle ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "boite": [
            """Tu es seul·e sur une île déserte et devant toi il y a une sombre boîte de taille raisonnable. Qu’y a-t-il à l’intérieur ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "tardif": [
            (
                "Il est 00h, tu as passé la journée à la plage avec tes copains et iels te proposent de prolonger parce qu’après tout, il n’y a plus personne sur la plage à cette heure-ci. Tu n’habites pas loin mais t’enchaînes demain avec une journée similaire avec un autre groupe d’amis parce que t’es trop #busy. \n"
                "Que fais-tu ?"
            ),
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "cohesion": [
            """C’est la rentrée de Seconde et tu découvres ta classe, tes camarades et ta prof principale!!! qui vous propose une activité de cohésion. Laquelle est-elle ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "artiste": [
            """C’est l’été et la saison des festivals a commencé. Tu regardes la programmation du festival pas loin de chez toi et tu découvres avec joie la présence d’un·e artiste. De qui s’agit-il ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "annonce_noel": [
            """C’est Noël et tu revois toute ta famille, oncles, tantes, cousin·e·s, grands-parents, la totale. D’un coup, tu te lèves, tapotes de manière pompeuse sur ton verre avec un de tes couverts. Qu’annonces-tu ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "vacances": [
            """Les vacances sont là et t’aimerais bien partir quelque part, mais où ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "loisir": [
            """T’as fini ta journée de cours et tu t’apprêtes à profiter d’une activité/hobby/loisir de ton choix. Laquelle est-ce ?""",
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ],
        "plan": [
            (
                "Tu reçois un message sur la conversation de groupe que tu partages avec tes potes : vous êtes chaud·e·s pour vous retrouver. \n"
                "Quel plan t’attire le plus ?"
            ),
            {
                42: "",
                47: "",
                48: "",
                45: "",
                44: "",
                46: "",
                43: "",
                49: ""
            }
        ]
    }
}

# Optional per-question images: question key -> {answer value: image URL}.
IMAGES = {}

# Number of words each member must pick at step 0.
NB_WORDS = 5


class OptionalImageRadioSelect(forms.RadioSelect):
    """
    Radio select widget that can be given a mapping ``{option value: image URL}``.
    """

    def __init__(self, images=None, *args, **kwargs):
        """
        :param images: mapping from option value to image URL; defaults to no image.
        """
        self.images = images or {}
        super().__init__(*args, **kwargs)

    def create_option(self, name, value, label, selected, index, subindex=None, attrs=None):
        """
        Build the option as RadioSelect does, then mark the label as safe
        when an image is registered for this value.
        """
        option = super().create_option(name, value, label, selected, index, subindex=subindex, attrs=attrs)
        img_url = self.images.get(value)
        if img_url:
            # NOTE(review): img_url is only used as a flag here; the label text
            # is emitted unescaped through mark_safe — confirm this is intended.
            option['label'] = mark_safe(f'{label} ')
        else:
            option['label'] = label
        return option


class WEISurveyForm2025(forms.Form):
    """
    Survey form for the year 2025.
    Members first choose NB_WORDS words, then answer one question per step.
    The answers are used to calculate the best associated bus.
    """

    def set_registration(self, registration):
        """
        Build the field of the current survey step for the given registration.

        Step 0 shows the word selection; step ``n >= 1`` shows the n-th entry of
        ``WORDS['questions']``. The display order is driven by a per-registration
        seed that is created and stored on first use.
        """
        information = WEISurveyInformation2025(registration)
        if not information.seed:
            information.seed = int(1000 * time.time())
            information.save(registration)
            registration._force_save = True
            registration.save()

        rng = Random((information.step + 1) * information.seed)

        if information.step == 0:
            # NOTE(review): the f-string is interpolated before gettext sees it,
            # so the catalog key contains the literal value of NB_WORDS.
            self.fields["words"] = forms.MultipleChoiceField(
                label=_(f"Select {NB_WORDS} words that describe the WEI experience you want to have."),
                choices=[(w, w) for w in WORDS['list']],
                widget=forms.CheckboxSelectMultiple(),
                required=True,
            )
            if self.is_valid():
                return

            # Shuffle a copy: shuffling WORDS['list'] itself would reorder the
            # module-level list in place, which changes the order seen by every
            # later form (including BusInformationForm2025) and makes the seeded
            # order depend on the history of the process.
            shuffled_words = list(WORDS['list'])
            rng.shuffle(shuffled_words)
            self.fields["words"].choices = [(w, w) for w in shuffled_words]
        else:
            questions = list(WORDS['questions'].items())
            idx = information.step - 1
            if idx < len(questions):
                q, (desc, answers) = questions[idx]
                if q == 'alcool':
                    # Scale from 0 to 5 by steps of 0.5, kept in natural order.
                    choices = [(i / 2, str(i / 2)) for i in range(11)]
                else:
                    choices = [(k, v) for k, v in answers.items()]
                    rng.shuffle(choices)
                self.fields[q] = forms.ChoiceField(
                    label=desc,
                    choices=choices,
                    widget=OptionalImageRadioSelect(images=IMAGES.get(q, {})),
                    required=True,
                )

    def clean_words(self):
        """
        Ensure that exactly NB_WORDS words were selected.

        :raises forms.ValidationError: if another number of words was chosen.
        """
        data = self.cleaned_data['words']
        if len(data) != NB_WORDS:
            raise forms.ValidationError(_(f"Please choose exactly {NB_WORDS} words"))
        return data


class WEIBusInformation2025(WEIBusInformation):
    """
    For each word, the bus has a score
    """
    scores: dict

    def __init__(self, bus):
        # Default value, set before the parent constructor runs.
        self.scores = {}
        super().__init__(bus)


class BusInformationForm2025(forms.ModelForm):
    """
    Form that lets a bus rate each word between 0 and 5.
    The ratings are serialized into the hidden ``information_json`` field.
    """

    class Meta:
        model = Bus
        fields = ['information_json']
        widgets = {
            'information_json': forms.HiddenInput(),
        }

    def __init__(self, *args, words=None, **kwargs):
        """
        :param words: words to rate; defaults to ``WORDS['list']``.
        """
        super().__init__(*args, **kwargs)

        initial_scores = {}
        if self.instance and self.instance.information_json:
            try:
                info = json.loads(self.instance.information_json)
                initial_scores = info.get("scores", {})
            except (json.JSONDecodeError, TypeError, AttributeError):
                initial_scores = {}
        if not isinstance(initial_scores, dict):
            # e.g. {"scores": null}: behave as if nothing was stored instead of
            # failing on the membership test below.
            initial_scores = {}

        if words is None:
            words = WORDS['list']
        self.words = words

        choices = [(i, str(i)) for i in range(6)]  # [(0, '0'), (1, '1'), ..., (5, '5')]
        for word in words:
            self.fields[word] = forms.TypedChoiceField(
                label=word,
                choices=choices,
                coerce=int,
                # None (no pre-selection) when the bus has not rated this word yet.
                initial=initial_scores.get(word),
                required=True,
                widget=forms.RadioSelect,
                help_text=_("Rate between 0 and 5."),
            )

    def clean(self):
        """
        Collect the per-word ratings and store them as JSON in ``information_json``.
        """
        cleaned_data = super().clean()
        scores = {}
        for word in self.words:
            value = cleaned_data.get(word)
            if value is not None:
                scores[word] = value
        # Encode as JSON
        cleaned_data['information_json'] = json.dumps({"scores": scores})
        return cleaned_data


class WEISurveyInformation2025(WEISurveyInformation):
    """
    Survey state of a registration: the random seed, the current step,
    the chosen words (``word1`` .. ``word<NB_WORDS>``) and one attribute
    per question key of ``WORDS['questions']``.
    """
    # Random seed that is stored at the first time to ensure that words are generated only once
    seed = 0
    step = 0

    def __init__(self, registration):
        # Declare every answer attribute before the parent constructor runs.
        self._blank_answers()
        super().__init__(registration)

    def _blank_answers(self):
        """
        Set every word and question answer attribute to None.
        """
        for i in range(1, NB_WORDS + 1):
            setattr(self, "word" + str(i), None)
        for q in WORDS['questions']:
            setattr(self, q, None)

    def reset(self, registration):
        """
        Completely reset the survey: step, seed, chosen words and answers to the questions.
        """
        self.step = 0
        self.seed = 0
        self._blank_answers()
        self.save(registration)
        registration._force_save = True
        registration.save()


class WEISurvey2025(WEISurvey):
    """
    Survey for the year 2025.
    """

    @classmethod
    def get_year(cls):
        return 2025

    @classmethod
    def get_survey_information_class(cls):
        return WEISurveyInformation2025

    def get_form_class(self):
        return WEISurveyForm2025

    def update_form(self, form):
        """
        Let the form build the field of the current step for this registration.
        """
        form.set_registration(self.registration)

    @transaction.atomic
    def form_valid(self, form):
        """
        Store the answer of the current step and move on to the next step.
        """
        if self.information.step == 0:
            words = form.cleaned_data['words']
            for i, word in enumerate(words, 1):
                setattr(self.information, "word" + str(i), word)
            self.information.step += 1
            self.save()
        else:
            questions = list(WORDS['questions'].keys())
            idx = self.information.step - 1
            if idx < len(questions):
                q = questions[idx]
                setattr(self.information, q, form.cleaned_data[q])
                self.information.step += 1
                self.save()

    @classmethod
    def get_algorithm_class(cls):
        return WEISurveyAlgorithm2025

    def is_complete(self) -> bool:
        """
        The survey is complete once the step counter is past the last question.
        """
        return self.information.step > len(WORDS['questions'])

    @classmethod
    @lru_cache()
    def word_mean(cls, word):
        """
        Calculate the mid-score given by all buses.
        """
        algorithm = cls.get_algorithm_class()
        buses = algorithm.get_buses()
        return sum(algorithm.get_bus_information(bus).scores[word] for bus in buses) / buses.count()

    @lru_cache()
    def score_questions(self, bus):
        """
        The score given by the answers to the questions: one point for each
        non-alcohol question answered with this bus, minus the distance between
        the wanted alcohol level and the one registered for the bus.

        :raises ValueError: if the survey is not complete.
        """
        if not self.is_complete():
            raise ValueError("Survey is not ended, can't calculate score")

        # Answers submitted through forms.ChoiceField are strings while bus.pk
        # is an integer: compare their string forms, otherwise no answer would
        # ever match a bus.
        bus_key = str(bus.pk)
        s = sum(1 for q in WORDS['questions']
                if q != 'alcool' and str(getattr(self.information, q)) == bus_key)
        if 'alcool' in WORDS['questions'] and bus.pk in WORDS['questions']['alcool'][1] \
                and hasattr(self.information, 'alcool'):
            # NOTE(review): float() raises if the bus target in WORDS is left
            # blank — the targets are presumably filled in before the algorithm
            # is run; confirm.
            s -= abs(float(self.information.alcool) - float(WORDS['questions']['alcool'][1][bus.pk]))
        return s

    @lru_cache()
    def score_words(self, bus):
        """
        The score given by the choice of words

        :raises ValueError: if the survey is not complete.
        """
        if not self.is_complete():
            raise ValueError("Survey is not ended, can't calculate score")

        algorithm = self.get_algorithm_class()
        bus_info = algorithm.get_bus_information(bus)
        chosen_words = [getattr(self.information, 'word' + str(i)) for i in range(1, 1 + NB_WORDS)]
        # Score is the given score by the bus subtracted to the mid-score of the buses.
        s = sum(bus_info.scores[word] - self.word_mean(word) for word in chosen_words) \
            / algorithm.get_buses().count()
        return s

    @lru_cache()
    def scores_per_bus(self):
        """
        Map each bus to the pair ``(score_questions, score_words)``.
        """
        return {bus: (self.score_questions(bus), self.score_words(bus))
                for bus in self.get_algorithm_class().get_buses()}

    @lru_cache()
    def ordered_buses(self):
        """
        Order the buses by the score_questions of the survey, best first.
        Returns a list of ``(bus, (score_questions, score_words))`` pairs.
        """
        values = list(self.scores_per_bus().items())
        values.sort(key=lambda item: -item[1][0])
        return values

    @classmethod
    def clear_cache(cls):
        """
        Drop the cached word means, then let the parent class clear its own caches.
        """
        cls.word_mean.cache_clear()
        return super().clear_cache()


class WEISurveyAlgorithm2025(WEISurveyAlgorithm):
    """
    The algorithm class for the year 2025.
    We use Gale-Shapley algorithm to attribute 1y students into buses.
    """

    @classmethod
    def get_survey_class(cls):
        return WEISurvey2025

    @classmethod
    def get_bus_information_class(cls):
        return WEIBusInformation2025

    @classmethod
    def get_bus_information_form(cls):
        return BusInformationForm2025

    @classmethod
    def get_buses(cls):
        """
        Buses of the survey year with a positive size, except the one named 'Staff'.
        The queryset is built once and kept on the class.
        """
        if not hasattr(cls, '_buses'):
            cls._buses = Bus.objects.filter(wei__year=cls.get_survey_class().get_year(), size__gt=0).all()\
                .exclude(name='Staff')
        return cls._buses

    @staticmethod
    def _seats_left_for_algorithm(bus):
        """
        Size of the bus minus the memberships of non first-year members and of
        first-year members whose information JSON contains "hardcoded".
        """
        seats = bus.size - WEIMembership.objects.filter(bus=bus, registration__first_year=False).count()
        # Remove hardcoded people
        seats -= WEIMembership.objects.filter(bus=bus, registration__first_year=True,
                                              registration__information_json__icontains="hardcoded").count()
        return seats

    def run_algorithm(self, display_tqdm=False):
        """
        Gale-Shapley algorithm implementation.
        We modify it to allow buses to have multiple "weddings".
        We use lexigographical order on both scores

        :param display_tqdm: when true, show tqdm progress bars (tqdm is imported lazily).
        """
        surveys = list(self.get_survey_class()(r) for r in self.get_registrations())  # All surveys
        surveys = [s for s in surveys if s.is_complete()]  # Don't consider invalid surveys
        # Don't manage hardcoded people
        surveys = [s for s in surveys if not hasattr(s.information, 'hardcoded') or not s.information.hardcoded]

        # Reset previous algorithm run
        for survey in surveys:
            survey.free()
            survey.save()

        non_men = [s for s in surveys if s.registration.gender != 'male']
        men = [s for s in surveys if s.registration.gender == 'male']

        registrations = self.get_registrations()
        non_men_total = registrations.filter(~Q(gender='male')).count()
        registrations_total = registrations.count()
        # Computed once, and guarded: without any registration the division
        # would raise ZeroDivisionError as soon as one bus exists.
        non_men_ratio = non_men_total / registrations_total if registrations_total else 0

        quotas = {}
        for bus in self.get_buses():
            free_seats = self._seats_left_for_algorithm(bus)
            quotas[bus] = 4 + int(non_men_ratio * free_seats)

        tqdm_obj = None
        if display_tqdm:
            from tqdm import tqdm
            tqdm_obj = tqdm(total=len(non_men), desc="Non-hommes")

        # Repartition for non men people first
        self.make_repartition(non_men, quotas, tqdm_obj=tqdm_obj)

        # Second pass: the remaining seats, once the non-men are placed.
        quotas = {}
        for bus in self.get_buses():
            free_seats = self._seats_left_for_algorithm(bus)
            free_seats -= sum(1 for s in non_men if s.information.selected_bus_pk == bus.pk)
            quotas[bus] = free_seats

        if display_tqdm:
            tqdm_obj.close()

            from tqdm import tqdm
            tqdm_obj = tqdm(total=len(men), desc="Hommes")

        self.make_repartition(men, quotas, tqdm_obj=tqdm_obj)

        if display_tqdm:
            tqdm_obj.close()

        # Clear cache information after running algorithm
        WEISurvey2025.clear_cache()

    def make_repartition(self, surveys, quotas=None, tqdm_obj=None):
        """
        Assign a bus to every survey of ``surveys``.

        Each student tries their buses in preference order. When a bus is full,
        the student may replace its member with the lowest word score, provided
        that score is strictly lower than the student's own; the replaced member
        is put back in the queue.

        :param surveys: surveys to place.
        :param quotas: mapping bus -> number of seats, passed to ``has_free_seats``.
        :param tqdm_obj: optional progress bar, updated after each placement.
        :raises ValueError: if a student cannot be placed in any bus.
        """
        free_surveys = surveys.copy()  # Remaining surveys
        while free_surveys:  # Some students are not affected
            survey = free_surveys[0]
            buses = survey.ordered_buses()  # Preferences of the student
            for bus, current_scores in buses:
                if self.get_bus_information(bus).has_free_seats(surveys, quotas):
                    # Selected bus has free places. Put student in the bus
                    survey.select_bus(bus)
                    survey.save()
                    free_surveys.remove(survey)
                    break
                else:
                    # Current bus has not enough places. Remove the least preferred student from the bus if existing
                    least_preferred_survey = None
                    least_score = -1
                    # Find the least student in the bus that has a lower score than the current student
                    for survey2 in surveys:
                        if not survey2.information.valid or survey2.information.get_selected_bus() != bus:
                            continue
                        score2 = survey2.score_words(bus)
                        if current_scores[1] <= score2:  # Ignore better students
                            continue
                        if least_preferred_survey is None or score2 < least_score:
                            least_preferred_survey = survey2
                            least_score = score2

                    if least_preferred_survey is not None:
                        # Remove the least student from the bus and put the current student in.
                        # If it does not exist, choose the next bus.
                        least_preferred_survey.free()
                        least_preferred_survey.save()
                        free_surveys.append(least_preferred_survey)
                        survey.select_bus(bus)
                        survey.save()
                        free_surveys.remove(survey)
                        break
            else:
                # No bus accepted the student (the for loop ended without break).
                raise ValueError(f"User {survey.registration.user} has no free seat")

            if tqdm_obj is not None:
                tqdm_obj.n = len(surveys) - len(free_surveys)
                tqdm_obj.refresh()