Elasticsearch批量insert和批量upsert
golang版本的elasticsearch批量插入和批量更新方法如下:
package main
import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
func main() {
list:=make(User, 0)
bulkES(list)
}
func bulkES(list []User) error {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "search-user",
Client: ES,
})
if err != nil {
return err
}
for _, v := range list {
data, err := json.Marshal(v)
if err != nil {
return err
}
err = indexer.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(data),
},
)
if err != nil {
return err
}
}
indexer.Close(context.Background())
return nil
}
最简单版的代码整体上就是如此,但需要注意,该代码不适用于生产环境,只能作为测试。主要原因是indexer实例不能多次实例化,所以上面所示代码是一个函数,这个函数是不能多次调用,因为多次进行indexer实例化了,很快就会内存溢出崩溃掉。
正确的使用方式是在init
函数或者连接elasticsearch后进行实例化一次,作为全局变量在后续使用,大概是这样:
package main
import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
var userindexer esutil.BulkIndexer
func init() {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "search-user",
Client: ES,
})
if err != nil {
return err
}
userindexer = indexer
}
func bulkES(list []User) error {
for _, v := range list {
data, err := json.Marshal(v)
if err != nil {
return err
}
err = userindexer.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(data),
},
)
if err != nil {
return err
}
}
// userindexer.Close(context.Background())
return nil
}
注意: 代码只做演示使用,不保证能正确运行。