Hotel Reviews

Reading the Processed Text Indexes

import numpy as np
import os
from ai.box.d1 import pkl_load

"""
读取原始数据
X_train是python list列表,其值为索引
"""
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
dict_file = os.path.join(BASE_DIR,"data_pkl/wordict.pkl")
file_train = os.path.join(BASE_DIR,"data_pkl/train.pkl")
file_test = os.path.join(BASE_DIR, "data_pkl/test.pkl")

# In word2idx the PAD token has index 0; all other words are numbered randomly.
# Further optimization could build on top of this (see below).
words_set,word2idx = pkl_load(file_path=dict_file)   # dictionary
X_train,y_train = pkl_load(file_path=file_train)     # training set
X_test,y_test = pkl_load(file_path=file_test)        # test set

X_train = np.array(X_train)
y_train = np.array(y_train)
X_test = np.array(X_test)
y_test = np.array(y_test)
print("X_train:",X_train.shape)  # X_train: (4800, 85)
print("y_train:",y_train.shape)  # y_train: (4800,)
print("X_test:",X_test.shape)    # X_test: (1200, 85)
print("y_test:",y_test.shape)    # y_test: (1200,)

The processed text indexes are saved to disk so that models can be validated quickly (review-index dataset download).

Wrapping for Easy Reuse

Wrap the loading logic in a function so it can be called conveniently; for now, the dataset root path has to be fixed inside your own project:
# dp is the dataset root directory; it must be defined in your own project
def load_hotel(return_dict=False, return_Xy=True, dataset_path=dp):
    """Hotel-review index dataset
    - return_dict: return words_set, word2idx
    - return_Xy: return X_train, y_train, X_test, y_test as np.ndarray
    """
    BASE_DIR = os.path.join(dataset_path,"hotel_reader")
    dict_file = os.path.join(BASE_DIR,"data_pkl/wordict.pkl")
    file_train = os.path.join(BASE_DIR,"data_pkl/train.pkl")
    file_test = os.path.join(BASE_DIR, "data_pkl/test.pkl")
    if return_dict:
        words_set,word2idx = pkl_load(file_path=dict_file)   # dictionary
        return words_set,word2idx 
    if return_Xy:
        X_train,y_train = pkl_load(file_path=file_train)     # training set
        X_test,y_test = pkl_load(file_path=file_test)        # test set
        X_train = np.array(X_train)
        y_train = np.array(y_train)
        X_test = np.array(X_test)
        y_test = np.array(y_test)
        print("X_train:",X_train.shape)  # X_train: (4800, 85)
        print("y_train:",y_train.shape)  # y_train: (4800,)
        print("X_test:",X_test.shape)    # X_test: (1200, 85)
        print("y_test:",y_test.shape)    # y_test: (1200,)
        return X_train,y_train,X_test,y_test

Calling it later takes just two lines, clean and simple:
from ai.datasets import load_hotel 
X_train,y_train,X_test,y_test = load_hotel(return_dict=False, return_Xy=True)

Dataset Overview

Corpus: 4,800 training samples and 1,200 test samples
Text files: one review per file, GBK-encoded
Label files: two CSV files, each with two columns: the path of a review file and whether that review is positive or negative

Positive samples are labeled 0 and negative samples 1. One example:
外部环境不错,但房间太旧了,晚上睡觉阳台门须用东西顶住。性价比不高。 (The surroundings are nice, but the room is too old; at night the balcony door has to be propped shut. Poor value for money.)
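
For illustration, each row of a label CSV pairs a review file's path with its label (0 = positive, 1 = negative); the paths below are made up and do not reflect the dataset's actual layout:

neg/neg.0233.txt,1
pos/pos.0817.txt,0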

Data Processing Overview

Text to indexes: each document becomes one index vector paired with one label, giving an (X, y) pair for model training
Batching: wrapped into a Dataset

Dictionary construction:
Tokenize with jieba, then assign index numbers, with PAD fixed at index 0
An existing word set can be reused and merged with the new words from the training set
Words that first appear in the test files are mapped to the unknown token

Saved files:
the dictionary, the indexed training set, and the indexed test set

Text optimization:
Treated as plain text, the useful optimizations mostly come down to two points (see the sketch below):
1. Drop useless words, e.g. via a stop-word list, or by removing words whose frequency is too high or too low
2. Keep only the key words; deciding what counts as "key" has a whole family of algorithms of its own...
The dataset above is the raw text converted to indexes, with none of these optimizations applied
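
As an illustration of point 1, here is a minimal sketch of stop-word and document-frequency filtering; the stop-word set and the two thresholds are arbitrary assumptions, not part of this dataset's actual pipeline:

from collections import Counter

def filter_tokens(docs, stop_words=frozenset(), min_df=2, max_df_ratio=0.5):
    """Drop stop words plus words whose document frequency is too low or too high.
    docs: a list of token lists, e.g. the jieba output used below.
    """
    df = Counter(word for doc in docs for word in set(doc))  # document frequency
    limit = max_df_ratio * len(docs)
    keep = {word for word, count in df.items()
            if word not in stop_words and min_df <= count <= limit}
    return [[word for word in doc if word in keep] for doc in docs]

For point 2, jieba itself ships a TF-IDF keyword extractor (jieba.analyse.extract_tags), which could serve as a starting point for picking key words.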

Data Processing Code

import os 
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

"""
1. 获取词典(不重复单词集合)
2. 文本转索引向量,word2idx
3. embed方式转向量,这一步可以现在做,也可以在训练时再做,但通常随模型一起训练
"""

import numpy as np
import jieba
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch

from ai.box.d1 import pkl_save,pkl_load 

"""
读取原始数据
"""
train_file = os.path.join(BASE_DIR,"train.csv")
test_file = os.path.join(BASE_DIR,"test.csv")
dict_file = os.path.join(BASE_DIR,"data_pkl/wordict.pkl")

file_train = os.path.join(BASE_DIR,"data_pkl/train.pkl")
file_test = os.path.join(BASE_DIR, "data_pkl/test.pkl")

# Build the dictionary: the set of unique words
# UNK: unknown word
# PAD: padding token

pad_flag = "<PAD>"
unknown_flag = "<UNK>"

# words_set: the set of unique words
words_set = {unknown_flag, pad_flag}

def modify_dict(word2idx, pad_flag=pad_flag):
    """Adjust the dictionary so that the PAD token gets index 0.
    In principle, giving high-frequency words the smaller indexes would be
    better still; that is not done here.
    """
    idx2word = {idx:word for word, idx in word2idx.items()}
    word0 = idx2word[0]                   # the word that currently holds index 0
    pad_flag_index = word2idx[pad_flag]
    word2idx[pad_flag] = 0                # move PAD to index 0
    word2idx[word0] = pad_flag_index      # give the displaced word PAD's old index
    return word2idx
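
# A refinement mentioned in modify_dict's docstring, not used by this script:
# give the most frequent words the smallest indexes while keeping PAD at 0.
# A hedged sketch; `counter` is assumed to be a collections.Counter over all
# tokens in the training texts
def build_freq_dict(counter, pad_flag=pad_flag, unknown_flag=unknown_flag):
    word2idx = {pad_flag: 0, unknown_flag: 1}
    for word, _count in counter.most_common():   # most frequent first
        if word not in word2idx:
            word2idx[word] = len(word2idx)
    return word2idx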


if os.path.exists(file_train):
    words_set,word2idx = pkl_load(file_path=dict_file)   # dictionary
    X_train,y_train = pkl_load(file_path=file_train)     # training set
    X_test,y_test = pkl_load(file_path=file_test)        # test set

else:
    if os.path.exists(dict_file):
        # the dict file stores the (words_set, word2idx) pair saved further below
        words_set,_ = pkl_load(file_path=dict_file)

    """训练集切词生成及字典更新
    """
    # 训练集文本和标签
    X_train = []
    y_train = []
    seq_len_list = []
    count = 0

    # 训练集读取
    with open(file=train_file, mode="r", encoding="utf8") as f:
        for line in f.readlines():
            file_name, label = line.strip().split(",")
            file_name = os.path.join(BASE_DIR,file_name)
            y_train.append(int(label))
            count = count + 1
            if count % 100 == 0:
                print(f"正在处理第 {count} 个训练文件")
            with open(file=file_name, mode="r", encoding="gbk", errors="ignore") as f1:
                txt = f1.read().replace(" ", "").replace("\n", "").replace("\t", "")
                txt_cut = jieba.lcut(txt)
                seq_len_list.append(len(txt_cut))
                words_set = words_set.union(set(txt_cut))
                X_train.append(txt_cut)


    """测试集切词
    测试集没有权力更新字典,所以不需要更新
    """
    # 测试集文本和标签
    X_test = []
    y_test = []

    # read the test set
    with open(file=test_file, mode="r", encoding="utf8") as f:
        for line in f.readlines():
            file_name, label = line.strip().split(",")
            file_name = os.path.join(BASE_DIR,file_name)
            y_test.append(int(label))
            with open(file=file_name, mode="r", encoding="gbk", errors="ignore") as f1:
                txt = f1.read().replace(" ", "").replace("\n", "").replace("\t", "")
                txt_cut = jieba.lcut(txt)
                X_test.append(txt_cut)


    # Convert training texts to indexes
    # -------------------------------------------------------------------
    # The dictionary could be optimized at this point: PAD=0, frequent words first.
    # Beyond that, which number a word gets is unimportant; a fresh random
    # numbering per run works, since training starts from random parameters anyway
    word2idx = {word:idx for idx, word in enumerate(words_set)}
    word2idx = modify_dict(word2idx)


    # Sequence length: the mean tokenized length, 85 on this corpus.
    # Reviews longer than this get truncated; see the keyword note below
    seq_len = int(np.array(seq_len_list).mean())
    print("seq_len:", seq_len)  

    """
        截取固定长度,补齐,为了批次处理
    """
    # 训练集
    X_train1 = []
    for x in X_train:
        temp = x + [pad_flag] * seq_len
        X_train1.append(temp[:seq_len])

    """
        向量化,word2id,id为索引 
    """
    # 训练集向量化
    X_train2 = []
    for x in X_train1:
        temp = []
        for word in x:
            idx = word2idx[word] if word in word2idx else word2idx[unknown_flag]
            temp.append(idx)
        X_train2.append(temp)
    X_train = X_train2

    # Convert test texts to indexes
    # -------------------------------------------------------------------

    """
        Pad and truncate to a fixed length so samples can be batched
    """

    # test set
    X_test1 = []
    for x in X_test:
        temp = x + [pad_flag] * seq_len
        X_test1.append(temp[:seq_len])

    """
        向量化,word2id,id为索引 
    """
    # 测试集向量化
    X_test2 = []
    for x in X_test1:
        temp = []
        for word in x:
            idx = word2idx[word] if word in word2idx else word2idx[unknown_flag]
            temp.append(idx)
        X_test2.append(temp)
    X_test = X_test2

    pkl_save(data=(words_set,word2idx),file_path=dict_file)  # dictionary
    pkl_save(data=(X_train,y_train),file_path=file_train)    # training set
    pkl_save(data=(X_test,y_test),file_path=file_test)       # test set


class MyDataSet(Dataset):
    """
    Wrap the (X, y) pairs as a PyTorch Dataset
    """
    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]
        
        return torch.tensor(data=x).long(), torch.tensor(data=y).long()

if __name__ == "__main__":
    train_dataset = MyDataSet(X=X_train, y=y_train)
    test_dataset = MyDataSet(X=X_test,y=y_test)

    print(train_dataset[0],len(train_dataset))
    print("test len=",len(test_dataset))
    print(f"words_set len={len(words_set)}")

Data Loading

"""
1. 获取词典(不重复单词集合)
2. 文本转索引向量,word2idx
3. embed方式转向量,这一步可以现在做,也可以在训练时再做,但通常随模型一起训练
"""

import numpy as np
import os
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch

from ai.box.d1 import pkl_load
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

"""
读取原始数据
"""
dict_file = os.path.join(BASE_DIR,"data_pkl/wordict.pkl")
file_train = os.path.join(BASE_DIR,"data_pkl/train.pkl")
file_test = os.path.join(BASE_DIR, "data_pkl/test.pkl")

words_set,word2idx = pkl_load(file_path=dict_file)   # dictionary
X_train,y_train = pkl_load(file_path=file_train)     # training set
X_test,y_test = pkl_load(file_path=file_test)        # test set

class MyDataSet(Dataset):
    """
    Wrap the (X, y) pairs as a PyTorch Dataset
    """
    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]
        
        return torch.tensor(data=x).long(), torch.tensor(data=y).long()


train_dataset = MyDataSet(X=X_train, y=y_train)
test_dataset = MyDataSet(X=X_test,y=y_test)

print(train_dataset[0],len(train_dataset))
print("test len=",len(test_dataset))
print(f"words_set len={len(words_set)}")


References