pyspark中的HBaseConverters

最近项目上有个需求,使用 pyspark 读取 HBase 中存储的 java.math.BigDecimal。

最近甲方让我们写一个 pyspark 的教程,他们以后打算使用 pyspark 开发。他们的数据是那种精度要求比较高的数据,我们使用 java.math.BigDecimal 表示数字,然后转成 byte[] 后存入了 HBase,但是 python 是没法直接读取这个 BigDecimal,所以需要使用 spark-examples 中 HBaseConverters.scala 读取。

我们讨论的 spark 版本是 1.6,因为用的是 CDH 5,所以是这个版本。

原理实际上是,pyspark 在读取 HBase 的时候需要借助 org.apache.spark.examples.pythonconverters 这么一个类,这个类实际上是 scala 将 HBase 中的数据读取后,转换成 json 字符串返回,这样 pyspark 可以通过这个类从 HBase 中直接获取到 json 字符串这样的返回值。

可以从 HBaseConverters.scala 这里看到 HBaseConverters.scala 的源码,我们感兴趣的是从 HBase 中查询 value 这一部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.apache.spark.examples.pythonconverters

import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject

import org.apache.spark.api.python.Converter
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.KeyValue.Type
import org.apache.hadoop.hbase.CellUtil

/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts all
* the records in an HBase Result to a String
*/
class HBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
val output = result.listCells.asScala.map(cell =>
Map(
"row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
"columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
"qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
"timestamp" -> cell.getTimestamp.toString,
"type" -> Type.codeToType(cell.getTypeByte).toString,
"value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
)
)
output.map(JSONObject(_).toString()).mkString("\n")
}
}

这段代码很简单,实际上就是使用 java HBase 的 API 读取 HBase 中的值,将所有的值转换为 String 返回,我需要做的,只是将 value 这个字段的值,先从 byte[] 转到 BigDecimal,再转换为 String 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyHBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
val output = result.listCells.asScala.map(cell =>
Map(
"row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
"columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
"qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
"timestamp" -> cell.getTimestamp.toString,
"type" -> Type.codeToType(cell.getTypeByte).toString,
"value" -> Bytes.toBigDecimal(CellUtil.cloneValue(cell)).toString()
)
)
output.map(JSONObject(_).toString()).mkString("\n")
}
}

这样代码就改完了,然后需要编译,打成 jar 包。

装好 maven,我装的是 3.6.0,不需要配置什么。

下载 spark 的源码,最开始我从 github 上面下载的,发现速度很慢,然后就去 spark 官网,找仓库中的源代码下载下来。

编译 spark-examples 的时候需要先从根目录中把 scalastyle-config.xml 拷贝到 examples 目录下再进行编译

cd 到 examples 目录下,使用以下命令编译 spark-examples

1
mvn clean install -pl :spark-examples_2.10

编译的时候没有遇到错误,编译好的包在同级目录下的 target 中,有个叫 spark-examples_2.10-1.6.0.jar 的文件。

然后就是使用这个包读取 HBase 中的 BigDecimal了:

我们使用 standalone 模式运行 pyspark:

1
pyspark --master spark://host1:7077 --jars spark-examples_2.10-1.6.0.jar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import json

zookeeper_host = 'host1'
hbase_table_name = 'testTable'

conf = {"hbase.zookeeper.quorum": zookeeper_host, "hbase.mapreduce.inputtable": hbase_table_name}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

# 注意这里,使用自己定义的Converter读取
valueConv = "org.apache.spark.examples.pythonconverters.MyHBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)

hbase_rdd.take(1)

然后就可以看到结果了。

以上就是如何通过修改 HBaseConverters.scala 让 pyspark 从 HBase 中读取 java 的特殊类型。