diff --git a/.requirements b/.requirements index a68a68eca89f..7caf7b52d300 100644 --- a/.requirements +++ b/.requirements @@ -22,7 +22,7 @@ ATC_ROUTER=ffd11db657115769bf94f0c4f915f98300bc26b6 # 1.6.2 SNAPPY=23b3286820105438c5dbb9bc22f1bb85c5812c8a # 1.2.0 KONG_MANAGER=nightly -NGX_WASM_MODULE=96b4e27e10c63b07ed40ea88a91c22f23981db35 +NGX_WASM_MODULE=ad1e5c7a83b6c356a25df95f6ed6bf265c1943ba WASMER=3.1.1 WASMTIME=23.0.2 V8=12.0.267.17 diff --git a/changelog/unreleased/kong/prometheus-wasmx-metrics.yml b/changelog/unreleased/kong/prometheus-wasmx-metrics.yml new file mode 100644 index 000000000000..422fdb16d535 --- /dev/null +++ b/changelog/unreleased/kong/prometheus-wasmx-metrics.yml @@ -0,0 +1,3 @@ +message: Expose WasmX metrics as part of the Prometheus plugin +type: feature +scope: Plugin diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index cc4a204fee05..24552d39e24f 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -532,6 +532,7 @@ build = { ["kong.plugins.prometheus.prometheus"] = "kong/plugins/prometheus/prometheus.lua", ["kong.plugins.prometheus.serve"] = "kong/plugins/prometheus/serve.lua", ["kong.plugins.prometheus.schema"] = "kong/plugins/prometheus/schema.lua", + ["kong.plugins.prometheus.wasmx"] = "kong/plugins/prometheus/wasmx.lua", ["kong.plugins.session.handler"] = "kong/plugins/session/handler.lua", ["kong.plugins.session.schema"] = "kong/plugins/session/schema.lua", diff --git a/kong/plugins/prometheus/exporter.lua b/kong/plugins/prometheus/exporter.lua index bdc5eeafcbce..33486c949e54 100644 --- a/kong/plugins/prometheus/exporter.lua +++ b/kong/plugins/prometheus/exporter.lua @@ -1,11 +1,14 @@ +local balancer = require "kong.runloop.balancer" +local yield = require("kong.tools.yield").yield +local wasm = require "kong.plugins.prometheus.wasmx" + + local kong = kong local ngx = ngx local get_phase = ngx.get_phase local lower = string.lower local ngx_timer_pending_count = ngx.timer.pending_count local ngx_timer_running_count = ngx.timer.running_count -local balancer = require("kong.runloop.balancer") -local yield = require("kong.tools.yield").yield local get_all_upstreams = balancer.get_all_upstreams if not balancer.get_all_upstreams then -- API changed since after Kong 2.5 get_all_upstreams = require("kong.runloop.balancer.upstreams").get_all_upstreams @@ -517,6 +520,7 @@ local function metric_data(write_fn) -- notify the function if prometheus plugin is enabled, -- so that it can avoid exporting unnecessary metrics if not prometheus:metric_data(write_fn, not IS_PROMETHEUS_ENABLED) + wasm.metric_data() end local function collect() diff --git a/kong/plugins/prometheus/wasmx.lua b/kong/plugins/prometheus/wasmx.lua new file mode 100644 index 000000000000..450c5c9d02aa --- /dev/null +++ b/kong/plugins/prometheus/wasmx.lua @@ -0,0 +1,204 @@ +local buffer = require "string.buffer" +local wasm = require "kong.runloop.wasm" +local wasmx_shm + + +local fmt = string.format +local str_find = string.find +local str_match = string.match +local str_sub = string.sub +local table_insert = table.insert +local table_sort = table.sort +local buf_new = buffer.new +local ngx_say = ngx.say + + +local _M = {} + + +local FLUSH_EVERY = 100 + + +local function sorted_iter(ctx) + local v = ctx.t[ctx.sorted_keys[ctx.i]] + ctx.i = ctx.i + 1 + + return v +end + + +local function sorted_pairs(t) + local sorted_keys = {} + + for k, _ in pairs(t) do + table_insert(sorted_keys, k) + end + + table_sort(sorted_keys) + + return sorted_iter, { t = t, sorted_keys = sorted_keys, i = 1 } +end + + +local function parse_pw_key(key) + local name = key + local labels = {} + local header_size = 3 -- pw. + local first_label = #key + + local second_dot_pos, _ = str_find(key, "%.", header_size + 1) + local filter_name = str_sub(key, header_size + 1, second_dot_pos - 1) + + local filter_config = wasm.filters_by_name[filter_name].config or {} + local patterns = filter_config.pw_metrics + and filter_config.pw_metrics.label_patterns or {} + + for _, pair in ipairs(patterns) do + local label_kv, label_v = str_match(key, pair.pattern) + if label_kv then + local label_k = str_sub(label_kv, 0, str_find(label_kv, "=")) + local label_k_start, _ = str_find(key, label_k) + + first_label = (label_k_start < first_label) and label_k_start or first_label + + table_insert(labels, { pair.label, label_v }) + end + end + + if first_label ~= #key then + name = str_sub(key, 0, first_label - 1) + end + + return name, labels +end + + +local function parse_key(key) + -- TODO: parse wa. (WasmX metrics) and lua. (metrics defined in Lua land) + local header = { pw = "pw." } + + local name = key + local labels = {} + + local is_pw = #key > #header.pw and key:sub(0, #header.pw) == header.pw + + if is_pw then + name, labels = parse_pw_key(key) + end + + name = name:gsub("%.", "_") + + return name, labels +end + + +local function serialize_labels(labels) + local buf = buf_new() + + for _, pair in ipairs(labels) do + buf:put(fmt(',%s="%s"', pair[1], pair[2])) + end + + buf:get(1) -- discard trailing comma + + return "{" .. buf:get() .. "}" +end + + +local function serialize_metric(m, buf) + buf:put(fmt("# HELP %s\n# TYPE %s %s", m.name, m.name, m.type)) + + if m.type == "histogram" then + local h_count, sum = 0, 0 + + for _, pair in ipairs(m.labels) do + local labels, labeled_m = pair[1], pair[2] + local slabels = (#labels > 0) and serialize_labels(labels) or "" + local l_count = 0 + + slabels = (#labels > 0) and (slabels:sub(1, #slabels - 1) .. ",") or "{" + + for _, bin in ipairs(labeled_m.value) do + local ub = (bin.ub ~= 4294967295) and bin.ub or "+Inf" + local ubl = fmt('le="%s"', ub) + local hlabels = slabels .. ubl .. "}" + + h_count = h_count + bin.count + l_count = l_count + bin.count + + buf:put(fmt("\n%s%s %s", m.name, hlabels, l_count)) + end + + sum = sum + labeled_m.sum + end + + buf:put(fmt("\n%s_sum %s", m.name, sum)) + buf:put(fmt("\n%s_count %s", m.name, h_count)) + + else + for _, pair in ipairs(m.labels) do + local labels, labeled_m = pair[1], pair[2] + local slabels = (#labels > 0) and serialize_labels(labels) or "" + + buf:put(fmt("\n%s%s %s", m.name, slabels, labeled_m.value)) + end + end + + buf:put("\n") +end + + +_M.metric_data = function() + local i = 0 + local metrics = {} + local parsed = {} + local buf = buf_new() + + -- delayed require of the WasmX module, to ensure it is loaded + -- after ngx_wasm_module.so is loaded. + if not wasmx_shm then + local ok, _wasmx_shm = pcall(require, "resty.wasmx.shm") + if ok then + wasmx_shm = _wasmx_shm + end + end + + if not wasmx_shm then + return + end + + wasmx_shm.metrics:lock() + + for key in wasmx_shm.metrics:iterate_keys() do + table_insert(metrics, { key, wasmx_shm.metrics:get_by_name(key, { prefix = false })}) + end + + wasmx_shm.metrics:unlock() + + -- in WasmX the different labels of a metric are stored as separate metrics + -- aggregate those separate metrics into a single one + for _, pair in ipairs(metrics) do + local key = pair[1] + local m = pair[2] + local name, labels = parse_key(key) + + parsed[name] = parsed[name] or { name = name, type = m.type, labels = {} } + + table_insert(parsed[name].labels, { labels, m }) + end + + for metric_by_label in sorted_pairs(parsed) do + buf:put(serialize_metric(metric_by_label, buf)) + + i = i + 1 + + if i % FLUSH_EVERY == 0 then + ngx_say(buf:get()) + end + end + + ngx_say(buf:get()) +end + + +return _M diff --git a/kong/runloop/wasm.lua b/kong/runloop/wasm.lua index 5833660c6297..74becc6d91a6 100644 --- a/kong/runloop/wasm.lua +++ b/kong/runloop/wasm.lua @@ -402,6 +402,8 @@ local function rebuild_state(db, version, old_state) for _, filter in ipairs(chain.filters) do if filter.enabled then + _M.filters_by_name[filter.name].config = cjson_decode(filter.config) or filter.config + -- Serialize all JSON configurations up front -- -- NOTE: there is a subtle difference between a raw, non-JSON filter @@ -415,6 +417,7 @@ local function rebuild_state(db, version, old_state) if filter.config ~= nil and type(filter.config) ~= "string" then filter.config = cjson_encode(filter.config) end + end end @@ -778,6 +781,13 @@ local function register_property_handlers() return ok, value, const end) + properties.add_getter("kong.route_name", function(_, _, ctx) + local value = ctx.route and ctx.route.name + local ok = value ~= nil + local const = ok + return ok, value, const + end) + properties.add_getter("kong.service.response.status", function(kong) return true, kong.service.response.get_status(), false end) @@ -789,6 +799,13 @@ local function register_property_handlers() return ok, value, const end) + properties.add_getter("kong.service_name", function(_, _, ctx) + local value = ctx.service and ctx.service.name + local ok = value ~= nil + local const = ok + return ok, value, const + end) + properties.add_getter("kong.version", function(kong) return true, kong.version, true end) diff --git a/spec/03-plugins/26-prometheus/09-wasmx_spec.lua b/spec/03-plugins/26-prometheus/09-wasmx_spec.lua new file mode 100644 index 000000000000..1525149fc798 --- /dev/null +++ b/spec/03-plugins/26-prometheus/09-wasmx_spec.lua @@ -0,0 +1,258 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + + +local json_encode = cjson.encode + + +local status_api_port = helpers.get_available_port() +local fixtures = { + dns_mock = helpers.dns_mock.new({ + mocks_only = true + }), + http_mock = {}, + stream_mock = {} +} + +fixtures.dns_mock:A({ + name = "mock.io", + address = "127.0.0.1" +}) + +fixtures.dns_mock:A({ + name = "status.io", + address = "127.0.0.1" +}) + +local rt_config = json_encode({ + append = { + headers = { + "X-Added-Header: true", + }, + }, + pw_metrics = { + label_patterns = { + { label = "service", pattern = "(_s_id=([0-9a-z%-]+))" }, + { label = "route", pattern = "(_r_id=([0-9a-z%-]+))" }, + } + } +}) + + +for _, strategy in helpers.each_strategy() do + describe("Plugin: prometheus (metrics) [#" .. strategy .. "]", function() + local admin_client + local proxy_client + + setup(function() + require("kong.runloop.wasm").enable({ + { name = "tests", + path = helpers.test_conf.wasm_filters_path .. "/tests.wasm", + }, + { name = "response_transformer", + path = helpers.test_conf.wasm_filters_path .. "/response_transformer.wasm", + }, + }) + + local bp = helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + "filter_chains", + }) + + local function service_and_route(name, path) + local service = assert(bp.services:insert({ + name = name, + url = helpers.mock_upstream_url, + })) + + local route = assert(bp.routes:insert({ + name = name .. "-route", + service = { id = service.id }, + paths = { path }, + hosts = { name }, + protocols = { "https" }, + })) + + return service, route + end + + local service, _ = service_and_route("mock", "/") + local service2, _ = service_and_route("mock2", "/v2") + service_and_route("status.io", "/metrics") + + local filters = { + { name = "tests", enabled = true, config = "metrics=c1,g1,h1" }, + { name = "response_transformer", enabled = true, config = rt_config }, + } + + assert(bp.filter_chains:insert({ + service = { id = service.id }, + filters = filters, + })) + + assert(bp.filter_chains:insert({ + service = { id = service2.id }, + filters = filters, + })) + + bp.plugins:insert({ + name = "prometheus", + config = { + status_code_metrics = true, + latency_metrics = true, + bandwidth_metrics = true, + upstream_health_metrics = true, + }, + }) + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + plugins = "bundled,prometheus", + status_listen = '127.0.0.1:' .. status_api_port .. ' ssl', -- status api does not support h2 + status_access_log = "logs/status_access.log", + status_error_log = "logs/status_error.log" + }, nil, nil, fixtures)) + + proxy_client = helpers.proxy_ssl_client() + + local res = proxy_client:get("/", { + headers = { host = "mock" }, + }) + assert.res_status(200, res) + + res = proxy_client:get("/v2", { + headers = { host = "mock2" }, + }) + assert.res_status(200, res) + + proxy_client:close() + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + + helpers.stop_kong() + end) + + before_each(function() + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_ssl_client() + end) + + after_each(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + end) + + it("exposes WasmX counters", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_c = '# HELP pw_tests_c1\n' + .. '# TYPE pw_tests_c1 counter\n' + .. 'pw_tests_c1 0' + + assert.matches(expected_c, body, nil, true) + end) + + it("exposes WasmX labeled counters", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_c = '# HELP pw_response_transformer_append\n' + .. '# TYPE pw_response_transformer_append counter\n' + .. 'pw_response_transformer_append{service="mock",route="mock-route"} 1\n' + .. 'pw_response_transformer_append{service="mock2",route="mock2-route"} 1' + + assert.matches(expected_c, body, nil, true) + end) + + it("exposes WasmX gauges", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_g = '# HELP pw_tests_g1\n' + .. '# TYPE pw_tests_g1 gauge\n' + .. 'pw_tests_g1 0' + + assert.matches(expected_g, body, nil, true) + end) + + it("exposes WasmX labeled gauges", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_g = '# HELP pw_response_transformer_last_run\n' + .. '# TYPE pw_response_transformer_last_run gauge\n' + .. 'pw_response_transformer_last_run\\{service\\="mock2",route\\="mock2-route"\\} \\d{10}\n' + .. 'pw_response_transformer_last_run\\{service\\="mock",route\\="mock-route"\\} \\d{10}' + + assert.match_re(body, expected_g) + end) + + it("exposes WasmX histograms", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_h = '# HELP pw_tests_h1\n' + .. '# TYPE pw_tests_h1 histogram\n' + .. 'pw_tests_h1{le="+Inf"} 0\n' + .. 'pw_tests_h1_sum 0\n' + .. 'pw_tests_h1_count 0' + + assert.matches(expected_h, body, nil, true) + end) + + it("exposes WasmX labeled histograms", function() + local res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + + local body = assert.res_status(200, res) + + local expected_h = '# HELP pw_response_transformer_a_histogram\n' + .. '# TYPE pw_response_transformer_a_histogram histogram\n' + .. 'pw_response_transformer_a_histogram{service="mock",route="mock-route",le="2"} 1\n' + .. 'pw_response_transformer_a_histogram{service="mock",route="mock-route",le="+Inf"} 1\n' + .. 'pw_response_transformer_a_histogram{service="mock2",route="mock2-route",le="2"} 1\n' + .. 'pw_response_transformer_a_histogram{service="mock2",route="mock2-route",le="+Inf"} 1\n' + .. 'pw_response_transformer_a_histogram_sum 4\n' + .. 'pw_response_transformer_a_histogram_count 2' + + assert.matches(expected_h, body, nil, true) + end) + end) +end diff --git a/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs b/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs index fb23189b3ee2..b11a7af7f1ef 100644 --- a/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs +++ b/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs @@ -1,10 +1,17 @@ mod types; -use proxy_wasm::traits::{Context, RootContext, HttpContext}; -use proxy_wasm::types::{Action, LogLevel, ContextType}; +use std::collections::HashMap; +use std::cell::RefCell; +use std::time::SystemTime; + use crate::types::*; -use serde_json; + +use proxy_wasm::hostcalls::{define_metric, increment_metric, record_metric, get_current_time}; +use proxy_wasm::traits::{Context, RootContext, HttpContext}; +use proxy_wasm::types::{Action, LogLevel, ContextType, MetricType}; + use log::*; +use serde_json; proxy_wasm::main! {{ proxy_wasm::set_log_level(LogLevel::Info); @@ -13,12 +20,84 @@ proxy_wasm::main! {{ }); }} +thread_local! { + static METRICS: Metrics = Metrics::new(); +} + +struct Metrics { + metrics: RefCell>, +} + +impl Metrics { + fn new() -> Metrics { + Metrics { + metrics: RefCell::new(HashMap::new()), + } + } + + fn get_metric(&self, metric_type: MetricType, name: &str, s_id: &str, r_id: &str) -> u32 { + let key = format!("{}_s_id={}_r_id={}", name, s_id, r_id); + let mut map = self.metrics.borrow_mut(); + + match map.get(&key) { + Some(m_id) => *m_id, + None => { + match define_metric(metric_type, &key) { + Ok(m_id) => { + map.insert(key, m_id); + + m_id + }, + Err(_) => 0 + } + } + } + } + + fn get_counter(&self, name: &str, s_id: &str, r_id: &str) -> u32 { + self.get_metric(MetricType::Counter, name, s_id, r_id) + } + + fn get_gauge(&self, name: &str, s_id: &str, r_id: &str) -> u32 { + self.get_metric(MetricType::Gauge, name, s_id, r_id) + } + + fn get_histogram(&self, name: &str, s_id: &str, r_id: &str) -> u32 { + self.get_metric(MetricType::Histogram, name, s_id, r_id) + } +} + struct ResponseTransformerContext { config: Config, } impl ResponseTransformerContext { + fn get_prop(&self, ns: &str, prop: &str) -> String { + if let Some(addr) = self.get_property(vec![ns, prop]) { + match std::str::from_utf8(&addr) { + Ok(value) => value.to_string(), + Err(_) => "".to_string(), + } + } else { + "".to_string() + } + } + + fn increment_counter(&self, name: &str, s_id: &str, r_id: &str) { + let m_id = METRICS.with(|metrics| metrics.get_counter(name, s_id, r_id)); + increment_metric(m_id, 1).unwrap(); + } + + fn record_gauge(&self, name: &str, s_id: &str, r_id: &str, value: u64) { + let m_id = METRICS.with(|metrics| metrics.get_gauge(name, s_id, r_id)); + record_metric(m_id, value).unwrap(); + } + + fn record_histogram(&self, name: &str, s_id: &str, r_id: &str, value: u64) { + let m_id = METRICS.with(|metrics| metrics.get_histogram(name, s_id, r_id)); + record_metric(m_id, value).unwrap(); + } } impl RootContext for ResponseTransformerContext { @@ -55,9 +134,16 @@ impl Context for ResponseTransformerContext { impl HttpContext for ResponseTransformerContext { fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { + let s_id = self.get_prop("kong", "service_name"); + let r_id = self.get_prop("kong", "route_name"); + + let t = get_current_time().unwrap().duration_since(SystemTime::UNIX_EPOCH); + self.config.remove.headers.iter().for_each(|name| { info!("[response-transformer] removing header: {}", name); self.set_http_response_header(&name, None); + + self.increment_counter("remove", &s_id, &r_id); }); self.config.rename.headers.iter().for_each(|KeyValuePair(from, to)| { @@ -65,12 +151,16 @@ impl HttpContext for ResponseTransformerContext { let value = self.get_http_response_header(&from); self.set_http_response_header(&from, None); self.set_http_response_header(&to, value.as_deref()); + + self.increment_counter("rename", &s_id, &r_id); }); self.config.replace.headers.iter().for_each(|KeyValuePair(name, value)| { if self.get_http_response_header(&name).is_some() { info!("[response-transformer] updating header {} value to {}", name, value); self.set_http_response_header(&name, Some(&value)); + + self.increment_counter("replace", &s_id, &r_id); } }); @@ -78,14 +168,20 @@ impl HttpContext for ResponseTransformerContext { if self.get_http_response_header(&name).is_none() { info!("[response-transformer] adding header {} => {}", name, value); self.set_http_response_header(&name, Some(&value)); + + self.increment_counter("add", &s_id, &r_id); } }); self.config.append.headers.iter().for_each(|KeyValuePair(name, value)| { info!("[response-transformer] appending header {} => {}", name, value); self.add_http_response_header(&name, &value); + + self.increment_counter("append", &s_id, &r_id); }); + self.record_histogram("a_histogram", &s_id, &r_id, 2); + self.record_gauge("last_run", &s_id, &r_id, t.unwrap().as_secs()); Action::Continue } diff --git a/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs b/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs index 9251987e6966..f9abc5eeb8b4 100644 --- a/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs +++ b/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs @@ -7,20 +7,56 @@ use crate::test_http::*; use crate::types::*; use http::StatusCode; use log::*; +use proxy_wasm::hostcalls::*; use proxy_wasm::traits::*; use proxy_wasm::types::*; +use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; proxy_wasm::main! {{ proxy_wasm::set_log_level(LogLevel::Info); proxy_wasm::set_root_context(|_| -> Box { - Box::new(TestRoot { config: None }) + Box::new(TestRoot { config: None, metrics: HashMap::new() }) }); }} struct TestRoot { config: Option, + metrics: HashMap, +} + +impl TestRoot { + fn get_config(&self, name: &str) -> Option<&str> { + match &self.config { + Some(config) => config.map.get(name).map(|s| s.as_str()), + None => None, + } + } + + fn define_metrics(&mut self) { + let config = self.get_config("metrics").map_or("c1,g1,h1".to_string(), |x| x.to_string()); + + for metric in config.split(",") { + let metric_char = metric.chars().nth(0).unwrap(); + let metric_type = match metric_char { + 'c' => MetricType::Counter, + 'g' => MetricType::Gauge, + 'h' => MetricType::Histogram, + _ => panic!("unexpected metric type"), + }; + let n = metric[1..].parse::().expect("bad metrics value"); + + for i in 1..(n + 1) { + let name = format!("{}{}", metric_char, i); + let m_id = define_metric(metric_type, &name).expect("cannot define new metric"); + + info!("defined metric {} as {:?}", &name, m_id); + + self.metrics.insert(name, m_id); + } + } + } } impl Context for TestRoot {} @@ -44,6 +80,8 @@ impl RootContext for TestRoot { self.set_tick_period(Duration::from_millis(ms)); } + + self.define_metrics(); } true