Elasticsearch批量insert和批量upsert
golang版本的elasticsearch批量插入和批量更新方法如下:
package main
import (
  "github.com/elastic/go-elasticsearch/v8"
  "github.com/elastic/go-elasticsearch/v8/esutil"
)
func main()  {
  list:=make(User, 0)
  bulkES(list)
}
func bulkES(list []User) error {
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  "search-user",
		Client: ES,
	})
	if err != nil {
		return err
	}
	for _, v := range list {
		data, err := json.Marshal(v)
		if err != nil {
			return err
		}
		err = indexer.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: "index",
				Body:   bytes.NewReader(data),
			},
		)
		if err != nil {
			return err
		}
	}
	indexer.Close(context.Background())
	return nil
}
最简单版的代码整体上就是如此,但需要注意,该代码不适用于生产环境,只能作为测试。主要原因是indexer实例不能多次实例化,所以上面所示代码是一个函数,这个函数是不能多次调用,因为多次进行indexer实例化了,很快就会内存溢出崩溃掉。
正确的使用方式是在init函数或者连接elasticsearch后进行实例化一次,作为全局变量在后续使用,大概是这样:
package main
import (
  "github.com/elastic/go-elasticsearch/v8"
  "github.com/elastic/go-elasticsearch/v8/esutil"
)
var userindexer esutil.BulkIndexer
func init() {
  indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  "search-user",
		Client: ES,
	})
  if err != nil {
		return err
	}
  userindexer = indexer
}
func bulkES(list []User) error {
	for _, v := range list {
		data, err := json.Marshal(v)
		if err != nil {
			return err
		}
		err = userindexer.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: "index",
				Body:   bytes.NewReader(data),
			},
		)
		if err != nil {
			return err
		}
	}
	// userindexer.Close(context.Background())
	return nil
}
注意: 代码只做演示使用,不保证能正确运行。