#!/usr/bin/env python3
import argparse
import itertools
import json
import re
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from functools import lru_cache

from bs4 import BeautifulSoup
from SPARQLWrapper import SPARQLWrapper


@dataclass
class Property:
    url: str
    range: str | None = None
    domain: str | None = None

    @staticmethod
    @lru_cache(maxsize=None)
    def from_url(url: str) -> "Property":
        """
        The property is only defined by its URL and is then cached,
        so every call with the same URL returns the same instance.
        """
        return Property(url)

    @property
    def name(self) -> str:
        return self.url.split('/')[-1]

    @property
    def range_name(self) -> str:
        return self.range.split('/')[-1] if self.range else ""


@dataclass
class Graph:
    node: str
    children: list[tuple[Property, "Graph"]]

    def to_tikz(self, prefix: str = "0", x: float = 0, depth: int = 0, header: bool = False) -> str:
        """Render the graph as a TikZ picture, one drawn node per graph node."""
        tikz = ""
        if header:
            tikz = "\\begin{center}\n"
            tikz += "\\begin{tikzpicture}[y=3cm]\n"
        tikz += f"\\node[draw] ({prefix}) at ({x:.02f}, -{depth}) {{{self.node}}};\n"
        for j, child in enumerate(self.children):
            rel, subgraph = child
            tikz += subgraph.to_tikz(f"{prefix}-{j}", 5 * j / (depth + 1), depth + 1)
            tikz += f"\\draw[->] ({prefix}) -- ({prefix}-{j}) node[midway,above,sloped] {{{rel.name}}};\n"
        if header:
            tikz += "\\end{tikzpicture}\n"
            tikz += "\\end{center}\n"
        return tikz


def parse_arguments() -> tuple[str, int, int, int]:
    """
    Parse the command line arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('classname', type=str, help="Class name as input from DBPedia")
    parser.add_argument('threshold', type=int, help="Exceptions threshold")
    parser.add_argument('--limit', '-l', type=int, default=1000,
                        help="Limit the number of queried results")
    parser.add_argument('--recursion', '-r', type=int, default=2,
                        help="Max height of keys")
    args = parser.parse_args()
    return args.classname, args.threshold, args.limit, args.recursion


def query(classname: str, filename: str, limit: int = 1000) -> None:
    """
    Query DBPedia using SPARQL for values of the given class and write
    the resulting triples to `filename` in N-Triples form.
    """
    sparql = SPARQLWrapper(endpoint='http://dbpedia.org/sparql')
    sparql.setQuery(f"""prefix db-owl: <http://dbpedia.org/ontology/>
SELECT DISTINCT ?x ?r ?y WHERE {{
    ?x rdf:type db-owl:{classname} .
    ?x ?r ?y .
}} LIMIT {limit}""")
    sparql.setReturnFormat('tsv')
    results = sparql.query().convert().decode('UTF-8')
    with open(filename, 'w') as f:
        for i, line in enumerate(results.split('\n')):
            if i == 0:
                # Skip the header line of the TSV output.
                continue
            if '\t' in line:
                x, r, y = line.split('\t')
                # Strip the surrounding quotes around each term.
                x, r, y = x[1:-1], r[1:-1], y[1:-1]
                f.write(f'<{x}> <{r}> <{y}> .\n')


def sakey_output(file: str, threshold: int) -> tuple[list[list[str]], list[list[str]]]:
    """
    Run SAKey on the given dataset file and parse its output into
    the non keys and the almost keys.
    """
    process = subprocess.Popen(['java', '-jar', 'sakey.jar', file, str(threshold)],
                               stdout=subprocess.PIPE)
    process.wait()
    if process.returncode != 0:
        # There was an error, which SAKey printed on stderr.
        sys.exit(process.returncode)
    with process.stdout as stdout:
        # Read the output from the process.
        content = stdout.read().decode('UTF-8')
    # Parse the output.
    non_keys, _, almost_keys, _ = content.split('\n')
    # Remove the headers before the colon.
    non_keys = non_keys.split(':', 1)[1][1:]
    almost_keys = almost_keys.split(':', 1)[1]
    # Quote the URLs so the printed lists become valid JSON.
    non_keys = re.sub(r'(\w)]', r'\g<1>"]',
                      re.sub(r'(\w),', r'\g<1>",',
                             re.sub('http', '"http', non_keys)))
    almost_keys = re.sub(r'(\w)]', r'\g<1>"]',
                         re.sub(r'(\w),', r'\g<1>",',
                                re.sub('http', '"http', almost_keys)))
    # Parse JSON.
    non_keys = json.loads(non_keys)
    almost_keys = json.loads(almost_keys)
    return non_keys, almost_keys


def find_interesting_keys(keys: list[list[str]]) -> list[list[Property]]:
    """
    Parse keys into a better structure, keeping only the keys that
    contain at least one property with a known range.
    """
    interesting_keys = []
    for key in keys:
        # A key is a list of property URLs.
        # We keep the key if at least one of its properties has a range,
        # because such a property can be expanded into a subgraph.
        for prop in key:
            prop = Property.from_url(prop)
            if prop.range:
                # This property can be extended.
                interesting_keys.append([Property.from_url(url) for url in key])
                break
    return interesting_keys


def process(classname: str, threshold: int, limit: int = 1000, recursion: int = 2) -> list[Graph]:
    """Build the graph keys for the given class, recursing on property ranges."""
    filename = tempfile.NamedTemporaryFile().name
    query(classname, filename, limit)
    # Parse output from SAKey.
    non_keys, almost_keys = sakey_output(filename, threshold)
    i_keys = find_interesting_keys(almost_keys)
    graph_keys = [Graph(classname, [])]
    if recursion <= 1:
        for key in i_keys:
            graph_keys.append(Graph(classname, [(p, Graph(p.range_name, [])) for p in key]))
    else:
        for key in i_keys:
            subgraphs_list = []
            for p in key:
                subgraphs = process(p.range_name, threshold, limit, recursion - 1)
                subgraphs_list.append(subgraphs)
            # Combine every possible choice of subgraph for each property of the key.
            for subgraphs_array in itertools.product(*subgraphs_list):
                graph_keys.append(Graph(classname,
                                        [(p, g) for p, g in zip(key, subgraphs_array)]))
    return graph_keys


def main() -> None:
    # Parse the DBPedia ontology to attach ranges and domains to the properties.
    with open('dbpedia.owl') as f:
        ontology = BeautifulSoup(f, 'xml')
    for e in ontology.find_all('ns0:range'):
        url = e.parent.get('rdf:about')
        p = Property.from_url(url)
        resource = e.get('rdf:resource')
        if '#' not in resource:
            p.range = resource
    for e in ontology.find_all('ns0:domain'):
        url = e.parent.get('rdf:about')
        p = Property.from_url(url)
        resource = e.get('rdf:resource')
        if '#' not in resource:
            p.domain = resource

    # Read arguments.
    classname, threshold, limit, recursion = parse_arguments()

    # Process this class name.
    graph_keys = process(classname, threshold, limit, recursion)
    for graph in graph_keys:
        print(graph.to_tikz(header=True))
        print("\n")


if __name__ == '__main__':
    main()
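
# Example invocation (a sketch: the script name, the `Film` class, and the output
# file are illustrative only; `sakey.jar` and `dbpedia.owl` are assumed to be in
# the working directory):
#
#     python3 graph_keys.py Film 2 --limit 5000 --recursion 2 > keys.tex
#
# Each graph key is printed as a TikZ picture wrapped in a center environment,
# so the output can be \input into a LaTeX document.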