Elasticsearch批量insert和批量upsert

golang版本的elasticsearch批量插入和批量更新方法如下:

package main

import (
  "github.com/elastic/go-elasticsearch/v8"
  "github.com/elastic/go-elasticsearch/v8/esutil"
)
func main()  {
  list:=make(User, 0)
  bulkES(list)
}

func bulkES(list []User) error {
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  "search-user",
		Client: ES,
	})
	if err != nil {
		return err
	}
	for _, v := range list {
		data, err := json.Marshal(v)
		if err != nil {
			return err
		}
		err = indexer.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: "index",
				Body:   bytes.NewReader(data),
			},
		)
		if err != nil {
			return err
		}
	}
	indexer.Close(context.Background())
	return nil
}

最简单版的代码整体上就是如此,但需要注意,该代码不适用于生产环境,只能作为测试。主要原因是indexer实例不能多次实例化,所以上面所示代码是一个函数,这个函数是不能多次调用,因为多次进行indexer实例化了,很快就会内存溢出崩溃掉。

正确的使用方式是在init函数或者连接elasticsearch后进行实例化一次,作为全局变量在后续使用,大概是这样:

package main

import (
  "github.com/elastic/go-elasticsearch/v8"
  "github.com/elastic/go-elasticsearch/v8/esutil"
)
var userindexer esutil.BulkIndexer

func init() {
  indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  "search-user",
		Client: ES,
	})
  if err != nil {
		return err
	}
  userindexer = indexer
}
func bulkES(list []User) error {
	for _, v := range list {
		data, err := json.Marshal(v)
		if err != nil {
			return err
		}
		err = userindexer.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: "index",
				Body:   bytes.NewReader(data),
			},
		)
		if err != nil {
			return err
		}
	}
	// userindexer.Close(context.Background())
	return nil
}

注意: 代码只做演示使用,不保证能正确运行。

userindexer.Close(context.Background())在实际使用中不使用,因为我们不是使用一次后就关闭。

action:(行为),包含create(文档不存在时创建)、update(更新文档)、index(创建新文档或替换已有文档)、delete(删除一个文档)。 当action设置为index时有upsert的作用。一定要设置文档的_id,只有_id存在相同的文档时才会更新已有文档。

以下两张图,在未设置_id时会自动生成不同的文档,当设置了自定义的id后,后续就会更新,而不是创建新的文档。可以从图片的updatedAt时间看出,13:48分之后没有新的文档,只更新了原有文档。

elasticseach

elasticseach