mirror of
https://gitlab.crans.org/bde/nk20
synced 2025-10-24 13:53:04 +02:00
585 lines
26 KiB
Python
585 lines
26 KiB
Python
# Copyright (C) 2028-2024 by BDE ENS Paris-Saclay
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import json
|
|
from argparse import ArgumentParser
|
|
|
|
from django.core.management import BaseCommand
|
|
from django.db.models import Q
|
|
from note.models import Note, Transaction
|
|
from member.models import User, Club, Membership
|
|
from activity.models import Activity, Entry
|
|
from wei.models import WEIClub
|
|
|
|
from ...models import Bde, Wrapped
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Generate wrapper for the annual BDE change"
|
|
|
|
def add_arguments(self, parser: ArgumentParser):
|
|
parser.add_argument(
|
|
'-b', '--bde',
|
|
type=str,
|
|
required=False,
|
|
help="A list of BDE name, BDE1,BDE2,... (a BDE name cannot have ',')",
|
|
dest='bde',
|
|
)
|
|
parser.add_argument(
|
|
'-i', '--id',
|
|
type=str,
|
|
required=False,
|
|
help="A list of BDE id, id1,id2,...",
|
|
dest='bde_id',
|
|
)
|
|
parser.add_argument(
|
|
'-u', '--users',
|
|
type=str,
|
|
required=False,
|
|
help="""User will have their(s) wrapped generated,
|
|
all = all users
|
|
adh = all users who have a valid cd memberships to BDE during the BDE considered
|
|
supersuser = all superusers
|
|
custom user1,user2,... = a list of username,
|
|
custom_id id1,id2,... = a list of user id""",
|
|
dest='user',
|
|
)
|
|
parser.add_argument(
|
|
'-c', '--club',
|
|
type=str,
|
|
required=False,
|
|
help="""Club will have their(s) wrapped generated,
|
|
all = all clubs,
|
|
active = all clubs with at least one transaction during the BDE mandate considered,
|
|
custom club1,club2,... = a list of club name,
|
|
custom_id id1,id2,... = a list of club id""",
|
|
dest='club',
|
|
)
|
|
parser.add_argument(
|
|
'-f', '--force-change',
|
|
required=False,
|
|
action='store_true',
|
|
help="if wrapped already exist change data_json",
|
|
dest='change',
|
|
)
|
|
parser.add_argument(
|
|
'-n', '--no-creation',
|
|
required=False,
|
|
action='store_false',
|
|
help="if wrapped don't already exist, don't generate it",
|
|
dest='create',
|
|
)
|
|
|
|
def handle(self, *args, **options): # NOQA
|
|
# Traitement des paramètres
|
|
verb = options['verbosity']
|
|
bde = []
|
|
if options['bde']:
|
|
bde_list = options['bde'].split(',')
|
|
bde = [Bde.objects.get(name=bde_name) for bde_name in bde_list]
|
|
|
|
if options['bde_id']:
|
|
if bde:
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nYou already defined bde with their name !"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
bde_id = options['bde_id'].split(',')
|
|
bde = [Bde.objects.get(pk=i) for i in bde_id]
|
|
|
|
user = []
|
|
if options['user']:
|
|
if options['user'] == 'all':
|
|
user = ['all', None]
|
|
elif options['user'] == 'adh':
|
|
user = ['adh', None]
|
|
elif options['user'] == 'superuser':
|
|
user = ['superuser', None]
|
|
elif options['user'].split(' ')[0] == 'custom':
|
|
user_list = options['user'].split(' ')[1].split(',')
|
|
user = ['custom', [User.objects.get(username=u) for u in user_list]]
|
|
elif options['user'].split(' ')[0] == 'custom_id':
|
|
user_id = options['user'].split(' ')[1].split(',')
|
|
user = ['custom_id', [User.objects.get(pk=u) for u in user_id]]
|
|
else:
|
|
if verb >= 1:
|
|
self.sdtout.write(self.style.WARNING(
|
|
"WARNING\nYou user option is not recognized"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
|
|
club = []
|
|
if options['club']:
|
|
if options['club'] == 'all':
|
|
club = ['all', None]
|
|
elif options['club'] == 'active':
|
|
club = ['active', None]
|
|
elif options['club'].split(' ')[0] == 'custom':
|
|
club_list = options['club'].split(' ')[1].split(',')
|
|
club = ['custom', [Club.objects.get(name=club_name) for club_name in club_list]]
|
|
elif options['club'].split(' ')[0] == 'custom_id':
|
|
club_id = options['club'].split(' ')[1].split(',')
|
|
club = ['custom_id', [Club.objects.get(pk=c) for c in club_id]]
|
|
else:
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nYou club option is not recognized"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
|
|
change = options['change']
|
|
create = options['create']
|
|
|
|
# check if parameters are sufficient for generate wrapped with the desired option
|
|
if not bde:
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nYou have not selectionned a BDE !"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
if not (user or club):
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nNo club or user selected !"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
|
|
if verb >= 3:
|
|
self.stdout.write("Options:")
|
|
bde_str = ''
|
|
for b in bde:
|
|
bde_str += str(b) + '\n'
|
|
self.stdout.write("BDE: " + bde_str)
|
|
if user:
|
|
self.stdout.write('User: ' + user[0])
|
|
if club:
|
|
self.stdout.write('Club: ' + club[0])
|
|
self.stdout.write('change: ' + str(change))
|
|
self.stdout.write('create: ' + str(create) + '\n')
|
|
if not (change or create):
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nchange and create is set to false, none wrapped will be created"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
if verb >= 1 and change:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\nchange is set to true, some wrapped may be replaced !"))
|
|
if verb >= 1 and not create:
|
|
self.stdout.write(self.style.WARNING(
|
|
"WARNING\ncreate is set to false, wrapped will not be created !"))
|
|
if verb >= 3 or change or not create:
|
|
a = str(input('\033[mContinue ? (y/n) ')).lower()
|
|
if a in ['n', 'no', 'non', '0']:
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.ERROR("ABORT"))
|
|
exit(1)
|
|
|
|
note = self.convert_to_note(change, create, bde=bde, user=user, club=club, verb=verb)
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.SUCCESS(
|
|
"User and/or Club given has successfully convert in their note"))
|
|
|
|
global_data = self.global_data(bde, verb=verb)
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.SUCCESS(
|
|
"Global data has been successfully generated"))
|
|
|
|
unique_data = self.unique_data(bde, note, global_data=global_data, verb=verb)
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.SUCCESS(
|
|
"Unique data has been successfully generated"))
|
|
|
|
self.make_wrapped(unique_data, note, bde, change, create, verb=verb)
|
|
if verb >= 1:
|
|
self.stdout.write(self.style.SUCCESS(
|
|
"The wrapped has been generated !"))
|
|
if verb >= 0:
|
|
self.stdout.write(self.style.SUCCESS("SUCCESS"))
|
|
exit(0)
|
|
|
|
def convert_to_note(self, change, create, bde=None, user=None, club=None, verb=1): # NOQA
|
|
notes = []
|
|
for b in bde:
|
|
note_for_bde = Note.objects.filter(pk__lte=-1)
|
|
if user:
|
|
if 'custom' in user[0]:
|
|
for u in user[1]:
|
|
query = Q(noteuser__user=u)
|
|
note_for_bde |= Note.objects.filter(query)
|
|
elif user[0] == 'all':
|
|
query = Q(noteuser__user__pk__gte=-1)
|
|
note_for_bde |= Note.objects.filter(query)
|
|
elif user[0] == 'adh':
|
|
m = Membership.objects.filter(club=1,
|
|
date_start__lt=b.date_end,
|
|
date_end__gt=b.date_start,
|
|
).distinct('user')
|
|
for membership in m:
|
|
note_for_bde |= Note.objects.filter(noteuser__user=membership.user)
|
|
|
|
elif user[0] == 'superuser':
|
|
query |= Q(noteuser__user__is_superuser=True)
|
|
note_for_bde |= Note.objects.filter(query)
|
|
|
|
if club:
|
|
if 'custom' in club[0]:
|
|
for c in club[1]:
|
|
query = Q(noteclub__club=c)
|
|
note_for_bde |= Note.objects.filter(query)
|
|
elif club[0] == 'all':
|
|
query = Q(noteclub__club__pk__gte=-1)
|
|
note_for_bde |= Note.objects.filter(query)
|
|
elif club[0] == 'active':
|
|
nc = Note.objects.filter(noteclub__club__pk__gte=-1)
|
|
for noteclub in nc:
|
|
if Transaction.objects.filter(
|
|
Q(created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end) & (Q(source=noteclub) | Q(destination=noteclub))):
|
|
note_for_bde |= Note.objects.filter(pk=noteclub.pk)
|
|
|
|
note_for_bde = self.filter_note(b, note_for_bde, change, create, verb=verb)
|
|
notes.append(note_for_bde)
|
|
if verb >= 2:
|
|
self.stdout.write(f"{len(note_for_bde)} note selectionned for bde {b.name}")
|
|
return notes
|
|
|
|
def global_data(self, bde, verb=1): # NOQA
|
|
data = {}
|
|
for b in bde:
|
|
if b.name == 'Rave Part[list]':
|
|
if verb >= 2:
|
|
self.stdout.write("Begin to make global data")
|
|
if verb >= 3:
|
|
self.stdout.write("nb_transaction")
|
|
# nb total de transactions
|
|
data['nb_transaction'] = Transaction.objects.filter(
|
|
created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end,
|
|
valid=True).count()
|
|
|
|
if verb >= 3:
|
|
self.stdout.write("nb_vieux_con")
|
|
# nb total de vielleux con·ne·s derrière le bar
|
|
button_id = [2884, 2585]
|
|
transactions = Transaction.objects.filter(
|
|
created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end,
|
|
valid=True,
|
|
recurrenttransaction__template__pk__in=button_id)
|
|
|
|
q = 0
|
|
for t in transactions:
|
|
q += t.quantity
|
|
data['nb_vieux_con'] = q
|
|
|
|
if verb >= 3:
|
|
self.stdout.write("nb_soiree")
|
|
# nb total de soirée
|
|
a_type_id = [1, 2, 4, 5, 7, 10]
|
|
data['nb_soiree'] = Activity.objects.filter(
|
|
date_end__gte=b.date_start,
|
|
date_start__lte=b.date_end,
|
|
valid=True,
|
|
activity_type__pk__in=a_type_id).count()
|
|
|
|
if verb >= 3:
|
|
self.stdout.write('pots, nb_entree_pot')
|
|
# nb d'entrée totale aux pots
|
|
pot_id = [1, 4, 10]
|
|
pots = Activity.objects.filter(
|
|
date_end__gte=b.date_start,
|
|
date_start__lte=b.date_end,
|
|
valid=True,
|
|
activity_type__pk__in=pot_id)
|
|
data['pots'] = pots # utile dans unique_data
|
|
data['nb_entree_pot'] = 0
|
|
for pot in pots:
|
|
data['nb_entree_pot'] += Entry.objects.filter(activity=pot).count()
|
|
|
|
if verb >= 3:
|
|
self.stdout.write('top3_buttons')
|
|
# top 3 des boutons les plus cliqués
|
|
transactions = Transaction.objects.filter(
|
|
created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end,
|
|
valid=True,
|
|
amount__gt=0,
|
|
recurrenttransaction__template__pk__gte=-1)
|
|
|
|
d = {}
|
|
for t in transactions:
|
|
if t.recurrenttransaction.template.name in d:
|
|
d[t.recurrenttransaction.template.name] += t.quantity
|
|
else:
|
|
d[t.recurrenttransaction.template.name] = t.quantity
|
|
|
|
data['top3_buttons'] = list(sorted(d.items(), key=lambda item: item[1], reverse=True))[:3]
|
|
|
|
if verb >= 3:
|
|
self.stdout.write('class_conso_all')
|
|
# le classement des plus gros consommateurs (BDE + club)
|
|
transactions = Transaction.objects.filter(
|
|
created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end,
|
|
valid=True,
|
|
source__noteuser__user__pk__gte=-1,
|
|
destination__noteclub__club__pk__gte=-1)
|
|
|
|
d = {}
|
|
for t in transactions:
|
|
if t.source in d:
|
|
d[t.source] += t.total
|
|
else:
|
|
d[t.source] = t.total
|
|
|
|
data['class_conso_all'] = dict(sorted(d.items(), key=lambda item: item[1], reverse=True))
|
|
|
|
if verb >= 3:
|
|
self.stdout.write('class_conso_bde')
|
|
# le classement des plus gros consommateurs BDE
|
|
transactions = Transaction.objects.filter(
|
|
created_at__gte=b.date_start,
|
|
created_at__lte=b.date_end,
|
|
valid=True,
|
|
source__noteuser__user__pk__gte=-1,
|
|
destination=5)
|
|
|
|
d = {}
|
|
for t in transactions:
|
|
if t.source in d:
|
|
d[t.source] += t.total
|
|
else:
|
|
d[t.source] = t.total
|
|
|
|
data['class_conso_bde'] = dict(sorted(d.items(), key=lambda item: item[1], reverse=True))
|
|
|
|
else:
|
|
# make your wrapped or reuse previous wrapped
|
|
raise NotImplementedError(f"The BDE: {b.name} has not personalized wrapped, make it !")
|
|
return data
|
|
|
|
def unique_data(self, bde, note, global_data=None, verb=1): # NOQA
|
|
data = []
|
|
for i in range(len(bde)):
|
|
data_bde = []
|
|
if bde[i].name == 'Rave Part[list]':
|
|
if verb >= 3:
|
|
total = len(note[i])
|
|
current = 0
|
|
self.stdout.write(f"Make {total} data for wrapped sponsored by {bde[i].name}")
|
|
for n in note[i]:
|
|
d = {}
|
|
if 'user' in n.__dir__():
|
|
# première conso du mandat
|
|
transactions = Transaction.objects.filter(
|
|
valid=True,
|
|
recurrenttransaction__template__id__gte=-1,
|
|
created_at__gte=bde[i].date_start,
|
|
created_at__lte=bde[i].date_end,
|
|
source=n,
|
|
destination=5).order_by('created_at')
|
|
if transactions:
|
|
d['first_conso'] = transactions[0].template.name
|
|
else:
|
|
d['first_conso'] = ''
|
|
# Wei + bus
|
|
wei = WEIClub.objects.filter(
|
|
date_start__lte=bde[i].date_end,
|
|
date_end__gte=bde[i].date_start)
|
|
if not wei:
|
|
d['wei'] = ''
|
|
d['bus'] = ''
|
|
else:
|
|
w = wei[0]
|
|
memberships = Membership.objects.filter(club=w, user=n.user)
|
|
if not memberships:
|
|
d['wei'] = ''
|
|
d['bus'] = ''
|
|
else:
|
|
alias = []
|
|
for a in w.note.alias.iterator():
|
|
alias.append(str(a))
|
|
d['wei'] = alias[-1]
|
|
d['bus'] = memberships[0].weimembership.bus.name
|
|
# top3 conso
|
|
transactions = Transaction.objects.filter(
|
|
valid=True,
|
|
created_at__gte=bde[i].date_start,
|
|
created_at__lte=bde[i].date_end,
|
|
source=n,
|
|
amount__gt=0,
|
|
recurrenttransaction__template__id__gte=-1)
|
|
dt = {}
|
|
dc = {}
|
|
for t in transactions:
|
|
if t.template.name in dt:
|
|
dt[t.template.name] += t.quantity
|
|
else:
|
|
dt[t.template.name] = t.quantity
|
|
if t.template.category.name in dc:
|
|
dc[t.template.category.name] += t.quantity
|
|
else:
|
|
dc[t.template.category.name] = t.quantity
|
|
|
|
d['top3_conso'] = list(sorted(dt.items(), key=lambda item: item[1], reverse=True))[:3]
|
|
# catégorie de bouton préférée
|
|
if dc:
|
|
d['top_category'] = list(sorted(dc.items(), key=lambda item: item[1], reverse=True))[0][0]
|
|
else:
|
|
d['top_category'] = ''
|
|
# nombre de pot, et nombre d'entrée pot
|
|
pots = global_data['pots']
|
|
d['nb_pots'] = pots.count()
|
|
|
|
p = 0
|
|
for pot in pots:
|
|
if Entry.objects.filter(activity=pot, note=n):
|
|
p += 1
|
|
d['nb_pot_entry'] = p
|
|
# ton nombre de rechargement
|
|
d['nb_rechargement'] = Transaction.objects.filter(
|
|
valid=True,
|
|
created_at__gte=bde[i].date_start,
|
|
created_at__lte=bde[i].date_end,
|
|
destination=n,
|
|
source__pk__in=[1, 2, 3, 4]).count()
|
|
# ajout info globale spécifique user
|
|
# classement et montant conso all
|
|
d['class_part_all'] = len(global_data['class_conso_all'])
|
|
if n in global_data['class_conso_all']:
|
|
d['class_conso_all'] = list(global_data['class_conso_all']).index(n) + 1
|
|
d['amount_conso_all'] = global_data['class_conso_all'][n] / 100
|
|
else:
|
|
d['class_conso_all'] = 0
|
|
d['amount_conso_all'] = 0
|
|
# classement et montant conso bde
|
|
d['class_part_bde'] = len(global_data['class_conso_bde'])
|
|
if n in global_data['class_conso_bde']:
|
|
d['class_conso_bde'] = list(global_data['class_conso_bde']).index(n) + 1
|
|
d['amount_conso_bde'] = global_data['class_conso_bde'][n] / 100
|
|
else:
|
|
d['class_conso_bde'] = 0
|
|
d['amount_conso_bde'] = 0
|
|
|
|
if 'club' in n.__dir__():
|
|
# plus gros consommateur
|
|
transactions = Transaction.objects.filter(
|
|
valid=True,
|
|
created_at__lte=bde[i].date_end,
|
|
created_at__gte=bde[i].date_start,
|
|
destination=n,
|
|
source__noteuser__user__pk__gte=-1)
|
|
dt = {}
|
|
|
|
for t in transactions:
|
|
if t.source.user.username in dt:
|
|
dt[t.source.user.username] += t.total
|
|
else:
|
|
dt[t.source.user.username] = t.total
|
|
if dt:
|
|
d['big_consumer'] = list(sorted(dt.items(), key=lambda item: item[1], reverse=True))[0]
|
|
d['big_consumer'] = (d['big_consumer'][0], d['big_consumer'][1] / 100)
|
|
else:
|
|
d['big_consumer'] = ''
|
|
# plus gros créancier
|
|
transactions = Transaction.objects.filter(
|
|
valid=True,
|
|
created_at__lte=bde[i].date_end,
|
|
created_at__gte=bde[i].date_start,
|
|
source=n,
|
|
destination__noteuser__user__pk__gte=-1)
|
|
dt = {}
|
|
|
|
for t in transactions:
|
|
if t.destination.user.username in dt:
|
|
dt[t.destination.user.username] += t.total
|
|
else:
|
|
dt[t.destination.user.username] = t.total
|
|
if dt:
|
|
d['big_creancier'] = list(sorted(dt.items(), key=lambda item: item[1], reverse=True))[0]
|
|
d['big_creancier'] = (d['big_creancier'][0], d['big_creancier'][1] / 100)
|
|
else:
|
|
d['big_creancier'] = ''
|
|
# nb de soirée organisée
|
|
d['nb_soiree_orga'] = Activity.objects.filter(
|
|
valid=True,
|
|
date_start__lte=bde[i].date_end,
|
|
date_end__gte=bde[i].date_start,
|
|
organizer=n.club).count()
|
|
# nb de membres cumulé
|
|
d['nb_member'] = Membership.objects.filter(
|
|
date_start__lte=bde[i].date_end,
|
|
date_end__gte=bde[i].date_start,
|
|
club=n.club).distinct('user').count()
|
|
|
|
# ajout info globale
|
|
# top3 button
|
|
d['glob_top3_conso'] = global_data['top3_buttons']
|
|
# nb entree pot
|
|
d['glob_nb_entree_pot'] = global_data['nb_entree_pot']
|
|
# nb soiree
|
|
d['glob_nb_soiree'] = global_data['nb_soiree']
|
|
# nb vieux con
|
|
d['glob_nb_vieux_con'] = global_data['nb_vieux_con']
|
|
# nb transaction
|
|
d['glob_nb_transaction'] = global_data['nb_transaction']
|
|
|
|
data_bde.append(json.dumps(d))
|
|
if verb >= 3:
|
|
current += 1
|
|
self.stdout.write("\033[2K" + f"({current}/{total})" + "\033[1A")
|
|
|
|
else:
|
|
# make your wrapped or reuse previous wrapped
|
|
raise NotImplementedError(f"The BDE: {bde[i].name} has not personalized wrapped, make it !")
|
|
data.append(data_bde)
|
|
return data
|
|
|
|
def make_wrapped(self, unique_data, note, bde, change, create, verb=1):
|
|
if verb >= 3:
|
|
current = 0
|
|
total = 0
|
|
for n in note:
|
|
total += len(n)
|
|
self.stdout.write(f"Make {total} wrapped")
|
|
for i in range(len(bde)):
|
|
for j in range(len(note[i])):
|
|
if create and not Wrapped.objects.filter(bde=bde[i], note=note[i][j]):
|
|
Wrapped(bde=bde[i],
|
|
note=note[i][j],
|
|
data_json=unique_data[i][j],
|
|
public=False,
|
|
generated=True).save()
|
|
elif change:
|
|
w = Wrapped.objects.get(bde=bde[i], note=note[i][j])
|
|
w.data_json = unique_data[i][j]
|
|
w.save()
|
|
if verb >= 3:
|
|
current += 1
|
|
self.stdout.write("\033[2K" + f"({current}/{total})" + "\033[1A")
|
|
return
|
|
|
|
def filter_note(self, bde, note, change, create, verb=1):
|
|
if change and create:
|
|
return list(note)
|
|
if change and not create:
|
|
note_new = []
|
|
for n in note:
|
|
if Wrapped.objects.filter(bde=bde, note=n):
|
|
note_new.append(n)
|
|
return note_new
|
|
if not change and create:
|
|
note_new = []
|
|
for n in note:
|
|
if not Wrapped.objects.filter(bde=bde, note=n):
|
|
note_new.append(n)
|
|
return note_new
|