Skip to content
Snippets Groups Projects
Select Git revision
  • ad8a392e9ca7dd9c6609f017e8fb3a947dc20f48
  • master default protected
  • 3.0
  • develop
  • revert-2069-tripleVersion
  • 3.1
  • rest-protocol
  • feat/remoting_rocketmq
  • dapr-support
  • 1.5
  • 1.4
  • 1.3
  • 1.2
  • 1.1
  • v3.0.3-rc2
  • v3.0.3-rc1
  • v3.0.2
  • v1.5.8
  • v1.5.9-rc1
  • v3.0.1
  • v1.5.8-rc1
  • v3.0.0
  • v3.0.0-rc4-1
  • v3.0.0-rc4
  • v3.0.0-rc3
  • v1.5.7
  • v1.5.7-rc2
  • v3.0.0-rc2
  • remove
  • v1.5.7-rc1
  • v3.0.0-rc1
  • v1.5.7-rc1-tmp
  • 1.5.6
  • v1.5.6
34 results

map.go

Blame
  • map.go 7.40 KiB
    package jsonrpc
    
    import (
    	"bytes"
    	"context"
    	"io"
    	"io/ioutil"
    	"net"
    	"net/http"
    	"reflect"
    	"strings"
    	"sync"
    	"unicode"
    	"unicode/utf8"
    )
    
    import (
    	log "github.com/AlexStocks/log4go"
    	jerrors "github.com/juju/errors"
    )
    
    var (
    	// invalidRequest is a placeholder sent in place of the server's response
    	// value when a request cannot be answered normally. It is never decoded by
    	// the client, since the response carries an error whenever it is used.
    	// In this file it replaces the reply when the invoked method returns an error.
    	invalidRequest = struct{}{}
    
    	// typeOfError is the precomputed reflect.Type of the error interface.
    	// error cannot be passed to reflect.TypeOf directly because TypeOf takes an
    	// empty interface value, so the type is taken from a nil *error and
    	// dereferenced with Elem.
    	typeOfError = reflect.TypeOf((*error)(nil)).Elem()
    )
    
    // serviceMethod holds the reflection data needed to invoke one registered
    // method of a service.
    type serviceMethod struct {
    	// method is the receiver method as reported by reflection.
    	method reflect.Method
    	// ctxType is the type of the request context parameter; it stays nil
    	// when the method was registered without a context parameter.
    	ctxType reflect.Type
    	// argsType is the type of the request argument.
    	argsType reflect.Type
    	// replyType is the type of the response argument.
    	replyType reflect.Type
    }
    
    // suiteContext turns ctx into a reflect.Value usable as a call argument.
    // A nil ctx has no valid reflect.Value, so the zero value of the method's
    // declared context type is returned in that case.
    func (m *serviceMethod) suiteContext(ctx context.Context) reflect.Value {
    	ctxValue := reflect.ValueOf(ctx)
    	if !ctxValue.IsValid() {
    		return reflect.Zero(m.ctxType)
    	}
    	return ctxValue
    }
    
    // svc describes one registered service: the receiver object together with
    // the methods of it that were accepted by suiteMethod.
    type svc struct {
    	name     string                    // name of service
    	rcvr     reflect.Value             // receiver of methods for the service
    	rcvrType reflect.Type              // type of the receiver
    	methods  map[string]*serviceMethod // registered methods, function name -> reflect.function
    }
    
    // serviceMap is the registry of services, keyed by service name.
    // Access to the inner map is guarded by mutex.
    type serviceMap struct {
    	mutex      sync.Mutex      // protects the serviceMap
    	serviceMap map[string]*svc // service name -> service
    }
    
    // initServer returns a new serviceMap whose registry is empty and ready
    // for use.
    func initServer() *serviceMap {
    	server := new(serviceMap)
    	server.serviceMap = make(map[string]*svc)
    	return server
    }
    
    // isExported reports whether name starts with an upper-case letter, which
    // is what makes a Go identifier exported. An empty name is not exported.
    func isExported(name string) bool {
    	first, _ := utf8.DecodeRuneInString(name)
    	return unicode.IsUpper(first)
    }
    
    // isExportedOrBuiltin reports whether t, after stripping every level of
    // pointer indirection, is either a type without a package path or a type
    // whose name is exported.
    func isExportedOrBuiltin(t reflect.Type) bool {
    	base := t
    	for base.Kind() == reflect.Ptr {
    		base = base.Elem()
    	}
    	// An empty package path is accepted outright.
    	if base.PkgPath() == "" {
    		return true
    	}
    	// PkgPath is non-empty even for an exported type, so the decision for
    	// package-declared types rests on the type name.
    	return isExported(base.Name())
    }
    
    // suiteMethod checks whether method has a signature that can be served and,
    // if so, returns its descriptor. It returns nil (after logging the reason,
    // except for unexported methods) when the method is not suitable.
    //
    // Accepted shapes, counting the receiver as the first input:
    //
    //	3 inputs: receiver, args, reply
    //	4 inputs: receiver, context, args, reply
    //
    // The reply must be a pointer, args and reply types must be exported or
    // builtin, and the method must return exactly one value of type error.
    func suiteMethod(method reflect.Method) *serviceMethod {
    	// A non-empty PkgPath marks an unexported method; skip it silently.
    	if method.PkgPath != "" {
    		return nil
    	}
    
    	name, sig := method.Name, method.Type
    
    	// Pick the parameter types according to the number of inputs.
    	var ctxT, argT, replyT reflect.Type
    	switch inputs := sig.NumIn(); inputs {
    	case 3:
    		argT, replyT = sig.In(1), sig.In(2)
    	case 4:
    		ctxT, argT, replyT = sig.In(1), sig.In(2), sig.In(3)
    	default:
    		log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4",
    			name, sig, inputs)
    		return nil
    	}
    
    	// The request argument may be a value or a pointer, but its type must
    	// be exported or builtin.
    	if !isExportedOrBuiltin(argT) {
    		log.Error("argument type of method %q is not exported %v", name, argT)
    		return nil
    	}
    
    	// The reply is filled in by the method, so it has to be a pointer ...
    	if replyT.Kind() != reflect.Ptr {
    		log.Error("reply type of method %q is not a pointer %v", name, replyT)
    		return nil
    	}
    	// ... to an exported or builtin type.
    	if !isExportedOrBuiltin(replyT) {
    		log.Error("reply type of method %s not exported{%v}", name, replyT)
    		return nil
    	}
    
    	// Exactly one result is required ...
    	if outputs := sig.NumOut(); outputs != 1 {
    		log.Error("method %q has %d out parameters; needs exactly 1", name, outputs)
    		return nil
    	}
    	// ... and it must be of type error.
    	if out := sig.Out(0); out != typeOfError {
    		log.Error("return type %s of method %q is not error", out, name)
    		return nil
    	}
    
    	return &serviceMethod{
    		method:    method,
    		ctxType:   ctxT,
    		argsType:  argT,
    		replyType: replyT,
    	}
    }
    
    // register installs rcvr in the service map under the name returned by
    // rcvr.Service() and records every method of rcvr accepted by suiteMethod.
    //
    // It returns the names of the installed methods joined by commas. An error
    // is returned when rcvr is nil (or a nil pointer), when its underlying type
    // has no name or an unexported name, when a service with the same name is
    // already registered, or when rcvr has no suitable methods.
    //
    // The service map mutex is held for the whole call.
    func (server *serviceMap) register(rcvr Handler) (string, error) {
    	server.mutex.Lock()
    	defer server.mutex.Unlock()
    	if server.serviceMap == nil {
    		server.serviceMap = make(map[string]*svc)
    	}
    
    	s := new(svc)
    	s.rcvrType = reflect.TypeOf(rcvr)
    	s.rcvr = reflect.ValueOf(rcvr)
    
    	// reflect.TypeOf yields nil for a nil interface, and reflect.Indirect
    	// yields the zero Value for a nil pointer; calling Type on either would
    	// panic, so reject such handlers with an error instead.
    	if s.rcvrType == nil || (s.rcvr.Kind() == reflect.Ptr && s.rcvr.IsNil()) {
    		msg := "cannot register a nil handler"
    		log.Error(msg)
    		return "", jerrors.New(msg)
    	}
    
    	// The Go type name is only used to validate the handler; the service is
    	// registered under the name reported by the handler itself.
    	typeName := reflect.Indirect(s.rcvr).Type().Name()
    	if typeName == "" {
    		msg := "no service name for type " + s.rcvrType.String()
    		log.Error(msg)
    		return "", jerrors.New(msg)
    	}
    	if !isExported(typeName) {
    		msg := "type " + typeName + " is not exported"
    		log.Error(msg)
    		return "", jerrors.New(msg)
    	}
    
    	sname := rcvr.Service()
    	if _, dup := server.serviceMap[sname]; dup {
    		return "", jerrors.New("service already defined: " + sname)
    	}
    	s.name = sname
    	s.methods = make(map[string]*serviceMethod)
    
    	// Install the methods
    	num := s.rcvrType.NumMethod()
    	names := make([]string, 0, num)
    	for m := 0; m < num; m++ {
    		method := s.rcvrType.Method(m)
    		if mt := suiteMethod(method); mt != nil {
    			s.methods[method.Name] = mt
    			names = append(names, method.Name)
    		}
    	}
    
    	if len(s.methods) == 0 {
    		msg := "type " + sname + " has no exported methods of suitable type"
    		log.Error(msg)
    		return "", jerrors.New(msg)
    	}
    	server.serviceMap[s.name] = s
    
    	return strings.Join(names, ","), nil
    }
    
    // serveRequest handles a single request: it decodes the request from
    // header and body, looks up the target service (header["Path"]) and method
    // (taken from the decoded request), invokes the method through reflection,
    // and writes an HTTP/1.1 response to conn.
    //
    // The response status is 200 on success and 500 when the invoked method
    // returns an error with a non-empty message; in the latter case the reply is
    // replaced by invalidRequest.
    //
    // An error is returned when the request cannot be decoded, the service or
    // method is unknown, the body cannot be read, or the response cannot be
    // encoded. Failures while serializing or sending the HTTP response are only
    // logged, and nil is returned.
    //
    // Note that header is modified: "Content-Type", "Content-Length" and
    // "Timeout" are deleted before the remaining entries are copied into the
    // response header.
    func (server *serviceMap) serveRequest(ctx context.Context,
    	header map[string]string, body []byte, conn net.Conn) error {
    
    	// read request header
    	codec := newServerCodec()
    	err := codec.ReadHeader(header, body)
    	if err != nil {
    		if err == io.EOF || err == io.ErrUnexpectedEOF {
    			return jerrors.Trace(err)
    		}
    
    		return jerrors.New("server cannot decode request: " + err.Error())
    	}
    	serviceName := header["Path"]
    	methodName := codec.req.Method
    	if len(serviceName) == 0 || len(methodName) == 0 {
    		// The request is rejected anyway, so the result of ReadBody is ignored.
    		codec.ReadBody(nil)
    		return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName)
    	}
    
    	// get method; the lock only covers the lookup in the service map
    	server.mutex.Lock()
    	service := server.serviceMap[serviceName]
    	server.mutex.Unlock()
    	if service == nil {
    		codec.ReadBody(nil)
    		return jerrors.New("cannot find svc " + serviceName)
    	}
    	mtype := service.methods[methodName]
    	if mtype == nil {
    		codec.ReadBody(nil)
    		return jerrors.New("cannot find method " + methodName + " of svc " + serviceName)
    	}
    
    	// get args: always decode into a pointer, then dereference again if the
    	// method takes its argument by value
    	var argv reflect.Value
    	argIsValue := false
    	if mtype.argsType.Kind() == reflect.Ptr {
    		argv = reflect.New(mtype.argsType.Elem())
    	} else {
    		argv = reflect.New(mtype.argsType)
    		argIsValue = true
    	}
    	// argv guaranteed to be a pointer now.
    	if err = codec.ReadBody(argv.Interface()); err != nil {
    		return jerrors.Trace(err)
    	}
    	if argIsValue {
    		argv = argv.Elem()
    	}
    
    	replyv := reflect.New(mtype.replyType.Elem())
    
    	// Build the argument list. suiteMethod accepts methods both with and
    	// without a context parameter, and reflect.Value.Call panics when the
    	// number of arguments does not match the signature, so the context is
    	// only passed to methods that declare one.
    	in := make([]reflect.Value, 0, 4)
    	in = append(in, service.rcvr)
    	if mtype.ctxType != nil {
    		in = append(in, mtype.suiteContext(ctx))
    	}
    	in = append(in,
    		reflect.ValueOf(argv.Interface()),
    		reflect.ValueOf(replyv.Interface()),
    	)
    
    	//  call service.method(args)
    	var errMsg string
    	returnValues := mtype.method.Func.Call(in)
    	// The return value for the method is an error.
    	if retErr := returnValues[0].Interface(); retErr != nil {
    		errMsg = retErr.(error).Error()
    	}
    
    	// write response
    	code := 200
    	rspReply := replyv.Interface()
    	if len(errMsg) != 0 {
    		code = 500
    		rspReply = invalidRequest
    	}
    	rspStream, err := codec.Write(errMsg, rspReply)
    	if err != nil {
    		return jerrors.Trace(err)
    	}
    	rsp := &http.Response{
    		StatusCode:    code,
    		ProtoMajor:    1,
    		ProtoMinor:    1,
    		Header:        make(http.Header),
    		ContentLength: int64(len(rspStream)),
    		Body:          ioutil.NopCloser(bytes.NewReader(rspStream)),
    	}
    	// Echo the request header entries back, minus the ones that describe
    	// the request payload or its timeout.
    	delete(header, "Content-Type")
    	delete(header, "Content-Length")
    	delete(header, "Timeout")
    	for k, v := range header {
    		rsp.Header.Set(k, v)
    	}
    
    	// Empty buffer with pre-allocated capacity for the serialized response.
    	rspBuf := bytes.NewBuffer(make([]byte, 0, DefaultHTTPRspBufferSize))
    	if err = rsp.Write(rspBuf); err != nil {
    		log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err)
    		return nil
    	}
    	if _, err = rspBuf.WriteTo(conn); err != nil {
    		log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err)
    	}
    	return nil
    }