【tensorflow2.0】使用spark-scala调用tensorflow2.0训练好的模型

本篇文章介绍在spark中调用训练好的tensorflow模型进行预测的方法。

本文内容的学习需要一定的spark和scala基础。

如果使用pyspark的话会比较简单,只需要在每个excutor上用Python加载模型分别预测就可以了。

但工程上为了性能考虑,通常使用的是scala版本的spark。

本篇文章我们通过TensorFlow for Java 在spark中调用训练好的tensorflow模型。

利用spark的分布式计算能力,从而可以让训练好的tensorflow模型在成百上千的机器上分布式并行执行模型推断。

〇,spark-scala调用tensorflow模型概述

在spark(scala)中调用tensorflow模型进行预测需要完成以下几个步骤。

(1)准备protobuf模型文件

(2)创建spark(scala)项目,在项目中添加java版本的tensorflow对应的jar包依赖

(3)在spark(scala)项目中driver端加载tensorflow模型调试成功

(4)在spark(scala)项目中通过RDD在excutor上加载tensorflow模型调试成功

(5) 在spark(scala)项目中通过DataFrame在excutor上加载tensorflow模型调试成功

一,准备protobuf模型文件

我们使用tf.keras 训练一个简单的线性回归模型,并保存成protobuf文件。

import tensorflow as tf
from tensorflow.keras  models,layers,optimizers
 
## 样本数量
n = 800
 
# 生成测试用数据集
X = tf.random.uniform([n,2],minval=-10,maxval=10) 
w0 = tf.constant([[2.0],[-1.0]])
b0 = tf.constant(3.0)
 
Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0)   @表示矩阵乘法,增加正态扰动
 
# 建立模型
tf.keras.backend.clear_session()
inputs = layers.Input(shape = (2,),name ="inputs") 设置输入名字为inputs
outputs = layers.Dense(1,name = outputs")(inputs) 设置输出名字为outputs
linear = models.Model(inputs = inputs,outputs = outputs)
linear.summary()
 
# 使用fit方法进行训练
linear.compile(optimizer=rmsprop",loss=msemae"])
linear.fit(X,Y,batch_size = 8,epochs = 100)  
 
tf.print(w = ].kernel)
tf.b = ].bias)
 
# 将模型保存成pb格式文件
export_path = ./data/linear_model/
version = 1"       后续可以通过版本号进行模型版本迭代与管理
linear.save(export_path+version,save_format=tf) 
!ls {export_path+version}
 查看模型文件相关信息
!saved_model_cli show --dir {export_path+str(version)} --all

模型文件信息中这些标红的部分都是后面有可能会用到的。

二,创建spark(scala)项目,在项目中添加java版本的tensorflow对应的jar包依赖 

如果使用maven管理项目,需要添加如下 jar包依赖

<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
<dependency>
    <groupId>org.tensorflow</groupId>
    <artifactId>tensorflow</artifactId>
    <version>1.15.0</version>
</dependency>

也可以从下面网址中直接下载 org.tensorflow.tensorflow的jar包

以及其依赖的org.tensorflow.libtensorflow 和 org.tensorflowlibtensorflow_jni的jar包 放到项目中。

https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0

三, 在spark(scala)项目中driver端加载tensorflow模型调试成功

我们的示范代码在jupyter notebook中进行演示,需要安装toree以支持spark(scala)。

import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}
 
//注:load函数的第二个参数一般都是“serve”,可以从模型文件相关信息中找到
 
val bundle = tf.SavedModelBundle 
   .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve")
 
注:在java版本的tensorflow中还是类似tensorflow1.0中静态计算图的模式,需要建立Session,指定feed的数据和fetch的结果,然后 run.
//注:如果有多个数据需要喂入,可以连续用用多个feed方法注:输入必须是float类型
 
val sess = bundle.session()
val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f)))
val y =  sess.runner().feed("serving_default_inputs:0",x)
         .fetch("StatefulPartitionedCall:0").run().get(0)
 
val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
y.copyTo(result)
 
if(x != null) x.close()
if(y != ) y.close()
if(sess != ) sess.close()
if(bundle != ) bundle.close()  
 
result

输出如下:

Array(Array(3.019596), Array(3.9878292))

四,在spark(scala)项目中通过RDD在excutor上加载tensorflow模型调试成功

下面我们通过广播机制将Driver端加载的TensorFlow模型传递到各个excutor上,并在excutor上分布式地调用模型进行推断。

import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}
 
val spark = SparkSession
    .builder()
    .appName("TfRDD")
    .enableHiveSupport()
    .getOrCreate()
 
val sc = spark.sparkContext
 
在Driver端加载模型
val bundle = tf.SavedModelBundle 
   .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1",1)">利用广播将模型发送到excutor上
val broads = sc.broadcast(bundle)
 
构造数据集
val rdd_data = sc.makeRDD(List(Array(1.0f,Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,1)">.0f)))
 
通过mapPartitions调用模型进行批量推断
val rdd_result = rdd_data.mapPartitions(iter => {
 
    val arr = iter.toArray
    val model = broads.value
    val sess = model.session()
    val x = tf.Tensor.create(arr)
    val y =  sess.runner().feed("serving_default_inputs:0")
 
    将预测结果拷贝到相同shape的Float类型的Array中
    val result = Array.ofDim[Float](y.shape()(0).toInt,1)">).toInt)
    y.copyTo(result)
    result.iterator
 
})
 
 
rdd_result.take(5)
bundle.close

输出如下:

Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984)

五, 在spark(scala)项目中通过DataFrame在excutor上加载tensorflow模型调试成功

除了可以在Spark的RDD数据上调用tensorflow模型进行分布式推断,

我们也可以在DataFrame数据上调用tensorflow模型进行分布式推断。

主要思路是将推断方法注册成为一个sparkSQL函数。

tf}
 
object TfDataFrame extends Serializable{
 
 
    def main(args:Array[String]):Unit = {
 
        val spark = SparkSession
        .builder()
        .appName("TfDataFrame")
        .enableHiveSupport()
        .getOrCreate()
        val sc = spark.sparkContext
 
 
        import spark.implicits._
 
        val bundle = tf.SavedModelBundle 
           .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1",1)">)
 
        val broads = sc.broadcast(bundle)
 
        构造预测函数,并将其注册成sparkSQL的udf
        val tfpredict = (features:WrappedArray[Float])  => {
            val bund = broads.value
            val sess = bund.session()
            val x = tf.Tensor.create(Array(features.toArray))
            val y =  sess.runner().feed("serving_default_inputs:0")
            val result = Array.ofDim[Float](y.shape()(0).toInt,1)">).toInt)
            y.copyTo(result)
            val y_pred = result(0)(0)
            y_pred
        }
        spark.udf.register("tfpredict"构造DataFrame数据集,将features放到一列中
        val dfdata = sc.parallelize(List(Array(1.0f,Array(7.0f,8.0f))).toDF("features")
        dfdata.show 
 
        调用sparkSQL预测函数,增加一个新的列作为y_preds
        val dfresult = dfdata.selectExpr("features","tfpredict(features) as y_preds")
        dfresult.show 
        bundle.close
    }
}
TfDataFrame.main(Array())
+----------+
|  features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+
 
+----------+---------+
|  features|  y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+

以上我们分别在spark 的RDD数据结构和DataFrame数据结构上实现了调用一个tf.keras实现的线性回归模型进行分布式模型推断。

在本例基础上稍作修改则可以用spark调用训练好的各种复杂的神经网络模型进行分布式模型推断。

但实际上tensorflow并不仅仅适合实现神经网络,其底层的计算图语言可以表达各种数值计算过程。

利用其丰富的低阶API,我们可以在tensorflow2.0上实现任意机器学习模型,

结合tf.Module提供的便捷的封装功能,我们可以将训练好的任意机器学习模型导出成模型文件并在spark上分布式调用执行。

这无疑为我们的工程应用提供了巨大的想象空间。

 

参考:

开源电子书地址:https://lyhue1991.github.io/eat_tensorflow2_in_30_days/

GitHub 项目地址:https://github.com/lyhue1991/eat_tensorflow2_in_30_days

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


MNIST数据集可以说是深度学习的入门,但是使用模型预测单张MNIST图片得到数字识别结果的文章不多,所以本人查找资料,把代码写下,希望可以帮到大家~1#BudingyourfirstimageclassificationmodelwithMNISTdataset2importtensorflowastf3importnumpyasnp4impor
1、新建tensorflow环境(1)打开anacondaprompt,输入命令行condacreate-ntensorflowpython=3.6注意:尽量不要更起名字,不然环境容易出错在选择是否安装时输入“y”(即为“yes”)。其中tensorflow为新建的虚拟环境名称,可以按喜好自由选择。python=3.6为指定python版本为3
这篇文章主要介绍“张量tensor是什么”,在日常操作中,相信很多人在张量tensor是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大...
tensorflow中model.fit()用法model.fit()方法用于执行训练过程model.fit(训练集的输入特征,训练集的标签,batch_size,#每一个batch的大小epochs,#迭代次数validation_data=(测试集的输入特征,
https://blog.csdn.net/To_be_little/article/details/124438800 目录1、查看GPU的数量2、设置GPU加速3、单GPU模拟多GPU环境1、查看GPU的数量importtensorflowastf#查看gpu和cpu的数量gpus=tf.config.experimental.list_physical_devices(device_type='GPU')cpus=tf.c
根据身高推测体重const$=require('jquery');consttf=require('@tensorflowfjs');consttfvis=require('@tensorflowfjs-vis');/*根据身高推测体重*///把数据处理成符合模型要求的格式functiongetData(){//学习数据constheights=[150,151,160,161,16
#!/usr/bin/envpython2#-*-coding:utf-8-*-"""CreatedonThuSep610:16:372018@author:myhaspl@email:myhaspl@myhaspl.com二分法求解一元多次方程"""importtensorflowastfdeff(x):y=pow(x,3)*3+pow(x,2)*2-19return
 继续上篇的pyspark集成后,我们再来看看当今热的不得了的tensorflow是如何继承进pycharm环境的参考:http://blog.csdn.net/include1224/article/details/53452824思路其实很简单,说下要点吧1.python必须要3.564位版本(上一篇直接装的是64位版本的Anaconda)2.激活3.5版本的
首先要下载python3.6:https://www.python.org/downloadselease/python-361/接着下载:numpy-1.13.0-cp36-none-win_amd64.whl 安装这两个:安装python3.6成功,接着安装numpy.接着安装tensorflow: 最后测试一下: python3.6+tensorflow安装完毕,高深的AI就等着你去
参考书《TensorFlow:实战Google深度学习框架》(第2版)以下TensorFlow程序完成了从图像片段截取,到图像大小调整再到图像翻转及色彩调整的整个图像预处理过程。#!/usr/bin/envpython#-*-coding:UTF-8-*-#coding=utf-8"""@author:LiTian@contact:694317828@qq.com
参考:TensorFlow在windows上安装与简单示例写在开头:刚开始安装的时候,由于自己的Python版本是3.7,安装了好几次都失败了,后来发现原来是tensorflow不支持3.7版本的python,所以后来换成了Python3.6,就成功了。。。。。anconda:5.3.2python版本:3.6.8tensorflow版本:1.12.0安装Anconda
实验介绍数据采用CriteoDisplayAds。这个数据一共11G,有13个integerfeatures,26个categoricalfeatures。Spark由于数据比较大,且只在一个txt文件,处理前用split-l400000train.txt对数据进行切分。连续型数据利用log进行变换,因为从实时训练的角度上来判断,一般的标准化方式,
 1)登录需要一个 invitationcode,申请完等邮件吧,大概要3-5个小时;2)界面3)配置数据集,在右边列设置 
模型文件的保存tensorflow将模型保持到本地会生成4个文件:meta文件:保存了网络的图结构,包含变量、op、集合等信息ckpt文件:二进制文件,保存了网络中所有权重、偏置等变量数值,分为两个文件,一个是.data-00000-of-00001文件,一个是.index文件checkpoint文件:文本文件,记录了最新保持
原文地址:https://blog.csdn.net/jesmine_gu/article/details/81093686这里只是做个收藏,防止原链接失效importosimportnumpyasnpfromPILimportImageimporttensorflowastfimportmatplotlib.pyplotaspltangry=[]label_angry=[]disgusted=[]label_d
 首先声明参考博客:https://blog.csdn.net/beyond_xnsx/article/details/79771690?tdsourcetag=s_pcqq_aiomsg实践过程主线参考这篇博客,相应地方进行了变通。接下来记载我的实践过程。  一、GPU版的TensorFlow的安装准备工作:笔者电脑是Windows10企业版操作系统,在这之前已
1.tensorflow安装  进入AnacondaPrompt(windows10下按windows键可找到)a.切换到创建好的tensorflow36环境下:activatetensorflow36    b.安装tensorflow:pipinstlltensorflow    c.测试环境是否安装好       看到已经打印出了"h
必须走如下步骤:sess=tf.Session()sess.run(result)sess.close()才能执行运算。Withtf.Session()assess:Sess.run()通过会话计算结果:withsess.as_default():print(result.eval())表示输出result的值生成一个权重矩阵:tf.Variable(tf.random_normal([2,3]
tf.zeros函数tf.zeros(shape,dtype=tf.float32,name=None)定义在:tensorflow/python/ops/array_ops.py.创建一个所有元素都设置为零的张量. 该操作返回一个带有形状shape的类型为dtype张量,并且所有元素都设为零.例如:tf.zeros([3,4],tf.int32)#[[0,0,
一、Tensorflow基本概念1、使用图(graphs)来表示计算任务,用于搭建神经网络的计算过程,但其只搭建网络,不计算2、在被称之为会话(Session)的上下文(context)中执行图3、使用张量(tensor)表示数据,用“阶”表示张量的维度。关于这一点需要展开一下       0阶张量称