connpool: support filtering by sharding.roles
Support filtering replicasets by the configured sharding role by
adding the `sharding_roles` option to `connpool.filter()` and
`connpool.call()`.

Closes tarantool#10318

@TarantoolBot document
connpool: `sharding.roles` filtering support
`connpool.filter()` and `connpool.call()` now support filtering by
sharding roles using the `sharding_roles` option. Its value is a table
containing the required sharding roles. The possible roles are `router`
and `storage`; filtering by the `rebalancer` role is not supported.

Example:

```yaml
groups:
  group-001:
    replicasets:
      replicaset-001:
        sharding:
          roles: [router]
        instances:
          instance-001: {}
          instance-002: {}
  group-002:
    replicasets:
      replicaset-002:
        sharding:
          roles: [storage]
        instances:
          instance-003: {}
          instance-004: {}
```
```lua
-- Returns { 'instance-003', 'instance-004' }
connpool.filter({ sharding_roles = { 'storage' }})

-- Returns { 'instance-001', 'instance-002' }
connpool.filter({ sharding_roles = { 'router' }})

-- Would call the function 'f1' on 'instance-001' or 'instance-002'
connpool.call('f1', nil, { sharding_roles = { 'router' }})
```
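
The option composes with the other `connpool.filter()` and
`connpool.call()` options (`groups`, `replicasets`, `instances`,
`labels`, `roles`, `mode`). A minimal sketch, assuming `instance-003`
from the config above is the only writable instance of the `storage`
replicaset:

```lua
local connpool = require('experimental.connpool')

-- Hypothetical combination of filters: pick only writable storages.
-- Under the assumption above this returns { 'instance-003' }.
local writable_storages = connpool.filter({
    sharding_roles = { 'storage' },
    mode = 'rw',
})
```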
georgiy-belyanin authored and Totktonada committed Aug 15, 2024
1 parent f1c76b9 commit 32244f4
Showing 3 changed files with 223 additions and 0 deletions.
@@ -0,0 +1,4 @@
## feature/connpool

- The `connpool.filter()` and `connpool.call()` functions now support
filtering by the `sharding.roles` option (gh-10318).
21 changes: 21 additions & 0 deletions src/box/lua/connpool.lua
@@ -172,6 +172,8 @@ local function is_candidate_match_static(names, opts)
is_replicaset_match(opts.replicasets, names.replicaset_name) and
is_instance_match(opts.instances, names.instance_name) and
is_roles_match(opts.roles, config:get('roles', get_opts)) and
is_roles_match(opts.sharding_roles,
config:get('sharding.roles', get_opts)) and
is_labels_match(opts.labels, config:get('labels', get_opts))
end

@@ -196,19 +198,35 @@ local function filter(opts)
instances = '?table',
labels = '?table',
roles = '?table',
sharding_roles = '?table',
mode = '?string',
})
opts = opts or {}
if opts.mode ~= nil and opts.mode ~= 'ro' and opts.mode ~= 'rw' then
local msg = 'Expected nil, "ro" or "rw", got "%s"'
error(msg:format(opts.mode), 0)
end
if opts.sharding_roles ~= nil then
for _, sharding_role in ipairs(opts.sharding_roles) do
if sharding_role == 'rebalancer' then
error('Filtering by the \"rebalancer\" role is not supported',
0)
elseif sharding_role ~= 'storage' and
sharding_role ~= 'router' then
local msg = 'Unknown sharding role \"%s\" in '..
'connpool.filter() call. Expected one of the '..
'\"storage\", \"router\"'
error(msg:format(sharding_role), 0)
end
end
end
local static_opts = {
groups = opts.groups,
replicasets = opts.replicasets,
instances = opts.instances,
labels = opts.labels,
roles = opts.roles,
sharding_roles = opts.sharding_roles
}
local dynamic_opts = {
mode = opts.mode,
@@ -248,6 +266,7 @@ local function get_connection(opts)
instances = opts.instances,
labels = opts.labels,
roles = opts.roles,
sharding_roles = opts.sharding_roles,
mode = mode,
}
local candidates = filter(candidates_opts)
@@ -321,6 +340,7 @@ local function call(func_name, args, opts)
instances = '?table',
labels = '?table',
roles = '?table',
sharding_roles = '?table',
prefer_local = '?boolean',
mode = '?string',
-- The following options passed directly to net.box.call().
@@ -344,6 +364,7 @@ local function call(func_name, args, opts)
instances = opts.instances,
labels = opts.labels,
roles = opts.roles,
sharding_roles = opts.sharding_roles,
prefer_local = opts.prefer_local,
mode = opts.mode,
}
198 changes: 198 additions & 0 deletions test/config-luatest/rpc_test.lua
@@ -6,6 +6,15 @@ local helpers = require('test.config-luatest.helpers')

local g = helpers.group()

local has_vshard = pcall(require, 'vshard-ee')
if not has_vshard then
has_vshard = pcall(require, 'vshard')
end

local function skip_if_no_vshard()
t.skip_if(not has_vshard, 'Module "vshard-ee/vshard" is not available')
end

g.test_connect = function(g)
local dir = treegen.prepare_directory(g, {}, {})
local config = [[
@@ -229,6 +238,94 @@ g.test_filter = function(g)
g.server_4:exec(check)
end

g.test_filter_vshard = function(g)
skip_if_no_vshard()
local dir = treegen.prepare_directory(g, {}, {})
local config = [[
credentials:
users:
guest:
roles: [super]
storage:
roles: [sharding, super]
password: "storage"
iproto:
listen:
- uri: 'unix/:./{{ instance_name }}.iproto'
advertise:
sharding:
login: 'storage'
groups:
group-001:
replicasets:
replicaset-001:
sharding:
roles: [router]
instances:
instance-001:
database:
mode: rw
instance-002: {}
replicaset-002:
sharding:
roles: [storage]
instances:
instance-003:
database:
mode: rw
instance-004: {}
]]
treegen.write_script(dir, 'config.yaml', config)

local opts = {
env = {LUA_PATH = os.environ()['LUA_PATH']},
config_file = 'config.yaml',
chdir = dir,
}
g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
g.server_2 = server:new(fun.chain(opts, {alias = 'instance-002'}):tomap())
g.server_3 = server:new(fun.chain(opts, {alias = 'instance-003'}):tomap())
g.server_4 = server:new(fun.chain(opts, {alias = 'instance-004'}):tomap())

g.server_1:start({wait_until_ready = false})
g.server_2:start({wait_until_ready = false})
g.server_3:start({wait_until_ready = false})
g.server_4:start({wait_until_ready = false})

g.server_1:wait_until_ready()
g.server_2:wait_until_ready()
g.server_3:wait_until_ready()
g.server_4:wait_until_ready()

local function check_conn()
local connpool = require('experimental.connpool')

local exp = {"instance-003", "instance-004"}
local opts = {sharding_roles = {'storage'}}
t.assert_items_equals(connpool.filter(opts), exp)

exp = {"instance-001", "instance-002"}
opts = {sharding_roles = {'router'}}
t.assert_items_equals(connpool.filter(opts), exp)

local exp_err = 'Unknown sharding role \"r1\" in connpool.filter() '..
'call. Expected one of the \"storage\", \"router\"'
opts = {sharding_roles = { 'r1' }}
t.assert_error_msg_equals(exp_err, connpool.filter, opts)

local exp_err = 'Filtering by the \"rebalancer\" role is not supported'
opts = {sharding_roles = { 'rebalancer' }}
t.assert_error_msg_equals(exp_err, connpool.filter, opts)
end

g.server_1:exec(check_conn)
g.server_2:exec(check_conn)
g.server_3:exec(check_conn)
g.server_4:exec(check_conn)
end

g.test_filter_mode = function(g)
local dir = treegen.prepare_directory(g, {}, {})
local config = [[
@@ -452,6 +549,107 @@ g.test_call = function(g)
g.server_4:exec(check)
end

g.test_call_vshard = function(g)
skip_if_no_vshard()

local dir = treegen.prepare_directory(g, {}, {})
local config = [[
credentials:
users:
guest:
roles: [super]
storage:
roles: [sharding, super]
password: "storage"
iproto:
listen:
- uri: 'unix/:./{{ instance_name }}.iproto'
advertise:
sharding:
login: 'storage'
groups:
group-001:
replicasets:
replicaset-001:
sharding:
roles: [router]
instances:
instance-001:
roles: [one]
database:
mode: rw
instance-002:
roles: [one]
group-002:
replicasets:
replicaset-002:
sharding:
roles: [storage]
instances:
instance-003:
roles: [one]
database:
mode: rw
instance-004:
roles: [one]
]]
treegen.write_script(dir, 'config.yaml', config)

local role = string.dump(function()
local function f1()
return box.info.name
end

rawset(_G, 'f1', f1)

return {
stop = function() end,
apply = function() end,
validate = function() end,
}
end)
treegen.write_script(dir, 'one.lua', role)

local opts = {
env = {LUA_PATH = os.environ()['LUA_PATH']},
config_file = 'config.yaml',
chdir = dir,
}
g.server_1 = server:new(fun.chain(opts, {alias = 'instance-001'}):tomap())
g.server_2 = server:new(fun.chain(opts, {alias = 'instance-002'}):tomap())
g.server_3 = server:new(fun.chain(opts, {alias = 'instance-003'}):tomap())
g.server_4 = server:new(fun.chain(opts, {alias = 'instance-004'}):tomap())

g.server_1:start({wait_until_ready = false})
g.server_2:start({wait_until_ready = false})
g.server_3:start({wait_until_ready = false})
g.server_4:start({wait_until_ready = false})

g.server_1:wait_until_ready()
g.server_2:wait_until_ready()
g.server_3:wait_until_ready()
g.server_4:wait_until_ready()

local function check()
local connpool = require('experimental.connpool')

local exp = {"instance-003", "instance-004"}
local opts = {roles = {'one'}, sharding_roles = {'storage'}}
t.assert_items_include(exp, {connpool.call('f1', nil, opts)})

exp = {"instance-001", "instance-002"}
opts = {roles = {'one'}, sharding_roles = {'router'}}
t.assert_items_include(exp, {connpool.call('f1', nil, opts)})
end

g.server_1:exec(check)
g.server_2:exec(check)
g.server_3:exec(check)
g.server_4:exec(check)
end

g.test_call_mode = function(g)
local dir = treegen.prepare_directory(g, {}, {})
local config = [[
