From b1366995e0539b81c2031d85b4f2b466991267e3 Mon Sep 17 00:00:00 2001
From: Pierre-antoine Comby
Date: Mon, 18 May 2020 13:56:16 +0200
Subject: [PATCH] Batch import Function
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

On utilise bulk_create de Django, mais il faut préparer tous les models au
préalable. C'est *beaucoup* plus rapide.
---
 management/commands/_import_utils.py  |  84 +++++++++++
 management/commands/import_account.py | 199 ++++++++++++++++++++++++++
 2 files changed, 283 insertions(+)
 create mode 100644 management/commands/_import_utils.py
 create mode 100644 management/commands/import_account.py

diff --git a/management/commands/_import_utils.py b/management/commands/_import_utils.py
new file mode 100644
index 0000000..80c7b59
--- /dev/null
+++ b/management/commands/_import_utils.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python3
+
+from django.core.management.base import BaseCommand
+from collections import defaultdict
+from django.apps import apps
+from django.db import transaction
+
+from django.contrib.auth.models import User
+from django.db.models import Model
+from polymorphic.models import PolymorphicModel
+
+
+class ImportCommand(BaseCommand):
+    """
+    Generic command for import of NK15 database
+    """
+
+    def print_success(self, to_print):
+        return self.stdout.write(self.style.SUCCESS(to_print))
+
+    def print_error(self, to_print):
+        return self.stdout.write(self.style.ERROR(to_print))
+
+    def update_line(self, n, total, content):
+        n = str(n)
+        total = str(total)
+        n = n.rjust(len(total))  # str.rjust returns a new string; must rebind
+        print(f"\r ({n}/{total}) {content:10.10}", end="")
+
+    def create_parser(self, prog_name, subcommand, **kwargs):
+        parser = super().create_parser(prog_name, subcommand, **kwargs)
+        parser.add_argument('--nk15db', action='store', default='nk15', help='NK15 database name')
+        parser.add_argument('--nk15user', action='store', default='nk15_user', help='NK15 database owner')
+        parser.add_argument('-s', '--save', action='store',
+                            help="save mapping of idbde")
+        parser.add_argument('-m', '--map', action='store', help="import mapping of idbde")
+        return parser
+
+
+class BulkCreateManager(object):
+    """
+    This helper class keeps track of ORM objects to be created for multiple
+    model classes, and automatically creates those objects with `bulk_create`
+    when the number of objects accumulated for a given model class exceeds
+    `chunk_size`.
+    Upon completion of the loop that's `add()`ing objects, the developer must
+    call `done()` to ensure the final set of objects is created for all models.
+    """
+
+    def __init__(self, chunk_size=100):
+        self._create_queues = defaultdict(list)
+        self.chunk_size = chunk_size
+
+    def _commit(self, model_class):
+        model_key = model_class._meta.label
+        if model_class.__base__ in [Model, PolymorphicModel] or model_class is User:
+            model_class.objects.bulk_create(self._create_queues[model_key])
+        else:
+            # ensure that parents models exists
+            self._commit(model_class.__base__)
+            with transaction.atomic():
+                for obj in self._create_queues[model_key]:
+                    obj.save_base(raw=True)
+        self._create_queues[model_key] = []
+
+    def add(self, *args):
+        """
+        Add an object to the queue to be created, and call bulk_create if we
+        have enough objs.
+        """
+        for obj in args:
+            model_class = type(obj)
+            model_key = model_class._meta.label
+            self._create_queues[model_key].append(obj)
+            if len(self._create_queues[model_key]) >= self.chunk_size:
+                self._commit(model_class)
+
+    def done(self):
+        """
+        Always call this upon completion to make sure the final partial chunk
+        is saved.
+        """
+        for model_name, objs in self._create_queues.items():
+            if len(objs) > 0:
+                self._commit(apps.get_model(model_name))
diff --git a/management/commands/import_account.py b/management/commands/import_account.py
new file mode 100644
index 0000000..b173265
--- /dev/null
+++ b/management/commands/import_account.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+
+import psycopg2 as pg
+import psycopg2.extras as pge
+import datetime
+import json
+
+from django.utils.timezone import make_aware, now
+from django.contrib.auth.models import User
+from django.contrib.contenttypes.models import ContentType
+
+from django.db import transaction
+from django.db import IntegrityError
+
+from note.models import Note, NoteUser, NoteClub
+from note.models import Alias
+from member.models import Club, Profile
+
+from ._import_utils import ImportCommand, BulkCreateManager
+
+M_DURATION = 396
+M_START = datetime.date(2019, 8, 31)
+M_END = datetime.date(2020, 9, 30)
+
+MAP_IDBDE = {
+    -4: 2,  # Carte Bancaire
+    -3: 4,  # Virement
+    -2: 1,  # Especes
+    -1: 3,  # Chèque
+    0: 5,  # BDE
+}
+note_user_type = ContentType.objects.get(app_label="note", model="noteuser")
+note_club_type = ContentType.objects.get(app_label="note", model="noteclub")
+
+
+class Command(ImportCommand):
+    """
+    Import command for People base data (Comptes, and Aliases)
+    """
+
+    def add_arguments(self, parser):
+        parser.add_argument('-a', '--alias', action='store', help="import alias")
+        parser.add_argument('-c', '--chunk', type=int, default=100, help="chunk size for bulk_create")
+
+    @transaction.atomic
+    def import_account(self, cur, chunk_size):
+        """
+        Import every account of the nk15 in a batch fashion.
+        Every Model has to be manually created, and no magic `.save()`
+        function is being called.
+        """
+        cur.execute("SELECT * FROM comptes WHERE idbde > 0 ORDER BY idbde;")
+        pk_club = 3
+        pk_user = 1
+        pk_profile = 1
+        pk_note = 7  # pk 6 is Kfet!
+        n = cur.rowcount
+
+        bulk_mgr = BulkCreateManager(chunk_size=chunk_size)
+        pseudo_list = set()
+        for idx, row in enumerate(cur):
+            pseudo = row["pseudo"]
+            pseudo_norm = Alias.normalize(pseudo)
+            self.update_line(idx, n, pseudo)
+            # clean pseudo (normalized pseudo must be unique)
+            if pseudo_norm in pseudo_list:
+                pseudo = pseudo+str(row["idbde"])
+            else:
+                pseudo_list.add(pseudo_norm)
+            # clean date
+            note_dict = {
+                "pk": pk_note,
+                "balance": 0,
+                "last_negative": None,
+                "is_active": True,
+                "display_image": "",
+                "created_at": now()
+            }
+            if row["last_negatif"] is not None:
+                note_dict["last_negative"] = make_aware(row["last_negatif"])
+            if row["type"] == "personne":
+                # sanitize password
+                if row["passwd"] != "*|*" and not row["deleted"]:
+                    passwd_nk15 = "$".join(["custom_nk15", "1", row["passwd"]])
+                else:
+                    passwd_nk15 = ''
+
+                obj_dict = {
+                    "pk": pk_user,
+                    "username": row["pseudo"],
+                    "password": passwd_nk15,
+                    "first_name": row["prenom"],  # prenom = given name
+                    "last_name": row["nom"],  # nom = family name
+                    "email": row["mail"],
+                    "is_active": True,  # temporary
+                }
+                profile_dict = {
+                    "pk": pk_profile,
+                    "user_id": pk_user,
+                    "phone_number": row['tel'],
+                    "address": row['adresse'],
+                    "paid": row['normalien'],
+                    "registration_valid": True,
+                    "email_confirmed": True,
+                }
+                note_dict["polymorphic_ctype"] = note_user_type
+                note_user_dict = {
+                    "pk": pk_note,
+                    "user_id": pk_user,
+                }
+                alias_dict = {
+                    "pk": pk_note,
+                    "name": pseudo,
+                    "normalized_name": Alias.normalize(pseudo),
+                    "note_id": pk_note,
+                }
+
+                bulk_mgr.add(User(**obj_dict),
+                             Profile(**profile_dict),
+                             Note(**note_dict),
+                             NoteUser(**note_user_dict),
+                             Alias(**alias_dict),)
+                pk_user += 1
+                pk_profile += 1
+
+            else:  # club
+                obj_dict = {
+                    "pk": pk_club,
+                    "name": row["pseudo"],
+                    "email": row["mail"],
+                    "membership_duration": M_DURATION,
+                    "membership_start": M_START,
+                    "membership_end": M_END,
+                    "membership_fee_paid": 0,
+                    "membership_fee_unpaid": 0,
+                }
+                note_club_dict = {
+                    "pk": pk_note,
+                    "club_id": pk_club,
+                }
+
+                alias_dict = {
+                    "pk": pk_note,
+                    "name": pseudo,
+                    "normalized_name": Alias.normalize(pseudo),
+                    "note_id": pk_note
+                }
+                note_dict["polymorphic_ctype"] = note_club_type
+                bulk_mgr.add(Club(**obj_dict),
+                             Note(**note_dict),
+                             NoteClub(**note_club_dict),
+                             Alias(**alias_dict))
+                pk_club += 1
+            # row import completed
+            MAP_IDBDE[row["idbde"]] = pk_note
+            pk_note += 1
+        self.print_success("comptes table imported")
+
+    def import_alias(self, cur):
+        """
+        Import Alias from nk15
+        We rely on validation of the models, but it is slow.
+        """
+        cur.execute("SELECT * FROM aliases ORDER by id")
+        n = cur.rowcount
+        for idx, row in enumerate(cur):
+            self.update_line(idx, n, row["alias"])
+            alias_name = row["alias"]
+            alias_name_good = (alias_name[:252] + '...') if len(alias_name) > 255 else alias_name
+            obj_dict = {
+                "note_id": MAP_IDBDE[row["idbde"]],
+                "name": alias_name_good,
+                "normalized_name": Alias.normalize(alias_name_good),
+            }
+            try:
+                with transaction.atomic():
+                    alias, created = Alias.objects.get_or_create(**obj_dict)  # already persisted here
+            except IntegrityError as e:
+                if "unique" in e.args[0]:
+                    continue
+                else:
+                    raise e
+        return None
+
+    def handle(self, *args, **kwargs):
+        global MAP_IDBDE
+        # default args, provided by ImportCommand.
+        nk15db, nk15user = kwargs['nk15db'], kwargs['nk15user']
+        # connecting to nk15 database
+        conn = pg.connect(database=nk15db, user=nk15user)
+        cur = conn.cursor(cursor_factory=pge.DictCursor)
+
+        self.import_account(cur, kwargs["chunk"])
+        if kwargs["save"]:
+            filename = kwargs["save"]
+            with open(filename, 'w') as fp:
+                json.dump(MAP_IDBDE, fp, sort_keys=True, indent=2)
+        # Alias Management
+        if kwargs["alias"]:
+            self.import_alias(cur)