Emmy D'Anello 6cfd51a294
None by default when a Dolibarr attr does not exist
Signed-off-by: Emmy D'Anello <emmy@luemy.eu>
2025-03-02 17:01:30 +01:00

246 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
import json
from dolibarrpy import Dolibarrpy
from flask import Flask, abort, request
from ldap3 import ALL, Connection, ObjectDef, Reader, Server, WritableEntry, Writer
import config
def main():
dolibarr_client = Dolibarrpy(url=config.DOLIBARR_API_BASE, token=config.DOLIBARR_API_TOKEN, timeout=16, debug=config.DOLIBARR_API_DEBUG)
ldap_server = Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL)
with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
if config.LDAP_USERS_EXTRA_FIELDS or config.LDAP_USERS_EXTRA_OBJECT_CLASSES:
manage_users_extra_fields(ldap_conn, dolibarr_client)
if config.LDAP_GROUPS_EXTRA_FIELDS or config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES:
manage_groups_extra_fields(ldap_conn, dolibarr_client)
def manage_users_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy):
dolibarr_users = dolibarr_client.find_all_users()
for dolibarr_user in dolibarr_users:
manage_user_extra_fields(ldap_conn, dolibarr_user, dolibarr_client)
def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /,
manage_user_attrs: bool = True, manage_group_attrs: bool = True,
oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None):
login = dolibarr_user['login']
obj_inetorgperson = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'], ldap_conn)
obj_user = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'] + config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES, ldap_conn)
users_reader = Reader(ldap_conn, obj_inetorgperson, config.LDAP_USERS_OU, f"uid:={login}")
users_reader.search()
users_writer = Writer.from_cursor(users_reader, object_def=obj_user)
if users_writer.entries:
ldap_user = users_writer[0]
else:
attrs = {
'cn': f"{dolibarr_user['firstname']} {dolibarr_user['lastname']}".strip(),
'givenName': dolibarr_user['firstname'],
'sn': dolibarr_user['lastname'],
'mail': dolibarr_user['email'],
'street': dolibarr_user['address'],
'postalCode': dolibarr_user['zip'],
'l': dolibarr_user['town'],
'mobile': dolibarr_user['user_mobile'],
'uidNumber': dolibarr_user['id'],
'gidNumber': dolibarr_user['id'],
'homeDirectory': f"/home/{login}",
}
for key, value in list(attrs.items()):
if not value:
del attrs[key]
ldap_conn.add(f"uid={login},{config.LDAP_USERS_OU}", ["top", "inetOrgPerson", "posixAccount", "shadowAccount"], attrs)
users_reader.search()
users_writer = Writer.from_cursor(users_reader, object_def=obj_user)
ldap_user = users_writer[0]
if manage_user_attrs:
append_extra_fields_to_ldap_user(ldap_user, dolibarr_user)
if manage_group_attrs:
append_extra_group_fields_to_ldap_user(ldap_user, dolibarr_user, dolibarr_client, oldgroupid=oldgroupid, newgroupid=newgroupid, new_group=new_group)
users_writer.commit()
def append_extra_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict):
for extra_object_class in config.LDAP_USERS_EXTRA_OBJECT_CLASSES:
if extra_object_class not in ldap_user.objectClass:
ldap_user.objectClass += extra_object_class
for extra_field in config.LDAP_USERS_EXTRA_FIELDS:
dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
if dolibarr_attrs[0].startswith('GROUP__'):
continue
for dolibarr_attr in dolibarr_attrs:
if dolibarr_attr.endswith('[]'):
dolibarr_attr = dolibarr_attr[:-2]
value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None)
value = value.split() if value else []
values += value
else:
if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"):
value = dolibarr_attr[1:-1]
else:
value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_user, ldap_attr, value)
def append_extra_group_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /,
oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None):
if not any(dolibarr_attr.startswith('GROUP')
for extra_field in config.LDAP_USERS_EXTRA_FIELDS
for dolibarr_attr in extra_field.split('::')[0].split('|')):
return
user_id = dolibarr_user['id']
dolibarr_groups: list[dict] = dolibarr_client.get_user_groups_uid(user_id)
if not isinstance(dolibarr_groups, list):
dolibarr_groups = []
if oldgroupid:
dolibarr_groups = [group for group in dolibarr_groups if group['id'] != oldgroupid]
if newgroupid:
dolibarr_groups.append(dolibarr_client.call_get_api('users/groups', newgroupid))
if new_group:
dolibarr_groups = [group for group in dolibarr_groups if group['id'] != new_group['id']] + [new_group]
if not dolibarr_groups:
return
for extra_field in config.LDAP_USERS_EXTRA_FIELDS:
dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
if not dolibarr_attrs[0].startswith('GROUP__'):
continue
for dolibarr_attr in dolibarr_attrs:
dolibarr_attr = dolibarr_attr[7:]
for dolibarr_group in dolibarr_groups:
if dolibarr_attr.endswith('[]'):
attr = dolibarr_attr[:-2]
value = dolibarr_group.get(attr, None) or dolibarr_group['array_options'].get(f'options_{attr}', None)
value = value.split() if value else []
values += value
else:
value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
break # Don't concatenate the value for multiple groups
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_user, ldap_attr, value)
def manage_groups_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy):
dolibarr_groups = dolibarr_client.call_list_api('users/groups')
for dolibarr_group in dolibarr_groups:
manage_group_extra_fields(ldap_conn, dolibarr_group)
def manage_group_extra_fields(ldap_conn: Connection, dolibarr_group: dict):
name = dolibarr_group['name']
obj_posixgroup = ObjectDef(['posixGroup'], ldap_conn)
obj_group = ObjectDef(['posixGroup'] + config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES, ldap_conn)
groups_reader = Reader(ldap_conn, obj_posixgroup, config.LDAP_GROUPS_OU, f"cn:={name}")
groups_reader.search()
groups_writer = Writer.from_cursor(groups_reader, object_def=obj_group)
if groups_writer.entries:
ldap_group = groups_writer[0]
else:
attrs = {
'cn': name,
'gidNumber': dolibarr_group['id'],
}
for key, value in list(attrs.items()):
if not value:
del attrs[key]
ldap_conn.add(f"cn={name},{config.LDAP_GROUPS_OU}", ["top", "posixGroup"], attrs)
groups_reader.search()
groups_writer = Writer.from_cursor(groups_reader, object_def=obj_group)
ldap_group = groups_writer[0]
append_extra_fields_to_ldap_group(ldap_group, dolibarr_group)
groups_writer.commit()
def append_extra_fields_to_ldap_group(ldap_group: WritableEntry, dolibarr_group: dict):
for extra_object_class in config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES:
if extra_object_class not in ldap_group.objectClass:
ldap_group.objectClass += extra_object_class
for extra_field in config.LDAP_GROUPS_EXTRA_FIELDS:
dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
for dolibarr_attr in dolibarr_attrs:
if dolibarr_attr.endswith('[]'):
dolibarr_attr = dolibarr_attr[:-2]
value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
value = value.split() if value else []
values += value
else:
if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"):
value = dolibarr_attr[1:-1]
else:
value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_group, ldap_attr, value)
flask_app = Flask(__name__)
@flask_app.post('/webhook')
def webhook_receiver():
data = request.json
if 'triggercode' not in data or 'object' not in data:
abort(400)
triggercode = data['triggercode']
obj = data['object']
if config.DOLIBARR_API_DEBUG:
print("Received webhook trigger of type", triggercode, "with content:")
print(json.dumps(obj, indent=4))
dolibarr_client = Dolibarrpy(url=config.DOLIBARR_API_BASE, token=config.DOLIBARR_API_TOKEN, timeout=16, debug=config.DOLIBARR_API_DEBUG)
ldap_server = Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL)
if triggercode.startswith('USER_'):
oldgid, newgid = None, None
if 'context' in obj and obj['context']:
audit = obj['context']['audit']
if audit == "UserSetInGroup":
newgid = obj['context']['newgroupid']
elif audit == "UserRemovedFromGroup":
oldgid = obj['context']['oldgroupid']
with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
manage_user_extra_fields(ldap_conn, obj, dolibarr_client, oldgroupid=oldgid, newgroupid=newgid)
elif triggercode.startswith('USERGROUP_'):
with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
manage_group_extra_fields(ldap_conn, obj)
group_members = obj['members'] or dict()
for group_member in group_members.values():
manage_user_extra_fields(ldap_conn, group_member, dolibarr_client, manage_user_attrs=False, new_group=obj)
else:
abort(400)
return "", 204
if __name__ == '__main__':
main()