nk20-scripts/management/commands/import_nk15.py

294 lines
11 KiB
Python

#!/usr/bin/env python3
from django.core.management.base import BaseCommand
from django.core.management import call_command
from django.utils import timezone
import psycopg2 as pg
import psycopg2.extras as pge
from django.db import transaction
import json
import datetime
import collections
from django.core.exceptions import ValidationError
from django.utils.timezone import make_aware
from django.db import IntegrityError
from django.contrib.auth.models import User
from note.models import Note, NoteSpecial, NoteUser, NoteClub
from note.models import Alias
from note.models import TemplateCategory, TransactionTemplate,\
Transaction, RecurrentTransaction, MembershipTransaction, SpecialTransaction
from member.models import Profile, Club, Membership
"""
Script d'import de la nk15:
TODO: import transactions
TODO: import adhesion
TODO: import activite
TODO: import ...
"""
# Membership window written on every club created by the import.
M_START = datetime.date(2019, 8, 31)
M_END = datetime.date(2020, 9, 30)
# Membership duration, in days (396 is also the number of days between
# M_START and M_END).
M_DURATION = 396

# Maps an NK15 account id (``idbde``) to the id of the matching NK20 note.
# Only the non-positive NK15 ids are listed here; the values are hard-coded
# NK20 note ids (presumably those of the notes loaded from the "initial"
# fixture -- confirm against the fixture).
MAP_IDBDE = {
    -4: 2,  # credit card
    -3: 4,  # bank transfer
    -2: 1,  # cash
    -1: 3,  # cheque
    0: 5,   # BDE
}
def update_line(n, N, content):
    """
    Print an in-place progress line of the form ``(n/N) content``.

    The line starts with a carriage return and has no trailing newline, so
    each call overwrites the previous one on a terminal.

    :param n: index of the item being processed.
    :param N: total number of items.
    :param content: label shown after the counter; it is converted with
        ``str()`` and padded/truncated to exactly 10 characters.
    """
    total = str(N)
    # str.rjust() returns a new string: keep the result, otherwise the
    # counter is never padded to the width of the total.
    current = str(n).rjust(len(total))
    # str() keeps the "10.10" format spec valid for non-string labels
    # (e.g. a NULL value coming from the database).
    print(f"\r ({current}/{total}) {str(content):10.10}", end="")
def _create_nk15_user(row, username, password):
    """
    Create the NK20 user for one NK15 ``comptes`` row and fill in its profile.

    :param row: NK15 row; reads ``nom``, ``prenom``, ``mail``, ``tel``,
        ``adresse`` and ``normalien``.
    :param username: username to give to the new user.
    :param password: already formatted password string.
    :return: the saved ``User`` instance.
    """
    user = User.objects.create(
        username=username,
        password=password,
        # NK15 "prenom" is the given name and "nom" the family name.
        first_name=row["prenom"],
        last_name=row["nom"],
        email=row["mail"],
        is_active=True,  # temporary
    )
    # The profile is created via signal when the user is created.
    profile = user.profile
    profile.phone_number = row['tel']
    profile.address = row['adresse']
    profile.paid = row['normalien']
    profile.registration_valid = True
    profile.email_confirmed = True
    user.save()
    profile.save()
    return user


@transaction.atomic
def import_comptes(cur):
    """
    Import the NK15 accounts (``comptes`` rows with ``idbde > 0``).

    Rows of type ``personne`` become users (with profile and note); every
    other row becomes a club. For each imported row, the id of its NK20 note
    is recorded in ``MAP_IDBDE`` under the NK15 ``idbde``.

    :param cur: cursor on the NK15 database whose rows support access by
        column name.
    :raises ValidationError: if creating a user fails for any reason other
        than a ``same_alias`` conflict.
    """
    cur.execute("SELECT * FROM comptes WHERE idbde > 0 ORDER BY idbde;")
    # First primary key given to imported clubs (presumably the lower ones
    # are taken by clubs from the initial fixture -- TODO confirm).
    pkclub = 3
    N = cur.rowcount
    for idx, row in enumerate(cur):
        update_line(idx, N, row["pseudo"])
        if row["type"] == "personne":
            # sanitize password
            if row["passwd"] != "*|*":
                passwd_nk15 = "$".join(["custom_nk15", "1", row["passwd"]])
            else:
                passwd_nk15 = ''
            try:
                # Savepoint: a failed creation is rolled back entirely, so
                # the retry below starts from a clean state instead of
                # touching a half-created (or previous) user.
                with transaction.atomic():
                    user = _create_nk15_user(row, row["pseudo"], passwd_nk15)
            except ValidationError as e:
                # sanitize duplicate aliases (nk12)
                if getattr(e, "code", None) != 'same_alias':
                    raise
                user = _create_nk15_user(
                    row, row["pseudo"] + str(row["idbde"]), passwd_nk15)
            # profile and note created via signal.
            note = user.note
            date = row.get("last_negatif", None)
            if date is not None:
                note.last_negative = make_aware(date)
            note.balance = row["solde"]
            note.save()
        else:  # club
            obj_dict = {
                "pk": pkclub,
                "name": row["pseudo"],
                "email": row["mail"],
                "membership_duration": M_DURATION,
                "membership_start": M_START,
                "membership_end": M_END,
                "membership_fee_paid": 0,
                "membership_fee_unpaid": 0,
            }
            club, _ = Club.objects.get_or_create(**obj_dict)
            pkclub += 1
            note = club.note
            note.balance = row["solde"]
            club.save()
            note.save()
        # Remember which NK20 note replaces this NK15 account.
        MAP_IDBDE[row["idbde"]] = note.note_ptr_id
@transaction.atomic
def import_boutons(cur):
    """
    Import the NK15 buttons (``boutons`` table) as transaction templates.

    The category of each button is fetched or created by name. When a
    template cannot be created because of a uniqueness conflict, the name of
    the destination club is appended to the button name and the creation is
    retried.

    :param cur: cursor on the NK15 database whose rows support access by
        column name.
    :raises KeyError: if a button's ``destinataire`` is not in ``MAP_IDBDE``.
    :raises IntegrityError: for non-uniqueness errors, or when the conflict
        cannot be resolved because the destination note belongs to no club.
    """
    cur.execute("SELECT * FROM boutons;")
    N = cur.rowcount
    for idx, row in enumerate(cur):
        update_line(idx, N, row["label"])
        # get_or_create() already saves a newly created category.
        cat, _ = TemplateCategory.objects.get_or_create(name=row["categorie"])
        destination_id = MAP_IDBDE[row["destinataire"]]
        obj_dict = {
            "pk": row["id"],
            "name": row["label"],
            "amount": row["montant"],
            "destination_id": destination_id,
            "category": cat,
            "display": row["affiche"],
            "description": row["description"],
        }
        try:
            # Savepoint, required for error management: a failed INSERT must
            # not break the enclosing transaction.
            with transaction.atomic():
                TransactionTemplate.objects.create(**obj_dict)
        except IntegrityError as e:
            # button with the same name is not possible in NK20.
            if "unique" not in str(e):
                raise
            club_name = (
                Club.objects
                .filter(note__note_ptr=destination_id)
                .values_list('name', flat=True)
                .first()
            )
            if club_name is None:
                # No club owns the destination note: surface the real
                # database error rather than an unrelated IndexError.
                raise
            # rename button name
            obj_dict["name"] = "{} {}".format(obj_dict["name"], club_name)
            TransactionTemplate.objects.create(**obj_dict)
@transaction.atomic
def import_transaction(cur):
    """
    Import the NK15 transactions whose id is greater than ``idmin``.

    Depending on the NK15 ``type`` column, a row becomes a ``Transaction``
    (don / transfert / invitation), a ``RecurrentTransaction`` (bouton) or a
    ``SpecialTransaction`` (crédit / retrait). Other types are only reported
    on stdout.

    :param cur: cursor on the NK15 database whose rows support access by
        column name.
    :raises KeyError: if an account id is missing from ``MAP_IDBDE``.
    :raises ValueError: if a row has no date and no earlier row provides one.
    :raises TypeError: if the actor of a crédit/retrait is neither a user
        note nor a club note.
    """
    idmin = 58770
    # The id is passed as a query parameter instead of being formatted into
    # the SQL text.
    cur.execute(
        "SELECT *, transactions.date AS transac_date "
        "FROM transactions "
        "LEFT JOIN adhesions ON transactions.id = adhesions.id "
        "WHERE transactions.id > %s "
        "ORDER BY transactions.id;",
        (idmin,),
    )
    N = cur.rowcount
    # Keyword looked for in the description -> id of the special note.
    special_notes = (
        ("espèce", 1),
        ("carte", 2),
        ("cheques", 3),
        ("virement", 4),
    )
    previous_date = None
    for idx, row in enumerate(cur):
        update_line(idx, N, row["description"])
        # some date are set to None, use the previous one
        date = row["transac_date"]
        if date is None:
            date = previous_date
        if date is None:
            raise ValueError(
                "no date available for transaction row {}".format(idx))
        previous_date = date
        obj_dict = {
            # "pk": row["id"],
            "destination_id": MAP_IDBDE[row["destinataire"]],
            "source_id": MAP_IDBDE[row["emetteur"]],
            "created_at": make_aware(date),
            "amount": row["montant"],
            "quantity": row["quantite"],
            "reason": row["description"],
            "valid": row["valide"],
        }
        ttype = row["type"]
        if ttype in ("don", "transfert", "invitation"):
            Transaction.objects.create(**obj_dict)
        elif ttype == "bouton":
            cat_name = row["categorie"]
            if cat_name is None:
                cat_name = 'None'
            # get_or_create() already saves a newly created category.
            cat, _ = TemplateCategory.objects.get_or_create(name=cat_name)
            obj_dict["category"] = cat
            RecurrentTransaction.objects.create(**obj_dict)
        elif ttype in ("crédit", "retrait"):
            # A credit comes from a special note, a withdrawal goes to one.
            field_id = "source_id" if ttype == "crédit" else "destination_id"
            description = row["description"] or ""
            for keyword, note_id in special_notes:
                if keyword in description:
                    obj_dict[field_id] = note_id
                    break
            # The larger of the two account ids is used as the actor
            # (presumably the other one is a non-positive special id).
            pk = max(row["destinataire"], row["emetteur"])
            actor = Note.objects.get(id=MAP_IDBDE[pk])
            # custom fields of SpecialTransaction
            if isinstance(actor, NoteUser):
                obj_dict["first_name"] = actor.user.first_name
                obj_dict["last_name"] = actor.user.last_name
            elif isinstance(actor, NoteClub):
                obj_dict["first_name"] = actor.club.name
                obj_dict["last_name"] = actor.club.name
            else:
                # Raising a bare string is itself a TypeError in Python 3;
                # raise a real exception with a usable message.
                raise TypeError(
                    "unexpected note type {} for a special transaction".format(
                        type(actor).__name__))
            SpecialTransaction.objects.create(**obj_dict)
        elif ttype == "adhésion":
            print("adhesion not supported yet")
        else:
            print("other type not supported yet")
@transaction.atomic
def import_aliases(cur):
    """
    Import the NK15 ``aliases`` table as NK20 aliases.

    Names longer than 255 characters are cut to 252 characters followed by
    ``...``. Rows rejected by the database with a uniqueness error are
    skipped.

    :param cur: cursor on the NK15 database whose rows support access by
        column name.
    """
    cur.execute("SELECT * FROM aliases ORDER by id")
    total = cur.rowcount
    for position, record in enumerate(cur):
        update_line(position, total, record["alias"])
        raw_name = record["alias"]
        if len(raw_name) > 255:
            name = raw_name[:252] + '...'
        else:
            name = raw_name
        lookup = dict(
            note_id=MAP_IDBDE[record["idbde"]],
            name=name,
            normalized_name=Alias.normalize(name),
        )
        try:
            # Savepoint so that a rejected row does not break the enclosing
            # transaction.
            with transaction.atomic():
                alias, created = Alias.objects.get_or_create(**lookup)
        except IntegrityError as err:
            if "unique" not in err.args[0]:
                raise err
            # Uniqueness conflict: skip this alias.
            continue
        alias.save()
class Command(BaseCommand):
    """
    Command for importing the database of NK15.
    Need to be run by a user with a registered role in postgres for the database nk15.
    """

    def print_success(self, to_print):
        """Write ``to_print`` to the command's stdout using the SUCCESS style."""
        return self.stdout.write(self.style.SUCCESS(to_print))

    def add_arguments(self, parser):
        """Declare the command-line options of the command."""
        parser.add_argument('-c', '--comptes', action='store_true', help="import accounts")
        parser.add_argument('-b', '--boutons', action='store_true', help="import boutons")
        parser.add_argument('-t', '--transactions', action='store_true', help="import transaction")
        parser.add_argument('-a', '--aliases', action='store_true', help="import aliases")
        parser.add_argument('-s', '--save', action='store', help="save mapping of idbde")
        parser.add_argument('-m', '--map', action='store', help="import mapping of idbde")
        parser.add_argument('-d', '--nk15db', action='store', default='nk15', help='NK15 database name')
        parser.add_argument('-u', '--nk15user', action='store', default='nk15_user', help='NK15 database owner')

    def handle(self, *args, **kwargs):
        """
        Run the requested import steps against the NK15 database.

        With ``--comptes`` the migrations and the "initial" fixture are
        applied, then the accounts are imported (which fills ``MAP_IDBDE``).
        Otherwise ``--map`` loads ``MAP_IDBDE`` from a JSON file. ``--save``
        dumps the current mapping to a JSON file. Buttons, transactions and
        aliases are then imported if requested.
        """
        global MAP_IDBDE
        nk15db, nk15user = kwargs['nk15db'], kwargs['nk15user']
        # connecting to nk15 database
        conn = pg.connect(database=nk15db, user=nk15user)
        try:
            cur = conn.cursor(cursor_factory=pge.DictCursor)

            if kwargs["comptes"]:
                # reset database.
                call_command("migrate")
                call_command("loaddata", "initial")
                self.print_success("reset nk20 database")
                import_comptes(cur)
                self.print_success("comptes table imported")
            elif kwargs["map"]:
                filename = kwargs["map"]
                with open(filename, 'r') as fp:
                    MAP_IDBDE = json.load(fp)
                    # JSON object keys are strings: restore the integer ids.
                    MAP_IDBDE = {int(k): int(v) for k, v in MAP_IDBDE.items()}

            if kwargs["save"]:
                filename = kwargs["save"]
                with open(filename, 'w') as fp:
                    json.dump(MAP_IDBDE, fp, sort_keys=True, indent=2)

            # /!\ need a proper MAP_IDBDE
            if kwargs["boutons"]:
                import_boutons(cur)
                self.print_success("boutons table imported")
            if kwargs["transactions"]:
                import_transaction(cur)
                self.print_success("transaction imported")
            if kwargs["aliases"]:
                import_aliases(cur)
                self.print_success("aliases imported")
        finally:
            # Always release the NK15 connection, even when an import fails.
            conn.close()