Batch import Function

On utilise bulk_create de Django, mais il faut préparer tous les models
au préalable. C'est *beaucoup* plus rapide.
This commit is contained in:
Pierre-antoine Comby 2020-05-18 13:56:16 +02:00
parent 64e8e88ed3
commit b1366995e0
2 changed files with 283 additions and 0 deletions

View File

@ -0,0 +1,84 @@
#!/usr/bin/env python3
from django.core.management.base import BaseCommand
from collections import defaultdict
from django.apps import apps
from django.db import transaction
from django.contrib.auth.models import User
from django.db.models import Model
from polymorphic.models import PolymorphicModel
class ImportCommand(BaseCommand):
    """
    Generic command for import of NK15 database.

    Base class for the import commands: it provides styled output helpers,
    an in-place progress line and the command-line options shared by every
    import (NK15 connection settings and idbde mapping files).
    """

    def print_success(self, to_print):
        """Write ``to_print`` to the command's stdout with the SUCCESS style."""
        return self.stdout.write(self.style.SUCCESS(to_print))

    def print_error(self, to_print):
        """Write ``to_print`` to the command's stdout with the ERROR style."""
        return self.stdout.write(self.style.ERROR(to_print))

    def update_line(self, n, total, content):
        """
        Redraw the progress line in place.

        :param n: index of the item currently being processed.
        :param total: total number of items.
        :param content: short label for the current item; it is padded or
            truncated to exactly 10 characters by the format spec.
        """
        total = str(total)
        # str.rjust returns a new string: keep the result so the counter has
        # a constant width and the line does not jitter while being redrawn.
        n = str(n).rjust(len(total))
        # The leading "\r" and the missing newline make each call overwrite
        # the previous progress line.
        print(f"\r ({n}/{total}) {content:10.10}", end="")

    def create_parser(self, prog_name, subcommand, **kwargs):
        """
        Build the argument parser and add the options common to all imports.

        Adds ``--nk15db`` / ``--nk15user`` (connection settings of the NK15
        database) and ``-s/--save`` / ``-m/--map`` (files used to save or
        load the idbde mapping).

        :return: the parser created by the parent class, with the extra options.
        """
        parser = super().create_parser(prog_name, subcommand, **kwargs)
        parser.add_argument('--nk15db', action='store', default='nk15', help='NK15 database name')
        parser.add_argument('--nk15user', action='store', default='nk15_user', help='NK15 database owner')
        parser.add_argument('-s', '--save', action='store', help="save mapping of idbde")
        parser.add_argument('-m', '--map', action='store', help="import mapping of idbde")
        return parser
class BulkCreateManager:
    """
    This helper class keeps track of ORM objects to be created for multiple
    model classes, and automatically creates those objects with `bulk_create`
    when the number of objects accumulated for a given model class exceeds
    `chunk_size`.
    Upon completion of the loop that's `add()`ing objects, the developer must
    call `done()` to ensure the final set of objects is created for all models.
    """

    def __init__(self, chunk_size=100):
        """
        :param chunk_size: number of queued objects of one model class that
            triggers a flush of that model's queue.
        """
        # One pending-object list per model, keyed by the model label
        # (``model_class._meta.label``).
        self._create_queues = defaultdict(list)
        self.chunk_size = chunk_size

    def _commit(self, model_class):
        """
        Write every queued object of ``model_class`` and empty its queue.

        Classes deriving directly from ``Model`` or ``PolymorphicModel`` (and
        ``User``) are written with a single ``bulk_create``. Any other class
        is treated as a child model: the queue of its base class is flushed
        first, then each object is saved individually with
        ``save_base(raw=True)`` inside one transaction.
        """
        model_key = model_class._meta.label
        if model_class.__base__ in [Model, PolymorphicModel] or model_class is User:
            model_class.objects.bulk_create(self._create_queues[model_key])
        else:
            # ensure that parents models exists
            self._commit(model_class.__base__)
            with transaction.atomic():
                for obj in self._create_queues[model_key]:
                    obj.save_base(raw=True)
        self._create_queues[model_key] = []

    def add(self, *args):
        """
        Add an object to the queue to be created, and call bulk_create if we
        have enough objs.

        :param args: model instances to queue; each one is filed under the
            label of its own class.
        """
        for obj in args:
            model_class = type(obj)
            model_key = model_class._meta.label
            self._create_queues[model_key].append(obj)
            if len(self._create_queues[model_key]) >= self.chunk_size:
                self._commit(model_class)

    def done(self):
        """
        Always call this upon completion to make sure the final partial chunk
        is saved.
        """
        # Iterate over a snapshot of the keys: flushing a child model also
        # touches its parent's queue, which may insert a new key in the
        # defaultdict and would break iteration over the live dict.
        for model_name in list(self._create_queues):
            # Read the queue again at each step: an earlier flush may already
            # have emptied it.
            if self._create_queues[model_name]:
                self._commit(apps.get_model(model_name))

View File

@ -0,0 +1,199 @@
#!/usr/bin/env python3
import psycopg2 as pg
import psycopg2.extras as pge
import datetime
from django.utils.timezone import make_aware, now
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.db import IntegrityError
from note.models import Note, NoteUser, NoteClub
from note.models import Alias
from member.models import Club, Profile
from ._import_utils import ImportCommand, BulkCreateManager
# Membership settings written on every imported club.
# 396 is the number of days between M_START and M_END.
M_DURATION = 396
M_START = datetime.date(2019, 8, 31)
M_END = datetime.date(2020, 9, 30)
# Maps an NK15 `idbde` to the primary key of the matching note.
# It starts with the special accounts below, is extended with one entry per
# imported row by `import_account`, and is read back by `import_alias`.
MAP_IDBDE = {
    -4: 2, # Credit card
    -3: 4, # Bank transfer
    -2: 1, # Cash
    -1: 3, # Cheque
    0: 5, # BDE
}
# Content types stored in `polymorphic_ctype` of the imported notes.
# NOTE: these two lookups run a database query when the module is imported.
note_user_type = ContentType.objects.get(app_label="note", model="noteuser")
note_club_type = ContentType.objects.get(app_label="note", model="noteclub")
class Command(ImportCommand):
    """
    Import command for People base data (Comptes, and Aliases)
    """

    def add_arguments(self, parser):
        """
        Declare the options specific to this command.

        ``-a/--alias`` enables the alias import and ``-c/--chunk`` sets the
        chunk size handed to the ``BulkCreateManager``.
        """
        # The value of --alias is only tested for truthiness in `handle`.
        # nargs='?' lets it be given as a bare flag while still accepting the
        # former `--alias VALUE` form.
        parser.add_argument('-a', '--alias', action='store', nargs='?', const=True, help="import alias")
        parser.add_argument('-c', '--chunk', type=int, default=100, help="chunk size for bulk_create")

    @transaction.atomic
    def import_account(self, cur, chunk_size):
        """
        Import every account of the nk15 in a batch fashion.
        Every Model has to be manually created, and no magic `.save()`
        function is being called.

        :param cur: cursor on the NK15 database; rows are read by column name.
        :param chunk_size: chunk size given to the ``BulkCreateManager``.

        Each imported row is also recorded in ``MAP_IDBDE`` (NK15 idbde ->
        primary key of the new note).
        """
        cur.execute("SELECT * FROM comptes WHERE idbde > 0 ORDER BY idbde;")
        # Primary keys are assigned by hand because the objects are created in
        # bulk. The start values presumably skip rows that already exist in
        # the target database -- TODO confirm.
        pk_club = 3
        pk_user = 1
        pk_profile = 1
        pk_note = 7  # pk 6 is Kfet!
        n = cur.rowcount

        bulk_mgr = BulkCreateManager(chunk_size=chunk_size)
        pseudo_list = set()  # normalized pseudos already taken
        for idx, row in enumerate(cur):
            pseudo = row["pseudo"]
            pseudo_norm = Alias.normalize(pseudo)
            self.update_line(idx, n, pseudo)
            # clean pseudo (normalized pseudo must be unique)
            if pseudo_norm in pseudo_list:
                pseudo = pseudo + str(row["idbde"])
                pseudo_norm = Alias.normalize(pseudo)
            # Also remember the de-duplicated form so that a later pseudo
            # cannot collide with it.
            pseudo_list.add(pseudo_norm)

            # clean date
            note_dict = {
                "pk": pk_note,
                "balance": 0,
                "last_negative": None,
                "is_active": True,
                "display_image": "",
                "created_at": now()
            }
            if row["last_negatif"] is not None:
                note_dict["last_negative"] = make_aware(row["last_negatif"])

            # The main alias of the note is the (possibly de-duplicated) pseudo.
            alias_dict = {
                "pk": pk_note,
                "name": pseudo,
                "normalized_name": pseudo_norm,
                "note_id": pk_note,
            }

            if row["type"] == "personne":
                # sanitize password
                if row["passwd"] != "*|*" and not row["deleted"]:
                    passwd_nk15 = "$".join(["custom_nk15", "1", row["passwd"]])
                else:
                    passwd_nk15 = ''

                obj_dict = {
                    "pk": pk_user,
                    "username": row["pseudo"],
                    "password": passwd_nk15,
                    # NK15 columns are French: `prenom` is the given name and
                    # `nom` the family name.
                    "first_name": row["prenom"],
                    "last_name": row["nom"],
                    "email": row["mail"],
                    "is_active": True,  # temporary
                }
                profile_dict = {
                    "pk": pk_profile,
                    "user_id": pk_user,
                    "phone_number": row['tel'],
                    "address": row['adresse'],
                    "paid": row['normalien'],
                    "registration_valid": True,
                    "email_confirmed": True,
                }
                note_dict["polymorphic_ctype"] = note_user_type
                note_user_dict = {
                    "pk": pk_note,
                    "user_id": pk_user,
                }

                bulk_mgr.add(User(**obj_dict),
                             Profile(**profile_dict),
                             Note(**note_dict),
                             NoteUser(**note_user_dict),
                             Alias(**alias_dict),)
                pk_user += 1
                pk_profile += 1
            else:  # club
                obj_dict = {
                    "pk": pk_club,
                    "name": row["pseudo"],
                    "email": row["mail"],
                    "membership_duration": M_DURATION,
                    "membership_start": M_START,
                    "membership_end": M_END,
                    "membership_fee_paid": 0,
                    "membership_fee_unpaid": 0,
                }
                note_club_dict = {
                    "pk": pk_note,
                    "club_id": pk_club,
                }
                note_dict["polymorphic_ctype"] = note_club_type

                bulk_mgr.add(Club(**obj_dict),
                             Note(**note_dict),
                             NoteClub(**note_club_dict),
                             Alias(**alias_dict))
                pk_club += 1

            # row import completed
            MAP_IDBDE[row["idbde"]] = pk_note
            pk_note += 1

        # Flush the last, partially filled chunks: without this call the
        # objects still queued in the manager are never written.
        bulk_mgr.done()
        self.print_success("comptes table imported")

    def import_alias(self, cur):
        """
        Import Alias from nk15
        We rely on validation of the models, but it is slow.

        :param cur: cursor on the NK15 database; rows are read by column name.

        Requires ``MAP_IDBDE`` to contain the idbde of every alias owner.
        """
        cur.execute("SELECT * FROM aliases ORDER by id")
        n = cur.rowcount
        for idx, row in enumerate(cur):
            alias_name = row["alias"]
            self.update_line(idx, n, alias_name)
            # Names longer than 255 characters are cut to 252 + "...".
            alias_name_good = (alias_name[:252] + '...') if len(alias_name) > 255 else alias_name
            obj_dict = {
                "note_id": MAP_IDBDE[row["idbde"]],
                "name": alias_name_good,
                "normalized_name": Alias.normalize(alias_name_good),
            }
            try:
                # Inner transaction so that a failed insert does not break an
                # enclosing transaction.
                with transaction.atomic():
                    alias, created = Alias.objects.get_or_create(**obj_dict)
            except IntegrityError as e:
                # Skip aliases rejected by a uniqueness constraint; any other
                # integrity error is a real problem and is propagated.
                if "unique" in str(e):
                    continue
                raise
            alias.save()

    def handle(self, *args, **kwargs):
        """
        Entry point of the command.

        Connects to the NK15 database, imports the accounts, optionally dumps
        ``MAP_IDBDE`` as JSON to the ``--save`` file and optionally imports
        the aliases when ``--alias`` is given.
        """
        # Imported here because the module does not import json at top level.
        import json

        # default args, provided by ImportCommand.
        nk15db, nk15user = kwargs['nk15db'], kwargs['nk15user']
        # connecting to nk15 database
        conn = pg.connect(database=nk15db, user=nk15user)
        try:
            with conn.cursor(cursor_factory=pge.DictCursor) as cur:
                self.import_account(cur, kwargs["chunk"])
                if kwargs["save"]:
                    filename = kwargs["save"]
                    with open(filename, 'w') as fp:
                        json.dump(MAP_IDBDE, fp, sort_keys=True, indent=2)
                # Alias Management
                if kwargs["alias"]:
                    self.import_alias(cur)
        finally:
            # Always release the NK15 connection, even if an import fails.
            conn.close()