Convert the input tweet document to lowercase; this normalization makes later queries case-insensitive.
Locate the key fields in the tweet document by searching for specific markers, and extract the tweetid and the tweet text.
Tokenize the extracted text and convert each word to its singular form.
Lemmatize the token list (mainly verb lemmatization), and drop the words in ["text", "tweetid"].
Append the remaining valid words to the result list and return it.
# Tokenization / preprocessing
def tokenize_tweet(document):
    # Lowercase everything so queries are case-insensitive
    document = document.lower()
    # Locate the key fields by searching for specific markers;
    # the -1 / -3 offsets adjust for the surrounding quotes and commas
    a = document.index("tweetid") - 1
    b = document.index("errorcode") - 1
    c = document.index("text") - 1
    d = document.index("timestr") - 3
    # Keep only the tweetid and text portions of the document
    document = document[a:b] + document[c:d]
    # Tokenize and convert each word to its singular form
    terms = TextBlob(document).words.singularize()
    # Lemmatize the tokens and keep only the useful ones
    result = []
    for word in terms:
        # Wrap the current token as a Word object
        expected_str = Word(word)
        # Verb lemmatization
        expected_str = expected_str.lemmatize("v")
        if expected_str not in uselessTerm:
            # Skip ["text", "tweetid"]; append everything else to result
            result.append(expected_str)
    return result
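As a rough illustration of what this preprocessing yields, assume a line of Tweets.txt contains the markers "tweetid", "errorcode", "text" and "timestr" in that order; the sample record below is made up for illustration only.
sample = '{"tweetid": "31", "errorcode": "200", "text": "Cats are running fast", "timestr": "xxx"}'
print(tokenize_tweet(sample))
# Roughly ['31', 'cat', 'be', 'run', 'fast']: the tweetid token comes first,
# followed by the lemmatized text tokens; "text" and "tweetid" themselves are
# filtered out as useless terms.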
Document weights follow the lnc scheme: logarithmic tf (l as first character), no idf, and cosine normalization.
The code first computes the logarithmic tf (1 + log tf). For the cosine normalization it iterates over the keys of the tf dictionary (the terms) to obtain the normalization factor, then iterates over the keys again and multiplies each term's frequency by that factor, giving the final tf weights. A small worked example follows the code below.
# Count term frequencies: record how often each word appears in the current document
tf = {}
for word in line:
    if word in tf.keys():
        tf[word] += 1
    else:
        tf[word] = 1
# logarithmic tf
for word in tf.keys():
    tf[word] = 1 + math.log(tf[word])
# cosine normalization
cosine = 0
for word in tf.keys():
    cosine = cosine + tf[word] * tf[word]
cosine = 1.0 / math.sqrt(cosine)
for word in tf.keys():
    tf[word] = tf[word] * cosine
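As a quick sanity check, here is a tiny hand-made illustration of what this lnc weighting produces; the token list is toy data, not taken from Tweets.txt.
import math

line = ["apple", "apple", "banana"]      # toy token list standing in for one tweet
tf = {}
for word in line:
    tf[word] = tf.get(word, 0) + 1       # raw tf: {'apple': 2, 'banana': 1}
for word in tf:
    tf[word] = 1 + math.log(tf[word])    # logarithmic tf: {'apple': ~1.69, 'banana': 1.0}
norm = 1.0 / math.sqrt(sum(w * w for w in tf.values()))
for word in tf:
    tf[word] *= norm                     # cosine-normalized weights; the vector now has length 1
print(tf)                                # roughly {'apple': 0.86, 'banana': 0.51}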
Query weights follow the ltn scheme: logarithmic tf (l in leftmost column), idf (t in second column), no normalization.
The corresponding weight is tf[word] = (math.log(tf[word]) + 1) * math.log(N / df), i.e. ltn (logarithmic tf, idf, no normalization). A small numeric illustration follows the function below.
def similarity(query):
    global score_tid
    # Reset the scores for this query so that repeated searches do not accumulate
    score_tid = {}
    tf = {}
    # Count term frequencies in the query
    for word in query:
        if word in tf:
            tf[word] += 1
        else:
            tf[word] = 1
    # Document frequency and ltn query weight
    for word in tf.keys():
        if word in postings:
            df = len(postings[word])
        else:
            df = N
        # ltn: logarithmic tf (l in leftmost column), idf (t in second column), no normalization
        tf[word] = (math.log(tf[word]) + 1) * math.log(N / df)
    # Accumulate each document's score (dot product of query and document weights);
    # iterate over the unique terms, since their weights already reflect repetitions
    for word in tf.keys():
        if word in postings:
            for tid in postings[word]:
                if tid in score_tid.keys():
                    score_tid[tid] += postings[word][tid] * tf[word]
                else:
                    score_tid[tid] = postings[word][tid] * tf[word]
    # Sort by score (similarity) in descending order
    similarity = sorted(score_tid.items(), key=lambda x: x[1], reverse=True)
    return similarity
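To make the ltn weighting concrete, here is a small hand-made illustration with assumed collection statistics (toy numbers, not derived from Tweets.txt):
import math

N = 100                       # assume a collection of 100 tweets
tf_q = {"cat": 2, "dog": 1}   # raw term frequencies in the query
df = {"cat": 10, "dog": 50}   # assumed document frequencies

weights = {}
for word, freq in tf_q.items():
    # ltn: (1 + log tf) * log(N / df), with no normalization
    weights[word] = (1 + math.log(freq)) * math.log(N / df[word])
print(weights)  # roughly {'cat': 3.90, 'dog': 0.69}: repeated and rarer terms weigh more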
def token(doc):
    # Lowercase the input text for uniform processing
    doc = doc.lower()
    # Split the text into tokens and convert each token to its singular form
    terms = TextBlob(doc).words.singularize()
    # Lemmatize the tokens and return the result list
    result = []
    for word in terms:
        expected_str = Word(word)
        expected_str = expected_str.lemmatize("v")
        result.append(expected_str)
    return result
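A query string therefore goes through the same lowercase / singularize / verb-lemmatize pipeline as the documents; assuming TextBlob and its corpora are installed, a call behaves roughly like this:
print(token("Cats like running fast"))
# Roughly ['cat', 'like', 'run', 'fast']; the exact output depends on
# TextBlob's singularization and lemmatization rules.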
# Union of a list of sets
def Union(sets):
    return reduce(set.union, sets)
def do_search():
    query = token(input("please input search query >> "))
    result = []
    if query == []:
        sys.exit()
    # set() removes duplicate terms from the query
    unique_query = set(query)
    # Build the set of tweet ids matching each query term and take their union;
    # the conditional avoids inserting empty entries into the postings defaultdict
    relevant_tweetids = Union([set(postings[term].keys()) if term in postings else set()
                               for term in unique_query])
    print("There are " + str(len(relevant_tweetids)) + " relevant tweets!")
    if not relevant_tweetids:
        print("No tweets matched any query terms for")
        print(query)
    else:
        print("the top 100 tweets are:")
        scores = similarity(query)
        i = 1
        for (id, score) in scores:
            if i <= 100:  # return only the top 100 results
                result.append(id)
                print(str(score) + ": " + id)
                i = i + 1
            else:
                break
        print("finished")
Final code
import sys
from collections import defaultdict
from textblob import TextBlob
from textblob import Word
import math
from functools import reduce

uselessTerm = ["text", "tweetid"]
# Inverted index: for each term, store pairs (docID, tf)
postings = defaultdict(dict)
# Number of documents N
N = 0
# Final scores (tweetid -> score)
score_tid = {}
# Tokenization / preprocessing
def tokenize_tweet(document):
    # Lowercase everything so queries are case-insensitive
    document = document.lower()
    # Locate the key fields by searching for specific markers;
    # the -1 / -3 offsets adjust for the surrounding quotes and commas
    a = document.index("tweetid") - 1
    b = document.index("errorcode") - 1
    c = document.index("text") - 1
    d = document.index("timestr") - 3
    # Keep only the tweetid and text portions of the document
    document = document[a:b] + document[c:d]
    # Tokenize and convert each word to its singular form
    terms = TextBlob(document).words.singularize()
    # Lemmatize the tokens and keep only the useful ones
    result = []
    for word in terms:
        # Wrap the current token as a Word object
        expected_str = Word(word)
        # Verb lemmatization
        expected_str = expected_str.lemmatize("v")
        if expected_str not in uselessTerm:
            # Skip ["text", "tweetid"]; append everything else to result
            result.append(expected_str)
    return result
# Build the inverted index: for each term, store pairs (docID, tf)
# lnc: logarithmic tf, no idf, cosine normalization
def get_postings():
    global postings, N
    with open(r"Tweets.txt") as content:
        # Read the file; each tweet is one line, stored as one element of lines
        lines = content.readlines()
    for line in lines:
        N += 1
        # Preprocessing
        line = tokenize_tweet(line)
        # The first element of the token list is the tweetid
        tweetid = line[0]
        # Remove it; it is not treated as a content term
        line.pop(0)
        # Count term frequencies: how often each word occurs in this document
        tf = {}
        for word in line:
            if word in tf.keys():
                tf[word] += 1
            else:
                tf[word] = 1
        # logarithmic tf
        for word in tf.keys():
            tf[word] = 1 + math.log(tf[word])
        # cosine normalization
        cosine = 0
        for word in tf.keys():
            cosine = cosine + tf[word] * tf[word]
        cosine = 1.0 / math.sqrt(cosine)
        for word in tf.keys():
            tf[word] = tf[word] * cosine
        # Store the weight of each unique term of this document in the index
        unique_terms = set(line)
        for key_word in unique_terms:
            # postings is a defaultdict, so the inner dict is created on demand
            postings[key_word][tweetid] = tf[key_word]
# Query normalization
def token(doc):
    # Lowercase the input text for uniform processing
    doc = doc.lower()
    # Split the text into tokens and convert each token to its singular form
    terms = TextBlob(doc).words.singularize()
    # Lemmatize the tokens and return the result list
    result = []
    for word in terms:
        expected_str = Word(word)
        expected_str = expected_str.lemmatize("v")
        result.append(expected_str)
    return result
# Compute the similarity between the query and each document
def similarity(query):
    global score_tid
    # Reset the scores for this query so that repeated searches do not accumulate
    score_tid = {}
    tf = {}
    # Count term frequencies in the query
    for word in query:
        if word in tf:
            tf[word] += 1
        else:
            tf[word] = 1
    # Document frequency and ltn query weight
    for word in tf.keys():
        if word in postings:
            df = len(postings[word])
        else:
            df = N
        # ltn: logarithmic tf (l in leftmost column), idf (t in second column), no normalization
        tf[word] = (math.log(tf[word]) + 1) * math.log(N / df)
    # Accumulate each document's score (dot product of query and document weights);
    # iterate over the unique terms, since their weights already reflect repetitions
    for word in tf.keys():
        if word in postings:
            for tid in postings[word]:
                if tid in score_tid.keys():
                    score_tid[tid] += postings[word][tid] * tf[word]
                else:
                    score_tid[tid] = postings[word][tid] * tf[word]
    # Sort by score (similarity) in descending order
    similarity = sorted(score_tid.items(), key=lambda x: x[1], reverse=True)
    return similarity
# Union of a list of sets
def Union(sets):
    return reduce(set.union, sets)
def do_search():
    query = token(input("please input search query >> "))
    result = []
    if query == []:
        sys.exit()
    # set() removes duplicate terms from the query
    unique_query = set(query)
    # Build the set of tweet ids matching each query term and take their union;
    # the conditional avoids inserting empty entries into the postings defaultdict
    relevant_tweetids = Union([set(postings[term].keys()) if term in postings else set()
                               for term in unique_query])
    print("There are " + str(len(relevant_tweetids)) + " relevant tweets!")
    if not relevant_tweetids:
        print("No tweets matched any query terms for")
        print(query)
    else:
        print("the top 100 tweets are:")
        scores = similarity(query)
        i = 1
        for (id, score) in scores:
            if i <= 100:  # return only the top 100 results
                result.append(id)
                print(str(score) + ": " + id)
                i = i + 1
            else:
                break
        print("finished")
def main():
get_postings()
while True:
do_search()
if __name__ == "__main__":
main()
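For a quick non-interactive check, the while True loop in main() could be replaced by a one-off query like the sketch below; the query string is arbitrary and only illustrates the call sequence.
def smoke_test():
    get_postings()
    print("indexed", N, "tweets,", len(postings), "distinct terms")
    # Score an arbitrary query and show the five best-matching tweet ids
    for tid, score in similarity(token("happy new year"))[:5]:
        print(tid, score)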