最近项目上有个需求,使用 pyspark 读取 HBase 中存储的 java.math.BigDecimal。
最近甲方让我们写一个 pyspark 的教程,他们以后打算使用 pyspark 开发。他们的数据是那种精度要求比较高的数据,我们使用 java.math.BigDecimal 表示数字,然后转成 byte[] 后存入了 HBase,但是 python 是没法直接读取这个 BigDecimal,所以需要使用 spark-examples 中 HBaseConverters.scala 读取。
我们讨论的 spark 版本是 1.6,因为用的是 CDH 5,所以是这个版本。
原理实际上是,pyspark 在读取 HBase 的时候需要借助 org.apache.spark.examples.pythonconverters 这么一个类,这个类实际上是 scala 将 HBase 中的数据读取后,转换成 json 字符串返回,这样 pyspark 可以通过这个类从 HBase 中直接获取到 json 字符串这样的返回值。
可以从 HBaseConverters.scala 这里看到 HBaseConverters.scala 的源码,我们感兴趣的是从 HBase 中查询 value 这一部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package org.apache.spark.examples.pythonconvertersimport scala.collection.JavaConverters ._import scala.util.parsing.json.JSONObject import org.apache.spark.api.python.Converter import org.apache.hadoop.hbase.client.{Put , Result }import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.KeyValue .Type import org.apache.hadoop.hbase.CellUtil class HBaseResultToStringConverter extends Converter [Any , String ] { override def convert (obj: Any ): String = { val result = obj.asInstanceOf[Result ] val output = result.listCells.asScala.map(cell => Map ( "row" -> Bytes .toStringBinary(CellUtil .cloneRow(cell)), "columnFamily" -> Bytes .toStringBinary(CellUtil .cloneFamily(cell)), "qualifier" -> Bytes .toStringBinary(CellUtil .cloneQualifier(cell)), "timestamp" -> cell.getTimestamp.toString, "type" -> Type .codeToType(cell.getTypeByte).toString, "value" -> Bytes .toStringBinary(CellUtil .cloneValue(cell)) ) ) output.map(JSONObject (_).toString()).mkString("\n" ) } }
这段代码很简单,实际上就是使用 java HBase 的 API 读取 HBase 中的值,将所有的值转换为 String 返回,我需要做的,只是将 value 这个字段的值,先从 byte[] 转到 BigDecimal,再转换为 String 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class MyHBaseResultToStringConverter extends Converter [Any , String ] { override def convert (obj: Any ): String = { val result = obj.asInstanceOf[Result ] val output = result.listCells.asScala.map(cell => Map ( "row" -> Bytes .toStringBinary(CellUtil .cloneRow(cell)), "columnFamily" -> Bytes .toStringBinary(CellUtil .cloneFamily(cell)), "qualifier" -> Bytes .toStringBinary(CellUtil .cloneQualifier(cell)), "timestamp" -> cell.getTimestamp.toString, "type" -> Type .codeToType(cell.getTypeByte).toString, "value" -> Bytes .toBigDecimal(CellUtil .cloneValue(cell)).toString() ) ) output.map(JSONObject (_).toString()).mkString("\n" ) } }
这样代码就改完了,然后需要编译,打成 jar 包。
装好 maven,我装的是 3.6.0,不需要配置什么。
下载 spark 的源码,最开始我从 github 上面下载的,发现速度很慢,然后就去 spark 官网,找仓库中的源代码下载下来。
编译 spark-examples 的时候需要先从根目录中把 scalastyle-config.xml 拷贝到 examples 目录下再进行编译
cd 到 examples 目录下,使用以下命令编译 spark-examples
1 mvn clean install -pl :spark-examples_2.10
编译的时候没有遇到错误,编译好的包在同级目录下的 target 中,有个叫 spark-examples_2.10-1.6.0.jar 的文件。
然后就是使用这个包读取 HBase 中的 BigDecimal了:
我们使用 standalone 模式运行 pyspark:1 pyspark --master spark://host1:7077 --jars spark-examples_2.10-1.6.0.jar
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import jsonzookeeper_host = 'host1' hbase_table_name = 'testTable' conf = {"hbase.zookeeper.quorum" : zookeeper_host, "hbase.mapreduce.inputtable" : hbase_table_name} keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv = "org.apache.spark.examples.pythonconverters.MyHBaseResultToStringConverter" hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat" , "org.apache.hadoop.hbase.io.ImmutableBytesWritable" , "org.apache.hadoop.hbase.client.Result" , keyConverter=keyConv, valueConverter=valueConv, conf=conf) hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n" )).mapValues(json.loads) hbase_rdd.take(1 )
然后就可以看到结果了。
以上就是如何通过修改 HBaseConverters.scala 让 pyspark 从 HBase 中读取 java 的特殊类型。