PySpark 读写外部数据库实战指南:MySQL 和 HBase

使用PySpark轻松读写MySQL和HBase数据库,附详细代码示例和配置说明。

原文标题:大数据系列之PySpark读写外部数据库

原文作者:牧羊人的方向

冷月清谈:

本文介绍了如何使用 PySpark 读写 MySQL 和 HBase 数据库。

**1. PySpark 读写 MySQL**

* **连接 MySQL:** 使用 PyMySQL 库连接 MySQL 数据库,需要提供主机地址、用户名、密码和数据库名等信息。
* **写入 MySQL:** 将数据转换为 RDD,遍历 RDD 中的每一条数据,使用 SQL 语句将数据插入到 MySQL 表中。
* **读取 MySQL:** 使用 Spark 的 JDBC 接口读取 MySQL 数据,需要提供 JDBC URL、驱动程序、表名、用户名和密码等信息。

**2. PySpark 读写 HBase**

* **写入 HBase:** 使用 `saveAsNewAPIHadoopDataset` 算子将 RDD 数据写入 HBase。需要配置 HBase 的相关参数,例如 Zookeeper 地址、表名、输出格式等。
* **读取 HBase:** 使用 `newAPIHadoopRDD` 算子读取 HBase 数据。需要配置 HBase 的相关参数,例如 Zookeeper 地址、表名、输入格式等。

需要注意的是,在使用 PySpark 读写 HBase 时,需要下载并配置相关的依赖库。

怜星夜思:

1、文章中提到了 PyMySQL 和 MySQLdb 模块,它们之间有什么区别?在实际使用中该如何选择?
2、在使用 Spark 读写 HBase 时,需要注意哪些版本兼容性问题?
3、除了 MySQL 和 HBase,Spark 还支持哪些外部数据库的读写?有哪些最佳实践?

原文内容

本文以MySQL和HBASE为例,简要介绍Spark通过PyMySQL和HadoopAPI算子对外部数据库的读写操作

1、PySpark读写MySQL

MySQL环境准备参考“”部分

1.1 PyMySQL和MySQLDB模块

PyMySQL是在Python3.x版本中用于连接MySQL服务器的一个库,Python2中则使用mysqldb,目前在Python 2版本支持PyMySQL。使用以下命令安装PyMysql模块:

pip install PyMySQL

连接到MySQL数据库

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用 cursor() 方法创建一个游标对象
cursor cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone() print ("Database version : %s " % data)
# 关闭数据库连接
db.close()
1.2 Spark数据写入MySQL

1)启动MySQL服务并检查

[root@tango-01 bin]# ./mysqld_safe &
[root@tango-01 bin]# 180814 15:50:02 mysqld_safe Logging to '/usr/local/mysql/data/error.log'.
180814 15:50:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mysql/data
[root@tango-01 bin]# ps -ef|grep mysql

2)创建MySQL表

[root@tango-01 bin]# ./mysql -u root -proot
mysql> use test;
mysql> create table test_spark(id int(4),info char(8),name char(20),sex char(2));
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_spark |
+----------------+
2 rows in set (0.00 sec)

3)向MySQL中写入数据

  • 启动ipython notebook

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop  pyspark
  • 建立MySQL连接,写入数据

from pyspark import SparkContext
from pyspark import SparkConf
import pymysql

rawData=['1,info1,tango,F','2,info2,zhangsan,M']
conn = pymysql.connect(user="root",passwd="xxxxxx",host="192.168.112.10",db="test",charset="utf8")
cursor=conn.cursor()
for i in range(len(rawData)):
retData=rawData[i].split(',')
id = retData[0]
info = retData[1]
name = retData[2]
sex = retData[3]
sql = "insert into test_spark(id,info,name,sex) values('%s','%s','%s','%s')" %(id,info,name,sex)
cursor.execute(sql)
conn.commit()
conn.close()

  • 查询MySQL表数据

1.3 Spark读取MySQL数据

1)下载mysql-connect-java驱动,并存放在spark目录的jars下

2)运行pyspark,执行以下语句

[root@tango-spark01 jars]# pyspark
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> dataframe_mysql = sqlContext.read.format("jdbc").\
... options(url="jdbc:mysql://192.168.112.10:3306/test", driver="com.mysql.jdbc.Driver",
... dbtable="test_spark", user="root", password="xxxxxx").load()
>>> dataframe_mysql.show()

2、PySpark读写HBASE

HBASE环境准备参考“”部分,HBASE版本为1.2.6,Hadoop版本为2.9.0,Spark版本为2.3.0。注:使用高版本的HBASE如2.1.0出现NotFoundMethod接口问题。

2.1 Spark读写HBASE模块

1)saveAsNewAPIHadoopDataset模块

Spark算子saveAsNewAPIHadoopDataset使用新的Hadoop API将RDD输出到任何Hadoop支持的存储系统,为该存储系统使用Hadoop Configuration对象。saveAsNewAPIHadoopDataset参数说明如下:

saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
- confHBASE的配置文件
- keyConverterkey值的输出类型
- valueConvertervalue值的输出类型

2)newAPIHadoopRDD模块

使用新的Hadoop API读取数据,参数如下:

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
- inputFormatClass Hadoop InputFormat class名称
- keyClasskey Writable class名称
- valueClassvalue Writable class名称
- keyConverterkey值的输入类型
- valueConvertervalue值的输入类型
- confHBASE的配置文件
- batchSizePython对象作为单个Java对象个数,默认为0,自动选择
2.2 Spark数据写入HBASE

1)启动HBASE服务

[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

在Master和Slave服务器使用jps查看HMaster和HRegionServer进程:

[root@tango-spark01 logs]# jps
1859 ResourceManager
1493 NameNode
4249 HMaster
5578 Jps
1695 SecondaryNameNode
[root@tango-spark02 conf]# jps
1767 NodeManager
3880 HRegionServer
1627 DataNode
4814 Jps

注:启动HBASE之前需先启动zookeeper集群和Hadoop集群环境

2)创建HBASE表

hbase(main):027:0> create 'spark_hbase','userinfo'
Created table spark_hbase
Took 2.6556 seconds
=> Hbase::Table - spark_hbase
hbase(main):028:0> put 'spark_hbase','2018001','userinfo:name','zhangsan'
Took 0.0426 seconds
hbase(main):029:0> put 'spark_hbase','2018001','userinfo:age','16'
Took 0.0079 seconds
hbase(main):030:0> put 'spark_hbase','2018001','userinfo:sex','M'

3)配置Spark 在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包,需要另行下载https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

  • 上传jar包到spark lib库

[root@tango-spark01 jars]# pwd
/usr/local/spark/spark-2.3.0/jars
[root@tango-spark01 jars]# mkdir hbase
[root@tango-spark01 jars]# cd hbase
[root@tango-spark01 hbase]# ls
spark-examples_2.11-1.6.0-typesafe-001.jar
  • 编辑spark-env.sh,添加以下内容:

export SPARK_DIST_CLASSPATH=$(/usr/local/spark/hadoop-2.9.0/bin/hadoop classpath):$(/usr/local/spark/hbase-2.1.0/bin/hbase classpath):/usr/local/spark/spark-2.3.0/jars/hbase/*
  • 拷贝HBASE下的lib库到spark下

[root@tango-spark01 lib]# pwd
/usr/local/spark/hbase-2.1.0/lib
[root@tango-spark01 lib]# cp -f hbase-* /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f guava-11.0.2.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f htrace-core-3.1.0-incubating.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f protobuf-java-2.5.0.jar /usr/local/spark/spark-2.3.0/jars/hbase/
  • 重启HBASE

[root@tango-spark01 hbase-2.1.0]# ./bin/stop-hbase.sh
[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

4)向HBASE中写入数据

  • 启动ipython notebook

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.8.3/etc/hadoop  pyspark
  • 配置初始化

zk_host="192.168.112.101"
table = "spark_hbase"
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": zk_host,"hbase.mapred.outputtable": table,
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
  • 初始化数据并序列化转换为RDD

rawData = ['2018003,userinfo,name,Lily','2018004,userinfo,name,Tango','2018003,userinfo,age,22','2018004,userinfo,age,28']
print(rawData)
rddRow = sc.parallelize(rawData).map(lambda x: (x[0:7],x.split(',')))
rddRow.take(5)

  • 调用saveAsNewAPIHadoopDataset模块写入HBASE

rddRow.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
  • 查询HBASE中表数据,看到插入数据

2.3 Spark读取HBASE数据

Spark读取HBASE数据使用newAPIHadoopRDD模块

1)配置初始化

host = '192.168.112.101'
table = 'spark_hbase'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

2)调用newAPIHadoopRDD模块读取HBASE数据

hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)

输出结果如下:

参考资料

  1. http://spark.apache.org/docs/latest/api/python/pyspark.html

关于PyMySQL和MySQLdb,它们的主要区别在于对Python版本的支持。MySQLdb是老牌驱动,只支持Python 2,而PyMySQL是纯Python写的,同时支持Python 2和3。所以,如果你使用Python 3,那就选择PyMySQL;如果还在用Python 2,两者都可以,但推荐使用更新更活跃的PyMySQL,毕竟维护更好嘛。

版本兼容性确实是个头疼的问题。文章中提到使用高版本的HBase(如2.1.0)会出现NotFoundMethod接口问题,这是因为Spark和HBase的API可能不匹配。建议根据Spark版本选择对应的HBase版本,可以参考官方文档或一些博客,通常情况下,选择相近的稳定版本比较保险。

补充一下,MySQLdb虽然不支持python3,但有MySQLclient这个fork的支持,而且MySQLclient的性能比PyMySQL更好,API也和MySQLdb基本一致。所以,追求性能或者已有MySQLdb代码库的话,可以考虑MySQLclient。

除了HBase版本,Hadoop版本也需要注意,Spark是基于Hadoop的,所以Hadoop、Spark、HBase三个版本之间需要相互兼容。我之前遇到过Hadoop版本太低导致无法正常连接HBase的情况,升级Hadoop后问题就解决了。建议使用发行版打包好的Hadoop+Spark+HBase环境,省去很多兼容性问题。

实际操作中,最好先搭建一个测试环境,验证一下各个组件的兼容性,避免在生产环境中出现问题。另外,要注意依赖库的版本,有些jar包冲突也会导致NotFoundMethod之类的错误。

补充一点,对于大规模数据读写,可以考虑使用Spark的并行化特性,例如分区和并行读取,提高效率。另外,要注意数据倾斜问题,避免某些节点任务过重,影响整体性能。可以预先进行数据预处理,或者使用一些优化策略,比如数据倾斜优化参数等。

个人经验,PyMySQL轻量级一些,安装方便,对Python 3的支持也更好。MySQLdb在一些老项目中可能还会用到,但新项目建议直接上PyMySQL,省心。

安全方面也需要注意,数据库连接信息不要硬编码在代码里,最好使用配置文件或者环境变量,避免泄露敏感信息。另外,对于一些敏感数据,可以考虑加密存储或传输,提高安全性。

Spark支持很多外部数据库,比如PostgreSQL、Cassandra、MongoDB、Redis等等。一般来说,使用Spark SQL的JDBC接口可以连接关系型数据库,而对于NoSQL数据库,则需要使用对应的连接器或者API。最佳实践方面,建议使用连接池来管理数据库连接,优化性能,同时要注意数据格式的转换,避免数据丢失或错误。