文章Flyai人工智能社区参赛非官方指南—代码篇
Flyai人工智能社区参赛非官方指南—代码篇春节前写了个简易版的参赛指南,居然得到了11个赞,通过对比,发现这已经算是多的了,所以感谢这11位小伙伴不吝鼠标左键。通过这些天观察,发现有不少新的小伙伴,想着这些小伙伴将是祖国的未来,顿时感觉应该发挥一下余热,让新来的小伙伴少走弯路,尽快上手FLYAI赛事,领略AI魅力,得个好名次,赚点零花钱,实现小目标,迎娶白富美,走上人生巅峰……好了,废话不多说,针对一些小伙伴提出的没有baseline的问题,这次是代码篇,以实例说话,主要介绍main.py和prediction.py这两个文件框架。因为这两个文件最重要,其它的文件最终都是为这两个文件服务的。理论上我们只要完成这两个文件的编写,就可以完成比赛了。下面,我将以“1024程序员节—蝴蝶分类开源竞赛”为例,讲解一下这两个文件的结构和编写方法。因为这个比赛开源了,所以小伙伴们可以免费下载这个比赛的代码。 一、main.py将官方样例下载到本地,解压后,就可以看到这个main.py文件,打开它,便得到下面的代码框架: # -*- coding: utf-8 -*- import argparse import os from flyai.data_helper import DataHelper from flyai.framework import FlyAI from path import MODEL_PATH ''' 此项目为FlyAI2.0新版本框架,数据读取,评估方式与之前不同 2.0框架不再限制数据如何读取 样例代码仅供参考学习,可以自己修改实现逻辑。 模版项目下载支持 PyTorch、Tensorflow、Keras、MXNET、scikit-learn等机器学习框架 第一次使用请看项目中的:FlyAI2.0竞赛框架使用说明.html 使用FlyAI提供的预训练模型可查看:https://www.flyai.com/models 学习资料可查看文档中心:https://doc.flyai.com/ 常见问题:https://doc.flyai.com/question.html 遇到问题不要着急,添加小姐姐微信,扫描项目里面的:FlyAI小助手二维码-小姐姐在线解答您的问题.png ''' if not os.path.exists(MODEL_PATH): os.makedirs(MODEL_PATH) # 项目的超参,不使用可以删除 parser = argparse.ArgumentParser() parser.add_argument("-e", "--EPOCHS", default=10, type=int, help="train epochs") parser.add_argument("-b", "--BATCH", default=32, type=int, help="batch size") args = parser.parse_args() class Main(FlyAI): ''' 项目中必须继承FlyAI类,否则线上运行会报错。 ''' def download_data(self): # 根据数据ID下载训练数据 data_helper = DataHelper() data_helper.download_from_ids("ButterflyClassification") def deal_with_data(self): ''' 处理数据,没有可不写。 :return: ''' pass def train(self): ''' 训练模型,必须实现此方法 :return: ''' pass if __name__ == '__main__': main = Main() main.download_data() main.train() 前面导入几个库,我们不用管它,且看“项目的超参,不使用可以删除”这行注释下面这几行代码,其作用就是接收我们训练设置对话框里的BATCH和EPOCH的参数,如果没有用到,就不用管它,删掉也没事。在代码的下半部分,我们可以看到有一个叫Main的类,这里只有一个框架,我们得把这个类写完整。样例里,Main类有三个功能:download_data,deal_with_data,train。如有需要,我们也可以添加其它功能,其实我们只要完成这三个功能就够了,下面分别阐述。 1、download_data功能函数它的作用就是从服务器上面下载数据,它将在当前目录建立一个data文件夹,下载的训练数据就存在这个文件夹里,如果是本地调试下载,只能下载部分数据,线上训练时才能下载全部数据。图片数据存在data/input/ButterflyClassification/image目录下,data/input/ButterflyClassification/目录下有个train.csv文件,这个文件很重要,是图片数据的标签文件,里面是对应图片的分类信息,我们打开这个文件看一下:第一栏,image_path栏下是图片路径信息,需要注意的是路径不是完整的,我们在代码中读取后,还需要补全路径。第二栏,label栏是图片标签信息。这就是download_data实现的功能,我们不用管它,只需执行就可以了,官方已经把它实现好了。 2、deal_with_data功能函数该功能主要实现数据的读取和处理,得根据自身的需要来编写。先说读取,读取一般分两个步骤,先用pandas库读取train.csv文件内容,当然你可以用其它库。使用前别忘import pandas as pd, 代码如下: df = pd.read_csv(os.path.join(DATA_PATH, 'ButterflyClassification', 'train.csv')) image_path_list = df['image_path'].values label_list = df['label'].values image_path_list是路径信息列表,如[“image/5697.jpg”, “image/17805.jpg”,……],label_list是标签列表,如[“Danaus_chrysippus”, “Losaria_coon”,……],这两个列表长度一样,图片路径和标签是一一对应关系。如果要改动其中一个列表,那么另一个列表也要对应改动,才能保持一致。得到train.csv的信息后,我们进入第二个步骤,读取图片和建立标签,这里可以用cv2或者PIL读取图片,用对应的数字建立标签,这个比较简单。再说处理,图片数据处理略为复杂,不过主要有放大、缩小、剪裁、归一化、训练集和验证集划分等,一般来说,还要进行一些数据增强处理,比如亮度、对比度变化,随机剪裁、随机擦除、平移、翻转等,keras中有ImageDataGenerator,pytorch中有transforms,这里不过多述叙。总之数据的处理很重要,可以让模型训练得更加“鲁棒”,提高分数。数据处理完成后,就可以构建自己的模型,在图片分类中,我们一般都采用预训练模型,这样不用从头训练,也不需要庞大的数据训练,很快就能得一个不错的结果。我们可以在Main类之外,编写模型代码,也单独可以建立一个模型文件,如net.py,然后调用即可。 3、train功能函数顾名思义,该函数主要是完成模型的训练。在这里,我们可以定义模型的优化器、学习率、损失函数、训练次数等,最后将训练好的模型保存到MODEL_PATH路径下,MODEL_PATH的定义可以查看path.py文件。 下面以keras为例实现一个简易版的main.py: # -*- coding: utf-8 -*- import argparse import os import cv2 from flyai.data_helper import DataHelper from flyai.framework import FlyAI from path import MODEL_PATH from net import mymodel from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau from tensorflow.compat.v1 import ConfigProto from tensorflow.compat.v1 import InteractiveSession # tensorflow2.1 keras2.3.1需要加入下面三行,否则在服务器上运行会报错 config = ConfigProto() config.gpu_options.allow_growth = True session = InteractiveSession(config=config) ''' 此项目为FlyAI2.0新版本框架,数据读取,评估方式与之前不同 2.0框架不再限制数据如何读取 样例代码仅供参考学习,可以自己修改实现逻辑。 模版项目下载支持 PyTorch、Tensorflow、Keras、MXNET、scikit-learn等机器学习框架 第一次使用请看项目中的:FlyAI2.0竞赛框架使用说明.html 使用FlyAI提供的预训练模型可查看:https://www.flyai.com/models 学习资料可查看文档中心:https://doc.flyai.com/ 常见问题:https://doc.flyai.com/question.html 遇到问题不要着急,添加小姐姐微信,扫描项目里面的:FlyAI小助手二维码-小姐姐在线解答您的问题.png ''' if not os.path.exists(MODEL_PATH): os.makedirs(MODEL_PATH) # 项目的超参,不使用可以删除 parser = argparse.ArgumentParser() parser.add_argument("-e", "--EPOCHS", default=10, type=int, help="train epochs") parser.add_argument("-b", "--BATCH", default=32, type=int, help="batch size") args = parser.parse_args() class_dict = {'Danaus_chrysippus': 0, 'Losaria_coon': 1,......} # 这里略写了,需要补充 class Main(FlyAI): ''' 项目中必须继承FlyAI类,否则线上运行会报错。 ''' def download_data(self): # 根据数据ID下载训练数据 data_helper = DataHelper() data_helper.download_from_ids("ButterflyClassification") def deal_with_data(self): ''' 处理数据,没有可不写。 :return: ''' df = pd.read_csv(os.path.join(DATA_PATH, 'ButterflyClassification', 'train.csv')) image_path_list = df['image_path'].values label_list = df['label'].values x_data = [] y_data = [] for image_path, label in zip(image_path_list, label_list): image_path = os.path.join(DATA_PATH, 'ButterflyClassification', image_path) image = cv2.imread(image_path) image = cv2.resize(image, (224, 224)) x_data.append(image) y_data.append(class_dict[label]) self.x_data = np.array(x_data) / 255. self.y_data = np.array(y_data) def train(self): ''' 训练模型,必须实现此方法 :return: ''' model = mymodel() # 这个model需要自己编写 model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['acc']) model_path = os.path.join(MODEL_PATH, 'model.h5') mp = ModelCheckpoint(filepath=model_path, save_best_only=True, save_weights_only=False, mode='min', monitor='val_loss', verbose=1) reduce_lr = ReduceLROnPlateau(monitor='val_loss', mode='auto', factor=0.1, patience=3, verbose=1) el = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min') cllist = [mp, reduce_lr, el] batch_size = 16 his = finalmodel.fit(self.x_data, self.y_data, batch_size=batch_size, verbose=2, epochs=20, validation_split=0.1, callbacks=cllist, ) if __name__ == '__main__': main = Main() main.download_data() main.train() 可以看出,在main.py文件里,我们主要完成deal_with_data和train功能函数的编写,还有模型的建立,有数据有模型就要以进行训练了。 二、prediction.py在评论里看到,有的小伙伴不懂得prediction.py的编写,这个可是个收获果实的文件,训练半天,保存了模型,就需要这个文件来调用,在测试集上进行验证,得到模型分数。好吧,我们先来看文件结构: # -*- coding: utf-8 -* from flyai.framework import FlyAI class Prediction(FlyAI): def load_model(self): ''' 模型初始化,必须在此方法中加载模型 ''' pass def predict(self, image_path): ''' 模型预测返回结果 :param input: 评估传入样例 {"image_path": "./data/input/image/172691.jpg"} :return: 模型预测成功中,返回预测结果格式 {"label": "Losaria_coon"} ''' # 假设图片image_path对应的标签为Losaria_coon, 则返回格式为 {"label": "Losaria_coon"} return {"label": "Losaria_coon"} 该文件中只有一个类Prediction,该类有两个功能,一个是load_model,另一个是predict。 1、load_model该功能主要实现模型载入,就是将前面训练好,保存在MODEL_PATH路径下面的模型文件载入。以keras为例,如果前面保存在MODEL_PATH路径下面的模型名称为model.h5,那么在load_model功能中我们可以这样写: model_path = os.path.join(MODEL_PATH, 'model.h5') self.model = load_model(model_path) 当然,别忘了在文件头添加相应的import。 import os from path import MODEL_PATH from keras.models import load_model 这样就实现了模型的载入,至于keras和pytorch模型载入方法,可以查找一下相关资料,挺简单的! 2、predict很明显,这里就是实现模型的预测功能,它有一个参数image_path,这个是需要预测的图片路径,直接读取就行, 如: image = cv2.imread(image_path) 再用main.py文件中的方法调整图片的大小和归一化。 下面简要以keras为例,写一个简易版的prediction.py文件: # -*- coding: utf-8 -* import os import numpy as np import cv2 from flyai.framework import FlyAI from keras.models import load_model from path import MODEL_PATH, DATA_PATH from tensorflow.compat.v1 import ConfigProto from tensorflow.compat.v1 import InteractiveSession # tensorflow2.1 keras2.3.1需要加入下面三行,否则在服务器上运行会报错 config = ConfigProto() config.gpu_options.allow_growth = True session = InteractiveSession(config=config) class_num = ['Danaus_chrysippus', 'Losaria_coon',......] class Prediction(FlyAI): def load_model(self): ''' 模型初始化,必须在此方法中加载模型 ''' model_path = os.path.join(MODEL_PATH, 'model.h5') self.model = load_model(model_path) def predict(self, image_path): ''' 模型预测返回结果 :param input: 评估传入样例 {"image_path": "./data/input/image/172691.jpg"} :return: 模型预测成功中,返回预测结果格式 {"label": "Losaria_coon"} # 假设图片image_path对应的标签为Losaria_coon, 则返回格式为 {"label": "Losaria_coon"} ''' image = cv2.imread(image_path) image = cv2.resize(image, (224, 224)) image = np.array([image]) / 255. preds = self.model.predict(image)[0] pred = np.argmax(preds) return {"label": class_num[int(pred)]} 这样一个简易版的就完成了,小伙伴你学废了么? 总之,就是要多写多练,不要怕出错,有什么不明白,你就去撩小助手,小助手可是个单身的、漂亮的妹纸哦! pass:由于时间仓促,如有错漏,概不负责!!!

2

图像分类

天涯·明月·刀·2021-05-10 10:42 3 阅读 101
文章Flyai人工智能社区参赛指南—非官方版
Flyai人工智能社区参赛指南—非官方版        牛年新春马上到来,在这里,先祝小伙伴们新春大吉,工作顺利,合家幸福!再祝小助手,天天美美哒!         不知不觉,在社区混迹有两年余,通过打怪升级,从一个AI新手成功的晋升为AI菜鸟!别人是天才,我是“抄添裁”,代码靠抄,适度增添,合理剪裁!ctrl+C(V)法力无边,千秋万代,一统江湖!为了给新手们提供点方便,让他们尽快上手,于是就想写个参赛指南,尽一个菜鸟应尽的责任和义务(主要还是因为新手看完官方的指南,往往找不着北)。下面,是依据flyai2.0 windows版本,结合自己的经验编写,不足之处,大家多多指正。          一、主页面         在浏览器中,输入www.flyai.com,进入官方网站主页面:         主页面上方有赛事、学习圈、实验室,以及自己的用户名。         当前主页面就是赛事页面,可以看到当前的比赛,你可以参加自己感兴趣的比赛。学习圈发布的主要是小助手不知道从哪里抄来的文章、其他小伙伴的水文和答辩经历等。实验室,是个什么东东,估计和google colab类似的东西吧,我也想进去看看。         参加比赛活动,需要个人注册,很简单,微信一扫基本OK了,然后官方会送1000的FlyAI分值,这些分值比赛会用到,基本是线上训练一分钟,就消耗1个FlyAI分值。曾经就有1000分在我面前,我没珍惜,直到用光,我才后悔莫急……现在主要靠每日签到赚个二三十分,痛并快乐着!         二、项目页面         注册好后,现在我们选取一个比赛来参加吧,比如上图第一个,心率不齐病症检测赛。         页面上,我们可以看到赛事介绍,竞赛排行榜,讨论等内容。我们通过阅读赛事介绍,可以了解比赛的相关内容,竞赛排行榜可以看到小伙伴们的比赛成绩,你还可以在讨论页面发表高论,赚个20FlyAI分。         好吧,我们现在要做的就是报名参赛,点击右上方那个金光灿灿的“报名参赛“按钮,然后,报名参赛就变成了查看样例,再点击”查看样例“,进入代码页面。         左侧边栏,是代码的文件,这里有五个,分别是app.json,main.py,path.py,prediction.py和requirements.txt。        app.json:这个是该项目的配置文件,不用管它。         main.py:这是代码的主要文件,将代码上传到服务器后,服务器就会自动执行该文件的代码,这个也可以说是代码的入口文件,我们将在这个文件中实现程序的主要功能,比如说建立模型、处理数据、训练模型、保存模型等。         path.py:这里面是路径的配置,默认有DATA_PATH和MODEL_PATH,你可以添加自己的路径配置。         prediction.py:main.py执行完毕后,服务器将调用该文件的Prediction类,依次调用这个类的load_model和predict功能,完成模型的验证,根据模型提交的结果,给出得分。         requirements.txt:这里面是该项目用到的包,比如你代码里面用的pytorch版本是1.4,那么,你可以在这个文件里面加入torch==1.4.0,官方服务器一般会把常用的包安装好了,比如说numpy、pandas等,一些不常用的,而你的代码里需要用到的,就可以在这个文件里面添加。         理论上,我们只要按照代码框架,完成main.py和prediction.py这两个文件代码的编写就行了。         右侧栏,就是相应文件的内容,我们可以在电脑上进行更改,完成后,点击下方的提交训练,就可以上传代码到服务器端进行训练。         右侧栏上方最右边,有三个按钮,分别是下载代码、上传代码、查看代码。下载代码,即可将代码下载到本地编辑,完成后,再打包,点击上传代码,即可将线下代码上传到线上;点击查看代码,可以查看以前提交过的代码,方便修改。         三、下载代码         线上编写,有很多不方便,我们可以下载下来线下编写调试。点击下载代码,解压后,可以得到如下文件:         可以看到,会比线上多出几个文件,多出的文件不用管它,我们只看这个flyai.exe,双击运行它,如下图:         在这里,可以点击“下载数据“按钮,下载部分数据到线下,这时会在当前文件下建立一个data文件夹,用来保存下载数据。点击”提交GPU训练“按钮,即可以将当前目录下的相关文件上传到服务器端进行训练。         我们先来看看下载下来的样例数据,样例数据只是全部数据的一部分,主要是用来线下调试代码的。比如,在路径“data/input/ArrhythmiaClassification/“下,有个train.csv文件,打开看看:         其中,data就是模型训练需要用到的数据,label是标签。在图像类的比赛,第一栏一般是图片的路径,不同的比赛,略有不同。         现在,就可以按照main.py和prediction.py里面样例框架,编写自己的代码,值得注意的是,训练好的模型文件要保存在“data/output/model”路径下面。根据需要,也可以另外新建文件夹或者文件。完成代码编写后,可先用线下数据,本地运行一下,看有没有什么BUG,确定无误后,就可以上传代码,到线上训练了。         四、上传代码         上传代码有两种方式,一种是直接点击上面flyai.exe图中的“提交GPU训练”,程序就会自动将当前文件夹下面的文件上传到线上,这里需要注意的是,有些我们自己建立的文件夹可能不会上传,所以,如果有新建的文件夹,建议不要采取这种方式上传代码。         另外一种上传方式是手动打包上传,这里要将app.json、main.py、prediction.py、path.py、requirements.txt这五个文件,加上我们新建的文件和文件夹,一起打包成压缩包,然后点击下图中“下载代码”旁边的上传按钮,就可以把我们的文件上传到线上了,再点击“提交训练”按钮,就可以训练了。        五、线上运行         这时候,我们可以查看程序运行状态,如下图:         由于线上服务器数量有限,可能需要排队,只能耐心等待,如果不想等,可以点击“取消排队”,但是不能重复提交代码训练,一次只能训练一个任务,如果提交两个以上任务,就会出现上图中的“已加入队列!之前任务完成后,将自动开始训练”,不过有时候,网站后台代码有问题,也会出现这种情况,这时候,就需要我们美丽的Flyai小助手出场了,将你遇见的问题,跟她反馈,一般都能很好解决。         训练日志内容会显示requirement.txt里面安装包的安装状况,之后就会显示执行main.py输出显示的内容,比如print。如果程序有错误,会显示错误提示,停止训练。         每训练一次,都会消耗我们的FLYAI分值,如果分值为零,就不能再进行训练了。获取积分的方式有每日签到、每日讨论、写写文章,邀请朋友和充钱等等,也可以坚持签到,像我一样发发文章,Fai值自然而来!         好了,基本上,常用的东西就是这样了,不过,各位一定确定以及肯定还会碰到各种问题,可以问小助手,边打怪边升级,钢铁就是这样练废的!

11

数据分析

天涯·明月·刀·2021-02-22 17:04 6 阅读 561
文章Python数模笔记-Sklearn(1) 介绍
1、SKlearn 是什么   Sklearn(全称 SciKit-Learn),是基于 Python 语言的机器学习工具包。   Sklearn 主要用Python编写,建立在 Numpy、Scipy、Pandas 和 Matplotlib 的基础上,也用 Cython编写了一些核心算法来提高性能。   Sklearn 包括六大功能模块: 分类(Classification):识别样本属于哪个类别,常用算法有 SVM(支持向量机)、nearest neighbors(最近邻)、random forest(随机森林) 回归(Regression):预测与对象相关联的连续值属性,常用算法有 SVR(支持向量机)、 ridge regression(岭回归)、Lasso 聚类(Clustering):对样本进行无监督的自动分类,常用算法有 k-Means(k均值)、spectral clustering(特征聚类)、mean-shift(均值漂移) 数据降维(Dimensionality reduction):减少相关变量维数,常用算法有 PCA(主成分分析)、feature selection(特征选择)、non-negative matrix factorization(非负矩阵分解) 模型选择(Model Selection):比较,验证,选择参数和模型,常用模块有 grid search(网格搜索)、cross validation(交叉验证)、 metrics(度量) 数据处理 (Preprocessing):特征提取和归一化,常用模块有 preprocessing(预处理),feature extraction(特征提取) 这六个功能模块涉及 4类算法,分类、回归 属于监督学习,聚类属于非监督学习。   官网地址:https://scikit-learn.org/   官方文档中文版: https://www.scikitlearn.com.cn/   内置数据集:https://scikit-learn.org/stable/datasets.html 2、SKlearn 的安装   Sklearn 的安装要求:Python 3.5 以上版本,需要安装 NumPy、SciPy、Pandas 工具包的支持,部分内容需要使用 Matplotlib、joblib 工具包。   pip 安装命令:    pip3 install -U scikit-learnpip3 install -U scikit-learn -i https://pypi.douban.com/simple   注意 Sklearn 建议安装 Numpy+mkl,可以在网址http://www.lfd.uci.edu/~gohlke/pythonlibs/ 找到你需要的numpy+mkl版本,下载后 pip3安装: pip install numpy-1.11.1+mkl-cp27-cp27m-win_amd64.whl 3、SKlearn 内置数据集   Sklearn 内置了一些标准数据集可以用于练习和测试,都是经常被引用的经典问题,数据网址:https://scikit-learn.org/stable/datasets.html    Sklearn 标准数据集主要包括: 测试问题数据集波士顿房价:Boston house prices dataset鸢尾花问题:Iris plants dataset糖尿病数据:Diabetes dataset手写数字的识别:Optical recognition of handwritten digits dataset体能训练:Linnerrud dataset葡萄酒鉴别:Wine recognition dataset威斯康星州癌症诊断:reast cancer wisconsin (diagnostic) dataset实际问题数据集人脸数据:The Olivetti faces dataset20个新闻文本数据:The 20 newsgroups text dataset标记的人脸数据:The Labeled Faces in the Wild face recognition dataset森林覆盖类型:Forest covertypes路透社新闻数据:RCV1 dataset网络入侵检测数据:Kddcup 99 dataset加州住房数据:California Housing dataset 4、Sklearn 数模笔记的计划   粗略看看 Sklearn 的文档,是一个功能强大和丰富的机器学习库,远远超出了数学建模学习的范围。  基于数模教学的目的,本系列主要对应数模学习中的分类、聚类、降维问题,并不打算全面讲解 Sklearn 的各种算法,而是以典型问题为例来介绍原理简单、使用广泛的基本方法,以便新手入门。

0

Python3

AI小助手·2021-05-14 10:28 1 阅读 33
文章Python学习笔记-StatsModels 统计回归——可视化
1、如何认识可视化?   图形总是比数据更加醒目、直观。解决统计回归问题,无论在分析问题的过程中,还是在结果的呈现和发表时,都需要可视化工具的帮助和支持。    需要指出的是,虽然不同绘图工具包的功能、效果会有差异,但在常用功能上相差并不是很大。与选择哪种绘图工具包相比,更重要的是针对不同的问题,需要思考选择什么方式、何种图形去展示分析过程和结果。换句话说,可视化只是手段和形式,手段要为目的服务,形式要为内容服务,这个关系一定不能颠倒了。    因此,可视化是伴随着分析问题、解决问题的过程而进行思考、设计和实现的,而且还会影响问题的分析和解决过程: 可视化工具是数据探索的常用手段 回归分析是基于数据的建模,在导入数据后首先要进行数据探索,对给出的或收集的数据有个大概的了解,主要包括数据质量探索和数据特征分析。数据准备中的异常值分析,往往就需要用到箱形图(Boxplot)。对于数据特征的分析,经常使用频率分布图或频率分布直方图(Hist),饼图(Pie)。 分析问题需要可视化工具的帮助 对于问题中变量之间的关系,有些可以通过定性分析来确定或猜想,需要进一步的验证,有些复杂关系难以由分析得到,则要通过对数据进行初步的相关分析来寻找线索。在分析问题、尝试求解的过程中,虽然可以得到各种统计量、特征值,但可视化图形能提供更快捷、直观、丰富的信息,对于发现规律、产生灵感很有帮助。 解题过程需要可视化工具的支持 在解决问题的过程中,也经常会希望尽快获得初步的结果、总体的评价,以便确认解决问题的思路和方法是否正确。这些情况下,我们更关心的往往是绘图的便捷性,图形的表现效果反而是次要的。 可视化是结果发布的重要内容 问题解决之后需要对结果进行呈现或发表,这时则需要结合表达的需要,特别是表达的逻辑框架,设计可视化的方案,选择适当的图形种类和形式,准备图形数据。在此基础上,才谈得上选择何种绘图工具包,如何呈现更好的表现效果。 2、StatsModels 绘图工具包 (Graphics)   Statsmodels 本身支持绘图功能(Graphics),包括拟合图(Fit Plots)、箱线图(Box Plots)、相关图(Correlation Plots)、函数图(Functional Plots)、回归图(Regression Plots)和时间序列图(Time Series Plots)。    Statsmodels 内置绘图功能 Graphics 的使用似乎并不流行,网络上的介绍也不多。分析其原因,一是 Graphics 做的并不太好用,文档和例程不友好,二是学习成本高:能用通用的可视化包实现的功能,何必还要花时间去学习一个专用的 Graphics?    下面是 Statsmodels 官方文档的例程,最简单的单变量线性回归问题,绘制样本数据散点图和拟合直线图。Graphics 提供了将拟合与绘图合二为一的函数 qqline(),但是为了绘制出样本数据则要调用 Matplotlib 的 matplotlib.pyplot.scatter(),所以… import statsmodels.api as sm import matplotlib.pyplot as plt from statsmodels.graphics.gofplots import qqline foodexp = sm.datasets.engel.load(as_pandas=False) x = foodexp.exog y = foodexp.endog ax = plt.subplot(111) plt.scatter(x, y) qqline(ax, "r", x, y) plt.show()   右图看起来有点象 Seaborn中的 relplot,但把官方文档研究了半天也没搞明白,只好直接分析例程和数据,最后的结论是:基本没啥用。   这大概就是更多用户直接选择 Python 的可视化工具包进行绘图的原因吧。最常用的当属 Matplotlib 无疑,而在统计回归分析中 Seaborn 绘图工具包则更好用更炫酷。 3、Matplotlib 绘图工具包   Matplotlib 绘图包就不用介绍了。Matplotlib 用于 Statsmodels 可视化,最大的优势在于Matplotlib 谁都会用,实现统计回归的基本图形的也很简单。如果需要复杂的图形,炫酷的效果,虽然 Matplotlib 原理上也能实现,但往往需要比较繁琐的数据准备,并不常用的函数和参数设置。既然学习成本高,出错概率大,就没必要非 Matplotlib 不可了。   Matplotlib 在统计回归问题中经常用到的是折线图、散点图、箱线图和直方图。这也是 Matplotlib 最常用的绘图形式,本系列文中也有相关例程,本文不再具体介绍相关函数的用法。   例如,在本系列《Python学习笔记-StatsModels 统计回归(2)线性回归》的例程和附图,不仅显示了原始检测数据、理论模型数据、拟合模型数据,而且给出了置信区间的上下限,看起来还是比较“高级”的。但是,如果把置信区间的边界线隐藏起来,图形马上就显得不那么“高级”,比较“平常”了——这就是选择什么方式、何种图形进行展示的区别。   由此所反映的问题,还是表达的逻辑和数据的准备:要表达什么内容,为什么要表达这个内容,有没有相应的数据?问题的关键并不是什么工具包或什么函数,更不是什么颜色什么线性,而是有没有置信区间上下限的数据。   如果需要复杂的图形,炫酷的效果,虽然 Matplotlib 原理上也能实现,但往往需要比较繁琐的数据准备,使用并不常用的函数和参数设置。学习成本高,出错概率大,就没必要非 Matplotlib 不可了。    4、Seaborn 绘图工具包   Seaborn 是在 Matplotlib 上构建的,支持 Scipy 和 Statamodels 的统计模型可视化,可以实现: 赏心悦目的内置主题及颜色主题展示和比较 一维变量、二维变量 各变量的分布情况可视化 线性回归模型中的独立变量和关联变量可视化 矩阵数据,通过聚类算法探究矩阵间的结构可视化 时间序列,展示不确定性复杂的可视化,如在分割区域制图   Seaborn 绘图工具包以数据可视化为中心来挖掘与理解数据,本身就带有一定的统计回归功能,而且简单好用,特别适合进行定性分析、初步评价。   下图给出了几种常用的 Seaborn 图形,分别是带拟合线的直方图(distplot)、箱线图(boxplot)、散点图(scatterplot)和回归图(regplot),后文给出了对应的程序。   实际上,这些图形用 StatsModels Graphics、Matplotlib 也可以绘制,估计任何绘图包都可以实现。那么,为什么还要推荐 Seaborn 工具包,把这些图归入 Seaborn 的实例呢?我们来看看实现的例程就明白了:简单,便捷,舒服。不需要数据准备和变换处理,直接调用变量数据,自带回归功能;不需要复杂的参数设置,直接给出舒服的图形,自带图形风格设计。    fig1, axes = plt.subplots(2, 2, figsize=(10, 8)) # 创建一个 2行 2列的画布 sns.distplot(df['price'], bins=10, ax=axes[0, 0]) # axes[0,1] 左上图 sns.boxplot(df['price'], df['sales'], data=df, ax=axes[0, 1]) # axes[0,1] 右上图 sns.scatterplot(x=df['advertise'], y=df['sales'], ax=axes[1, 0]) # axes[1,0] 左下图 sns.regplot(x=df['difference'], y=df['sales'], ax=axes[1, 1]) # axes[1,1] 右下图 plt.show() 5、多元回归案例分析(Statsmodels) 5.1 问题描述   数据文件中收集了 30个月本公司牙膏销售量、价格、广告费用及同期的市场均价。  (1)分析牙膏销售量与价格、广告投入之间的关系,建立数学模型;  (2)估计所建立数学模型的参数,进行统计分析;  (3)利用拟合模型,预测在不同价格和广告费用下的牙膏销售量。   * 本问题及数据来自:姜启源、谢金星,数学模型(第 3版),高等教育出版社。 5.2 问题分析   本案例在《Python学习笔记-StatsModels 统计回归(3)模型数据的准备》中就曾出现,文中还提到该文的例程并不是最佳的求解方法和结果。这是因为该文例程是直接将所有给出的特征变量(销售价格、市场均价、广告费、价格差)都作为自变量,直接进行线性回归。谢金星老师说,这不科学。科学的方法是先分析这些特征变量对目标变量(销量)的影响,然后选择能影响目标的特征变量,或者对特征变量进行适当变换(如:平方、对数)后,再进行线性回归。以下参考视频教程中的解题思路进行分析。 观察数据分布特征   案例问题的数据量很小,数据完整规范,实际上并不需要进行数据探索和数据清洗,不过可以看一下数据的分布特性。例程和结果如下,我是没看出什么名堂来,与正态分布的差距都不小。 # 数据探索:分布特征 fig1, axes = plt.subplots(2, 2, figsize=(10, 8)) # 创建一个 2行 2列的画布 sns.distplot(dfData['price'], bins=10, ax=axes[0,0]) # axes[0,1] 左上图 sns.distplot(dfData['average'], bins=10, ax=axes[0,1]) # axes[0,1] 右上图 sns.distplot(dfData['advertise'], bins=10, ax=axes[1,0]) # axes[1,0] 左下图 sns.distplot(dfData['difference'], bins=10, ax=axes[1,1]) # axes[1,1] 右下图 plt.show() 观察数据间的相关性  既然将所有特征变量都作为自变量直接进行线性回归不科学,就要先对每个自变量与因变量的关系进行考察。 # 数据探索:相关性 fig2, axes = plt.subplots(2, 2, figsize=(10, 8)) # 创建一个 2行 2列的画布 sns.regplot(x=dfData['price'], y=dfData['sales'], ax=axes[0,0]) sns.regplot(x=dfData['average'], y=dfData['sales'], ax=axes[0,1]) sns.regplot(x=dfData['advertise'], y=dfData['sales'], ax=axes[1,0]) sns.regplot(x=dfData['difference'], y=dfData['sales'], ax=axes[1,1]) plt.show()   单变量线性回归图还是很有价值的。首先上面两图(sales-price,sales-average)的数据点分散,与回归直线差的太远,说明与销量的相关性小——谢金星老师讲课中也是这样分析的。其次下面两图(sales-advertise,sales-difference)的线性度较高,至少比上图好多了,回归直线和置信区间也反映出线性关系。因此,可以将广告费(advertise)、价格差(difference)作为自变量建模进行线性回归。   进一步地,有人观察散点图后认为销量与广告费的关系(sales-advertise)更接近二次曲线,对此也可以通过回归图对 sales 与 advertise 进行高阶多项式回归拟合,结果如下图。 建模与拟合 模型 1:将所有特征变量都作为自变量直接进行线性回归,这就是《(3)模型数据的准备》中的方案。模型 2:选择价格差(difference)、广告费(advertise)作为自变量建模进行线性回归。模型 3:选择价格差(difference)、广告费(advertise)及广告费的平方项作为作为自变量建模进行线性回归。  下段给出了使用不同模型进行线性回归的例程和运行结果。对于这个问题的分析和结果讨论,谢金星老师在视频中讲的很详细,网络上也有不少相关文章。由于本文主要讲可视化,对结果就不做详细讨论了。 6、Python 例程(Statsmodels) 6.1 问题描述   数据文件中收集了 30个月本公司牙膏销售量、价格、广告费用及同期的市场均价。  (1)分析牙膏销售量与价格、广告投入之间的关系,建立数学模型;  (2)估计所建立数学模型的参数,进行统计分析;  (3)利用拟合模型,预测在不同价格和广告费用下的牙膏销售量。 6.2 Python 程序 # LinearRegression_v4.py # v4.0: 分析和结果的可视化 # 日期:2021-05-08 # Copyright 2021 YouCans, XUPT import numpy as np import pandas as pd import statsmodels.api as sm from statsmodels.sandbox.regression.predstd import wls_prediction_std import matplotlib.pyplot as plt import seaborn as sns # 主程序 def main(): # 读取数据文件 readPath = "../data/toothpaste.csv" # 数据文件的地址和文件名 dfOpenFile = pd.read_csv(readPath, header=0, sep=",") # 间隔符为逗号,首行为标题行 # 准备建模数据:分析因变量 Y(sales) 与 自变量 x1~x4 的关系 dfData = dfOpenFile.dropna() # 删除含有缺失值的数据 sns.set_style('dark') # 数据探索:分布特征 fig1, axes = plt.subplots(2, 2, figsize=(10, 8)) # 创建一个 2行 2列的画布 sns.distplot(dfData['price'], bins=10, ax=axes[0,0]) # axes[0,1] 左上图 sns.distplot(dfData['average'], bins=10, ax=axes[0,1]) # axes[0,1] 右上图 sns.distplot(dfData['advertise'], bins=10, ax=axes[1,0]) # axes[1,0] 左下图 sns.distplot(dfData['difference'], bins=10, ax=axes[1,1]) # axes[1,1] 右下图 plt.show() # 数据探索:相关性 fig2, axes = plt.subplots(2, 2, figsize=(10, 8)) # 创建一个 2行 2列的画布 sns.regplot(x=dfData['price'], y=dfData['sales'], ax=axes[0,0]) sns.regplot(x=dfData['average'], y=dfData['sales'], ax=axes[0,1]) sns.regplot(x=dfData['advertise'], y=dfData['sales'], ax=axes[1,0]) sns.regplot(x=dfData['difference'], y=dfData['sales'], ax=axes[1,1]) plt.show() # 数据探索:考察自变量平方项的相关性 fig3, axes = plt.subplots(1, 2, figsize=(10, 4)) # 创建一个 2行 2列的画布 sns.regplot(x=dfData['advertise'], y=dfData['sales'], order=2, ax=axes[0]) # order=2, 按 y=b*x**2 回归 sns.regplot(x=dfData['difference'], y=dfData['sales'], order=2, ax=axes[1]) # YouCans, XUPT plt.show() # 线性回归:分析因变量 Y(sales) 与 自变量 X1(Price diffrence)、X2(Advertise) 的关系 y = dfData['sales'] # 根据因变量列名 list,建立 因变量数据集 x0 = np.ones(dfData.shape[0]) # 截距列 x0=[1,...1] x1 = dfData['difference'] # 价格差,x4 = x1 - x2 x2 = dfData['advertise'] # 广告费 x3 = dfData['price'] # 销售价格 x4 = dfData['average'] # 市场均价 x5 = x2**2 # 广告费的二次元 x6 = x1 * x2 # 考察两个变量的相互作用 # Model 1:Y = b0 + b1*X1 + b2*X2 + e # # 线性回归:分析因变量 Y(sales) 与 自变量 X1(Price diffrence)、X2(Advertise) 的关系 X = np.column_stack((x0,x1,x2)) # [x0,x1,x2] Model1 = sm.OLS(y, X) # 建立 OLS 模型: Y = b0 + b1*X1 + b2*X2 + e result1 = Model1.fit() # 返回模型拟合结果 yFit1 = result1.fittedvalues # 模型拟合的 y 值 prstd, ivLow, ivUp = wls_prediction_std(result1) # 返回标准偏差和置信区间 print(result1.summary()) # 输出回归分析的摘要 print("\nModel1: Y = b0 + b1*X + b2*X2") print('Parameters: ', result1.params) # 输出:拟合模型的系数 # # Model 2:Y = b0 + b1*X1 + b2*X2 + b3*X3 + b4*X4 + e # 线性回归:分析因变量 Y(sales) 与 自变量 X1~X4 的关系 X = np.column_stack((x0,x1,x2,x3,x4)) #[x0,x1,x2,...,x4] Model2 = sm.OLS(y, X) # 建立 OLS 模型: Y = b0 + b1*X1 + b2*X2 + b3*X3 + e result2 = Model2.fit() # 返回模型拟合结果 yFit2 = result2.fittedvalues # 模型拟合的 y 值 prstd, ivLow, ivUp = wls_prediction_std(result2) # 返回标准偏差和置信区间 print(result2.summary()) # 输出回归分析的摘要 print("\nModel2: Y = b0 + b1*X + ... + b4*X4") print('Parameters: ', result2.params) # 输出:拟合模型的系数 # # Model 3:Y = b0 + b1*X1 + b2*X2 + b3*X2**2 + e # # 线性回归:分析因变量 Y(sales) 与 自变量 X1、X2 及 X2平方(X5)的关系 X = np.column_stack((x0,x1,x2,x5)) # [x0,x1,x2,x2**2] Model3 = sm.OLS(y, X) # 建立 OLS 模型: Y = b0 + b1*X1 + b2*X2 + b3*X2**2 + e result3 = Model3.fit() # 返回模型拟合结果 yFit3 = result3.fittedvalues # 模型拟合的 y 值 prstd, ivLow, ivUp = wls_prediction_std(result3) # 返回标准偏差和置信区间 print(result3.summary()) # 输出回归分析的摘要 print("\nModel3: Y = b0 + b1*X1 + b2*X2 + b3*X2**2") print('Parameters: ', result3.params) # 输出:拟合模型的系数 # 拟合结果绘图 fig, ax = plt.subplots(figsize=(8,6)) # YouCans, XUPT ax.plot(range(len(y)), y, 'b-.', label='Sample') # 样本数据 ax.plot(range(len(y)), yFit3, 'r-', label='Fitting') # 拟合数据 # ax.plot(range(len(y)), yFit2, 'm--', label='fitting') # 拟合数据 ax.plot(range(len(y)), ivUp, '--',color='pink',label="ConfR") # 95% 置信区间 上限 ax.plot(range(len(y)), ivLow, '--',color='pink') # 95% 置信区间 下限 ax.legend(loc='best') # 显示图例 plt.title('Regression analysis with sales of toothpaste') plt.xlabel('period') plt.ylabel('sales') plt.show() return if __name__ == '__main__': main() 6.3 程序运行结果: OLS Regression Results ============================================================================== Dep. Variable: sales R-squared: 0.886 Model: OLS Adj. R-squared: 0.878 Method: Least Squares F-statistic: 105.0 Date: Sat, 08 May 2021 Prob (F-statistic): 1.84e-13 Time: 22:18:04 Log-Likelihood: 2.0347 No. Observations: 30 AIC: 1.931 Df Residuals: 27 BIC: 6.134 Df Model: 2 Covariance Type: nonrobust ============================================================================== coef std err t P>|t| [0.025 0.975] ------------------------------------------------------------------------------ const 4.4075 0.722 6.102 0.000 2.925 5.890 x1 1.5883 0.299 5.304 0.000 0.974 2.203 x2 0.5635 0.119 4.733 0.000 0.319 0.808 ============================================================================== Omnibus: 1.445 Durbin-Watson: 1.627 Prob(Omnibus): 0.486 Jarque-Bera (JB): 0.487 Skew: 0.195 Prob(JB): 0.784 Kurtosis: 3.486 Cond. No. 115. ============================================================================== Model1: Y = b0 + b1*X + b2*X2 Parameters: const 4.407493 x1 1.588286 x2 0.563482 OLS Regression Results ============================================================================== Dep. Variable: sales R-squared: 0.895 Model: OLS Adj. R-squared: 0.883 Method: Least Squares F-statistic: 74.20 Date: Sat, 08 May 2021 Prob (F-statistic): 7.12e-13 Time: 22:18:04 Log-Likelihood: 3.3225 No. Observations: 30 AIC: 1.355 Df Residuals: 26 BIC: 6.960 Df Model: 3 Covariance Type: nonrobust ============================================================================== coef std err t P>|t| [0.025 0.975] ------------------------------------------------------------------------------ const 8.0368 2.480 3.241 0.003 2.940 13.134 x1 1.3832 0.288 4.798 0.000 0.791 1.976 x2 0.4927 0.125 3.938 0.001 0.236 0.750 x3 -1.1184 0.398 -2.811 0.009 -1.936 -0.300 x4 0.2648 0.199 1.332 0.195 -0.144 0.674 ============================================================================== Omnibus: 0.141 Durbin-Watson: 1.762 Prob(Omnibus): 0.932 Jarque-Bera (JB): 0.030 Skew: 0.052 Prob(JB): 0.985 Kurtosis: 2.885 Cond. No. 2.68e+16 ============================================================================== Model2: Y = b0 + b1*X + ... + b4*X4 Parameters: const 8.036813 x1 1.383207 x2 0.492728 x3 -1.118418 x4 0.264789 OLS Regression Results ============================================================================== Dep. Variable: sales R-squared: 0.905 Model: OLS Adj. R-squared: 0.894 Method: Least Squares F-statistic: 82.94 Date: Sat, 08 May 2021 Prob (F-statistic): 1.94e-13 Time: 22:18:04 Log-Likelihood: 4.8260 No. Observations: 30 AIC: -1.652 Df Residuals: 26 BIC: 3.953 Df Model: 3 Covariance Type: nonrobust ============================================================================== coef std err t P>|t| [0.025 0.975] ------------------------------------------------------------------------------ const 17.3244 5.641 3.071 0.005 5.728 28.921 x1 1.3070 0.304 4.305 0.000 0.683 1.931 x2 -3.6956 1.850 -1.997 0.056 -7.499 0.108 x3 0.3486 0.151 2.306 0.029 0.038 0.659 ============================================================================== Omnibus: 0.631 Durbin-Watson: 1.619 Prob(Omnibus): 0.729 Jarque-Bera (JB): 0.716 Skew: 0.203 Prob(JB): 0.699 Kurtosis: 2.362 Cond. No. 6.33e+03 ============================================================================== Model3: Y = b0 + b1*X1 + b2*X2 + b3*X2**2 Parameters: const 17.324369 x1 1.306989 x2 -3.695587 x3 0.348612

0

Python3

AI小助手·2021-05-12 11:11 0 阅读 39
文章Python学习笔记-StatsModels 统计回归——模型数据的准备
1、读取数据文件  回归分析问题所用的数据都是保存在数据文件中的,首先就要从数据文件读取数据。   数据文件的格式很多,最常用的是 .csv,.xls 和 .txt 文件,以及 sql 数据库文件的读取 。   使用 pandas 从数据文件导入数据的程序最为简单,示例如下:   (1)读取 .csv 文件: df = pd.read_csv("./example.csv", engine="python", encoding="utf_8_sig") # engine="python"允许处理中文路径,encoding="utf_8_sig"允许读取中文数据   (2)读取 .xls 文件: df = pd.read_excel("./example.xls", sheetname='Sheet1', header=0, encoding="utf_8_sig") # sheetname 表示读取的sheet,header=0 表示首行为标题行, encoding 表示编码方式   (3)读取 .txt 文件: df = pd.read_table("./example.txt", sep="\t", header=None) # sep 表示分隔符,header=None表示无标题行,第一行是数据 2、数据文件的拆分与合并  统计回归所需处理的数据量可能非常大,必要时需对文件进行拆分或合并,也可以用 pandas 进行处理,示例如下: (1)将 Excel 文件分割为多个文件 # 将 Excel 文件分割为多个文件 import pandas as pd dfData = pd.read_excel('./example.xls', sheetname='Sheet1') nRow, nCol = dfData.shape # 获取数据的行列 # 假设数据共有198,000行,分割为 20个文件,每个文件 10,000行 for i in range(0, int(nRow/10000)+1): saveData = dfData.iloc[i*10000+1:(i+1)*10000+1, :] # 每隔 10,000 fileName= './example_{}.xls'.format(str(i)) saveData.to_excel(fileName, sheet_name = 'Sheet1', index = False) (2)将 多个 Excel 文件合并为一个文件 # 将多个 Excel 文件合并为一个文件 import pandas as pd ## 两个 Excel 文件合并 #data1 = pd.read_excel('./example0.xls', sheetname='Sheet1') #data2 = pd.read_excel('./example1.xls', sheetname='Sheet1') #data = pd.concat([data1, data2]) # 多个 Excel 文件合并 dfData = pd.read_excel('./example0.xls', sheetname='Sheet1') for i in range(1, 20): fileName = './example_{}.xls'.format(str(i)) dfNew = pd.read_excel(fileName) dfData = pd.concat([dfData, dfNew]) dfData.to_excel('./example', index = False) 3、数据的预处理  在实际工作中,在开始建立模型和拟合分析之前,还要对原始数据进行数据预处理(data preprocessing),主要包括:缺失值处理、重复数据处理、异常值处理、变量格式转换、训练集划分、数据的规范化、归一化等。   数据预处理的很多内容已经超出了 Statsmodels 的范围,在此只介绍最基本的方法:   (1)缺失数据的处理   导入的数据存在缺失是经常发生的,最简单的处理方式是删除缺失的数据行。使用 pandas 中的 .dropna() 删除含有缺失值的行或列,也可以 对特定的列进行缺失值删除处理 。 dfNew = dfData.dropna(axis = 0)) # 删除含有缺失值的行   有时也会填充缺失值或替换缺失值,在此就不做介绍了。    (2)重复数据的处理    对于重复数据,通常会删除重复行。使用 pandas 中的 .duplicated() 可以查询重复数据的内容,使用 .drop_duplicated() 可以删除重复数据,也可以对指定的数据列进行去重。    dfNew = dfData.drop_duplicates(inplace=True) # 删除重复的数据行   (3)异常值处理   数据中可能包括异常值, 是指一个样本中的数值明显偏离样本集中其它样本的观测值,也称为离群点。异常值可以通过箱线图、正态分布图进行识别,也可以通过回归、聚类建模进行识别。    箱线图技术是利用数据的分位数识别其中的异常点。箱形图分析也超过本文的内容,不能详细介绍了。只能笼统地说通过观察箱形图,可以查看整体的异常情况,进而发现异常值。 dfData.boxplot() # 绘制箱形图   对于异常值通常不易直接删除,需要结合具体情况进行考虑和处理。使用 pandas 中的 .drop() 可以直接删除异常值数据行,或者使用判断条件来判定并删除异常值数据行。 # 按行删除,drop() 默认 axis=0 按行删除 dfNew = dfData.drop(labels=0) # 按照行号 labels,删除 行号为 0 的行 dfNew = dfData.drop(index=dfData[dfData['A']==-1].index[0]) # 按照条件检索,删除 dfData['A']=-1 的行 4、Python 例程(Statsmodels)4.1 问题描述  数据文件中收集了 30个月本公司牙膏销售量、价格、广告费用及同期的市场均价。  (1)分析牙膏销售量与价格、广告投入之间的关系,建立数学模型;  (2)估计所建立数学模型的参数,进行统计分析;  (3)利用拟合模型,预测在不同价格和广告费用下的牙膏销售量。    本问题及数据来自:姜启源、谢金星,数学模型(第 3版),高等教育出版社。  需要说明的是,本文例程并不是问题最佳的求解方法和结果,只是使用该问题及数据示范读取数据文件和数据处理的方法。 4.2 Python 程序 # LinearRegression_v3.py # v1.0: 调用 statsmodels 实现一元线性回归 # v2.0: 调用 statsmodels 实现多元线性回归 # v3.0: 从文件读取数据样本 # 日期:2021-05-06 # Copyright 2021 YouCans, XUPT import numpy as np import pandas as pd import statsmodels.api as sm import matplotlib.pyplot as plt # 主程序 def main(): # 读取数据文件 readPath = "../data/toothpaste.csv" # 数据文件的地址和文件名 try: if (readPath[-4:] == ".csv"): dfOpenFile = pd.read_csv(readPath, header=0, sep=",") # 间隔符为逗号,首行为标题行 # dfOpenFile = pd.read_csv(filePath, header=None, sep=",") # sep: 间隔符,无标题行 elif (readPath[-4:] == ".xls") or (readPath[-5:] == ".xlsx"): # sheet_name 默认为 0 dfOpenFile = pd.read_excel(readPath, header=0) # 首行为标题行 # dfOpenFile = pd.read_excel(filePath, header=None) # 无标题行 elif (readPath[-4:] == ".dat"): # sep: 间隔符,header:首行是否为标题行 dfOpenFile = pd.read_table(readPath, sep=" ", header=0) # 间隔符为空格,首行为标题行 # dfOpenFile = pd.read_table(filePath,sep=",",header=None) # 间隔符为逗号,无标题行 else: print("不支持的文件格式。") print(dfOpenFile.head()) except Exception as e: print("读取数据文件失败:{}".format(str(e))) return # 数据预处理 dfData = dfOpenFile.dropna() # 删除含有缺失值的数据 print(dfData.dtypes) # 查看 df 各列的数据类型 print(dfData.shape) # 查看 df 的行数和列数 # colNameList = dfData.columns.tolist() # 将 df 的列名转换为列表 list # print(colNameList) # 查看列名列表 list # featureCols = ['price', 'average', 'advertise', 'difference'] # 筛选列,建立自变量列名 list # X = dfData[['price', 'average', 'advertise', 'difference']] # 根据自变量列名 list,建立 自变量数据集 # 准备建模数据:分析因变量 Y(sales) 与 自变量 x1~x4 的关系 y = dfData.sales # 根据因变量列名 list,建立 因变量数据集 x0 = np.ones(dfData.shape[0]) # 截距列 x0=[1,...1] x1 = dfData.price # 销售价格 x2 = dfData.average # 市场均价 x3 = dfData.advertise # 广告费 x4 = dfData.difference # 价格差,x4 = x1 - x2 X = np.column_stack((x0,x1,x2,x3,x4)) #[x0,x1,x2,...,x4] # 建立模型与参数估计 # Model 1:Y = b0 + b1*X1 + b2*X2 + b3*X3 + b4*X4 + e model = sm.OLS(y, X) # 建立 OLS 模型 results = model.fit() # 返回模型拟合结果 yFit = results.fittedvalues # 模型拟合的 y 值 print(results.summary()) # 输出回归分析的摘要 print("\nOLS model: Y = b0 + b1*X + ... + bm*Xm") print('Parameters: ', results.params) # 输出:拟合模型的系数 # 拟合结果绘图 fig, ax = plt.subplots(figsize=(10, 8)) ax.plot(range(len(y)), y, 'bo', label='sample') ax.plot(range(len(yFit)), yFit, 'r--', label='predict') ax.legend(loc='best') # 显示图例 plt.show() # YouCans, XUPT return if __name__ == '__main__': main() 4.3 程序运行结果:

0

Python3

AI小助手·2021-05-08 10:57 0 阅读 43
文章Python学习笔记-StatsModels 统计回归——线性回归
1、背景知识1.1 插值、拟合、回归和预测   插值、拟合、回归和预测,都是数学建模中经常提到的概念,而且经常会被混为一谈。 插值,是在离散数据的基础上补插连续函数,使得这条连续曲线通过全部给定的离散数据点。 插值是离散函数逼近的重要方法,利用它可通过函数在有限个点处的取值状况,估算出函数在其他点处的近似值。拟合,是用一个连续函数(曲线)靠近给定的离散数据,使其与给定的数据相吻合。   因此,插值和拟合都是根据已知数据点求变化规律和特征相似的近似曲线的过程,但是插值要求近似曲线完全经过给定的数据点,而拟合只要求近似曲线在整体上尽可能接近数据点,并反映数据的变化规律和发展趋势。插值可以看作是一种特殊的拟合,是要求误差函数为 0的拟合。由于数据点通常都带有误差,误差为 0 往往意味着过拟合,过拟合模型对于训练集以外的数据的泛化能力是较差的。因此在实践中,插值多用于图像处理,拟合多用于实验数据处理。 回归,是研究一组随机变量与另一组随机变量之间关系的统计分析方法,包括建立数学模型并估计模型参数,并检验数学模型的可信度,也包括利用建立的模型和估计的模型参数进行预测或控制。 预测是非常广泛的概念,在数模中是指对获得的数据、信息进行定量研究,据此建立与预测目的相适应的数学模型,然后对未来的发展变化进行定量地预测。通常认为,插值和拟合都是预测类的方法。   回归是一种数据分析方法,拟合是一种具体的数据处理方法。拟合侧重于曲线参数寻优,使曲线与数据相符;而回归侧重于研究两个或多个变量之间的关系。 1.2 线性回归   回归分析(Regression analysis)是一种统计分析方法,研究是自变量和因变量之间的定量关系,经常用于预测分析、时间序列模型以及发现变量之间的因果关系。按照变量之间的关系类型,回归分析可以分为线性回归和非线性回归。   线性回归(Linear regression) 假设给定数据集中的目标(y)与特征(X)存在线性关系,即满足一个多元一次方程 。 回归分析中,只包括一个自变量和一个因变量,且二者的关系可用一条直线近似表示,称为一元线性回归;如果包括两个或多个的自变量,且因变量和自变量之间是线性关系,则称为多元线性回归。    根据样本数据,采用最小二乘法可以得到线性回归模型参数的估计量,并使根据估计参数计算的模型数据与给定的样本数据之间误差的平方和为最小。   进一步地,还需要分析对于样本数据究竟能不能采用线性回归方法,或者说线性相关的假设是否合理、线性模型是否具有良好的稳定性?这就需要使用统计分析进行显著性检验,检验因变量与自变量之间的线性关系是否显著,用线性模型来描述它们之间的关系是否恰当。 2、Statsmodels 进行线性回归本节结合 Statsmodels 统计分析包 的使用介绍线性拟合和回归分析。线性模型可以表达为如下公式: 2.1 导入工具包 import statsmodels.api as sm from statsmodels.sandbox.regression.predstd import wls_prediction_std 2.2 导入样本数据   样本数据通常保存在数据文件中,因此要读取数据文件获得样本数据。为便于阅读和测试程序,本文使用随机数生成样本数据。读取数据文件导入数据的方法,将在后文介绍。 # 生成样本数据: nSample = 100 x1 = np.linspace(0, 10, nSample) # 起点为 0,终点为 10,均分为 nSample个点 e = np.random.normal(size=len(x1)) # 正态分布随机数 yTrue = 2.36 + 1.58 * x1 # y = b0 + b1*x1 yTest = yTrue + e # 产生模型数据   本案例是一元线性回归问题,(yTest,x)是导入的样本数据,我们需要通过线性回归获得因变量 y 与自变量 x 之间的定量关系。yTrue 是理想模型的数值,yTest 模拟实验检测的数据,在理想模型上加入了正态分布的随机误差。 2.3 建模与拟合   一元线性回归模型方程为:  y = β0 + β1 * x + e  先通过 sm.add_constant() 向矩阵 X 添加截距列后,再用 sm.OLS() 建立普通最小二乘模型,最后用 model.fit() 就能实现线性回归模型的拟合,并返回拟合与统计分析的结果摘要。 X = sm.add_constant(x1) # 向 x1 左侧添加截距列 x0=[1,...1] model = sm.OLS(yTest, X) # 建立最小二乘模型(OLS) results = model.fit() # 返回模型拟合结果   statsmodels.OLS 是 statsmodels.regression.linear_model 的函数,有 4个参数 (endog, exog, missing, hasconst)。   第一个参数 endog 是回归模型中的因变量 y(t), 是1-d array 数据类型。   第二个输入 exog 是自变量 x0(t),x1(t),…,xm(t),是(m+1)-d array 数据类型。  需要注意的是,statsmodels.OLS 的回归模型没有常数项,其形式为:  y = BX + e = β0x0 + β1*x1 + e, x0 = [1,…1]  而之前导入的数据 (yTest,x1) 并不包含 x0,因此需要在 x1 左侧增加一列截距列 x0=[1,…1],将自变量矩阵转换为 X = (x0, x1)。函数 sm.add_constant() 实现的就是这个功能。  参数 missing 用于数据检查, hasconst 用于检查常量,一般情况不需要。   2.4 拟合和统计结果的输出   Statsmodels 进行线性回归分析的输出结果非常丰富,results.summary() 返回了回归分析的摘要。 print(results.summary()) # 输出回归分析的摘要   摘要所返回的内容非常丰富,这里先讨论最重要的一些结果,在 summary 的中间段落。 coef:回归系数(Regression coefficient),即模型参数 β0、β1、…的估计值。 std err :标准差( Standard deviation),也称标准偏差,是方差的算术平方根,反映样本数据值与回归模型估计值之间的平均差异程度 。标准差越大,回归系数越不可靠。 t:t 统计量(t-Statistic),等于回归系数除以标准差,用于对每个回归系数分别进行检验,检验每个自变量对因变量的影响是否显著。如果某个自变量 xi的影响不显著,意味着可以从模型中剔除这个自变量。 P>|t|:t检验的 P值(Prob(t-Statistic)),反映每个自变量 xi 与因变量 y 的相关性假设的显著性。如果 p<0.05,可以理解为在0.05的显著性水平下变量xi与y存在回归关系,具有显著性。 [0.025,0.975]:回归系数的置信区间(Confidence interval)的下限、上限,某个回归系数的置信区间以 95%的置信度包含该回归系数 。注意并不是指样本数据落在这一区间的概率为 95%。 此外,还有一些重要的指标需要关注: R-squared:R方判定系数(Coefficient of determination),表示所有自变量对因变量的联合的影响程度,用于度量回归方程拟合度的好坏,越接近于 1说明拟合程度越好。 F-statistic:F 统计量(F-Statistic),用于对整体回归方程进行显著性检验,检验所有自变量在整体上对因变量的影响是否显著。   Statsmodels 也可以通过属性获取所需的回归分析的数据,例如: print("OLS model: Y = b0 + b1 * x") # b0: 回归直线的截距,b1: 回归直线的斜率 print('Parameters: ', results.params) # 输出:拟合模型的系数 yFit = results.fittedvalues # 拟合模型计算出的 y值 ax.plot(x1, yTest, 'o', label="data") # 原始数据 ax.plot(x1, yFit, 'r-', label="OLS") # 拟合数据 3、一元线性回归3.1 一元线性回归 Python 程序: # LinearRegression_v1.py # Linear Regression with statsmodels (OLS: Ordinary Least Squares) # v1.0: 调用 statsmodels 实现一元线性回归 # 日期:2021-05-04 import numpy as np import matplotlib.pyplot as plt import statsmodels.api as sm from statsmodels.sandbox.regression.predstd import wls_prediction_std # 主程序 def main(): # 主程序 # 生成测试数据: nSample = 100 x1 = np.linspace(0, 10, nSample) # 起点为 0,终点为 10,均分为 nSample个点 e = np.random.normal(size=len(x1)) # 正态分布随机数 yTrue = 2.36 + 1.58 * x1 # y = b0 + b1*x1 yTest = yTrue + e # 产生模型数据 # 一元线性回归:最小二乘法(OLS) X = sm.add_constant(x1) # 向矩阵 X 添加截距列(x0=[1,...1]) model = sm.OLS(yTest, X) # 建立最小二乘模型(OLS) results = model.fit() # 返回模型拟合结果 yFit = results.fittedvalues # 模型拟合的 y值 prstd, ivLow, ivUp = wls_prediction_std(results) # 返回标准偏差和置信区间 # OLS model: Y = b0 + b1*X + e print(results.summary()) # 输出回归分析的摘要 print("\nOLS model: Y = b0 + b1 * x") # b0: 回归直线的截距,b1: 回归直线的斜率 print('Parameters: ', results.params) # 输出:拟合模型的系数 # 绘图:原始数据点,拟合曲线,置信区间 fig, ax = plt.subplots(figsize=(10, 8)) ax.plot(x1, yTest, 'o', label="data") # 原始数据 ax.plot(x1, yFit, 'r-', label="OLS") # 拟合数据 ax.plot(x1, ivUp, '--',color='orange',label="upConf") # 95% 置信区间 上限 ax.plot(x1, ivLow, '--',color='orange',label="lowConf") # 95% 置信区间 下限 ax.legend(loc='best') # 显示图例 plt.title('OLS linear regression ') plt.show() return if __name__ == '__main__': #YouCans, XUPT main() 3.2 一元线性回归 程序运行结果: OLS Regression Results ============================================================================== Dep. Variable: y R-squared: 0.961 Model: OLS Adj. R-squared: 0.961 Method: Least Squares F-statistic: 2431. Date: Wed, 05 May 2021 Prob (F-statistic): 5.50e-71 Time: 16:24:22 Log-Likelihood: -134.62 No. Observations: 100 AIC: 273.2 Df Residuals: 98 BIC: 278.5 Df Model: 1 Covariance Type: nonrobust ============================================================================== coef std err t P>|t| [0.025 0.975] ------------------------------------------------------------------------------ const 2.4669 0.186 13.230 0.000 2.097 2.837 x1 1.5883 0.032 49.304 0.000 1.524 1.652 ============================================================================== Omnibus: 0.070 Durbin-Watson: 2.016 Prob(Omnibus): 0.966 Jarque-Bera (JB): 0.187 Skew: 0.056 Prob(JB): 0.911 Kurtosis: 2.820 Cond. No. 11.7 ============================================================================== OLS model: Y = b0 + b1 * x Parameters: [2.46688389 1.58832741] 4、多元线性回归4.1 多元线性回归 Python 程序: # LinearRegression_v2.py # Linear Regression with statsmodels (OLS: Ordinary Least Squares) # v2.0: 调用 statsmodels 实现多元线性回归 # 日期:2021-05-04 import numpy as np import matplotlib.pyplot as plt import statsmodels.api as sm from statsmodels.sandbox.regression.predstd import wls_prediction_std # 主程序 def main(): # 主程序 # 生成测试数据: nSample = 100 x0 = np.ones(nSample) # 截距列 x0=[1,...1] x1 = np.linspace(0, 20, nSample) # 起点为 0,终点为 10,均分为 nSample个点 x2 = np.sin(x1) x3 = (x1-5)**2 X = np.column_stack((x0, x1, x2, x3)) # (nSample,4): [x0,x1,x2,...,xm] beta = [5., 0.5, 0.5, -0.02] # beta = [b1,b2,...,bm] yTrue = np.dot(X, beta) # 向量点积 y = b1*x1 + ...+ bm*xm yTest = yTrue + 0.5 * np.random.normal(size=nSample) # 产生模型数据 # 多元线性回归:最小二乘法(OLS) model = sm.OLS(yTest, X) # 建立 OLS 模型: Y = b0 + b1*X + ... + bm*Xm + e results = model.fit() # 返回模型拟合结果 yFit = results.fittedvalues # 模型拟合的 y值 print(results.summary()) # 输出回归分析的摘要 print("\nOLS model: Y = b0 + b1*X + ... + bm*Xm") print('Parameters: ', results.params) # 输出:拟合模型的系数 # 绘图:原始数据点,拟合曲线,置信区间 prstd, ivLow, ivUp = wls_prediction_std(results) # 返回标准偏差和置信区间 fig, ax = plt.subplots(figsize=(10, 8)) ax.plot(x1, yTest, 'o', label="data") # 实验数据(原始数据+误差) ax.plot(x1, yTrue, 'b-', label="True") # 原始数据 ax.plot(x1, yFit, 'r-', label="OLS") # 拟合数据 ax.plot(x1, ivUp, '--',color='orange', label="ConfInt") # 置信区间 上届 ax.plot(x1, ivLow, '--',color='orange') # 置信区间 下届 ax.legend(loc='best') # 显示图例 plt.xlabel('x') plt.ylabel('y') plt.show() return if __name__ == '__main__': main() 4.2 多元线性回归 程序运行结果: OLS Regression Results ============================================================================== Dep. Variable: y R-squared: 0.932 Model: OLS Adj. R-squared: 0.930 Method: Least Squares F-statistic: 440.0 Date: Thu, 06 May 2021 Prob (F-statistic): 6.04e-56 Time: 10:38:51 Log-Likelihood: -68.709 No. Observations: 100 AIC: 145.4 Df Residuals: 96 BIC: 155.8 Df Model: 3 Covariance Type: nonrobust ============================================================================== coef std err t P>|t| [0.025 0.975] ------------------------------------------------------------------------------ const 5.0411 0.120 41.866 0.000 4.802 5.280 x1 0.4894 0.019 26.351 0.000 0.452 0.526 x2 0.5158 0.072 7.187 0.000 0.373 0.658 x3 -0.0195 0.002 -11.957 0.000 -0.023 -0.016 ============================================================================== Omnibus: 1.472 Durbin-Watson: 1.824 Prob(Omnibus): 0.479 Jarque-Bera (JB): 1.194 Skew: 0.011 Prob(JB): 0.551 Kurtosis: 2.465 Cond. No. 223. ============================================================================== OLS model: Y = b0 + b1*X + ... + bm*Xm Parameters: [ 5.04111867 0.4893574 0.51579806 -0.01951219] 5、附录:回归结果详细说明 Dep.Variable: y 因变量 Model:OLS 最小二乘模型 Method: Least Squares 最小二乘 No. Observations: 样本数据的数量 Df Residuals:残差自由度(degree of freedom of residuals) Df Model:模型自由度(degree of freedom of model) Covariance Type:nonrobust 协方差阵的稳健性 R-squared:R 判定系数 Adj. R-squared: 修正的判定系数 F-statistic: 统计检验 F 统计量 Prob (F-statistic): F检验的 P值 Log likelihood: 对数似然 coef:自变量和常数项的系数,b1,b2,...bm,b0 std err:系数估计的标准误差 t:统计检验 t 统计量 P>|t|:t 检验的 P值 [0.025, 0.975]:估计参数的 95%置信区间的下限和上限 Omnibus:基于峰度和偏度进行数据正态性的检验 Prob(Omnibus):基于峰度和偏度进行数据正态性的检验概率 Durbin-Watson:检验残差中是否存在自相关 Skewness:偏度,反映数据分布的非对称程度 Kurtosis:峰度,反映数据分布陡峭或平滑程度 Jarque-Bera(JB):基于峰度和偏度对数据正态性的检验 Prob(JB):Jarque-Bera(JB)检验的 P值。 Cond. No.:检验变量之间是否存在精确相关关系或高度相关关系。

0

Python3

AI小助手·2021-05-06 17:08 0 阅读 57
文章写给程序员的机器学习入门 (十四) - 对抗生成网络 如何造假脸
这篇文章将会教你怎样用机器学习来伪造假数据,题材还是人脸,以下六张人脸里面,有两张是假的,猜猜是哪两张????? 生成假人脸使用的网络是对抗生成网络 (GAN - Generative adversarial network),这个网络与之前介绍的比起来相当特殊,虽然看起来不算复杂,但训练起来极其困难,以下将从基础原理开始一直讲到具体代码,还会引入一些之前没有讲过的组件和训练方法????。 对抗生成网络 (GAN) 的原理所谓生成网络就是用于生成文章,音频,图片,甚至代码等数据的机器学习模型,例如我们可以给出一个需求让网络生成一份代码,如果网络足够强大,生成的代码质量足够好并且能满足需求,那码农们就要面临失业了????。当然,目前机器学习模型可以生成的数据比较有限并且质量都很一般,码农们的饭碗还是能保住一段时间的。 生成网络和普通的模型一样,要求有输入和输出,假设我们可以传入一些条件让网络生成符合条件的图片: 看起来非常好用,但训练这样的模型需要一个庞大的数据集,并且得一张张图片去标记它们的属性,实现起来会累死人。这篇文章介绍的对抗生成网络属于无监督学习,可以完全不需要给数据打标签,你只需要给模型认识一些真实数据,就可以让模型输出类似真实数据的假数据。对抗生成网络分为两部分,第一部分是生成器 (Generator),第二部分是识别器 (Discriminator),生成器负责根据随机条件生成数据,识别器负责识别数据是否为真。 训练对抗生成网络有两大目标,这两大目标是矛盾的,这就是为什么我们叫对抗生成网络: 生成器需要生成骗过识别器 (输出为真) 的数据识别器需要不被生成器骗过去 (针对生成器生成的数据输出为假,针对真实数据输出为真) 对抗生成网络的训练流程大致如下,需要循环训练生成器和识别器: 简单通俗一点我们可以用造假皮包为例来理解,好理解了吧????: 和现实造假皮包一样,生成器会生成越来越接近真实数据的假数据,最后会生成和真实数据一模一样的数据,但这样反而就远离我们构建生成网络的目的了(不如直接用真实数据)。使用生成网络通常是为了达到以下的目的: 要求大量看上去是真的,但稍微不一样的数据要求没有版权保护的数据 (假数据没来的版权????)生成想要但是现实没有的数据 (需要更进一步的工作) 看以上的流程你可能会发现,因为对抗生成网络是无监督学习,不需要标签,我们只能给模型传入随机的条件来让它生成数据,模型生成出来的数据看起来可能像真的但不一定是我们想要的。如果我们想要指定具体的条件,则需要在训练完成以后分析随机条件对生成结果的影响,例如随机生成的第二个数字代表性别,第六个数字代表年龄,第八个数字代表头发的数量,这样我们就可以调整这些条件来让模型生成想要的图片。 还记得上一篇人脸识别的模型不?人脸识别的模型会把图片转换为某个长度的向量,训练完成以后这个向量的值会代表人物的属性,而这一篇是反过来,把某个长度的向量转换回图片,训练成功以后这个向量同样会代表人物的各个属性。当然,两种的向量表现形式是不同的,把人脸识别输出的向量交给对抗生成网络,生成的图片和原有的图片可能会相差很远,把人脸识别输出的向量还原回去的方法后面再研究吧????。 对抗生成网络的实现反卷积层 (ConvTranspose2d) 在第八篇介绍 CNN 的文章中,我们了解过卷积层运算 (Conv2d) 的实现原理,CNN 模型会利用卷积层来把图片的长宽逐渐缩小,通道数逐渐扩大,最后扁平化输出一个代表图片特征的向量: 而在对抗生成网络的生成器中,我们需要实现反向的操作,即把向量当作一个 (向量长度, 1, 1) 的图片,然后把长宽逐渐扩大,通道数 (最开始是向量长度) 逐渐缩小,最后变为 (3, 图片长度, 图片宽度) 的图片 (3 代表 RGB)。 实现反向操作需要反卷积层 (ConvTranspose2d),反卷积层简单的来说就是在参数数量相同的情况下,把输出大小的数据还原为输入大小的数据: 要理解反卷积层的具体运算方式,我们可以把卷积层拆解为简单的矩阵乘法: 可以看到卷积层计算的时候可以根据内核参数和输入大小生成一个矩阵,然后计算输入与这个矩阵的乘积来得到输出结果。 而反卷积层则会计算输入与转置 (Transpose) 后的矩阵的乘积得到输出结果: 可以看到卷积层与反卷积层的区别只在于是否转置计算使用的矩阵。此外,通道数量转换的计算方式也是一样的。 测试反卷积层的代码如下: >>> import torch # 生成测试用的矩阵 # 第一个维度代表批次,第二个维度代表通道数量,第三个维度代表长度,第四个维度代表宽度 >>> a = torch.arange(1, 5).float().reshape(1, 1, 2, 2) >>> a tensor([[[[1., 2.], [3., 4.]]]]) # 创建反卷积层 >>> convtranspose2d = torch.nn.ConvTranspose2d(1, 1, kernel_size=2, stride=2, bias=False) # 手动指定权重 (让计算更好理解) >>> convtranspose2d.weight = torch.nn.Parameter(torch.tensor([0.1, 0.2, 0.5, 0.8]).reshape(1, 1, 2, 2)) >>> convtranspose2d.weight Parameter containing: tensor([[[[0.1000, 0.2000], [0.5000, 0.8000]]]], requires_grad=True) # 测试反卷积层 >>> convtranspose2d(a) tensor([[[[0.1000, 0.2000, 0.2000, 0.4000], [0.5000, 0.8000, 1.0000, 1.6000], [0.3000, 0.6000, 0.4000, 0.8000], [1.5000, 2.4000, 2.0000, 3.2000]]]], grad_fn=<SlowConvTranspose2DBackward>) 需要注意的是,不一定存在一个反卷积层可以把卷积层的输出还原到输入,这是因为卷积层的计算是不可逆的,即使存在一个可以把输出还原到输入的矩阵,这个矩阵也不一定有一个等效的反卷积层的内核参数。 生成器的实现 (Generator) 接下来我们看一下生成器的定义,原始介绍 GAN 的论文给出了生成 64x64 图片的网络,而这里给出的是生成 80x80 图片的网络,其实区别只在于一开始的输出通道数量 (论文是 4, 这里是 5) class GenerationModel(nn.Module): """生成虚假数据的模型""" # 编码长度 EmbeddedSize = 128 def __init__(self): super().__init__() self.generator = nn.Sequential( # 128,1,1 => 512,5,5 nn.ConvTranspose2d(128, 512, kernel_size=5, stride=1, padding=0, bias=False), nn.BatchNorm2d(512), nn.ReLU(inplace=True), # => 256,10,10 nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(256), nn.ReLU(inplace=True), # => 128,20,20 nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(128), nn.ReLU(inplace=True), # => 64,40,40 nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True), # => 3,80,80 nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False), # 限制输出在 -1 ~ 1,不使用 Hardtanh 是为了让超过范围的值可以传播给上层 nn.Tanh()) def forward(self, x): y = self.generator(x.view(x.shape[0], x.shape[1], 1, 1)) return y 表现如下: 其中批次正规化 (BatchNorm) 用于控制参数值范围,防止层数过多 (后面会结合识别器训练) 导致梯度爆炸问题。 还有一个要点是生成器输出的范围会在 -1 ~ 1,也就是使用 -1 ~ 1 代表 0 ~ 255 的颜色值,这跟我们之前处理图片的时候把值除以 255 使得范围在 0 ~ 1 不一样。使用 -1 ~ 1 可以提升输出颜色的精度 (减少浮点数的精度损失)。 识别器的实现 (Discriminator) 我们再看以下识别器的定义,基本上就是前面生成器的相反流程: class DiscriminationModel(nn.Module): """识别数据是否真实的模型""" def __init__(self): super().__init__() self.discriminator = nn.Sequential( # 3,80,80 => 64,40,40 nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False), nn.LeakyReLU(0.2, inplace=True), # => 128,20,20 nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(128), nn.LeakyReLU(0.2, inplace=True), # => 256,10,10 nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(256), nn.LeakyReLU(0.2, inplace=True), # => 512,5,5 nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(512), nn.LeakyReLU(0.2, inplace=True), # => 1,1,1 nn.Conv2d(512, 1, kernel_size=5, stride=1, padding=0, bias=False), # 扁平化 nn.Flatten(), # 输出是否真实数据 (0 or 1) nn.Sigmoid()) def forward(self, x): y = self.discriminator(x) return y 表现如下: 看到这里你可能会有几个疑问: 为什么用 LeakyReLU: 这是为了防止层数叠加次数过多导致的梯度消失问题,参考第三篇,LeakyReLU 对于负数输入不会返回 0,而是返回 输入 * slope,这里的 slope 指定为 0.2为什么第一层不加批次正规化 (BatchNorm): 原有论文中提到实际测试中,如果在所有层添加批次正规化会让模型训练结果不稳定,生成器的最后一层和识别器的第一层拿掉以后效果会好一些为什么不加池化层: 添加池化层以后可逆性将会降低,例如识别器针对假数据返回接近 0 的数值时,判断哪些部分导致这个输出的依据会减少 训练生成器和识别器的方法 接下来就是训练生成器和识别器,生成器和识别器需要分别训练,训练识别器的时候不能动生成器的参数,训练生成器的时候不能动识别器的参数,使用的代码大致如下: # 创建模型实例 generation_model = GenerationModel().to(device) discrimination_model = DiscriminationModel().to(device) # 创建参数调整器 # 根据生成器和识别器分别创建 optimizer_g = torch.optim.Adam(generation_model.parameters()) optimizer_d = torch.optim.Adam(discrimination_model.parameters()) # 随机生成编码 def generate_vectors(batch_size): vectors = torch.randn((batch_size, GenerationModel.EmbeddedSize), device=device) return vectors # 开始训练过程 for epoch in range(0, 10000): # 枚举真实数据 for index, batch_x in enumerate(read_batches()): # 生成随机编码 training_vectors = generate_vectors(minibatch_size) # 生成虚假数据 generated = generation_model(training_vectors) # 获取真实数据 real = batch_x # 训练识别器 (只调整识别器的参数) predicted_t = discrimination_model(real) predicted_f = discrimination_model(generated) loss_d = ( nn.functional.binary_cross_entropy( predicted_t, torch.ones(predicted_t.shape, device=device)) + nn.functional.binary_cross_entropy( predicted_f, torch.zeros(predicted_f.shape, device=device))) loss_d.backward() # 根据损失自动微分 optimizer_d.step() # 调整识别器的参数 optimizer_g.zero_grad() # 清空生成器参数记录的导函数值 optimizer_d.zero_grad() # 清空识别器参数记录的导函数值 # 训练生成器 (只调整生成器的参数) predicted_f = discrimination_model(generated) loss_g = nn.functional.binary_cross_entropy( predicted_f, torch.ones(predicted_f.shape, device=device)) loss_g.backward() # 根据损失自动微分 optimizer_g.step() # 调整生成器的参数 optimizer_g.zero_grad() # 清空生成器参数记录的导函数值 optimizer_d.zero_grad() # 清空识别器参数记录的导函数值 上述例子应该可以帮助你理解大致的训练流程和只训练识别器或生成器的方法,但是直接这么做效果会很差????,接下来我们会看看对抗生成网络的问题,并且给出优化方案,后面的完整代码会跟上述例子有一些不同。 如果对原始论文有兴趣可以参考这里,原始的对抗生成网络又称 DCGAN (Deep Convolutional GAN)。 对抗生成网络的问题看完以上的内容你可能会觉得,嘿嘿,还是挺简单的。不????,虽然原理看上去挺好理解,模型本身也不复杂,但对抗生成网络是目前介绍过的模型里面训练难度最高的,这是因为对抗生成网络建立在矛盾上,没有一个明确的目标 (之前的模型目标都是针对未学习过的数据预测正确率尽可能接近 100%)。如果生成器生成 100% 可以骗过识别器的数据,那可能代表识别器根本没正常工作,或者生成器生成的数据跟真实数据 100% 相同,没实用价值;而如果识别器 100% 可以识别生成器生成的数据,那代表生成器生成的数据太垃圾,一个都骗不过。本篇介绍的例子使用了最蠢最简单的方法,把每一轮学习后生成器生成的数据输出到硬盘,然后人工鉴定生成的效果怎样????,同时还会每 100 轮训练记录一次模型状态,供训练完以后回滚使用 (最后一个模型状态效果不会是最好的,后面会说明)。 另一个问题是识别器和生成器不能同时训练,怎样安排训练过程对训练结果的影响非常大????,理想的过程是:识别器稍微领先生成器,生成器跟着识别器慢慢的生成越来越精准的数据。举例来说,识别器首先会识别肤色占比较多的图片为人脸,接下来生成器会生成全部都是肤色的图片,然后识别器会识别有两个看上去是眼睛的图片为人脸,接下来生成器会加上两个看上去是眼睛的形状到图片,之后识别器会识别带有五官的图片为人脸,接下来生成器会加上剩余的五官到图片,最后识别器会识别五官和脸形状比较正常的人为人脸,生成器会尽量调整五官和人脸形状接近正常水平。而不理想的过程是识别器大幅领先生成器,例如识别器很早就达到了接近 100% 的正确率,而生成器因为找不到学习的方向正确率会一直原地踏步;另一个不理想的过程是生成器领先识别器,这时会出现识别器找不到学习的方向,生成器也找不到学习的方向而原地转的情况。实现识别器稍微领先生成器,可以增加识别器的训练次数,常见的方法是每训练 n 次识别器就训练 1 次生成器,而本文后面会介绍根据正确率动态调整识别器和生成器学习次数的方法,参考后面的代码吧。 对抗生成网络最大的问题是模式崩溃 (Mode Collapse) 问题,这个问题所有训练对抗生成网络的人都会面对,并且目前没有 100% 的方法避免????。简单的来说就是生成器学会偷懒作弊,只会输出一到几个与真实数据几乎一模一样的虚假数据,因为生成的数据同质化非常严重,即使可以骗过识别器也没什么实用价值。发生模式崩溃以后的输出例子如下,可以看到很多人脸都非常接近: 为了尽量避免模式崩溃问题,以下几个改进的模型被发明了出来,这就是人民群众的智慧啊????。 改进对抗生成网络 (WGAN)模式崩溃问题的原因之一就是部分模型参数会随着训练固化 (达到本地最优),因为原始的对抗生成网络会让识别器输出尽可能接近 1 或者 0 的值,如果值已经是 0 或者 1 那么参数就不会被调整。WGAN (Wasserstein GAN) 的解决方式是不限制识别器输出的值范围,只要求识别器针对真实数据输出的值大于虚假数据输出的值,和要求生成器生成可以让识别器输出更大的值的数据。 第一个修改是拿掉识别器最后的 Sigmoid,这样识别器输出的值就不会限制在 0 ~ 1 的范围内。 第二个修改是修改计算损失的方式: # 计算识别器的损失,修改前 loss_d = ( nn.functional.binary_cross_entropy( predicted_t, torch.ones(predicted_t.shape, device=device)) + nn.functional.binary_cross_entropy( predicted_f, torch.zeros(predicted_f.shape, device=device))) # 计算识别器的损失,修改后 loss_d = predicted_f.mean() - predicted_t.mean() # 计算生成器的损失,修改前 loss_g = nn.functional.binary_cross_entropy( predicted_f, torch.ones(predicted_f.shape, device=device)) # 计算生成器的损失,修改后 loss_g = -predicted_f.mean() 这么修改以后会出现一个问题,识别器输出的值范围会随着训练越来越大 (生成器提高虚假数据的输出值,接下来识别器提高真实数据的输出值,循环下去输出值就会越来越大????),从而导致梯度爆炸问题。为了解决这个问题 WGAN 对识别器参数的可取范围做出了限制,也就是在调整完参数以后裁剪参数,第三个修改如下: # 让识别器参数必须在 -0.1 ~ 0.1 之间 for p in discrimination_model.parameters(): p.data.clamp_(-0.1, 0.1) 如果有兴趣可以参考 WGAN 的原始论文,里面一大堆数学公式可以把人吓坏????,但主要的部分只有上面提到的三点。 改进对抗生成网络 (WGAN-GP)WGAN 为了防止梯度爆炸问题对识别器参数的可取范围做出了限制,但这个做法比较粗暴,WGAN-GP (Wasserstein GAN Gradient Penalty) 提出了一个更优雅的方法,即限制导函数值的范围,如果导函数值偏移某个指定的值则通过损失给与模型惩罚。 具体实现如下,看起来比较复杂但做的事情只是计算识别器输入数据的导函数值,然后判断所有通道合计的导函数值的 L2 合计与常量 1 相差多少,相差越大就返回越高的损失,这样识别器模型参数自然会控制在某个水平。 def gradient_penalty(discrimination_model, real, generated): """控制导函数值的范围,用于防止模型参数失控 (https://arxiv.org/pdf/1704.00028.pdf)""" # 给批次中的每个样本分别生成不同的随机值,范围在 0 ~ 1 batch_size = real.shape[0] rate = torch.randn(batch_size, 1, 1, 1) rate = rate.expand(batch_size, real.shape[1], real.shape[2], real.shape[3]).to(device) # 按随机值比例混合真样本和假样本 mixed = (rate * real + (1 - rate) * generated) # 识别混合样本 predicted_m = discrimination_model(mixed) # 计算 mixed 对 predicted_m 的影响,也就是 mixed => predicted_m 的微分 # 与以下代码计算结果相同,但不会影响途中 (即模型参数) 的 grad 值 # mixed = torch.tensor(mixed, requires_grad=True) # predicted_m.sum().backward() # grad = mixed.grad grad = torch.autograd.grad( outputs = predicted_m, inputs = mixed, grad_outputs = torch.ones(predicted_m.shape).to(device), create_graph=True, retain_graph=True)[0] # 让导函数值的 L2 norm (所有通道合计) 在 1 左右,如果偏离 1 则使用损失给与惩罚 grad_penalty = ((grad.norm(2, dim=1) - 1) ** 2).mean() * 10 return grad_penalty 然后再修改计算识别器损失的方法: # 计算识别器的损失,修改前 loss_d = predicted_f.mean() - predicted_t.mean() # 计算识别器的损失,修改后 loss_d = (predicted_f.mean() - predicted_t.mean() + gradient_penalty(discrimination_model, real, generated)) 最后把识别器中的批次正规化 (BatchNorm) 删掉或者改为实例正规化 (InstanceNorm) 就完了。InstanceNorm 和 BatchNorm 的区别在于计算平均值和标准差的时候不会根据整个批次计算,而是只根据各个样本自身计算,关于 BatchNorm 的计算方式可以参考第四篇。 如果有兴趣可以参考 WGAN-GP 的原始论文。 完整代码又到完整代码的时间了????,这份代码同时包含了原始的 GAN 模型 (DCGAN),WGAN 和 WGAN-GP 的实现,后面还会比较它们之间的效果相差多少。 使用的数据集链接如下,前一篇的人脸识别文章也用到了这个数据集: https://www.kaggle.com/atulanandjha/lfwpeople 需要注意的是人脸图片数量越多就越容易出现模式崩溃问题,这也是对抗生成网络训练的难点之一????,这份代码只会随机选取 2000 张图片用于训练。 这份代码还会根据正确率动态调整生成器和识别器的训练比例,如果识别器比生成器更强则训练 1 次生成器,如果生成器比识别器更强则训练 5 次识别器,这么做可以省去手动调整训练比例的麻烦,经实验效果也不错????。 import os import sys import torch import gzip import itertools import random import numpy import math import json from PIL import Image from torch import nn from matplotlib import pyplot from functools import lru_cache # 生成或识别图片的大小 IMAGE_SIZE = (80, 80) # 训练使用的数据集路径 DATASET_DIR = "./dataset/lfwpeople/lfw_funneled" # 模型类别, 支持 DCGAN, WGAN, WGAN-GP MODEL_TYPE = "WGAN-GP" # 用于启用 GPU 支持 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") class GenerationModel(nn.Module): """生成虚假数据的模型""" # 编码长度 EmbeddedSize = 128 def __init__(self): super().__init__() self.generator = nn.Sequential( # 128,1,1 => 512,5,5 nn.ConvTranspose2d(128, 512, kernel_size=5, stride=1, padding=0, bias=False), nn.BatchNorm2d(512), nn.ReLU(inplace=True), # => 256,10,10 nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(256), nn.ReLU(inplace=True), # => 128,20,20 nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(128), nn.ReLU(inplace=True), # => 64,40,40 nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True), # => 3,80,80 nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False), # 限制输出在 -1 ~ 1,不使用 Hardtanh 是为了让超过范围的值可以传播给上层 nn.Tanh()) def forward(self, x): y = self.generator(x.view(x.shape[0], x.shape[1], 1, 1)) return y @staticmethod def calc_accuracy(predicted_f): """正确率计算器""" # 返回骗过识别器的虚假数据比例 if MODEL_TYPE == "DCGAN": threshold = 0.5 elif MODEL_TYPE in ("WGAN", "WGAN-GP"): threshold = DiscriminationModel.LastTrueSamplePredictedMean else: raise ValueError("unknown model type") return (predicted_f >= threshold).float().mean().item() class DiscriminationModel(nn.Module): """识别数据是否真实的模型""" # 最终识别真实样本的输出平均值,WGAN 会使用这个值判断骗过识别器的虚假数据比例 LastTrueSamplePredictedMean = 0.5 def __init__(self): super().__init__() # 标准化函数 def norm2d(features): if MODEL_TYPE == "WGAN-GP": # WGAN-GP 本来不需要 BatchNorm,但可以额外的加 InstanceNorm 改善效果 # InstanceNorm 不一样的是平均值和标准差会针对批次中的各个样本分别计算 # affine = True 表示调整量可学习 (BatchNorm2d 默认为 True) return nn.InstanceNorm2d(features, affine=True) return nn.BatchNorm2d(features) self.discriminator = nn.Sequential( # 3,80,80 => 64,40,40 nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False), nn.LeakyReLU(0.2, inplace=True), # => 128,20,20 nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False), norm2d(128), nn.LeakyReLU(0.2, inplace=True), # => 256,10,10 nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False), norm2d(256), nn.LeakyReLU(0.2, inplace=True), # => 512,5,5 nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False), norm2d(512), nn.LeakyReLU(0.2, inplace=True), # => 1,1,1 nn.Conv2d(512, 1, kernel_size=5, stride=1, padding=0, bias=False), # 扁平化 nn.Flatten()) if MODEL_TYPE == "DCGAN": # 输出是否真实数据 (0 or 1) # WGAN 不限制输出值范围在 0 ~ 1 之间 self.discriminator.add_module("sigmoid", nn.Sigmoid()) def forward(self, x): y = self.discriminator(x) return y @staticmethod def calc_accuracy(predicted_f, predicted_t): """正确率计算器""" # 返回正确识别的数据比例 if MODEL_TYPE == "DCGAN": return (((predicted_f <= 0.5).float().mean() + (predicted_t > 0.5).float().mean()) / 2).item() elif MODEL_TYPE in ("WGAN", "WGAN-GP"): DiscriminationModel.LastTrueSamplePredictedMean = predicted_t.mean() return (predicted_t > predicted_f).float().mean().item() else: raise ValueError("unknown model type") def gradient_penalty(self, real, generated): """控制导函数值的范围,用于防止模型参数失控 (https://arxiv.org/pdf/1704.00028.pdf)""" # 给批次中的每个样本分别生成不同的随机值,范围在 0 ~ 1 batch_size = real.shape[0] rate = torch.randn(batch_size, 1, 1, 1) rate = rate.expand(batch_size, real.shape[1], real.shape[2], real.shape[3]).to(device) # 按随机值比例混合真样本和假样本 mixed = (rate * real + (1 - rate) * generated) # 识别混合样本 predicted_m = self.forward(mixed) # 计算 mixed 对 predicted_m 的影响,也就是 mixed => predicted_m 的微分 # 与以下代码计算结果相同,但不会影响途中 (即模型参数) 的 grad 值 # mixed = torch.tensor(mixed, requires_grad=True) # predicted_m.sum().backward() # grad = mixed.grad grad = torch.autograd.grad( outputs = predicted_m, inputs = mixed, grad_outputs = torch.ones(predicted_m.shape).to(device), create_graph=True, retain_graph=True)[0] # 让导函数值的 L2 norm (所有通道合计) 在 1 左右,如果偏离 1 则使用损失给与惩罚 grad_penalty = ((grad.norm(2, dim=1) - 1) ** 2).mean() * 10 return grad_penalty def save_tensor(tensor, path): """保存 tensor 对象到文件""" torch.save(tensor, gzip.GzipFile(path, "wb")) # 为了减少读取时间这里缓存了读取的 tensor 对象 # 如果内存不够应该适当减少 maxsize @lru_cache(maxsize=200) def load_tensor(path): """从文件读取 tensor 对象""" return torch.load(gzip.GzipFile(path, "rb")) def image_to_tensor(img): """缩放并转换图片对象到 tensor 对象""" img = img.resize(IMAGE_SIZE) # 缩放图片,比例不一致时拉伸 arr = numpy.asarray(img) t = torch.from_numpy(arr) t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H t = (t / 255.0) * 2 - 1 # 正规化数值使得范围在 -1 ~ 1 return t def tensor_to_image(t): """转换 tensor 对象到图片""" t = (t + 1) / 2 * 255.0 # 转换颜色回 0 ~ 255 t = t.transpose(0, 2) # 转换维度 C,W,H 到 H,W,C t = t.int() # 转换数值到整数 img = Image.fromarray(t.numpy().astype("uint8"), "RGB") return img def prepare(): """准备训练""" # 数据集转换到 tensor 以后会保存在 data 文件夹下 if not os.path.isdir("data"): os.makedirs("data") # 查找人脸图片列表 # 每个人最多使用 2 张图片 image_paths = [] for dirname in os.listdir(DATASET_DIR): dirpath = os.path.join(DATASET_DIR, dirname) if not os.path.isdir(dirpath): continue for filename in os.listdir(dirpath)[:2]: image_paths.append(os.path.join(DATASET_DIR, dirname, filename)) print(f"found {len(image_paths)} images") # 随机打乱人脸图片列表 random.shuffle(image_paths) # 限制人脸数量 # 如果数量太多,识别器难以记住人脸的具体特征,会需要更长时间训练或直接陷入模式崩溃问题 image_paths = image_paths[:2000] print(f"only use {len(image_paths)} images") # 保存人脸图片数据 for batch, index in enumerate(range(0, len(image_paths), 200)): paths = image_paths[index:index+200] images = [] for path in paths: img = Image.open(path) # 扩大人脸占比 w, h = img.size img = img.crop((int(w*0.25), int(h*0.25), int(w*0.75), int(h*0.75))) images.append(img) tensors = [ image_to_tensor(img) for img in images ] tensor = torch.stack(tensors) # 维度: (图片数量, 3, 宽度, 高度) save_tensor(tensor, os.path.join("data", f"{batch}.pt")) print(f"saved batch {batch}") print("done") def train(): """开始训练模型""" # 创建模型实例 generation_model = GenerationModel().to(device) discrimination_model = DiscriminationModel().to(device) # 创建损失计算器 ones_map = {} zeros_map = {} def loss_function_t(predicted): """损失计算器 (训练识别结果为 1)""" count = predicted.shape[0] ones = ones_map.get(count) if ones is None: ones = torch.ones((count, 1), device=device) ones_map[count] = ones return nn.functional.binary_cross_entropy(predicted, ones) def loss_function_f(predicted): """损失计算器 (训练识别结果为 0)""" count = predicted.shape[0] zeros = zeros_map.get(count) if zeros is None: zeros = torch.zeros((count, 1), device=device) zeros_map[count] = zeros return nn.functional.binary_cross_entropy(predicted, zeros) # 创建参数调整器 # 学习率和 betas 跟各个论文给出的一样,可以一定程度提升学习效果,但不是决定性的 if MODEL_TYPE == "DCGAN": optimizer_g = torch.optim.Adam(generation_model.parameters(), lr=0.0002, betas=(0.5, 0.999)) optimizer_d = torch.optim.Adam(discrimination_model.parameters(), lr=0.0002, betas=(0.5, 0.999)) elif MODEL_TYPE == "WGAN": optimizer_g = torch.optim.RMSprop(generation_model.parameters(), lr=0.00005) optimizer_d = torch.optim.RMSprop(discrimination_model.parameters(), lr=0.00005) elif MODEL_TYPE == "WGAN-GP": optimizer_g = torch.optim.Adam(generation_model.parameters(), lr=0.0001, betas=(0.0, 0.999)) optimizer_d = torch.optim.Adam(discrimination_model.parameters(), lr=0.0001, betas=(0.0, 0.999)) else: raise ValueError("unknown model type") # 记录训练集和验证集的正确率变化 training_accuracy_g_history = [] training_accuracy_d_history = [] # 计算正确率的工具函数 calc_accuracy_g = generation_model.calc_accuracy calc_accuracy_d = discrimination_model.calc_accuracy # 随机生成编码 def generate_vectors(batch_size): vectors = torch.randn((batch_size, GenerationModel.EmbeddedSize), device=device) return vectors # 输出生成的图片样本 def output_generated_samples(epoch, samples): dir_path = f"./generated_samples/{epoch}" if not os.path.isdir(dir_path): os.makedirs(dir_path) for index, sample in enumerate(samples): path = os.path.join(dir_path, f"{index}.png") tensor_to_image(sample.cpu()).save(path) # 读取批次的工具函数 def read_batches(): for batch in itertools.count(): path = f"data/{batch}.pt" if not os.path.isfile(path): break x = load_tensor(path) yield x.to(device) # 开始训练过程 validating_vectors = generate_vectors(100) for epoch in range(0, 10000): print(f"epoch: {epoch}") # 根据训练集训练并修改参数 # 切换模型到训练模式 generation_model.train() discrimination_model.train() training_accuracy_g_list = [] training_accuracy_d_list = [] last_accuracy_g = 0 last_accuracy_d = 0 minibatch_size = 20 train_discriminator_count = 0 for index, batch_x in enumerate(read_batches()): # 使用小批次训练 training_batch_accuracy_g = 0.0 training_batch_accuracy_d = 0.0 minibatch_count = 0 for begin in range(0, batch_x.shape[0], minibatch_size): # 测试目前生成器和识别器哪边占劣势,训练占劣势的一方 # 最终的平衡状态是: 生成器正确率 = 1.0, 识别器正确率 = 0.5 # 代表生成器生成的图片和真实图片基本完全一样,但不应该训练到这个程度 training_vectors = generate_vectors(minibatch_size) # 随机向量 generated = generation_model(training_vectors) # 根据随机向量生成的虚假数据 real = batch_x[begin:begin+minibatch_size] # 真实数据 predicted_t = discrimination_model(real) predicted_f = discrimination_model(generated) accuracy_g = calc_accuracy_g(predicted_f) accuracy_d = calc_accuracy_d(predicted_f, predicted_t) train_discriminator = (accuracy_g / 2) >= accuracy_d if train_discriminator or train_discriminator_count > 0: # 训练识别器 if MODEL_TYPE == "DCGAN": loss_d = loss_function_f(predicted_f) + loss_function_t(predicted_t) elif MODEL_TYPE == "WGAN": loss_d = predicted_f.mean() - predicted_t.mean() elif MODEL_TYPE == "WGAN-GP": loss_d = (predicted_f.mean() - predicted_t.mean() + discrimination_model.gradient_penalty(real, generated)) else: raise ValueError("unknown model type") loss_d.backward() optimizer_d.step() # 限制识别器参数范围以防止模型参数失控 (WGAN-GP 有更好的方法) # 这里的限制值比论文的值 (0.01) 更大是因为模型层数和参数量更多 if MODEL_TYPE == "WGAN": for p in discrimination_model.parameters(): p.data.clamp_(-0.1, 0.1) # 让识别器训练次数多于生成器 if train_discriminator and train_discriminator_count == 0: train_discriminator_count = 5 train_discriminator_count -= 1 else: # 训练生成器 if MODEL_TYPE == "DCGAN": loss_g = loss_function_t(predicted_f) elif MODEL_TYPE in ("WGAN", "WGAN-GP"): loss_g = -predicted_f.mean() else: raise ValueError("unknown model type") loss_g.backward() optimizer_g.step() optimizer_g.zero_grad() optimizer_d.zero_grad() training_batch_accuracy_g += accuracy_g training_batch_accuracy_d += accuracy_d minibatch_count += 1 training_batch_accuracy_g /= minibatch_count training_batch_accuracy_d /= minibatch_count # 输出批次正确率 training_accuracy_g_list.append(training_batch_accuracy_g) training_accuracy_d_list.append(training_batch_accuracy_d) print(f"epoch: {epoch}, batch: {index},", f"accuracy_g: {training_batch_accuracy_g}, accuracy_d: {training_batch_accuracy_d}") training_accuracy_g = sum(training_accuracy_g_list) / len(training_accuracy_g_list) training_accuracy_d = sum(training_accuracy_d_list) / len(training_accuracy_d_list) training_accuracy_g_history.append(training_accuracy_g) training_accuracy_d_history.append(training_accuracy_d) print(f"training accuracy_g: {training_accuracy_g}, accuracy_d: {training_accuracy_d}") # 保存虚假数据用于评价训练效果 output_generated_samples(epoch, generation_model(validating_vectors)) # 保存模型状态 if (epoch + 1) % 10 == 0: save_tensor(generation_model.state_dict(), "model.generation.pt") save_tensor(discrimination_model.state_dict(), "model.discrimination.pt") if (epoch + 1) % 100 == 0: save_tensor(generation_model.state_dict(), f"model.generation.epoch_{epoch}.pt") save_tensor(discrimination_model.state_dict(), f"model.discrimination.epoch_{epoch}.pt") print("model saved") print("training finished") # 显示训练集的正确率变化 pyplot.plot(training_accuracy_g_history, label="training_accuracy_g") pyplot.plot(training_accuracy_d_history, label="training_accuracy_d") pyplot.ylim(0, 1) pyplot.legend() pyplot.show() from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import parse_qs from io import BytesIO class RequestHandler(BaseHTTPRequestHandler): """用于测试生成图片的简单服务器""" # 模型状态的路径,这里使用看起来效果最好的记录 MODEL_STATE_PATH = "model.generation.epoch_2999.pt" Model = None @staticmethod def get_model(): if RequestHandler.Model is None: # 创建模型实例,加载训练好的状态,然后切换到验证模式 model = GenerationModel().to(device) model.load_state_dict(load_tensor(RequestHandler.MODEL_STATE_PATH)) model.eval() RequestHandler.Model = model return RequestHandler.Model def do_GET(self): parts = self.path.partition("?") if parts[0] == "/": self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() with open("gan_eval.html", "rb") as f: self.wfile.write(f.read()) elif parts[0] == "/generate": # 根据传入的参数生成图片 params = parse_qs(parts[-1]) vector = (torch.tensor([float(x) for x in params["values"][0].split(",")]) .reshape(1, GenerationModel.EmbeddedSize) .to(device)) generated = RequestHandler.get_model()(vector)[0] img = tensor_to_image(generated.cpu()) bytes_io = BytesIO() img.save(bytes_io, format="PNG") # 返回图片 self.send_response(200) self.send_header("Content-type", "image/png") self.end_headers() self.wfile.write(bytes_io.getvalue()) else: self.send_response(404) self.end_headers() self.wfile.write(b"Not Found") def eval_model(): """使用训练好的模型生成图片""" server = HTTPServer(("localhost", 8666), RequestHandler) print("Please access http://localhost:8666") try: server.serve_forever() except KeyboardInterrupt: pass server.server_close() exit() def main(): """主函数""" if len(sys.argv) < 2: print(f"Please run: {sys.argv[0]} prepare|train|eval") exit() # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数 # 这是为了让过程可重现,你也可以选择不这样做 random.seed(0) torch.random.manual_seed(0) # 根据命令行参数选择操作 operation = sys.argv[1] if operation == "prepare": prepare() elif operation == "train": train() elif operation == "eval": eval_model() else: raise ValueError(f"Unsupported operation: {operation}") if __name__ == "__main__": main() 保存代码到 gan.py,然后执行以下命令即可开始训练: python3 gan.py prepare python3 gan.py train 同样训练 2000 轮以后,DCGAN, WGAN, WGAN-GP 输出的样本如下: DCGAN WGAN WGAN-GP 可以看到 WGAN-GP 受模式崩溃问题影响最少,并且效果也更好????。 WGAN-GP 训练到 3000 次以后输出的样本如下: WGAN-GP 训练到 10000 次以后输出的样本如下: 随着训练次数增多,WGAN-GP 一样无法避免模式崩溃问题,这就是为什么以上代码会记录每一轮训练后输出的样本,并在每 100 轮训练以后保存单独的模型状态,这样训练结束以后我们可以通过评价输出的样本找到效果最好的批次,然后使用该批次的模型状态。 上述的例子效果最好的状态是训练 3000 次以后的状态。 你可能发现输出的样本中夹杂了一些畸形????,这是因为生成器没有覆盖到输入的向量空间,最主要的原因是随机输入中包含了很多接近 0 的值,避免这个问题简单的做法是生成随机输入时限制值必须小于或大于某个值。原则上给反卷积层设置 Bias 也可以避免这个问题,但会更容易陷入模式崩溃问题。 使用训练好的模型生成人脸就比较简单了: generation_model = GenerationModel().to(device) model.load_state_dict(load_tensor("model.generation.epoch_2999.pt")) model.eval() # 随机生成 100 张人脸 vector = torch.randn((100, GenerationModel.EmbeddedSize), device=device) samples = model(vector) for index, sample in enumerate(samples): img = tensor_to_image(sample.cpu()) img.save(f"{index}.png") 额外的,我做了一个可以动态调整参数捏脸的网页,html 代码如下: <!DOCTYPE html> <html lang="cn"> <head> <meta charset="utf-8"> <title>测试人脸生成</title> <style> html, body { width: 100%; height: 100%; margin: 0px; } .left-pane { width: 50%; height: 100%; border-right: 1px solid #000; } .right-pane { position: fixed; left: 70%; top: 35%; width: 25%; } .sliders { padding: 8px; } .slider-container { display: inline-block; min-width: 25%; } #image { left: 25%; top: 25%; width: 50%; height: 50%; } </style> </head> <body> <div class="left-pane"> <div class="sliders"> </div> </div> <div class="right-pane"> <p><img id="target" src="data:image/png;base64," alt="image" /></p> <p><button class="set-random">随机生成</button></p> </div> </body> <script> (function() { // 滑动条改变后的处理 var onChanged = function() { var sliderInputs = document.querySelectorAll(".slider"); var values = []; sliderInputs.forEach(function(s) { values.push(s.value); }); var image = document.querySelector("#target"); image.setAttribute("src", "/generate?values=" + values.join(",")); }; // 点击随机生成时的处理 var setRandomButton = document.querySelector(".set-random"); setRandomButton.onclick = function() { var sliderInputs = document.querySelectorAll(".slider"); sliderInputs.forEach(function(s) { s.value = Math.random() * 2 - 1; }); onChanged(); }; // 添加滑动条 var sliders = document.querySelector(".sliders"); for (var n = 0; n < 128; ++n) { var container = document.createElement("div"); container.setAttribute("class", "slider-container"); var span = document.createElement("span"); span.innerText = n; container.appendChild(span); var slider = document.createElement("input"); slider.setAttribute("type", "range") slider.setAttribute("class", "slider"); slider.setAttribute("min", "-1"); slider.setAttribute("max", "1"); slider.setAttribute("step", "0.01"); slider.value = 0; slider.onchange = onChanged; slider.oninput = onChanged; container.appendChild(slider); sliders.appendChild(container); } })(); </script> </html> 保存到 gan_eval.html 以后执行以下命令即可启动服务器: python3 gan.py eval 浏览器打开 http://localhost:8666 以后会显示以下界面,点击随机生成按钮可以随机生成人脸,拉动左边的参数条可以动态调整参数: 一些捏脸的网站会分析各个参数的含义,看看哪些参数代表肤色,那些参数代表表情,哪些参数代表脱发程度,我比较懒就只给出各个参数的序号了????。 写在最后又摸完一个新的模型了,跟到这篇的人也越来越少了,估计这个系列再写一两篇就会结束 (VAE, 强化学习)。 前一篇论文我提到了可能会开一个新的系列介绍 .NET 的机器学习,但我决定不开了。经过试验发现没有达到可用的水平,文档基本等于没有,社区气氛也不行 (大会 PPT 倒是做的挺好的)。毕竟语言只是个工具,不是老祖宗,还是看开一点吧。学 python 再做机器学习会轻松很多,就像长远来说学一点基础英语再编程比完全只用中文编程 (先把基础框架类库系统接口的英文全部翻译成中文,再用中文写) 简单很多,对叭????。 本系列将陆续更新 文章作者:老农的博客 文章来源:https://www.cnblogs.com/zkweb/p/14684775.html

0

机器学习

AI小助手·2021-04-29 10:20 0 阅读 93
文章写给程序员的机器学习入门 (十三) - 人脸识别
这篇将会介绍人脸识别模型的实现,以及如何结合前几篇文章的模型来识别图片上的人,最终效果如下: 实现人脸识别的方法你可能会想起第八篇文章介绍如何识别图片上物体类型的 CNN 模型,那么人脸是否也能用同样的方法识别呢?例如有 100 个人,把这 100 个人当作 100 个分类,然后用他们的照片来训练,似乎就可以训练出可以根据图片识别哪个人的模型了,真的吗????。 很遗憾,用于识别物体类型的模型并不能用在人脸识别上,主要有以下原因: 识别物体类型的模型通常要求每个分类有大量的图片,而人脸识别模型很多时候只能拿到个位数的人脸,这样训练出来的精度很不理想。这个问题又称 One-shot 学习问题 (每个分类只有很少的样本数量)。识别物体类型的模型只能识别训练过的类型,如果想添加新类型则需要重新开始训练 (如果一开始预留有多的分类数量可以基于上一次的模型状态继续训练,这个做法又称迁移学习)同上,识别物体类型的模型不能识别没有学习过的人物 我们需要用不同的方法来实现人脸识别????,目前主流的方法有两种,一种是基于指标,根据人脸生成对应的编码,然后调整编码之间的距离 (同一个人的编码接近,不同的人的编码远离) 来实现人脸的区分;另一种是基于分类,可以看作是识别物体类型的模型的改进版,同样会根据人脸生成对应的编码,但最后会添加一层输出分类的线性模型,来实现间接的调整编码。 基于指标的方法 基于指标的方法使用的模型结构如下: 我们最终想要模型根据人脸输出编码,如果是同一个人那么编码就会比较接近,如果是不同的人那么编码就会比较远离。如果训练成功,我们可以根据已有的人脸构建一个编码数据库,识别新的人脸时生成新的人脸的编码,然后对比数据库中的编码找出最接近的人脸,如下图所示。 输出编码的模型定义如下,这里的编码长度是 32 (完整代码会在后面给出): # Resnet 的实现 self.resnet = torchvision.models.resnet18(num_classes=256) # 支持黑白图片 if USE_GRAYSCALE: self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False) # 最终输出编码的线性模型 # 因为 torchvision 的 resnet 最终会使用一个 Linear,这里省略掉第一个 Linear self.encode_model = nn.Sequential( nn.ReLU(inplace=True), nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Linear(128, 32)) 而比较编码找出最接近的人脸可以使用以下的代码 (计算编码中各个值的相差的平方的合计): diff = (new_code - exists_code).pow(2).sum(dim=1).sort() most_similar = diff.indices[0] 如果编码数据库中有大量的编码,计算所有编码的距离消耗会比较大,我们可以使用支持搜索向量的数据库,例如把编码保存到 Elastic Search 数据库并使用 dense_vector 类型,Elastic Search 数据库会根据编码构建索引并实现更高效的查找????。 看到这里你可能会觉得,就这么点吗?那该如何训练模型,让同一个人的编码更接近呢?这就是最难的部分了????,一开始大家想到的是以下的方式: # 计算损失的逻辑 loss1 = 同一个人的编码距离 loss2 = ReLU(常量A - 不同的人的编码距离) loss = loss1 + loss2 这样做看上去经过训练以后同一个人的编码会完全相同,而不同的人的编码距离最少需要大于常量A,但实际上这个方式很难训练成功,因为即使是同一个人,图片上的人脸角度、光线、脸色、以及背景都不一样,生成完全一样的编码会非常困难。 2015 年的 Facenet 论文 提出了使用 Triplet Loss 来训练人脸识别模型的手法,简单来说就是同时准备两个人的三张图片 (又称三元组),然后让同一个人的编码距离小于不同的人的编码距离,计算方式如下: loss = ReLU(同一个人的编码距离 + 常量A - 不同的人的编码距离) 看上去只是把前面的 loss1 放到了 loss2 的 ReLU 函数里面啊,对????,这么做了以后同一个人的编码距离不需要等于 0,只需要和不同的人的编码距离相差常量A即可,如下图所示: ![](https://img2020.cnblogs.com/blog/881857/202103/881857-20210324191039624-2101013930.png) 经过训练以后的编码分布大概会像下图,同一个人物的编码不会完全一样但会聚集在一起 (这就是一种通过机器学习实现聚类的方法????): 现在我们知道选取两个人的三张图片 (又称三元组),然后使用 Triplet Loss 计算损失即可训练模型聚类人脸,那应该怎样选取图片呢?简单的做法是随机选取图片,但随着训练次数增多,同一个人物的编码距离小于不同人物的编码距离的频率就越高,也即是说 loss 为 0 的频率越高,如果 90% 的 loss 为 0,那么就代表 90% 的计算都白费了。而且,这样训练出来的模型对于看上去相似但是不是同一个人的识别能力会比较弱。 更好的方法是记录上一次训练时各个图片的编码,先选取一张基础图片 (Anchor),然后选取”同一个人物但编码距离最远”的一张图片 (Hard Positive),和”不同的人但编码距离最近”的一张图片 (Hard Negative)。这样可以给模型尽可能大的压力来训练看上去不相似但是同一个人,和看上去相似但不是同一个人的识别能力。这个方法实现起来有一定的难度,因为: 如果训练的图片数量很多,例如上百万张,那么每次选取图片都需要计算基础图片的编码和上百万个编码之间的距离,计算量会非常庞大,训练起来像乌龟一样慢????如果你不小心把同一个人的图片放到其他人的文件夹,或者混杂一些质量比较垃圾的图片,这时候就好玩了,模型会想方设法的去适应这些图片,导致训练出来的模型不能泛化如果你一直给模型看很相似的人物然后告诉模型这不是同一个人 (有可能因为第二个原因,也有可能因为真是双胞胎????),那模型下次看到同一个人也会怀疑不是同一个,同样会影响模型的泛化能力 Facenet 中缓解这个问题的方法是把图片切分小批次 (Mini batch),然后在小批次中局部查找编码距离最近但不同的人,也会在小部分样本中随机选取批次外的人物。本文给出的实现将会使用另一种方法,具体看后面的介绍吧????。 顺道一提,Facenet 中使用的编码长度是 32,常量A的值是 0.2,本文的实现也会使用相同的参数,参考后面给出的代码叭。 基于分类的方法 基于指标的方法可以直接调整编码之间的距离,但选取三元组有一定的难度,并且随着数据量增多,选取时的计算量也会越多。基于分类的方法是另外一种途径,可以无须选取三元组而间接的调整编码之间的距离。模型的结构如下: 看起来只是在输出编码以后加一个单层线性模型,然后输出人物对应的分类,如果是同一个分类那么编码应该会更接近。如果忽视掉编码,把多层线性模型和单层线性连在一起,就是普通识别物体类型的模型。真的能行吗? 当然,没这么简单????,如文章开始提到过的,直接应用识别物体类型的模型到人脸上效果会很差,因为各个人的样本数量都不多,加上最后的单层线性模型只需要划分编码到分类而不需要聚集编码,训练出来的模型识别能力会很弱。训练出来的人脸分布可能会像下图一样: https://img2020.cnblogs.com/blog/881857/202103/881857-20210324191103721-1435705085.png 关键点在于计算损失的函数,普通识别物体类型的模型会使用 Softmax + CrossEntropyLoss,而识别人脸的模型则需要使用变种函数计算损失,本文不会详细介绍这些变种函数,如果你有兴趣可以参考 CosFace,SphereFace,和 ArcFace 的论文。 因为基于分类的方法速度更快,它更适合计算数量非常庞大的数据集,而这篇的例子收集到的人脸数据比较少,所有还是会采用基于指标的方法来实现,慢一点就慢一点吧????。 关于计算编码距离的补充 计算编码距离主要有两种方法,第一种是计算欧几里德距离 (Euclidean Distance),也就是在前面看到过的计算方法;第二种是计算余弦相似度 (Cosine Similarity),如果你参考基于分类的方法的论文会发现里面基本上都会使用余弦相似度计算。 使用 pytorch 计算欧几里德距离的例子如下,Triplet Loss 使用的时候会除掉 sqrt 的部分: >>> import torch >>> a = torch.tensor([1, 0, 0, 0.9, 0.1]) >>> b = torch.tensor([1, 0, 0.2, 0.8, 0]) >>> (a - b).pow(2).sum().sqrt() tensor(0.2449) // 结果等于 0 代表完全相同 使用 pytorch 计算余弦相似度的例子如下: >>> import torch >>> a = torch.tensor([1, 0, 0, 0.9, 0.1]) >>> b = torch.tensor([1, 0, 0.2, 0.8, 0]) >>> torch.nn.functional.cosine_similarity(a, b, dim=0) tensor(0.9836) // 相当于 >>> (a * b).sum() / (a.pow(2).sum().sqrt() * b.pow(2).sum().sqrt()) tensor(0.9836) // 结果等于 1 代表完全相同,结果等于 0 代表完全相反 实现人脸认证的方法使用以上的方法我们可以找到最接近的人脸,那这个人脸是否就是我们传入的人脸是同一个人呢?判断是否同一个人的依据可以是编码距离是否小于某个阈值,然而这个阈值是很难定义的。有个现象是,经过训练的同一个人的编码距离明显小于没有经过训练的同一个人的编码距离,人脸分布可能会如下: 一个比较好的方法是训练另外一个专门根据人脸编码距离判断是否同一个人的模型,这个模型只有一层线性模型,它会给编码中的每个指标乘以一个系数,然后加上偏移值,再交给 Sigmoid 转换到 0 ~ 1 之间的值,0 代表不是同一个人,1 代表是同一个人。 模型的定义如下,完整代码参考后面: self.verify_model = nn.Sequential( nn.Linear(32, 1), nn.Sigmoid()) 假设如果有以下的编码: >>> a = torch.tensor([1, 0, 0, 0.9, 0.1]) >>> b = torch.tensor([1, 0, 0.2, 0.8, 0]) >>> c = torch.tensor([1, 1, 0.9, 0.2, 1]) a 与 b,a 与 c 之间的距离可以用以下方式计算: >>> diff_1 = (a - b).pow(2) >>> diff_1 tensor([0.0000, 0.0000, 0.0400, 0.0100, 0.0100]) >>> diff_2 = (a - c).pow(2) >>> diff_2 tensor([0.0000, 1.0000, 0.8100, 0.4900, 0.8100]) 再假设模型参数如下: >>> w = torch.tensor([[-1.58, -2.96, -0.8, -0.1, -1.28]]).transpose(0, 1) >>> b = torch.tensor(3.68) 再应用到编码相差值就会发现 a 与 b 是同一个人的可能性很高 (接近 1),而 a 与 c 是同一个人的可能性很低 (接近 0): >>> torch.nn.functional.sigmoid(diff_1.unsqueeze(0).mm(w) + b) tensor([[0.9743]]) >>> torch.nn.functional.sigmoid(diff_2.unsqueeze(0).mm(w) + b) tensor([[0.2662]]) 训练人脸认证模型的代码会在后面给出。 看到这里你可能会问,为什么需要给编码中的指标分别训练不同的系数呢?不能直接用 sum 相加起来,再根据这个相加的值来判断吗?想想编码里面的内容代表了什么,模型为了区分人脸,需要给不同的人物分配不同的编码,而这个编码实际上就隐含了人物的属性,例如某个指标可能代表人物的性别,某个指标可能代表人物的年龄,某个指标可能代表人物的器官形状,这些指标的相差值有的会更重要,例如代表性别的指标不一致那就肯定是不同的人了,而代表年龄的指标不一致则还有余地。给每个指标分别训练不同的系数 (也可以称为权重) 可以更精准的判断是否同一个人。 准备训练使用的数据集好了,又到动手的时候了????。首先我们需要准备数据集,这次还是在 kaggle 上扒,一共有三个数据集符合要求,地址如下: https://www.kaggle.com/atulanandjha/lfwpeoplehttps://www.kaggle.com/vasukipatel/face-recognition-datasethttps://www.kaggle.com/hereisburak/pins-face-recognition 合计一共有 5855 个人和 33329 张图片,和其他公用数据集一样,里面大部分是白人,对亚洲人和黑人的效果会打个折????。训练人脸识别模型通常需要上百万张人脸,而这里只有三万多张,所以预计精确度会稍微低一些????。 需要注意的是,里面有相当一部分人物是只有一张图片的,这种人物会只拿来当负样本 (不同的人物) 使用。 此外,训练人脸识别模型的时候人脸的位置和占比要比较标准,数据质量会直接影响训练出来的正确率。以上三个数据集的人脸图片都是经过预处理的,不需要使用上一篇文章介绍的模型来调整中心点,但数据集的人脸占比不一样,所以会经过裁剪再参与训练。 裁剪比例分别是: 第一个数据集:中心 50%第二个数据集:不裁剪第三个数据集:中心 70% 运行后面的代码以后可以到 debug_faces 目录下查看裁剪后的人脸,内容大致如下: 完整代码到介绍完整代码的时候了????,以下代码包含了人脸识别模型和人脸认证模型,需要先训练人脸识别模型,再训练人脸认证模型,代码下面会给出一些实现细节的说明。 import os import sys import torch import gzip import itertools import random import numpy import math import json import torchvision from PIL import Image from torch import nn from matplotlib import pyplot from collections import defaultdict from functools import lru_cache # 缩放图片的大小 IMAGE_SIZE = (80, 80) # 训练使用的数据集路径 DATASET_1_DIR = "./dataset/lfwpeople/lfw_funneled" DATASET_2_DIR = "./dataset/face-recognition-dataset/Faces/Faces" DATASET_3_DIR = "./dataset/105_classes_pins_dataset" # 每一轮训练中样本的重复次数 REPEAT_SAMPLES = 2 # 用于对比的不同人物 (负样本) 数量 NEGATIVE_SAMPLES = 10 # 负样本中随机抽取的数量 NEGATIVE_RANDOM_SAMPLES = 3 # 跳过最接近的人脸数量 # 避免双胞胎问题: # 如果你给模型大量很相似的人脸 (有可能因为误标记,有可能因为图片质量很低,也有可能因为真相似) # 然后跟模型说不是同一个人,下次模型看到未经过训练的同一个人也会认为不是 # Facenet 论文中避免这个问题使用的方法是计算局部最接近的不同人物 # 而这里会计算全局最接近但跳过排在前面的人脸,数据量不多的时候可以这么做 NEGATIVE_SKIP_NEAREST = 20 # 识别同一人物最少要求的图片数量 MINIMAL_POSITIVE_SAMPLES = 2 # 处理图片前是否先转换为黑白图片 USE_GRAYSCALE = True # 用于启用 GPU 支持 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") class FaceRecognitionModel(nn.Module): """人脸识别模型,计算用于寻找最接近人脸的编码 (基于 ResNet 的变种)""" # 编码长度 EmbeddedSize = 32 # 要求不同人物编码之间的距离 (平方值合计) ExclusiveMargin = 0.2 def __init__(self): super().__init__() # Resnet 的实现 self.resnet = torchvision.models.resnet18(num_classes=256) # 支持黑白图片 if USE_GRAYSCALE: self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False) # 最终输出编码的线性模型 # 因为 torchvision 的 resnet 最终会使用一个 Linear,这里省略掉第一个 Linear self.encode_model = nn.Sequential( nn.ReLU(inplace=True), nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Linear(128, FaceRecognitionModel.EmbeddedSize)) def forward(self, x): tmp = self.resnet(x) y = self.encode_model(tmp) return y @staticmethod def loss_function(predicted): """损失计算器""" losses = [] verify_positive = torch.ones(1).to(device) verify_negative = torch.zeros(NEGATIVE_SAMPLES).to(device) for index in range(0, predicted.shape[0], 2 + NEGATIVE_SAMPLES): a = predicted[index] # 基础人物的编码 b = predicted[index+1] # 基础人物的编码 (另一张图片) c = predicted[index+2:index+2+NEGATIVE_SAMPLES] # 对比人物的编码 # 计算编码相差值 diff_positive = (a - b).pow(2).sum() diff_negative = (a - c).pow(2).sum(dim=1) # 计算损失 # 使用 Triplet Loss,要求同一人物编码距离和不同人物编码距离至少相差 ExclusiveMargin loss = nn.functional.relu( diff_positive - diff_negative + FaceRecognitionModel.ExclusiveMargin).sum() losses.append(loss) loss_total = torch.stack(losses).mean() return loss_total @staticmethod def calc_accuracy(predicted): """正确率计算器""" total_count = 0 correct_count = 0 for index in range(0, predicted.shape[0], 2 + NEGATIVE_SAMPLES): a = predicted[index] # 基础人物的编码 b = predicted[index+1] # 基础人物的编码 (另一张图片) c = predicted[index+2:index+2+NEGATIVE_SAMPLES] # 对比人物的编码 # 判断同一人物的编码是否小于不同人物的编码 diff_positive = (a - b).pow(2).sum() diff_negative = (a - c).pow(2).sum(dim=1) if (diff_positive < diff_negative).sum() == diff_negative.shape[0]: correct_count += 1 total_count += 1 return correct_count / total_count class FaceVerificationModel(nn.Module): """人脸认证模型,判断是否同一个人,参数是编码相差值的平方""" # 判断是否同一个人的阈值,实际使用模型时可以用更高的值防止误判 VerifyThreshold = 0.5 def __init__(self): super().__init__() # 判断是否同一人物的线性模型 self.verify_model = nn.Sequential( nn.Linear(FaceRecognitionModel.EmbeddedSize, 1), nn.Sigmoid()) def forward(self, x): # 经过训练后 weight 应该是负数,bias 应该是正数 y = self.verify_model(x) return y.view(-1) @staticmethod def loss_function(predicted): """损失计算器""" # 输出应该为 [ 同一人物, 不同人物, 不同人物, ..., 同一人物, 不同人物, 不同人物, ... ] # 这里需要分别计算正负损失,否则会因为负样本占多而引起 bias 被调整为负数 positive_indexes = [] negative_indexes = [] for index in list(range(0, predicted.shape[0], 1+NEGATIVE_SAMPLES)): positive_indexes.append(index) negative_indexes += list(range(index+1, index+1+NEGATIVE_SAMPLES)) positive_loss = nn.functional.mse_loss( predicted[positive_indexes], torch.ones(len(positive_indexes)).to(device)) negative_loss = nn.functional.mse_loss( predicted[negative_indexes], torch.zeros(len(negative_indexes)).to(device)) return (positive_loss + negative_loss) / 2 @staticmethod def calc_accuracy(predicted): """正确率计算器""" positive_correct = 0 positive_total = 0 negative_correct = 0 negative_total = 0 for index in range(0, predicted.shape[0], 1+NEGATIVE_SAMPLES): positive_correct += (predicted[index] >= FaceVerificationModel.VerifyThreshold).sum().item() negative_correct += (predicted[index+1:index+1+NEGATIVE_SAMPLES] < FaceVerificationModel.VerifyThreshold).sum().item() positive_total += 1 negative_total += NEGATIVE_SAMPLES # 因为负样本占大多数,这里返回正样本正确率和负样本正确率的平均值 return (positive_correct / positive_total + negative_correct / negative_total) / 2 def save_tensor(tensor, path): """保存 tensor 对象到文件""" torch.save(tensor, gzip.GzipFile(path, "wb")) # 为了减少读取时间这里缓存了读取的 tensor 对象 # 如果内存不够应该适当减少 maxsize @lru_cache(maxsize=10000) def load_tensor(path): """从文件读取 tensor 对象""" return torch.load(gzip.GzipFile(path, "rb")) def calc_resize_parameters(sw, sh): """计算缩放图片的参数""" sw_new, sh_new = sw, sh dw, dh = IMAGE_SIZE pad_w, pad_h = 0, 0 if sw / sh < dw / dh: sw_new = int(dw / dh * sh) pad_w = (sw_new - sw) // 2 # 填充左右 else: sh_new = int(dh / dw * sw) pad_h = (sh_new - sh) // 2 # 填充上下 return sw_new, sh_new, pad_w, pad_h def resize_image(img): """缩放图片,比例不一致时填充""" sw, sh = img.size sw_new, sh_new, pad_w, pad_h = calc_resize_parameters(sw, sh) img_new = Image.new("RGB", (sw_new, sh_new)) img_new.paste(img, (pad_w, pad_h)) img_new = img_new.resize(IMAGE_SIZE) return img_new def image_to_tensor_grayscale(img): """缩放并转换图片对象到 tensor 对象 (黑白)""" img = img.convert("L") # 转换到黑白图片并缩放 arr = numpy.asarray(img) t = torch.from_numpy(arr) t = t.unsqueeze(0) # 添加通道 t = t / 255.0 # 正规化数值使得范围在 0 ~ 1 return t def image_to_tensor_rgb(img): """缩放并转换图片对象到 tensor 对象 (彩色)""" img = img.convert("RGB") # 缩放图片 arr = numpy.asarray(img) t = torch.from_numpy(arr) t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H t = t / 255.0 # 正规化数值使得范围在 0 ~ 1 return t if USE_GRAYSCALE: image_to_tensor = image_to_tensor_grayscale else: image_to_tensor = image_to_tensor_rgb def prepare(): """准备训练""" # 数据集转换到 tensor 以后会保存在 data 文件夹下 if not os.path.isdir("data"): os.makedirs("data") # 截取后的人脸图片会保存在 debug_faces 文件夹下 if not os.path.isdir("debug_faces"): os.makedirs("debug_faces") # 查找人物和对应的图片路径列表 # { 人物名称: [ 图片路径, 图片路径, .. ] } images_map = defaultdict(lambda: []) def add_image(name, path): if os.path.splitext(path)[1].lower() not in (".jpg", ".png"): return name = name.replace(" ", "").replace("-", "").replace(".", "").replace("_", "").lower() images_map[name].append(path) for dirname in os.listdir(DATASET_1_DIR): dirpath = os.path.join(DATASET_1_DIR, dirname) if not os.path.isdir(dirpath): continue for filename in os.listdir(dirpath): add_image(dirname, os.path.join(DATASET_1_DIR, dirname, filename)) for filename in os.listdir(DATASET_2_DIR): add_image(filename.split("_")[0], os.path.join(DATASET_2_DIR, filename)) for dirname in os.listdir(DATASET_3_DIR): dirpath = os.path.join(DATASET_3_DIR, dirname) name = dirname.replace("pins_", "") if not os.path.isdir(dirpath): continue for filename in os.listdir(dirpath): add_image(name, os.path.join(DATASET_3_DIR, dirname, filename)) images_count = sum(map(len, images_map.values())) print(f"found {len(images_map)} peoples and {images_count} images") # 保存各个人物的图片数据 # 这里不翻转图片,因为人脸照片通常不会左右翻转,而且部分器官的特征会因左右有差异 img_index = 0 for index, (name, paths) in enumerate(images_map.items()): images = [] for path in paths: img = Image.open(path) # 裁剪图片让各个数据集的人脸占比更接近 if path.startswith(DATASET_1_DIR): w, h = img.size img = img.crop((int(w*0.25), int(h*0.25), int(w*0.75), int(h*0.75))) elif path.startswith(DATASET_3_DIR): w, h = img.size img = img.crop((int(w*0.15), int(h*0.15), int(w*0.85), int(h*0.85))) # 保存截取后的人脸图片以调试范围 img.save(f"debug_faces/{img_index}.png") img_index += 1 images.append(img) tensors = [ image_to_tensor(resize_image(img)) for img in images ] tensor = torch.stack(tensors) # 维度: (图片数量, 3, 宽度, 高度) save_tensor(tensor, os.path.join("data", f"{name}.{len(images)}.pt")) print(f"saved {index+1}/{len(images_map)} peoples") print("done") def train(): """开始训练人脸识别模型""" # 创建模型实例 model = FaceRecognitionModel().to(device) # 创建损失计算器 loss_function = model.loss_function # 创建参数调整器 optimizer = torch.optim.Adam(model.parameters()) # 记录训练集和验证集的正确率变化 training_accuracy_history = [] validating_accuracy_history = [] # 记录最高的验证集正确率 validating_accuracy_highest = -1 validating_accuracy_highest_epoch = 0 # 计算正确率的工具函数 calc_accuracy = model.calc_accuracy # 读取人物列表,区分图片数量足够的人物和图片数量不足的人物 # 图片数量不足的人物会作为负样本使用 filenames = os.listdir("data") multiple_samples = [] single_samples = [] for filename in filenames: if int(filename.split('.')[-2]) >= MINIMAL_POSITIVE_SAMPLES: multiple_samples.append(filename) else: single_samples.append(filename) random.shuffle(multiple_samples) random.shuffle(single_samples) total_multiple_samples = len(multiple_samples) total_single_samples = len(single_samples) # 分割训练集 (80%),验证集 (10%) 和测试集 (10%) training_set = multiple_samples[:int(total_multiple_samples*0.8)] training_set_single = single_samples[:int(total_single_samples*0.8)] validating_set = multiple_samples[int(total_multiple_samples*0.8):int(total_multiple_samples*0.9)] validating_set_single = single_samples[int(total_single_samples*0.8):int(total_single_samples*0.9)] testing_set = multiple_samples[int(total_multiple_samples*0.9):] testing_set_single = single_samples[int(total_single_samples*0.9):] # 训练集的各个人物对应的编码 (基于最后以后一次训练使用的图片) training_image_to_vector_index = {} training_vector_index_to_image = {} for filename in training_set + training_set_single: for image_index in range(int(filename.split('.')[1])): vector_index = len(training_image_to_vector_index) training_image_to_vector_index[(filename, image_index)] = vector_index training_vector_index_to_image[vector_index] = (filename, image_index) training_vectors = torch.zeros(len(training_image_to_vector_index), FaceRecognitionModel.EmbeddedSize) training_vectors_calculated_indices = set() # 生成用于训练的输入 # 返回 [ 基础图片, 同一人物图片 (正样本), 不同人物图片 (负样本), ... ] def generate_inputs(dataset_multiple, dataset_single, batch_size): # 获取已计算过的编码 is_training = dataset_multiple == training_set if is_training: calculated_index_list = list(training_vectors_calculated_indices) calculated_index_set = set(calculated_index_list) calculated_index_to_image = { ci: training_vector_index_to_image[vi] for ci, vi in enumerate(calculated_index_list) } training_vectors_calculated = training_vectors[calculated_index_list] # 枚举数据集,会重复 REPEAT_SAMPLES 次以减少随机选择导致的正确率浮动 image_tensors = [] vector_indices = [] for base_filename in dataset_multiple * REPEAT_SAMPLES: # 读取基础人物的图片 base_tensor = load_tensor(os.path.join("data", base_filename)) base_tensors = list(enumerate(base_tensor)) # 打乱顺序,然后两张两张图片的选取基础图片和正样本 random.shuffle(base_tensors) for index in range(0, len(base_tensors)-1, 2): # 添加基础图片和正样本到列表 anchor_image_index, anchor_tensor = base_tensors[index] positive_image_index, positive_tensor = base_tensors[index+1] image_tensors.append(anchor_tensor) image_tensors.append(positive_tensor) if is_training: vector_indices.append(training_image_to_vector_index[(base_filename, anchor_image_index)]) vector_indices.append(training_image_to_vector_index[(base_filename, positive_image_index)]) # 如果是训练集,则计算基础图片的编码与其他编码的距离 nearest_indices = [] if is_training: vector_index = training_image_to_vector_index[(base_filename, anchor_image_index)] if vector_index in calculated_index_set: nearest_indices = ((training_vectors_calculated - training_vectors[vector_index]).abs().sum(dim=1).sort().indices).tolist() # 选取负样本 # 如果是训练集则选取编码最接近的样本+随机样本作为负样本 # 如果是验证集和测试集则随机选取样本 if is_training and nearest_indices: negative_samples = NEGATIVE_SAMPLES - NEGATIVE_RANDOM_SAMPLES negative_random_samples = NEGATIVE_RANDOM_SAMPLES else: negative_samples = 0 negative_random_samples = NEGATIVE_SAMPLES negative_skip_nearest = NEGATIVE_SKIP_NEAREST for calculated_index in nearest_indices: if negative_samples <= 0: break filename, image_index = calculated_index_to_image[calculated_index] if filename == base_filename: continue # 跳过同一人物 if negative_skip_nearest > 0: negative_skip_nearest -= 1 continue # 跳过非常相似的人物 target_tensor = load_tensor(os.path.join("data", filename)) # 添加负样本到列表 image_tensors.append(target_tensor[image_index]) if is_training: vector_indices.append(training_image_to_vector_index[(filename, image_index)]) negative_samples -= 1 while negative_random_samples > 0: file_index = random.randint(0, len(dataset_multiple) + len(dataset_single) - 1) if file_index < len(dataset_multiple): filename = dataset_multiple[file_index] else: filename = dataset_single[file_index - len(dataset_multiple)] if filename == base_filename: continue # 跳过同一人物 target_tensor = load_tensor(os.path.join("data", filename)) image_index = random.randint(0, target_tensor.shape[0] - 1) # 添加负样本到列表 image_tensors.append(target_tensor[image_index]) if is_training: vector_indices.append(training_image_to_vector_index[(filename, image_index)]) negative_random_samples -= 1 assert negative_samples == 0 assert negative_random_samples == 0 # 如果图片数量大于批次大小,则返回批次 if len(image_tensors) >= batch_size: yield torch.stack(image_tensors).to(device), vector_indices image_tensors.clear() vector_indices.clear() if image_tensors: yield torch.stack(image_tensors).to(device), vector_indices # 开始训练过程 for epoch in range(0, 200): print(f"epoch: {epoch}") # 根据训练集训练并修改参数 # 切换模型到训练模式 model.train() training_accuracy_list = [] for index, (batch_x, vector_indices) in enumerate( generate_inputs(training_set, training_set_single, 400)): # 计算预测值 predicted = model(batch_x) # 计算损失 loss = loss_function(predicted) # 从损失自动微分求导函数值 loss.backward() # 使用参数调整器调整参数 optimizer.step() # 清空导函数值 optimizer.zero_grad() # 记录各个人物的编码 for vector_index, vector in zip(vector_indices, predicted): # 复制回 cpu 并去掉用于自动微分的计算路径信息 training_vectors[vector_index] = vector.to("cpu").detach() training_vectors_calculated_indices.add(vector_index) # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能 with torch.no_grad(): training_batch_accuracy = calc_accuracy(predicted) # 输出批次正确率 training_accuracy_list.append(training_batch_accuracy) print(f"epoch: {epoch}, batch: {index}, accuracy: {training_batch_accuracy}") training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list) training_accuracy_history.append(training_accuracy) print(f"training accuracy: {training_accuracy}") # 检查验证集 # 切换模型到验证模式 model.eval() validating_accuracy_list = [] for batch_x, _ in generate_inputs(validating_set, validating_set_single, 100): predicted = model(batch_x) validating_batch_accuracy = calc_accuracy(predicted) validating_accuracy_list.append(validating_batch_accuracy) # 释放 predicted 占用的显存避免显存不足的错误 predicted = None validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list) validating_accuracy_history.append(validating_accuracy) print(f"validating accuracy: {validating_accuracy}") # 记录最高的验证集正确率与当时的模型状态,判断是否在多次训练后仍然没有刷新记录 # 因为验证集的负样本是随机选择的,允许 1% 的波动使得模型可以训练更多次 if (validating_accuracy + 0.01) > validating_accuracy_highest: if validating_accuracy > validating_accuracy_highest: validating_accuracy_highest = validating_accuracy print("highest validating accuracy updated") else: print("highest validating accuracy not dropped") validating_accuracy_highest_epoch = epoch save_tensor(model.state_dict(), "model.recognition.pt") elif epoch - validating_accuracy_highest_epoch > 20: # 在多次训练后仍然没有刷新记录,结束训练 print("stop training because validating accuracy dropped from highest in 20 epoches") break # 使用达到最高正确率时的模型状态 print(f"highest validating accuracy: {validating_accuracy_highest}", f"from epoch {validating_accuracy_highest_epoch}") model.load_state_dict(load_tensor("model.recognition.pt")) # 检查测试集 testing_accuracy_list = [] for batch_x, _ in generate_inputs(testing_set, testing_set_single, 100): predicted = model(batch_x) testing_batch_accuracy = calc_accuracy(predicted) testing_accuracy_list.append(testing_batch_accuracy) testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list) print(f"testing accuracy: {testing_accuracy}") # 显示训练集和验证集的正确率变化 pyplot.plot(training_accuracy_history, label="training_accuracy") pyplot.plot(validating_accuracy_history, label="validating_accuracy") pyplot.ylim(0, 1) pyplot.legend() pyplot.show() def train_verify(): """开始训练人脸认证模型""" # 创建人脸识别模型实例并加载训练好的参数 recognize_model = FaceRecognitionModel().to(device) recognize_model.load_state_dict(load_tensor("model.recognition.pt")) recognize_model.eval() # 创建人脸认证模型实例 model = FaceVerificationModel().to(device) # 创建损失计算器 loss_function = model.loss_function # 创建参数调整器 optimizer = torch.optim.Adam(model.parameters()) # 记录训练集和验证集的正确率变化 training_accuracy_history = [] validating_accuracy_history = [] # 记录最高的验证集正确率 validating_accuracy_highest = -1 validating_accuracy_highest_epoch = 0 # 计算正确率的工具函数 calc_accuracy = model.calc_accuracy # 读取人物列表,区分图片数量足够的人物和图片数量不足的人物 # 图片数量不足的人物会作为负样本使用 filenames = os.listdir("data") multiple_samples = [] single_samples = [] for filename in filenames: if int(filename.split('.')[-2]) >= MINIMAL_POSITIVE_SAMPLES: multiple_samples.append(filename) else: single_samples.append(filename) random.seed(123) # 让这里的顺序跟训练人脸识别模型时的顺序不一样 random.shuffle(multiple_samples) random.shuffle(single_samples) total_multiple_samples = len(multiple_samples) total_single_samples = len(single_samples) # 分割训练集 (80%),验证集 (10%) 和测试集 (10%) training_set = multiple_samples[:int(total_multiple_samples*0.8)] training_set_single = single_samples[:int(total_single_samples*0.8)] validating_set = multiple_samples[int(total_multiple_samples*0.8):int(total_multiple_samples*0.9)] validating_set_single = single_samples[int(total_single_samples*0.8):int(total_single_samples*0.9)] testing_set = multiple_samples[int(total_multiple_samples*0.9):] testing_set_single = single_samples[int(total_single_samples*0.9):] # 编码的缓存 { (文件名,索引值): 编码 } vector_cache = {} # 根据图片获取编码 def get_vector(filename, image_index, image_tensor): key = (filename, image_index) vector = vector_cache.get(key) if vector is None: with torch.no_grad(): vector = recognize_model(image_tensor.unsqueeze(0).to(device))[0].to("cpu") vector_cache[key] = vector return vector # 生成用于训练的输入 # 返回 [ 同一人物编码的差异, 不同人物编码的差异, ... ] def generate_inputs(dataset_multiple, dataset_single, batch_size): # 枚举数据集,会重复 REPEAT_SAMPLES 次以减少随机选择导致的正确率浮动 diff_tensors = [] for base_filename in dataset_multiple * REPEAT_SAMPLES: # 读取基础人物的图片 base_tensor = load_tensor(os.path.join("data", base_filename)) base_tensors = list(enumerate(base_tensor)) # 打乱顺序,然后两张两张图片的选取基础图片和正样本 random.shuffle(base_tensors) for index in range(0, len(base_tensors)-1, 2): # 计算基础图片和正样本的编码差异并添加到列表 anchor_image_index, anchor_tensor = base_tensors[index] positive_image_index, positive_tensor = base_tensors[index+1] anchor_vector = get_vector(base_filename, anchor_image_index, anchor_tensor) positive_vector = get_vector(base_filename, positive_image_index, positive_tensor) diff_tensors.append((anchor_vector - positive_vector).pow(2)) # 随机选取负样本,计算差异并添加到列表 negative_random_samples = NEGATIVE_SAMPLES while negative_random_samples > 0: file_index = random.randint(0, len(dataset_multiple) + len(dataset_single) - 1) if file_index < len(dataset_multiple): filename = dataset_multiple[file_index] else: filename = dataset_single[file_index - len(dataset_multiple)] if filename == base_filename: continue # 跳过同一人物 target_tensor = load_tensor(os.path.join("data", filename)) image_index = random.randint(0, target_tensor.shape[0] - 1) negative_vector = get_vector(filename, image_index, target_tensor[image_index]) diff_tensors.append((anchor_vector - negative_vector).pow(2)) negative_random_samples -= 1 # 如果差异数量大于批次大小,则返回批次 if len(diff_tensors) >= batch_size: yield torch.stack(diff_tensors).to(device) diff_tensors.clear() if diff_tensors: yield torch.stack(diff_tensors).to(device) # 开始训练过程 for epoch in range(1, 20): print(f"epoch: {epoch}") # 根据训练集训练并修改参数 # 切换模型到训练模式 model.train() training_accuracy_list = [] for index, batch_x in enumerate( generate_inputs(training_set, training_set_single, 400)): # 计算预测值 predicted = model(batch_x) # 计算损失 loss = loss_function(predicted) # 从损失自动微分求导函数值 loss.backward() # 使用参数调整器调整参数 optimizer.step() # 清空导函数值 optimizer.zero_grad() # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能 with torch.no_grad(): training_batch_accuracy = calc_accuracy(predicted) # 输出批次正确率 training_accuracy_list.append(training_batch_accuracy) print(f"epoch: {epoch}, batch: {index}, accuracy: {training_batch_accuracy}") training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list) training_accuracy_history.append(training_accuracy) print(f"training accuracy: {training_accuracy}") # 检查验证集 # 切换模型到验证模式 model.eval() validating_accuracy_list = [] for batch_x in generate_inputs(validating_set, validating_set_single, 100): predicted = model(batch_x) validating_batch_accuracy = calc_accuracy(predicted) validating_accuracy_list.append(validating_batch_accuracy) # 释放 predicted 占用的显存避免显存不足的错误 predicted = None validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list) validating_accuracy_history.append(validating_accuracy) print(f"validating accuracy: {validating_accuracy}") # 记录最高的验证集正确率与当时的模型状态,判断是否在多次训练后仍然没有刷新记录 # 因为验证集的负样本是随机选择的,允许 1% 的波动使得模型可以训练更多次 if (validating_accuracy + 0.01) > validating_accuracy_highest: if validating_accuracy > validating_accuracy_highest: validating_accuracy_highest = validating_accuracy print("highest validating accuracy updated") else: print("highest validating accuracy not dropped") validating_accuracy_highest_epoch = epoch save_tensor(model.state_dict(), "model.verification.pt") elif epoch - validating_accuracy_highest_epoch > 20: # 在多次训练后仍然没有刷新记录,结束训练 print("stop training because validating accuracy dropped from highest in 20 epoches") break # 使用达到最高正确率时的模型状态 print(f"highest validating accuracy: {validating_accuracy_highest}", f"from epoch {validating_accuracy_highest_epoch}") model.load_state_dict(load_tensor("model.verification.pt")) # 检查测试集 testing_accuracy_list = [] for batch_x in generate_inputs(testing_set, testing_set_single, 100): predicted = model(batch_x) testing_batch_accuracy = calc_accuracy(predicted) testing_accuracy_list.append(testing_batch_accuracy) testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list) print(f"testing accuracy: {testing_accuracy}") # 显示训练集和验证集的正确率变化 pyplot.plot(training_accuracy_history, label="training_accuracy") pyplot.plot(validating_accuracy_history, label="validating_accuracy") pyplot.ylim(0, 1) pyplot.legend() pyplot.show() def main(): """主函数""" if len(sys.argv) < 2: print(f"Please run: {sys.argv[0]} prepare|train") exit() # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数 # 这是为了让过程可重现,你也可以选择不这样做 random.seed(0) torch.random.manual_seed(0) # 根据命令行参数选择操作 operation = sys.argv[1] if operation == "prepare": prepare() elif operation == "train": train() elif operation == "train-verify": train_verify() else: raise ValueError(f"Unsupported operation: {operation}") if __name__ == "__main__": main() 以下是一些实现上的细节。 因为图片数量不多,在处理图片之前会先转换到黑白图片和缩放到 80x80 以避免过拟合,如果图片数量足够可以不需要转换到黑白和使用更高的像素值跟 Facenet 论文不一样,这个实现会全局查找不同人物但编码距离最接近的人脸,同时跳过前 20 个人脸跟 Facenet 论文一样,这个实现的编码长度是 32,要求不同人物的距离是 0.2这个实现选择人物的时候不会只选择 3 张图片,而是选择 1 张基础图片,1 张正样本图片 (同一个人) 和 10 张负样本图片 (不同的人),这样可以让计算速度更快人脸识别模型的骨干网络还是用了 ResNet,如果有兴趣可以试试 Facenet 使用的 Inception 模型,torchvision 同样有提供人脸识别模型的正确率会依据选取的”基础图片和正样本图片的编码距离”是否小于”基础图片和所有负样本图片的编码距离”计算因为随机选择会导致验证集正确率浮动,这里会允许最高正确率浮动 1%,并且最多训练次数会有比较低的限制 把代码保存到 face_recognition.py,然后按以下文件夹结构存放: face-recognition dataset 105_classes_pins_dataset face-recognition-dataset Faces lfwpeople lfw_funneled face_recognition.py 再执行以下命令即可训练人脸识别模型: python3 face_recognition.py prepare python3 face_recognition.py train 最终输出结果如下,大概 83%~85% 的最接近的人脸是同一个人????: epoch: 199, batch: 646, accuracy: 1.0 epoch: 199, batch: 647, accuracy: 1.0 epoch: 199, batch: 648, accuracy: 1.0 training accuracy: 0.9798332275899607 validating accuracy: 0.8293447293447292 highest validating accuracy updated highest validating accuracy: 0.8293447293447292 from epoch 199 testing accuracy: 0.8504521963824275 接下来再执行以下命令即可训练人脸认证模型: python3 face_recognition.py train-verify 最终输出结果如下,令人吃惊的是识别是否同一个人的精度达到了 96%~97% (这还是正负样本正确率分别计算再平均后的值),这说明了即使欧几里德距离最接近也不一定是同一个人,人脸认证模型给各项指标分配不同的系数来计算是有一定效果的。后面我们应用模型的时候,会先选取最接近的 N 张人脸,再使用人脸认证模型来一个个判断是否同一个人。 epoch: 19, batch: 569, accuracy: 0.9283783783783783 epoch: 19, batch: 570, accuracy: 0.9567567567567568 epoch: 19, batch: 571, accuracy: 0.9304347826086956 training accuracy: 0.9617334193421163 validating accuracy: 0.9590857605178013 highest validating accuracy not dropped highest validating accuracy: 0.9623381877022668 from epoch 19 testing accuracy: 0.9714264705882371 和之前几篇文章不一样的是,这份代码没有给出 eval_model 函数,这是因为人脸图片通常都需要经过预处理,不会直接交给模型使用。下面将会介绍如何结合 faster-rcnn 模型检测图片中的人脸,再根据上一篇文章介绍的脸部关键点检测模型调整人脸范围,最后截取人脸交给人脸识别模型和人脸认证模型。 结合前几篇的模型实现人脸检测 + 识别假设我们想识别某张图片上的人是谁,组合本文介绍的模型和前几篇文章介绍的模型即可得出以下的流程: 首先我们把三个模型的代码和训练好的参数文件 (model.pt) 按以下结构存放 faster-rcnn fasterrcnn.py: 来源在这里 model.pt: 可以到这里下载,下载后需要重命名 (faster-rcnn-10.5-model.pt => model.pt) face-landmark face_landmark.py: 来源在这里 model.pt: 可以到这里下载,下载后需要重命名 (face-landmark-model.pt => model.pt) face-recognition face_recognition.py: 来源在这篇文章的上面 model.recognition.pt: 可以到这里下载,下载后需要重命名 (face-recognition-model.recognition.pt => model.recognition.pt) model.verification.pt: 可以到这里下载,下载后需要重命名 (face-recognition-model.verification.pt => model.verification.pt) 之后我们需要准备构建人脸编码数据库的人脸,也就是识别的对象,这里我为了方便测试收集了一些体坛巨星的照片????,可以到这里下载。 然后就是结合这些模型的代码了: import sys import os import torch from collections import defaultdict from PIL import Image, ImageDraw, ImageFont # 加载模型所在的模块 sys.path.append("../face-landmark") sys.path.append("../face-recognition") import fasterrcnn import face_landmark import face_recognition # 构建编码数据库使用的文件夹 # 要求图片路径是 {SAMPLES_DIR}/{人物名称}/图片名 SAMPLES_DIR = "./face_recognition_samples" # 输出人脸图片使用的文件夹 FACES_OUTPUT_DIR = "./detected_faces" # 编码数据库的保存路径 # 如果需要重新构建请删除此文件 VECTORS_PATH = "./face_recognition_vectors.pt" # 使用关键点调整脸部范围的次数 ADJUST_FACE_REGION_BY_LANDMARK_TIMES = 3 # 脸部范围相对关键点范围的比例 FACE_REGION_RATIO_BY_LANDMARK = 3.0 # 判断是同一人物需要的分数阈值 FACE_VERIFICATION_THRESHOLD = 0.92 # 用于启用 GPU 支持 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def adjust_face_region(image, region, face_landmark_model): """使用关键点调整脸部范围""" x, y, w, h = region child_img = image.crop((x, y, x+w, y+h)) points = [] for _ in range(ADJUST_FACE_REGION_BY_LANDMARK_TIMES): points = face_landmark_model.detect_landmarks([child_img])[0] min_x = min(p[0] for p in points) min_y = min(p[1] for p in points) max_x = max(p[0] for p in points) max_y = max(p[1] for p in points) landmark_w = max_x - min_x landmark_h = max_y - min_y center_x = min_x + landmark_w * 0.5 + x center_y = min_y + landmark_h * 0.5 + y radius = max(landmark_w, landmark_h) * FACE_REGION_RATIO_BY_LANDMARK / 2 x0 = int(min(max(center_x - radius, 0), image.size[0]-1)) x1 = int(min(max(center_x + radius, 0), image.size[0]-1)) y0 = int(min(max(center_y - radius, 0), image.size[1]-1)) y1 = int(min(max(center_y + radius, 0), image.size[1]-1)) x_diff = x0 - x y_diff = y0 - y x, y, w, h = x0, y0, x1-x0, y1-y0 points = [ (px - x_diff, py - y_diff) for px, py in points ] if w == 0 or h == 0: # 检测关键点失败,返回原始范围 return child_img, region, [] child_img = image.crop((x, y, x+w, y+h)) return child_img, (x, y, w, h), points def build_vectors(faster_rcnn_model, face_landmark_model, face_recognition_model): """根据人物和图片构建编码数据库""" print("building face recognition vectors from samples") names = [] images = [] result_names = [] result_vectors = [] batch_size = 10 def process_images(): # 查找人脸区域 tensor_in = torch.stack([ fasterrcnn.image_to_tensor(fasterrcnn.resize_image(img)) for img in images ]).to(device) cls_result = faster_rcnn_model(tensor_in)[-1] # 截取各图片中的人脸区域 face_names = [] face_images = [] for name, image, result in zip(names, images, cls_result): if not result: print("no face found for", name) continue result = fasterrcnn.MyModel.merge_predicted_result(result) # 按分数排序并选取分数最高的区域 result.sort(key=lambda r: -r[2]) label, box, rpn_score, cls_score = result[0] x, y, w, h = fasterrcnn.map_box_to_original_image(box, *image.size) if w == 0 or h == 0: print("no face found for", name, "due to incorrect size") continue # 使用脸部关键点调整脸部范围 child_img, _, points = adjust_face_region(image, (x, y, w, h), face_landmark_model) # 保存检测结果用于调试 if not os.path.isdir(FACES_OUTPUT_DIR): os.makedirs(FACES_OUTPUT_DIR) face_landmark.draw_points(child_img, points) child_img.convert("RGB").save(os.path.join( FACES_OUTPUT_DIR, f"{name[0]}_{os.path.splitext(name[1])[0]}.png")) face_names.append(name) face_images.append(child_img) # 转换人脸图片到编码 tensor_in = torch.stack([ face_recognition.image_to_tensor(face_recognition.resize_image(img)) for img in face_images ]).to(device) face_vectors = face_recognition_model(tensor_in) # 添加名称和编码到结果列表 for name in face_names: result_names.append(name) result_vectors.append(face_vectors) names.clear() images.clear() for dirname in os.listdir(SAMPLES_DIR): dirpath = os.path.join(SAMPLES_DIR, dirname) if not os.path.isdir(dirpath): continue for filename in os.listdir(dirpath): if os.path.splitext(filename)[1].lower() not in (".jpg", ".png"): continue names.append((dirname, filename)) images.append(Image.open(os.path.join(dirpath, filename))) if len(images) >= batch_size: process_images() if images: process_images() # 整合编码列表 result_vector = torch.cat(result_vectors, dim=0) # 保存名称和编码 fasterrcnn.save_tensor((result_names, result_vector), VECTORS_PATH) peoples_count = len(set(n[0] for n in result_names)) print(f"built vectors for {peoples_count} peoples and {result_vector.shape[0]} images") def recognize_face( faster_rcnn_model, face_landmark_model, face_recognition_model, face_verification_model, image, vectors, top_range=10): """识别人脸位置与身份""" # 查找人脸区域 tensor_in = fasterrcnn.image_to_tensor( fasterrcnn.resize_image(image)).unsqueeze(0).to(device) cls_result = faster_rcnn_model(tensor_in)[-1] # 按分数排序并选取分数最高的区域 result = cls_result[0] if not result: return None result = fasterrcnn.MyModel.merge_predicted_result(result) result.sort(key=lambda r: -r[2]) label, box, rpn_score, cls_score = result[0] x, y, w, h = fasterrcnn.map_box_to_original_image(box, *image.size) if w == 0 or h == 0: return None # 使用脸部关键点调整脸部范围 child_img, (x, y, w, h), points = adjust_face_region( image, (x, y, w, h), face_landmark_model) # 生成人脸编码 tensor_in = face_recognition.image_to_tensor( face_recognition.resize_image(child_img)).unsqueeze(0).to(device) vector = face_recognition_model(tensor_in)[0] # 比较人脸编码,得出最接近的 N 张人脸 diff = (vector - vectors).pow(2) diff_sorted = diff.sum(dim=1).sort() indices = diff_sorted.indices[:top_range].tolist() # 使用人脸认证模型匹配人脸,身份不明时返回 None highest_score = None highest_index = None scores = face_verification_model(diff[indices]).tolist() for index, score in zip(indices, scores): if score < FACE_VERIFICATION_THRESHOLD: continue if highest_score and score <= highest_score: continue highest_index = index highest_score = score # 返回人脸图片,人脸范围,人脸关键点,匹配出来的身份,分数 # 没有匹配到身份时,身份和分数会为 None return child_img, (x, y, w, h), points, highest_index, highest_score def main(): """组合模型实现人脸检测 + 识别 + 认证""" # 让 fasterrcnn 合并区域时不使用 NMS 算法,使得结果区域更大 fasterrcnn.USE_NMS_ALGORITHM = False # 加载 fasterrcnn 模型 faster_rcnn_model = fasterrcnn.MyModel().to(device) faster_rcnn_model.load_state_dict(fasterrcnn.load_tensor("model.pt")) faster_rcnn_model.eval() # 加载人脸关键点识别模型 face_landmark_model = face_landmark.FaceLandmarkModel().to(device) face_landmark_model.load_state_dict(fasterrcnn.load_tensor("../face-landmark/model.pt")) face_landmark_model.eval() # 加载人脸识别模型 face_recognition_model = face_recognition.FaceRecognitionModel().to(device) face_recognition_model.load_state_dict(face_recognition.load_tensor("../face-recognition/model.recognition.pt")) face_recognition_model.eval() # 加载人脸验证模型 face_verification_model = face_recognition.FaceVerificationModel().to(device) face_verification_model.load_state_dict(face_recognition.load_tensor("../face-recognition/model.verification.pt")) face_verification_model.eval() # 根据人物和图片构建编码数据库 if not os.path.isfile(VECTORS_PATH): build_vectors(faster_rcnn_model, face_landmark_model, face_recognition_model) # 读取编码数据库 names, vectors = fasterrcnn.load_tensor(VECTORS_PATH) # 询问图片路径,并识别人脸 font = ImageFont.truetype("wqy-microhei.ttc", 20) while True: try: # 打开图片 image_path = input("Image path: ") if not image_path: continue image = Image.open(image_path) # 识别人脸 result = recognize_face( faster_rcnn_model, face_landmark_model, face_recognition_model, face_verification_model, image, vectors) if not result: print("no face found\n") continue # 根据结果标记到图片上 child_img, (x, y, w, h), points, index, score = result points = [ (px + x, py + y) for px, py in points ] label = f"{names[index][0]}: {score:.2f}" if index else "Unknown" print("result:", label) color = "#FF0000" draw = ImageDraw.Draw(image) draw.rectangle((x, y, x+w, y+h), outline=color, width=3) draw.text((x, y-20), label, fill=color, font=font) face_landmark.draw_points(image, points) image.save("img_output.png") print("saved to img_output.png") print() except Exception as e: print("error:", e) if __name__ == "__main__": main() 使用脸部关键点调整脸部范围的代码在 adjust_face_region 函数中,构建人物编码数据库的代码在 build_vectors 函数中,而识别人脸的代码在 recognize_face 函数中。 把代码和人物图片按以下的文件夹结构保存: faster-rcnn face_recognition_samples 人物名称 图片1.jpg 图片2.jpg 人物名称 图片1.jpg 图片2.jpg 图片3.jpg fasterrcnn.py fasterrcnn_face_recognition.py (上面给出的代码) model.pt face-landmark face_landmark.py model.pt face-recognition face_recognition.py model.recognition.pt model.verification.pt 然后再执行以下命令即可尝试人脸识别功能: cd faster-rcnn python3 fasterrcnn_face_recognition.py 首次运行的时候会根据 face_recognition_samples 文件夹下的人物构建编码数据库,如果想重新构建可以删掉 face_recognition_vectors.pt 再运行。 构建编码数据库的时候会同时输出截取的人脸图片到 detected_faces 文件夹下方便除错,以下是文件夹的内容: 以下是成功识别的结果????: 不在数据库的人物的识别结果: 经过试验精度还是不错的????。 本篇文章介绍的模型可以用在考勤,访客管理等系统中,但不应该用于支付和解锁,支付和解锁使用的人脸识别模型要求精度更高,并且很大可能要求构建三维模型而不是简单的识别图片????。此外,识别逃犯反而不要求太高的准确度,因为逃犯通常会打扮的和数据库中的照片不一样,只要识别出可疑人物即可通知警察人工判断,如果真的看上去是同一个人,就可以跟踪抓捕了????。 写在最后这个系列预计再写一两篇就会发目录给出版社,到时候也会重新编排内容让学习更加简单,继续等等吧????。 本系列将陆续更新 文章作者:老农的博客 文章来源:https://www.cnblogs.com/zkweb/p/14596373.html

0

机器学习

AI小助手·2021-04-28 15:02 0 阅读 89
没有更多了