如何解决我的弹性搜索批量索引器没有正确关闭
我将 AWS ElasticSearch 7.10 与 Kafka 一起使用,并将一些代码推送到我的开发环境。我使用的配置是:
func NewClient(addresses []string,username string,password string) (*elasticsearch.Client,error) {
es,err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: addresses,Username: username,Password: password,RetryOnStatus: []int{502,503,504,429},MaxRetries: 5,})
if err != nil {
return nil,err
}
_,err = es.Info()
if err != nil {
return nil,err
}
return es,nil
}
func NewBulkIndexer(es *elasticsearch.Client,defaultIndexName string) (esutil.BulkIndexer,error) {
bi,err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: defaultIndexName,// The default index name
Client: es,// The Elasticsearch client
NumWorkers: runtime.NumCPU(),// The number of worker goroutines
FlushBytes: int(5e+6),// The flush threshold in bytes
FlushInterval: 30 * time.Second,// The periodic flush interval
})
return bi,err
}
// repository/job.go
func (r *JobRepo) SyncJob(ctx context.Context,indexName string,job domain.Job) error {
body := new(bytes.Buffer)
if err := json.NewEncoder(body).Encode(job); err != nil {
logger.Errorf("Error parsing task: %v",err)
return err
}
err := r.bi.Add(
context.Background(),esutil.BulkIndexerItem{
Index: indexName,Action: "index",DocumentID: job.ID,Body: bytes.NewReader(body.Bytes()),OnSuccess: func(ctx context.Context,item esutil.BulkIndexerItem,res esutil.BulkIndexerResponseItem) {
logger.Infof("Success syncing job ID: %s",job.ID)
},OnFailure: func(ctx context.Context,res esutil.BulkIndexerResponseItem,err error) {
if err != nil {
logger.Errorf("Bulk indexer error: %v",err)
} else {
logger.Errorf("Bulk indexer error: %s: %s",res.Error.Type,res.Error.Reason)
}
},},)
if err != nil {
logger.Errorf("Unexpected error: %s",err)
return err
}
return nil
}
仅在一段时间后,我才添加了关闭批量索引器的代码:
func (r *JobRepo) Disconnect() error {
if err := r.bi.Close(context.Background()); err != nil { logger.Errorf("Unexpected error: %v",err) return err } return nil }
// main.go
addresses := strings.Split(cfg.Elastic.Addresses,",")
es,err := elasticsearch.NewClient(addresses,cfg.Elastic.Username,cfg.Elastic.Password)
if err != nil {
logger.Errorf("Failed to create Elasticsearch client: %v",err)
return err
}
bi,err := elasticsearch.NewBulkIndexer(es,cfg.IndexName)
if err != nil {
logger.Errorf("Failed to create bulk indexer %v",err)
return err
}
repo := repository.NewJobRepo(es,bi,cfg.IndexName)
defer cleanup(repo)
c := make(chan os.Signal,1)
signal.Notify(c,syscall.SIGINT,syscall.SIGTERM,syscall.SIGKILL)
go func() {
logger.Infof("Exiting due to signal: %v",<-c)
cleanup(repo)
os.Exit(1)
}()
.
.
.
.
.
func cleanup(repo *repository.JobRepo) error {
return repo.Disconnect()
}
虽然我已经推送了环境,但是每次启动后都有这样的日志:
成功同步作业 ID:....
超过 50 个条目。我以为我已经正确关闭了批量索引器,但它仍然显示这些消息,尽管我没有发布任何 kafka 消息来同步我的记录。有什么我可以做的吗?我想删除我的 aws 弹性数据库,因为内容并不重要,无论如何我想重新开始使用新的索引模板,但我想知道修复这个批量索引器的其他方法。谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。