Skip to content

Commit 85ac9ad

Browse files
committed
[CALCITE-6861] Switch to opaque protobuf API
1 parent 3eb181e commit 85ac9ad

15 files changed

+3153
-2116
lines changed

connection.go

+30-30
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
4444
return nil, driver.ErrBadConn
4545
}
4646

47-
response, err := c.httpClient.post(ctx, &message.PrepareRequest{
47+
response, err := c.httpClient.post(ctx, message.PrepareRequest_builder{
4848
ConnectionId: c.connectionId,
4949
Sql: query,
5050
MaxRowsTotal: c.config.maxRowsTotal,
51-
})
51+
}.Build())
5252

5353
if err != nil {
5454
return nil, c.avaticaErrorToResponseErrorOrError(err)
@@ -57,10 +57,10 @@ func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
5757
prepareResponse := response.(*message.PrepareResponse)
5858

5959
return &stmt{
60-
statementID: prepareResponse.Statement.Id,
60+
statementID: prepareResponse.GetStatement().GetId(),
6161
conn: c,
62-
parameters: prepareResponse.Statement.Signature.Parameters,
63-
handle: prepareResponse.Statement,
62+
parameters: prepareResponse.GetStatement().GetSignature().GetParameters(),
63+
handle: prepareResponse.GetStatement(),
6464
batchUpdates: make([]*message.UpdateBatch, 0),
6565
}, nil
6666
}
@@ -79,9 +79,9 @@ func (c *conn) Close() error {
7979
return driver.ErrBadConn
8080
}
8181

82-
_, err := c.httpClient.post(context.Background(), &message.CloseConnectionRequest{
82+
_, err := c.httpClient.post(context.Background(), message.CloseConnectionRequest_builder{
8383
ConnectionId: c.connectionId,
84-
})
84+
}.Build())
8585

8686
c.connectionId = ""
8787

@@ -106,14 +106,14 @@ func (c *conn) begin(ctx context.Context, isolationLevel isoLevel) (driver.Tx, e
106106
isolationLevel = isoLevel(c.config.transactionIsolation)
107107
}
108108

109-
_, err := c.httpClient.post(ctx, &message.ConnectionSyncRequest{
109+
_, err := c.httpClient.post(ctx, message.ConnectionSyncRequest_builder{
110110
ConnectionId: c.connectionId,
111-
ConnProps: &message.ConnectionProperties{
111+
ConnProps: message.ConnectionProperties_builder{
112112
AutoCommit: false,
113113
HasAutoCommit: true,
114114
TransactionIsolation: uint32(isolationLevel),
115-
},
116-
})
115+
}.Build(),
116+
}.Build())
117117

118118
if err != nil {
119119
return nil, c.avaticaErrorToResponseErrorOrError(err)
@@ -137,31 +137,31 @@ func (c *conn) exec(ctx context.Context, query string, args []namedValue) (drive
137137
return nil, driver.ErrSkip
138138
}
139139

140-
st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
140+
st, err := c.httpClient.post(ctx, message.CreateStatementRequest_builder{
141141
ConnectionId: c.connectionId,
142-
})
142+
}.Build())
143143

144144
if err != nil {
145145
return nil, c.avaticaErrorToResponseErrorOrError(err)
146146
}
147147

148-
statementID := st.(*message.CreateStatementResponse).StatementId
148+
statementID := st.(*message.CreateStatementResponse).GetStatementId()
149149
defer c.closeStatement(context.Background(), statementID)
150150

151-
res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
151+
res, err := c.httpClient.post(ctx, message.PrepareAndExecuteRequest_builder{
152152
ConnectionId: c.connectionId,
153153
StatementId: statementID,
154154
Sql: query,
155155
MaxRowsTotal: c.config.maxRowsTotal,
156156
FirstFrameMaxSize: c.config.frameMaxSize,
157-
})
157+
}.Build())
158158

159159
if err != nil {
160160
return nil, c.avaticaErrorToResponseErrorOrError(err)
161161
}
162162

163163
// Currently there is only 1 ResultSet per response for exec
164-
changed := int64(res.(*message.ExecuteResponse).Results[0].UpdateCount)
164+
changed := int64(res.(*message.ExecuteResponse).GetResults()[0].GetUpdateCount())
165165

166166
return &result{
167167
affectedRows: changed,
@@ -183,30 +183,30 @@ func (c *conn) query(ctx context.Context, query string, args []namedValue) (driv
183183
return nil, driver.ErrSkip
184184
}
185185

186-
st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
186+
st, err := c.httpClient.post(ctx, message.CreateStatementRequest_builder{
187187
ConnectionId: c.connectionId,
188-
})
188+
}.Build())
189189

190190
if err != nil {
191191
return nil, c.avaticaErrorToResponseErrorOrError(err)
192192
}
193193

194-
statementID := st.(*message.CreateStatementResponse).StatementId
194+
statementID := st.(*message.CreateStatementResponse).GetStatementId()
195195

196-
res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
196+
res, err := c.httpClient.post(ctx, message.PrepareAndExecuteRequest_builder{
197197
ConnectionId: c.connectionId,
198198
StatementId: statementID,
199199
Sql: query,
200200
MaxRowsTotal: c.config.maxRowsTotal,
201201
FirstFrameMaxSize: c.config.frameMaxSize,
202-
})
202+
}.Build())
203203

204204
if err != nil {
205205
_ = c.closeStatement(context.Background(), statementID)
206206
return nil, c.avaticaErrorToResponseErrorOrError(err)
207207
}
208208

209-
resultSets := res.(*message.ExecuteResponse).Results
209+
resultSets := res.(*message.ExecuteResponse).GetResults()
210210

211211
return newRows(c, statementID, true, resultSets), nil
212212
}
@@ -226,11 +226,11 @@ func (c *conn) avaticaErrorToResponseErrorOrError(err error) error {
226226
}
227227

228228
return avaticaErrors.ResponseError{
229-
Exceptions: avaticaErr.message.Exceptions,
230-
ErrorMessage: avaticaErr.message.ErrorMessage,
231-
Severity: int8(avaticaErr.message.Severity),
232-
ErrorCode: avaticaErrors.ErrorCode(avaticaErr.message.ErrorCode),
233-
SqlState: avaticaErrors.SQLState(avaticaErr.message.SqlState),
229+
Exceptions: avaticaErr.message.GetExceptions(),
230+
ErrorMessage: avaticaErr.message.GetErrorMessage(),
231+
Severity: int8(avaticaErr.message.GetSeverity()),
232+
ErrorCode: avaticaErrors.ErrorCode(avaticaErr.message.GetErrorCode()),
233+
SqlState: avaticaErrors.SQLState(avaticaErr.message.GetSqlState()),
234234
Metadata: &avaticaErrors.RPCMetadata{
235235
ServerAddress: message.ServerAddressFromMetadata(avaticaErr.message),
236236
},
@@ -247,9 +247,9 @@ func (c *conn) ResetSession(_ context.Context) error {
247247
}
248248

249249
func (c *conn) closeStatement(ctx context.Context, statementID uint32) error {
250-
_, err := c.httpClient.post(context.Background(), &message.CloseStatementRequest{
250+
_, err := c.httpClient.post(context.Background(), message.CloseStatementRequest_builder{
251251
ConnectionId: c.connectionId,
252252
StatementId: statementID,
253-
})
253+
}.Build())
254254
return c.avaticaErrorToResponseErrorOrError(err)
255255
}

docker-compose.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ services:
103103
working_dir: /source
104104
command: sh -c "./docker.sh compile-protobuf"
105105
environment:
106-
AVATICA_VERSION: 1.24.0
107-
PROTOBUF_VERSION: 25.1
106+
AVATICA_VERSION: 1.26.0
107+
PROTOBUF_VERSION: 29.3
108108
volumes:
109109
- .:/source
110110
- $GOPATH/pkg/mod/cache:/go/pkg/mod/cache

docker.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ compile_protobuf(){
603603

604604
# Compile the protobuf
605605
GITHUB_PACKAGE=github.com/apache/calcite-avatica-go
606-
protoc --proto_path=/tmp/avatica/core/src/main/protobuf --go_out=/source --go_opt=module=$GITHUB_PACKAGE --go_opt=Mcommon.proto=$GITHUB_PACKAGE/message --go_opt=Mrequests.proto=$GITHUB_PACKAGE/message --go_opt=Mresponses.proto=$GITHUB_PACKAGE/message /tmp/avatica/core/src/main/protobuf/*.proto
606+
protoc --proto_path=/tmp/avatica/core/src/main/protobuf --go_out=/source --go_opt=module=$GITHUB_PACKAGE --go_opt=Mcommon.proto=$GITHUB_PACKAGE/message --go_opt=Mrequests.proto=$GITHUB_PACKAGE/message --go_opt=Mresponses.proto=$GITHUB_PACKAGE/message /tmp/avatica/core/src/main/protobuf/*.proto --go_opt=default_api_level=API_OPAQUE #Remove API_OPAQUE when Avatica protobufs are updated to edition 2024
607607

608608
echo "Protobuf compiled successfully"
609609
}

driver.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ func (c *Connector) Connect(context.Context) (driver.Conn, error) {
9999
if err != nil {
100100
return nil, err
101101
}
102-
response, err := conn.httpClient.post(context.Background(), &message.DatabasePropertyRequest{
102+
response, err := conn.httpClient.post(context.Background(), message.DatabasePropertyRequest_builder{
103103
ConnectionId: conn.connectionId,
104-
})
104+
}.Build())
105105

106106
if err != nil {
107107
return nil, conn.avaticaErrorToResponseErrorOrError(err)
@@ -111,9 +111,9 @@ func (c *Connector) Connect(context.Context) (driver.Conn, error) {
111111

112112
adapter := ""
113113

114-
for _, property := range databasePropertyResponse.Props {
115-
if property.Key.Name == "GET_DRIVER_NAME" {
116-
adapter = property.Value.StringValue
114+
for _, property := range databasePropertyResponse.GetProps() {
115+
if property.GetKey().GetName() == "GET_DRIVER_NAME" {
116+
adapter = property.GetValue().GetStringValue()
117117
}
118118
}
119119

@@ -131,12 +131,12 @@ func registerConn(conn *conn) error {
131131
info[k] = v
132132
}
133133
// Open a connection to the server
134-
req := &message.OpenConnectionRequest{
134+
req := message.OpenConnectionRequest_builder{
135135
ConnectionId: conn.connectionId,
136136
Info: info,
137-
}
137+
}.Build()
138138
if conn.config.schema != "" {
139-
req.Info["schema"] = conn.config.schema
139+
req.GetInfo()["schema"] = conn.config.schema
140140
}
141141
_, err := conn.httpClient.post(context.Background(), req)
142142
if err != nil {

generic/generic.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,22 @@ func (a Adapter) GetPingStatement() string {
3737
func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.Column {
3838

3939
column := &internal.Column{
40-
Name: col.ColumnName,
41-
TypeName: col.Type.Name,
42-
Nullable: col.Nullable != 0,
40+
Name: col.GetColumnName(),
41+
TypeName: col.GetType().GetName(),
42+
Nullable: col.GetNullable() != 0,
4343
}
4444

4545
// Handle precision and length
46-
switch col.Type.Name {
46+
switch col.GetType().GetName() {
4747
case "DECIMAL":
4848

49-
precision := int64(col.Precision)
49+
precision := int64(col.GetPrecision())
5050

5151
if precision == 0 {
5252
precision = math.MaxInt64
5353
}
5454

55-
scale := int64(col.Scale)
55+
scale := int64(col.GetScale())
5656

5757
if scale == 0 {
5858
scale = math.MaxInt64
@@ -63,11 +63,11 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
6363
Scale: scale,
6464
}
6565
case "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER", "BINARY", "VARBINARY", "BINARY VARYING":
66-
column.Length = int64(col.Precision)
66+
column.Length = int64(col.GetPrecision())
6767
}
6868

6969
// Handle scan types
70-
switch col.Type.Name {
70+
switch col.GetType().GetName() {
7171
case "INTEGER", "BIGINT", "TINYINT", "SMALLINT":
7272
column.ScanType = reflect.TypeOf(int64(0))
7373

@@ -91,7 +91,7 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
9191
}
9292

9393
// Handle rep type special cases for decimals, floats, date, time and timestamp
94-
switch col.Type.Name {
94+
switch col.GetType().GetName() {
9595
case "DECIMAL", "NUMERIC":
9696
column.Rep = message.Rep_BIG_DECIMAL
9797
case "FLOAT", "REAL":
@@ -103,19 +103,19 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
103103
case "TIMESTAMP", "TIMESTAMP WITH LOCAL TIME ZONE", "TIMESTAMP WITH TIME ZONE":
104104
column.Rep = message.Rep_JAVA_SQL_TIMESTAMP
105105
default:
106-
column.Rep = col.Type.Rep
106+
column.Rep = col.GetType().GetRep()
107107
}
108108

109109
return column
110110
}
111111

112112
func (a Adapter) ErrorResponseToResponseError(err *message.ErrorResponse) errors.ResponseError {
113113
return errors.ResponseError{
114-
Exceptions: err.Exceptions,
115-
ErrorMessage: err.ErrorMessage,
116-
Severity: int8(err.Severity),
117-
ErrorCode: errors.ErrorCode(err.ErrorCode),
118-
SqlState: errors.SQLState(err.SqlState),
114+
Exceptions: err.GetExceptions(),
115+
ErrorMessage: err.GetErrorMessage(),
116+
Severity: int8(err.GetSeverity()),
117+
ErrorCode: errors.ErrorCode(err.GetErrorCode()),
118+
SqlState: errors.SQLState(err.GetSqlState()),
119119
Metadata: &errors.RPCMetadata{
120120
ServerAddress: message.ServerAddressFromMetadata(err),
121121
},

hsqldb/hsqldb.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,22 @@ func (a Adapter) GetPingStatement() string {
3737
func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.Column {
3838

3939
column := &internal.Column{
40-
Name: col.ColumnName,
41-
TypeName: col.Type.Name,
42-
Nullable: col.Nullable != 0,
40+
Name: col.GetColumnName(),
41+
TypeName: col.GetType().GetName(),
42+
Nullable: col.GetNullable() != 0,
4343
}
4444

4545
// Handle precision and length
46-
switch col.Type.Name {
46+
switch col.GetType().GetName() {
4747
case "DECIMAL", "NUMERIC":
4848

49-
precision := int64(col.Precision)
49+
precision := int64(col.GetPrecision())
5050

5151
if precision == 0 {
5252
precision = math.MaxInt64
5353
}
5454

55-
scale := int64(col.Scale)
55+
scale := int64(col.GetScale())
5656

5757
if scale == 0 {
5858
scale = math.MaxInt64
@@ -63,11 +63,11 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
6363
Scale: scale,
6464
}
6565
case "VARCHAR", "CHAR", "CHARACTER", "BINARY", "VARBINARY", "BIT", "BITVARYING":
66-
column.Length = int64(col.Precision)
66+
column.Length = int64(col.GetPrecision())
6767
}
6868

6969
// Handle scan types
70-
switch col.Type.Name {
70+
switch col.GetType().GetName() {
7171
case "INTEGER", "BIGINT", "TINYINT", "SMALLINT":
7272
column.ScanType = reflect.TypeOf(int64(0))
7373

@@ -91,7 +91,7 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
9191
}
9292

9393
// Handle rep type special cases for decimals, floats, date, time and timestamp
94-
switch col.Type.Name {
94+
switch col.GetType().GetName() {
9595
case "DECIMAL", "NUMERIC":
9696
column.Rep = message.Rep_BIG_DECIMAL
9797
case "FLOAT":
@@ -103,19 +103,19 @@ func (a Adapter) GetColumnTypeDefinition(col *message.ColumnMetaData) *internal.
103103
case "TIMESTAMP":
104104
column.Rep = message.Rep_JAVA_SQL_TIMESTAMP
105105
default:
106-
column.Rep = col.Type.Rep
106+
column.Rep = col.GetType().GetRep()
107107
}
108108

109109
return column
110110
}
111111

112112
func (a Adapter) ErrorResponseToResponseError(err *message.ErrorResponse) errors.ResponseError {
113113
return errors.ResponseError{
114-
Exceptions: err.Exceptions,
115-
ErrorMessage: err.ErrorMessage,
116-
Severity: int8(err.Severity),
117-
ErrorCode: errors.ErrorCode(err.ErrorCode),
118-
SqlState: errors.SQLState(err.SqlState),
114+
Exceptions: err.GetExceptions(),
115+
ErrorMessage: err.GetErrorMessage(),
116+
Severity: int8(err.GetSeverity()),
117+
ErrorCode: errors.ErrorCode(err.GetErrorCode()),
118+
SqlState: errors.SQLState(err.GetSqlState()),
119119
Metadata: &errors.RPCMetadata{
120120
ServerAddress: message.ServerAddressFromMetadata(err),
121121
},

0 commit comments

Comments
 (0)