Skip to content

Commit 176fd74

Browse files
Lars Maierneunhoef
Lars Maier
authored andcommitted
First version of transactions in go. (#199)
1 parent 96e5014 commit 176fd74

6 files changed

+277
-0
lines changed

collection_document_impl.go

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func (c *collection) DocumentExists(ctx context.Context, key string) (bool, erro
3939
if err != nil {
4040
return false, WithStack(err)
4141
}
42+
applyContextSettings(ctx, req)
4243
resp, err := c.conn.Do(ctx, req)
4344
if err != nil {
4445
return false, WithStack(err)

context.go

+10
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ const (
5858
keyBatchID ContextKey = "arangodb-batchID"
5959
keyJobIDResponse ContextKey = "arangodb-jobIDResponse"
6060
keyAllowDirtyReads ContextKey = "arangodb-allowDirtyReads"
61+
keyTransactionID ContextKey = "arangodb-transactionID"
6162
)
6263

6364
// WithRevision is used to configure a context to make document
@@ -230,6 +231,11 @@ func WithJobIDResponse(parent context.Context, jobID *string) context.Context {
230231
return context.WithValue(contextOrBackground(parent), keyJobIDResponse, jobID)
231232
}
232233

234+
// WithTransactionID is used to bind a request to a specific transaction
235+
func WithTransactionID(parent context.Context, tid TransactionID) context.Context {
236+
return context.WithValue(contextOrBackground(parent), keyTransactionID, tid)
237+
}
238+
233239
type contextSettings struct {
234240
Silent bool
235241
WaitForSync bool
@@ -321,6 +327,10 @@ func applyContextSettings(ctx context.Context, req Request) contextSettings {
321327
result.DirtyReadFlag = dirtyReadFlag
322328
}
323329
}
330+
// TransactionID
331+
if v := ctx.Value(keyTransactionID); v != nil {
332+
req.SetHeader("x-arango-trx-id", string(v.(TransactionID)))
333+
}
324334
// ReturnOld
325335
if v := ctx.Value(keyReturnOld); v != nil {
326336
req.SetQuery("returnOld", "true")

database.go

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ type Database interface {
5252
// Graph functions
5353
DatabaseGraphs
5454

55+
// Streaming Transactions functions
56+
DatabaseStreamingTransactions
57+
5558
// Query performs an AQL query, returning a cursor used to iterate over the returned documents.
5659
// Note that the returned Cursor must always be closed to avoid holding on to resources in the server while they are no longer needed.
5760
Query(ctx context.Context, query string, bindVars map[string]interface{}) (Cursor, error)

database_transactions.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2017 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Lars Maier
21+
//
22+
23+
package driver
24+
25+
import (
26+
"context"
27+
"time"
28+
)
29+
30+
// BeginTransactionOptions provides options for BeginTransaction call
31+
type BeginTransactionOptions struct {
32+
WaitForSync bool
33+
AllowImplicit bool
34+
LockTimeout time.Duration
35+
MaxTransactionSize uint64
36+
}
37+
38+
// TransactionCollections is used to specify which collecitions are accessed by
39+
// a transaction and how
40+
type TransactionCollections struct {
41+
Read []string `json:"read,omitempty"`
42+
Write []string `json:"write,omitempty"`
43+
Exclusive []string `json:"exclusive,omitempty"`
44+
}
45+
46+
// CommitTransactionOptions provides options for CommitTransaction. Currently unused
47+
type CommitTransactionOptions struct{}
48+
49+
// AbortTransactionOptions provides options for CommitTransaction. Currently unused
50+
type AbortTransactionOptions struct{}
51+
52+
// TransactionID identifies a transaction
53+
type TransactionID string
54+
55+
// TransactionStatus describes the status of an transaction
56+
type TransactionStatus string
57+
58+
const (
59+
TransactionRunning TransactionStatus = "running"
60+
TransactionCommitted TransactionStatus = "committed"
61+
TransactionAborted TransactionStatus = "aborted"
62+
)
63+
64+
// TransactionStatusRecord provides insight about the status of transaction
65+
type TransactionStatusRecord struct {
66+
Status TransactionStatus
67+
}
68+
69+
// DatabaseStreamingTransactions provides access to the Streaming Transactions API
70+
type DatabaseStreamingTransactions interface {
71+
BeginTransaction(ctx context.Context, cols TransactionCollections, opts *BeginTransactionOptions) (TransactionID, error)
72+
CommitTransaction(ctx context.Context, tid TransactionID, opts *CommitTransactionOptions) error
73+
AbortTransaction(ctx context.Context, tid TransactionID, opts *AbortTransactionOptions) error
74+
75+
TransactionStatus(ctx context.Context, tid TransactionID) (TransactionStatusRecord, error)
76+
}

database_transactions_impl.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2017 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Lars Maier
21+
//
22+
23+
package driver
24+
25+
import (
26+
"context"
27+
"path"
28+
)
29+
30+
type beginTransactionRequest struct {
31+
WaitForSync bool `json:"waitForSync,omitempty"`
32+
AllowImplicit bool `json:"allowImplicit,omitempty"`
33+
LockTimeout float64 `json:"lockTimeout,omitempty"`
34+
MaxTransactionSize uint64 `json:"maxTransactionSize,omitempty"`
35+
Collections TransactionCollections `json:"collections,omitempty"`
36+
}
37+
38+
func (d *database) BeginTransaction(ctx context.Context, cols TransactionCollections, opts *BeginTransactionOptions) (TransactionID, error) {
39+
req, err := d.conn.NewRequest("POST", path.Join(d.relPath(), "_api/transaction/begin"))
40+
if err != nil {
41+
return "", WithStack(err)
42+
}
43+
var reqBody beginTransactionRequest
44+
if opts != nil {
45+
reqBody.WaitForSync = opts.WaitForSync
46+
reqBody.AllowImplicit = opts.AllowImplicit
47+
reqBody.LockTimeout = opts.LockTimeout.Seconds()
48+
}
49+
reqBody.Collections = cols
50+
if _, err := req.SetBody(reqBody); err != nil {
51+
return "", WithStack(err)
52+
}
53+
resp, err := d.conn.Do(ctx, req)
54+
if err != nil {
55+
return "", WithStack(err)
56+
}
57+
if err := resp.CheckStatus(201); err != nil {
58+
return "", WithStack(err)
59+
}
60+
var result struct {
61+
TransactionID TransactionID `json:"id,omitempty"`
62+
}
63+
if err := resp.ParseBody("result", &result); err != nil {
64+
return "", WithStack(err)
65+
}
66+
return result.TransactionID, nil
67+
}
68+
69+
func (d *database) requestForTransaction(ctx context.Context, tid TransactionID, method string) (TransactionStatusRecord, error) {
70+
req, err := d.conn.NewRequest(method, path.Join(d.relPath(), "_api/transaction/", string(tid)))
71+
if err != nil {
72+
return TransactionStatusRecord{}, WithStack(err)
73+
}
74+
resp, err := d.conn.Do(ctx, req)
75+
if err != nil {
76+
return TransactionStatusRecord{}, WithStack(err)
77+
}
78+
if err := resp.CheckStatus(200); err != nil {
79+
return TransactionStatusRecord{}, WithStack(err)
80+
}
81+
var result TransactionStatusRecord
82+
if err := resp.ParseBody("result", &result); err != nil {
83+
return TransactionStatusRecord{}, WithStack(err)
84+
}
85+
return result, nil
86+
}
87+
88+
func (d *database) CommitTransaction(ctx context.Context, tid TransactionID, opts *CommitTransactionOptions) error {
89+
_, err := d.requestForTransaction(ctx, tid, "PUT")
90+
return err
91+
}
92+
93+
func (d *database) AbortTransaction(ctx context.Context, tid TransactionID, opts *AbortTransactionOptions) error {
94+
_, err := d.requestForTransaction(ctx, tid, "DELETE")
95+
return err
96+
}
97+
98+
func (d *database) TransactionStatus(ctx context.Context, tid TransactionID) (TransactionStatusRecord, error) {
99+
return d.requestForTransaction(ctx, tid, "GET")
100+
}

test/database_transaction_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package test
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67
"testing"
@@ -38,3 +39,89 @@ func TestDatabaseTransaction(t *testing.T) {
3839
})
3940
}
4041
}
42+
43+
func insertDocument(ctx context.Context, col driver.Collection, t *testing.T) driver.DocumentMeta {
44+
doc := struct {
45+
Name string `json:"name,omitempty"`
46+
}{
47+
Name: "Hello World",
48+
}
49+
if meta, err := col.CreateDocument(ctx, &doc); err != nil {
50+
t.Fatalf("Failed to create document: %s", describe(err))
51+
} else {
52+
return meta
53+
}
54+
return driver.DocumentMeta{}
55+
}
56+
57+
func documentExists(ctx context.Context, col driver.Collection, key string, exists bool, t *testing.T) {
58+
if found, err := col.DocumentExists(ctx, key); err != nil {
59+
t.Fatalf("DocumentExists failed: %s", describe(err))
60+
} else {
61+
if exists != found {
62+
t.Errorf("Document status not as expected: expected: %t, actual: %t", exists, found)
63+
}
64+
}
65+
}
66+
67+
func TestTransactionCommit(t *testing.T) {
68+
c := createClientFromEnv(t, true)
69+
skipBelowVersion(c, "3.5", t)
70+
colname := "trx_test_col"
71+
ctx := context.Background()
72+
db := ensureDatabase(ctx, c, "trx_test", nil, t)
73+
col := ensureCollection(ctx, db, colname, nil, t)
74+
75+
trxid, err := db.BeginTransaction(ctx, driver.TransactionCollections{Exclusive: []string{colname}}, nil)
76+
if err != nil {
77+
t.Fatalf("Failed to begin transaction: %s", describe(err))
78+
}
79+
80+
tctx := driver.WithTransactionID(ctx, trxid)
81+
meta1 := insertDocument(tctx, col, t)
82+
83+
// document should not exist without transaction
84+
documentExists(ctx, col, meta1.Key, false, t)
85+
86+
// document should exist with transaction
87+
documentExists(tctx, col, meta1.Key, true, t)
88+
89+
// Now commit the transaction
90+
if err := db.CommitTransaction(ctx, trxid, nil); err != nil {
91+
t.Fatalf("Failed to commit transaction: %s", describe(err))
92+
}
93+
94+
// document should exist
95+
documentExists(ctx, col, meta1.Key, true, t)
96+
}
97+
98+
func TestTransactionAbort(t *testing.T) {
99+
c := createClientFromEnv(t, true)
100+
skipBelowVersion(c, "3.5", t)
101+
colname := "trx_test_col_abort"
102+
ctx := context.Background()
103+
db := ensureDatabase(ctx, c, "trx_test", nil, t)
104+
col := ensureCollection(ctx, db, colname, nil, t)
105+
106+
trxid, err := db.BeginTransaction(ctx, driver.TransactionCollections{Exclusive: []string{colname}}, nil)
107+
if err != nil {
108+
t.Fatalf("Failed to begin transaction: %s", describe(err))
109+
}
110+
111+
tctx := driver.WithTransactionID(ctx, trxid)
112+
meta1 := insertDocument(tctx, col, t)
113+
114+
// document should not exist without transaction
115+
documentExists(ctx, col, meta1.Key, false, t)
116+
117+
// document should exist with transaction
118+
documentExists(tctx, col, meta1.Key, true, t)
119+
120+
// Now commit the transaction
121+
if err := db.AbortTransaction(ctx, trxid, nil); err != nil {
122+
t.Fatalf("Failed to abort transaction: %s", describe(err))
123+
}
124+
125+
// document should exist
126+
documentExists(ctx, col, meta1.Key, false, t)
127+
}

0 commit comments

Comments
 (0)