@@ -2,7 +2,8 @@ use super::AuroraConnection;
2
2
use crate :: aurora:: error:: AuroraDatabaseError ;
3
3
use crate :: aurora:: statement:: AuroraStatementMetadata ;
4
4
use crate :: aurora:: {
5
- Aurora , AuroraArguments , AuroraColumn , AuroraDone , AuroraRow , AuroraStatement , AuroraTypeInfo ,
5
+ Aurora , AuroraArguments , AuroraColumn , AuroraDbType , AuroraDone , AuroraRow , AuroraStatement ,
6
+ AuroraTypeInfo ,
6
7
} ;
7
8
use crate :: describe:: Describe ;
8
9
use crate :: error:: Error ;
@@ -16,6 +17,8 @@ use futures_core::stream::BoxStream;
16
17
use futures_core:: Stream ;
17
18
use futures_util:: stream;
18
19
use futures_util:: { pin_mut, TryStreamExt } ;
20
+ use once_cell:: sync:: Lazy ;
21
+ use regex:: Regex ;
19
22
use rusoto_rds_data:: { ExecuteStatementRequest , ExecuteStatementResponse , RdsData } ;
20
23
use std:: borrow:: Cow ;
21
24
use std:: sync:: Arc ;
@@ -24,15 +27,47 @@ impl AuroraConnection {
24
27
async fn run < ' e , ' c : ' e , ' q : ' e > (
25
28
& ' c mut self ,
26
29
query : & ' q str ,
27
- arguments : Option < AuroraArguments > ,
30
+ mut arguments : Option < AuroraArguments > ,
28
31
) -> Result < impl Stream < Item = Result < Either < AuroraDone , AuroraRow > , Error > > + ' e , Error > {
29
32
let mut logger = QueryLogger :: new ( query, self . log_settings . clone ( ) ) ;
30
33
34
+ static MYSQL_PARAMS_RE : Lazy < Regex > = Lazy :: new ( || Regex :: new ( r"\?" ) . unwrap ( ) ) ;
35
+ static POSTGRES_PARAMS_RE : Lazy < Regex > = Lazy :: new ( || Regex :: new ( r"\$\d+" ) . unwrap ( ) ) ;
36
+
37
+ let regex = match self . db_type {
38
+ AuroraDbType :: MySQL => & MYSQL_PARAMS_RE ,
39
+ AuroraDbType :: Postgres => & POSTGRES_PARAMS_RE ,
40
+ } ;
41
+
42
+ let mut offset = 0 ;
43
+ let mut owned_query = query. to_owned ( ) ;
44
+
45
+ if let Some ( arguments) = arguments. as_mut ( ) {
46
+ regex
47
+ . find_iter ( query)
48
+ . zip ( arguments. parameters . iter_mut ( ) )
49
+ . enumerate ( )
50
+ . for_each ( |( idx, ( mat, param) ) | {
51
+ let name = format ! ( "param_{}" , idx + 1 ) ;
52
+
53
+ owned_query. replace_range (
54
+ ( mat. start ( ) + offset) ..( mat. end ( ) + offset) ,
55
+ & format ! ( ":{}" , name) ,
56
+ ) ;
57
+
58
+ offset += name. len ( ) + 1 - mat. as_str ( ) . len ( ) ;
59
+
60
+ param. name = Some ( name) ;
61
+ } ) ;
62
+ }
63
+
64
+ dbg ! ( & owned_query) ;
65
+
31
66
// TODO: is this correct?
32
67
let transaction_id = self . transaction_ids . last ( ) . cloned ( ) ;
33
68
34
69
let request = ExecuteStatementRequest {
35
- sql : query . to_owned ( ) ,
70
+ sql : owned_query ,
36
71
parameters : arguments. map ( |m| m. parameters ) ,
37
72
resource_arn : self . resource_arn . clone ( ) ,
38
73
secret_arn : self . secret_arn . clone ( ) ,
0 commit comments