# (Stray fragment — duplicates the distance computation inside DLCorr.forward.)
# Squared Euclidean distance from the reference sample to every candidate,
# then pick the index of the nearest one.
squared_diff = (x1 - x2) ** 2
distance_matrix = squared_diff.sum(dim=1)
index = distance_matrix.argmin(dim=0)
"""Nearest-neighbour recommendation model exported to ONNX.

``DLCorr`` compares the first row of the input against every remaining row
using squared Euclidean distance and returns the index (into the remaining
rows) of the closest one.
"""
import numpy as np
import torch
from torch import nn


class DLCorr(nn.Module):
    def __init__(self):
        """Similarity recommendation: find the sample in a dataset that is
        closest to the first sample (row 0 compared against all later rows).

        Returns
        -------
        Index of the row most similar to the first row.
        """
        super().__init__()

    def forward(self, X):
        """Forward pass: index of the row of ``X[1:]`` closest to ``X[0]``.

        Parameters
        ----------
        X : 2-D float tensor with at least 2 rows.
        """
        x1 = X[:1, :]  # reference sample, kept 2-D so it broadcasts over x2
        x2 = X[1:, :]  # candidate samples
        # Squared Euclidean distance between the reference and each candidate.
        distance_matrix = torch.sum((x1 - x2) ** 2, dim=1)
        index = torch.argmin(distance_matrix, dim=0)
        return index


X = torch.tensor([
    [0.3745, 0.9507, 0.7320],
    [0.5987, 0.1560, 0.1560],
    [0.0581, 0.8662, 0.6011],
    [0.7081, 0.0206, 0.9699],
    [0.8324, 0.2123, 0.1818],
    [0.1834, 0.3042, 0.5248],
    [0.4319, 0.2912, 0.6119]]).float()


if __name__ == "__main__":
    model = DLCorr()
    index = model(X)
    # X[1:][index] -> tensor([0.0581, 0.8662, 0.6011])

    # For most models a dummy input with batch size 1 is enough to trace,
    # but this model compares row 0 against the remaining rows, so it needs
    # at least 2 rows — the dummy shape must match the model's design.
    # Trace export runs the model once to record a static graph, hence a
    # concrete input is required (think [B, C, H, W] for image models).
    h0 = torch.zeros(3, 3)

    # Trace-export tips: avoid for-loops inside the model, do data
    # preparation outside the model, and avoid in-place tricks — keep it
    # simple so the graph converts cleanly to ONNX.
    torch.onnx.export(
        model=model,
        # Same positional args as y_out = model(*args); only the shape
        # matters, real data is not required.
        args=(h0,),
        # Output file path.
        f="model_knn2.onnx",
        # Export the trained parameters (default True).
        export_params=True,
        # Inference mode: dropout / batch-norm behave as in eval().
        training=torch.onnx.TrainingMode.EVAL,
        # Print detailed export information.
        verbose=True,
        # Name the input/output nodes for easier inspection later.
        input_names=["input1"],
        output_names=["output1"],
        # Operator-set version used for export (symbolic_opset11).
        opset_version=11,
        # Only the batch dimension is dynamic; keep the rest static.
        dynamic_axes={
            "input1": {0: "batch"},
            "output1": {0: "batch"},
        },
    )

    # Imported here so the model definition above stays importable even
    # when the onnx runtime packages are not installed.
    import onnx
    import onnxruntime

    model_onnx = onnx.load("model_knn2.onnx")  # load the exported model
    onnx.checker.check_model(model_onnx)       # validate the model file

    # Create an inference session on CPU.
    session = onnxruntime.InferenceSession(
        "model_knn2.onnx", providers=['CPUExecutionProvider'])

    # Inputs must be numpy arrays.
    ort_input = {session.get_inputs()[0].name: np.array(X).astype(np.float32)}
    output_name = session.get_outputs()[0].name
    ort_output = session.run([output_name], ort_input)
    index = ort_output[0]
    # X[1:][index] -> tensor([0.0581, 0.8662, 0.6011])
"""Cosine-similarity recommendation model exported to ONNX.

Returns both the similarity value and the index of the row most similar to
the first row:

    similarity = (x1 . x2) / (max(||x1||_2, eps) * max(||x2||_2, eps))
"""
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F


class DLCorr(nn.Module):
    def __init__(self):
        """Similarity recommendation: find the sample in a dataset that is
        most similar to the first sample (row 0 vs. all later rows).

        Returns
        -------
        (similarity, index) of the row most similar to the first row.
        """
        super().__init__()

    def forward(self, X):
        """Forward pass: similarity and index of the row of ``X[1:]``
        most similar to ``X[0]``.

        Parameters
        ----------
        X : 2-D float tensor with at least 2 rows.
        """
        x1 = X[:1, :]  # reference sample, kept 2-D so it broadcasts over x2
        x2 = X[1:, :]  # candidate samples
        # Cosine similarity: larger means more similar (range [-1, 1]).
        cosine_sim = F.cosine_similarity(x1, x2, dim=1)
        index = torch.argmax(cosine_sim, dim=0)
        return cosine_sim[index], index


X = torch.tensor([
    [0.3745, 0.9507, 0.7320],
    [0.5987, 0.1560, 0.1560],
    [0.0581, 0.8662, 0.6011],
    [0.7081, 0.0206, 0.9699],
    [0.8324, 0.2123, 0.1818],
    [0.1834, 0.3042, 0.5248],
    [0.4319, 0.2912, 0.6119]]).float()


if __name__ == "__main__":
    model = DLCorr()
    cosine_sim, index = model(X)
    # X[1:][index] -> tensor([0.0581, 0.8662, 0.6011])

    # Trace export runs the model once to record a static graph, hence a
    # concrete dummy input is required (think [B, C, H, W] for image models).
    h0 = torch.zeros(3, 3)

    # Trace-export tips: avoid for-loops inside the model, do data
    # preparation outside the model, and avoid in-place tricks — keep it
    # simple so the graph converts cleanly to ONNX.
    torch.onnx.export(
        model=model,
        # Same positional args as y_out = model(*args); only the shape
        # matters, real data is not required.
        args=(h0,),
        # Output file path.
        f="model_knn2.onnx",
        # Export the trained parameters (default True).
        export_params=True,
        # Inference mode: dropout / batch-norm behave as in eval().
        training=torch.onnx.TrainingMode.EVAL,
        # Print detailed export information.
        verbose=True,
        # Name the input/output nodes for easier inspection later.
        input_names=["input1"],
        output_names=["output1"],
        # Operator-set version used for export (symbolic_opset11).
        opset_version=11,
        # Only the batch dimension is dynamic; keep the rest static.
        dynamic_axes={
            "input1": {0: "batch"},
            "output1": {0: "batch"},
        },
    )

    # Imported here so the model definition above stays importable even
    # when the onnx runtime packages are not installed.
    import onnx
    import onnxruntime

    model_onnx = onnx.load("model_knn2.onnx")  # load the exported model
    onnx.checker.check_model(model_onnx)       # validate the model file

    # Create an inference session on CPU.
    session = onnxruntime.InferenceSession(
        "model_knn2.onnx", providers=['CPUExecutionProvider'])

    # Inputs must be numpy arrays.
    ort_input = {session.get_inputs()[0].name: np.array(X).astype(np.float32)}
    ort_output = session.run(None, ort_input)

    cosine_sim = ort_output[0]
    index = ort_output[1]
    # cosine_sim, X[1:][index]
    # -> (array(0.9683632, dtype=float32), tensor([0.0581, 0.8662, 0.6011]))
|
|
"""Train a SeqOne classifier on the IBM AML dataset via the tpf toolkit."""
import os
import time

import numpy as np
import torch
from torch import nn
from tpf import pkl_save, pkl_load

BASE_DIR = "/wks/datasets/ibm_aml"
file_pkl = "ibm_aml_1.pkl"
# file_path = os.path.join(BASE_DIR, file_pkl)
file_path = file_pkl

# Load the pre-split train/test arrays from the pickle file.
X_train, X_test, y_train, y_test = pkl_load(file_path=file_path)

# Positive-sample counts and training-set size (notebook echo).
len(y_test[y_test == 1]), len(y_train[y_train == 1]), len(X_train)

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tpf.dl import DataSet11
from tpf.dl import T11
from tpf.mlib.seq import SeqOne

# One output feature per class (binary classification).
model = SeqOne(seq_len=X_test.shape[1], out_features=2)

# Long-running training driven by the tpf T11 helper; checkpoints go to
# model_param_path and progress is appended to log_file every per_epoch epochs.
T11.train(
    model,
    X_train, y_train,
    X_test, y_test,
    epochs=50000,
    batch_size=512,
    learning_rate=1e-4,
    model_param_path="model_params_12.pkl.dict",
    log_file="/tmp/train.log",
    per_epoch=100,
)
|
|
|
|
"""Smoke test: run SeqOne on a random 2-D table and inspect raw outputs."""
import torch
from torch import nn
from tpf.mlib.seq import SeqOne

a = torch.randn(64, 512)  # simulate a 2-D table of 64 samples x 512 features
model = SeqOne(seq_len=a.shape[1], out_features=2)

# First three rows of the raw (untrained) model output.
model(a)[:3]
# tensor([[-0.2379, -0.3309],
#         [-0.6055,  0.4042],
#         [ 0.2394,  0.1917]], grad_fn=<SliceBackward0>)
"""Train SeqOne using the helpers exposed by the local ``T`` module."""
from T import train
from T import X_test
from tpf.mlib.seq import SeqOne

model = SeqOne(seq_len=X_test.shape[1], out_features=2)
# Quick sanity check of the untrained model on a few rows:
# model(torch.tensor(X_test).float()[:3])[:3]
train(model)
"""Binary classification on the breast-cancer dataset with a plain
PyTorch training loop (SGD + cross-entropy)."""
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import torch
from torch import nn

# Load the breast-cancer dataset.
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target, columns=['target'])

# Re-bind X/y as float tensors; y gets an explicit column dimension.
X = torch.tensor(data.data).float()
y = torch.tensor(data.target).reshape(-1, 1).float()

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

"""
Build the datasets.
"""
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch
from torch import nn
from torch.nn import functional as F


class MyDataSet(Dataset):
    def __init__(self, X, y):
        """Wrap feature/label tensors as a map-style dataset.

        Labels are flattened to 1-D because CrossEntropyLoss expects
        class indices of shape (N,).
        """
        self.X = X
        self.y = y.reshape(-1)
        # self.y = y.reshape(-1, 1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]
        # as_tensor avoids the copy-construct UserWarning that
        # torch.tensor(t) emits when t is already a tensor.
        return torch.as_tensor(x).float(), torch.as_tensor(y).long()


train_dataset = MyDataSet(X=X_train, y=y_train)
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True,
                              batch_size=128)

test_dataset = MyDataSet(X=X_test, y=y_test)
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False,
                             batch_size=256)

# Number of training epochs.
epochs = 20
device = "cuda:0" if torch.cuda.is_available() else "cpu"


# Progress-monitoring helper.
def get_acc(dataloader=train_dataloader, model=None):
    """Mean per-batch accuracy of ``model`` over ``dataloader`` (no grad)."""
    accs = []
    model.to(device=device)
    model.eval()
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device=device)
            y = y.to(device=device)
            y_pred = model(X)
            y_pred = y_pred.argmax(dim=1)
            acc = (y_pred == y).float().mean().item()
            accs.append(acc)
    return np.array(accs).mean()


# Training loop.
def train(model, epochs=epochs, train_dataloader=train_dataloader,
          test_dataloader=test_dataloader):
    """Train ``model`` with SGD + CrossEntropyLoss, printing accuracy
    on train and test sets after every epoch."""
    model.to(device=device)
    # Optimizer.
    optimizer = torch.optim.SGD(params=model.parameters(), lr=1e-3)
    # Loss function (expects raw logits + integer class labels).
    loss_fn = nn.CrossEntropyLoss()
    for epoch in range(1, epochs + 1):
        print(f"正在进行第 {epoch} 轮训练:")
        model.train()
        for X, y in train_dataloader:
            X = X.to(device=device)
            y = y.to(device=device)
            # Forward pass.
            y_pred = model(X)
            # Clear stale gradients before backward.
            optimizer.zero_grad()
            # Compute the loss.
            loss = loss_fn(y_pred, y)
            # Backpropagate.
            loss.backward()
            # Take one optimizer step.
            optimizer.step()
        print(f"train_acc: {get_acc(dataloader=train_dataloader,model=model)}, test_acc: {get_acc(dataloader=test_dataloader,model=model)}")
"""Usage examples for tpf's ModelPre prediction helper."""
# --- Minimal example: save and reload SeqOne weights ---
import torch
from tpf.mlib.seq import SeqOne
from tpf.mlib import ModelPre as mp

a = torch.randn(3, 512)  # simulate a 2-D table
model = SeqOne(seq_len=a.shape[1], out_features=2)
file_path = "seqone.dict"
torch.save(model.state_dict(), file_path)
model.load_state_dict(torch.load(file_path, weights_only=True))

# --- Wrapped example ---
# NOTE: the weight file MUST use the .dict suffix, otherwise this API
# refuses to load it.
y_pred = mp.predict_proba(a, model_path=file_path, model=model)
# y_pred
# array([-0.03084982,  0.01873165, -0.01635017], dtype=float32)

# Arguments: the data, the weight-file path, and the model definition.
# batch_size defaults to 100k; 0 means predict on the full set in one go.
y_pred = mp.predict_proba(a, model_path=file_path, model=model,
                          batch_size=1000000)
# y_pred
# array([0.03419267, 0.0257818 , 0.04169804], dtype=float32)
|
"""Extract decision-tree branches whose leaves hold a concentrated class
distribution, visualised with dtreeviz."""
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

# Load the breast-cancer dataset.
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target, columns=['label'])
y = y['label']

from sklearn.tree import DecisionTreeClassifier

# Shallow tree with a leaf-size floor so each leaf is statistically
# meaningful (at least 30 samples).
clf = DecisionTreeClassifier(
    max_depth=3,
    min_samples_leaf=30
)
clf = clf.fit(X, y)

import matplotlib.pyplot as plt
import dtreeviz
import warnings
warnings.filterwarnings("ignore")

viz_model = dtreeviz.model(
    clf,
    X_train=X,
    y_train=y,
    target_name='label',
    feature_names=X.columns,
    class_names={0: 'good', 1: 'bad'},
)
v = viz_model.view(fancy=False)
v.show()
v.save("img.svg")
"""Plot a shallow decision tree on the breast-cancer dataset and save it
as an SVG."""
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

# Load the breast-cancer dataset.
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.DataFrame(data.target, columns=['target'])
Y = y['target']

from sklearn import tree

clf = tree.DecisionTreeClassifier(
    max_depth=3,
    min_samples_leaf=50
)
clf = clf.fit(X, Y)

import matplotlib.pyplot as plt

# Make the figure large / high-dpi so the rendered tree stays legible.
plt.figure(figsize=(8, 8), dpi=1000)
tree.plot_tree(clf)
# BUG FIX: save BEFORE plt.show(). show() closes/clears the current
# figure, so calling savefig afterwards writes a blank file.
plt.savefig('b.svg', format='svg', bbox_inches='tight')
plt.show()
|
|
|
风控策略的自动化生成-利用决策树分分钟生成上千条策略