import argparse
import itertools
import sys
import unicodedata
from typing import Optional

import numpy as np

SHINGLE_SIZE = 5  # Shingle length, known as k in the literature


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """
    Parse arguments from the command line using argparse.

    :param argv: full argument vector (argv[0] is the program name);
                 defaults to sys.argv.
    :return: an argparse.Namespace with the parsed arguments.

    Argparse exits with an error message if an argument is invalid.
    The --help option is implicitly added.
    """
    if argv is None:
        argv = sys.argv
    parser = argparse.ArgumentParser(description='Document similarity')
    # Input documents. Can be any readable file descriptor, including stdin.
    parser.add_argument('input', nargs='?', type=argparse.FileType('r'),
                        help='Documents to read.', default=sys.stdin)
    # Similarity threshold.
    parser.add_argument('similarity', nargs='?', type=float,
                        help='Similarity threshold.', default=0.05)
    # Optional. Display statistics about true and false positive/negative
    # rates, which takes some extra time.
    parser.add_argument('--stats', '-s', action='store_true',
                        help='Display some statistics.')
    # Optional. Display a progress bar while generating and applying
    # permutations, which is the most expensive stage.
    parser.add_argument('--progress', '-p', '--tqdm', action='store_true',
                        help='Display progress bar while calculating '
                             'signature matrix.')
    return parser.parse_args(argv[1:])


def normalize(doc: str) -> str:
    """
    Remove accents from letters, drop non-ASCII characters, and keep only
    letters and digits.

    For instance, "I l0ve Pokémons & co." gives "il0vepokemonsco".
    """
    # casefold() lowercases aggressively; 'æ'/'œ' are expanded by hand since
    # NFKD does not decompose them. After NFKD, accents become combining
    # marks that fail the category filter, and any remaining non-ASCII
    # characters are removed by the encode/decode round-trip.
    return ''.join(
        char for char in unicodedata.normalize(
            'NFKD', doc.casefold().replace('æ', 'ae').replace('œ', 'oe'))
        if unicodedata.category(char) in ('Lu', 'Ll', 'Nd')
    ).encode('ascii', 'ignore').decode('ascii')


def compute_shingles(docs: list[str], single_size: int) -> np.ndarray:
    """
    Transform a list of documents into a shingle matrix.

    :param docs: well-formatted documents (no special characters).
    :param single_size: length k of the character shingles.
    :return: a boolean matrix M where M[i, j] = True states that shingle i
             appears in document j.

    Since the shingle count is not known in advance, the matrix rows are
    doubled whenever it runs out of space, which avoids a quadratic number
    of reallocations.
    """
    # Initialize the shingle matrix with 2 shingle rows; doubled on demand.
    shingle_matrix = np.zeros((2, len(docs)), dtype=bool)
    shingle_id = {}

    for doc_id, doc in enumerate(docs):
        # All k-length substrings of a single document
        char_shing = [doc[i:i + single_size]
                      for i in range(len(doc) - single_size + 1)]
        for sh in char_shing:
            # The given shingle is unknown. Register it
            if sh not in shingle_id:
                shingle_id[sh] = len(shingle_id)
                if shingle_id[sh] >= len(shingle_matrix):
                    # Matrix is too small, so we double its size
                    shingle_matrix = np.append(
                        shingle_matrix,
                        np.zeros(shingle_matrix.shape, dtype=bool),
                        axis=0)

            # Store the information that the shingle is in the document
            shingle_matrix[shingle_id[sh], doc_id] = True

    # Reduce matrix size to useful content
    return shingle_matrix[:len(shingle_id)]


def compute_optimal_matrix_size(threshold: float) -> tuple[int, int]:
    """
    Compute band and row counts for the signature matrix such that these
    values approximate the similarity threshold of two documents, using LSH.

    Recall that the threshold for a signature matrix with b bands of r rows
    is t = (1 / b) ** (1 / r). We want this value lower than the expected
    threshold to avoid false negatives, but still near the expected value to
    also limit false positives. We therefore ensure that the estimated
    threshold lies between 2/3 * threshold and threshold: starting from
    initial values, we add bands while the estimate is too high, and add
    rows per band while it is too low.

    :param threshold: requested similarity threshold, in (0, 1].
    :return: (bands, rows) tuple.
    """
    # Compute b and r such that 2s/3 < t < s.
    # Use at least 2 rows and 16 bands to have good values.
    rows = 2
    bands = 16
    est_threshold = (1 / bands) ** (1 / rows)

    # Threshold is not acceptable yet
    while not (2 * threshold / 3 < est_threshold < threshold):
        if est_threshold >= threshold:
            # Estimate too high: add bands
            bands *= 2
        else:
            # Estimate too low: add rows
            rows *= 2
        est_threshold = (1 / bands) ** (1 / rows)

    # Estimated threshold is now near the required threshold
    return bands, rows


def compute_signature_matrix(shingles: np.ndarray, permutations_count: int,
                             display_tqdm: bool = False) -> np.ndarray:
    """
    Implementation of the MinHash algorithm.

    Compute a signature matrix of shingles generated by random permutations.

    :param shingles: boolean matrix (shingle x document) where
                     shingles[i, j] = True states that shingle i appears in
                     document j.
    :param permutations_count: number of random permutations to generate.
    :param display_tqdm: if True and tqdm is installed, show a progress bar.
    :return: the signature matrix, of shape (permutations_count x docs_count).

    For each permutation, we generate it randomly, then take the first
    shingle of each document. While the permutation generation is quick,
    finding the first shingle of a document (an argmax over a boolean
    column) may be quite expensive and take some time.
    """
    _, docs_count = shingles.shape

    # Initialize matrix
    signature_matrix = np.inf * np.ones((permutations_count, docs_count))

    permutations_iterator = range(permutations_count)
    # If supported, load tqdm to display the progress bar
    if display_tqdm:
        try:
            from tqdm import tqdm
            permutations_iterator = tqdm(permutations_iterator, unit="perm.")
        except ImportError:
            print("tqdm is not installed. Please install tqdm before using "
                  "--tqdm option.")

    for permutation_id in permutations_iterator:
        # Generate random permutation of shingle rows.
        # This is not the most expensive task.
        permutation = np.random.permutation(shingles)
        # For each document, get the smallest shingle after permutation:
        # argmax on a boolean column returns the index of the first True.
        # This is the expensive operation.
        signature_matrix[permutation_id] = permutation.argmax(0)

    return signature_matrix


def find_candidate_pairs(signature: np.ndarray, bands: int, rows: int) \
        -> set[tuple[int, int]]:
    """
    Implementation of the LSH algorithm.

    Given a signature matrix and band/row counts, find candidate document
    pairs that may be similar. The probability that two documents share a
    signature value equals their similarity; two documents form a candidate
    pair if they have the same signature on all rows of at least one band.

    :param signature: signature matrix (permutation x document).
    :param bands: number of bands.
    :param rows: number of rows per band.
    :return: a set of (doc_a, doc_b) pairs with doc_a < doc_b.
    """
    _, docs_count = signature.shape
    candidate_pairs = set()

    for band_id in range(bands):
        # Get interesting band
        band = signature[band_id * rows:(band_id + 1) * rows]

        # Put documents into buckets.
        # A bucket key is the tuple of all signatures of a document
        # restricted to this band.
        buckets = {}
        for doc in range(docs_count):
            buckets.setdefault(tuple(band[:, doc]), set()).add(doc)

        # Documents sharing a bucket are candidates.
        # Sorting yields (min, max) ordered pairs for nice output.
        for bucket in buckets.values():
            for doc_a, doc_b in itertools.combinations(sorted(bucket), 2):
                candidate_pairs.add((doc_a, doc_b))

    return candidate_pairs


def jaccard_similarity(doc1: set, doc2: set) -> float:
    """
    Compute the Jaccard similarity of two sets.

    This is defined as 0 if both sets are empty,
    |A ∩ B| / |A ∪ B| in the general case.
    """
    # Early return also guards the division: the union below is only
    # computed when at least one set is non-empty.
    if not doc1 or not doc2:
        return 0.0

    inter = doc1.intersection(doc2)
    union = doc1.union(doc2)
    return len(inter) / len(union)


def parse(stream, similarity: float, *, stats: bool = False,
          display_tqdm: bool = False) \
        -> Optional[tuple[int, int, int, int]]:
    """
    Given a stream of documents (separated by line feeds) and a similarity
    threshold, display on standard output an estimation of document pairs
    that have a Jaccard similarity higher than the requested threshold.

    We use k-shingling, MinHash and LSH to compute the estimation.

    :param stream: iterable of lines, one document per line.
    :param similarity: similarity threshold, in (0, 1].
    :param stats: if True, also compute and return (tp, fp, tn, fn) counts.
    :param display_tqdm: forwarded to compute_signature_matrix.
    :return: (tp, fp, tn, fn) when stats is True, None otherwise.
    """
    docs = [line.rstrip('\n') for line in stream]  # Read stream
    # Remove special characters and normalize accents
    docs = [normalize(doc) for doc in docs]

    # Compute k-shingles
    shingles = compute_shingles(docs, SHINGLE_SIZE)
    # Compute best values for permutations count
    bands, rows = compute_optimal_matrix_size(similarity)
    # Compute signature matrix using MinHash
    signature = compute_signature_matrix(shingles, bands * rows, display_tqdm)
    # Guess candidate pairs using LSH
    candidate_pairs = find_candidate_pairs(signature, bands, rows)
    # Keep a set for O(1) membership tests, and a sorted list for nice output
    candidate_set = candidate_pairs
    candidate_pairs = sorted(candidate_pairs)

    # True and false positive counts
    tp = 0
    fp = 0

    # Per-document sets of shingle ids, for true Jaccard similarity
    shingles_set = [set(np.flatnonzero(shingles[:, doc]).tolist())
                    for doc in range(len(docs))]

    # For each candidate pair, compute true Jaccard similarity and display it
    for doc_a, doc_b in candidate_pairs:
        d = jaccard_similarity(shingles_set[doc_a], shingles_set[doc_b])
        if d >= similarity:
            print(f"{doc_a} {doc_b} {d:.06f}")
            tp += 1
        else:
            fp += 1

    if stats:
        # Compute true and false negative counts, for validation only
        tn = 0
        fn = 0
        for doc_a in range(len(docs)):
            for doc_b in range(doc_a + 1, len(docs)):
                # Compute true jaccard similarity
                d = jaccard_similarity(shingles_set[doc_a],
                                       shingles_set[doc_b])
                if d >= similarity and (doc_a, doc_b) not in candidate_set:
                    fn += 1
                elif d < similarity and (doc_a, doc_b) not in candidate_set:
                    tn += 1
        return tp, fp, tn, fn
    return None


def main():
    """Entry point: parse CLI arguments, analyse documents, display stats."""
    # Parse arguments from command line
    ns = parse_args()

    if not (0 < ns.similarity <= 1):
        raise ValueError(f"Invalid similarity value: {ns.similarity}")

    # Analyse documents
    output = parse(ns.input, ns.similarity, stats=ns.stats,
                   display_tqdm=ns.progress)

    if ns.stats:
        tp, fp, tn, fn = output
        print(f"True positive: {tp}", file=sys.stderr)
        # Fixed: fp and tn values were swapped in these two lines
        print(f"False positive: {fp}", file=sys.stderr)
        print(f"True negative: {tn}", file=sys.stderr)
        print(f"False negative: {fn}", file=sys.stderr)
        # Guard against empty classes to avoid ZeroDivisionError
        tp_rate = tp / (tp + fn) if tp + fn else 0.0
        fp_rate = fp / (fp + tn) if fp + tn else 0.0
        print(f"True positive rate: {tp_rate:.06f}", file=sys.stderr)
        print(f"False positive rate: {fp_rate:.06f}", file=sys.stderr)