アカリの部屋

通过PMML上线模型到Spark

最近实在是忙得要死没时间更新博客,现在回想一下之前的GAN文章还放着没上代码呢,而且传统ML的部分也有太多想完善的地方,但愿后面能有大块的时间把博客搞搞吧,不过我觉得可能比较悲观(

环境问题

JPMML体系比较复杂,在Spark方面,从依赖的顶端来看,jpmml-evaluator-spark是用于Spark环境的对pmml-evaluator的封装库、pmml-evaluator用于做预测、pmml-model是算法库/模型库。

这里存在一个问题:在某些发行版的Spark当中,Spark MLLib内部引用了老版本的model库(1.2.15),如果此时在pom里引入最近(2017.11.23)版的model库(1.3.8),在集群上就会报错,从源码来看,比如pmml-evaluator在解析pmml时看到的本应该是org.dmg.pmml.tree.TreeModel,而实际看到的是org.dmg.pmml.TreeModel,匹配不到类型,则导致pmml文件解析失败。

依赖的依赖问题需要改POM解决,在上面三个包之外,还有一个游离在上面依赖路径外的包pmml-sparkml,它是对MLLib的支持库,真正的坑在于作者在这个repo里告诉了你有这个问题,然而在你真需要的jpmml-evaluator-spark项目里没有给出完整的详解,并且百度/Google到的帖子里对这个repo只字未提(我似乎在什么地方看到过一句不要care这个repo…),看来还是要把无关的repo的README也看一看。

还有一个坑是,jpmml-evaluator-spark项目在maven官方仓库最高到1.0.0(2017.11.23),它支持4.2版本的pmml文件,而如果想支持4.3版本的pmml,则需要自己下载1.1-SNAPSHOT并且maven clean install到本地.m2,这个问题在README当中也没有提及。

文档说要从集群环境中移除下面两个包:$SPARK_HOME/jars/pmml-model-1.2.15.jar$SPARK_HOME/jars/pmml-schema-1.2.15.jar,但是在EMR上并没有必要这么做。

下面这两个依赖是没必要导入的,因为在jpmml-evaluator-spark里已经给你导了正确的版本,而下面1.3.8点算法库却在JPMML-SparkML项目里也只字未提:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-model</artifactId>
<version>1.3.8</version>
</dependency>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator</artifactId>
<version>1.3.10</version>
</dependency>

集群配置

废了一车话,所以实际要做的就是,在pom中导入Spark自身的MLLib的方式要作修改,绕开pmml:

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.jpmml</groupId>
<artifactId>pmml-model</artifactId>
</exclusion>
</exclusions>
</dependency>

并且需要通过maven的shade插件改变命名空间来使用正确的JPMML-Model中的org.dmg.pmml.*和org.jpmml.*:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>org.dmg.pmml</pattern>
<shadedPattern>org.shaded.dmg.pmml</shadedPattern>
</relocation>
<relocation>
<pattern>org.jpmml</pattern>
<shadedPattern>org.shaded.jpmml</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>

这种配置可以保证在EMR集群上能识别4.2和4.3版本的pmml。至于本地run,则不要搞这一大坨配置,把那两个包导进来就行,但是在集群上就run不了了。因为每个人本地环境又不一样,这里就不写了。

Spark代码开发

首先,Spark主节点需要通过Java代码调用jpmml-evaluator库构造一个Transformer,这个Transformer相当于一整条机器学习pipeline,里面可以包含特征处理和模型训练,然后把它分发到子节点,作为一个DataFrame上的转换器,给原始数据添加学习后的列,这个列是个数组,包含特征处理的中间结果和回归/分类标签。

假设现在有一个sklearn导出的lr.pmml,放在resources下,Java代码构造Transformer如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LRClassifier_s3 {
public static Transformer getClassifier() throws JAXBException, SAXException, IOException {
InputStream is = LRClassifier_s3.class.getResourceAsStream("/lr.pmml");
Evaluator evaluator = EvaluatorUtil.createEvaluator(is);
TransformerBuilder modelBuilder = new TransformerBuilder(evaluator)
.withOutputCols()
.withTargetCols()
.exploded(false);
Transformer transformer = modelBuilder.build();
return transformer;
}
}

Scala语言写的Spark调用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object test_s3 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().getOrCreate()
val inputPath = "s3://dm/dataset.csv"
val inputDF = sparkSession.read.option("header", true).option("charset", "UTF8").csv(inputPath)
val clf = LRClassifier_s3.getClassifier()
val resultDF = clf.transform(inputDF)
val outputDF = resultDF.select("x0", "x1", "pmml.y")
val outputPath = "s3://dm/lrtestoutput/"
outputDF.write.option("header", true).mode(SaveMode.Overwrite).csv(outputPath)
}
}

这样就可以把原始数据的x0、x1加上标记y一共三列写到输出文件里了。

Sklearn导出模型

题外话,sklearn通过流水线可以把整个机器学习过程自动化,将sklearn模型导出,需要使用的包是:
pip install --user git+https://github.com/jpmml/sklearn2pmml.git

需要导入

1
2
from sklearn_pandas import DataFrameMapper
from sklearn2pmml import PMMLPipeline

最后用库包中的子类串联管道,而不是使用sklearn自己的pipeline,然后一定要fit:

1
2
pipeline_pmml = PMMLPipeline([step1, step2, step3])
pipeline_pmml.fit(X_train, y_train)

导出的时候,一定要指定列头,否则模型将无法和原始文件结构匹配:

1
2
3
4
5
from sklearn2pmml import make_pmml_pipeline
pipeline_pmml = make_pmml_pipeline(pipeline_pmml, active_fields = X_train.columns, target_fields = y_train.columns)
from sklearn2pmml import sklearn2pmml
sklearn2pmml(pipeline_pmml, './lr.pmml')