diff --git a/common/constant/key.go b/common/constant/key.go index 3b09d708c4a48cb9f1714a18ca5cb7cba2fdce02..906a08b724cb8adec641fc57f3f4c9c600efa1cd 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -3,3 +3,16 @@ package constant const ( ASYNC_KEY = "async" // it's value should be "true" or "false" of string type ) + +const ( + GROUP_KEY = "group" + VERSION_KEY = "Version" + INTERFACE_KEY = "interface" + PATH_KEY = "path" + SERVICE_KEY = "service" +) + +const ( + SERVICE_FILTER_KEY = "service.filter" + REFERENCE_FILTER_KEY = "reference.filter" +) diff --git a/common/extension/filter.go b/common/extension/filter.go index 48cb91f5a836a014beb9f1e78f3bca548f0aafa8..fcab82bd40e088158ed7794b3c9f8d3fb4324206 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -1 +1,21 @@ package extension + +import ( + "github.com/dubbo/dubbo-go/filter" +) + +var ( + filters map[string]func() filter.Filter +) + +func init() { + filters = make(map[string]func() filter.Filter) +} + +func SetFilter(name string, v func() filter.Filter) { + filters[name] = v +} + +func GetFilterExtension(name string) filter.Filter { + return filters[name]() +} diff --git a/config/rpc_service.go b/config/rpc_service.go index 2fef5b88c7097839c27082e11bf23fe77d9ac99d..3d1e0482fb95f00ff7963f5e2fe5e2e449c8d6bf 100644 --- a/config/rpc_service.go +++ b/config/rpc_service.go @@ -1,7 +1,9 @@ package config import ( + "context" "reflect" + "strings" "sync" "unicode" "unicode/utf8" @@ -9,6 +11,7 @@ import ( import ( log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" ) // rpc service interface @@ -18,11 +21,6 @@ type RPCService interface { } var ( - // A value sent as a placeholder for the server's response value when the server - // receives an invalid request. It is never decoded by the client since the Response - // contains an error when it is used. - invalidRequest = struct{}{} - // Precompute the reflect type for error. Can't use error directly // because Typeof takes an empty interface value. This is annoying. typeOfError = reflect.TypeOf((*error)(nil)).Elem() @@ -32,7 +30,10 @@ var ( } ) +////////////////////////// // info of method +////////////////////////// + type MethodType struct { method reflect.Method ctxType reflect.Type // type of the request context @@ -52,17 +53,26 @@ func (m *MethodType) ArgType() reflect.Type { func (m *MethodType) ReplyType() reflect.Type { return m.replyType } +func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + return contextv + } + return reflect.Zero(m.ctxType) +} +////////////////////////// // info of service interface +////////////////////////// + type Service struct { name string rcvr reflect.Value rcvrType reflect.Type - method map[string]*MethodType + methods map[string]*MethodType } func (s *Service) Method() map[string]*MethodType { - return s.method + return s.methods } func (s *Service) RcvrType() reflect.Type { return s.rcvrType @@ -71,13 +81,66 @@ func (s *Service) Rcvr() reflect.Value { return s.rcvr } +////////////////////////// +// serviceMap +// todo: use sync.Map? +////////////////////////// + type serviceMap struct { - mutex sync.Mutex // protects the serviceMap + mutex sync.RWMutex // protects the serviceMap serviceMap map[string]*Service // service name -> service } func (sm *serviceMap) GetService(name string) *Service { - return sm.serviceMap[name] + sm.mutex.RLock() + defer sm.mutex.RUnlock() + if s, ok := sm.serviceMap[name]; ok { + return s + } + return nil +} + +func (sm *serviceMap) Register(rcvr RPCService) (string, error) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + if sm.serviceMap == nil { + sm.serviceMap = make(map[string]*Service) + } + + s := new(Service) + s.rcvrType = reflect.TypeOf(rcvr) + s.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(s.rcvr).Type().Name() + if sname == "" { + s := "no service name for type " + s.rcvrType.String() + log.Error(s) + return "", jerrors.New(s) + } + if !isExported(sname) { + s := "type " + sname + " is not exported" + log.Error(s) + return "", jerrors.New(s) + } + + sname = rcvr.Service() + if server := sm.GetService(sname); server == nil { + return "", jerrors.New("service already defined: " + sname) + } + s.name = sname + s.methods = make(map[string]*MethodType) + + // Install the methods + methods := "" + methods, s.methods = suitableMethods(s.rcvrType) + + if len(s.methods) == 0 { + s := "type " + sname + " has no exported methods of suitable type" + log.Error(s) + return "", jerrors.New(s) + } + sm.serviceMap[s.name] = s + + return strings.TrimSuffix(methods, ","), nil } // Is this an exported - upper case - name diff --git a/filter/.gitkeep b/filter/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/filter/filter.go b/filter/filter.go new file mode 100644 index 0000000000000000000000000000000000000000..558d01c990f16fde57b2c93f0901af1534d8460c --- /dev/null +++ b/filter/filter.go @@ -0,0 +1,11 @@ +package filter + +import ( + "github.com/dubbo/dubbo-go/protocol" +) + +// Extension - Protocol +type Filter interface { + Invoke(protocol.Invoker, protocol.Invocation) protocol.Result + OnResponse(protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result +} diff --git a/jsonrpc/map.go b/jsonrpc/map.go deleted file mode 100644 index 43c083330d082a6e75d3501663fc81bbac6b26ec..0000000000000000000000000000000000000000 --- a/jsonrpc/map.go +++ /dev/null @@ -1,285 +0,0 @@ -package jsonrpc - -import ( - "bytes" - "context" - "io" - "io/ioutil" - "net" - "net/http" - "reflect" - "strings" - "sync" - "unicode" - "unicode/utf8" -) - -import ( - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -var ( - // A value sent as a placeholder for the server's response value when the server - // receives an invalid request. It is never decoded by the client since the Response - // contains an error when it is used. - invalidRequest = struct{}{} - - // Precompute the reflect type for error. Can't use error directly - // because Typeof takes an empty interface value. This is annoying. - typeOfError = reflect.TypeOf((*error)(nil)).Elem() -) - -type serviceMethod struct { - method reflect.Method // receiver method - ctxType reflect.Type // type of the request context - argsType reflect.Type // type of the request argument - replyType reflect.Type // type of the response argument -} - -func (m *serviceMethod) suiteContext(ctx context.Context) reflect.Value { - if contextv := reflect.ValueOf(ctx); contextv.IsValid() { - return contextv - } - return reflect.Zero(m.ctxType) -} - -type svc struct { - name string // name of service - rcvr reflect.Value // receiver of methods for the service - rcvrType reflect.Type // type of the receiver - methods map[string]*serviceMethod // registered methods, function name -> reflect.function -} - -type serviceMap struct { - mutex sync.Mutex // protects the serviceMap - serviceMap map[string]*svc // service name -> service -} - -func initServer() *serviceMap { - return &serviceMap{ - serviceMap: make(map[string]*svc), - } -} - -// isExported returns true of a string is an exported (upper case) name. -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// isExportedOrBuiltin returns true if a type is exported or a builtin. -func isExportedOrBuiltin(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - return isExported(t.Name()) || t.PkgPath() == "" -} - -func suiteMethod(method reflect.Method) *serviceMethod { - mtype := method.Type - mname := method.Name - - // Method must be exported. - if method.PkgPath != "" { - return nil - } - - var replyType, argType, ctxType reflect.Type - switch mtype.NumIn() { - case 3: - argType = mtype.In(1) - replyType = mtype.In(2) - case 4: - ctxType = mtype.In(1) - argType = mtype.In(2) - replyType = mtype.In(3) - default: - log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", - mname, mtype, mtype.NumIn()) - return nil - } - // First arg need not be a pointer. - if !isExportedOrBuiltin(argType) { - log.Error("argument type of method %q is not exported %v", mname, argType) - return nil - } - // Second arg must be a pointer. - if replyType.Kind() != reflect.Ptr { - log.Error("reply type of method %q is not a pointer %v", mname, replyType) - return nil - } - // Reply type must be exported. - if !isExportedOrBuiltin(replyType) { - log.Error("reply type of method %s not exported{%v}", mname, replyType) - return nil - } - // Method needs one out. - if mtype.NumOut() != 1 { - log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut()) - return nil - } - // The return type of the method must be error. - if returnType := mtype.Out(0); returnType != typeOfError { - log.Error("return type %s of method %q is not error", returnType, mname) - return nil - } - - return &serviceMethod{method: method, argsType: argType, replyType: replyType, ctxType: ctxType} -} - -func (server *serviceMap) register(rcvr Handler) (string, error) { - server.mutex.Lock() - defer server.mutex.Unlock() - if server.serviceMap == nil { - server.serviceMap = make(map[string]*svc) - } - - s := new(svc) - s.rcvrType = reflect.TypeOf(rcvr) - s.rcvr = reflect.ValueOf(rcvr) - sname := reflect.Indirect(s.rcvr).Type().Name() - if sname == "" { - s := "no service name for type " + s.rcvrType.String() - log.Error(s) - return "", jerrors.New(s) - } - if !isExported(sname) { - s := "type " + sname + " is not exported" - log.Error(s) - return "", jerrors.New(s) - } - - sname = rcvr.Service() - if _, dup := server.serviceMap[sname]; dup { - return "", jerrors.New("service already defined: " + sname) - } - s.name = sname - s.methods = make(map[string]*serviceMethod) - - // Install the methods - methods := "" - num := s.rcvrType.NumMethod() - for m := 0; m < num; m++ { - method := s.rcvrType.Method(m) - if mt := suiteMethod(method); mt != nil { - s.methods[method.Name] = mt - methods += method.Name + "," - } - } - - if len(s.methods) == 0 { - s := "type " + sname + " has no exported methods of suitable type" - log.Error(s) - return "", jerrors.New(s) - } - server.serviceMap[s.name] = s - - return strings.TrimSuffix(methods, ","), nil -} - -func (server *serviceMap) serveRequest(ctx context.Context, - header map[string]string, body []byte, conn net.Conn) error { - - // read request header - codec := newServerCodec() - err := codec.ReadHeader(header, body) - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return jerrors.Trace(err) - } - - return jerrors.New("server cannot decode request: " + err.Error()) - } - serviceName := header["Path"] - methodName := codec.req.Method - if len(serviceName) == 0 || len(methodName) == 0 { - codec.ReadBody(nil) - return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName) - } - - // get method - server.mutex.Lock() - svc := server.serviceMap[serviceName] - server.mutex.Unlock() - if svc == nil { - codec.ReadBody(nil) - return jerrors.New("cannot find svc " + serviceName) - } - mtype := svc.methods[methodName] - if mtype == nil { - codec.ReadBody(nil) - return jerrors.New("cannot find method " + methodName + " of svc " + serviceName) - } - - // get args - var argv reflect.Value - argIsValue := false - if mtype.argsType.Kind() == reflect.Ptr { - argv = reflect.New(mtype.argsType.Elem()) - } else { - argv = reflect.New(mtype.argsType) - argIsValue = true - } - // argv guaranteed to be a pointer now. - if err = codec.ReadBody(argv.Interface()); err != nil { - return jerrors.Trace(err) - } - if argIsValue { - argv = argv.Elem() - } - - replyv := reflect.New(mtype.replyType.Elem()) - - // call service.method(args) - var errMsg string - returnValues := mtype.method.Func.Call([]reflect.Value{ - svc.rcvr, - mtype.suiteContext(ctx), - reflect.ValueOf(argv.Interface()), - reflect.ValueOf(replyv.Interface()), - }) - // The return value for the method is an error. - if retErr := returnValues[0].Interface(); retErr != nil { - errMsg = retErr.(error).Error() - } - - // write response - code := 200 - rspReply := replyv.Interface() - if len(errMsg) != 0 { - code = 500 - rspReply = invalidRequest - } - rspStream, err := codec.Write(errMsg, rspReply) - if err != nil { - return jerrors.Trace(err) - } - rsp := &http.Response{ - StatusCode: code, - ProtoMajor: 1, - ProtoMinor: 1, - Header: make(http.Header), - ContentLength: int64(len(rspStream)), - Body: ioutil.NopCloser(bytes.NewReader(rspStream)), - } - delete(header, "Content-Type") - delete(header, "Content-Length") - delete(header, "Timeout") - for k, v := range header { - rsp.Header.Set(k, v) - } - - rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) - rspBuf.Reset() - if err = rsp.Write(rspBuf); err != nil { - log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err) - return nil - } - if _, err = rspBuf.WriteTo(conn); err != nil { - log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) - } - return nil -} diff --git a/jsonrpc/server.go b/jsonrpc/server.go deleted file mode 100644 index d8604d4b17895b25da3383e0deea86c1fa692430..0000000000000000000000000000000000000000 --- a/jsonrpc/server.go +++ /dev/null @@ -1,383 +0,0 @@ -package jsonrpc - -import ( - "bufio" - "bytes" - "context" - "github.com/dubbo/dubbo-go/plugins" - "io/ioutil" - "net" - "net/http" - "runtime" - "runtime/debug" - "sync" - "time" -) - -import ( - "github.com/AlexStocks/goext/net" - log "github.com/AlexStocks/log4go" - "github.com/dubbo/dubbo-go/registry" - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/server" -) - -const ( - DefaultMaxSleepTime = 1 * time.Second // accept涓棿鏈€澶leep interval - DefaultHTTPRspBufferSize = 1024 - PathPrefix = byte('/') -) - -// Handler interface represents a Service request handler. It's generated -// by passing any type of public concrete object with methods into server.NewHandler. -// Most will pass in a struct. -// -// Example: -// -// type Hello struct {} -// -// func (s *Hello) Method(context, request, response) error { -// return nil -// } -// -// func (s *Hello) Service() string { -// return "com.youni.service" -// } -// -// func (s *Hello) Version() string { -// return "1.0.0" -// } - -type Handler interface { - Service() string // Service Interface - Version() string -} - -type Option func(*Options) - -type Options struct { - Registry registry.Registry - ConfList []server.ServerConfig - ServiceConfList []registry.ProviderServiceConfig - Timeout time.Duration -} - -func newOptions(opt ...Option) Options { - opts := Options{} - for _, o := range opt { - o(&opts) - } - - if opts.Registry == nil { - panic("server.Options.Registry is nil") - } - - return opts -} - -// Registry used for discovery -func Registry(r registry.Registry) Option { - return func(o *Options) { - o.Registry = r - } -} - -func ConfList(confList []server.ServerConfig) Option { - return func(o *Options) { - o.ConfList = confList - for i := 0; i < len(o.ConfList); i++ { - if o.ConfList[i].IP == "" { - o.ConfList[i].IP, _ = gxnet.GetLocalIP() - } - } - } -} - -func ServiceConfList(confList []registry.ProviderServiceConfig) Option { - return func(o *Options) { - o.ServiceConfList = confList - } -} - -type Server struct { - rpc []*serviceMap - done chan struct{} - once sync.Once - - sync.RWMutex - opts Options - handlers map[string]Handler - wg sync.WaitGroup -} - -func NewServer(opts ...Option) *Server { - var ( - num int - ) - options := newOptions(opts...) - Servers := make([]*serviceMap, len(options.ConfList)) - num = len(options.ConfList) - for i := 0; i < num; i++ { - Servers[i] = initServer() - } - return &Server{ - opts: options, - rpc: Servers, - handlers: make(map[string]Handler), - done: make(chan struct{}), - } -} - -func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) { - defer func() { - if r := recover(); r != nil { - log.Warn("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s", - conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack())) - } - - conn.Close() - }() - - setReadTimeout := func(conn net.Conn, timeout time.Duration) { - t := time.Time{} - if timeout > time.Duration(0) { - t = time.Now().Add(timeout) - } - - conn.SetDeadline(t) - } - - sendErrorResp := func(header http.Header, body []byte) error { - rsp := &http.Response{ - Header: header, - StatusCode: 500, - ContentLength: int64(len(body)), - Body: ioutil.NopCloser(bytes.NewReader(body)), - } - rsp.Header.Del("Content-Type") - rsp.Header.Del("Content-Length") - rsp.Header.Del("Timeout") - - rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) - rspBuf.Reset() - err := rsp.Write(rspBuf) - if err != nil { - return jerrors.Trace(err) - } - _, err = rspBuf.WriteTo(conn) - return jerrors.Trace(err) - } - - for { - bufReader := bufio.NewReader(conn) - r, err := http.ReadRequest(bufReader) - if err != nil { - return - } - - reqBody, err := ioutil.ReadAll(r.Body) - if err != nil { - return - } - r.Body.Close() - - reqHeader := make(map[string]string) - for k := range r.Header { - reqHeader[k] = r.Header.Get(k) - } - reqHeader["Path"] = r.URL.Path[1:] // to get service name - if r.URL.Path[0] != PathPrefix { - reqHeader["Path"] = r.URL.Path - } - reqHeader["HttpMethod"] = r.Method - - httpTimeout := s.Options().Timeout - contentType := reqHeader["Content-Type"] - if contentType != "application/json" && contentType != "application/json-rpc" { - setReadTimeout(conn, httpTimeout) - r.Header.Set("Content-Type", "text/plain") - if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { - log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", - r.Header, jerrors.ErrorStack(err), errRsp) - } - return - } - - ctx := context.Background() - if len(reqHeader["Timeout"]) > 0 { - timeout, err := time.ParseDuration(reqHeader["Timeout"]) - if err == nil { - httpTimeout = timeout - ctx, _ = context.WithTimeout(ctx, httpTimeout) - } - delete(reqHeader, "Timeout") - } - setReadTimeout(conn, httpTimeout) - - if err := rpc.serveRequest(ctx, reqHeader, reqBody, conn); err != nil { - if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { - log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", - r.Header, jerrors.ErrorStack(err), errRsp) - } - - log.Info("Unexpected error serving request, closing socket: %v", err) - return - } - } -} - -func (s *Server) Options() Options { - s.RLock() - defer s.RUnlock() - return s.opts -} - -func (s *Server) Handle(h Handler) error { - var ( - err error - ) - - opts := s.Options() - serviceConf := plugins.DefaultProviderServiceConfig()() - - serviceConf.SetService(h.Service()) - serviceConf.SetVersion(h.Version()) - - flag := 0 - serviceNum := len(opts.ServiceConfList) - ServerNum := len(opts.ConfList) - for i := 0; i < serviceNum; i++ { - if opts.ServiceConfList[i].Service() == serviceConf.Service() && - opts.ServiceConfList[i].Version() == serviceConf.Version() { - - serviceConf.SetProtocol(opts.ServiceConfList[i].Protocol()) - serviceConf.SetGroup(opts.ServiceConfList[i].Group()) - // serviceConf.Version = opts.ServiceConfList[i].Version - var methods, path string - for j := 0; j < ServerNum; j++ { - if opts.ConfList[j].Protocol == serviceConf.Protocol() { - s.Lock() - methods, err = s.rpc[j].register(h) - s.Unlock() - if err != nil { - return err - } - serviceConf.SetMethods(methods) - - path = opts.ConfList[j].Address() - serviceConf.SetPath(path) - err = opts.Registry.Register(serviceConf) - if err != nil { - return err - } - flag = 1 - } - } - } - } - - if flag == 0 { - return jerrors.Errorf("fail to register Handler{service:%s, version:%s}", - serviceConf.Service(), serviceConf.Version()) - } - - s.Lock() - s.handlers[h.Service()] = h - s.Unlock() - - return nil -} - -func accept(listener net.Listener, fn func(net.Conn)) error { - var ( - err error - c net.Conn - ok bool - ne net.Error - tmpDelay time.Duration - ) - - for { - c, err = listener.Accept() - if err != nil { - if ne, ok = err.(net.Error); ok && ne.Temporary() { - if tmpDelay != 0 { - tmpDelay <<= 1 - } else { - tmpDelay = 5 * time.Millisecond - } - if tmpDelay > DefaultMaxSleepTime { - tmpDelay = DefaultMaxSleepTime - } - log.Info("http: Accept error: %v; retrying in %v\n", err, tmpDelay) - time.Sleep(tmpDelay) - continue - } - return jerrors.Trace(err) - } - - go func() { - defer func() { - if r := recover(); r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Error("http: panic serving %v: %v\n%s", c.RemoteAddr(), r, buf) - c.Close() - } - }() - - fn(c) - }() - } -} - -func (s *Server) Start() error { - config := s.Options() - - ServerNum := len(config.ConfList) - for i := 0; i < ServerNum; i++ { - listener, err := net.Listen("tcp", config.ConfList[i].Address()) - if err != nil { - return err - } - log.Info("rpc server start to listen on %s", listener.Addr()) - - s.Lock() - rpc := s.rpc[i] - s.Unlock() - - s.wg.Add(1) - go func(servo *serviceMap) { - accept(listener, func(conn net.Conn) { s.handlePkg(rpc, conn) }) - s.wg.Done() - }(rpc) - - s.wg.Add(1) - go func(servo *serviceMap) { // Server done goroutine - var err error - <-s.done // step1: block to wait for done channel(wait Server.Stop step2) - err = listener.Close() // step2: and then close listener - if err != nil { - log.Warn("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err) - } - s.wg.Done() - }(rpc) - } - - return nil -} - -func (s *Server) Stop() { - s.once.Do(func() { - close(s.done) - s.wg.Wait() - if s.opts.Registry != nil { - s.opts.Registry.Close() - s.opts.Registry = nil - } - }) -} diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go index 4a122f8b4fa170394bfaf7c5dfcbfd98666f23f6..274fdd3016bc56943f2f1aec31d82cbb1687aa5d 100644 --- a/protocol/dubbo/dubbo_exporter.go +++ b/protocol/dubbo/dubbo_exporter.go @@ -1,41 +1,15 @@ package dubbo -import ( - "context" -) - -import ( - log "github.com/AlexStocks/log4go" -) - import ( "github.com/dubbo/dubbo-go/protocol" ) -// wrapping invoker type DubboExporter struct { - ctx context.Context - key string - invoker protocol.Invoker - exporterMap map[string]protocol.Exporter + protocol.BaseExporter } -func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter { +func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter { return &DubboExporter{ - ctx: ctx, - key: key, - invoker: invoker, - exporterMap: exporterMap, + BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), } } - -func (de *DubboExporter) GetInvoker() protocol.Invoker { - return de.invoker - -} - -func (de *DubboExporter) Unexport() { - log.Info("DubboExporter unexport.") - de.invoker.Destroy() - delete(de.exporterMap, de.key) -} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 434de69a33efd26aa86bca7857997a93e7aa31ff..2155af0ad377ce6f50593b27c786d552dfe4fd57 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -1,7 +1,6 @@ package dubbo import ( - "context" "errors" "strconv" "sync" @@ -18,24 +17,18 @@ import ( "github.com/dubbo/dubbo-go/protocol" ) -var Err_No_Reply = errors.New("no reply") +var Err_No_Reply = errors.New("request need @reply") type DubboInvoker struct { - ctx context.Context - url config.URL + protocol.BaseInvoker client *Client - available bool - destroyed bool destroyLock sync.Mutex } -func NewDubboInvoker(ctx context.Context, url config.URL, client *Client) *DubboInvoker { +func NewDubboInvoker(url config.IURL, client *Client) *DubboInvoker { return &DubboInvoker{ - ctx: ctx, - url: url, - client: client, - available: true, - destroyed: false, + BaseInvoker: protocol.NewBaseInvoker(url), + client: client, } } @@ -56,13 +49,13 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } if async { if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { - result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Reply(), callBack, + result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply(), WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) } else { - result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), + result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), inv.Arguments(), WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), @@ -72,7 +65,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { if inv.Reply() == nil { result.Err = Err_No_Reply } else { - result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Reply(), + result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Arguments(), inv.Reply(), WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), @@ -81,26 +74,23 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } } - return result -} - -func (di *DubboInvoker) GetUrl() config.IURL { - return &di.url -} - -func (di *DubboInvoker) IsAvailable() bool { - return di.available + return &result } func (di *DubboInvoker) Destroy() { - if di.destroyed { + if di.IsDestroyed() { return } di.destroyLock.Lock() defer di.destroyLock.Unlock() - di.destroyed = true - di.available = false + if di.IsDestroyed() { + return + } - di.client.Close() // close client + di.BaseInvoker.Destroy() + + if di.client != nil { + di.client.Close() // close client + } } diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 496c9185f624cf78efff538736ce7d7a813aa3ee..271d3727db130d9fca0256c8c7bf83c5510974fd 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -1,7 +1,6 @@ package dubbo import ( - "context" log "github.com/AlexStocks/log4go" ) @@ -20,20 +19,22 @@ func init() { var dubboProtocol *DubboProtocol type DubboProtocol struct { - ctx context.Context - exporterMap map[string]protocol.Exporter - invokers []protocol.Invoker + protocol.BaseProtocol + serverMap map[string]*Server } -func NewDubboProtocol(ctx context.Context) *DubboProtocol { - return &DubboProtocol{ctx: ctx, exporterMap: make(map[string]protocol.Exporter)} +func NewDubboProtocol() *DubboProtocol { + return &DubboProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]*Server), + } } func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl().(*config.URL) serviceKey := url.Key() - exporter := NewDubboExporter(nil, serviceKey, invoker, dp.exporterMap) - dp.exporterMap[serviceKey] = exporter + exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap()) + dp.SetExporterMap(serviceKey, exporter) log.Info("Export service: ", url.String()) // start server @@ -41,19 +42,28 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker { - invoker := NewDubboInvoker(nil, url, getClient()) - dp.invokers = append(dp.invokers, invoker) - log.Info("Refer service: ", url.String()) +func (dp *DubboProtocol) Refer(url config.IURL) protocol.Invoker { + invoker := NewDubboInvoker(url, NewClient()) + dp.SetInvokers(invoker) + log.Info("Refer service: ", url.(*config.URL).String()) return invoker } func (dp *DubboProtocol) Destroy() { log.Info("DubboProtocol destroy.") - srv.Stop() // stop server + + dp.BaseProtocol.Destroy() + + // stop server + for key, server := range dp.serverMap { + delete(dp.serverMap, key) + server.Stop() + } } func (dp *DubboProtocol) openServer(url config.URL) { + srv := NewServer(dp.ExporterMap()[url.Key()]) + dp.serverMap[url.Location] = srv srv.Start(url) } @@ -61,9 +71,5 @@ func GetProtocol() protocol.Protocol { if dubboProtocol != nil { return dubboProtocol } - return NewDubboProtocol(nil) -} - -func getClient() *Client { - return NewClient() + return NewDubboProtocol() } diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 2328c5c15486126a22117a1ff8e022d4e40f9198..38a12e275f7011882b93a23001097c3524ea235b 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -10,11 +10,16 @@ import ( import ( "github.com/AlexStocks/getty" log "github.com/AlexStocks/log4go" - "github.com/dubbo/dubbo-go/config" "github.com/dubbogo/hessian2" jerrors "github.com/juju/errors" ) +import ( + "github.com/dubbo/dubbo-go/common/constant" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + // todo: WritePkg_Timeout will entry *.yml const WritePkg_Timeout = 5 * time.Second @@ -107,14 +112,16 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { //////////////////////////////////////////// type RpcServerHandler struct { + exporter protocol.Exporter maxSessionNum int sessionTimeout time.Duration sessionMap map[getty.Session]*rpcSession rwlock sync.RWMutex } -func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { +func NewRpcServerHandler(exporter protocol.Exporter, maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { return &RpcServerHandler{ + exporter: exporter, maxSessionNum: maxSessionNum, sessionTimeout: sessionTimeout, sessionMap: make(map[getty.Session]*rpcSession), @@ -167,6 +174,23 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } p.Header.ResponseStatus = hessian.Response_OK + invoker := h.exporter.GetInvoker() + if invoker != nil { + attchments := map[string]string{} + // todo: use them followingly if need + url := invoker.GetUrl().(*config.URL) + attchments[constant.PATH_KEY] = url.Path + attchments[constant.GROUP_KEY] = url.Group + attchments[constant.SERVICE_KEY] = url.Service + attchments[constant.VERSION_KEY] = url.Version + result := invoker.Invoke(protocol.NewRPCInvocationForProvider(attchments)) + if err := result.Error(); err != nil { + p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + p.Body = err + return + } + } + // heartbeat if p.Header.Type&hessian.Heartbeat != 0x00 { log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) @@ -174,7 +198,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { return } - // twoway + // not twoway if p.Header.Type&hessian.Request_TwoWay == 0x00 { h.reply(session, p, hessian.Response) h.callService(p, nil) @@ -228,7 +252,14 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { } }() - svc := req.Body.(map[string]interface{})["service"].(*config.Service) + svcIf := req.Body.(map[string]interface{})["service"] + if svcIf == nil { + log.Error("service not found!") + req.Header.ResponseStatus = hessian.Response_SERVICE_NOT_FOUND + req.Body = nil + return + } + svc := svcIf.(*config.Service) method := svc.Method()[req.Service.Method] // prepare argv diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index f2c90f35bcf9fb1b0d77e3025661254f461cd512..ccc16bef694023c29455145e71e7ef8c441c13e7 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -2,6 +2,7 @@ package dubbo import ( "fmt" + "github.com/dubbo/dubbo-go/protocol" "gopkg.in/yaml.v2" "io/ioutil" "net" @@ -18,45 +19,51 @@ import ( "github.com/dubbo/dubbo-go/config" ) -var srv = NewServer() +var srvConf *ServerConfig -const CONF_SERVER_FILE_PATH = "CONF_SERVER_FILE_PATH" - -type Server struct { - conf ServerConfig - tcpServerList []getty.Server -} - -func NewServer() *Server { - - s := &Server{} +const CONF_DUBBO_SERVER_FILE_PATH = "CONF_SERVER_FILE_PATH" +func init() { // load serverconfig from *.yml - path := os.Getenv(CONF_SERVER_FILE_PATH) + path := os.Getenv(CONF_DUBBO_SERVER_FILE_PATH) if path == "" { log.Info("CONF_SERVER_FILE_PATH is null") - return s + return } file, err := ioutil.ReadFile(path) if err != nil { log.Error(jerrors.Trace(err)) - return s + return } conf := &ServerConfig{} err = yaml.Unmarshal(file, conf) if err != nil { log.Error(jerrors.Trace(err)) - return s + return } if err := conf.CheckValidity(); err != nil { log.Error("ServerConfig check failed: ", err) - return s + return } - s.conf = *conf + srvConf = conf +} + +type Server struct { + conf ServerConfig + tcpServer getty.Server + exporter protocol.Exporter +} + +func NewServer(exporter protocol.Exporter) *Server { + + s := &Server{ + exporter: exporter, + conf: *srvConf, + } return s } @@ -86,8 +93,8 @@ func (s *Server) newSession(session getty.Session) error { session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(NewRpcServerPackageHandler()) // TODO: now, a server will bind all service - session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout)) + session.SetPkgHandler(NewRpcServerPackageHandler()) + session.SetEventListener(NewRpcServerHandler(s.exporter, conf.SessionNumber, conf.sessionTimeout)) session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) @@ -111,16 +118,10 @@ func (s *Server) Start(url config.URL) { ) tcpServer.RunEventLoop(s.newSession) log.Debug("s bind addr{%s} ok!", addr) - s.tcpServerList = append(s.tcpServerList, tcpServer) + s.tcpServer = tcpServer } func (s *Server) Stop() { - list := s.tcpServerList - s.tcpServerList = nil - if list != nil { - for _, tcpServer := range list { - tcpServer.Close() - } - } + s.tcpServer.Close() } diff --git a/protocol/invocation.go b/protocol/invocation.go index a6d86a3d8bea752d3c110ff5d04245406a6e46c1..5e079f3dc2aa54e095f0c3ebb5e3416e7fe07f8b 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -30,7 +30,7 @@ type RPCInvocation struct { } // todo: arguments table is too many -func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, arguments []interface{}, +func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Type, arguments []interface{}, reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation { return &RPCInvocation{ methodName: methodName, @@ -44,6 +44,12 @@ func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, argument } } +func NewRPCInvocationForProvider(attachments map[string]string) *RPCInvocation { + return &RPCInvocation{ + attachments: attachments, + } +} + func (r *RPCInvocation) MethodName() string { return r.methodName } diff --git a/protocol/invoker.go b/protocol/invoker.go index 1b974abc421aec9e5cf82aa6710674379d04eef9..92236286495b6e75a198a3c58f3c51a9cd93cf35 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -1,9 +1,56 @@ package protocol -import "github.com/dubbo/dubbo-go/common" +import ( + log "github.com/AlexStocks/log4go" +) + +import ( + "github.com/dubbo/dubbo-go/common" + "github.com/dubbo/dubbo-go/config" +) // Extension - Invoker type Invoker interface { common.Node Invoke(Invocation) Result } + +///////////////////////////// +// base invoker +///////////////////////////// + +type BaseInvoker struct { + url config.IURL + available bool + destroyed bool +} + +func NewBaseInvoker(url config.IURL) BaseInvoker { + return BaseInvoker{ + url: url, + available: true, + destroyed: false, + } +} + +func (bi *BaseInvoker) GetUrl() config.IURL { + return bi.url +} + +func (bi *BaseInvoker) IsAvailable() bool { + return bi.available +} + +func (bi *BaseInvoker) IsDestroyed() bool { + return bi.destroyed +} + +func (bi *BaseInvoker) Invoke(invocation Invocation) Result { + return nil +} + +func (bi *BaseInvoker) Destroy() { + log.Info("Destroy invoker: %s", bi.GetUrl().(*config.URL).String()) + bi.destroyed = true + bi.available = false +} diff --git a/protocol/jsonrpc/.gitkeep b/protocol/jsonrpc/.gitkeep deleted file mode 100644 index d3f5a12faa99758192ecc4ed3fc22c9249232e86..0000000000000000000000000000000000000000 --- a/protocol/jsonrpc/.gitkeep +++ /dev/null @@ -1 +0,0 @@ - diff --git a/jsonrpc/http.go b/protocol/jsonrpc/http.go similarity index 83% rename from jsonrpc/http.go rename to protocol/jsonrpc/http.go index dbf6d1d7a4129283daf41ff9d5068683ecd340e6..256616eb89e8f437f2230f773e8f7f73861b43f7 100644 --- a/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -20,9 +20,8 @@ import ( ) import ( - "github.com/dubbo/dubbo-go/client" + "github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" ) ////////////////////////////////////////////// @@ -38,11 +37,6 @@ type Request struct { method string args interface{} contentType string - conf registry.ReferenceConfig -} - -func (r *Request) ServiceConfig() registry.ReferenceConfig { - return r.conf } ////////////////////////////////////////////// @@ -83,30 +77,28 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(conf registry.ReferenceConfig, method string, args interface{}) (client.Request, error) { +func (c *HTTPClient) NewRequest(service config.URL, method string, args interface{}) *Request { return &Request{ ID: atomic.AddInt64(&c.ID, 1), - group: conf.Group(), - protocol: conf.Protocol(), - version: conf.Version(), - service: conf.Service(), + group: service.Group, + protocol: service.Protocol, + version: service.Version, + service: service.Service, method: method, args: args, - conf: conf, - }, nil + } } -func (c *HTTPClient) Call(ctx context.Context, service config.URL, request client.Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request, rsp interface{}) error { // header - req := request.(*Request) httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") httpHeader.Set("Accept", "application/json") reqTimeout := c.options.HTTPTimeout - if service.Timeout() != 0 && service.Timeout() < reqTimeout { - reqTimeout = time.Duration(service.Timeout()) + if service.Timeout != 0 && service.Timeout < reqTimeout { + reqTimeout = time.Duration(service.Timeout) } if reqTimeout <= 0 { reqTimeout = 1e8 @@ -130,7 +122,7 @@ func (c *HTTPClient) Call(ctx context.Context, service config.URL, request clien return jerrors.Trace(err) } - rspBody, err := c.Do(service.Location(), service.Query().Get("interface"), httpHeader, reqBody) + rspBody, err := c.Do(service.Location, service.Params.Get("interface"), httpHeader, reqBody) if err != nil { return jerrors.Trace(err) } diff --git a/jsonrpc/json.go b/protocol/jsonrpc/json.go similarity index 100% rename from jsonrpc/json.go rename to protocol/jsonrpc/json.go diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go new file mode 100644 index 0000000000000000000000000000000000000000..dd0918281127ea647cd58e09d52751d83d2fe02c --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_exporter.go @@ -0,0 +1,15 @@ +package jsonrpc + +import ( + "github.com/dubbo/dubbo-go/protocol" +) + +type JsonrpcExporter struct { + protocol.BaseExporter +} + +func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *JsonrpcExporter { + return &JsonrpcExporter{ + BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), + } +} diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..9667c46f40d2d72961cf083a6ba9057cc4cade92 --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -0,0 +1,53 @@ +package jsonrpc + +import ( + "context" +) + +import ( + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" + "github.com/dubbo/dubbo-go/public" +) + +type JsonrpcInvoker struct { + protocol.BaseInvoker + client *HTTPClient +} + +func NewJsonrpcInvoker(url config.IURL, client *HTTPClient) *JsonrpcInvoker { + return &JsonrpcInvoker{ + BaseInvoker: protocol.NewBaseInvoker(url), + client: client, + } +} + +func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + + var ( + result protocol.RPCResult + ) + + inv := invocation.(*protocol.RPCInvocation) + url := inv.Invoker().GetUrl().(*config.URL) + + req := ji.client.NewRequest(*url, inv.MethodName(), inv.Arguments()) + ctx := context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ + "X-Proxy-Id": "dubbogo", + "X-Services": url.Service, + "X-Method": inv.MethodName(), + }) + if err := ji.client.Call(ctx, *url, req, inv.Reply()); err != nil { + log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) + result.Err = err + } else { + result.Rest = inv.Reply() + } + + return &result +} diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go new file mode 100644 index 0000000000000000000000000000000000000000..8fe385bae3adbbc65ca485a140a1897495957fe3 --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -0,0 +1,79 @@ +package jsonrpc + +import ( + log "github.com/AlexStocks/log4go" +) + +import ( + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + +const JSONRPC = "jsonrpc" + +func init() { + extension.SetProtocol(JSONRPC, GetProtocol) +} + +var jsonrpcProtocol *JsonrpcProtocol + +type JsonrpcProtocol struct { + protocol.BaseProtocol + serverMap map[string]*Server +} + +func NewDubboProtocol() *JsonrpcProtocol { + return &JsonrpcProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]*Server), + } +} + +func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetUrl().(*config.URL) + serviceKey := url.Key() + exporter := NewJsonrpcExporter(serviceKey, invoker, jp.ExporterMap()) + jp.SetExporterMap(serviceKey, exporter) + log.Info("Export service: ", url.String()) + + // start server + jp.openServer(*url) + + return exporter +} + +func (jp *JsonrpcProtocol) Refer(url config.IURL) protocol.Invoker { + invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ + HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, + HTTPTimeout: config.GetConsumerConfig().RequestTimeout, + })) + jp.SetInvokers(invoker) + log.Info("Refer service: ", url.(*config.URL).String()) + return invoker +} + +func (jp *JsonrpcProtocol) Destroy() { + log.Info("jsonrpcProtocol destroy.") + + jp.BaseProtocol.Destroy() + + // stop server + for key, server := range jp.serverMap { + delete(jp.serverMap, key) + server.Stop() + } +} + +func (jp *JsonrpcProtocol) openServer(url config.URL) { + srv := NewServer(jp.ExporterMap()[url.Key()]) + jp.serverMap[url.Location] = srv + srv.Start(url) +} + +func GetProtocol() protocol.Protocol { + if jsonrpcProtocol != nil { + return jsonrpcProtocol + } + return NewDubboProtocol() +} diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go new file mode 100644 index 0000000000000000000000000000000000000000..01950d3ab69d51ce755823b15ae80476957c17a5 --- /dev/null +++ b/protocol/jsonrpc/server.go @@ -0,0 +1,354 @@ +package jsonrpc + +import ( + "bufio" + "bytes" + "context" + "github.com/dubbo/dubbo-go/common/constant" + "io" + "io/ioutil" + "net" + "net/http" + "reflect" + "runtime" + "runtime/debug" + "sync" + "time" +) + +import ( + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + +var ( + // A value sent as a placeholder for the server's response value when the server + // receives an invalid request. It is never decoded by the client since the Response + // contains an error when it is used. + invalidRequest = struct{}{} +) + +const ( + DefaultMaxSleepTime = 1 * time.Second // accept涓棿鏈€澶leep interval + DefaultHTTPRspBufferSize = 1024 + PathPrefix = byte('/') +) + +type Server struct { + exporter protocol.Exporter + done chan struct{} + once sync.Once + + sync.RWMutex + wg sync.WaitGroup + timeout time.Duration +} + +func NewServer(exporter protocol.Exporter) *Server { + return &Server{ + exporter: exporter, + done: make(chan struct{}), + } +} + +func (s *Server) handlePkg(conn net.Conn) { + defer func() { + if r := recover(); r != nil { + log.Warn("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s", + conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack())) + } + + conn.Close() + }() + + setReadTimeout := func(conn net.Conn, timeout time.Duration) { + t := time.Time{} + if timeout > time.Duration(0) { + t = time.Now().Add(timeout) + } + + conn.SetDeadline(t) + } + + sendErrorResp := func(header http.Header, body []byte) error { + rsp := &http.Response{ + Header: header, + StatusCode: 500, + ContentLength: int64(len(body)), + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + rsp.Header.Del("Content-Type") + rsp.Header.Del("Content-Length") + rsp.Header.Del("Timeout") + + rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) + rspBuf.Reset() + err := rsp.Write(rspBuf) + if err != nil { + return jerrors.Trace(err) + } + _, err = rspBuf.WriteTo(conn) + return jerrors.Trace(err) + } + + for { + bufReader := bufio.NewReader(conn) + r, err := http.ReadRequest(bufReader) + if err != nil { + return + } + + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + return + } + r.Body.Close() + + reqHeader := make(map[string]string) + for k := range r.Header { + reqHeader[k] = r.Header.Get(k) + } + reqHeader["Path"] = r.URL.Path[1:] // to get service name + if r.URL.Path[0] != PathPrefix { + reqHeader["Path"] = r.URL.Path + } + reqHeader["HttpMethod"] = r.Method + + httpTimeout := s.timeout + contentType := reqHeader["Content-Type"] + if contentType != "application/json" && contentType != "application/json-rpc" { + setReadTimeout(conn, httpTimeout) + r.Header.Set("Content-Type", "text/plain") + if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { + log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", + r.Header, jerrors.ErrorStack(err), errRsp) + } + return + } + + // exporter invoke + invoker := s.exporter.GetInvoker() + if invoker != nil { + attchments := map[string]string{} + // todo: use them followingly if need + url := invoker.GetUrl().(*config.URL) + attchments[constant.PATH_KEY] = url.Path + attchments[constant.GROUP_KEY] = url.Group + attchments[constant.SERVICE_KEY] = url.Service + attchments[constant.VERSION_KEY] = url.Version + result := invoker.Invoke(protocol.NewRPCInvocationForProvider(attchments)) + if err := result.Error(); err != nil { + if errRsp := sendErrorResp(r.Header, []byte(err.Error())); errRsp != nil { + log.Warn("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s", + r.Header, err, errRsp) + } + return + } + } + + ctx := context.Background() + if len(reqHeader["Timeout"]) > 0 { + timeout, err := time.ParseDuration(reqHeader["Timeout"]) + if err == nil { + httpTimeout = timeout + ctx, _ = context.WithTimeout(ctx, httpTimeout) + } + delete(reqHeader, "Timeout") + } + setReadTimeout(conn, httpTimeout) + + if err := serveRequest(ctx, reqHeader, reqBody, conn); err != nil { + if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { + log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", + r.Header, jerrors.ErrorStack(err), errRsp) + } + + log.Info("Unexpected error serving request, closing socket: %v", err) + return + } + } +} + +func accept(listener net.Listener, fn func(net.Conn)) error { + var ( + err error + c net.Conn + ok bool + ne net.Error + tmpDelay time.Duration + ) + + for { + c, err = listener.Accept() + if err != nil { + if ne, ok = err.(net.Error); ok && ne.Temporary() { + if tmpDelay != 0 { + tmpDelay <<= 1 + } else { + tmpDelay = 5 * time.Millisecond + } + if tmpDelay > DefaultMaxSleepTime { + tmpDelay = DefaultMaxSleepTime + } + log.Info("http: Accept error: %v; retrying in %v\n", err, tmpDelay) + time.Sleep(tmpDelay) + continue + } + return jerrors.Trace(err) + } + + go func() { + defer func() { + if r := recover(); r != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + log.Error("http: panic serving %v: %v\n%s", c.RemoteAddr(), r, buf) + c.Close() + } + }() + + fn(c) + }() + } +} + +func (s *Server) Start(url config.URL) { + listener, err := net.Listen("tcp", url.Location) + if err != nil { + log.Error("jsonrpc server [%s] start failed: %v", url.Service, err) + return + } + log.Info("rpc server start to listen on %s", listener.Addr()) + + s.wg.Add(1) + go func() { + accept(listener, func(conn net.Conn) { s.handlePkg(conn) }) + s.wg.Done() + }() + + s.wg.Add(1) + go func() { // Server done goroutine + var err error + <-s.done // step1: block to wait for done channel(wait Server.Stop step2) + err = listener.Close() // step2: and then close listener + if err != nil { + log.Warn("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err) + } + s.wg.Done() + }() +} + +func (s *Server) Stop() { + s.once.Do(func() { + close(s.done) + s.wg.Wait() + }) +} + +func serveRequest(ctx context.Context, + header map[string]string, body []byte, conn net.Conn) error { + + // read request header + codec := newServerCodec() + err := codec.ReadHeader(header, body) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return jerrors.Trace(err) + } + + return jerrors.New("server cannot decode request: " + err.Error()) + } + serviceName := header["Path"] + methodName := codec.req.Method + if len(serviceName) == 0 || len(methodName) == 0 { + codec.ReadBody(nil) + return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName) + } + + // get method + svc := config.ServiceMap.GetService(serviceName) + if svc == nil { + codec.ReadBody(nil) + return jerrors.New("cannot find svc " + serviceName) + } + mtype := svc.Method()[methodName] + if mtype == nil { + codec.ReadBody(nil) + return jerrors.New("cannot find method " + methodName + " of svc " + serviceName) + } + + // get args + var argv reflect.Value + argIsValue := false + if mtype.ArgType().Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType().Elem()) + } else { + argv = reflect.New(mtype.ArgType()) + argIsValue = true + } + // argv guaranteed to be a pointer now. + if err = codec.ReadBody(argv.Interface()); err != nil { + return jerrors.Trace(err) + } + if argIsValue { + argv = argv.Elem() + } + + replyv := reflect.New(mtype.ReplyType().Elem()) + + // call service.method(args) + var errMsg string + returnValues := mtype.Method().Func.Call([]reflect.Value{ + svc.Rcvr(), + mtype.SuiteContext(ctx), + reflect.ValueOf(argv.Interface()), + reflect.ValueOf(replyv.Interface()), + }) + // The return value for the method is an error. + if retErr := returnValues[0].Interface(); retErr != nil { + errMsg = retErr.(error).Error() + } + + // write response + code := 200 + rspReply := replyv.Interface() + if len(errMsg) != 0 { + code = 500 + rspReply = invalidRequest + } + rspStream, err := codec.Write(errMsg, rspReply) + if err != nil { + return jerrors.Trace(err) + } + rsp := &http.Response{ + StatusCode: code, + ProtoMajor: 1, + ProtoMinor: 1, + Header: make(http.Header), + ContentLength: int64(len(rspStream)), + Body: ioutil.NopCloser(bytes.NewReader(rspStream)), + } + delete(header, "Content-Type") + delete(header, "Content-Length") + delete(header, "Timeout") + for k, v := range header { + rsp.Header.Set(k, v) + } + + rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) + rspBuf.Reset() + if err = rsp.Write(rspBuf); err != nil { + log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err) + return nil + } + if _, err = rspBuf.WriteTo(conn); err != nil { + log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) + } + return nil +} diff --git a/protocol/protocol.go b/protocol/protocol.go index a9da663585e622bdf5f67223e33c3977910e95c8..a490ba4cb8f9fd76cffe5fb316f57621d737876d 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,6 +1,9 @@ package protocol -import "github.com/dubbo/dubbo-go/config" +import ( + "github.com/dubbo/dubbo-go/config" + "github.com/prometheus/common/log" +) // Extension - Protocol type Protocol interface { @@ -9,7 +12,95 @@ type Protocol interface { Destroy() } +// wrapping invoker type Exporter interface { GetInvoker() Invoker Unexport() } + +///////////////////////////// +// base protocol +///////////////////////////// + +type BaseProtocol struct { + exporterMap map[string]Exporter + invokers []Invoker +} + +func NewBaseProtocol() BaseProtocol { + return BaseProtocol{ + exporterMap: make(map[string]Exporter), + } +} + +func (bp *BaseProtocol) SetExporterMap(key string, exporter Exporter) { + bp.exporterMap[key] = exporter +} + +func (bp *BaseProtocol) ExporterMap() map[string]Exporter { + return bp.exporterMap +} + +func (bp *BaseProtocol) SetInvokers(invoker Invoker) { + bp.invokers = append(bp.invokers, invoker) +} + +func (bp *BaseProtocol) Invokers() []Invoker { + return bp.invokers +} + +func (bp *BaseProtocol) Export(invoker Invoker) Exporter { + return nil +} + +func (bp *BaseProtocol) Refer(url config.IURL) Invoker { + return nil +} + +func (bp *BaseProtocol) Destroy() { + // destroy invokers + for _, invoker := range bp.invokers { + if invoker != nil { + invoker.Destroy() + } + } + bp.invokers = []Invoker{} + + // unexport exporters + for key, exporter := range bp.ExporterMap() { + if exporter != nil { + exporter.Unexport() + } else { + delete(bp.exporterMap, key) + } + } +} + +///////////////////////////// +// base exporter +///////////////////////////// + +type BaseExporter struct { + key string + invoker Invoker + exporterMap map[string]Exporter +} + +func NewBaseExporter(key string, invoker Invoker, exporterMap map[string]Exporter) *BaseExporter { + return &BaseExporter{ + key: key, + invoker: invoker, + exporterMap: exporterMap, + } +} + +func (de *BaseExporter) GetInvoker() Invoker { + return de.invoker + +} + +func (de *BaseExporter) Unexport() { + log.Info("Exporter unexport.") + de.invoker.Destroy() + delete(de.exporterMap, de.key) +} diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go new file mode 100644 index 0000000000000000000000000000000000000000..897e82802502337709fec08b4d437a5d650619bb --- /dev/null +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -0,0 +1,88 @@ +package protocol + +import ( + "strings" +) + +import ( + "github.com/dubbo/dubbo-go/common/constant" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/filter" + "github.com/dubbo/dubbo-go/protocol" +) + +const FILTER = "filter" + +func init() { + extension.SetProtocol(FILTER, GetProtocol) +} + +// protocol in url decide who ProtocolFilterWrapper.protocol is +type ProtocolFilterWrapper struct { + protocol protocol.Protocol +} + +func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { + if pfw.protocol == nil { + pfw.protocol = extension.GetProtocolExtension(invoker.GetUrl().(*config.URL).Protocol) + } + invoker = buildInvokerChain(invoker, constant.SERVICE_FILTER_KEY) + return pfw.protocol.Export(invoker) +} + +func (pfw *ProtocolFilterWrapper) Refer(url config.IURL) protocol.Invoker { + if pfw.protocol == nil { + pfw.protocol = extension.GetProtocolExtension(url.(*config.URL).Protocol) + } + return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) +} + +func (pfw *ProtocolFilterWrapper) Destroy() { + pfw.protocol.Destroy() +} + +func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { + filtName := invoker.GetUrl().(*config.URL).Params.Get(key) + filtNames := strings.Split(filtName, ",") + next := invoker + // The order of filters is from left to right, so loading from right to left + for i := len(filtNames) - 1; i >= 0; i-- { + filter := extension.GetFilterExtension(filtNames[i]) + fi := &FilterInvoker{next: next, invoker: invoker, filter: filter} + next = fi + } + + return next +} + +func GetProtocol() protocol.Protocol { + return &ProtocolFilterWrapper{} +} + +/////////////////////////// +// filter invoker +/////////////////////////// + +type FilterInvoker struct { + next protocol.Invoker + invoker protocol.Invoker + filter filter.Filter +} + +func (fi *FilterInvoker) GetUrl() config.IURL { + return fi.invoker.GetUrl() +} + +func (fi *FilterInvoker) IsAvailable() bool { + return fi.invoker.IsAvailable() +} + +func (fi *FilterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + result := fi.filter.Invoke(fi.next, invocation) + return fi.filter.OnResponse(result, fi.invoker, invocation) +} + +func (fi *FilterInvoker) Destroy() { + fi.invoker.Destroy() +} diff --git a/protocol/result.go b/protocol/result.go index c5b6db70463872568842a84edd96db2cffbd7fcf..ba75e6178eeffb36d82213082bca9fe39768e7df 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -1,6 +1,8 @@ package protocol type Result interface { + Error() error + Result() interface{} } ///////////////////////////// @@ -11,3 +13,11 @@ type RPCResult struct { Err error Rest interface{} } + +func (r *RPCResult) Error() error { + return r.Err +} + +func (r *RPCResult) Result() interface{} { + return r.Rest +}