""" To use this lookup plugin, you need to pass ldap: ssh -L 1636:172.16.10.1:636 172.16.10.1 """ import ipaddress from ansible.errors import AnsibleError, AnsibleParserError from ansible.plugins.lookup import LookupBase from ansible.utils.display import Display try: import ldap except ImportError: raise AnsibleError("You need to install python3-ldap") display = Display() def decode_object(object): return {attribute: [value.decode('utf-8') for value in object[attribute]] for attribute in object} class LookupModule(LookupBase): def __init__(self, **kwargs): self.base = ldap.initialize('ldaps://localhost:1636/') self.base.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW) self.base.set_option(ldap.OPT_X_TLS_NEWCTX, 0) self.base_dn = 'dc=ynerant,dc=fr' def query(self, base, scope, filter='(objectClass=*)', attr=None): """ Make a LDAP query query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]]) BASE: base dn SCOPE: 'base', 'one' or 'sub' FILTER: ldap filter (optional) ATTR: list of attributes (optional) """ scope = { 'base': ldap.SCOPE_BASE, 'one': ldap.SCOPE_ONELEVEL, 'sub': ldap.SCOPE_SUBTREE }[scope] query_id = self.base.search(f"{base}", scope, filter, attr) result = self.base.result(query_id)[1] result = { dn: decode_object(entry) for dn, entry in result } return result def ip(self, host, vlan): """ Retrieve IP addresses of an interface of a device query('ldap', 'ip', HOST, VLAN) """ if isinstance(vlan, int): network_query_id = self.base.search(f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description={vlan}") network_result = self.base.result(network_query_id) vlan = network_result[1][0][1]['cn'][0].decode('utf-8') if vlan == 'adh': query_id = self.base.search(f"cn={host}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) else: query_id = self.base.search(f"cn={host}.{vlan}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) result = self.base.result(query_id) result = result[1][0][1] result = [res.decode('utf-8') for res in result['ipHostNumber']] return result def all_ip(self, host): """ Retrieve all IP addresses of a device query('ldap', 'all_ip', HOST) """ interfaces_query_id = self.base.search(f"cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL) interfaces_result = self.base.result(interfaces_query_id) result = [] for dn, interface in interfaces_result[1]: for ip in interface['ipHostNumber']: result.append(ip.decode('utf-8')) return result def cn(self, host, vlan): """ Retrieve aliases of an interface of a device query('ldap', 'cn', HOST, VLAN) """ if isinstance(vlan, int): network_query_id = self.base.search(f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description={vlan}") network_result = self.base.result(network_query_id) vlan = network_result[1][0][1]['cn'][0].decode('utf-8') if vlan == 'adh': query_id = self.base.search(f"cn={host}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) else: query_id = self.base.search(f"cn={host}.{vlan}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) result = self.base.result(query_id) result = result[1][0][1] result = [res.decode('utf-8') for res in result['cn']] return result def all_cn(self, host): """ Retrieve all aliases addresses of a device query('ldap', 'all_cn', HOST) """ interfaces_query_id = self.base.search(f"cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL) interfaces_result = self.base.result(interfaces_query_id) result = [] for dn, interface in interfaces_result[1]: for cn in interface['cn']: result.append(cn.decode('utf-8')) return result def ssh_keys(self, host): """ Retrieve SSH keys of a host query('ldap', 'ssh_keys', HOST) """ host_query_id = self.base.search(f"cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) host_result = self.base.result(host_query_id)[1][0][1] result = [] if 'description' not in host_result: return result for description in host_result['description']: description = description.decode('utf-8') key, value = description.split(':', 1) if key in {'ecdsa-sha2-nistp256', 'ssh-ed25519', 'ssh-dss', 'ssh-rsa'}: result.append(f'{key} {value}') return result def subnet_ipv4(self, subnet): """ Retrieve used IP addresses on a subnet query('ldap', 'subnet_ipv4', SUBNET) """ network_query_id = self.base.search(f"cn={subnet},ou=networks,{self.base_dn}", ldap.SCOPE_BASE) network_result = self.base.result(network_query_id) network = network_result[1][0][1] network, hostmask = network['ipNetworkNumber'][0].decode('utf-8'), network['ipNetmaskNumber'][0].decode('utf-8') subnet = ipaddress.IPv4Network(f"{network}/{hostmask}") query_id = self.base.search(f"ou=hosts,{self.base_dn}", ldap.SCOPE_SUBTREE, "objectClass=ipHost") result = self.base.result(query_id) result = [ip.decode('utf-8') for dn, entry in result[1] for ip in entry['ipHostNumber'] if ipaddress.ip_address(ip.decode('utf-8')) in subnet] return result def run(self, terms, variables=None, **kwargs): if terms[0] == 'query': result = self.query(*terms[1:]) elif terms[0] == 'ip': result = self.ip(*terms[1:]) elif terms[0] == 'all_ip': result = self.all_ip(*terms[1:]) elif terms[0] == 'cn': result = self.cn(*terms[1:]) elif terms[0] == 'all_cn': result = self.all_cn(*terms[1:]) elif terms[0] == 'subnet_ipv4': result = self.subnet_ipv4(*terms[1:]) elif terms[0] == 'ssh_keys': result = self.ssh_keys(*terms[1:]) elif terms[0] == 'group': query_id = self.base.search(f"ou=group,{self.base_dn}", ldap.SCOPE_SUBTREE, "objectClass=posixGroup") result = self.base.result(query_id) result = result[1] # query interface attribute # query('ldap', 'hosts', HOST, VLAN, ATTR) # HOST: device name # VLAN: vlan name # ATTR: attribute elif terms[0] == 'hosts': host = terms[1] vlan = terms[2] attr = terms[3] if isinstance(vlan, int): network_query_id = self.base.search(f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description={vlan}") network_result = self.base.result(network_query_id) vlan = network_result[1][0][1]['cn'][0].decode('utf-8') if vlan == 'adh': query_id = self.base.search(f"cn={host}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) else: query_id = self.base.search(f"cn={host}.{vlan}.ynerant.fr,cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_BASE) result = self.base.result(query_id) result = result[1][0][1] result = [res.decode('utf-8') for res in result[attr]] elif terms[0] == 'network': network = terms[1] query_id = self.base.search(f"cn={network},ou=networks,{self.base_dn}", ldap.SCOPE_BASE, "objectClass=ipNetwork") result = self.base.result(query_id) result = result[1][0][1] return str(ipaddress.ip_network('{}/{}'.format(result['ipNetworkNumber'][0].decode('utf-8'), result['ipNetmaskNumber'][0].decode('utf-8')))) elif terms[0] == 'zones': query_id = self.base.search(f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, "objectClass=ipNetwork") result = self.base.result(query_id) res = [] for _, network in result[1]: network = network['cn'][0].decode('utf-8') if network == 'adh': res.append('ynerant.fr') else: res.append(f"{network}.ynerant.fr") result = res elif terms[0] == 'vlanid': network = terms[1] query_id = self.base.search(f"cn={network},ou=networks,{self.base_dn}", ldap.SCOPE_BASE, "objectClass=ipNetwork") result = self.base.result(query_id) result = result[1][0][1] return int(result['description'][0]) elif terms[0] == 'role': role = terms[1] query_id = self.base.search(f"ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description=role:{role}") result = self.base.result(query_id) result = [cn.decode('utf-8') for res in result[1] for cn in res[1]['cn']] return result