由SparkNet的代码谈一谈Spark RDD与C++程序的交互(1)

Posted by CodingCat on December 31, 2015

前一段时间,Spark社区里有过一个关于如何把native c++程序和rdd整合起来的一个讨论 大意是如何让native c++程序能够和RDD之间进行数据通信,这个想法无论是从社区发展还是从具体工作的角度来说都是非常有意义的。

本月月初,NIPS在蒙特利尔召开。今年NIPS上有个非常赞的LearningSys Workshop,对于做系统的人来讲这个workshop比主会更有意义。我本人也有一个工作在这个workshop上,Running a Parameter Server within a
Distributed Dataflow Framework
。这篇博客要说的不是我自己的工作,而是一个把深度学习框架caffe跑在在Spark里面的工作,叫SparkNet。这个工作应该是native c++和RDD整合的先行者。今天花了一些时间来研究SparkNet的代码,现整理如下。

我们首先从宏观的角度看一看这个系统是怎么工作的。

  • 首先SparkNet利用Spark的接口来读取训练数据和测试数据以及对应的label, 把读取的数据输入到native的caffe library的内存中

  • 利用内存中的数据训练本地的一个caffe net

  • 将分散在各个服务器上的参数通过Spark的机制收集求平均 (SparkNet不需要每个迭代都做这个操作,具体参阅论文) 然后进入下一个迭代

从这三步中我们看出了整合native C++程序和RDD的三个对应以上三点的要求

  • 能够把RDD的数据放到C++内存中

  • 运行C++程序

  • 把C++内存里的数据读回到RDD(Spark)空间 进行后续操作

在这个系列的博客就按照这三点的顺序来阐述。

在这篇博客里我首先讲第一点, RDD 数据到C++内存的拷贝。我们的思路是首先看一看SparkNet如何做的,然后再讨论其他的做法。

RDD 数据到C++内存 (Spark)

我们从SparkNet apps/CifarApp.scala开始看。这是一个跑Cifar数据集的程序,基本把运行SparkNet要做的事都做了,所以是个很典型的例子。

apps/CifarApp.scala
val loader = new CifarLoader(sparkNetHome + "/caffe/data/cifar10/")
log("loading train data")
var trainRDD = sc.parallelize(loader.trainImages.zip(loader.trainLabels))
log("loading test data")
var testRDD = sc.parallelize(loader.testImages.zip(loader.testLabels))

这个程序目前是从本地读取训练数据集然后通过RDD的parallelize API来形成RDD。后续的代码经过对训练集的一系列变换来得到合乎要求的数据。这些变换都是通过标准的RDD API 来实现的,和本篇博客的主题没有太大关系,所以这里我们忽略这一段过程。我们默认最后得到的RDD的名字是trainMiniBatchRDDtestMiniBatchRDD

下面我们要做的如何把存在于JVM RDD空间的数据传到C++空间来调用Caffe来计算。

apps/CifarApp.scala
trainPartitionSizes.zipPartitions(trainMiniBatchRDD) (
(lenIt, trainMinibatchIt) => {
assert(lenIt.hasNext && trainMinibatchIt.hasNext)
val len = lenIt.next
assert(!lenIt.hasNext)
val miniBatchSampler = new MiniBatchSampler(trainMinibatchIt, len, syncInterval)
net.setTrainData(miniBatchSampler, None)
//we synchronize the parameter for every syncInterval iterations
net.train(syncInterval)
Array(0).iterator
}
).foreachPartition(_ => ()) // trigger a job

在上一段代码中我们会看到,只有被标记的124行和数据的拷贝有关,很显然这一行是关键,那么在这段代码里做了什么呢?我们进入到定义这个setTrainData方法的Net.scala文件,相关代码如下。一个叫做的library的变量call了一个叫set_train_data_callback的方法两次。从名字上推断,应该是设置了某个自动运行的callback。

libs/Net.scala
def setTrainData(minibatchSampler: MiniBatchSampler, trainPreprocessing: Option[(ByteImage, Array[Float]) => Unit] = None) = {
imageTrainCallback = Some(makeImageCallback(minibatchSampler, trainPreprocessing))
labelTrainCallback = Some(makeLabelCallback(minibatchSampler))
library.set_train_data_callback(state, 0, imageTrainCallback.get)
library.set_train_data_callback(state, 1, labelTrainCallback.get)
}

从代码里可以看出设置的callback应该是对应了训练数据和相应的标记数据。这两者的实现方法应该类似,我们只看makeImageCallback大体是怎么做的。

libs/Net.scala
private def makeImageCallback(minibatchSampler: MiniBatchSampler, preprocessing: Option[(ByteImage, Array[Float]) => Unit] = None): CaffeLibrary.java_callback_t = {
return new CaffeLibrary.java_callback_t() {
def invoke(data: Pointer, batchSize: Int, numDims: Int, shape: Pointer) {
val currentImageBatch = minibatchSampler.nextImageMinibatch()
assert(currentImageBatch.length == batchSize)
val arrayShape = new Array[Int](numDims) // figure out what shape images Caffe expects
for (i <- 0 to numDims - 1) {
val dim = shape.getInt(i * intSize)
arrayShape(i) = dim
}
val size = arrayShape.product
val buffer = new Array[Float](size)
for (j <- 0 to batchSize - 1) {
val currentImage = currentImageBatch(j)
val processedImage = {
if (preprocessing.isEmpty) {
// copy the image into a float buffer, ignoring the numChannels component
currentImage.cropInto(buffer, new Array[Int](numDims - 1), arrayShape.slice(1, numDims))
} else {
preprocessing.get(currentImage, buffer)
}
}
data.write(j * size * dtypeSize, buffer, 0, size)
}
}
}
}

从被标记的174和179行可以看出 这个 callback做的事情很简单,就是拷贝数据。那么问题来了?我们知道了callback在干什么,并且有人把这个callback赋值给了某个未知的变量。到底是谁调用了这个callback呢? 设置callback的library的类型是CaffeLibrary。这是一个连接C++实现和JVM层调用的接口,他定义了神经网络训练的基本方法,我们会在下一篇博客里着重介绍,这里下按下不表。 让我们先粗略看一下set_train_data_callback到底在干什么。刚才说过了,声明set_train_data_callback的是一个连接了C++实现和JVM调用者的,类型为CaffeLibrary的接口,我们把具体这个接口是怎么工作的细节留到下一篇博客,直接跳到C++的set_train_data_callback实现。实现了set_train_data_callback的C++文件是libccaffe/ccaffe.cpp:

libccaffe/ccaffe.cpp
int set_data_callback(caffenet_state* state, int layer_idx, java_callback_t callback, caffe::Net<DTYPE> *net) {
if (layer_idx >= net->layers().size()) {
return -1;
}
boost::shared_ptr<caffe::JavaDataLayer<DTYPE> > md_layer =
boost::dynamic_pointer_cast<caffe::JavaDataLayer<DTYPE> >(net->layers()[layer_idx]);
if (!md_layer) {
return -2;
}
md_layer->SetCallback(callback);
return 0;
}
int set_train_data_callback(caffenet_state* state, int layer_idx, java_callback_t callback) {
return set_data_callback(state, layer_idx, callback, state->net);
}

在上段代码中,我标记了最关键的两行。这两行的大体意思是通过调用一个实现在caffe名字空间里的JavaDataLayer的SetCallback方法,然后就完成了所有在SparkNet一端拷贝RDD数据到C++内存空间做的工作。现在我们至少可以得出一个结论,

SparkNet关于数据拷贝做的基于JVM的实现非常少,他依赖了C++的某种实现来达到最终的目的。

另外我们还能得到一个线索,最关键的实现在JavaDataLayer

JavaDataLayer 的实现 - 谁调用了callback

要理解JavaDataLayer,我们需要具备一些caffe的基础知识。caffe作为一个深度学习框架,最关键的就是定义深度神经网络的结构。caffe允许用户通过protobuf的格式来定义深度神经网络各个layer。

比如下面这一段代码就定义了类型是JavaData的一个输入数据层

layer {
name: "cifardata"
type: "JavaData"
top: "data"
java_data_param {
shape {
dim: 100
dim: 3
dim: 32
dim: 32
}
}
}
layer {
name: "cifarlabel"
type: "JavaData"
top: "label"
java_data_param {
shape {
dim: 100
dim: 1
}
}
}

每一个layer包含两个功能: 一个是Forward,也就是通过把计算逻辑施加到输入数据上来产生相应的输出; 另一个是Backforward,也就是根据得到的gradient和输出计算出与它之前的layer对应的gradient。JavaDataLayer也不例外,他作为一个提供数据的输入数据层,自然定义了自己的Forward的方法:

caffe/src/caffe/layers/java_data_layer.cpp
template <typename Dtype>
void JavaDataLayer<Dtype>::SetCallback(java_callback_t callback) {
java_callback_ = callback;
}
template <typename Dtype>
void JavaDataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
CHECK(java_callback_) << "JavaDataLayer needs to be initalized by calling SetCallback";
// Get data from scala
java_callback_(static_cast<void*>(buffer_), batch_size_, shape_.dim_size() - 1, java_shape_);
top[0]->Reshape(shape_);
top[0]->set_cpu_data(buffer_);
}

在这段代码中我们看到了JavaDataLayer一旦被触发就通过调用已经被赋值过的java_callback_来实现数据拷贝,然后把CPU对应的输入数据设置成含有拷贝过的数据的buffer.

关于SparkNet中RDD与C++交互的总结和思考

从以上的分析可以看出SparkNet实现RDD和C++之间的数据拷贝具体的执行是在c++空间中进行的,这得益于caffe的接口设计,可以有forward这样一个方法来很自然的做这个事情。当Native的程序没有这种非常自然的接口设计怎么办呢?比如我们的程序有一个叫Matrix的类,提供了根据长和宽来初始化矩阵的构造函数,程序所有的操作都是基于这个Matrix作为输入数据来进行的。如何把RDD的数据拷贝到C++空间来生成一个Matrix呢?

一个基本的pattern是: 我们需要Hack一下原来Matrix的实现从而允许我们能够从二进制数据直接生成Matrix.

这里我们要注意的是,我们进行的RDD和C++程序的整合工作,不是去开发Java/Scala Bindings of C++ Program。所以我们应该把操作尽量实现在C++空间里,不要暴露太多的业务逻辑给RDD端,比如Matrix的getColumn/mul这种就不需要做API了。