Skip to content

Commit d8c016d

Browse files
committed
Add caching of sharding function
The ddl.bucket_id() function needs to know a sharding function. It is costly to obtain the function declaration / definition stored in the _ddl_sharding_func space. This cache adds sharding function cache divided into two parts: raw and processed. Raw part is used for get_space_schema() method. Raw cache stored as is. Processed part is used for bucket_id(). Closes #82
1 parent 17804f8 commit d8c016d

File tree

4 files changed

+322
-12
lines changed

4 files changed

+322
-12
lines changed

ddl/cache.lua

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
local fiber = require('fiber')
2+
3+
local cache = nil
4+
5+
local CACHE_LOCK_TIMEOUT = 3
6+
local SPACE_NAME_IDX = 1
7+
local SHARD_FUNC_NAME_IDX = 2
8+
local SHARD_FUNC_BODY_IDX = 3
9+
10+
-- Build cache and setup "on_replace" trigger.
11+
--
12+
-- Cache structure format:
13+
-- cache = {
14+
-- spaces = {
15+
-- 'space_name' = {
16+
-- raw = {}, -- raw sharding metadata, used for ddl.get()
17+
-- processed = {} -- sharding function that is ready to call or a string with error
18+
-- }
19+
-- },
20+
-- lock -- locking based on fiber.channel()
21+
-- }
22+
local function cache_build()
23+
-- prevent cache_build() from being called concurrently by different fibers
24+
local unlocked = cache.lock:put(true, CACHE_LOCK_TIMEOUT)
25+
local trigger_found = false
26+
27+
if not unlocked then
28+
return
29+
end
30+
31+
-- clear cache
32+
cache.spaces = {}
33+
34+
if box.space._ddl_sharding_func == nil then
35+
cache.lock:get()
36+
return nil
37+
end
38+
39+
for _, tuple in box.space._ddl_sharding_func:pairs() do
40+
local space_name = tuple[SPACE_NAME_IDX]
41+
local func_name = tuple[SHARD_FUNC_NAME_IDX]
42+
local func_body = tuple[SHARD_FUNC_BODY_IDX]
43+
44+
cache.spaces[space_name] = {}
45+
cache.spaces[space_name].sharding_func = { raw = tuple }
46+
47+
if func_body ~= nil then
48+
local sharding_func, err = loadstring('return ' .. func_body)
49+
if sharding_func == nil then
50+
cache.spaces[space_name].sharding_func.processed =
51+
string.format("Body is incorrect in sharding_func for space (%s): %s",
52+
space_name, err)
53+
else
54+
cache.spaces[space_name].sharding_func.processed =
55+
sharding_func()
56+
end
57+
elseif func_name ~= nil then
58+
local chunks = string.split(func_name, '.')
59+
cache.spaces[space_name].sharding_func.processed = chunks
60+
end
61+
end
62+
63+
cache._schema_version = box.internal.schema_version()
64+
65+
for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do
66+
if func == cache_build then
67+
trigger_found = true
68+
break
69+
end
70+
end
71+
72+
if not trigger_found then
73+
box.space._ddl_sharding_func:on_replace(cache_build)
74+
end
75+
76+
cache.lock:get()
77+
end
78+
79+
-- Get data from cache.
80+
-- Returns all cached data for "space_name".
81+
local function cache_get(space_name)
82+
if space_name == nil then
83+
return nil
84+
end
85+
86+
local schema_version = box.internal.schema_version()
87+
88+
if not cache then
89+
cache = {
90+
spaces = {},
91+
lock = fiber.channel(1)
92+
}
93+
cache_build()
94+
end
95+
96+
-- rebuild cache if database schema changed
97+
if schema_version ~= cache._schema_version then
98+
cache_build()
99+
end
100+
101+
return cache.spaces[space_name]
102+
end
103+
104+
return {
105+
internal = {
106+
get = cache_get,
107+
}
108+
}

ddl/get.lua

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local utils = require('ddl.utils')
2+
local cache = require('ddl.cache')
23
local ddl_check = require('ddl.check')
34

45
local function _get_index_field_path(space, index_part)
@@ -66,11 +67,25 @@ local function get_metadata(space_name, metadata_name)
6667
end
6768

6869
local function get_sharding_func(space_name)
69-
local record = get_metadata(space_name, "sharding_func")
70-
if not record then
70+
local record = cache.internal.get(space_name)
71+
72+
if not record or not record.sharding_func then
73+
return nil
74+
end
75+
76+
return record.sharding_func.processed
77+
end
78+
79+
local function get_sharding_func_raw(space_name)
80+
local record = cache.internal.get(space_name)
81+
82+
if not record or not record.sharding_func or not
83+
record.sharding_func.raw then
7184
return nil
7285
end
7386

87+
record = record.sharding_func.raw
88+
7489
if record.sharding_func_body ~= nil then
7590
return {body = record.sharding_func_body}
7691
end
@@ -97,7 +112,7 @@ local function get_space_schema(space_name)
97112
space_ddl.engine = box_space.engine
98113
space_ddl.format = box_space:format()
99114
space_ddl.sharding_key = get_sharding_key(space_name)
100-
space_ddl.sharding_func = get_sharding_func(space_name)
115+
space_ddl.sharding_func = get_sharding_func_raw(space_name)
101116
for _, field in ipairs(space_ddl.format) do
102117
if field.is_nullable == nil then
103118
field.is_nullable = false
@@ -115,21 +130,21 @@ local function get_space_schema(space_name)
115130
end
116131

117132
local function prepare_sharding_func_for_call(space_name, sharding_func_def)
118-
if type(sharding_func_def) == 'string' then
133+
if type(sharding_func_def) == 'table' then
119134
local sharding_func = utils.get_G_function(sharding_func_def)
120135
if sharding_func ~= nil and
121136
ddl_check.internal.is_callable(sharding_func) == true then
122137
return sharding_func
123138
end
124139
end
125140

126-
if type(sharding_func_def) == 'table' then
127-
local sharding_func, err = loadstring('return ' .. sharding_func_def.body)
128-
if sharding_func == nil then
129-
return nil, string.format(
130-
"Body is incorrect in sharding_func for space (%s): %s", space_name, err)
131-
end
132-
return sharding_func()
141+
if type(sharding_func_def) == 'function' then
142+
return sharding_func_def
143+
end
144+
145+
-- error from cache
146+
if type(sharding_func_def) == 'string' then
147+
return nil, sharding_func_def
133148
end
134149

135150
return nil, string.format(

ddl/utils.lua

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,16 @@ end
190190
-- foo.bar.baz -> chunks: foo bar baz
191191
-- foo -> chunks: foo
192192
local function get_G_function(func_name)
193-
local chunks = string.split(func_name, '.')
194193
local sharding_func = _G
194+
local chunks
195+
196+
-- function name received from cache is already splitted
197+
if type(func_name) == 'string' then
198+
chunks = string.split(func_name, '.')
199+
else
200+
chunks = func_name
201+
end
202+
195203
-- check is the each chunk an identifier
196204
for _, chunk in pairs(chunks) do
197205
if not check_name_isident(chunk) or sharding_func == nil then

test/cache_test.lua

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#!/usr/bin/env tarantool
2+
3+
local t = require('luatest')
4+
local db = require('test.db')
5+
local ddl = require('ddl')
6+
local cache = require('ddl.cache')
7+
8+
local SPACE_NAME_IDX = 1
9+
local SHARD_FUNC_NAME_IDX = 2
10+
local SHARD_FUNC_BODY_IDX = 3
11+
12+
local test_space = {
13+
engine = 'memtx',
14+
is_local = true,
15+
temporary = false,
16+
format = {
17+
{name = 'unsigned_nonnull', type = 'unsigned', is_nullable = false},
18+
{name = 'unsigned_nullable', type = 'unsigned', is_nullable = true},
19+
{name = 'integer_nonnull', type = 'integer', is_nullable = false},
20+
{name = 'integer_nullable', type = 'integer', is_nullable = true},
21+
{name = 'number_nonnull', type = 'number', is_nullable = false},
22+
{name = 'number_nullable', type = 'number', is_nullable = true},
23+
{name = 'boolean_nonnull', type = 'boolean', is_nullable = false},
24+
{name = 'boolean_nullable', type = 'boolean', is_nullable = true},
25+
{name = 'string_nonnull', type = 'string', is_nullable = false},
26+
{name = 'string_nullable', type = 'string', is_nullable = true},
27+
{name = 'scalar_nonnull', type = 'scalar', is_nullable = false},
28+
{name = 'scalar_nullable', type = 'scalar', is_nullable = true},
29+
{name = 'array_nonnull', type = 'array', is_nullable = false},
30+
{name = 'array_nullable', type = 'array', is_nullable = true},
31+
{name = 'map_nonnull', type = 'map', is_nullable = false},
32+
{name = 'map_nullable', type = 'map', is_nullable = true},
33+
{name = 'any_nonnull', type = 'any', is_nullable = false},
34+
{name = 'any_nullable', type = 'any', is_nullable = true},
35+
},
36+
}
37+
38+
local primary_index = {
39+
type = 'HASH',
40+
unique = true,
41+
parts = {
42+
{path = 'string_nonnull', is_nullable = false, type = 'string'},
43+
{path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'},
44+
},
45+
name = 'primary'
46+
}
47+
48+
local bucket_id_idx = {
49+
type = 'TREE',
50+
unique = false,
51+
parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}},
52+
name = 'bucket_id'
53+
}
54+
55+
local func_body_first = 'function() return 42 end'
56+
local func_body_second = 'function() return 24 end'
57+
58+
local function rebuild_db(g)
59+
db.drop_all()
60+
61+
g.space = table.deepcopy(test_space)
62+
table.insert(g.space.format, 1, {
63+
name = 'bucket_id', type = 'unsigned', is_nullable = false
64+
})
65+
66+
g.space.indexes = {
67+
table.deepcopy(primary_index),
68+
table.deepcopy(bucket_id_idx)
69+
}
70+
g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'}
71+
g.schema = {
72+
spaces = {
73+
space = g.space,
74+
}
75+
}
76+
end
77+
78+
local g = t.group()
79+
g.before_all(db.init)
80+
g.before_each(function()
81+
rebuild_db(g)
82+
end)
83+
84+
function g.test_cache_processed_func_body()
85+
g.schema.spaces.space.sharding_func = {
86+
body = func_body_first
87+
}
88+
local ok, err = ddl.set_schema(g.schema)
89+
t.assert_equals(err, nil)
90+
t.assert_equals(ok, true)
91+
92+
local res = cache.internal.get('space')
93+
t.assert(res)
94+
t.assert(res.sharding_func.processed)
95+
res = res.sharding_func.processed
96+
t.assert(type(res) == 'function')
97+
t.assert_equals(res(), 42)
98+
end
99+
100+
function g.test_cache_processed_func_name()
101+
local sharding_func_name = 'sharding_func'
102+
rawset(_G, sharding_func_name, function(key) return key end)
103+
g.schema.spaces.space.sharding_func = sharding_func_name
104+
105+
local ok, err = ddl.set_schema(g.schema)
106+
t.assert_equals(err, nil)
107+
t.assert_equals(ok, true)
108+
109+
local res = cache.internal.get('space')
110+
t.assert(res)
111+
t.assert(res.sharding_func.processed)
112+
res = res.sharding_func.processed
113+
t.assert(type(res) == 'table')
114+
t.assert_equals(res[1], 'sharding_func')
115+
116+
rawset(_G, sharding_func_name, nil)
117+
end
118+
119+
function g.test_cache_schema_changed()
120+
g.schema.spaces.space.sharding_func = {
121+
body = func_body_first
122+
}
123+
local ok, err = ddl.set_schema(g.schema)
124+
t.assert_equals(err, nil)
125+
t.assert_equals(ok, true)
126+
127+
local res = cache.internal.get('space')
128+
t.assert(res)
129+
t.assert(res.sharding_func.raw)
130+
res = res.sharding_func.raw
131+
t.assert_equals(res[SPACE_NAME_IDX], 'space')
132+
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
133+
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first)
134+
135+
rebuild_db(g)
136+
137+
g.schema.spaces.space.sharding_func = {
138+
body = func_body_second
139+
}
140+
local ok, err = ddl.set_schema(g.schema)
141+
t.assert_equals(err, nil)
142+
t.assert_equals(ok, true)
143+
144+
local res = cache.internal.get('space')
145+
t.assert(res)
146+
t.assert(res.sharding_func.raw)
147+
res = res.sharding_func.raw
148+
t.assert_equals(res[SPACE_NAME_IDX], 'space')
149+
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
150+
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second)
151+
end
152+
153+
function g.test_cache_space_updated()
154+
g.schema.spaces.space.sharding_func = {
155+
body = func_body_first
156+
}
157+
local ok, err = ddl.set_schema(g.schema)
158+
t.assert_equals(err, nil)
159+
t.assert_equals(ok, true)
160+
161+
local res = cache.internal.get('space')
162+
t.assert(res)
163+
t.assert(res.sharding_func.raw)
164+
res = res.sharding_func.raw
165+
t.assert_equals(res[SPACE_NAME_IDX], 'space')
166+
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
167+
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first)
168+
169+
box.space._ddl_sharding_func
170+
:update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}})
171+
172+
local res = cache.internal.get('space')
173+
t.assert(res)
174+
t.assert(res.sharding_func.raw)
175+
res = res.sharding_func.raw
176+
t.assert_equals(res[SPACE_NAME_IDX], 'space')
177+
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
178+
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second)
179+
end

0 commit comments

Comments
 (0)