Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions cloud/src/rasp-cloud/es/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"github.com/astaxie/beego"
"github.com/astaxie/beego/logs"
"github.com/olivere/elastic"
"github.com/olivere/elastic/v7"
"rasp-cloud/conf"
"rasp-cloud/environment"
"rasp-cloud/tools"
Expand All @@ -34,8 +34,8 @@ var (
ElasticClient *elastic.Client
Version string
ttlIndexes = make(chan map[string]time.Duration, 1)
minEsVersion = "5.6.0"
maxEsVersion = "7.0.0"
minEsVersion = "7.0.0"
maxEsVersion = "9.0.0"
)

func init() {
Expand Down Expand Up @@ -178,10 +178,10 @@ func CreateEsIndex(index string, alias string, template string) error {
return nil
}

func Insert(index string, docType string, doc interface{}) (err error) {
func Insert(index string, doc interface{}) (err error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
defer cancel()
_, err = ElasticClient.Index().Index(index).Type(docType).BodyJson(doc).Do(ctx)
_, err = ElasticClient.Index().Index(index).BodyJson(doc).Do(ctx)
return
}

Expand All @@ -196,15 +196,13 @@ func BulkInsertAlarm(docType string, docs []map[string]interface{}) (err error)
docType == "crash-alarm" {
bulkService.Add(elastic.NewBulkUpdateRequest().
Index("real-openrasp-" + docType + "-" + appId).
Type(docType).
Id(fmt.Sprint(doc["upsert_id"])).
DocAsUpsert(true).
Doc(doc))
} else {
if appId, ok := doc["app_id"].(string); ok {
bulkService.Add(elastic.NewBulkIndexRequest().
Index("real-openrasp-" + docType + "-" + appId).
Type(docType).
OpType("index").
Doc(doc))
}
Expand Down Expand Up @@ -233,12 +231,11 @@ func BulkInsertAlarm(docType string, docs []map[string]interface{}) (err error)
return err
}

func BulkInsert(index string, docType string, docs []map[string]interface{}) (err error) {
func BulkInsert(index string, docs []map[string]interface{}) (err error) {
bulkService := ElasticClient.Bulk()
for _, doc := range docs {
bulkService.Add(elastic.NewBulkUpdateRequest().
Index(index).
Type(docType).
Id(fmt.Sprint(doc["upsert_id"])).
DocAsUpsert(true).
Doc(doc["content"]))
Expand All @@ -256,10 +253,10 @@ func DeleteIndex(indexName string) error {
return err
}

func DeleteByQuery(index string, docType string, query elastic.Query) error {
func DeleteByQuery(index string, query elastic.Query) error {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(15*time.Second))
defer cancel()
_, err := ElasticClient.DeleteByQuery(index).Type(docType).Query(query).ProceedOnVersionConflict().Do(ctx)
_, err := ElasticClient.DeleteByQuery(index).Query(query).ProceedOnVersionConflict().Do(ctx)
if err != nil {
beego.Error("failed to delete by query", err)
return err
Expand Down
Loading