@@ -119,14 +119,13 @@ class KibbleBit:
     """ KibbleBit class with direct ElasticSearch access """
 
     def __init__(self, broker, organisation, tid):
-        self.config = broker.config
         self.organisation = organisation
         self.broker = broker
         self.json_queue = []
         self.queueMax = 1000  # Entries to keep before bulk pushing
         self.pluginname = ""
         self.tid = tid
-        self.dbname = self.broker.config["elasticsearch"]["database"]
+        self.dbname = conf.get("elasticsearch", "database")
 
     def __del__(self):
         """ On unload/delete, push the last chunks of data to ES """
@@ -144,7 +143,7 @@ def pprint(self, string, err=False):
     def update_source(self, source):
         """ Updates a source document, usually with a status update """
         self.broker.DB.index(
-            index=self.broker.config["elasticsearch"]["database"],
+            index=self.dbname,
             doc_type="source",
             id=source["sourceID"],
             body=source,
@@ -153,7 +152,7 @@ def update_source(self, source):
     def get(self, doctype, docid):
         """ Fetches a document from the DB """
         doc = self.broker.DB.get(
-            index=self.broker.config["elasticsearch"]["database"],
+            index=self.dbname,
             doc_type=doctype,
             id=docid,
         )
@@ -164,14 +163,14 @@ def get(self, doctype, docid):
     def exists(self, doctype, docid):
         """ Checks whether a document already exists or not """
         return self.broker.DB.exists(
-            index=self.broker.config["elasticsearch"]["database"],
+            index=self.dbname,
             doc_type=doctype,
             id=docid,
         )
 
     def index(self, doctype, docid, document):
         """ Adds a new document to the index """
-        dbname = self.broker.config["elasticsearch"]["database"]
+        dbname = self.dbname
         self.broker.DB.index(index=dbname, doc_type=doctype, id=docid, body=document)
 
     def append(self, t, doc):
@@ -195,7 +194,7 @@ def bulk(self):
             js = entry
             doc = js
             js["@version"] = 1
-            dbname = self.broker.config["elasticsearch"]["database"]
+            dbname = self.dbname
             if self.broker.noTypes:
                 dbname += "_%s" % js["doctype"]
             js_arr.append(
@@ -233,6 +232,7 @@ def __init__(self, broker, org):
 
         self.broker = broker
         self.id = org
+        self.dbname = conf.get("elasticsearch", "database")
 
     def sources(self, sourceType=None, view=None):
         """ Get all sources or sources of a specific type for an org """
@@ -241,7 +241,7 @@ def sources(self, sourceType=None, view=None):
        mustArray = [{"term": {"organisation": self.id}}]
        if view:
            res = self.broker.DB.get(
-                index=self.broker.config["elasticsearch"]["database"],
+                index=self.dbname,
                doc_type="view",
                id=view,
            )
@@ -252,7 +252,7 @@ def sources(self, sourceType=None, view=None):
            mustArray.append({"term": {"type": sourceType}})
        # Run the search, fetch all results, 9999 max. TODO: Scroll???
        res = self.broker.DB.search(
-            index=self.broker.config["elasticsearch"]["database"],
+            index=self.dbname,
            doc_type="source",
            size=9999,
            body={"query": {"bool": {"must": mustArray}}, "sort": {"sourceURL": "asc"}},
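The patch replaces every inline `self.broker.config["elasticsearch"]["database"]` lookup with a cached `self.dbname`, read once per constructor from a shared `conf` object. Below is a minimal sketch of what such a `conf` helper could look like, assuming an INI-backed `configparser.ConfigParser`; the module name, file name, and environment variable are illustrative assumptions, not taken from this patch:

```python
# configuration.py (hypothetical module name) -- sketch of the shared `conf`
# object the diff imports, assuming an INI-style configuration file.
import configparser
import os

# Read the settings once at import time so every scanner shares the same object.
conf = configparser.ConfigParser()
conf.read(os.environ.get("KIBBLE_CONFIG", "kibble.ini"))  # path is an assumption

# Expected section, mirroring the old broker.config dictionary:
#
#   [elasticsearch]
#   database = kibble
#
# conf.get("elasticsearch", "database") then returns "kibble", which
# KibbleBit and KibbleOrganisation cache as self.dbname for later ES calls.
```

Caching the name in `__init__` keeps each ElasticSearch call to a single attribute lookup and avoids repeating the same dictionary indexing in `update_source`, `get`, `exists`, `index`, `bulk`, and `sources`.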