Supercharging superbooga (#3272)
This commit is contained in:
parent
ad00b8eb26
commit
0845724a89
21 changed files with 12294 additions and 2 deletions
199
extensions/superboogav2/data_preprocessor.py
Normal file
199
extensions/superboogav2/data_preprocessor.py
Normal file
|
@ -0,0 +1,199 @@
|
|||
"""
|
||||
This module contains utils for preprocessing the text before converting it to embeddings.
|
||||
|
||||
- TextPreprocessorBuilder preprocesses individual strings.
|
||||
* lowering cases
|
||||
* converting numbers to words or characters
|
||||
* merging and stripping spaces
|
||||
* removing punctuation
|
||||
* removing stop words
|
||||
* lemmatizing
|
||||
* removing specific parts of speech (adverbs and interjections)
|
||||
- TextSummarizer extracts the most important sentences from a long string using text-ranking.
|
||||
"""
|
||||
import pytextrank
|
||||
import string
|
||||
import spacy
|
||||
import math
|
||||
import nltk
|
||||
import re
|
||||
|
||||
from nltk.corpus import stopwords
|
||||
from nltk.stem import WordNetLemmatizer
|
||||
from num2words import num2words
|
||||
|
||||
|
||||
class TextPreprocessorBuilder:
    """Fluent builder that applies normalization steps to a text string.

    Each step mutates ``self.text`` in place and returns ``self`` so calls can
    be chained; ``build()`` returns the resulting string.
    """

    # Shared class-level resources, built once per process.
    _stop_words = set(stopwords.words('english'))
    _lemmatizer = WordNetLemmatizer()

    # Some of the functions are expensive. We cache the results.
    # NOTE(review): these caches grow without bound over the process lifetime.
    _lemmatizer_cache = {}
    _pos_remove_cache = {}

    # Splits text into alternating word (\w+) and non-word (\W+) runs so that
    # "".join(tokens) reconstructs the input exactly. Compiled once instead of
    # being re-scanned from source in every method call.
    _TOKEN_RE = re.compile(r'\b\w+\b|\W+')

    def __init__(self, text: str):
        self.text = text

    def to_lower(self):
        """Lowercase words, preserving ALL-CAPS abbreviations and constants."""
        tokens = self._TOKEN_RE.findall(self.text)
        for i, token in enumerate(tokens):
            # Only word tokens are candidates for lowering.
            if re.match(r'^\w+$', token):
                # A single `[A-Z_]+` check covers both abbreviations (HTML)
                # and constants (MAX_VALUE): `[A-Z_]+` subsumes `[A-Z]+`, so
                # the original second regex test was redundant.
                if not re.match(r'^[A-Z_]+$', token):
                    tokens[i] = token.lower()
        self.text = "".join(tokens)
        return self

    def num_to_word(self, min_len: int = 1):
        """Spell out integers with at least `min_len` digits as English words.

        This is done to pay better attention to numbers (e.g. ticket numbers,
        thread numbers, post numbers):
        740700 becomes "seven hundred and forty thousand seven hundred".
        """
        tokens = self._TOKEN_RE.findall(self.text)
        for i, token in enumerate(tokens):
            if token.isdigit() and len(token) >= min_len:
                tokens[i] = num2words(int(token)).replace(",","")  # Remove commas from num2words.
        self.text = "".join(tokens)
        return self

    def num_to_char_long(self, min_len: int = 1):
        """Map digits to letters, repeating each by its digit position.

        This is done to pay better attention to numbers (e.g. ticket numbers,
        thread numbers, post numbers): 740700 becomes "HHHHHHEEEEEAAAAHHHAAA"
        (least-significant digit repeated once, the next twice, and so on).
        """
        def _stretch(number: str) -> str:
            # Walk digits from least to most significant, map 0->A .. 9->J,
            # repeat each letter by its 1-based position, then restore the
            # original digit order. (The original inline lambda shadowed the
            # enclosing loop's `i` and `token`, which read confusingly.)
            return ''.join(
                chr(int(digit) + 65) * (pos + 1)
                for pos, digit in enumerate(number[::-1])
            )[::-1]

        tokens = self._TOKEN_RE.findall(self.text)
        for i, token in enumerate(tokens):
            if token.isdigit() and len(token) >= min_len:
                tokens[i] = _stretch(token)
        self.text = "".join(tokens)
        return self

    def num_to_char(self, min_len: int = 1):
        """Map each digit of long-enough numbers to a letter (0->A .. 9->J).

        This is done to pay better attention to numbers (e.g. ticket numbers,
        thread numbers, post numbers): 740700 becomes "HEAHAA".
        """
        tokens = self._TOKEN_RE.findall(self.text)
        for i, token in enumerate(tokens):
            if token.isdigit() and len(token) >= min_len:
                tokens[i] = ''.join(chr(int(digit) + 65) for digit in token)
        self.text = "".join(tokens)
        return self

    def merge_spaces(self):
        """Collapse runs of consecutive spaces into a single space."""
        self.text = re.sub(' +', ' ', self.text)
        return self

    def strip(self):
        """Trim leading and trailing whitespace."""
        self.text = self.text.strip()
        return self

    def remove_punctuation(self):
        """Delete all ASCII punctuation characters in one translate pass."""
        self.text = self.text.translate(str.maketrans('', '', string.punctuation))
        return self

    def remove_stopwords(self):
        """Drop English stop words while keeping the separators around them."""
        self.text = "".join(
            word for word in self._TOKEN_RE.findall(self.text)
            if word not in TextPreprocessorBuilder._stop_words
        )
        return self

    def remove_specific_pos(self):
        """
        In the English language, adverbs and interjections rarely provide meaningful information.
        Removing them improves the embedding precision. Don't tell JK Rowling, though.
        """
        cached = TextPreprocessorBuilder._pos_remove_cache.get(self.text)
        if cached:
            self.text = cached
            return self

        tokens = self._TOKEN_RE.findall(self.text)

        # Exclude adverbs (RB, RBR, RBS) and interjections (UH).
        excluded_tags = ['RB', 'RBR', 'RBS', 'UH']

        for i, token in enumerate(tokens):
            # Check if token is a word.
            if re.match(r'^\w+$', token):
                # Part-of-speech tag the word in isolation.
                pos = nltk.pos_tag([token])[0][1]
                # If the word's POS tag is in the excluded list, remove it.
                if pos in excluded_tags:
                    tokens[i] = ''

        new_text = "".join(tokens)
        TextPreprocessorBuilder._pos_remove_cache[self.text] = new_text
        self.text = new_text

        return self

    def lemmatize(self):
        """Reduce each word to its lemma (e.g. "cars" -> "car"), memoized."""
        cached = TextPreprocessorBuilder._lemmatizer_cache.get(self.text)
        if cached:
            self.text = cached
            return self

        new_text = "".join(
            TextPreprocessorBuilder._lemmatizer.lemmatize(word)
            for word in self._TOKEN_RE.findall(self.text)
        )
        TextPreprocessorBuilder._lemmatizer_cache[self.text] = new_text
        self.text = new_text

        return self

    def build(self):
        """Return the processed text."""
        return self.text
||||
|
||||
class TextSummarizer:
    """Extracts the most important sentences from long text via text-ranking."""

    # spaCy pipeline with pytextrank attached; created lazily on first use.
    _nlp_pipeline = None
    # Memoized summaries, keyed by (text, min_num_sent).
    _cache = {}

    @staticmethod
    def _load_nlp_pipeline():
        # Lazy-load it: building the pipeline is expensive, so defer until
        # the first summarization request.
        if TextSummarizer._nlp_pipeline is None:
            pipeline = spacy.load('en_core_web_sm')
            pipeline.add_pipe("textrank", last=True)
            TextSummarizer._nlp_pipeline = pipeline
        return TextSummarizer._nlp_pipeline

    @staticmethod
    def process_long_text(text: str, min_num_sent: int) -> list[str]:
        """
        This function applies a text summarization process on a given text string, extracting
        the most important sentences based on the principle that 20% of the content is responsible
        for 80% of the meaning (the Pareto Principle).

        Returns:
            list: A list of the most important sentences
        """
        # Return the memoized result when this exact request was seen before.
        key = (text, min_num_sent)
        hit = TextSummarizer._cache.get(key, None)
        if hit is not None:
            return hit

        doc = TextSummarizer._load_nlp_pipeline()(text)
        sentence_count = len(list(doc.sents))

        if sentence_count >= min_num_sent:
            # Keep the top 20% of phrases and sentences, rounded up.
            phrase_budget = math.ceil(len(doc._.phrases) * 0.20)
            sentence_budget = math.ceil(sentence_count * 0.20)
            summary = doc._.textrank.summary(
                limit_phrases=phrase_budget, limit_sentences=sentence_budget
            )
            result = [str(sent) for sent in summary]
        else:
            # Too short to summarize; hand the text back whole.
            result = [text]

        # Store the result in cache before returning it.
        TextSummarizer._cache[key] = result
        return result
Loading…
Add table
Add a link
Reference in a new issue