207 lines
9.1 KiB
Python
207 lines
9.1 KiB
Python
"""
|
|
To use this lookup plugin, you need to pass ldap:
|
|
ssh -L 1636:172.16.10.1:636 172.16.10.1
|
|
"""
|
|
|
|
import ipaddress
|
|
|
|
from ansible.errors import AnsibleError, AnsibleParserError
|
|
from ansible.plugins.lookup import LookupBase
|
|
from ansible.utils.display import Display
|
|
|
|
try:
|
|
import ldap
|
|
except ImportError:
|
|
raise AnsibleError("You need to install python3-ldap")
|
|
|
|
display = Display()
|
|
|
|
def decode_object(object):
    """
    Decode every attribute value of an LDAP entry from UTF-8 bytes to str.

    object: mapping of attribute name to a list of bytes values
    Returns a new dict with the same keys and lists of decoded strings.
    Raises UnicodeDecodeError if a value is not valid UTF-8.
    """
    # NOTE: the parameter name shadows the builtin `object`; it is kept
    # unchanged so existing callers are not affected.
    return {
        attribute: [value.decode('utf-8') for value in values]
        for attribute, values in object.items()
    }
|
|
|
|
class LookupModule(LookupBase):
    """
    Lookup plugin answering host, network and group questions from LDAP.

    The first term selects the kind of lookup and the remaining terms are
    its arguments, for example query('ldap', 'ip', HOST, VLAN).
    """

    # DNS domain appended to interface names when building their DN.
    _DOMAIN = 'ynerant.fr'

    # Lookup kinds served by the public method of the same name; the
    # remaining terms are forwarded to it as positional arguments.
    _METHOD_LOOKUPS = ('query', 'ip', 'all_ip', 'cn', 'all_cn', 'subnet_ipv4', 'ssh_keys')

    # Prefixes of host "description" values that carry an SSH public key.
    _SSH_KEY_TYPES = frozenset({'ecdsa-sha2-nistp256', 'ssh-ed25519', 'ssh-dss', 'ssh-rsa'})

    def __init__(self, **kwargs):
        """
        Open the LDAP connection used by every lookup.

        Keyword arguments are forwarded to LookupBase so that the plugin
        is initialised like any other lookup plugin.
        """
        super().__init__(**kwargs)
        self.base = ldap.initialize('ldaps://localhost:1636/')
        # The server certificate is requested but a bad one is tolerated.
        self.base.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
        # NOTE(review): python-ldap documents OPT_X_TLS_NEWCTX as the option
        # that applies pending TLS settings, hence it is set last -- confirm.
        self.base.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
        self.base_dn = 'dc=ynerant,dc=fr'

    def _search(self, base, scope, *search_args):
        """
        Run a search and return its list of (dn, attributes) entries.

        search_args are passed through to the connection's search() after
        base and scope (filter string, then attribute list).
        """
        message_id = self.base.search(base, scope, *search_args)
        return self.base.result(message_id)[1]

    @staticmethod
    def _first_attrs(entries, description):
        """
        Return the attribute dict of the first entry of a search result.

        Raises AnsibleError mentioning description when entries is empty.
        """
        if not entries:
            raise AnsibleError(f"LDAP entry not found: {description}")
        return entries[0][1]

    @staticmethod
    def _decode_values(values):
        """Decode a list of UTF-8 bytes values into a list of str."""
        return [value.decode('utf-8') for value in values]

    def _vlan_name(self, vlan_id):
        """Return the cn of the network whose description equals vlan_id."""
        entries = self._search(
            f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description={vlan_id}")
        attrs = self._first_attrs(entries, f"network with description={vlan_id}")
        return attrs['cn'][0].decode('utf-8')

    def _interface_attrs(self, host, vlan):
        """
        Return the raw attributes of the interface of host on vlan.

        vlan is either a network name or, when it is an int, a VLAN id that
        is first translated to the network name.
        """
        if isinstance(vlan, int):
            vlan = self._vlan_name(vlan)
        # Interfaces on the 'adh' network have no network label in their name.
        if vlan == 'adh':
            interface = f"{host}.{self._DOMAIN}"
        else:
            interface = f"{host}.{vlan}.{self._DOMAIN}"
        dn = f"cn={interface},cn={host},ou=hosts,{self.base_dn}"
        return self._first_attrs(self._search(dn, ldap.SCOPE_BASE), dn)

    def _network_attrs(self, network):
        """Return the raw attributes of the ipNetwork entry named network."""
        dn = f"cn={network},ou=networks,{self.base_dn}"
        entries = self._search(dn, ldap.SCOPE_BASE, "objectClass=ipNetwork")
        return self._first_attrs(entries, dn)

    def query(self, base, scope, filter='(objectClass=*)', attr=None):
        """
        Make a LDAP query
        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        BASE: base dn
        SCOPE: 'base', 'one' or 'sub'
        FILTER: ldap filter (optional)
        ATTR: list of attributes (optional)

        Returns a dict mapping each dn to its decoded attributes.
        Raises AnsibleError when SCOPE is not one of the names above.
        """
        scopes = {
            'base': ldap.SCOPE_BASE,
            'one': ldap.SCOPE_ONELEVEL,
            'sub': ldap.SCOPE_SUBTREE,
        }
        try:
            ldap_scope = scopes[scope]
        except (KeyError, TypeError):
            raise AnsibleError(
                f"Invalid LDAP scope {scope!r}: expected 'base', 'one' or 'sub'") from None
        # The f-string coerces BASE to a plain string before it reaches python-ldap.
        entries = self._search(f"{base}", ldap_scope, filter, attr)
        return {dn: decode_object(entry) for dn, entry in entries}

    def ip(self, host, vlan):
        """
        Retrieve IP addresses of an interface of a device
        query('ldap', 'ip', HOST, VLAN)
        """
        return self._decode_values(self._interface_attrs(host, vlan)['ipHostNumber'])

    def all_ip(self, host):
        """
        Retrieve all IP addresses of a device
        query('ldap', 'all_ip', HOST)
        """
        interfaces = self._search(f"cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL)
        addresses = []
        for _dn, interface in interfaces:
            # Child entries without any address are skipped instead of failing.
            addresses.extend(self._decode_values(interface.get('ipHostNumber', ())))
        return addresses

    def cn(self, host, vlan):
        """
        Retrieve aliases of an interface of a device
        query('ldap', 'cn', HOST, VLAN)
        """
        return self._decode_values(self._interface_attrs(host, vlan)['cn'])

    def all_cn(self, host):
        """
        Retrieve all aliases of a device
        query('ldap', 'all_cn', HOST)
        """
        interfaces = self._search(f"cn={host},ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL)
        aliases = []
        for _dn, interface in interfaces:
            aliases.extend(self._decode_values(interface['cn']))
        return aliases

    def ssh_keys(self, host):
        """
        Retrieve SSH keys of a host
        query('ldap', 'ssh_keys', HOST)

        Keys are read from "description" values of the form TYPE:DATA and
        returned as "TYPE DATA" strings; other descriptions are ignored.
        """
        dn = f"cn={host},ou=hosts,{self.base_dn}"
        host_attrs = self._first_attrs(self._search(dn, ldap.SCOPE_BASE), dn)
        keys = []
        for raw_description in host_attrs.get('description', ()):
            # partition() tolerates descriptions that contain no ':' at all.
            key_type, separator, key_data = raw_description.decode('utf-8').partition(':')
            if separator and key_type in self._SSH_KEY_TYPES:
                keys.append(f'{key_type} {key_data}')
        return keys

    def subnet_ipv4(self, subnet):
        """
        Retrieve used IP addresses on a subnet
        query('ldap', 'subnet_ipv4', SUBNET)
        """
        dn = f"cn={subnet},ou=networks,{self.base_dn}"
        network_attrs = self._first_attrs(self._search(dn, ldap.SCOPE_BASE), dn)
        address = network_attrs['ipNetworkNumber'][0].decode('utf-8')
        netmask = network_attrs['ipNetmaskNumber'][0].decode('utf-8')
        network = ipaddress.IPv4Network(f"{address}/{netmask}")

        hosts = self._search(
            f"ou=hosts,{self.base_dn}", ldap.SCOPE_SUBTREE, "objectClass=ipHost")
        used = []
        for _dn, entry in hosts:
            for raw_ip in entry.get('ipHostNumber', ()):
                ip = raw_ip.decode('utf-8')
                if ipaddress.ip_address(ip) in network:
                    used.append(ip)
        return used

    def _groups(self):
        """Return the raw (dn, attributes) entries of every posixGroup."""
        return self._search(
            f"ou=group,{self.base_dn}", ldap.SCOPE_SUBTREE, "objectClass=posixGroup")

    def _network_cidr(self, network):
        """Return the network named network as an address/prefix string."""
        attrs = self._network_attrs(network)
        address = attrs['ipNetworkNumber'][0].decode('utf-8')
        netmask = attrs['ipNetmaskNumber'][0].decode('utf-8')
        return str(ipaddress.ip_network('{}/{}'.format(address, netmask)))

    def _zones(self):
        """Return the DNS zone name of every ipNetwork entry."""
        networks = self._search(
            f"ou=networks,{self.base_dn}", ldap.SCOPE_ONELEVEL, "objectClass=ipNetwork")
        zones = []
        for _dn, attrs in networks:
            name = attrs['cn'][0].decode('utf-8')
            # The 'adh' network maps to the bare domain.
            zones.append(self._DOMAIN if name == 'adh' else f"{name}.{self._DOMAIN}")
        return zones

    def _vlan_id(self, network):
        """Return the description of the network named network as an int."""
        return int(self._network_attrs(network)['description'][0])

    def _hosts_with_role(self, role):
        """Return the cn values of hosts whose description is role:ROLE."""
        hosts = self._search(
            f"ou=hosts,{self.base_dn}", ldap.SCOPE_ONELEVEL, f"description=role:{role}")
        names = []
        for _dn, attrs in hosts:
            names.extend(self._decode_values(attrs['cn']))
        return names

    def run(self, terms, variables=None, **kwargs):
        """
        Dispatch a lookup on its first term.

        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        query('ldap', 'ip', HOST, VLAN)
        query('ldap', 'all_ip', HOST)
        query('ldap', 'cn', HOST, VLAN)
        query('ldap', 'all_cn', HOST)
        query('ldap', 'subnet_ipv4', SUBNET)
        query('ldap', 'ssh_keys', HOST)
        query('ldap', 'group')
        query('ldap', 'hosts', HOST, VLAN, ATTR)
        query('ldap', 'network', NETWORK)
        query('ldap', 'zones')
        query('ldap', 'vlanid', NETWORK)
        query('ldap', 'role', ROLE)

        variables and kwargs are accepted but not used.
        Raises AnsibleError when no term is given or the kind is unknown.
        """
        if not terms:
            raise AnsibleError("The ldap lookup requires a query type as first term")
        kind, args = terms[0], terms[1:]

        if kind in self._METHOD_LOOKUPS:
            return getattr(self, kind)(*args)
        if kind == 'group':
            return self._groups()
        if kind == 'hosts':
            # query interface attribute
            # HOST: device name, VLAN: vlan name (or id), ATTR: attribute
            host, vlan, attr = args[0], args[1], args[2]
            return self._decode_values(self._interface_attrs(host, vlan)[attr])
        if kind == 'network':
            # NOTE: returns a bare string, not a list; kept for existing callers.
            return self._network_cidr(args[0])
        if kind == 'zones':
            return self._zones()
        if kind == 'vlanid':
            # NOTE: returns a bare int, not a list; kept for existing callers.
            return self._vlan_id(args[0])
        if kind == 'role':
            return self._hosts_with_role(args[0])
        raise AnsibleError(f"Unknown ldap lookup query type: {kind!r}")
|