Skip to content

Commit 3862d21

Browse files
committed
Add caching of sharding function
The ddl.bucket_id() function needs to know the sharding function, and it is costly to obtain the function declaration / definition stored in the _ddl_sharding_func space on every call. This patch adds a sharding function cache divided into two parts: raw and processed. The raw part is used by the get_schema() method and is stored as is. The processed part is used by bucket_id(). A processed sharding_func cache entry may be: * a table with parsed dot notation (like {'foo', 'bar'}) * a function ready to call, which offloads the use of loadstring() * a string with an error. The cache will be rebuilt if: * the _ddl_sharding_func space changes: the cache sets an _ddl_sharding_func:on_replace trigger * the schema changes: the cache checks box.internal.schema_version for changes. This patch does not serve hot reload techniques; this entails on_replace trigger duplication if a hot reload occurs. Hot reload support will be done in a separate task: #87 Closes #82
1 parent 17804f8 commit 3862d21

File tree

4 files changed

+341
-11
lines changed

4 files changed

+341
-11
lines changed

ddl/cache.lua

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
local fiber = require('fiber')

-- Module-wide cache singleton; created lazily by the first cache_get() call.
local cache = nil

-- Seconds to wait for the cache lock before failing (see locked()).
local CACHE_LOCK_TIMEOUT = 3

-- Field positions inside a _ddl_sharding_func tuple.
local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3
9+
10+
-- Function decorator that serializes calls through cache.lock so that
-- cache_build() is never executed concurrently by different fibers.
--
-- The wrapped function's error (raised or returned) is handed back to the
-- caller as a return value; the lock is always released after the call.
-- Raises if the lock cannot be taken within CACHE_LOCK_TIMEOUT seconds.
local function locked(f)
    return function(...)
        if not cache.lock:put(true, CACHE_LOCK_TIMEOUT) then
            error("cache lock timeout is exceeded")
        end

        -- pcall() guarantees the lock is released even when f raises.
        local ok, res = pcall(f, ...)
        cache.lock:get()

        if not ok or res ~= nil then
            return res
        end
    end
end
28+
29+
-- Build cache.
--
-- Cache structure format:
-- cache = {
--     spaces = {
--         'space_name' = {
--             raw = {},       -- raw sharding metadata, used for ddl.get()
--             processed = {}  -- table with parsed dot notation (like {'foo', 'bar'})
--                             -- or a function ready to call (or a string with an error)
--         }
--     },
--     lock -- locking based on fiber.channel()
-- }
local cache_build = locked(function()
    -- Drop any previously cached data.
    cache.spaces = {}

    local sharding_space = box.space._ddl_sharding_func
    if sharding_space == nil then
        return
    end

    for _, tuple in sharding_space:pairs() do
        local space_name = tuple[SPACE_NAME_IDX]
        local func_name = tuple[SHARD_FUNC_NAME_IDX]
        local func_body = tuple[SHARD_FUNC_BODY_IDX]

        local entry = {raw = tuple}

        if func_body ~= nil then
            -- Compile the body once here so bucket_id() does not have to
            -- call loadstring() on every invocation.
            local chunk, load_err = loadstring('return ' .. func_body)
            if chunk == nil then
                entry.processed = string.format(
                    "Body is incorrect in sharding_func for space (%s): %s",
                    space_name, load_err)
            else
                entry.processed = chunk()
            end
        elseif func_name ~= nil then
            -- Pre-split the dot notation ('foo.bar' -> {'foo', 'bar'}).
            entry.processed = string.split(func_name, '.')
        end

        cache.spaces[space_name] = entry
    end

    cache._schema_version = box.internal.schema_version()
end)
76+
77+
-- Install an on_replace trigger on _ddl_sharding_func so the cache is
-- rebuilt whenever the space content changes. Idempotent: the trigger is
-- added only if it is not installed already.
local function cache_set_trigger()
    local sharding_space = box.space._ddl_sharding_func
    if sharding_space == nil then
        return
    end

    -- Avoid duplicating the trigger on repeated calls.
    for _, existing in pairs(sharding_space:on_replace()) do
        if existing == cache_build then
            return
        end
    end

    sharding_space:on_replace(cache_build)
end
96+
97+
-- Get data from cache.
-- Returns all cached data for "space_name" (or nil when absent).
-- Lazily creates the cache on first use and rebuilds it whenever the
-- database schema version has changed since the last build.
local function cache_get(space_name)
    if space_name == nil then
        return nil
    end

    local current_version = box.internal.schema_version()

    -- First call: create the cache singleton and its lock.
    if cache == nil then
        cache = {
            lock = fiber.channel(1)
        }
        cache_build()
        cache_set_trigger()
    end

    -- Rebuild the cache if the database schema changed.
    if current_version ~= cache._schema_version then
        cache_build()
        cache_set_trigger()
    end

    return cache.spaces[space_name]
end
122+
123+
-- Internal-only API: ddl.get and friends read the cache via
-- cache.internal.get(space_name).
return {
    internal = {
        get = cache_get,
    }
}

ddl/get.lua

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local utils = require('ddl.utils')
2+
local cache = require('ddl.cache')
23
local ddl_check = require('ddl.check')
34

45
local function _get_index_field_path(space, index_part)
@@ -66,11 +67,24 @@ local function get_metadata(space_name, metadata_name)
6667
end
6768

6869
-- Return the processed sharding function entry for a space from the cache:
-- a parsed dot-notation table, a ready-to-call function, or an error string
-- (nil when the space has no cached sharding metadata).
local function get_sharding_func(space_name)
    local entry = cache.internal.get(space_name)
    if entry == nil then
        return nil
    end

    return entry.processed
end
78+
79+
local function get_sharding_func_raw(space_name)
80+
local record = cache.internal.get(space_name)
81+
82+
if not record or not record.raw then
83+
return nil
84+
end
85+
86+
record = record.raw
87+
7488
if record.sharding_func_body ~= nil then
7589
return {body = record.sharding_func_body}
7690
end
@@ -97,7 +111,7 @@ local function get_space_schema(space_name)
97111
space_ddl.engine = box_space.engine
98112
space_ddl.format = box_space:format()
99113
space_ddl.sharding_key = get_sharding_key(space_name)
100-
space_ddl.sharding_func = get_sharding_func(space_name)
114+
space_ddl.sharding_func = get_sharding_func_raw(space_name)
101115
for _, field in ipairs(space_ddl.format) do
102116
if field.is_nullable == nil then
103117
field.is_nullable = false
@@ -115,21 +129,21 @@ local function get_space_schema(space_name)
115129
end
116130

117131
local function prepare_sharding_func_for_call(space_name, sharding_func_def)
118-
if type(sharding_func_def) == 'string' then
132+
if type(sharding_func_def) == 'table' then
119133
local sharding_func = utils.get_G_function(sharding_func_def)
120134
if sharding_func ~= nil and
121135
ddl_check.internal.is_callable(sharding_func) == true then
122136
return sharding_func
123137
end
124138
end
125139

126-
if type(sharding_func_def) == 'table' then
127-
local sharding_func, err = loadstring('return ' .. sharding_func_def.body)
128-
if sharding_func == nil then
129-
return nil, string.format(
130-
"Body is incorrect in sharding_func for space (%s): %s", space_name, err)
131-
end
132-
return sharding_func()
140+
if type(sharding_func_def) == 'function' then
141+
return sharding_func_def
142+
end
143+
144+
-- error from cache
145+
if type(sharding_func_def) == 'string' then
146+
return nil, sharding_func_def
133147
end
134148

135149
return nil, string.format(

ddl/utils.lua

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,19 @@ end
189189
-- split sharding func name in dot notation by dot
190190
-- foo.bar.baz -> chunks: foo bar baz
191191
-- foo -> chunks: foo
192+
--
193+
-- func_name parameter may be a string in dot notation or table
194+
-- if func_name type is of type table it is assumed that it is already split
192195
local function get_G_function(func_name)
193-
local chunks = string.split(func_name, '.')
194196
local sharding_func = _G
197+
local chunks
198+
199+
if type(func_name) == 'string' then
200+
chunks = string.split(func_name, '.')
201+
else
202+
chunks = func_name
203+
end
204+
195205
-- check is the each chunk an identifier
196206
for _, chunk in pairs(chunks) do
197207
if not check_name_isident(chunk) or sharding_func == nil then

test/cache_test.lua

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#!/usr/bin/env tarantool

-- Tests for the sharding function cache (ddl/cache.lua).
local t = require('luatest')
local db = require('test.db')
local ddl = require('ddl')
local cache = require('ddl.cache')

-- Field positions inside a _ddl_sharding_func tuple
-- (mirrors the constants in ddl/cache.lua).
local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3
11+
12+
-- Space definition exercising every supported field type in both
-- nullable and non-nullable variants.
local test_space = {
    engine = 'memtx',
    is_local = true,
    temporary = false,
    format = {
        {name = 'unsigned_nonnull', type = 'unsigned', is_nullable = false},
        {name = 'unsigned_nullable', type = 'unsigned', is_nullable = true},
        {name = 'integer_nonnull', type = 'integer', is_nullable = false},
        {name = 'integer_nullable', type = 'integer', is_nullable = true},
        {name = 'number_nonnull', type = 'number', is_nullable = false},
        {name = 'number_nullable', type = 'number', is_nullable = true},
        {name = 'boolean_nonnull', type = 'boolean', is_nullable = false},
        {name = 'boolean_nullable', type = 'boolean', is_nullable = true},
        {name = 'string_nonnull', type = 'string', is_nullable = false},
        {name = 'string_nullable', type = 'string', is_nullable = true},
        {name = 'scalar_nonnull', type = 'scalar', is_nullable = false},
        {name = 'scalar_nullable', type = 'scalar', is_nullable = true},
        {name = 'array_nonnull', type = 'array', is_nullable = false},
        {name = 'array_nullable', type = 'array', is_nullable = true},
        {name = 'map_nonnull', type = 'map', is_nullable = false},
        {name = 'map_nullable', type = 'map', is_nullable = true},
        {name = 'any_nonnull', type = 'any', is_nullable = false},
        {name = 'any_nullable', type = 'any', is_nullable = true},
    },
}
37+
38+
-- Primary index over two non-nullable fields.
local primary_index = {
    type = 'HASH',
    unique = true,
    parts = {
        {path = 'string_nonnull', is_nullable = false, type = 'string'},
        {path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'},
    },
    name = 'primary'
}

-- Secondary index over the bucket_id field required by sharding.
local bucket_id_idx = {
    type = 'TREE',
    unique = false,
    parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}},
    name = 'bucket_id'
}

-- Two distinguishable sharding function bodies used to observe
-- cache rebuilds (42 before a change, 24 after).
local func_body_first = 'function() return 42 end'
local func_body_second = 'function() return 24 end'
57+
58+
-- Drop every space and prepare a fresh schema definition in `g`
-- (g.space and g.schema) based on the shared fixtures above.
local function rebuild_db(g)
    db.drop_all()

    local space = table.deepcopy(test_space)
    -- Sharded spaces need a bucket_id field in the first position.
    table.insert(space.format, 1, {
        name = 'bucket_id', type = 'unsigned', is_nullable = false
    })
    space.indexes = {
        table.deepcopy(primary_index),
        table.deepcopy(bucket_id_idx)
    }
    space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'}

    g.space = space
    g.schema = {
        spaces = {
            space = g.space,
        }
    }
end
77+
78+
local g = t.group()
-- Initialize the database once, then start every test from a clean schema.
g.before_all(db.init)
g.before_each(function()
    rebuild_db(g)
end)
83+
84+
-- A sharding function set via its body must be compiled and stored in
-- the processed part of the cache as a ready-to-call function.
function g.test_cache_processed_func_body()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.processed)

    local processed = entry.processed
    t.assert(type(processed) == 'function')
    t.assert_equals(processed(), 42)
end
99+
100+
-- A sharding function referenced by a global name must be stored in the
-- processed part of the cache as a table of parsed dot-notation chunks.
function g.test_cache_processed_func_name()
    local sharding_func_name = 'sharding_func'
    rawset(_G, sharding_func_name, function(key) return key end)
    g.schema.spaces.space.sharding_func = sharding_func_name

    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.processed)

    local processed = entry.processed
    t.assert(type(processed) == 'table')
    t.assert_equals(processed[1], 'sharding_func')

    -- Remove the test global again.
    rawset(_G, sharding_func_name, nil)
end
118+
119+
-- Recreating the database (which bumps the schema version) must cause
-- the cache to be rebuilt with the new raw sharding metadata.
function g.test_cache_schema_changed()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.raw)
    local raw = entry.raw
    t.assert_equals(raw[SPACE_NAME_IDX], 'space')
    t.assert_equals(raw[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(raw[SHARD_FUNC_BODY_IDX], func_body_first)

    -- Drop everything and apply a schema with a different function body.
    rebuild_db(g)

    g.schema.spaces.space.sharding_func = {
        body = func_body_second
    }
    ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.raw)
    raw = entry.raw
    t.assert_equals(raw[SPACE_NAME_IDX], 'space')
    t.assert_equals(raw[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(raw[SHARD_FUNC_BODY_IDX], func_body_second)
end
152+
153+
-- Updating a _ddl_sharding_func tuple directly must fire the on_replace
-- trigger and rebuild the cache with the new function body.
function g.test_cache_space_updated()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.raw)
    local raw = entry.raw
    t.assert_equals(raw[SPACE_NAME_IDX], 'space')
    t.assert_equals(raw[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(raw[SHARD_FUNC_BODY_IDX], func_body_first)

    -- Mutate the sharding metadata behind the cache's back.
    box.space._ddl_sharding_func
        :update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}})

    entry = cache.internal.get('space')
    t.assert(entry)
    t.assert(entry.raw)
    raw = entry.raw
    t.assert_equals(raw[SPACE_NAME_IDX], 'space')
    t.assert_equals(raw[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(raw[SHARD_FUNC_BODY_IDX], func_body_second)
end

0 commit comments

Comments
 (0)