#!/usr/bin/env python3
"""
Import script for the NK15 database.

TODO: import transactions
TODO: import memberships
TODO: import activities
TODO: import ...
"""

import collections
import datetime
import json

import psycopg2 as pg
import psycopg2.extras as pge
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.core.management import call_command
from django.core.management.base import BaseCommand
from django.db import IntegrityError
from django.db import transaction
from django.utils import timezone
from django.utils.timezone import make_aware

from member.models import Profile, Club, Membership
from note.models import Alias
from note.models import Note, NoteSpecial, NoteUser, NoteClub
from note.models import Transaction, TransactionTemplate,\
    TemplateCategory, RecurrentTransaction, MembershipTransaction,\
    SpecialTransaction

# Membership parameters applied to every imported club.
M_DURATION = 396
M_START = datetime.date(2019, 8, 31)
M_END = datetime.date(2020, 9, 30)

# Maps an NK15 ``idbde`` to the primary key of the matching NK20 note.
# Seeded with the special accounts; import_comptes() adds one entry per
# imported account.
MAP_IDBDE = {
    -4: 2,  # Credit card
    -3: 4,  # Bank transfer
    -2: 1,  # Cash
    -1: 3,  # Cheque
    0: 5,  # BDE
}

# (substring looked up in the transaction description, note pk used instead),
# checked in this order for credits and withdrawals.
_PAYMENT_KEYWORDS = (
    ("espèce", 1),
    ("carte", 2),
    ("cheques", 3),
    ("virement", 4),
)


def update_line(n, N, content):
    """
    Print a one-line progress indicator that overwrites itself.

    :param n: index of the current item.
    :param N: total number of items.
    :param content: label of the current item; padded/truncated to 10 chars.
    """
    n = str(n)
    N = str(N)
    # str.rjust returns a new string: keep it so the counter has a fixed width.
    n = n.rjust(len(N))
    # str() so that a NULL or non-text label does not break the format spec.
    print(f"({n}/{N}) {str(content):10.10}", end="\r")


def _create_user(row, username, password):
    """
    Create the user described by ``row`` and fill in its profile.

    The work is done inside a savepoint so that a failed attempt leaves no
    partially created user behind and the caller can retry with another
    username.
    """
    with transaction.atomic():
        user = User.objects.create(
            username=username,
            password=password,
            # NK15 columns are in French: "prenom" is the given name and
            # "nom" the family name.
            first_name=row["prenom"],
            last_name=row["nom"],
            email=row["mail"],
            is_active=True,  # temporary
        )
        profile = user.profile
        profile.phone_number = row['tel']
        profile.address = row['adresse']
        profile.paid = row['normalien']
        profile.registration_valid = True
        profile.email_confirmed = True
        user.save()
        profile.save()
    return user


@transaction.atomic
def import_comptes(cur):
    """
    Import the NK15 ``comptes`` table as users or clubs with their notes.

    Each imported account is recorded in ``MAP_IDBDE`` (idbde -> note pk).

    :param cur: dict-like cursor on the NK15 database.
    """
    cur.execute("SELECT * FROM comptes WHERE idbde > 0 ORDER BY idbde;")
    pkclub = 3
    N = cur.rowcount
    for idx, row in enumerate(cur):
        update_line(idx, N, row["pseudo"])
        if row["type"] == "personne":
            # sanitize password
            if row["passwd"] != "*|*":
                passwd_nk15 = "$".join(["custom_nk15", "1", row["passwd"]])
            else:
                passwd_nk15 = ''
            try:
                user = _create_user(row, row["pseudo"], passwd_nk15)
            except ValidationError as e:
                # sanitize duplicate aliases (nk12): retry with a username
                # made unique by appending the idbde.
                if getattr(e, "code", None) != 'same_alias':
                    raise
                user = _create_user(
                    row, row["pseudo"] + str(row["idbde"]), passwd_nk15)
            # profile and note created via signal.
            note = user.note
            date = row.get("last_negatif", None)
            if date is not None:
                note.last_negative = make_aware(date)
            note.balance = row["solde"]
            note.save()
        else:  # club
            obj_dict = {
                "pk": pkclub,
                "name": row["pseudo"],
                "email": row["mail"],
                "membership_duration": M_DURATION,
                "membership_start": M_START,
                "membership_end": M_END,
                "membership_fee_paid": 0,
                "membership_fee_unpaid": 0,
            }
            club, _ = Club.objects.get_or_create(**obj_dict)
            pkclub += 1
            note = club.note
            note.balance = row["solde"]
            club.save()
            note.save()
        MAP_IDBDE[row["idbde"]] = note.pk


@transaction.atomic
def import_boutons(cur):
    """
    Import the NK15 ``boutons`` table as transaction templates.

    Requires ``MAP_IDBDE`` to contain every destination account.

    :param cur: dict-like cursor on the NK15 database.
    """
    cur.execute("SELECT * FROM boutons;")
    N = cur.rowcount
    for idx, row in enumerate(cur):
        update_line(idx, N, row["label"])
        cat, created = TemplateCategory.objects.get_or_create(name=row["categorie"])
        if created:
            cat.save()
        obj_dict = {
            "pk": row["id"],
            "name": row["label"],
            "amount": row["montant"],
            "destination_id": MAP_IDBDE[row["destinataire"]],
            "category": cat,
            "display": row["affiche"],
            "description": row["description"],
        }
        try:
            with transaction.atomic():  # required for error management
                button = TransactionTemplate.objects.create(**obj_dict)
        except IntegrityError as e:
            # button with the same name is not possible in NK20.
            if "unique" not in e.args[0]:
                raise
            qs = Club.objects.filter(note__id=MAP_IDBDE[row["destinataire"]]).values('name')
            note_name = qs[0]["name"]
            # rename the button by appending the destination club name
            obj_dict["name"] = "{} {}".format(obj_dict["name"], note_name)
            button = TransactionTemplate.objects.create(**obj_dict)
        button.save()


@transaction.atomic
def import_transaction(cur):
    """
    Import the NK15 ``transactions`` table (joined with ``adhesions``).

    Requires ``MAP_IDBDE`` to contain every account referenced by a
    transaction.

    :param cur: dict-like cursor on the NK15 database.
    :raises ValueError: if the first rows have no date to fall back on.
    :raises TypeError: if a credit/withdrawal targets neither a user note
        nor a club note.
    """
    cur.execute("SELECT * FROM transactions LEFT JOIN adhesions ON transactions.id = adhesions.idtransaction ORDER BY transactions.id;")
    N = cur.rowcount
    old_date = None
    for idx, row in enumerate(cur):
        update_line(idx, N, row["description"])
        # some date are set to None, use the previous one
        date = row["date"]
        if date is None:
            if old_date is None:
                raise ValueError(
                    "transaction has no date and no previous date is available")
            date = old_date
        else:
            old_date = date

        obj_dict = {
            # "pk": row["id"],
            "destination_id": MAP_IDBDE[row["destinataire"]],
            "source_id": MAP_IDBDE[row["emetteur"]],
            "created_at": make_aware(date),
            "amount": row["montant"],
            "quantity": row["quantite"],
            "reason": row["description"],
            "valid": row["valide"],
        }
        ttype = row["type"]
        if ttype in ("don", "transfert", "invitation"):
            Transaction.objects.create(**obj_dict)
        elif ttype == "bouton":
            cat_name = row["categorie"]
            if cat_name is None:
                cat_name = 'None'
            cat, created = TemplateCategory.objects.get_or_create(name=cat_name)
            if created:
                cat.save()
            obj_dict["category"] = cat
            RecurrentTransaction.objects.create(**obj_dict)
        elif ttype in ("crédit", "retrait"):
            # The special note is the source of a credit and the destination
            # of a withdrawal; pick it from the description when possible.
            field_id = "source_id" if ttype == "crédit" else "destination_id"
            description = row["description"] or ""
            for keyword, special_pk in _PAYMENT_KEYWORDS:
                if keyword in description:
                    obj_dict[field_id] = special_pk
                    break
            # Special accounts have idbde <= 0, so the larger idbde is the
            # regular account involved in the operation.
            actor = Note.objects.get(
                pk=MAP_IDBDE[max(row["destinataire"], row["emetteur"])])
            # custom fields of SpecialTransaction
            # NOTE(review): relies on Note.objects.get returning the concrete
            # NoteUser/NoteClub instance — confirm the model is polymorphic.
            if isinstance(actor, NoteUser):
                obj_dict["first_name"] = actor.user.first_name
                obj_dict["last_name"] = actor.user.last_name
            elif isinstance(actor, NoteClub):
                obj_dict["first_name"] = actor.club.name
                obj_dict["last_name"] = actor.club.name
            else:
                raise TypeError("You should'nt be there")
            SpecialTransaction.objects.create(**obj_dict)
        elif ttype == "adhésion":
            print("adhesion not supported yet")
        else:
            print("other type not supported yet")


@transaction.atomic
def import_aliases(cur):
    """
    Import the NK15 ``aliases`` table, skipping aliases that already exist.

    Requires ``MAP_IDBDE`` to contain every account owning an alias.

    :param cur: dict-like cursor on the NK15 database.
    """
    cur.execute("SELECT * FROM aliases ORDER by id")
    N = cur.rowcount
    for idx, row in enumerate(cur):
        update_line(idx, N, row["alias"])
        alias_name = row["alias"]
        # Names longer than 255 characters are cut to 252 plus an ellipsis.
        alias_name_good = (alias_name[:252] + '...') if len(alias_name) > 255 else alias_name
        obj_dict = {
            "note_id": MAP_IDBDE[row["idbde"]],
            "name": alias_name_good,
            "normalized_name": Alias.normalize(alias_name_good),
        }
        try:
            with transaction.atomic():  # savepoint, so a duplicate can be skipped
                alias, created = Alias.objects.get_or_create(**obj_dict)
        except IntegrityError as e:
            if "unique" in e.args[0]:
                continue
            raise
        alias.save()


class Command(BaseCommand):
    """
    Command for importing the database of NK15.
    Need to be run by a user with a registered role in postgres for the database nk15.
    """

    def print_success(self, to_print):
        """Write ``to_print`` to stdout using the success style."""
        return self.stdout.write(self.style.SUCCESS(to_print))

    def add_arguments(self, parser):
        """Declare the command-line options of the command."""
        parser.add_argument('-c', '--comptes', action='store_true', help="import accounts")
        parser.add_argument('-b', '--boutons', action='store_true', help="import boutons")
        parser.add_argument('-t', '--transactions', action='store_true', help="import transaction")
        parser.add_argument('-a', '--aliases', action='store_true', help="import aliases")
        parser.add_argument('-s', '--save', action='store', help="save mapping of idbde")
        parser.add_argument('-m', '--map', action='store', help="import mapping of idbde")
        parser.add_argument('-d', '--nk15db', action='store', default='nk15', help='NK15 database name')
        parser.add_argument('-u', '--nk15user', action='store', default='nk15_user', help='NK15 database owner')

    def handle(self, *args, **kwargs):
        """
        Run the requested import steps against the NK15 database.

        Accounts are either imported (``--comptes``) or their idbde mapping
        is loaded from a JSON file (``--map``); the other steps rely on that
        mapping.
        """
        global MAP_IDBDE
        nk15db, nk15user = kwargs['nk15db'], kwargs['nk15user']
        # connecting to nk15 database
        conn = pg.connect(database=nk15db, user=nk15user)
        try:
            cur = conn.cursor(cursor_factory=pge.DictCursor)
            try:
                if kwargs["comptes"]:
                    # reset database.
                    call_command("migrate")
                    call_command("loaddata", "initial")
                    self.print_success("reset nk20 database")
                    import_comptes(cur)
                    self.print_success("comptes table imported")
                elif kwargs["map"]:
                    filename = kwargs["map"]
                    with open(filename, 'r') as fp:
                        loaded = json.load(fp)
                    # JSON object keys are strings: restore integer ids.
                    MAP_IDBDE = {int(k): int(v) for k, v in loaded.items()}
                if kwargs["save"]:
                    filename = kwargs["save"]
                    with open(filename, 'w') as fp:
                        json.dump(MAP_IDBDE, fp, sort_keys=True, indent=2)

                # /!\ need a proper MAP_IDBDE
                if kwargs["boutons"]:
                    import_boutons(cur)
                    self.print_success("boutons table imported")
                if kwargs["transactions"]:
                    import_transaction(cur)
                    self.print_success("transaction imported")
                if kwargs["aliases"]:
                    import_aliases(cur)
                    self.print_success("aliases imported")
            finally:
                cur.close()
        finally:
            # Release the NK15 connection even when an import step fails.
            conn.close()