4. 客户端开发示例

Client

Example: 101basic

客户端使用和服务同样的通信协议来发送请求和获取响应。

type Client struct {
    Conn net.Conn
    Plugins PluginContainer
    // 包含过滤后的或者不可导出的字段
}

Conn 代表客户端与服务器之前的连接。 Plugins 包含了客户端启用的插件。有以下方法:

    func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
    func (client *Client) Close() error
    func (c *Client) Connect(network, address string) error
    func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
    func (client *Client) IsClosing() bool
    func (client *Client) IsShutdown() bool
  • Call 代表对服务同步调用。客户端在收到响应或错误前一直是阻塞的。 然而 Go 是异步调用。它返回一个指向 Call 的指针, 你可以检查 *Call 的值来获取返回的结果或错误。
  • Close 会关闭所有与服务的连接。他会立刻关闭连接,不会等待未完成的请求结束。
  • IsClosing 表示客户端是关闭着的并且不会接受新的调用。
  • IsShutdown 表示客户端不会接受服务返回的响应。
  • Client uses the default CircuitBreaker (circuit.NewRateBreaker(0.95, 100)) to handle errors. This is a poplular rpc error handling style. When the error rate hits the threshold, this service is marked unavailable in 10 second window. You can implement your customzied CircuitBreaker.
  • Client 使用默认的 CircuitBreaker (circuit.NewRateBreaker(0.95, 100)) 来处理错误。这是rpc处理错误的普遍做法。当出错率达到阈值, 这个服务就会在接下来的10秒内被标记为不可用。你也可以实现你自己的 CircuitBreaker。

下面是客户端的例子:

    client := &Client{
        option: DefaultOption,
    }
    err := client.Connect("tcp", addr)
    if err != nil {
        t.Fatalf("failed to connect: %v", err)
    }
    defer client.Close()
    args := &Args{
        A: 10,
        B: 20,
    }
    reply := &Reply{}
    err = client.Call(context.Background(), "Arith", "Mul", args, reply)
    if err != nil {
        t.Fatalf("failed to call: %v", err)
    }
    if reply.C != 200 {
        t.Fatalf("expect 200 but got %d", reply.C)
    }

XClient

XClient 是对客户端的封装,增加了一些服务发现和服务治理的特性。

type XClient interface {
    SetPlugins(plugins PluginContainer)
    ConfigGeoSelector(latitude, longitude float64)
    Auth(auth string)
    Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
    Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Close() error
}
  • SetPlugins 方法可以用来设置 Plugin 容器, Auth 可以用来设置鉴权token。
  • ConfigGeoSelector 是一个可以通过地址位置选择器来设置客户端的经纬度的特别方法。
  • 一个XCLinet只对一个服务负责,它可以通过serviceMethod参数来调用这个服务的所有方法。如果你想调用多个服务,你必须为每个服务创建一个XClient。
  • 一个应用中,一个服务只需要一个共享的XClient。它可以被通过goroutine共享,并且是协程安全的。
  • Go 代表异步调用, Call 代表同步调用。
  • XClient对于一个服务节点使用单一的连接,并且它会缓存这个连接直到失效或异常。

服务发现

rpcx 支持许多服务发现机制,你也可以实现自己的服务发现。

  • Peer to Peer: 客户端直连每个服务节点。 the client connects the single service directly. It acts like the client type.
  • Peer to Multiple: 客户端可以连接多个服务。服务可以被编程式配置。
  • Zookeeper: 通过 zookeeper 寻找服务。
  • Etcd: 通过 etcd 寻找服务。
  • Consul: 通过 consul 寻找服务。
  • mDNS: 通过 mDNS 寻找服务(支持本地服务发现)。
  • In process: 在同一进程寻找服务。客户端通过进程调用服务,不走TCP或UDP,方便调试使用。

下面是一个同步的 rpcx 例子:

package main
import (
    "context"
    "flag"
    "log"
    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/client"
)
var (
    addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
    flag.Parse()
    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()
    args := &example.Args{
        A: 10,
        B: 20,
    }
    reply := &example.Reply{}
    err := xclient.Call(context.Background(), "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }
    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

服务治理 (失败模式与负载均衡)

在一个大规模的rpc系统中,有许多服务节点提供同一个服务。客户端如何选择最合适的节点来调用呢?如果调用失败,客户端应该选择另一个节点或者立即返回错误?这里就有了故障模式和负载均衡的问题。rpcx 支持 故障模式:

  • Failfast:如果调用失败,立即返回错误
  • Failover:选择其他节点,直到达到最大重试次数
  • Failtry:选择相同节点并重试,直到达到最大重试次数

对于负载均衡,rpcx 提供了许多选择器:

  • Random: 随机选择节点
    • Roundrobin: 使用 roundrobin 算法选择节点
    • Consistent hashing: 如果服务路径、方法和参数一致,就选择同一个节点。使用了非常快的jump consistent hash算法。
    • Weighted: 根据元数据里配置好的权重(weight=xxx)来选择节点。类似于nginx里的实现(smooth weighted algorithm)
    • Network quality: 根据ping的结果来选择节点。网络质量越好,该节点被选择的几率越大。
    • Geography: 如果有多个数据中心,客户端趋向于连接同一个数据机房的节点。
    • Customized Selector: 如果以上的选择器都不适合你,你可以自己定制选择器。例如一个rpcx用户写过它自己的选择器,他有2个数据中心,但是这些数据中心彼此有限制,不能使用 Network quality 来检测连接质量。

下面是一个异步的 rpcx 例子:

package main
import (
    "context"
    "flag"
    "log"
    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/client"
)
var (
    addr2 = flag.String("addr", "localhost:8972", "server address")
)
func main() {
    flag.Parse()
    d := client.NewPeer2PeerDiscovery("tcp@"+*addr2, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()
    args := &example.Args{
        A: 10,
        B: 20,
    }
    reply := &example.Reply{}
    call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }
    replyCall := <-call.Done
    if replyCall.Error != nil {
        log.Fatalf("failed to call: %v", replyCall.Error)
    } else {
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
}

客户端使用了 Failtry 模式并且随机选择节点。

广播与群发

特殊情况下,你可以使用 XClient 的 BroadcastFork 方法。

    Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error

Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。

Fork 表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。

你可以使用 NewXClient 来获取一个 XClient 实例。

func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient

NewXClient 必须使用服务名称作为第一个参数, 然后是 failmode、 selector、 discovery等其他选项。

下一节:rpcx 可以通过 TCP、HTTP、UnixDomain、QUIC和KCP通信。你也可以使用http客户端通过网关或者http调用来访问rpcx服务。