您的位置:首页 > 数据库 > SQL

Go sqlx包 + postges pq包实现postgres的批量插入、更新

2017-01-26 11:59 579 查看
大家使用SQL like的数据库会经常用到类似如下的数据库语句批量插入数据:

insert into tablename set (item1, item2, item3) values
(value1_1, value1_2, value1_3),
(value2_1, value2_1, value2_3),
...


当有大量数据需要插入时,批量操作省去了大量建立、关闭连接的操作,将多次数据库写IO操作合并成一次操作,省去了很多不必要的时间消耗,将显著提高写效率。

写SQL语句相当的清晰明了,但是如果是写代码访问数据库,不同的语言访问不同数据库又有所不同。本文结合博主自己实践讲解下如何在Go语言中批量的实现postgres的插入和更新操作。

数据库访问流程

博主前面有三篇博文详细的介绍了Go语言如何访问SQL like的数据库。《Golang访问SQL like数据库(一)——思想、driver需实现接口》, Golang访问SQL like数据库(二)——sql package主要数据结构及方法, Golang访问SQL Like数据库(三)——sql package + Postgres driver源码走读

三篇文章只是介绍了Go语言中访问SQL like数据库接口的设计思想,数据接口以及代码走读等,但是并没有具体的示例讲解如何实践。本文将作为对那个系列文章的补充,从实战的角度进一步加深理解。

简单的说一次数据库访问的流程包括四个步骤:1)创建、打开一个数据库连接;2)创建一个数据库操作事务;3)执行数据库事务;4)结束、关闭数据库连接。

原始办法

Go的接口化设计让访问数据库变得相当简单,只需要import相应的数据库driver包,然后根据document调用相应的interface,传入正确的餐数据就可以实现的对数据的访问。

注:本文示例使用的是在
database/spl
基础上封装了一层的
sqlx
包,相对于
sql
来说
sqlx
提供的了更多的接口支持。

最原始的方法就是:创建一个数据库连接,发起一次事务,拼SQL语句,执行SQL,commit然后断开连接结束当前事务。然后重新创建连接执行下一条SQL语句。

首先调用
Open()
方法,创建一个数据库连接,返回一个DB对象。

然后调用DB对象的
Beginx
方法开启一次事务;

然后调用事务的
Exec
方法执行插入或更新,或通过
Query
执行查询操作。

最后
Commit
事务并关闭数据库连接。

循环前面的步骤。

具体操作

假设有如下BD

create table test_c (
name varchar,
alia varchar,
city_id int
)


示例代码

func MoreLazyWay() {
db, err := sqlx.Open("postgres", "postgres://postgres:postgres@192.168.56.101/test_db?sslmode=disable")

if err != nil {
fmt.Println("Open error:", err)
panic(err)
}
defer db.Close()

sql := "insert into test_c (name, alia, city_id) values ($1, $2, $3)"

for _, value := range values {
tx, err := db.Beginx()
if err != nil {
fmt.Println("Beginx error:", err)
panic(err)
}

_, err = tx.Exec(sql, value...)
if err != nil {
tx.Rollback()
}
tx.Commit()
}
}


稍微好一点的方法

前面的方法中,我们每操作一条SQL语句都要重新创建一次连接,一次有多条SQL需要执行时,需要重复执行多次SQL连接的建立与关闭。我们首先通过Postgres事务机制,只建立一次连接,把所有的SQL语句都放到一次事务中执行。通过减少连接的建立和关闭来优化数据库访问时间。

流程变为:

Open
建立数据库连接;

Begin
开始一个数据库操作事务;

拼SQL语句,
Exec
执行SQL

循环第#3步,执行所有的SQL

Commit
提交操作,
Close
关闭连接。

为了保证后面测试的独立性,重新创建一个相同数据结构的表。

create table test (
name varchar,
alia varchar,
city_id int
)


示例代码如下:

func LazyWay() {
db, err := sqlx.Open("postgres", "postgres://postgres:postgres@192.168.56.101/test_db?sslmode=disable")

if err != nil {
fmt.Println("Open error:", err)
panic(err)
}
defer db.Close()

tx, err := db.Beginx()
if err != nil {
fmt.Println("Beginx error:", err)
panic(err)
}

sql := "insert into test (name, alia, city_id) values ($1, $2, $3)"

for _, value := range values {
_, err = tx.Exec(sql, value...)
if err != nil {
tx.Rollback()
}
}
tx.Commit()
}


更快的方法

前一种方法主要在数据库的连接以及关闭阶段进行了优化,减少了一些不必要的开销,至于具体效果如何稍后看测试结果。

现在我们继续优化,参考postgres driver(github.com/lib/pq)的文档,postgres的driver支持bulk批量操作。参考pq包的示例,以及结合sqlx(sql)接口,重新优化流程。

因为sqlx在sql包基础上又封装了一层,所以实际
4000
实现与pq包的示例稍有不同。

流程为:

Open
建立数据库连接;

Begin
开始一个数据库操作事务;

拼SQL语句,对所有相同操作的SQL调用tx.Prepare方法返回一个statement准备状态。

调用stmt的
Exec
方法循环将每条SQL的参数写到目标表缓冲区。

所有数据都写完后,再执行
st.Commit
将数据一次性刷新到表

最后关闭连接。

为保证测试不相互影响,创建一个新表

为了保证后面测试的独立性,重新创建一个相同数据结构的表
test_b


create table test_b (
name varchar,
alia varchar,
city_id int
)


示例代码:

func BulkWay() {
db, err := sqlx.Open("postgres", "postgres://postgres:postgres@192.168.56.101/test_db?sslmode=disable")

if err != nil {
fmt.Println("Open error:", err)
panic(err)
}
defer db.Close()

tx, err := db.Beginx()
if err != nil {
fmt.Println("Beginx error:", err)
panic(err)
}

stmt, err := tx.Preparex(db.Rebind("insert into test_b (name, alia, city_id) values ($1, $2, $3)"))
if err != nil {
fmt.Println("Prepare error:", err)
panic(err)
}
for _, value := range values {
_, err = stmt.Exec(value...)
if err != nil {
fmt.Println("Exec error:", err)
panic(err)
}
}

err = stmt.Close()
if err != nil {
fmt.Println("stmt close error:", err)
panic(err)
}

err = tx.Commit()
if err != nil {
fmt.Println("commit error:", err)
panic(err)
}
}


测试

前面三种方法中都使用到一个名字为
values
的变量集,
values
其实一个包含26条记录的数组,意味中每种方法的测试都是26次循环。
values
内容:

var values = [][]interface{}{
{"a", "aa", 1},
{"b", "bb", 2},
{"c", "cc", 3},
{"d", "dd", 4},
{"e", "ee", 5},
{"f", "ff", 6},
{"g", "gg", 7},
{"h", "hh", 8},
{"i", "ii", 9},
{"j", "jj", 10},
{"k", "kk", 11},
{"l", "ll", 12},
{"m", "mm", 13},
{"n", "nn", 14},
{"o", "oo", 15},
{"p", "pp", 16},
{"q", "qq", 17},
{"r", "rr", 18},
{"s", "ss", 19},
{"t", "tt", 20},
{"u", "uu", 21},
{"v", "vv", 22},
{"w", "ww", 23},
{"x", "xx", 24},
{"y", "yy", 25},
{"z", "zz", 26},
}


分别为三种方法写一个benchmark测试函数。

func Benchmark_MoreLazyWay(b *testing.B) {
for i := 0; i < b.N; i++ {
MoreLazyWay()
}
}

func Benchmark_LazyWay(b *testing.B) {
for i := 0; i < b.N; i++ {
LazyWay()
}
}

func Benchmark_BulkWay(b *testing.B) {
for i := 0; i < b.N; i++ {
BulkWay()
}
}


执行benchmark测试。

go test -bench="."
testing: warning: no tests to run
PASS
Benchmark_MoreLazyWay-4       50      38971361 ns/op
Benchmark_LazyWay-4          100      15922024 ns/op
Benchmark_BulkWay-4          100      10421029 ns/op
ok      test/sql    5.181s


测试结果可以看到,执行流程为每种方法一次插入26条记录,第一种方法一共执行了50次循环,第二和第三种方法分别执行了100次循环。从结果可以发下,第一种方法效率相当低下,每次循环耗时是第二种方法的两倍多。同时,第三种方法的耗时只有第二种的三分之二的样子。这还只是在对单次操作只有26条记录的情况下进行优化的结果,如果单次操作记录更多,效果更加明显。

那么,问题来了!

细心的朋友可能这个时候就有疑问了,第二种方法跟第三种方法同样都是一次SQL事务,都是通过循环先执行SQL语句将数据写到目标表,最后再commit结果的,为什么会有这么大的性能差距呢?

让我们来撸撸代码,看看Go sqlx(sql) package和postgres driver——pq内部的一些具体细节。

创建连接,开始一个事务等这些代码这里就不深究了,感兴趣的朋友自己去撸去。这里主要追一下第二中种方法直接执行事务(Tx)的
Exec
方法执行SQL语句与第三种方法通过prepare一个statement,再通过准备状态(St)的Exec执行SQL语句之间的差别,找到第二种方法和第三种方法性能差异的根本原因。

直接调用Tx事务的
Exec
执行SQL流程(第二种方法)

如下是这种方法的大概流程,感兴趣的朋友可以参考这个思路自己去撸源码。

LazyWay()
|
|-> db, err := sqlx.Open()                  //创建DB连接,sqlx package,sqlx.go
|
|-> tx, err := db.Beginx()                  //开始一个事务,sqlx package,sqlx.go
|
|-> res, err := tx.Exec()                   //执行SQL语句,sqlx package,sqlx.go
|       |
|       |-> execer = dc.ci.(driver.Execer) //通过driver的connection得到driver实现的Execer interface。 sql package,sql.go
|       |
|       |-> execer.Exec()                   //执行SQL语句,注Execer同driver的Connection interface 定义的Exec方法是同一个,所以这里实际调用的方法是(cn *Conn)Exec()。pq package, conn.go
|           |
|           |-> st = cn.prepareTo()         //准备一个statement,pq package,conn.go
|           |
|           |-> res = st.Exec()             //执行SQL, pq package,conn.go
|
|-> tx.Commit()                             //提交请求,将数据flash到表中。sqlx package, sqlx.go
|
|-> tx.Close()                              //关闭数据库连接。sqlx package, sqlx.go


从这个流程中,可以发现就算是直接调用事务的
Exec
方法执行SQL语句,最后到了postgres的driver实现层,仍然使用的是首先准备一个statement,然后在执行SQL写数据的方式。

通过Tx的
Prepare
返回一个statement执行SQL流程(第三种方法)

大概流程如下:

BulkWay()
|
|-> db, err := sqlx.Open()                  //创建DB连接,sqlx package,sqlx.go
|
|-> tx, err := db.Beginx()                  //开始一个事务,sqlx package,sqlx.go
|
|-> stmt, err := tx.Preparex()          //准备一个statement, sqlx package, sqlx.go
|       |
|       |-> Tx.Prepare()                        //准备statement实际调用的这个方法,sql package,sql.go
|           |
|           |-> si, err := dc.ci.Prepare() //调用postgres driver的Prepare()方法返回一个准备一个statement。sql package, sql.go
|               |
|               |-> cn.prepareTo()          //准备一个statement,pq package,conn.go
|           |
|           |-> stmt.txsi.si = si
|
|-> res, err = stmt.Exec()                  //执行SQL,sqlx package, sqlx.go
|       |
|       |-> Tx.Exec()                           //实际是sql package包中实现的方法,sql.go
|           |
|           |-> stmt.Exec()                 //最后是调用driver中实现的Exec方法,pq package, conn.go
|               |
|               |-> res, err = resultFromStatement() //执行SQL,pq package,conn.go
|                   |
|                   |-> resi, err = dc.si.Exec()    //最终执行写数据的方法,pq package, conn.go
|
|-> stmt.Close()                                //close stmt,sql package, sql.go
|
|-> tx.Commit()                             //提交请求,将数据flash到表中。sqlx package, sqlx.go
|
|-> tx.Close()                              //关闭数据库连接。sqlx package, sqlx.go


这个流程中可以看到,直接调用sqlx包的
Preparex
方法最终还是调用了postgres driver的
prepareTo
方法准备statement。也就是说前面第二种方法也第三种方法在最后执行SQL的时候,底层逻辑是一样的,流程几乎也是一样的。都是首先准备一个statement,然后执行SQL。

进一步深入问题,既然两种方法底层逻辑、流程几乎都是一样的,那么为什么第二种方法和第三种方法在性能上还有这么大差距呢?

关键性的一点

细心的朋友可能已经发现,第二种方法在执行SQL的时候虽然也是用的先准备一个statement,再写参数的方法。但是这个流程是在Exec里面做的,而Exec是在循环里面做的,以为着这整个过程每一个循环都要完整的执行一遍。但是第三种方法,准备statement是在循环外面做的,循环里面只是在执行写参数的过程。

区别:

所以差别就是:第二种方法循环执行了准备statement的过程,而第三种方法只执行了一次准备statement。

看看prepareTo做了些啥

如下是postgres driver(pq package)中prepareTo的源码。从这段代码中可以看到,主要是一些字符串处理,然后根据SQL语句的格式检查数据库缓冲,准备数据插入格式等。对于同样的SQL操作,这些完全只用进行一次,后续只需要在Exec是根据位置参数传入相应的值即可。

func (cn *conn) prepareTo(q, stmtName string) *stmt {
st := &stmt{cn: cn, name: stmtName}

b := cn.writeBuf('P')
b.string(st.name)
b.string(q)
b.int16(0)

b.next('D')
b.byte('S')
b.string(st.name)

b.next('S')
cn.send(b)

cn.readParseResponse()
st.paramTyps, st.colNames, st.colTyps = cn.readStatementDescribeResponse()
st.colFmts, st.colFmtData = decideColumnFormats(st.colTyps, cn.disablePreparedBinaryResult)
cn.readReadyForQuery()
return st
}


更多

细心的朋友可能会发现第二种方法的流程中,driver底层实现Tx.Exec时没有执行stmt.Close()的步骤,但是第三种方法却单独的执行了stmt.Close()操作。针对这个,作者实验了将第三种方法的stmt.Close()注释掉,代码同样能够正确执行,并不影响结果。

根据posgtres driver(pq package)官方文档的描述,使用第三种方法的执行bulk操作时,需要在最后执行一次没有参数的
Exec()
操作将所有写到缓冲中的数据刷新。实际操作中,第二种方法和第三种方法都没有执行这一步。相反,如果你这么做了,还会得到一个你prepare的SQL语句需要3个参数,但是你一个也没传个它的错误。大概这个是早期设计,文档没有更新的原因吧。如下是官方文档截取。

You can perform bulk imports by preparing a statement returned by pq.CopyIn (or pq.CopyInSchema) in an explicit transaction (sql.Tx). The returned statement handle can then be repeatedly “executed” to copy data into the target table. After all data has been processed you should call Exec() once with no arguments to flush all buffered data. Any call to Exec() might return an error which should be handled appropriately, but because of the internal buffering an error returned by Exec() might not be related to the data passed in the call that failed.

最后

马上就是农历鸡年了,祝大家新年快乐。来年加薪、升职,技术飞跃,少欠技术债(这也博主自己的愿望,今年欠了好多技术债,很多文章都开在草稿箱里。)

参考文档:

https://godoc.org/github.com/lib/pq
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  数据库 sql go