notes

前言

由于接手了新蛋那边的几个项目,新蛋那边给我们的网络架构提了一点建议,其中一个就是搜集请求日志。

但是我们这边使用的是 k8s,所以我们这边整理了一下,搜集请求日志的架构大致是:

  1. 有一个 nginx-ingress 作为统一的网关
  2. 在这个 nginx-ingress 内部使用 lua 搜集请求日志,并发送给kafka队列
  3. 把 kafka 搜集的消息存储到 elasticsearch
  4. 使用 kibana (k8s的日志管理平台)查看请求日志

实现1,2步骤

先贴上我们 lua 收集请求信息的脚本(demo):

lua_package_path "/usr/local/openresty/lualib/resty/kafka/?.lua;;";
lua_need_request_body on;
server {
    listen 80;
    server_name testkafka;

    location / {
        content_by_lua '
            local cjson = require "cjson"
            local client = require "resty.kafka.client"
            local producer = require "resty.kafka.producer"

            local broker_list = {
                { host = "10.222.31.226", port = 9092 }
            }

            local topic = "log"
            local key = "key"

            local log_obj = {}
            local request = {}
            local response = {}

            request["headers"] = ngx.req.get_headers()
            request["uri_args"] = ngx.req.get_uri_args()
            request["body"] = ngx.req.read_body()
            request["http_version"] = ngx.req.http_version()
            request["method"] = ngx.req.get_method()
            -- request["raw_reader"] = ngx.req.raw_header()
            request["body"] = ngx.req.get_body_data()
            request["is_internal"] = ngx.req.is_internal()
            request["referer"] = ngx.var.http_referer or ""
            request["http_via"] = ngx.var.http_via
            request["query_string"] = ngx.var.query_string
            request["request_uri"] = ngx.var.request_uri
            request["uri"] = ngx.var.uri
            request["host"] = ngx.var.host
            
            log_obj["start_time"] = ngx.req.start_time()
            log_obj["remote_addr"] = ngx.var.remote_addr
            log_obj["remote_user"] = ngx.var.remote_user
            log_obj["request"] = request
            log_obj["response"] = response

            ngx.ctx.msg = log_obj
            local message = cjson.encode(log_obj)
            -- usually we do not use this library directly
            local cli = client:new(broker_list)
            local brokers, partitions = cli:fetch_metadata(topic)
            if not brokers then
                ngx.say("fetch_metadata failed, err:", partitions)
            end

            -- this is async producer_type and bp will be reused in the whole nginx worker
            bp = producer:new(broker_list, { producer_type = "async" })

            local ok, err = bp:send(topic, key, message)
            if not ok then
                ngx.log("kafka send err:", err)
                return
            end

            -- ngx.say("host : ", ngx.var.host)
            -- ngx.say("uri : ", ngx.var.uri)
            -- ngx.say("args : ", ngx.var.args)
            -- ngx.say("body : ", ngx.req.get_body_data())
            -- ngx.say("client ip : ", ngx.var.remote_addr)
            -- ngx.say("time : ", ngx.var.time_local)
            -- ngx.say("send success, ok:", ok)
        ';

        body_filter_by_lua '
            local log_obj = ngx.ctx.msg
            local response = log_obj["response"]
            local resp_body = string.sub(ngx.arg[1], 1, 1000)
            ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body
            if ngx.arg[2] then
                response["body"] = ngx.ctx.buffered
            end
            ngx.ctx.msg = log_obj
        ';

        log_by_lua '
            local cjson = require "cjson"
            local log_obj = ngx.ctx.msg
            local response = log_obj["response"]
            log_obj["cost_time"] = ngx.now() - ngx.req.start_time()
            response["headers"] = ngx.resp.get_headers()
            response["status"] = ngx.status
            local message = cjson.encode(log_obj)
            local topic = "log"
            local key = "key"
            local ok, err = bp:send(topic, key, message)
        ';

    }
}

这里不仅需要让 nginx-ingress 支持 lua,还需要倒入一个 lua 的第三方模块 lua-resty-kafka ,这个第三方模块里面实现了一个生产者来发送消息。我们只需要 new 一个 producer ,把搜集的消息 encode 一下,就能发送给 kafka 了,这里注意一点的是填写 broker_list 内容的时候,写的是 kafka broker 的 ip ,不能写 hostname 。(原因还不清楚)

这里还有一个难点是把这段 lua 脚本嵌入到 nginx-ingress 里面,这里运维同学发现了可以使用 ingress 的 nginx.ingress.kubernetes.io/server-snippet 注解,这个注解可以在服务器配置块中添加自定义配置。

使用方式参考https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#server-snippet

步骤3