Elasticsearch批量insert和批量upsert
golang版本的elasticsearch批量插入和批量更新方法如下:
package main
import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
func main() {
list:=make(User, 0)
bulkES(list)
}
func bulkES(list []User) error {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "search-user",
Client: ES,
})
if err != nil {
return err
}
for _, v := range list {
data, err := json.Marshal(v)
if err != nil {
return err
}
err = indexer.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(data),
},
)
if err != nil {
return err
}
}
indexer.Close(context.Background())
return nil
}
最简单版的代码整体上就是如此,但需要注意,该代码不适用于生产环境,只能作为测试。主要原因是indexer实例不能多次实例化,所以上面所示代码是一个函数,这个函数是不能多次调用,因为多次进行indexer实例化了,很快就会内存溢出崩溃掉。
正确的使用方式是在init
函数或者连接elasticsearch后进行实例化一次,作为全局变量在后续使用,大概是这样:
package main
import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
var userindexer esutil.BulkIndexer
func init() {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "search-user",
Client: ES,
})
if err != nil {
return err
}
userindexer = indexer
}
func bulkES(list []User) error {
for _, v := range list {
data, err := json.Marshal(v)
if err != nil {
return err
}
err = userindexer.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(data),
},
)
if err != nil {
return err
}
}
// userindexer.Close(context.Background())
return nil
}
注意: 代码只做演示使用,不保证能正确运行。
userindexer.Close(context.Background())
在实际使用中不使用,因为我们不是使用一次后就关闭。
action:(行为),包含create(文档不存在时创建)、update(更新文档)、index(创建新文档或替换已有文档)、delete(删除一个文档)。
当action设置为index时有upsert的作用。一定要设置文档的_id
,只有_id
存在相同的文档时才会更新已有文档。
以下两张图,在未设置_id时会自动生成不同的文档,当设置了自定义的id后,后续就会更新,而不是创建新的文档。可以从图片的updatedAt时间看出,13:48分之后没有新的文档,只更新了原有文档。