#!/usr/bin/env python3
"""Synchronise Dolibarr users and groups into an LDAP directory.

The module can be used in two ways:

* run as a script, ``main()`` walks every Dolibarr user and group and
  creates/updates the matching LDAP entries;
* served through ``flask_app``, the ``/webhook`` endpoint applies the same
  logic to the single object carried by a webhook call.

Extra-field mappings are read from ``config.LDAP_USERS_EXTRA_FIELDS`` and
``config.LDAP_GROUPS_EXTRA_FIELDS``.  As parsed by this module, a mapping has
the form ``<source>[+<source>...]::<ldap_attr>`` where:

* a source ending in ``[]`` is split on whitespace into several values;
* a source wrapped in single quotes is a literal string;
* a user mapping whose first source starts with ``GROUP__`` is filled from
  the groups of the user instead of the user itself;
* an ``<ldap_attr>`` ending in ``[]`` receives the list of values, otherwise
  the values are concatenated into one string.
"""

import json

from dolibarrpy import Dolibarrpy
from flask import Flask, abort, request
from ldap3 import ALL, Connection, ObjectDef, Reader, Server, WritableEntry, Writer

import config

# Prefix marking a user extra field whose value comes from the user's groups.
_GROUP_PREFIX = 'GROUP__'


def _new_dolibarr_client() -> Dolibarrpy:
    """Build a Dolibarr API client from the values found in ``config``."""
    return Dolibarrpy(url=config.DOLIBARR_API_BASE, token=config.DOLIBARR_API_TOKEN, timeout=16,
                      debug=config.DOLIBARR_API_DEBUG)


def _new_ldap_server() -> Server:
    """Build the ldap3 ``Server`` description from the values found in ``config``."""
    return Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL)


def _open_writer(reader: Reader, object_def: ObjectDef) -> Writer:
    """Run the reader's search and wrap its result in a writable cursor."""
    reader.search()
    return Writer.from_cursor(reader, object_def=object_def)


def _same_id(left, right) -> bool:
    """Compare two identifiers regardless of whether they are ``int`` or ``str``."""
    return str(left) == str(right)


def _split_extra_field(extra_field: str) -> tuple[list[str], str]:
    """Split a mapping into its list of source attributes and its LDAP attribute.

    Raises:
        ValueError: if the mapping does not contain exactly one ``::``.
    """
    dolibarr_attrs, ldap_attr = extra_field.split('::')
    return dolibarr_attrs.split('+'), ldap_attr


def _get_dolibarr_value(record: dict, attr: str):
    """Return ``record[attr]``, falling back to ``array_options['options_<attr>']``.

    Returns ``None`` when neither location holds a truthy value.
    """
    value = record.get(attr)
    if value:
        return value
    # array_options may be missing or not a mapping; presumably an empty one can
    # arrive as a JSON list - TODO confirm against the Dolibarr API output.
    options = record.get('array_options')
    if isinstance(options, dict):
        return options.get(f'options_{attr}')
    return None


def _resolve_source(record: dict, dolibarr_attr: str) -> list:
    """Return the values contributed by one source of a mapping (possibly none)."""
    if dolibarr_attr.endswith('[]'):
        value = _get_dolibarr_value(record, dolibarr_attr[:-2])
        return value.split() if value else []
    if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"):
        value = dolibarr_attr[1:-1]  # quoted literal
    else:
        value = _get_dolibarr_value(record, dolibarr_attr)
    return [value] if value else []


def _set_ldap_attribute(entry: WritableEntry, ldap_attr: str, values: list) -> None:
    """Store ``values`` on ``entry``: as a list for ``attr[]``, else as one joined string."""
    if ldap_attr.endswith('[]'):
        setattr(entry, ldap_attr[:-2], values)
    else:
        setattr(entry, ldap_attr, "".join(map(str, values)))


def _ensure_object_classes(entry: WritableEntry, object_classes) -> None:
    """Add every object class of ``object_classes`` that ``entry`` does not have yet."""
    for object_class in object_classes:
        if object_class not in entry.objectClass:
            entry.objectClass += object_class


def _apply_extra_fields(entry: WritableEntry, record: dict, extra_fields, *, skip_group_fields: bool) -> None:
    """Evaluate each mapping of ``extra_fields`` against ``record`` and store it on ``entry``."""
    for extra_field in extra_fields:
        dolibarr_attrs, ldap_attr = _split_extra_field(extra_field)
        if skip_group_fields and dolibarr_attrs[0].startswith(_GROUP_PREFIX):
            continue
        values = []
        for dolibarr_attr in dolibarr_attrs:
            values += _resolve_source(record, dolibarr_attr)
        _set_ldap_attribute(entry, ldap_attr, values)


def main():
    """Synchronise every Dolibarr user and group to LDAP.

    Users (resp. groups) are only processed when at least one extra field or
    extra object class is configured for them.
    """
    dolibarr_client = _new_dolibarr_client()
    ldap_server = _new_ldap_server()
    with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
        if config.LDAP_USERS_EXTRA_FIELDS or config.LDAP_USERS_EXTRA_OBJECT_CLASSES:
            manage_users_extra_fields(ldap_conn, dolibarr_client)
        if config.LDAP_GROUPS_EXTRA_FIELDS or config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES:
            manage_groups_extra_fields(ldap_conn, dolibarr_client)


def manage_users_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy):
    """Synchronise every user returned by ``dolibarr_client.find_all_users()``."""
    for dolibarr_user in dolibarr_client.find_all_users():
        manage_user_extra_fields(ldap_conn, dolibarr_user, dolibarr_client)


def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /,
                             manage_user_attrs: bool = True, manage_group_attrs: bool = True,
                             oldgroupid: int | None = None, newgroupid: int | None = None,
                             new_group: dict | None = None):
    """Create the LDAP entry of a Dolibarr user if needed, then update its extra fields.

    Args:
        ldap_conn: bound LDAP connection.
        dolibarr_user: user object; ``login`` is always read, ``id`` is read when
            the entry has to be created or group fields are managed.
        dolibarr_client: API client, used to fetch the groups of the user.
        manage_user_attrs: apply the mappings fed by the user itself.
        manage_group_attrs: apply the ``GROUP__`` mappings.
        oldgroupid: id of a group the user has just left, if any.
        newgroupid: id of a group the user has just joined, if any.
        new_group: up-to-date group object replacing the one with the same id.

    Raises:
        RuntimeError: if the entry is missing and could not be created.
    """
    login = dolibarr_user['login']
    base_classes = ['top', 'inetOrgPerson', 'posixAccount']
    obj_inetorgperson = ObjectDef(base_classes, ldap_conn)
    # The writable definition must know the *user* extra classes, otherwise the
    # attributes they bring cannot be set on the entry.
    obj_user = ObjectDef(base_classes + list(config.LDAP_USERS_EXTRA_OBJECT_CLASSES), ldap_conn)

    users_reader = Reader(ldap_conn, obj_inetorgperson, config.LDAP_USERS_OU, f"uid:={login}")
    users_writer = _open_writer(users_reader, obj_user)
    if not users_writer.entries:
        # Join only the name parts that are set so a missing first name does
        # not end up as the literal text "None" in cn.
        name_parts = (dolibarr_user.get('firstname'), dolibarr_user.get('lastname'))
        attrs = {
            'cn': " ".join(str(part) for part in name_parts if part).strip(),
            'givenName': dolibarr_user.get('firstname'),
            'sn': dolibarr_user.get('lastname'),
            'mail': dolibarr_user.get('email'),
            'street': dolibarr_user.get('address'),
            'postalCode': dolibarr_user.get('zip'),
            'l': dolibarr_user.get('town'),
            'mobile': dolibarr_user.get('user_mobile'),
            'uidNumber': dolibarr_user['id'],
            'gidNumber': dolibarr_user['id'],
            'homeDirectory': f"/home/{login}",
        }
        # Empty attributes are dropped rather than sent to the server.
        attrs = {key: value for key, value in attrs.items() if value}
        # NOTE(review): login is interpolated unescaped into the DN and may come
        # from the webhook payload - consider escaping it.
        user_dn = f"uid={login},{config.LDAP_USERS_OU}"
        ldap_conn.add(user_dn, ["top", "inetOrgPerson", "posixAccount", "shadowAccount"], attrs)
        users_writer = _open_writer(users_reader, obj_user)
        if not users_writer.entries:
            raise RuntimeError(f"Unable to create LDAP entry {user_dn}: {ldap_conn.result}")
    ldap_user = users_writer[0]

    if manage_user_attrs:
        append_extra_fields_to_ldap_user(ldap_user, dolibarr_user)
    if manage_group_attrs:
        append_extra_group_fields_to_ldap_user(ldap_user, dolibarr_user, dolibarr_client,
                                               oldgroupid=oldgroupid, newgroupid=newgroupid, new_group=new_group)
    users_writer.commit()


def append_extra_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict):
    """Add the user extra object classes and the user-fed extra fields to ``ldap_user``.

    Mappings whose first source starts with ``GROUP__`` are skipped here; they
    are handled by ``append_extra_group_fields_to_ldap_user``.
    """
    _ensure_object_classes(ldap_user, config.LDAP_USERS_EXTRA_OBJECT_CLASSES)
    _apply_extra_fields(ldap_user, dolibarr_user, config.LDAP_USERS_EXTRA_FIELDS, skip_group_fields=True)


def append_extra_group_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict,
                                           dolibarr_client: Dolibarrpy, /,
                                           oldgroupid: int | None = None, newgroupid: int | None = None,
                                           new_group: dict | None = None):
    """Fill the ``GROUP__`` mappings of ``ldap_user`` from the groups of the user.

    Nothing is written (and no API call is made) when no mapping starts with
    ``GROUP__``; nothing is written either when the user ends up with no group.

    Args:
        ldap_user: writable LDAP entry of the user.
        dolibarr_user: user object; only ``id`` is read.
        dolibarr_client: API client used to list the groups of the user.
        oldgroupid: group id to drop from the fetched list.
        newgroupid: group id to fetch and add when it is not listed yet.
        new_group: group object replacing the listed one with the same id.
    """
    # Same test as the main loop below: only the first source decides whether
    # a mapping is group-fed.
    has_group_field = any(
        extra_field.split('::')[0].split('+')[0].startswith(_GROUP_PREFIX)
        for extra_field in config.LDAP_USERS_EXTRA_FIELDS
    )
    if not has_group_field:
        return

    user_id = dolibarr_user['id']
    dolibarr_groups: list[dict] = dolibarr_client.get_user_groups_uid(user_id)
    if not isinstance(dolibarr_groups, list):
        dolibarr_groups = []

    # Ids are compared as strings: the API and the webhook payload are not
    # guaranteed to agree on int vs str - TODO confirm.
    if oldgroupid:
        dolibarr_groups = [group for group in dolibarr_groups if not _same_id(group['id'], oldgroupid)]
    if newgroupid and not any(_same_id(group.get('id'), newgroupid) for group in dolibarr_groups):
        # Only fetch the group when it is not listed yet, so that multi-valued
        # attributes do not receive the same values twice.
        dolibarr_groups.append(dolibarr_client.call_get_api('users/groups', newgroupid))
    if new_group:
        dolibarr_groups = [group for group in dolibarr_groups
                           if not _same_id(group['id'], new_group['id'])] + [new_group]

    if not dolibarr_groups:
        return

    for extra_field in config.LDAP_USERS_EXTRA_FIELDS:
        dolibarr_attrs, ldap_attr = _split_extra_field(extra_field)
        if not dolibarr_attrs[0].startswith(_GROUP_PREFIX):
            continue

        values = []
        for dolibarr_attr in dolibarr_attrs:
            # Strip the marker only when present, instead of blindly cutting
            # the first seven characters.
            dolibarr_attr = dolibarr_attr.removeprefix(_GROUP_PREFIX)
            multi_valued = dolibarr_attr.endswith('[]')
            attr = dolibarr_attr[:-2] if multi_valued else dolibarr_attr
            for dolibarr_group in dolibarr_groups:
                value = _get_dolibarr_value(dolibarr_group, attr)
                if not value:
                    continue
                if multi_valued:
                    values += value.split()
                else:
                    values.append(value)
                    break  # Don't concatenate the value for multiple groups

        _set_ldap_attribute(ldap_user, ldap_attr, values)


def manage_groups_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy):
    """Synchronise every group returned by the ``users/groups`` list endpoint."""
    for dolibarr_group in dolibarr_client.call_list_api('users/groups'):
        manage_group_extra_fields(ldap_conn, dolibarr_group)


def manage_group_extra_fields(ldap_conn: Connection, dolibarr_group: dict):
    """Create the LDAP entry of a Dolibarr group if needed, then update its extra fields.

    Args:
        ldap_conn: bound LDAP connection.
        dolibarr_group: group object; ``name`` is always read, ``id`` is read
            when the entry has to be created.

    Raises:
        RuntimeError: if the entry is missing and could not be created.
    """
    name = dolibarr_group['name']
    obj_posixgroup = ObjectDef(['posixGroup'], ldap_conn)
    obj_group = ObjectDef(['posixGroup'] + list(config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES), ldap_conn)

    groups_reader = Reader(ldap_conn, obj_posixgroup, config.LDAP_GROUPS_OU, f"cn:={name}")
    groups_writer = _open_writer(groups_reader, obj_group)
    if not groups_writer.entries:
        attrs = {
            'cn': name,
            'gidNumber': dolibarr_group['id'],
        }
        attrs = {key: value for key, value in attrs.items() if value}
        group_dn = f"cn={name},{config.LDAP_GROUPS_OU}"
        ldap_conn.add(group_dn, ["top", "posixGroup"], attrs)
        groups_writer = _open_writer(groups_reader, obj_group)
        if not groups_writer.entries:
            raise RuntimeError(f"Unable to create LDAP entry {group_dn}: {ldap_conn.result}")
    ldap_group = groups_writer[0]

    append_extra_fields_to_ldap_group(ldap_group, dolibarr_group)
    groups_writer.commit()


def append_extra_fields_to_ldap_group(ldap_group: WritableEntry, dolibarr_group: dict):
    """Add the group extra object classes and extra fields to ``ldap_group``."""
    _ensure_object_classes(ldap_group, config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES)
    _apply_extra_fields(ldap_group, dolibarr_group, config.LDAP_GROUPS_EXTRA_FIELDS, skip_group_fields=False)


flask_app = Flask(__name__)


@flask_app.post('/webhook')
def webhook_receiver():
    """Apply a webhook notification to LDAP.

    The JSON body must be an object with a string ``triggercode`` and an
    ``object`` holding the user or group.  ``USER_*`` triggers update one user;
    ``USERGROUP_*`` triggers update the group and the group fields of each of
    its members.  Anything else is answered with 400; success returns 204.
    """
    # NOTE(review): no authentication of the caller is visible in this file.
    data = request.json
    if not isinstance(data, dict) or 'triggercode' not in data or 'object' not in data:
        abort(400)

    triggercode = data['triggercode']
    obj = data['object']
    if not isinstance(triggercode, str) or not isinstance(obj, dict):
        abort(400)

    if config.DOLIBARR_API_DEBUG:
        print("Received webhook trigger of type", triggercode, "with content:")
        print(json.dumps(obj, indent=4))

    dolibarr_client = _new_dolibarr_client()
    ldap_server = _new_ldap_server()

    if triggercode.startswith('USER_'):
        oldgid, newgid = None, None
        context = obj.get('context')
        if isinstance(context, dict):
            audit = context.get('audit')
            if audit == "UserSetInGroup":
                newgid = context.get('newgroupid')
            elif audit == "UserRemovedFromGroup":
                oldgid = context.get('oldgroupid')

        with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
            manage_user_extra_fields(ldap_conn, obj, dolibarr_client, oldgroupid=oldgid, newgroupid=newgid)
    elif triggercode.startswith('USERGROUP_'):
        with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
            manage_group_extra_fields(ldap_conn, obj)

            # members is read as a mapping whose values are user objects; a
            # missing/empty value means no member, a list is iterated directly.
            group_members = obj.get('members') or {}
            if isinstance(group_members, dict):
                group_members = group_members.values()
            for group_member in group_members:
                manage_user_extra_fields(ldap_conn, group_member, dolibarr_client,
                                         manage_user_attrs=False, new_group=obj)
    else:
        abort(400)

    return "", 204


if __name__ == '__main__':
    main()