 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
+use futures::future::join_all;
+use rand::prelude::*;
+use std::ops::Div;
 use std::{
     fs,
     iter::Iterator,
@@ -137,6 +140,48 @@ struct DataFusionBenchmarkOpt {
     mem_table: bool,
 }
 
+#[derive(Debug, StructOpt, Clone)]
+struct BallistaLoadtestOpt {
+    #[structopt(short = "q", long)]
+    query_list: String,
+
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of requests
+    #[structopt(short = "r", long = "requests", default_value = "100")]
+    requests: usize,
+
+    /// Number of connections
+    #[structopt(short = "c", long = "concurrency", default_value = "5")]
+    concurrency: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(short = "n", long = "partitions", default_value = "2")]
+    partitions: usize,
+
+    /// Path to data files
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "data-path")]
+    path: PathBuf,
+
+    /// Path to sql files
+    #[structopt(parse(from_os_str), required = true, long = "sql-path")]
+    sql_path: PathBuf,
+
+    /// File format: `csv` or `parquet`
+    #[structopt(short = "f", long = "format", default_value = "parquet")]
+    file_format: String,
+
+    /// Ballista executor host
+    #[structopt(long = "host")]
+    host: Option<String>,
+
+    /// Ballista executor port
+    #[structopt(long = "port")]
+    port: Option<u16>,
+}
+
 #[derive(Debug, StructOpt)]
 struct ConvertOpt {
     /// Path to csv files
@@ -173,11 +218,19 @@ enum BenchmarkSubCommandOpt {
     DataFusionBenchmark(DataFusionBenchmarkOpt),
 }
 
+#[derive(Debug, StructOpt)]
+#[structopt(about = "loadtest command")]
+enum LoadtestOpt {
+    #[structopt(name = "ballista-load")]
+    BallistaLoadtest(BallistaLoadtestOpt),
+}
+
 #[derive(Debug, StructOpt)]
 #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
 enum TpchOpt {
     Benchmark(BenchmarkSubCommandOpt),
     Convert(ConvertOpt),
+    Loadtest(LoadtestOpt),
 }
 
 const TABLES: &[&str] = &[
@@ -187,6 +240,7 @@ const TABLES: &[&str] = &[
 #[tokio::main]
 async fn main() -> Result<()> {
     use BenchmarkSubCommandOpt::*;
+    use LoadtestOpt::*;
 
     env_logger::init();
     match TpchOpt::from_args() {
@@ -197,6 +251,9 @@ async fn main() -> Result<()> {
             benchmark_datafusion(opt).await.map(|_| ())
         }
         TpchOpt::Convert(opt) => convert_tbl(opt).await,
+        TpchOpt::Loadtest(BallistaLoadtest(opt)) => {
+            loadtest_ballista(opt).await.map(|_| ())
+        }
     }
 }
 
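
For reference, a minimal sketch of how the new `loadtest ballista-load` subcommand is expected to be invoked; the argument values below are hypothetical and `StructOpt::from_iter` is used only for illustration, assuming the `TpchOpt` types defined above.

// Sketch only: parses the new subcommand from hypothetical arguments.
use structopt::StructOpt;

fn parse_loadtest_example() -> TpchOpt {
    TpchOpt::from_iter(vec![
        "tpch",
        "loadtest",
        "ballista-load",
        "--query-list", "1,3,6,12",
        "--requests", "200",
        "--concurrency", "4",
        "--data-path", "/path/to/tpch-data",
        "--sql-path", "/path/to/queries",
        "--host", "localhost",
        "--port", "50050",
    ])
}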
@@ -268,6 +325,151 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     // register tables with Ballista context
     let path = opt.path.to_str().unwrap();
     let file_format = opt.file_format.as_str();
+
+    register_tables(path, file_format, &ctx).await;
+
+    let mut millis = vec![];
+
+    // run benchmark
+    let sql = get_query_sql(opt.query)?;
+    println!("Running benchmark with query {}:\n{}", opt.query, sql);
+    for i in 0..opt.iterations {
+        let start = Instant::now();
+        let df = ctx
+            .sql(&sql)
+            .await
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+            .unwrap();
+        let batches = df
+            .collect()
+            .await
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+            .unwrap();
+        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+        millis.push(elapsed as f64);
+        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
+        if opt.debug {
+            pretty::print_batches(&batches)?;
+        }
+    }
+
+    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+
+    Ok(())
+}
+
+async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
+    println!(
+        "Running loadtest_ballista with the following options: {:?}",
+        opt
+    );
+
+    let config = BallistaConfig::builder()
+        .set(
+            BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+            &format!("{}", opt.partitions),
+        )
+        .build()
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+    let concurrency = opt.concurrency;
+    let request_amount = opt.requests;
+    let mut clients = vec![];
+
+    for _num in 0..concurrency {
+        clients.push(BallistaContext::remote(
+            opt.host.clone().unwrap().as_str(),
+            opt.port.unwrap(),
+            &config,
+        ));
+    }
+
+    // register tables with Ballista context
+    let path = opt.path.to_str().unwrap();
+    let file_format = opt.file_format.as_str();
+    let sql_path = opt.sql_path.to_str().unwrap().to_string();
+
+    for ctx in &clients {
+        register_tables(path, file_format, ctx).await;
+    }
+
+    let request_per_thread = request_amount.div(concurrency);
+    // run benchmark
+    let query_list: Vec<usize> = opt
+        .query_list
+        .split(',')
+        .map(|s| s.parse().unwrap())
+        .collect();
+    println!("query list: {:?}", &query_list);
+
+    let total = Instant::now();
+    let mut futures = vec![];
+
+    for (client_id, client) in clients.into_iter().enumerate() {
+        let query_list_clone = query_list.clone();
+        let sql_path_clone = sql_path.clone();
+        let handle = tokio::spawn(async move {
+            for i in 0..request_per_thread {
+                let query_id = query_list_clone
+                    .get(
+                        (0..query_list_clone.len())
+                            .choose(&mut rand::thread_rng())
+                            .unwrap(),
+                    )
+                    .unwrap();
+                let sql =
+                    get_query_sql_by_path(query_id.to_owned(), sql_path_clone.clone())
+                        .unwrap();
+                println!(
+                    "Client {} Round {} Query {} started",
+                    &client_id, &i, query_id
+                );
+                let start = Instant::now();
+                let df = client
+                    .sql(&sql)
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
+                let batches = df
+                    .collect()
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
+                let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+                println!(
+                    "Client {} Round {} Query {} took {:.1} ms",
+                    &client_id, &i, query_id, elapsed
+                );
+                if opt.debug {
+                    pretty::print_batches(&batches).unwrap();
+                }
+            }
+        });
+        futures.push(handle);
+    }
+    join_all(futures).await;
+    let elapsed = total.elapsed().as_secs_f64() * 1000.0;
+    println!("###############################");
+    println!("load test took {:.1} ms", elapsed);
+    Ok(())
+}
+
+fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
+    if sql_path.ends_with('/') {
+        sql_path.pop();
+    }
+    if query > 0 && query < 23 {
+        let filename = format!("{}/q{}.sql", sql_path, query);
+        Ok(fs::read_to_string(&filename).expect("failed to read query"))
+    } else {
+        Err(DataFusionError::Plan(
+            "invalid query. Expected value between 1 and 22".to_owned(),
+        ))
+    }
+}
+
+async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
     for table in TABLES {
         match file_format {
             // dbgen creates .tbl ('|' delimited) files without header
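
The load test above fans out with one spawned task per client connection, `requests / concurrency` rounds in each, and `join_all` to await every task before the total elapsed time is reported. A stripped-down sketch of that pattern follows; the names and `run_one_request` workload are illustrative stand-ins for the sql()/collect() round trip.

use futures::future::join_all;

// Minimal fan-out sketch: one task per client, awaited together at the end.
async fn fan_out(concurrency: usize, requests: usize) {
    let rounds = requests / concurrency;
    let handles: Vec<_> = (0..concurrency)
        .map(|client_id| {
            tokio::spawn(async move {
                for round in 0..rounds {
                    run_one_request(client_id, round).await;
                }
            })
        })
        .collect();
    // Wait for every client task before measuring total time.
    join_all(handles).await;
}

async fn run_one_request(client_id: usize, round: usize) {
    // Placeholder for issuing one query and collecting its result batches.
    println!("client {} round {}", client_id, round);
}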
@@ -281,55 +483,30 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
                     .file_extension(".tbl");
                 ctx.register_csv(table, &path, options)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             "csv" => {
                 let path = format!("{}/{}", path, table);
                 let schema = get_schema(table);
                 let options = CsvReadOptions::new().schema(&schema).has_header(true);
                 ctx.register_csv(table, &path, options)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
                 ctx.register_parquet(table, &path)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             other => {
                 unimplemented!("Invalid file format '{}'", other);
             }
         }
     }
-
-    let mut millis = vec![];
-
-    // run benchmark
-    let sql = get_query_sql(opt.query)?;
-    println!("Running benchmark with query {}:\n{}", opt.query, sql);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        let df = ctx
-            .sql(&sql)
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
-        let batches = df
-            .collect()
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed as f64);
-        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
-        if opt.debug {
-            pretty::print_batches(&batches)?;
-        }
-    }
-
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
-
-    Ok(())
 }
 
 fn get_query_sql(query: usize) -> Result<String> {
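
A short usage sketch for the new get_query_sql_by_path helper; the directory below is hypothetical and the query files are assumed to exist on disk.

// Sketch only: resolves "<sql-path>/q<N>.sql" and rejects out-of-range queries.
fn load_query_example() -> Result<()> {
    // A trailing '/' is tolerated; this reads "./queries/q21.sql".
    let sql = get_query_sql_by_path(21, "./queries/".to_string())?;
    println!("{}", sql);
    // Query numbers outside 1..=22 produce a plan error.
    assert!(get_query_sql_by_path(23, "./queries".to_string()).is_err());
    Ok(())
}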