algo-ds/algods/algods.py

391 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import unicodedata
import sys
from typing import Optional
from matplotlib import pyplot as plt
import numpy as np
SHINGLE_SIZE = 5 # Known as k
def parse_args(argv: dict = None) -> argparse.Namespace:
"""
Parse arguments from the command line using argparse.
This returns an Argparse namespace with the parsed arguments.
Raises an error if an argument is invalid.
--help option is implicitly added.
"""
if argv is None:
argv = sys.argv
parser = argparse.ArgumentParser(description='Document similarity')
# Input document option. Can be whatever file descriptor, including standard input.
parser.add_argument('input', nargs='?', type=argparse.FileType('r'),
help='Documents to read.', default=sys.stdin)
# Give similarity threshold.
parser.add_argument('similarity', nargs='?', type=float, help='Similarity threshold.', default=0.05)
# Optional. Give statistics about true and false positive/negative rates, which take some time.
parser.add_argument('--stats', '-s', action='store_true',
help='Display some statistics.')
# Optional. Let to display a progress bar while generating and applying permutations,
# which is the most expensive state.
parser.add_argument('--progress', '-p', '--tqdm', action='store_true',
help='Display progress bar while calculating signature matrix.')
parser.add_argument('--graph', '-g', action='store_true', help="Draw graphs.")
return parser.parse_args(argv[1:])
def normalize(doc: str) -> str:
"""
Remove accents from letters, remove non-ascii letters, keep only letters and digits.
For instance, "I l0ve Pokémons & co." gives "il0vepokemonsco".
"""
return ''.join(char for char in unicodedata.normalize(
'NFKD', doc.casefold().replace('æ', 'ae').replace('œ', 'oe'))
if unicodedata.category(char) in ['Lu', 'Ll', 'Nd']
).casefold().encode('ascii', 'ignore').decode('ascii')
def compute_shingles(docs: list[str], single_size: int) -> np.ndarray:
"""
Transform a list of documents into a shingle matrix.
This takes as input a list of documents (strings) that are well-formated (without any special character),
and the length of the shingles.
It outputs a Numpy boolean matrix where M(i, j) = True states that the shingle i appears in the document j.
Since we don't know first the shingles count, we extend regularly the size of the matrix, without
generating an overflow.
"""
# Initialize the shingle matrix with 2 shingles
shingle_matrix = np.zeros((2, len(docs)), dtype=bool)
shingle_id = {}
for doc_id, doc in enumerate(docs):
# Compute different shingles for a single document
char_shing = [doc[i:i + single_size] for i in range(len(doc) - single_size + 1)]
for sh in char_shing:
# The given shingle is unknown. Register it
if sh not in shingle_id:
shingle_id[sh] = len(shingle_id)
if shingle_id[sh] >= len(shingle_matrix):
# Matrix is too slow, so we double its size
shingle_matrix = np.append(shingle_matrix, np.zeros(shingle_matrix.shape, dtype=bool), axis=0)
# Store the information that the shingle is in the document
shingle_matrix[shingle_id[sh], doc_id] = True
# Reduce matrix size to useful content
shingle_matrix = shingle_matrix[:len(shingle_id)]
return shingle_matrix
def compute_optimal_matrix_size(threshold: float) -> tuple[int, int]:
"""
Compute bands and rows number for the signature matrix
such that these values let an approximation of the similarity of two
documents, using LSH.
Recall that the threshold for a signature matrix with b bands of r rows
is given by t = (1 / b) ** (1 / r).
We want that this value is lower than the expected threshold to avoid
true negatives, but we want that this value stay lear the expected
value since we also want to avoid false positives.
Since a check will be opered at the end on candidate pairs, we mainly
want to minimize true negatives instead of false positives.
Then, we ensure that the estimated threshold is between
threshold/2 and 4*threshold/5.
To achieve that, we start from some values, then we add bands if the
threshold is too high, or add some rows per band if it is too high.
"""
# Compute b and r such that s/2 < t < s
# Use at least 2 rows and 16 bands to have good values
rows = 2
bands = 16
est_threshold = (1 / bands) ** (1 / rows)
# Threshold is not acceptable
while not (threshold / 2 < est_threshold < 4 * threshold / 5):
# Add bands
if est_threshold >= 4 * threshold / 5:
bands *= 2
# Add rows
else:
rows *= 2
est_threshold = (1 / bands) ** (1 / rows)
# Estimated threshold is now near required threshold
return bands, rows
def compute_signature_matrix(shingles: np.ndarray, permutations_count: int, display_tqdm: bool = False) -> np.ndarray:
"""
Implementation of the min-hash algorithm.
We compute a signature matrix of shingles generated by random permutations.
The shingles parameters stands for the shingle boolean matrix (shingle x document)
where shingles[i, j] = True states that shingle i appears in document j.
The permutations_count argument indicates the number of random permutations to generate.
The output is the signature matrix, that has for dimensions (permutations_count x docs_count).
For each permutation, we generate it randomly, then we take the first shingle of the document.
While the permutation generation can be done quickly, the check of the first shingle of a
document in a permutation (which can be achieved with an argmax in a boolean row) may be
quite expensive, and take some time. If supported and option enabled, a progress bar can
be displayed.
"""
shingles_count, docs_count = shingles.shape
# Initialize matrix
signature_matrix = np.inf * np.ones((permutations_count, docs_count))
permutations_iterator = range(permutations_count)
# If supported, load tqdm to display the progress bar
if display_tqdm:
try:
from tqdm import tqdm
permutations_iterator = tqdm(permutations_iterator, unit="perm.", position=1)
except ImportError:
print("tqdm is not installed. Please install tqdm before using --tqdm option.")
for permutation_id in permutations_iterator:
# Generate random permutation of shingles
# This is not the most expensive task
permutation = np.random.permutation(shingles)
# For each document, get the smallest shingle after permutation
# This is the expensive operation
signature_matrix[permutation_id] = permutation.argmax(0)
return signature_matrix
def find_candidate_pairs(signature: np.ndarray, bands: int, rows: int) -> set[tuple[int, int]]:
"""
Implementation of the LSH algorithm.
Given a signature matrix and band and rows per band numbers, we want to
find some candidate document pairs to be similar.
We already know that the probability that two documents have the same signature
is the same as their similarity.
Two documents are called are a candidate pair if they have the same signature on
all rows of at least one band.
The output is a set of pairs of document ids.
"""
_, docs_count = signature.shape
candidate_pairs = set()
for band_id in range(bands):
# Get interesting band
band = signature[band_id * rows:(band_id + 1) * rows]
buckets = {}
# Put documents into buckets
# A bucket is the tuple of all signatures of a row
for doc in range(docs_count):
sign_doc = tuple(band[:, doc])
buckets.setdefault(sign_doc, set())
buckets[sign_doc].add(doc)
# Find documents in the same bucket
for bucket in buckets.values():
for doc_a in bucket:
for doc_b in bucket:
if doc_a != doc_b:
# Sort documents for nice output
doc_a, doc_b = min(doc_a, doc_b), max(doc_a, doc_b)
candidate_pairs.add((doc_a, doc_b))
return candidate_pairs
def shingle_set(shingles: np.ndarray, doc_id: int) -> set[int]:
"""
Return the set of all shingle id from a document.
To don't recompute multiple times this, this is cached.
"""
if not hasattr(shingle_set, '_cache'):
shingle_set._cache = {}
if doc_id not in shingle_set._cache:
shingle_set._cache[doc_id] = set(x for x in range(len(shingles)) if shingles[x, doc_id])
return shingle_set._cache[doc_id]
def jaccard_similarity(doc1: set, doc2: set) -> float:
"""
Compute jaccard similarity of two sets.
This is defined as 0 if both sets are empty, |A ∩ B| / |A B| if general cases.
"""
if not doc1 or not doc2:
return 0.0
inter = doc1.intersection(doc2)
union = doc1.union(doc2)
return len(inter) / len(union)
def parse(stream, similarity: float, *, stats: bool = False, display_tqdm: bool = False, verbose: bool = True) \
-> Optional[tuple[int, int, int, int]]:
"""
Given a stream of documents (separated by line feeds) and a similarity threshold,
we display in standard output an estimation of document pairs that
have a Jaccard similarity higher than the requested threshold.
We use k-shringling, MinHash and LSH to compute the estimation.
"""
docs = [line.rstrip('\n') for line in stream] # Read stream
docs = [normalize(doc) for doc in docs] # Remove special characters and normalize accents
# Compute k-shingles
shingles = compute_shingles(docs, SHINGLE_SIZE)
return parse_shingles(docs, shingles, similarity, stats=stats, display_tqdm=display_tqdm, verbose=verbose)
def parse_shingles(docs: list[str], shingles: np.ndarray, similarity: float, *, stats: bool = False,
display_tqdm: bool = False, verbose: bool = True) -> Optional[tuple[int, int, int, int]]:
# Compute best values for permutations count
bands, rows = compute_optimal_matrix_size(similarity)
# Compute signature matrix using MinHash
signature = compute_signature_matrix(shingles, bands * rows, display_tqdm)
# Guess candidate pairs using LSH
candidate_pairs = find_candidate_pairs(signature, bands, rows)
# Sort pairs for a nice output
candidate_pairs = sorted(candidate_pairs)
# Compute true and false positive counts
tp = 0
fp = 0
# For each document pair, compute true Jaccard similarity and display it
for doc_a, doc_b in candidate_pairs:
# Compute true jaccard similarity
shingles_a = shingle_set(shingles, doc_a)
shingles_b = shingle_set(shingles, doc_b)
d = jaccard_similarity(shingles_a, shingles_b)
if d >= similarity:
if verbose:
print(f"{doc_a} {doc_b} {d:.06f}")
tp += 1
else:
fp += 1
if stats:
# Compute true and false negative counts, for validation only
tn = 0
fn = 0
for doc_a in range(len(docs)):
for doc_b in range(doc_a + 1, len(docs)):
# Compute true jaccard similarity
shingles_a = shingle_set(shingles, doc_a)
shingles_b = shingle_set(shingles, doc_b)
d = jaccard_similarity(shingles_a, shingles_b)
if d >= similarity and (doc_a, doc_b) not in candidate_pairs:
fn += 1
elif d < similarity and (doc_a, doc_b) not in candidate_pairs:
tn += 1
return tp, fp, tn, fn
def main():
# Parse arguments from command line
ns = parse_args()
if ns.graph:
# Don't use the program to compute something
return graph(ns.input, ns.progress)
if not (0 < ns.similarity <= 1):
raise ValueError(f"Invalid similiarity value: {ns.similarity}")
# Analyse documents
output = parse(ns.input, ns.similarity, stats=ns.stats, display_tqdm=ns.progress)
if ns.stats:
tp, fp, tn, fn = output
print(f"True positive: {tp}", file=sys.stderr)
print(f"False positive: {fp}", file=sys.stderr)
print(f"True negative: {tn}", file=sys.stderr)
print(f"False negative: {fn}", file=sys.stderr)
tp_rate = tp / (tp + fn)
fp_rate = fp / (fp + tn)
print(f"True positive rate: {tp_rate:.06f}", file=sys.stderr)
print(f"False positive rate: {fp_rate:.06f}", file=sys.stderr)
def graph(stream, display_tqdm: bool = False) -> None:
"""
Draw statistic graphs about false-positive and true positive rates using matplotlib.
"""
docs = [line.rstrip('\n') for line in stream] # Read stream
docs = [normalize(doc) for doc in docs] # Remove special characters and normalize accents
# Compute k-shingles
shingles = compute_shingles(docs, SHINGLE_SIZE)
step = 0.05
n = int(1 // step)
tps, fps, tns, fns = [], [], [], []
step_iterator = range(1, n + 1)
if display_tqdm:
from tqdm import tqdm
step_iterator = tqdm(step_iterator, position=1)
for i in step_iterator:
t = i * step
tp, fp, tn, fn = parse_shingles(docs, shingles, t, stats=True, display_tqdm=display_tqdm, verbose=False)
tps.append(tp)
fps.append(fp)
tns.append(tn)
fns.append(fn)
tps = np.array(tps)
fps = np.array(fps)
tns = np.array(tns)
fns = np.array(fns)
tps_rate = tps / (tps + fns)
fps_rate = fps / (fps + tns)
print("tps = np.array(", tps, ")")
print("fps = np.array(", fps, ")")
print("tns = np.array(", tns, ")")
print("fns = np.array(", fns, ")")
x_axis = step * np.array(range(1, n + 1))
plt.plot(x_axis, tps_rate, '*')
plt.xlabel("Threshold value")
plt.ylabel("True positive rate")
plt.title("True positive rate per threshold value")
plt.show()
plt.plot(x_axis, fps_rate, '*')
plt.xlabel("Threshold value")
plt.ylabel("False positive rate")
plt.title("False positive rate per threshold value")
plt.show()
plt.plot(x_axis, np.log(fps_rate), '*')
plt.xlabel("Threshold value")
plt.ylabel("False positive rate (log scale)")
plt.title("False positive rate per threshold value (logarithmic scale)")
plt.show()