Compare commits
	
		
			8 Commits
		
	
	
		
			4907b2d7f5
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 6cfd51a294 | |||
| c093dc02f2 | |||
| f2e94d53fc | |||
| 27f3145857 | |||
| 94dd261d15 | |||
| ce34b1f805 | |||
| de5483107a | |||
| 6145384f04 | 
							
								
								
									
										136
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										136
									
								
								main.py
									
									
									
									
									
								
							| @@ -1,5 +1,7 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import json | ||||
|  | ||||
| from dolibarrpy import Dolibarrpy | ||||
| from flask import Flask, abort, request | ||||
| from ldap3 import ALL, Connection, ObjectDef, Reader, Server, WritableEntry, Writer | ||||
| @@ -22,10 +24,12 @@ def main(): | ||||
| def manage_users_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy): | ||||
|     dolibarr_users = dolibarr_client.find_all_users() | ||||
|     for dolibarr_user in dolibarr_users: | ||||
|         manage_user_extra_fields(ldap_conn, dolibarr_user) | ||||
|         manage_user_extra_fields(ldap_conn, dolibarr_user, dolibarr_client) | ||||
|  | ||||
|  | ||||
| def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict): | ||||
| def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /, | ||||
|                              manage_user_attrs: bool = True, manage_group_attrs: bool = True, | ||||
|                              oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None): | ||||
|     login = dolibarr_user['login'] | ||||
|     obj_inetorgperson = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'], ldap_conn) | ||||
|     obj_user = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'] + config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES, ldap_conn) | ||||
| @@ -55,7 +59,10 @@ def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict): | ||||
|         users_reader.search() | ||||
|         users_writer = Writer.from_cursor(users_reader, object_def=obj_user) | ||||
|         ldap_user = users_writer[0] | ||||
|     append_extra_fields_to_ldap_user(ldap_user, dolibarr_user) | ||||
|     if manage_user_attrs: | ||||
|         append_extra_fields_to_ldap_user(ldap_user, dolibarr_user) | ||||
|     if manage_group_attrs: | ||||
|         append_extra_group_fields_to_ldap_user(ldap_user, dolibarr_user, dolibarr_client, oldgroupid=oldgroupid, newgroupid=newgroupid, new_group=new_group) | ||||
|     users_writer.commit() | ||||
|  | ||||
|  | ||||
| @@ -65,15 +72,78 @@ def append_extra_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: di | ||||
|             ldap_user.objectClass += extra_object_class | ||||
|  | ||||
|     for extra_field in config.LDAP_USERS_EXTRA_FIELDS: | ||||
|         dolibarr_attr, ldap_attr = extra_field.split(':') | ||||
|         if dolibarr_attr.endswith('[]'): | ||||
|             dolibarr_attr = dolibarr_attr[:-2] | ||||
|             value = dolibarr_user['array_options'][f'options_{dolibarr_attr}'] | ||||
|             value = value.split() if value else [] | ||||
|             setattr(ldap_user, ldap_attr, value) | ||||
|         dolibarr_attrs, ldap_attr = extra_field.split('::') | ||||
|         dolibarr_attrs = dolibarr_attrs.split('+') | ||||
|         values = [] | ||||
|         if dolibarr_attrs[0].startswith('GROUP__'): | ||||
|             continue | ||||
|         for dolibarr_attr in dolibarr_attrs: | ||||
|             if dolibarr_attr.endswith('[]'): | ||||
|                 dolibarr_attr = dolibarr_attr[:-2] | ||||
|                 value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None) | ||||
|                 value = value.split() if value else [] | ||||
|                 values += value | ||||
|             else: | ||||
|                 if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"): | ||||
|                     value = dolibarr_attr[1:-1] | ||||
|                 else: | ||||
|                     value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None) | ||||
|                 if value: | ||||
|                     values.append(value) | ||||
|         if ldap_attr.endswith('[]'): | ||||
|             ldap_attr = ldap_attr[:-2] | ||||
|             value = values | ||||
|         else: | ||||
|             value = dolibarr_user['array_options'][f'options_{dolibarr_attr}'] or "" | ||||
|             setattr(ldap_user, ldap_attr, value) | ||||
|             value = "".join(map(str, values)) | ||||
|         setattr(ldap_user, ldap_attr, value) | ||||
|  | ||||
|  | ||||
| def append_extra_group_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /, | ||||
|                                            oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None): | ||||
|     if not any(dolibarr_attr.startswith('GROUP') | ||||
|                for extra_field in config.LDAP_USERS_EXTRA_FIELDS | ||||
|                for dolibarr_attr in extra_field.split('::')[0].split('|')): | ||||
|         return | ||||
|  | ||||
|     user_id = dolibarr_user['id'] | ||||
|     dolibarr_groups: list[dict] = dolibarr_client.get_user_groups_uid(user_id) | ||||
|     if not isinstance(dolibarr_groups, list): | ||||
|         dolibarr_groups = [] | ||||
|     if oldgroupid: | ||||
|         dolibarr_groups = [group for group in dolibarr_groups if group['id'] != oldgroupid] | ||||
|     if newgroupid: | ||||
|         dolibarr_groups.append(dolibarr_client.call_get_api('users/groups', newgroupid)) | ||||
|     if new_group: | ||||
|         dolibarr_groups = [group for group in dolibarr_groups if group['id'] != new_group['id']] + [new_group] | ||||
|  | ||||
|     if not dolibarr_groups: | ||||
|         return | ||||
|  | ||||
|     for extra_field in config.LDAP_USERS_EXTRA_FIELDS: | ||||
|         dolibarr_attrs, ldap_attr = extra_field.split('::') | ||||
|         dolibarr_attrs = dolibarr_attrs.split('+') | ||||
|         values = [] | ||||
|         if not dolibarr_attrs[0].startswith('GROUP__'): | ||||
|             continue | ||||
|         for dolibarr_attr in dolibarr_attrs: | ||||
|             dolibarr_attr = dolibarr_attr[7:] | ||||
|             for dolibarr_group in dolibarr_groups: | ||||
|                 if dolibarr_attr.endswith('[]'): | ||||
|                     attr = dolibarr_attr[:-2] | ||||
|                     value = dolibarr_group.get(attr, None) or dolibarr_group['array_options'].get(f'options_{attr}', None) | ||||
|                     value = value.split() if value else [] | ||||
|                     values += value | ||||
|                 else: | ||||
|                     value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None) | ||||
|                     if value: | ||||
|                         values.append(value) | ||||
|                         break  # Don't concatenate the value for multiple groups | ||||
|         if ldap_attr.endswith('[]'): | ||||
|             ldap_attr = ldap_attr[:-2] | ||||
|             value = values | ||||
|         else: | ||||
|             value = "".join(map(str, values)) | ||||
|         setattr(ldap_user, ldap_attr, value) | ||||
|  | ||||
|  | ||||
| def manage_groups_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy): | ||||
| @@ -112,15 +182,28 @@ def append_extra_fields_to_ldap_group(ldap_group: WritableEntry, dolibarr_group: | ||||
|             ldap_group.objectClass += extra_object_class | ||||
|  | ||||
|     for extra_field in config.LDAP_GROUPS_EXTRA_FIELDS: | ||||
|         dolibarr_attr, ldap_attr = extra_field.split(':') | ||||
|         if dolibarr_attr.endswith('[]'): | ||||
|             dolibarr_attr = dolibarr_attr[:-2] | ||||
|             value = dolibarr_group['array_options'][f'options_{dolibarr_attr}'] | ||||
|             value = value.split() if value else [] | ||||
|             setattr(ldap_group, ldap_attr, value) | ||||
|         dolibarr_attrs, ldap_attr = extra_field.split('::') | ||||
|         dolibarr_attrs = dolibarr_attrs.split('+') | ||||
|         values = [] | ||||
|         for dolibarr_attr in dolibarr_attrs: | ||||
|             if dolibarr_attr.endswith('[]'): | ||||
|                 dolibarr_attr = dolibarr_attr[:-2] | ||||
|                 value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None) | ||||
|                 value = value.split() if value else [] | ||||
|                 values += value | ||||
|             else: | ||||
|                 if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"): | ||||
|                     value = dolibarr_attr[1:-1] | ||||
|                 else: | ||||
|                     value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None) | ||||
|                 if value: | ||||
|                     values.append(value) | ||||
|         if ldap_attr.endswith('[]'): | ||||
|             ldap_attr = ldap_attr[:-2] | ||||
|             value = values | ||||
|         else: | ||||
|             value = dolibarr_group['array_options'][f'options_{dolibarr_attr}'] or "" | ||||
|             setattr(ldap_group, ldap_attr, value) | ||||
|             value = "".join(map(str, values)) | ||||
|         setattr(ldap_group, ldap_attr, value) | ||||
|  | ||||
|  | ||||
| flask_app = Flask(__name__) | ||||
| @@ -132,14 +215,27 @@ def webhook_receiver(): | ||||
|         abort(400) | ||||
|     triggercode = data['triggercode'] | ||||
|     obj = data['object'] | ||||
|     if config.DOLIBARR_API_DEBUG: | ||||
|         print("Received webhook trigger of type", triggercode, "with content:") | ||||
|         print(json.dumps(obj, indent=4)) | ||||
|     dolibarr_client = Dolibarrpy(url=config.DOLIBARR_API_BASE, token=config.DOLIBARR_API_TOKEN, timeout=16, debug=config.DOLIBARR_API_DEBUG) | ||||
|     ldap_server = Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL) | ||||
|     if triggercode.startswith('USER_'): | ||||
|         oldgid, newgid = None, None | ||||
|         if 'context' in obj and obj['context']: | ||||
|             audit = obj['context']['audit'] | ||||
|             if audit == "UserSetInGroup": | ||||
|                 newgid = obj['context']['newgroupid'] | ||||
|             elif audit == "UserRemovedFromGroup": | ||||
|                 oldgid = obj['context']['oldgroupid'] | ||||
|         with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn: | ||||
|             manage_user_extra_fields(ldap_conn, obj) | ||||
|             manage_user_extra_fields(ldap_conn, obj, dolibarr_client, oldgroupid=oldgid, newgroupid=newgid) | ||||
|     elif triggercode.startswith('USERGROUP_'): | ||||
|         with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn: | ||||
|             manage_group_extra_fields(ldap_conn, obj) | ||||
|             group_members = obj['members'] or dict() | ||||
|             for group_member in group_members.values(): | ||||
|                 manage_user_extra_fields(ldap_conn, group_member, dolibarr_client, manage_user_attrs=False, new_group=obj) | ||||
|     else: | ||||
|         abort(400) | ||||
|     return "", 204 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user