由于接手了新蛋那边的几个项目,新蛋那边给我们的网络架构提了一点建议,其中一个就是搜集请求日志。
但是我们这边使用的是 k8s,所以我们这边整理了一下,搜集请求日志的架构大致是:
先贴上我们 lua 收集请求信息的脚本(demo):
lua_package_path "/usr/local/openresty/lualib/resty/kafka/?.lua;;";
lua_need_request_body on;
server {
listen 80;
server_name testkafka;
location / {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "10.222.31.226", port = 9092 }
}
local topic = "log"
local key = "key"
local log_obj = {}
local request = {}
local response = {}
request["headers"] = ngx.req.get_headers()
request["uri_args"] = ngx.req.get_uri_args()
request["body"] = ngx.req.read_body()
request["http_version"] = ngx.req.http_version()
request["method"] = ngx.req.get_method()
-- request["raw_reader"] = ngx.req.raw_header()
request["body"] = ngx.req.get_body_data()
request["is_internal"] = ngx.req.is_internal()
request["referer"] = ngx.var.http_referer or ""
request["http_via"] = ngx.var.http_via
request["query_string"] = ngx.var.query_string
request["request_uri"] = ngx.var.request_uri
request["uri"] = ngx.var.uri
request["host"] = ngx.var.host
log_obj["start_time"] = ngx.req.start_time()
log_obj["remote_addr"] = ngx.var.remote_addr
log_obj["remote_user"] = ngx.var.remote_user
log_obj["request"] = request
log_obj["response"] = response
ngx.ctx.msg = log_obj
local message = cjson.encode(log_obj)
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(topic)
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
-- this is async producer_type and bp will be reused in the whole nginx worker
bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send(topic, key, message)
if not ok then
ngx.log("kafka send err:", err)
return
end
-- ngx.say("host : ", ngx.var.host)
-- ngx.say("uri : ", ngx.var.uri)
-- ngx.say("args : ", ngx.var.args)
-- ngx.say("body : ", ngx.req.get_body_data())
-- ngx.say("client ip : ", ngx.var.remote_addr)
-- ngx.say("time : ", ngx.var.time_local)
-- ngx.say("send success, ok:", ok)
';
body_filter_by_lua '
local log_obj = ngx.ctx.msg
local response = log_obj["response"]
local resp_body = string.sub(ngx.arg[1], 1, 1000)
ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body
if ngx.arg[2] then
response["body"] = ngx.ctx.buffered
end
ngx.ctx.msg = log_obj
';
log_by_lua '
local cjson = require "cjson"
local log_obj = ngx.ctx.msg
local response = log_obj["response"]
log_obj["cost_time"] = ngx.now() - ngx.req.start_time()
response["headers"] = ngx.resp.get_headers()
response["status"] = ngx.status
local message = cjson.encode(log_obj)
local topic = "log"
local key = "key"
local ok, err = bp:send(topic, key, message)
';
}
}
这里不仅需要让 nginx-ingress 支持 lua,还需要倒入一个 lua 的第三方模块 lua-resty-kafka
,这个第三方模块里面实现了一个生产者来发送消息。我们只需要 new 一个 producer ,把搜集的消息 encode 一下,就能发送给 kafka 了,这里注意一点的是填写 broker_list 内容的时候,写的是 kafka broker 的 ip ,不能写 hostname 。(原因还不清楚)
这里还有一个难点是把这段 lua 脚本嵌入到 nginx-ingress 里面,这里运维同学发现了可以使用 ingress 的 nginx.ingress.kubernetes.io/server-snippet
注解,这个注解可以在服务器配置块中添加自定义配置。
使用方式参考https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#server-snippet