以太坊源码分析 RPC
本文主要分析以太坊 RPC 的完整流程,也就是 API 注册和 API 调用流程。
以太坊 RPC 遵循JSON RPC规范,API列表参见以下链接:
https://github.com/ethereum/wiki/wiki/JSON-RPC
1. API注册流程
先看一张图,理清各组件之间的关系:
Node启动时会调用各Service的构造函数创建Service实例。Service是一个接口,要对外暴露RPC API的模块都需要实现该接口,比如Ethereum, Whisper, Swarm等等,在图中统一用<module>表示。
在Node的startRPC()函数中,首先会调用每个Service的APIs()函数,把所有RPC API收集到一个数组中:
// Gather all the possible APIs to surface apis := n.apis() for _, service := range services { apis = append(apis, service.APIs()...) }
我们看一下API结构的定义,代码位于rpc/types.go:
type API struct { Namespace string // namespace under which the rpc methods of Service are exposed Version string // api version for DApp's Service interface{} // receiver instance which holds the methods Public bool // indication if the methods must be considered safe for public use }
这里有一个Namespace的概念,可以理解为API的类别分组,方便管理。用过geth客户端的同学可能有印象,我们会在命令行指定类似下面的参数:
geth --rpc --rpcapi “eth,net,web3”
这里的eth,net,web3就是Namespace,指定了需要对外暴露哪些类型的RPC API。
除了Namespace,还有一个Service字段,注意它的类型是interface{},而不是我们之前提到的Service接口,所以个人感觉这个命名不太好,改成receiver可能就不容易引起混淆了。
那么这个Service指向哪里呢?我们以eth模块为例,看一下返回的API数组里到底存的是什么内容,代码位于eth/backend.go:
func (s *Ethereum) APIs() []rpc.API { …… // Append all the local APIs and return return append(apis, []rpc.API{ { Namespace: "eth", Version: "1.0", Service: NewPublicEthereumAPI(s), Public: true, }, { Namespace: "eth", Version: "1.0", Service: NewPublicMinerAPI(s), Public: true, }, ……) }
其他先不看,我们先看下NewPublicEthereumAPI是个什么东西,代码位于eth.api.go:
func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI { return &PublicEthereumAPI{e} } type PublicEthereumAPI struct { e *Ethereum }
可以看到就,就是一个普通的struct,内部有一个指针指向模块本身,其实就是一个wrapper。现在再回过头来看上面那张图的黄色部分,是不是就很清楚了?
获得所有RPC API的集合后,就开始启动RPC server了。上一篇提到RPC有4种方式:InProc、IPC、HTTP、WS,在Node中对应的字段都用不同颜色标识了。流程都是类似的,这里以HTTP为例进行分析。
HTTP相关的有3个字段:
- httpEndpoint:这是一个字符串,表示IP和端口号,默认是localhost:8545
- httpListener:这是一个接口,调用net.Listen()时返回,包含了Accept()/Close()/Addr()这3个函数,可以用来接受和关闭连接
- httpHandler:这是一个需要重点分析的结构,定义位于rpc/types.go:
type Server struct { services serviceRegistry run int32 codecsMu sync.Mutex codecs *set.Set } type serviceRegistry map[string]*service // collection of services
可以看到,其中有一个services字段,是一个map,key是Namespace,value是一个service实例。注意这个service类型首字母是小写的,所以是不对外暴露的,定义位于rpc/types.go:
type service struct { name string // name for service typ reflect.Type // receiver type callbacks callbacks // registered handlers subscriptions subscriptions // available subscriptions/notifications }
service中包含了两个字段callbacks和subscriptions,继续看:
type callbacks map[string]*callback // collection of RPC callbacks type subscriptions map[string]*callback // collection of subscription callbacks type callback struct { rcvr reflect.Value // receiver of method method reflect.Method // callback argTypes []reflect.Type // input argument types hasCtx bool // method's first argument is a context (not included in argTypes) errPos int // err return idx, of -1 when method cannot return error isSubscribe bool // indication if the callback is a subscription }
可以看到,subscription是一种特殊的callback,而callback结构中包含了RPC API所需要的所有信息:
- rcvr:方法的接收者,这是一个反射值类型,其实就是指向了之前的NewPublicEthereumAPI
- method:对应rcvr中的函数
- argTypes:函数参数的类型列表
- hasCtx:标识函数的第一个参数是否是context.Context类型
- isSubscribe:是否是subscription类型(因为它们共用一个结构定义)
至此,各组件直接的关系就分析完了,可以回顾一下上面的那张图。
下面分析一下API注册的具体流程。
看一下rpc/endpoints.go中的startHTTPEndpoint()函数:
func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) { …… handler := NewServer() for _, api := range apis { if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return nil, nil, err } log.Debug("HTTP registered", "namespace", api.Namespace) } } …… }
具体就是调用了handler.RegisterName()函数,代码位于rpc/server.go:
func (s *Server) RegisterName(name string, rcvr interface{}) error { if s.services == nil { s.services = make(serviceRegistry) } svc := new(service) svc.typ = reflect.TypeOf(rcvr) rcvrVal := reflect.ValueOf(rcvr) …… methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) // already a previous service register under given sname, merge methods/subscriptions if regsvc, present := s.services[name]; present { …… for _, m := range methods { regsvc.callbacks[formatName(m.method.Name)] = m } for _, s := range subscriptions { regsvc.subscriptions[formatName(s.method.Name)] = s } return nil } svc.name = name svc.callbacks, svc.subscriptions = methods, subscriptions …… s.services[svc.name] = svc return nil }
可以看到是先创建一个service实例,然后填充它的callbacks和subscriptions字段。其中suitableCallbacks()函数会检查API定义是否符合标准,然后创建callback实例放入map中。
另外如果发现API属于同一个Namespace,会进行合并,因为这里的结构是map而不是数组。
至此,API注册流程就分析完了,接下来分析API调用流程。
2. API调用流程
先试用一下API调用,有一个直观的印象:
curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x1b4", true],"id":1}' localhost:8545
返回结果:
请求数据遵循JSON RPC规范:http://www.jsonrpc.org/specification
具体来说,请求对象需要包括下面4个字段:
- jsonrpc:协议版本号,固定是2.0
- method:请求调用的函数名,可以看到是Namespace_Method这种命名方式
- params: 函数参数列表,一般是一个数组
- id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL
相应的,返回的响应需要包含以下字段:
- jsonrpc:协议版本号,固定是2.0
- result/error:返回的结果或者错误,二选一
- id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL
API调用相关的结构和流程参见下图:
下面开始代码分析。还是回到rpc/endpoints.go中的startHTTPEndpoint()函数,主要看后半段:
func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) { …… var ( listener net.Listener err error ) if listener, err = net.Listen("tcp", endpoint); err != nil { return nil, nil, err } go NewHTTPServer(cors, vhosts, handler).Serve(listener) return listener, handler, err }
首先侦听TCP端口,获得listener接口实例。然后创建了一个http.Server实例,并启动一个goroutine调用它的Serve()方法。看一下NewHTTPServer()函数(rpc/http.go):
func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) return &http.Server{Handler: handler} }
这里有一个Handler参数,用到了装饰者模式,其实最终实现还是在rpc.Server中。Handler是一个接口,需要实现它的ServerHTTP()函数来处理网络数据,代码如下:
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { …… body := io.LimitReader(r.Body, maxRequestContentLength) codec := NewJSONCodec(&httpReadWriteNopCloser{body, w}) defer codec.Close() w.Header().Set("content-type", contentType) srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation) }
可以看到,首先创建一个Reader用于读取原始数据,然后创建一个JSON的编解码器,最后调用ServeSingleRequest()函数:
func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) { s.serveRequest(ctx, codec, true, options) } func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error { …… s.codecs.Add(codec) …… for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) …… // If a single shot request is executing, run and return immediately if singleShot { if batch { s.execBatch(ctx, codec, reqs) } else { s.exec(ctx, codec, reqs[0]) } return nil } // For multi-shot connections, start a goroutine to serve and loop back pend.Add(1) go func(reqs []*serverRequest, batch bool) { defer pend.Done() if batch { s.execBatch(ctx, codec, reqs) } else { s.exec(ctx, codec, reqs[0]) } }(reqs, batch) } return nil }
可以看到,就是一个循环,每次调用readRequest()解析请求数据,然后调用exec()或者execBatch()执行API调用。
首先看一下readRequest()函数:
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) { reqs, batch, err := codec.ReadRequestHeaders() …… requests := make([]*serverRequest, len(reqs)) for i, r := range reqs { var ok bool var svc *service …… if svc, ok = s.services[r.service]; !ok { // rpc method isn't available requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} continue } …… if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} if r.params != nil && len(callb.argTypes) > 0 { if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { requests[i].args = args } else { requests[i].err = &invalidParamsError{err.Error()} } } continue } requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} } return requests, batch, nil }
这里就对应于图上的两个红色箭头部分:首先codec把原始JSON数据解析为一个rpcRequest数组,然后遍历这个数组,根据Namespace找到对应的service,再从service的callbacks表中查询需要调用的method,最后组装成一个新的数据结构serverRequest。
接着就是调用exec()执行这个severRequest指向的API实现了:
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { var response interface{} var callback func() if req.err != nil { response = codec.CreateErrorResponse(&req.id, req.err) } else { response, callback = s.handle(ctx, codec, req) } if err := codec.Write(response); err != nil { log.Error(fmt.Sprintf("%v\n", err)) codec.Close() } // when request was a subscribe request this allows these subscriptions to be actived if callback != nil { callback() } }
可以看到调用了handle()方法获取响应数据,然后通过codec组装成JSON发送给请求端。
看一下handle()函数:
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) { …… arguments := []reflect.Value{req.callb.rcvr} if req.callb.hasCtx { arguments = append(arguments, reflect.ValueOf(ctx)) } if len(req.args) > 0 { arguments = append(arguments, req.args...) } // execute RPC method and return result reply := req.callb.method.Func.Call(arguments) if len(reply) == 0 { return codec.CreateResponse(req.id, nil), nil } …… return codec.CreateResponse(req.id, reply[0].Interface()), nil }
首先处理参数列表,如果发现调用的函数需要Context参数则加到最前面。然后就是通过反射调用API了,最后把结果送给codec,按JSON RPC的格式要求组装成响应返回就可以了。
这里提到一个Context类型,主要是用来存储一些和请求相关的信息,初始化代码位于rpc/http.go:
ctx := context.Background() ctx = context.WithValue(ctx, "remote", r.RemoteAddr) ctx = context.WithValue(ctx, "scheme", r.Proto) ctx = context.WithValue(ctx, "local", r.Host)
具体实现比较有意思,类似于一个单链表:
至此,API调用的整个流程就分析完了。
下一章:以太坊源码分析 账户管理
以太坊源码分析 账户管理:本文分析以太坊的账户管理的源码,主要包括两个部分: 获取钱包列表、订阅钱包事件。1. 获取钱包列表账户组件之间的关系:从图中可以看出wallet、account、address这三者的区别和联系i:wal ...