以太坊源码分析 RPC

本文主要分析以太坊 RPC 的完整流程,也就是 API 注册和 API 调用流程。

以太坊 RPC 遵循JSON RPC规范,API列表参见以下链接:

https://github.com/ethereum/wiki/wiki/JSON-RPC

1. API注册流程

先看一张图,理清各组件之间的关系:

Node启动时会调用各Service的构造函数创建Service实例。Service是一个接口,要对外暴露RPC API的模块都需要实现该接口,比如Ethereum, Whisper, Swarm等等,在图中统一用<module>表示。

在Node的startRPC()函数中,首先会调用每个Service的APIs()函数,把所有RPC API收集到一个数组中:

    // Gather all the possible APIs to surface
    apis := n.apis()
    for _, service := range services {
        apis = append(apis, service.APIs()...)
    }

我们看一下API结构的定义,代码位于rpc/types.go:

type API struct {
    Namespace string      // namespace under which the rpc methods of Service are exposed
    Version   string      // api version for DApp's
    Service   interface{} // receiver instance which holds the methods
    Public    bool        // indication if the methods must be considered safe for public use
}

这里有一个Namespace的概念,可以理解为API的类别分组,方便管理。用过geth客户端的同学可能有印象,我们会在命令行指定类似下面的参数:

geth --rpc --rpcapi “eth,net,web3”

这里的eth,net,web3就是Namespace,指定了需要对外暴露哪些类型的RPC API。

除了Namespace,还有一个Service字段,注意它的类型是interface{},而不是我们之前提到的Service接口,所以个人感觉这个命名不太好,改成receiver可能就不容易引起混淆了。

那么这个Service指向哪里呢?我们以eth模块为例,看一下返回的API数组里到底存的是什么内容,代码位于eth/backend.go:

func (s *Ethereum) APIs() []rpc.API {
    ……

    // Append all the local APIs and return
    return append(apis, []rpc.API{
        {
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicEthereumAPI(s),
            Public:    true,
        }, {
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicMinerAPI(s),
            Public:    true,
        },
        ……)
}

其他先不看,我们先看下NewPublicEthereumAPI是个什么东西,代码位于eth.api.go:

func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI {
    return &PublicEthereumAPI{e}
}

type PublicEthereumAPI struct {
    e *Ethereum
}

可以看到就,就是一个普通的struct,内部有一个指针指向模块本身,其实就是一个wrapper。现在再回过头来看上面那张图的黄色部分,是不是就很清楚了?

获得所有RPC API的集合后,就开始启动RPC server了。上一篇提到RPC有4种方式:InProc、IPC、HTTP、WS,在Node中对应的字段都用不同颜色标识了。流程都是类似的,这里以HTTP为例进行分析。

HTTP相关的有3个字段:

  • httpEndpoint:这是一个字符串,表示IP和端口号,默认是localhost:8545
  • httpListener:这是一个接口,调用net.Listen()时返回,包含了Accept()/Close()/Addr()这3个函数,可以用来接受和关闭连接
  • httpHandler:这是一个需要重点分析的结构,定义位于rpc/types.go:
type Server struct {
    services serviceRegistry

    run      int32
    codecsMu sync.Mutex
    codecs   *set.Set
}

type serviceRegistry map[string]*service // collection of services

可以看到,其中有一个services字段,是一个map,key是Namespace,value是一个service实例。注意这个service类型首字母是小写的,所以是不对外暴露的,定义位于rpc/types.go:

type service struct {
    name          string        // name for service
    typ           reflect.Type  // receiver type
    callbacks     callbacks     // registered handlers
    subscriptions subscriptions // available subscriptions/notifications
}

service中包含了两个字段callbacks和subscriptions,继续看:

type callbacks map[string]*callback      // collection of RPC callbacks
type subscriptions map[string]*callback  // collection of subscription callbacks

type callback struct {
    rcvr        reflect.Value  // receiver of method
    method      reflect.Method // callback
    argTypes    []reflect.Type // input argument types
    hasCtx      bool           // method's first argument is a context (not included in argTypes)
    errPos      int            // err return idx, of -1 when method cannot return error
    isSubscribe bool           // indication if the callback is a subscription
}

可以看到,subscription是一种特殊的callback,而callback结构中包含了RPC API所需要的所有信息:

  • rcvr:方法的接收者,这是一个反射值类型,其实就是指向了之前的NewPublicEthereumAPI
  • method:对应rcvr中的函数
  • argTypes:函数参数的类型列表
  • hasCtx:标识函数的第一个参数是否是context.Context类型
  • isSubscribe:是否是subscription类型(因为它们共用一个结构定义)

至此,各组件直接的关系就分析完了,可以回顾一下上面的那张图。

下面分析一下API注册的具体流程。

看一下rpc/endpoints.go中的startHTTPEndpoint()函数:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {
    ……
    handler := NewServer()
    for _, api := range apis {
        if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
            if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
                return nil, nil, err
            }
            log.Debug("HTTP registered", "namespace", api.Namespace)
        }
    }
    ……
}

具体就是调用了handler.RegisterName()函数,代码位于rpc/server.go:

func (s *Server) RegisterName(name string, rcvr interface{}) error {
    if s.services == nil {
        s.services = make(serviceRegistry)
    }

    svc := new(service)
    svc.typ = reflect.TypeOf(rcvr)
    rcvrVal := reflect.ValueOf(rcvr)

    ……
    methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)

    // already a previous service register under given sname, merge methods/subscriptions
    if regsvc, present := s.services[name]; present {
        ……
        for _, m := range methods {
            regsvc.callbacks[formatName(m.method.Name)] = m
        }
        for _, s := range subscriptions {
            regsvc.subscriptions[formatName(s.method.Name)] = s
        }
        return nil
    }

    svc.name = name
    svc.callbacks, svc.subscriptions = methods, subscriptions

    ……

    s.services[svc.name] = svc
    return nil
}

可以看到是先创建一个service实例,然后填充它的callbacks和subscriptions字段。其中suitableCallbacks()函数会检查API定义是否符合标准,然后创建callback实例放入map中。

另外如果发现API属于同一个Namespace,会进行合并,因为这里的结构是map而不是数组。

至此,API注册流程就分析完了,接下来分析API调用流程。

2. API调用流程

先试用一下API调用,有一个直观的印象:

curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x1b4", true],"id":1}' localhost:8545

返回结果:

请求数据遵循JSON RPC规范:http://www.jsonrpc.org/specification

具体来说,请求对象需要包括下面4个字段:

  • jsonrpc:协议版本号,固定是2.0
  • method:请求调用的函数名,可以看到是Namespace_Method这种命名方式
  • params: 函数参数列表,一般是一个数组
  • id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL

相应的,返回的响应需要包含以下字段:

  • jsonrpc:协议版本号,固定是2.0
  • result/error:返回的结果或者错误,二选一
  • id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL

API调用相关的结构和流程参见下图:

下面开始代码分析。还是回到rpc/endpoints.go中的startHTTPEndpoint()函数,主要看后半段:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {
    ……
    var (
        listener net.Listener
        err      error
    )
    if listener, err = net.Listen("tcp", endpoint); err != nil {
        return nil, nil, err
    }
    go NewHTTPServer(cors, vhosts, handler).Serve(listener)
    return listener, handler, err
}

首先侦听TCP端口,获得listener接口实例。然后创建了一个http.Server实例,并启动一个goroutine调用它的Serve()方法。看一下NewHTTPServer()函数(rpc/http.go):

func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
    // Wrap the CORS-handler within a host-handler
    handler := newCorsHandler(srv, cors)
    handler = newVHostHandler(vhosts, handler)
    return &http.Server{Handler: handler}
}

这里有一个Handler参数,用到了装饰者模式,其实最终实现还是在rpc.Server中。Handler是一个接口,需要实现它的ServerHTTP()函数来处理网络数据,代码如下:

func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ……

    body := io.LimitReader(r.Body, maxRequestContentLength)
    codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})
    defer codec.Close()

    w.Header().Set("content-type", contentType)
    srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)
}

可以看到,首先创建一个Reader用于读取原始数据,然后创建一个JSON的编解码器,最后调用ServeSingleRequest()函数:

func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {
    s.serveRequest(ctx, codec, true, options)
}

func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
    ……

    s.codecs.Add(codec)
    ……

    for atomic.LoadInt32(&s.run) == 1 {
        reqs, batch, err := s.readRequest(codec)
        ……

        // If a single shot request is executing, run and return immediately
        if singleShot {
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
            return nil
        }
        // For multi-shot connections, start a goroutine to serve and loop back
        pend.Add(1)

        go func(reqs []*serverRequest, batch bool) {
            defer pend.Done()
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
        }(reqs, batch)

    }
    return nil
}

可以看到,就是一个循环,每次调用readRequest()解析请求数据,然后调用exec()或者execBatch()执行API调用。

首先看一下readRequest()函数:

func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
    reqs, batch, err := codec.ReadRequestHeaders()
    ……

    requests := make([]*serverRequest, len(reqs))

    for i, r := range reqs {
        var ok bool
        var svc *service
        ……

        if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            continue
        }
        ……

        if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
            requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
            if r.params != nil && len(callb.argTypes) > 0 {
                if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
            }
            continue
        }

        requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
    }

    return requests, batch, nil
}

这里就对应于图上的两个红色箭头部分:首先codec把原始JSON数据解析为一个rpcRequest数组,然后遍历这个数组,根据Namespace找到对应的service,再从service的callbacks表中查询需要调用的method,最后组装成一个新的数据结构serverRequest。

接着就是调用exec()执行这个severRequest指向的API实现了:

func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
    var response interface{}
    var callback func()
    if req.err != nil {
        response = codec.CreateErrorResponse(&req.id, req.err)
    } else {
        response, callback = s.handle(ctx, codec, req)
    }

    if err := codec.Write(response); err != nil {
        log.Error(fmt.Sprintf("%v\n", err))
        codec.Close()
    }

    // when request was a subscribe request this allows these subscriptions to be actived
    if callback != nil {
        callback()
    }
}

可以看到调用了handle()方法获取响应数据,然后通过codec组装成JSON发送给请求端。

看一下handle()函数:

func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
    ……

    arguments := []reflect.Value{req.callb.rcvr}
    if req.callb.hasCtx {
        arguments = append(arguments, reflect.ValueOf(ctx))
    }
    if len(req.args) > 0 {
        arguments = append(arguments, req.args...)
    }

    // execute RPC method and return result
    reply := req.callb.method.Func.Call(arguments)
    if len(reply) == 0 {
        return codec.CreateResponse(req.id, nil), nil
    }
    ……

    return codec.CreateResponse(req.id, reply[0].Interface()), nil
}

首先处理参数列表,如果发现调用的函数需要Context参数则加到最前面。然后就是通过反射调用API了,最后把结果送给codec,按JSON RPC的格式要求组装成响应返回就可以了。

这里提到一个Context类型,主要是用来存储一些和请求相关的信息,初始化代码位于rpc/http.go:

    ctx := context.Background()
    ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
    ctx = context.WithValue(ctx, "scheme", r.Proto)
    ctx = context.WithValue(ctx, "local", r.Host)

具体实现比较有意思,类似于一个单链表:

至此,API调用的整个流程就分析完了。

下一章:以太坊源码分析 账户管理

以太坊源码分析 账户管理:本文分析以太坊的账户管理的源码,主要包括两个部分: 获取钱包列表、订阅钱包事件。1. 获取钱包列表账户组件之间的关系:从图中可以看出wallet、account、address这三者的区别和联系i:wal ...