 //! Distributed execution context.

+use log::info;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
 use std::collections::HashMap;
@@ -25,7 +26,8 @@ use std::path::PathBuf;
 use std::sync::Arc;

 use ballista_core::config::BallistaConfig;
-use ballista_core::serde::protobuf::LogicalPlanNode;
+use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
+use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair, LogicalPlanNode};
 use ballista_core::utils::create_df_ctx_with_ballista_query_planner;

 use datafusion::catalog::TableReference;
@@ -63,26 +65,86 @@ impl BallistaContextState {
         }
     }

+    pub fn config(&self) -> &BallistaConfig {
+        &self.config
+    }
+}
+
+pub struct BallistaContext {
+    state: Arc<Mutex<BallistaContextState>>,
+    context: Arc<SessionContext>,
+}
+
+impl BallistaContext {
+    /// Create a context for executing queries against a remote Ballista scheduler instance
+    pub async fn remote(
+        host: &str,
+        port: u16,
+        config: &BallistaConfig,
+    ) -> ballista_core::error::Result<Self> {
+        let state = BallistaContextState::new(host.to_owned(), port, config);
+
+        let scheduler_url =
+            format!("http://{}:{}", &state.scheduler_host, state.scheduler_port);
+        info!(
+            "Connecting to Ballista scheduler at {}",
+            scheduler_url.clone()
+        );
+        let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+        let remote_session_id = scheduler
+            .execute_query(ExecuteQueryParams {
+                query: None,
+                settings: config
+                    .settings()
+                    .iter()
+                    .map(|(k, v)| KeyValuePair {
+                        key: k.to_owned(),
+                        value: v.to_owned(),
+                    })
+                    .collect::<Vec<_>>(),
+                optional_session_id: None,
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
+            .into_inner()
+            .session_id;
+
+        info!(
+            "Server side SessionContext created with session id: {}",
+            remote_session_id
+        );
+
+        let ctx = {
+            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
+                scheduler_url,
+                remote_session_id,
+                state.config(),
+            )
+        };
+
+        Ok(Self {
+            state: Arc::new(Mutex::new(state)),
+            context: Arc::new(ctx),
+        })
+    }
+
     #[cfg(feature = "standalone")]
-    pub async fn new_standalone(
+    pub async fn standalone(
         config: &BallistaConfig,
         concurrent_tasks: usize,
     ) -> ballista_core::error::Result<Self> {
-        use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
         use ballista_core::serde::protobuf::PhysicalPlanNode;
         use ballista_core::serde::BallistaCodec;

         log::info!("Running in local mode. Scheduler will be run in-proc");

         let addr = ballista_scheduler::standalone::new_standalone_scheduler().await?;
-
-        let scheduler = loop {
-            match SchedulerGrpcClient::connect(format!(
-                "http://localhost:{}",
-                addr.port()
-            ))
-            .await
-            {
+        let scheduler_url = format!("http://localhost:{}", addr.port());
+        let mut scheduler = loop {
+            match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
                 Err(_) => {
                     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                     log::info!("Attempting to connect to in-proc scheduler...");
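For reviewers, a minimal caller of the new async `remote` constructor might look like the sketch below (not part of this change). The scheduler address and port, the `ballista::context` import path, and the boxed-error `main` are assumptions for illustration; the config builder mirrors the test setup at the end of this diff.

use ballista::context::BallistaContext; // assumed re-export path of this module
use ballista_core::config::{BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA};
use datafusion::arrow::util::pretty;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BallistaConfigBuilder::default()
        .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
        .build()?;
    // `remote` now makes one gRPC round trip up front, so the server-side
    // SessionContext (and its session id) exists before the first query is planned.
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
    let df = ctx.sql("SHOW TABLES").await?;
    pretty::print_batches(&df.collect().await?)?;
    Ok(())
}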
@@ -91,6 +153,37 @@ impl BallistaContextState {
             }
         };

+        let remote_session_id = scheduler
+            .execute_query(ExecuteQueryParams {
+                query: None,
+                settings: config
+                    .settings()
+                    .iter()
+                    .map(|(k, v)| KeyValuePair {
+                        key: k.to_owned(),
+                        value: v.to_owned(),
+                    })
+                    .collect::<Vec<_>>(),
+                optional_session_id: None,
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
+            .into_inner()
+            .session_id;
+
+        info!(
+            "Server side SessionContext created with session id: {}",
+            remote_session_id
+        );
+
+        let ctx = {
+            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
+                scheduler_url,
+                remote_session_id,
+                config,
+            )
+        };
+
         let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
             BallistaCodec::default();

@@ -101,43 +194,12 @@ impl BallistaContextState {
         )
         .await?;

-        Ok(Self {
-            config: config.clone(),
-            scheduler_host: "localhost".to_string(),
-            scheduler_port: addr.port(),
-            tables: HashMap::new(),
-        })
-    }
-
-    pub fn config(&self) -> &BallistaConfig {
-        &self.config
-    }
-}
-
-pub struct BallistaContext {
-    state: Arc<Mutex<BallistaContextState>>,
-}
-
-impl BallistaContext {
-    /// Create a context for executing queries against a remote Ballista scheduler instance
-    pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self {
-        let state = BallistaContextState::new(host.to_owned(), port, config);
-
-        Self {
-            state: Arc::new(Mutex::new(state)),
-        }
-    }
-
-    #[cfg(feature = "standalone")]
-    pub async fn standalone(
-        config: &BallistaConfig,
-        concurrent_tasks: usize,
-    ) -> ballista_core::error::Result<Self> {
         let state =
-            BallistaContextState::new_standalone(config, concurrent_tasks).await?;
+            BallistaContextState::new("localhost".to_string(), addr.port(), config);

         Ok(Self {
             state: Arc::new(Mutex::new(state)),
+            context: Arc::new(ctx),
         })
     }

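The renamed `standalone` constructor follows the same handshake against an in-process scheduler. A hedged sketch of a caller (the feature gate is taken from the diff; the task count is illustrative, and error handling is elided into a surrounding `Result` as in the earlier sketch):

// Requires building the client crate with the "standalone" feature enabled.
let config = BallistaConfigBuilder::default().build()?;
// Starts an in-proc scheduler and executor, retries the local gRPC connection,
// and creates the server-side session exactly like the remote path.
let ctx = BallistaContext::standalone(&config, 4).await?;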
@@ -152,15 +214,7 @@ impl BallistaContext {
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;

-        // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = {
-            let guard = self.state.lock();
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                &guard.scheduler_host,
-                guard.scheduler_port,
-                guard.config(),
-            )
-        };
+        let ctx = self.context.clone();
         let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
         Ok(df)
     }
@@ -172,15 +226,7 @@ impl BallistaContext {
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;

-        // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = {
-            let guard = self.state.lock();
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                &guard.scheduler_host,
-                guard.scheduler_port,
-                guard.config(),
-            )
-        };
+        let ctx = self.context.clone();
         let df = ctx.read_parquet(path.to_str().unwrap()).await?;
         Ok(df)
     }
@@ -196,15 +242,7 @@ impl BallistaContext {
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;

-        // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = {
-            let guard = self.state.lock();
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                &guard.scheduler_host,
-                guard.scheduler_port,
-                guard.config(),
-            )
-        };
+        let ctx = self.context.clone();
         let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
         Ok(df)
     }
@@ -291,34 +329,31 @@ impl BallistaContext {
     /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
     /// might require the schema to be inferred.
     pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
-        let mut ctx = {
-            let state = self.state.lock();
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                &state.scheduler_host,
-                state.scheduler_port,
-                state.config(),
-            )
-        };
+        let mut ctx = self.context.clone();

         let is_show = self.is_show_statement(sql).await?;
         // SHOW TABLES / SHOW COLUMNS cannot run on the scheduler because the tables are stored on the client
         if is_show {
             let state = self.state.lock();
-            ctx = SessionContext::with_config(
+            ctx = Arc::new(SessionContext::with_config(
                 SessionConfig::new().with_information_schema(
                     state.config.default_with_information_schema(),
                 ),
-            );
+            ));
         }

         // register tables with DataFusion context
         {
             let state = self.state.lock();
             for (name, prov) in &state.tables {
-                ctx.register_table(
-                    TableReference::Bare { table: name },
-                    Arc::clone(prov),
-                )?;
+                // ctx is shared between queries, so only register a table if it is not already present
+                let table_ref = TableReference::Bare { table: name };
+                if !ctx.table_exist(table_ref)? {
+                    ctx.register_table(
+                        TableReference::Bare { table: name },
+                        Arc::clone(prov),
+                    )?;
+                }
             }
         }

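Because the underlying context is now shared across calls, the `table_exist` guard above keeps repeated `sql` invocations from re-registering tables that an earlier call already added. A hypothetical sequence it is meant to support (table name, path, and the `register_csv` options are illustrative):

ctx.register_csv("test", "data/example.csv", CsvReadOptions::new()).await?;
// Both queries reuse the same shared SessionContext; the second sql() call
// skips registering "test" again because it already exists.
let df1 = ctx.sql("SELECT count(*) FROM test").await?;
let df2 = ctx.sql("SELECT * FROM test LIMIT 5").await?;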
@@ -341,16 +376,16 @@ impl BallistaContext {
                         .has_header(*has_header),
                 )
                 .await?;
-                Ok(Arc::new(DataFrame::new(ctx.state, &plan)))
+                Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
             }
             FileType::Parquet => {
                 self.register_parquet(name, location).await?;
-                Ok(Arc::new(DataFrame::new(ctx.state, &plan)))
+                Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
             }
             FileType::Avro => {
                 self.register_avro(name, location, AvroReadOptions::default())
                     .await?;
-                Ok(Arc::new(DataFrame::new(ctx.state, &plan)))
+                Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
             }
             _ => Err(DataFusionError::NotImplemented(format!(
                 "Unsupported file type {:?}.",
@@ -475,17 +510,13 @@ mod tests {
         use datafusion::arrow::datatypes::Schema;
         use datafusion::arrow::util::pretty;
         use datafusion::datasource::file_format::csv::CsvFormat;
-        use datafusion::datasource::file_format::parquet::ParquetFormat;
         use datafusion::datasource::listing::{
             ListingOptions, ListingTable, ListingTableConfig,
         };

         use ballista_core::config::{
             BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
         };
-        use std::fs::File;
-        use std::io::Write;
-        use tempfile::TempDir;

         let config = BallistaConfigBuilder::default()
             .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
             .build()