Skip to content

Commit 26c1da3

Browse files
committed
Add caching of sharding function
The ddl.bucket_id() function needs to know a sharding function. It is costly to obtain the function declaration / definition stored in the _ddl_sharding_func space. This cache adds sharding function cache divided into two parts: raw and processed. Raw part is used for get_schema() method. Raw cache stored as is. Processed part is used for bucket_id(). Processed sharding_func cache entry may be: * table with parsed dot notation (like {'foo', 'bar'}) * function ready to call, this offloads using of loadstring() * string with an error Cache will be rebuilded if: * _ddl_sharding_func space changed: cache sets _ddl_sharding_func:on_replace trigger * schema changed: cache checks box.internal.schema_version changes This patch does not server hot reload techniques. This entails an on_replace trigger duplication if hot reload occures. Hot reload support will be done in separate task. Closes #82
1 parent 17804f8 commit 26c1da3

File tree

4 files changed

+344
-12
lines changed

4 files changed

+344
-12
lines changed

ddl/cache.lua

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
local fiber = require('fiber')
2+
3+
local cache = nil
4+
5+
local CACHE_LOCK_TIMEOUT = 3
6+
local SPACE_NAME_IDX = 1
7+
local SHARD_FUNC_NAME_IDX = 2
8+
local SHARD_FUNC_BODY_IDX = 3
9+
10+
-- Function decorator that is used to prevent cache_build() from being
11+
-- called concurrently by different fibers.
12+
local function locked(f)
13+
return function(...)
14+
local ok = cache.lock:put(true, CACHE_LOCK_TIMEOUT)
15+
16+
if not ok then
17+
error("cache lock timeout is exceeded")
18+
end
19+
20+
local status, err = pcall(f, ...)
21+
cache.lock:get()
22+
23+
if not status or err ~= nil then
24+
return err
25+
end
26+
end
27+
end
28+
29+
-- Build cache and setup "on_replace" trigger.
30+
--
31+
-- Cache structure format:
32+
-- cache = {
33+
-- spaces = {
34+
-- 'space_name' = {
35+
-- raw = {}, -- raw sharding metadata, used for ddl.get()
36+
-- processed = {} -- table with parsed dot notation (like {'foo', 'bar'})
37+
-- -- or a function ready to call (or a string with an error)
38+
-- }
39+
-- },
40+
-- lock -- locking based on fiber.channel()
41+
-- }
42+
local cache_build = locked(function()
43+
-- clear cache
44+
cache.spaces = {}
45+
46+
if box.space._ddl_sharding_func == nil then
47+
return
48+
end
49+
50+
for _, tuple in box.space._ddl_sharding_func:pairs() do
51+
local space_name = tuple[SPACE_NAME_IDX]
52+
local func_name = tuple[SHARD_FUNC_NAME_IDX]
53+
local func_body = tuple[SHARD_FUNC_BODY_IDX]
54+
55+
cache.spaces[space_name] = {}
56+
cache.spaces[space_name].sharding_func = { raw = tuple }
57+
58+
if func_body ~= nil then
59+
local sharding_func, err = loadstring('return ' .. func_body)
60+
if sharding_func == nil then
61+
cache.spaces[space_name].sharding_func.processed =
62+
string.format("Body is incorrect in sharding_func for space (%s): %s",
63+
space_name, err)
64+
else
65+
cache.spaces[space_name].sharding_func.processed =
66+
sharding_func()
67+
end
68+
elseif func_name ~= nil then
69+
local chunks = string.split(func_name, '.')
70+
cache.spaces[space_name].sharding_func.processed = chunks
71+
end
72+
end
73+
74+
cache._schema_version = box.internal.schema_version()
75+
end)
76+
77+
-- Rebuild cache if _ddl_sharding_func space changed.
78+
local function cache_set_trigger()
79+
if box.space._ddl_sharding_func == nil then
80+
return
81+
end
82+
83+
local trigger_found = false
84+
85+
for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do
86+
if func == cache_build then
87+
trigger_found = true
88+
break
89+
end
90+
end
91+
92+
if not trigger_found then
93+
box.space._ddl_sharding_func:on_replace(cache_build)
94+
end
95+
end
96+
97+
-- Get data from cache.
98+
-- Returns all cached data for "space_name".
99+
local function cache_get(space_name)
100+
if space_name == nil then
101+
return nil
102+
end
103+
104+
local schema_version = box.internal.schema_version()
105+
106+
if not cache then
107+
cache = {
108+
spaces = {},
109+
lock = fiber.channel(1)
110+
}
111+
cache_build()
112+
cache_set_trigger()
113+
end
114+
115+
-- rebuild cache if database schema changed
116+
if schema_version ~= cache._schema_version then
117+
cache_build()
118+
cache_set_trigger()
119+
end
120+
121+
return cache.spaces[space_name]
122+
end
123+
124+
return {
125+
internal = {
126+
get = cache_get,
127+
}
128+
}

ddl/get.lua

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local utils = require('ddl.utils')
2+
local cache = require('ddl.cache')
23
local ddl_check = require('ddl.check')
34

45
local function _get_index_field_path(space, index_part)
@@ -66,11 +67,25 @@ local function get_metadata(space_name, metadata_name)
6667
end
6768

6869
local function get_sharding_func(space_name)
69-
local record = get_metadata(space_name, "sharding_func")
70-
if not record then
70+
local record = cache.internal.get(space_name)
71+
72+
if not record or not record.sharding_func then
73+
return nil
74+
end
75+
76+
return record.sharding_func.processed
77+
end
78+
79+
local function get_sharding_func_raw(space_name)
80+
local record = cache.internal.get(space_name)
81+
82+
if not record or not record.sharding_func or not
83+
record.sharding_func.raw then
7184
return nil
7285
end
7386

87+
record = record.sharding_func.raw
88+
7489
if record.sharding_func_body ~= nil then
7590
return {body = record.sharding_func_body}
7691
end
@@ -97,7 +112,7 @@ local function get_space_schema(space_name)
97112
space_ddl.engine = box_space.engine
98113
space_ddl.format = box_space:format()
99114
space_ddl.sharding_key = get_sharding_key(space_name)
100-
space_ddl.sharding_func = get_sharding_func(space_name)
115+
space_ddl.sharding_func = get_sharding_func_raw(space_name)
101116
for _, field in ipairs(space_ddl.format) do
102117
if field.is_nullable == nil then
103118
field.is_nullable = false
@@ -115,21 +130,21 @@ local function get_space_schema(space_name)
115130
end
116131

117132
local function prepare_sharding_func_for_call(space_name, sharding_func_def)
118-
if type(sharding_func_def) == 'string' then
133+
if type(sharding_func_def) == 'table' then
119134
local sharding_func = utils.get_G_function(sharding_func_def)
120135
if sharding_func ~= nil and
121136
ddl_check.internal.is_callable(sharding_func) == true then
122137
return sharding_func
123138
end
124139
end
125140

126-
if type(sharding_func_def) == 'table' then
127-
local sharding_func, err = loadstring('return ' .. sharding_func_def.body)
128-
if sharding_func == nil then
129-
return nil, string.format(
130-
"Body is incorrect in sharding_func for space (%s): %s", space_name, err)
131-
end
132-
return sharding_func()
141+
if type(sharding_func_def) == 'function' then
142+
return sharding_func_def
143+
end
144+
145+
-- error from cache
146+
if type(sharding_func_def) == 'string' then
147+
return nil, sharding_func_def
133148
end
134149

135150
return nil, string.format(

ddl/utils.lua

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,19 @@ end
189189
-- split sharding func name in dot notation by dot
190190
-- foo.bar.baz -> chunks: foo bar baz
191191
-- foo -> chunks: foo
192+
--
193+
-- func_name parameter may be a string in dot notation or table
194+
-- if func_name type is of type table it is assumed that it is already split
192195
local function get_G_function(func_name)
193-
local chunks = string.split(func_name, '.')
194196
local sharding_func = _G
197+
local chunks
198+
199+
if type(func_name) == 'string' then
200+
chunks = string.split(func_name, '.')
201+
else
202+
chunks = func_name
203+
end
204+
195205
-- check is the each chunk an identifier
196206
for _, chunk in pairs(chunks) do
197207
if not check_name_isident(chunk) or sharding_func == nil then

0 commit comments

Comments
 (0)