19. 插件

Metrics 插件

  • 示例: metrics
  • Metrics 插件使用流行的go-metrics 来计算服务的指标。
  • 它包含多个统计指标:
    • serviceCounter
    • clientMeter
    • “service_”+servicePath+”.”+serviceMethod+”Read Qps”
    • “service_”+servicePath+”.”+serviceMethod+”Write Qps”
    • “service_”+servicePath+”.”+serviceMethod+”_CallTime”
    • 你可以将metrics输出到graphite中,通过grafana来监控。

限流

  • 实例: rate-limiting
  • 限流是一种保护错误,避免服务被突发的或者大量的请求所拖垮。
  • 这个插件使用 juju/ratelimit来限流。
  • 使用 func NewRateLimitingPlugin(fillInterval time.Duration, capacity int64) *RateLimitingPlugin t来创建这个插件。

别名

  • 示例: alias
  • 这个插件可以为一个服务方法设置一个别名。
  • 下面的代码使用 Arith的别名a.b.c.d, 为Mul设置别名Times
    func main() {
        flag.Parse()
        a := serverplugin.NewAliasPlugin()
        a.Alias("a.b.c.d", "Times", "Arith", "Mul")
        s := server.NewServer()
        s.Plugins.Add(a)
        s.RegisterName("Arith", new(example.Arith), "")
        err := s.Serve("reuseport", *addr)
        if err != nil {
            panic(err)
        }
    }
    
  • 客户端可以使用别名来调用服务:
    func main() {
        flag.Parse()
        d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
        option := client.DefaultOption
        option.ReadTimeout = 10 * time.Second
        xclient := client.NewXClient("a.b.c.d", client.Failtry, client.RandomSelect, d, option)
        defer xclient.Close()
        args := &example.Args{
            A: 10,
            B: 20,
        }
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Times", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
    

身份认证

  • 示例: auth
  • 出于安全的考虑, 很多场景下只有授权的客户端才可以调用服务。
  • 客户端必须设置一个 token, 这个token可以从其他的 OAuth/OAuth2 服务器获得,或者由服务提供者分配。
  • 服务接收到请求后,需要验证这个token。如果是token是从 OAuth2服务器中申请到,则服务需要到OAuth2服务中去验证, 如果是自己分配的,则需要和自己的记录进行对别。
  • 因为rpcx提供的是一个身份验证的框架,所以具体的身份验证需要自己集成和验证。
    func main() {
        flag.Parse()
        s := server.NewServer()
        s.RegisterName("Arith", new(example.Arith), "")
        s.AuthFunc = auth
        s.Serve("reuseport", *addr)
    }
    func auth(ctx context.Context, req *protocol.Message, token string) error {
        if token == "bearer tGzv3JOkF0XG5Qx2TlKWIA" {
            return nil
        }
        return errors.New("invalid token")
    }
    
  • 服务器必须定义 AuthFunc 来验证token。在上面的例子中, 只有token为bearer tGzv3JOkF0XG5Qx2TlKWIA 才是合法的客户端。
  • 客户端必须设置这个toekn
    func main() {
        flag.Parse()
        d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
        option := client.DefaultOption
        option.ReadTimeout = 10 * time.Second
        xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
        defer xclient.Close()
        //xclient.Auth("bearer tGzv3JOkF0XG5Qx2TlKWIA")
        xclient.Auth("bearer abcdefg1234567")
        args := &example.Args{
            A: 10,
            B: 20,
        }
        reply := &example.Reply{}
        ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, make(map[string]string))
        err := xclient.Call(ctx, "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
    
  • 注意: 你必须设置 map[string]stringshare.ReqMetaDataKey的值,否则调用会出错

插件开发

rpcx为服务器和客户端定义了几个插件接口,在一些处理点上可以调用插件。

Server Plugin

  • 示例: trace
  • go doc
    type PostConnAcceptPlugin
    type PostConnAcceptPlugin interface {
        HandleConnAccept(net.Conn) (net.Conn, bool)
    }
    type PostReadRequestPlugin
    type PostReadRequestPlugin interface {
        PostReadRequest(ctx context.Context, r *protocol.Message, e error) error
    }
    PostReadRequestPlugin represents .
    type PostWriteResponsePlugin
    type PostWriteResponsePlugin interface {
        PostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
    }
    PostWriteResponsePlugin represents .
    type PreReadRequestPlugin
    type PreReadRequestPlugin interface {
        PreReadRequest(ctx context.Context) error
    }
    PreReadRequestPlugin represents .
    type PreWriteResponsePlugin
    type PreWriteResponsePlugin interface {
        PreWriteResponse(context.Context, *protocol.Message) error
    }
    PreWriteResponsePlugin represents .
    

Client Plugin

  • go doc
    type PluginContainer
    type PluginContainer interface {
        Add(plugin Plugin)
        Remove(plugin Plugin)
        All() []Plugin
        DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
        DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
    }
    type PostCallPlugin
    type PostCallPlugin interface {
        DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
    }
    PostCallPlugin is invoked after the client calls a server.
    type PreCallPlugin
    type PreCallPlugin interface {
        DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
    }
    PreCallPlugin is invoked before the client calls a server.