import argparse
import itertools
import sys
import unicodedata
from collections import defaultdict
from typing import Optional

import numpy as np

SHINGLE_SIZE = 5  # Known as k


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command line.

    :param argv: Full argument vector, program name included (the first
        element is skipped). Defaults to ``sys.argv``.
    :return: Namespace with ``input`` (readable text stream, stdin by
        default), ``similarity`` (float, default 0.05) and ``progress``
        (bool).
    """
    if argv is None:
        argv = sys.argv

    parser = argparse.ArgumentParser(description='Exercise 1')
    parser.add_argument('input', nargs='?', type=argparse.FileType('r'),
                        help='Documents to read.', default=sys.stdin)
    parser.add_argument('similarity', nargs='?', type=float,
                        help='Similarity threshold.', default=0.05)
    parser.add_argument('--progress', '-p', '--tqdm', action='store_true',
                        help='Display progress bar while calculating signature matrix.')

    return parser.parse_args(argv[1:])


def normalize(doc: str) -> str:
    """
    Remove accents from letters, remove non-ascii letters,
    keep only letters and digits.

    The text is case-folded, the ligatures ``æ``/``œ`` are expanded, and the
    result is NFKD-decomposed so that accents become separate combining marks
    which the category filter then drops.
    """
    decomposed = unicodedata.normalize(
        'NFKD', doc.casefold().replace('æ', 'ae').replace('œ', 'oe'))
    kept = ''.join(char for char in decomposed
                   if unicodedata.category(char) in ('Lu', 'Ll', 'Nd'))
    # Anything still outside ASCII after decomposition is discarded.
    return kept.casefold().encode('ascii', 'ignore').decode('ascii')


def compute_shingles(docs: list[str], single_size: int) -> np.ndarray:
    """Build the boolean shingle/document incidence matrix.

    :param docs: Documents, already normalized.
    :param single_size: Length (in characters) of each shingle.
    :return: Boolean array of shape ``(distinct shingles, len(docs))`` where
        entry ``[s, d]`` is True when document ``d`` contains shingle ``s``.
        Shingle rows are numbered in order of first appearance.
    """
    shingle_id: dict[str, int] = {}
    rows: list[int] = []
    cols: list[int] = []

    for doc_id, doc in enumerate(docs):
        for start in range(len(doc) - single_size + 1):
            shingle = doc[start:start + single_size]
            # setdefault evaluates len() before inserting, so a new shingle
            # receives the next free row index.
            rows.append(shingle_id.setdefault(shingle, len(shingle_id)))
            cols.append(doc_id)

    shingle_matrix = np.zeros((len(shingle_id), len(docs)), dtype=bool)
    if rows:
        shingle_matrix[rows, cols] = True
    return shingle_matrix


def min_hash(doc: str, perm: list[str]) -> Optional[str]:
    """Return the first element of ``perm`` contained in ``doc``.

    :return: The first matching element, or ``None`` when nothing matches.
    """
    for d in perm:
        if d in doc:
            return d
    return None


def compute_signature_matrix(shingles: np.ndarray, permutations_count: int,
                             display_tqdm: bool = False) -> np.ndarray:
    """Compute a MinHash signature matrix from a shingle incidence matrix.

    For every permutation the rows of ``shingles`` are shuffled and, for each
    document (column), the index of the first True row in the shuffled matrix
    is recorded.

    :param shingles: Boolean array of shape ``(shingles, documents)``.
    :param permutations_count: Number of random permutations (signature rows).
    :param display_tqdm: Show a tqdm progress bar when tqdm is importable.
    :return: Float array of shape ``(permutations_count, documents)``. When
        there are no shingles at all, every entry is ``inf``.
    """
    shingles_count, docs_count = shingles.shape
    signature_matrix = np.inf * np.ones((permutations_count, docs_count))

    if shingles_count == 0:
        # argmax over an empty axis raises ValueError; with no shingles
        # there is nothing to hash, so keep the "no value" signature.
        return signature_matrix

    permutations_iterator = range(permutations_count)
    if display_tqdm:
        try:
            from tqdm import tqdm
            permutations_iterator = tqdm(permutations_iterator)
        except ImportError:
            # Warn on stderr so the pair listing on stdout stays clean.
            print("tqdm is not installed. Please install tqdm before using --tqdm option.",
                  file=sys.stderr)

    for permutation_id in permutations_iterator:
        # np.random.permutation shuffles a copy along the first axis (rows).
        permutation = np.random.permutation(shingles)
        # argmax on booleans yields the first True row of each column
        # (0 for a column that has no True at all).
        signature_matrix[permutation_id] = permutation.argmax(0)

    return signature_matrix


def jaccard_similarity(doc1: set, doc2: set) -> float:
    """Return ``|doc1 & doc2| / |doc1 | doc2|``, or 0.0 if either set is empty."""
    if not doc1 or not doc2:
        return 0.0

    inter = doc1.intersection(doc2)
    union = doc1.union(doc2)
    return len(inter) / len(union)


def _choose_bands_and_rows(similarity: float) -> tuple[int, int]:
    """Pick (bands, rows) so that 2s/3 < (1/bands)**(1/rows) < s."""
    # Use at least 2 rows and 16 bands to have good values
    rows = 2
    bands = 16
    threshold = (1 / bands) ** (1 / rows)
    while not (2 * similarity / 3 < threshold < similarity):
        if threshold >= similarity:
            bands *= 2  # more bands lower the threshold
        else:
            rows *= 2  # more rows per band raise it
        threshold = (1 / bands) ** (1 / rows)
    return bands, rows


def _candidate_pairs(signature: np.ndarray, bands: int, rows: int) -> set:
    """Return all (a, b) pairs, a < b, that share a bucket in some band."""
    pairs = set()
    docs_count = signature.shape[1]
    for band_id in range(bands):
        band = signature[band_id * rows:(band_id + 1) * rows]
        buckets = defaultdict(list)
        for doc in range(docs_count):
            buckets[tuple(band[:, doc])].append(doc)
        for bucket in buckets.values():
            # Documents were appended in ascending order, so combinations()
            # yields every unordered pair exactly once with a < b.
            pairs.update(itertools.combinations(bucket, 2))
    return pairs


def parse(stream, similarity: float, display_tqdm: bool = False) -> None:
    """Print every pair of similar documents read from ``stream``.

    Each line of ``stream`` is one document. Candidate pairs are found with
    MinHash signatures split into bands; the true Jaccard similarity of each
    candidate is then computed on its shingle sets, and pairs reaching
    ``similarity`` are printed as ``"<a> <b> <jaccard>"`` with a < b, sorted.

    :param stream: Iterable of text lines (e.g. an open file).
    :param similarity: Jaccard threshold, ``0 < similarity <= 1``.
    :param display_tqdm: Forwarded to :func:`compute_signature_matrix`.
    :raises ValueError: If ``similarity`` is outside ``(0, 1]``; the band/row
        search is not guaranteed to terminate for such values.
    """
    if not 0 < similarity <= 1:
        raise ValueError(
            f"similarity must be in the interval (0, 1], got {similarity!r}")

    # Remove special characters and normalize accents
    docs = [normalize(line.rstrip('\n')) for line in stream]

    shingles = compute_shingles(docs, SHINGLE_SIZE)

    # Compute b and r such that 2s/3 < t < s
    bands, rows = _choose_bands_and_rows(similarity)

    signature = compute_signature_matrix(shingles, bands * rows, display_tqdm)

    # Shingle-index set per document, built lazily and reused across pairs.
    shingle_sets: dict[int, set] = {}

    for doc_a, doc_b in sorted(_candidate_pairs(signature, bands, rows)):
        for doc in (doc_a, doc_b):
            if doc not in shingle_sets:
                shingle_sets[doc] = set(np.flatnonzero(shingles[:, doc]).tolist())
        # Compute true jaccard similarity
        d = jaccard_similarity(shingle_sets[doc_a], shingle_sets[doc_b])
        if d >= similarity:
            print(f"{doc_a} {doc_b} {d:.06f}")


def main():
    """Command-line entry point: parse arguments and run the detection."""
    ns = parse_args()
    parse(ns.input, ns.similarity, ns.progress)