diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 6c195a2fd9e6da81cb4947634d89aeb80866ff02..1183d884010fab6afe91d2b21e5571c3c746c70e 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -1,6 +1,6 @@ package extension -import "github.com/dubbo/dubbo-go/protocol" +import "github.com/dubbo/dubbo-go/protocol" var ( protocols map[string]func() protocol.Protocol @@ -10,10 +10,17 @@ func init() { protocols = make(map[string]func() protocol.Protocol) } -func SetRefProtocol(fn func() protocol.Protocol){ +func SetRefProtocol(fn func() protocol.Protocol) { protocols["refProtocol"] = fn } -func GetRefProtocol() protocol.Protocol{ +func GetRefProtocol() protocol.Protocol { return protocols["refProtocol"]() } +func SetProtocol(name string, v func() protocol.Protocol) { + protocols[name] = v +} + +func GetProtocolExtension(name string) protocol.Protocol { + return protocols[name]() +} diff --git a/config/config_loader.go b/config/config_loader.go index 1740efa2f0751ec2781c636c5cc9e23f61e0db2e..fdd3ca484311f531472ee154945ec01fe595545f 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "gopkg.in/yaml.v2" "io/ioutil" "os" "path" @@ -9,9 +10,8 @@ import ( ) import ( - gxlog "github.com/AlexStocks/goext/log" + "github.com/AlexStocks/goext/log" jerrors "github.com/juju/errors" - "gopkg.in/yaml.v2" ) import ( @@ -25,38 +25,47 @@ var ( ) // loaded comsumer & provider config from xxx.yml -// Namely: dubbo.comsumer.xml & dubbo.provider.xml +// Namely: dubbo.comsumer.xml & dubbo.provider.xml in java dubbo func init() { var ( - confFile string + confConFile, confProFile string ) - // configure - confFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH) - if confFile == "" { - panic(fmt.Sprintf("application configure file name is nil")) - } - if path.Ext(confFile) != ".yml" { - panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile)) - } + confConFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH) + confProFile = os.Getenv(constant.CONF_PROVIDER_FILE_PATH) - confFileStream, err := ioutil.ReadFile(confFile) - if err != nil { - panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err))) + if confConFile == "" && confProFile == "" { + panic(fmt.Sprintf("application configure(consumer & provider) file name is nil")) } - err = yaml.Unmarshal(confFileStream, consumerConfig) - if err != nil { - panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) - } - //鍔ㄦ€佸姞杞絪ervice config end - for _, config := range consumerConfig.Registries { - if config.Timeout, err = time.ParseDuration(config.TimeoutStr); err != nil { - panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", config.TimeoutStr, err)) + + if confConFile != "" { + + if path.Ext(confConFile) != ".yml" { + panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confConFile)) } + + confFileStream, err := ioutil.ReadFile(confConFile) + if err != nil { + panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confConFile, jerrors.ErrorStack(err))) + } + err = yaml.Unmarshal(confFileStream, consumerConfig) + if err != nil { + panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) + } + //鍔ㄦ€佸姞杞絪ervice config end + for _, config := range consumerConfig.Registries { + if config.Timeout, err = time.ParseDuration(config.TimeoutStr); err != nil { + panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", config.TimeoutStr, err)) + } + } + + gxlog.CInfo("consumer config{%#v}\n", consumerConfig) } - gxlog.CInfo("config{%#v}\n", consumerConfig) + if confProFile != "" { + + } // log //confFile = os.Getenv(APP_LOG_CONF_FILE) diff --git a/config/config_url.go b/config/config_url.go index 41b9e8a0852fb277c71ff669b41e24020d07c18b..89f229b5eabf50bb872d0f6cb61572d3b37cc972 100644 --- a/config/config_url.go +++ b/config/config_url.go @@ -14,12 +14,12 @@ import ( ) type baseUrl struct { - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` - Location string // ip+port - Ip string - Port string - Timeout time.Duration - Query url.Values + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Location string // ip+port + Ip string + Port string + Timeout time.Duration + Query url.Values PrimitiveURL string } @@ -32,13 +32,14 @@ type ConfigURL struct { Version string `yaml:"version" json:"version,omitempty"` Group string `yaml:"group" json:"group,omitempty"` - Weight int32 - Methods string `yaml:"methods" json:"methods,omitempty"` - Username string - Password string + Weight int32 + Methods string `yaml:"methods" json:"methods,omitempty"` + Username string + Password string } func NewConfigURL(urlString string) (*ConfigURL, error) { + var ( err error rawUrlString string @@ -46,6 +47,11 @@ func NewConfigURL(urlString string) (*ConfigURL, error) { s = &ConfigURL{} ) + // new a null instance + if urlString == "" { + return s, nil + } + rawUrlString, err = url.QueryUnescape(urlString) if err != nil { return nil, jerrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err) @@ -92,6 +98,7 @@ func (c *ConfigURL) Key() string { } func (c *ConfigURL) ConfigURLEqual(url ConfigURL) bool { + if c.Key() != url.Key() { return false } diff --git a/config/reference_config.go b/config/reference_config.go index b8a7ca5fd4f987d5a154bb20e46e64ebda646cda..cac6a8a5ed1991d57f778b3271ab92d03a6b6747 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -50,5 +50,9 @@ func (refconfig *ReferenceConfig) loadRegistries() []ConfigURL { } } +<<<<<<< HEAD return urls +======= + return nil +>>>>>>> 34ad782cbc0dfc96000bc3a18191537218a21186 } diff --git a/config/registry_config.go b/config/registry_config.go index 7e41fdfd60ab56061e96424364e81e7b887832f8..8ead2b5ad33906af2d6a0852778ecd5e2c92bbb3 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -4,7 +4,7 @@ import "time" type RegistryConfig struct { Id string `required:"true" yaml:"id" json:"id,omitempty"` - Address string `required:"true" yaml:"address" json:"address,omitempty"` + Address string `required:"true" yaml:"address" json:"address,omitempty"` UserName string `yaml:"user_name" json:"user_name,omitempty"` Password string `yaml:"password" json:"password,omitempty"` TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty"` // unit: second diff --git a/config/rpc_service.go b/config/rpc_service.go new file mode 100644 index 0000000000000000000000000000000000000000..2fef5b88c7097839c27082e11bf23fe77d9ac99d --- /dev/null +++ b/config/rpc_service.go @@ -0,0 +1,164 @@ +package config + +import ( + "reflect" + "sync" + "unicode" + "unicode/utf8" +) + +import ( + log "github.com/AlexStocks/log4go" +) + +// rpc service interface +type RPCService interface { + Service() string // Service Interface + Version() string +} + +var ( + // A value sent as a placeholder for the server's response value when the server + // receives an invalid request. It is never decoded by the client since the Response + // contains an error when it is used. + invalidRequest = struct{}{} + + // Precompute the reflect type for error. Can't use error directly + // because Typeof takes an empty interface value. This is annoying. + typeOfError = reflect.TypeOf((*error)(nil)).Elem() + + ServiceMap = &serviceMap{ + serviceMap: make(map[string]*Service), + } +) + +// info of method +type MethodType struct { + method reflect.Method + ctxType reflect.Type // type of the request context + argType reflect.Type + replyType reflect.Type +} + +func (m *MethodType) Method() reflect.Method { + return m.method +} +func (m *MethodType) CtxType() reflect.Type { + return m.ctxType +} +func (m *MethodType) ArgType() reflect.Type { + return m.argType +} +func (m *MethodType) ReplyType() reflect.Type { + return m.replyType +} + +// info of service interface +type Service struct { + name string + rcvr reflect.Value + rcvrType reflect.Type + method map[string]*MethodType +} + +func (s *Service) Method() map[string]*MethodType { + return s.method +} +func (s *Service) RcvrType() reflect.Type { + return s.rcvrType +} +func (s *Service) Rcvr() reflect.Value { + return s.rcvr +} + +type serviceMap struct { + mutex sync.Mutex // protects the serviceMap + serviceMap map[string]*Service // service name -> service +} + +func (sm *serviceMap) GetService(name string) *Service { + return sm.serviceMap[name] +} + +// Is this an exported - upper case - name +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// suitableMethods returns suitable Rpc methods of typ +func suitableMethods(typ reflect.Type) (string, map[string]*MethodType) { + methods := make(map[string]*MethodType) + mts := "" + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + if mt := suiteMethod(method); mt != nil { + methods[method.Name] = mt + mts += method.Name + "," + } + } + return mts, methods +} + +// suiteMethod returns a suitable Rpc methodType +func suiteMethod(method reflect.Method) *MethodType { + mtype := method.Type + mname := method.Name + + // Method must be exported. + if method.PkgPath != "" { + return nil + } + + var replyType, argType, ctxType reflect.Type + switch mtype.NumIn() { + case 3: + argType = mtype.In(1) + replyType = mtype.In(2) + case 4: + ctxType = mtype.In(1) + argType = mtype.In(2) + replyType = mtype.In(3) + default: + log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", + mname, mtype, mtype.NumIn()) + return nil + } + // First arg need not be a pointer. + if !isExportedOrBuiltinType(argType) { + log.Error("argument type of method %q is not exported %v", mname, argType) + return nil + } + // Second arg must be a pointer. + if replyType.Kind() != reflect.Ptr { + log.Error("reply type of method %q is not a pointer %v", mname, replyType) + return nil + } + // Reply type must be exported. + if !isExportedOrBuiltinType(replyType) { + log.Error("reply type of method %s not exported{%v}", mname, replyType) + return nil + } + // Method needs one out. + if mtype.NumOut() != 1 { + log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut()) + return nil + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + log.Error("return type %s of method %q is not error", returnType, mname) + return nil + } + + return &MethodType{method: method, argType: argType, replyType: replyType, ctxType: ctxType} +} diff --git a/config/service_config.go b/config/service_config.go index 4914eb0677463632fe02a36593b1e9d1a719bc9f..0370ce04de2f496a4b567962fd9f8204a469055c 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -1,11 +1,17 @@ package config - type ServiceConfig struct { - Service string `required:"true" yaml:"service" json:"service,omitempty"` - URLs []ConfigURL + Service string `required:"true" yaml:"service" json:"service,omitempty"` + URLs []ConfigURL + rpcService RPCService } func NewDefaultProviderServiceConfig() *ServiceConfig { return &ServiceConfig{} } + +// todo: D:\Users\yc.fang\WorkStation\goWorkStation\fangyincheng\dubbo-go\dubbo\server.go#Line102 +// todo: D:\Users\yc.fang\WorkStation\goWorkStation\fangyincheng\dubbo-go\jsonrpc\server.go#Line238 +func (sc *ServiceConfig) Export() { + +} diff --git a/go.mod b/go.mod index 8abc40265af71ddb6db8a122d6346edfc3f4f417..6539462ceb8754987e002e53bfb605a5fd6dd36b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/AlexStocks/log4go v1.0.2 github.com/dubbogo/hessian2 v0.0.0-20190410112310-f093e4436e31 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 + github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 // indirect + github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index c4f8086db8b059460c5a0565dbf57de62d8d2391..aabe80b1b0fb02dc55ea6f6f328937353bf65412 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkg github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk= github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -32,6 +33,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -74,9 +76,12 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 h1:F9x/1yl3T2AeKLr2AMdilSD8+f9bvMnNN8VS5iDtovc= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/name5566/leaf v0.0.0-20181103040206-1364c176dfbd h1:22rhYEzttbrnKjgYh5pifnDluXHHcJ3uSOi2l8Nw+9A= github.com/name5566/leaf v0.0.0-20181103040206-1364c176dfbd/go.mod h1:JrOIxq3vDxvtuEI7Kmm2yqkuBfuT9DMLFMnCyYHLaKM= github.com/name5566/leaf v0.0.0-20181103040206-1364c176dfbd/go.mod h1:JrOIxq3vDxvtuEI7Kmm2yqkuBfuT9DMLFMnCyYHLaKM= @@ -86,10 +91,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= diff --git a/protocol/dubbo/.gitkeep b/protocol/dubbo/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go new file mode 100644 index 0000000000000000000000000000000000000000..695fd16226dedffcb6a198e59ef757f9cf0be5f9 --- /dev/null +++ b/protocol/dubbo/client.go @@ -0,0 +1,278 @@ +package dubbo + +import ( + "math/rand" + "strings" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/getty" + "github.com/AlexStocks/goext/sync/atomic" + "github.com/dubbogo/hessian2" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/config" +) + +var ( + errInvalidCodecType = jerrors.New("illegal CodecType") + errInvalidAddress = jerrors.New("remote address invalid or empty") + errSessionNotExist = jerrors.New("session not exist") + errClientClosed = jerrors.New("client closed") + errClientReadTimeout = jerrors.New("client read timeout") +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +type CallOptions struct { + // request timeout + RequestTimeout time.Duration + // response timeout + ResponseTimeout time.Duration + // serial ID + SerialID SerialID + Meta map[interface{}]interface{} +} + +type CallOption func(*CallOptions) + +func WithCallRequestTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.RequestTimeout = d + } +} + +func WithCallResponseTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.ResponseTimeout = d + } +} + +func WithCallSerialID(s SerialID) CallOption { + return func(o *CallOptions) { + o.SerialID = s + } +} + +func WithCallMeta(k, v interface{}) CallOption { + return func(o *CallOptions) { + if o.Meta == nil { + o.Meta = make(map[interface{}]interface{}) + } + o.Meta[k] = v + } +} + +type CallResponse struct { + Opts CallOptions + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} +} + +type AsyncCallback func(response CallResponse) + +type Client struct { + conf ClientConfig + pool *gettyRPCClientPool + sequence gxatomic.Uint64 + + pendingLock sync.RWMutex + pendingResponses map[SequenceType]*PendingResponse +} + +func NewClient(conf *ClientConfig) (*Client, error) { + if err := conf.CheckValidity(); err != nil { + return nil, jerrors.Trace(err) + } + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *conf, + } + c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL)) + + return c, nil +} + +// call one way +func (c *Client) CallOneway(addr string, svcUrl config.ConfigURL, method string, args interface{}, opts ...CallOption) error { + var copts CallOptions + + for _, o := range opts { + o(&copts) + } + + return jerrors.Trace(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil, copts)) +} + +// if @reply is nil, the transport layer will get the response without notify the invoker. +func (c *Client) Call(addr string, svcUrl config.ConfigURL, method string, args, reply interface{}, opts ...CallOption) error { + var copts CallOptions + + for _, o := range opts { + o(&copts) + } + + ct := CT_TwoWay + if reply == nil { + ct = CT_OneWay + } + + return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) +} + +func (c *Client) AsyncCall(addr string, svcUrl config.ConfigURL, method string, args interface{}, + callback AsyncCallback, reply interface{}, opts ...CallOption) error { + + var copts CallOptions + for _, o := range opts { + o(&copts) + } + + return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) +} + +func (c *Client) call(ct CallType, addr string, svcUrl config.ConfigURL, method string, + args, reply interface{}, callback AsyncCallback, opts CallOptions) error { + + if opts.RequestTimeout == 0 { + opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout + } + if opts.ResponseTimeout == 0 { + opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout + } + + p := &DubboPackage{} + p.Service.Path = strings.TrimPrefix(svcUrl.Path(), "/") + p.Service.Target = strings.TrimPrefix(svcUrl.Path(), "/") + p.Service.Version = svcUrl.Version() + p.Service.Method = method + p.Service.Timeout = opts.RequestTimeout + if opts.SerialID == 0 { + p.Header.SerialID = byte(S_Dubbo) + } else { + p.Header.SerialID = byte(opts.SerialID) + } + p.Body = args + + var rsp *PendingResponse + if ct != CT_OneWay { + rsp = NewPendingResponse() + rsp.reply = reply + rsp.callback = callback + rsp.opts = opts + } + + var ( + err error + session getty.Session + conn *gettyRPCClient + ) + conn, session, err = c.selectSession(addr) + if err != nil || session == nil { + return errSessionNotExist + } + defer c.pool.release(conn, err) + + if err = c.transfer(session, p, rsp, opts); err != nil { + return jerrors.Trace(err) + } + + if ct == CT_OneWay || callback != nil { + return nil + } + + select { + case <-getty.GetTimeWheel().After(opts.ResponseTimeout): + err = errClientReadTimeout + c.removePendingResponse(SequenceType(rsp.seq)) + case <-rsp.done: + err = rsp.err + } + + return jerrors.Trace(err) +} + +func (c *Client) Close() { + if c.pool != nil { + c.pool.close() + } + c.pool = nil +} + +func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { + rpcClient, err := c.pool.getGettyRpcClient(DUBBO, addr) + if err != nil { + return nil, nil, jerrors.Trace(err) + } + return rpcClient, rpcClient.selectSession(), nil +} + +func (c *Client) heartbeat(session getty.Session) error { + return c.transfer(session, nil, NewPendingResponse(), CallOptions{}) +} + +func (c *Client) transfer(session getty.Session, pkg *DubboPackage, + rsp *PendingResponse, opts CallOptions) error { + + var ( + sequence uint64 + err error + ) + + sequence = c.sequence.Add(1) + + if pkg == nil { + pkg = &DubboPackage{} + pkg.Body = []interface{}{} + pkg.Header.Type = hessian.Heartbeat + pkg.Header.SerialID = byte(S_Dubbo) + } else { + pkg.Header.Type = hessian.Request + } + pkg.Header.ID = int64(sequence) + + // cond1 + if rsp != nil { + rsp.seq = sequence + c.addPendingResponse(rsp) + } + + err = session.WritePkg(pkg, opts.RequestTimeout) + if err != nil { + c.removePendingResponse(SequenceType(rsp.seq)) + } else if rsp != nil { // cond2 + // cond2 should not merged with cond1. cause the response package may be returned very + // soon and it will be handled by other goroutine. + rsp.readStart = time.Now() + } + + return jerrors.Trace(err) +} + +func (c *Client) addPendingResponse(pr *PendingResponse) { + c.pendingLock.Lock() + defer c.pendingLock.Unlock() + c.pendingResponses[SequenceType(pr.seq)] = pr +} + +func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse { + c.pendingLock.Lock() + defer c.pendingLock.Unlock() + if c.pendingResponses == nil { + return nil + } + if presp, ok := c.pendingResponses[seq]; ok { + delete(c.pendingResponses, seq) + return presp + } + return nil +} diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..d1a96940a4606377f6e42a122bbb8f91c789c556 --- /dev/null +++ b/protocol/dubbo/codec.go @@ -0,0 +1,107 @@ +package dubbo + +import ( + "bufio" + "bytes" + "fmt" + "time" +) + +import ( + "github.com/dubbogo/hessian2" + jerrors "github.com/juju/errors" +) + +// serial ID +type SerialID byte + +const ( + S_Dubbo SerialID = 2 +) + +// call type +type CallType int32 + +const ( + CT_UNKOWN CallType = 0 + CT_OneWay CallType = 1 + CT_TwoWay CallType = 2 +) + +//////////////////////////////////////////// +// dubbo package +//////////////////////////////////////////// + +type SequenceType int64 + +type DubboPackage struct { + Header hessian.DubboHeader + Service hessian.Service + Body interface{} + Err error +} + +func (p DubboPackage) String() string { + return fmt.Sprintf("DubboPackage: Header-%v, Service-%v, Body-%v", p.Header, p.Service, p.Body) +} + +func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(p.Service, p.Header, p.Body) + if err != nil { + return nil, jerrors.Trace(err) + } + + return bytes.NewBuffer(pkg), nil +} + +func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error { + codec := hessian.NewHessianCodec(bufio.NewReader(buf)) + + // read header + err := codec.ReadHeader(&p.Header) + if err != nil { + return jerrors.Trace(err) + } + + if p.Header.Type&hessian.Heartbeat != 0x00 { + return nil + } + + // read body + err = codec.ReadBody(p.Body) + return jerrors.Trace(err) +} + +//////////////////////////////////////////// +// PendingResponse +//////////////////////////////////////////// + +type PendingResponse struct { + seq uint64 + err error + start time.Time + readStart time.Time + callback AsyncCallback + reply interface{} + opts CallOptions + done chan struct{} +} + +func NewPendingResponse() *PendingResponse { + return &PendingResponse{ + start: time.Now(), + done: make(chan struct{}), + } +} + +func (r PendingResponse) GetCallResponse() CallResponse { + return CallResponse{ + Opts: r.opts, + Cause: r.err, + Start: r.start, + ReadStart: r.readStart, + Reply: r.reply, + } +} diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go new file mode 100644 index 0000000000000000000000000000000000000000..3d0d5116c49cea3b7035b1aeee7d79f66802dde7 --- /dev/null +++ b/protocol/dubbo/config.go @@ -0,0 +1,136 @@ +package dubbo + +import ( + "time" +) + +import ( + jerrors "github.com/juju/errors" +) + +type ( + GettySessionParam struct { + CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` + TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` + TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` + KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` + keepAlivePeriod time.Duration + TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` + TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` + PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"` + PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` + TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` + tcpReadTimeout time.Duration + TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` + tcpWriteTimeout time.Duration + WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` + waitTimeout time.Duration + MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` + SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"` + } + + // Config holds supported types by the multiconfig package + ServerConfig struct { + // local address + //AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"` + //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` + //Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` + //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` + + // session + SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` + sessionTimeout time.Duration + SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"` + + // app + FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` + failFastTimeout time.Duration + + // session tcp parameters + GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` + } + + // Config holds supported types by the multiconfig package + ClientConfig struct { + // local address + //AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"` + //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` + //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` + + // session pool + ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"` + + // heartbeat + HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"` + heartbeatPeriod time.Duration + + // session + SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` + sessionTimeout time.Duration + + // app + FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` + failFastTimeout time.Duration + + // Connection Pool + PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"` + PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"` + + // session tcp parameters + GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` + } +) + +func (c *GettySessionParam) CheckValidity() error { + var err error + + if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod) + } + + if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout) + } + + if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout) + } + + if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout) + } + + return nil +} + +func (c *ClientConfig) CheckValidity() error { + var err error + + if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod) + } + + if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) + } + + if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) + } + + return jerrors.Trace(c.GettySessionParam.CheckValidity()) +} + +func (c *ServerConfig) CheckValidity() error { + var err error + + if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) + } + + if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { + return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) + } + + return jerrors.Trace(c.GettySessionParam.CheckValidity()) +} diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go new file mode 100644 index 0000000000000000000000000000000000000000..70c0c8e11336089e4abb95e1189e6abc1f92b7a8 --- /dev/null +++ b/protocol/dubbo/dubbo_exporter.go @@ -0,0 +1,18 @@ +package dubbo + +import "github.com/dubbo/dubbo-go/protocol" + +// wrapping invoker +type DubboExporter struct { + key string + invoker protocol.Invoker +} + +func (de *DubboExporter) GetInvoker() protocol.Invoker { + return de.invoker + +} + +func (de *DubboExporter) Unexport() { + +} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..16cc7ad1a8499d1b064aced602d498b0e707d3e5 --- /dev/null +++ b/protocol/dubbo/dubbo_invoker.go @@ -0,0 +1,19 @@ +package dubbo + +import "github.com/dubbo/dubbo-go/config" + +type DubboInvoker struct { + url config.ConfigURL +} + +func (di *DubboInvoker) Invoke() { + +} + +func (di *DubboInvoker) GetURL() config.ConfigURL { + return di.url +} + +func (di *DubboInvoker) Destroy() { + +} diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go new file mode 100644 index 0000000000000000000000000000000000000000..c33088759ea23ebed7b665ec193ae2bede45a6f5 --- /dev/null +++ b/protocol/dubbo/dubbo_protocol.go @@ -0,0 +1,62 @@ +package dubbo + +import ( + log "github.com/AlexStocks/log4go" +) + +import ( + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + +const DUBBO = "dubbo" + +func init() { + extension.SetProtocol(DUBBO, GetProtocol) +} + +var dubboProtocol *DubboProtocol + +type DubboProtocol struct { + exporterMap map[string]protocol.Exporter + invokers []protocol.Invoker +} + +func NewDubboProtocol() protocol.Protocol { + return &DubboProtocol{exporterMap: make(map[string]protocol.Exporter)} +} + +func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetURL() + serviceKey := url.Key() + exporter := &DubboExporter{invoker: invoker, key: serviceKey} + dp.exporterMap[serviceKey] = exporter + log.Info("Export service: ", url.String()) + + // start server + dp.openServer(url) + return exporter +} + +func (dp *DubboProtocol) Refer(url config.ConfigURL) protocol.Invoker { + invoker := &DubboInvoker{url: url} + dp.invokers = append(dp.invokers, invoker) + log.Info("Refer service: ", url.String()) + return invoker +} + +func (dp *DubboProtocol) Destroy() { + +} + +func (dp *DubboProtocol) openServer(url config.ConfigURL) { + srv.Start(url) +} + +func GetProtocol() protocol.Protocol { + if dubboProtocol != nil { + return dubboProtocol + } + return NewDubboProtocol() +} diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..2328c5c15486126a22117a1ff8e022d4e40f9198 --- /dev/null +++ b/protocol/dubbo/listener.go @@ -0,0 +1,290 @@ +package dubbo + +import ( + "context" + "reflect" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/getty" + log "github.com/AlexStocks/log4go" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbogo/hessian2" + jerrors "github.com/juju/errors" +) + +// todo: WritePkg_Timeout will entry *.yml +const WritePkg_Timeout = 5 * time.Second + +var ( + errTooManySessions = jerrors.New("too many sessions") +) + +type rpcSession struct { + session getty.Session + reqNum int32 +} + +//////////////////////////////////////////// +// RpcClientHandler +//////////////////////////////////////////// + +type RpcClientHandler struct { + conn *gettyRPCClient +} + +func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { + return &RpcClientHandler{conn: client} +} + +func (h *RpcClientHandler) OnOpen(session getty.Session) error { + h.conn.addSession(session) + return nil +} + +func (h *RpcClientHandler) OnError(session getty.Session, err error) { + log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err) + h.conn.removeSession(session) +} + +func (h *RpcClientHandler) OnClose(session getty.Session) { + log.Info("session{%s} is closing......", session.Stat()) + h.conn.removeSession(session) +} + +func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { + p, ok := pkg.(*DubboPackage) + if !ok { + log.Error("illegal package") + return + } + + if p.Header.Type&hessian.Heartbeat != 0x00 { + log.Debug("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + return + } + log.Debug("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) + + h.conn.updateSession(session) + + pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) + if pendingResponse == nil { + return + } + + if p.Err != nil { + pendingResponse.err = p.Err + } + + if pendingResponse.callback == nil { + pendingResponse.done <- struct{}{} + } else { + pendingResponse.callback(pendingResponse.GetCallResponse()) + } +} + +func (h *RpcClientHandler) OnCron(session getty.Session) { + rpcSession, err := h.conn.getClientRpcSession(session) + if err != nil { + log.Error("client.getClientSession(session{%s}) = error{%s}", + session.Stat(), jerrors.ErrorStack(err)) + return + } + if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { + log.Warn("session{%s} timeout{%s}, reqNum{%d}", + session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum) + h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn) + return + } + + h.conn.pool.rpcClient.heartbeat(session) +} + +//////////////////////////////////////////// +// RpcServerHandler +//////////////////////////////////////////// + +type RpcServerHandler struct { + maxSessionNum int + sessionTimeout time.Duration + sessionMap map[getty.Session]*rpcSession + rwlock sync.RWMutex +} + +func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { + return &RpcServerHandler{ + maxSessionNum: maxSessionNum, + sessionTimeout: sessionTimeout, + sessionMap: make(map[getty.Session]*rpcSession), + } +} + +func (h *RpcServerHandler) OnOpen(session getty.Session) error { + var err error + h.rwlock.RLock() + if h.maxSessionNum <= len(h.sessionMap) { + err = errTooManySessions + } + h.rwlock.RUnlock() + if err != nil { + return jerrors.Trace(err) + } + + log.Info("got session:%s", session.Stat()) + h.rwlock.Lock() + h.sessionMap[session] = &rpcSession{session: session} + h.rwlock.Unlock() + return nil +} + +func (h *RpcServerHandler) OnError(session getty.Session, err error) { + log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err) + h.rwlock.Lock() + delete(h.sessionMap, session) + h.rwlock.Unlock() +} + +func (h *RpcServerHandler) OnClose(session getty.Session) { + log.Info("session{%s} is closing......", session.Stat()) + h.rwlock.Lock() + delete(h.sessionMap, session) + h.rwlock.Unlock() +} + +func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { + h.rwlock.Lock() + if _, ok := h.sessionMap[session]; ok { + h.sessionMap[session].reqNum++ + } + h.rwlock.Unlock() + + p, ok := pkg.(*DubboPackage) + if !ok { + log.Error("illegal packge{%#v}", pkg) + return + } + p.Header.ResponseStatus = hessian.Response_OK + + // heartbeat + if p.Header.Type&hessian.Heartbeat != 0x00 { + log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) + h.reply(session, p, hessian.Heartbeat) + return + } + + // twoway + if p.Header.Type&hessian.Request_TwoWay == 0x00 { + h.reply(session, p, hessian.Response) + h.callService(p, nil) + return + } + + h.callService(p, nil) + h.reply(session, p, hessian.Response) +} + +func (h *RpcServerHandler) OnCron(session getty.Session) { + var ( + flag bool + active time.Time + ) + + h.rwlock.RLock() + if _, ok := h.sessionMap[session]; ok { + active = session.GetActive() + if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() { + flag = true + log.Warn("session{%s} timeout{%s}, reqNum{%d}", + session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum) + } + } + h.rwlock.RUnlock() + + if flag { + h.rwlock.Lock() + delete(h.sessionMap, session) + h.rwlock.Unlock() + session.Close() + } +} + +func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { + + defer func() { + if e := recover(); e != nil { + req.Header.ResponseStatus = hessian.Response_BAD_REQUEST + if err, ok := e.(error); ok { + log.Error("callService panic: %#v", err) + req.Body = e.(error) + } else if err, ok := e.(string); ok { + log.Error("callService panic: %#v", jerrors.New(err)) + req.Body = jerrors.New(err) + } else { + log.Error("callService panic: %#v", e) + req.Body = e + } + } + }() + + svc := req.Body.(map[string]interface{})["service"].(*config.Service) + method := svc.Method()[req.Service.Method] + + // prepare argv + var argv reflect.Value + argIsValue := false // if true, need to indirect before calling. + if method.ArgType().Kind() == reflect.Ptr { + argv = reflect.New(method.ArgType().Elem()) + } else { + argv = reflect.New(method.ArgType()) + argIsValue = true + } + argvTmp := argv.Interface() + argvTmp = req.Body.(map[string]interface{})["args"] // type is []interface + if argIsValue { + argv = argv.Elem() + } + + // prepare replyv + replyv := reflect.New(method.ReplyType().Elem()) + var returnValues []reflect.Value + if method.CtxType == nil { + returnValues = method.Method().Func.Call([]reflect.Value{svc.Rcvr(), reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) + } else { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + returnValues = method.Method().Func.Call([]reflect.Value{svc.Rcvr(), contextv, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) + } else { + returnValues = method.Method().Func.Call([]reflect.Value{svc.Rcvr(), reflect.Zero(method.CtxType()), reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) + } + } + + // The return value for the method is an error. + if retErr := returnValues[0].Interface(); retErr != nil { + req.Header.ResponseStatus = hessian.Response_SERVER_ERROR + req.Body = retErr.(error) + } else { + req.Body = replyv.Interface() + } +} + +func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackgeType) { + resp := &DubboPackage{ + Header: hessian.DubboHeader{ + SerialID: req.Header.SerialID, + Type: tp, + ID: req.Header.ID, + ResponseStatus: req.Header.ResponseStatus, + }, + } + + if req.Header.Type&hessian.Request != 0x00 { + resp.Body = req.Body + } else { + resp.Body = nil + } + + if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { + log.Error("WritePkg error: %#v, %#v", jerrors.Trace(err), req.Header) + } +} diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go new file mode 100644 index 0000000000000000000000000000000000000000..2cc9cc7ac99142b2ad32beea6cc39722d0dd9a39 --- /dev/null +++ b/protocol/dubbo/pool.go @@ -0,0 +1,346 @@ +package dubbo + +import ( + "fmt" + "math/rand" + "net" + "strings" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/getty" + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +type gettyRPCClient struct { + once sync.Once + protocol string + addr string + created int64 // 涓�0锛屽垯璇存槑娌℃湁琚垱寤烘垨鑰呰閿€姣佷簡 + + pool *gettyRPCClientPool + + lock sync.RWMutex + gettyClient getty.Client + sessions []*rpcSession +} + +var ( + errClientPoolClosed = jerrors.New("client pool closed") +) + +func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) { + c := &gettyRPCClient{ + protocol: protocol, + addr: addr, + pool: pool, + gettyClient: getty.NewTCPClient( + getty.WithServerAddress(addr), + getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)), + ), + } + c.gettyClient.RunEventLoop(c.newSession) + idx := 1 + for { + idx++ + if c.isAvailable() { + break + } + + if idx > 5000 { + return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr)) + } + time.Sleep(1e6) + } + log.Info("client init ok") + c.created = time.Now().Unix() + + return c, nil +} + +func (c *gettyRPCClient) newSession(session getty.Session) error { + var ( + ok bool + tcpConn *net.TCPConn + conf ClientConfig + ) + + conf = c.pool.rpcClient.conf + if conf.GettySessionParam.CompressEncoding { + session.SetCompressType(getty.CompressZip) + } + + if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { + panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) + } + + tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) + tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) + if conf.GettySessionParam.TcpKeepAlive { + tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) + } + tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) + tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) + + session.SetName(conf.GettySessionParam.SessionName) + session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) + session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient)) + session.SetEventListener(NewRpcClientHandler(c)) + session.SetRQLen(conf.GettySessionParam.PkgRQSize) + session.SetWQLen(conf.GettySessionParam.PkgWQSize) + session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) + session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) + session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6)) + session.SetWaitTime(conf.GettySessionParam.waitTimeout) + log.Debug("client new session:%s\n", session.Stat()) + + return nil +} + +func (c *gettyRPCClient) selectSession() getty.Session { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.sessions == nil { + return nil + } + + count := len(c.sessions) + if count == 0 { + return nil + } + return c.sessions[rand.Int31n(int32(count))].session +} + +func (c *gettyRPCClient) addSession(session getty.Session) { + log.Debug("add session{%s}", session.Stat()) + if session == nil { + return + } + + c.lock.Lock() + c.sessions = append(c.sessions, &rpcSession{session: session}) + c.lock.Unlock() +} + +func (c *gettyRPCClient) removeSession(session getty.Session) { + if session == nil { + return + } + + c.lock.Lock() + defer c.lock.Unlock() + if c.sessions == nil { + return + } + + for i, s := range c.sessions { + if s.session == session { + c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) + log.Debug("delete session{%s}, its index{%d}", session.Stat(), i) + break + } + } + log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) + if len(c.sessions) == 0 { + c.close() // -> pool.remove(c) + } +} + +func (c *gettyRPCClient) updateSession(session getty.Session) { + if session == nil { + return + } + c.lock.Lock() + defer c.lock.Unlock() + if c.sessions == nil { + return + } + + for i, s := range c.sessions { + if s.session == session { + c.sessions[i].reqNum++ + break + } + } +} + +func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) { + var ( + err error + rpcSession rpcSession + ) + c.lock.Lock() + defer c.lock.Unlock() + if c.sessions == nil { + return rpcSession, errClientClosed + } + + err = errSessionNotExist + for _, s := range c.sessions { + if s.session == session { + rpcSession = *s + err = nil + break + } + } + + return rpcSession, jerrors.Trace(err) +} + +func (c *gettyRPCClient) isAvailable() bool { + if c.selectSession() == nil { + return false + } + + return true +} + +func (c *gettyRPCClient) close() error { + err := jerrors.Errorf("close gettyRPCClient{%#v} again", c) + c.once.Do(func() { + // delete @c from client pool + c.pool.remove(c) + for _, s := range c.sessions { + log.Info("close client session{%s, last active:%s, request number:%d}", + s.session.Stat(), s.session.GetActive().String(), s.reqNum) + s.session.Close() + } + c.gettyClient.Close() + c.gettyClient = nil + c.sessions = c.sessions[:0] + + c.created = 0 + err = nil + }) + return err +} + +type gettyRPCClientPool struct { + rpcClient *Client + size int // []*gettyRPCClient鏁扮粍鐨剆ize + ttl int64 // 姣忎釜gettyRPCClient鐨勬湁鏁堟湡鏃堕棿. pool瀵硅薄浼氬湪getConn鏃舵墽琛宼tl妫€鏌� + + sync.Mutex + connMap map[string][]*gettyRPCClient // 浠嶽]*gettyRPCClient 鍙key鏄繛鎺ュ湴鍧€锛岃€寁alue鏄搴旇繖涓湴鍧€鐨勮繛鎺ユ暟缁� +} + +func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool { + return &gettyRPCClientPool{ + rpcClient: rpcClient, + size: size, + ttl: int64(ttl.Seconds()), + connMap: make(map[string][]*gettyRPCClient), + } +} + +func (p *gettyRPCClientPool) close() { + p.Lock() + connMap := p.connMap + p.connMap = nil + p.Unlock() + for _, connArray := range connMap { + for _, conn := range connArray { + conn.close() + } + } +} + +func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { + var builder strings.Builder + + builder.WriteString(addr) + builder.WriteString("@") + builder.WriteString(protocol) + + key := builder.String() + + p.Lock() + defer p.Unlock() + if p.connMap == nil { + return nil, errClientPoolClosed + } + + connArray := p.connMap[key] + now := time.Now().Unix() + + for len(connArray) > 0 { + conn := connArray[len(connArray)-1] + connArray = connArray[:len(connArray)-1] + p.connMap[key] = connArray + + if d := now - conn.created; d > p.ttl { + conn.close() // -> pool.remove(c) + continue + } + + return conn, nil + } + + // create new conn + return newGettyRPCClientConn(p, protocol, addr) +} + +func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { + if conn == nil || conn.created == 0 { + return + } + if err != nil { + conn.close() + return + } + + var builder strings.Builder + + builder.WriteString(conn.addr) + builder.WriteString("@") + builder.WriteString(conn.protocol) + + key := builder.String() + + p.Lock() + defer p.Unlock() + if p.connMap == nil { + return + } + + connArray := p.connMap[key] + if len(connArray) >= p.size { + conn.close() + return + } + p.connMap[key] = append(connArray, conn) +} + +func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { + if conn == nil || conn.created == 0 { + return + } + + var builder strings.Builder + + builder.WriteString(conn.addr) + builder.WriteString("@") + builder.WriteString(conn.protocol) + + key := builder.String() + + p.Lock() + defer p.Unlock() + if p.connMap == nil { + return + } + + connArray := p.connMap[key] + if len(connArray) > 0 { + for idx, c := range connArray { + if conn == c { + p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...) + break + } + } + } +} diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go new file mode 100644 index 0000000000000000000000000000000000000000..be720e0090266674093bc127d4a59ea09cbd177e --- /dev/null +++ b/protocol/dubbo/readwriter.go @@ -0,0 +1,132 @@ +package dubbo + +import ( + "bytes" + "github.com/dubbo/dubbo-go/config" + "reflect" +) + +import ( + "github.com/AlexStocks/getty" + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +//////////////////////////////////////////// +// RpcClientPackageHandler +//////////////////////////////////////////// + +type RpcClientPackageHandler struct { + client *Client +} + +func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { + return &RpcClientPackageHandler{client: client} +} + +func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { + pkg := &DubboPackage{ + Body: p.client.pendingResponses[SequenceType(int64(p.client.sequence.Load()))].reply, + } + + buf := bytes.NewBuffer(data) + err := pkg.Unmarshal(buf) + if err != nil { + pkg.Err = jerrors.Trace(err) // client will get this err + return pkg, len(data), nil + } + + return pkg, len(data), nil +} + +func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { + req, ok := pkg.(*DubboPackage) + if !ok { + log.Error("illegal pkg:%+v\n", pkg) + return jerrors.New("invalid rpc request") + } + + buf, err := req.Marshal() + if err != nil { + log.Warn("binary.Write(req{%#v}) = err{%#v}", req, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + + return jerrors.Trace(ss.WriteBytes(buf.Bytes())) +} + +//////////////////////////////////////////// +// RpcServerPackageHandler +//////////////////////////////////////////// + +type RpcServerPackageHandler struct { +} + +func NewRpcServerPackageHandler() *RpcServerPackageHandler { + return &RpcServerPackageHandler{} +} + +func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { + pkg := &DubboPackage{ + Body: make([]interface{}, 7), + } + + buf := bytes.NewBuffer(data) + err := pkg.Unmarshal(buf) + if err != nil { + return nil, 0, jerrors.Trace(err) + } + // convert params of request + req := pkg.Body.([]interface{}) // length of body should be 7 + if len(req) > 0 { + var dubboVersion, argsTypes string + var args []interface{} + var attachments map[interface{}]interface{} + if req[0] != nil { + dubboVersion = req[0].(string) + } + if req[1] != nil { + pkg.Service.Target = req[1].(string) + } + if req[2] != nil { + pkg.Service.Version = req[2].(string) + } + if req[3] != nil { + pkg.Service.Method = req[3].(string) + } + if req[4] != nil { + argsTypes = req[4].(string) + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[interface{}]interface{}) + } + pkg.Body = map[string]interface{}{ + "dubboVersion": dubboVersion, + "argsTypes": argsTypes, + "args": args, + "service": config.ServiceMap.GetService(pkg.Service.Target), + "attachments": attachments, + } + } + + return pkg, len(data), nil +} + +func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { + res, ok := pkg.(*DubboPackage) + if !ok { + log.Error("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) + return jerrors.New("invalid rpc response") + } + + buf, err := res.Marshal() + if err != nil { + log.Warn("binary.Write(res{%#v}) = err{%#v}", res, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + + return jerrors.Trace(ss.WriteBytes(buf.Bytes())) +} diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go new file mode 100644 index 0000000000000000000000000000000000000000..714707aa5ac2af357344750139bf1d3ff1dec682 --- /dev/null +++ b/protocol/dubbo/server.go @@ -0,0 +1,126 @@ +package dubbo + +import ( + "fmt" + "gopkg.in/yaml.v2" + "io/ioutil" + "net" + "os" +) + +import ( + "github.com/AlexStocks/getty" + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/config" +) + +var srv = NewServer() + +const CONF_SERVER_FILE_PATH = "CONF_SERVER_FILE_PATH" + +type Server struct { + conf ServerConfig + tcpServerList []getty.Server +} + +func NewServer() *Server { + + s := &Server{} + + // load serverconfig from *.yml + path := os.Getenv(CONF_SERVER_FILE_PATH) + if path == "" { + log.Info("CONF_SERVER_FILE_PATH is null") + return s + } + + file, err := ioutil.ReadFile(path) + if err != nil { + log.Error(jerrors.Trace(err)) + return s + } + + conf := &ServerConfig{} + err = yaml.Unmarshal(file, conf) + if err != nil { + log.Error(jerrors.Trace(err)) + return s + } + + if err := conf.CheckValidity(); err != nil { + log.Error("ServerConfig check failed: ", err) + return s + } + + s.conf = *conf + + return s +} + +func (s *Server) newSession(session getty.Session) error { + var ( + ok bool + tcpConn *net.TCPConn + ) + conf := s.conf + + if conf.GettySessionParam.CompressEncoding { + session.SetCompressType(getty.CompressZip) + } + + if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { + panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) + } + + tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) + tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) + if conf.GettySessionParam.TcpKeepAlive { + tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) + } + tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) + tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) + + session.SetName(conf.GettySessionParam.SessionName) + session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) + session.SetPkgHandler(NewRpcServerPackageHandler()) // TODO: now, a server will bind all service + session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout)) + session.SetRQLen(conf.GettySessionParam.PkgRQSize) + session.SetWQLen(conf.GettySessionParam.PkgWQSize) + session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) + session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) + session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) + session.SetWaitTime(conf.GettySessionParam.waitTimeout) + log.Debug("app accepts new session:%s\n", session.Stat()) + + return nil +} + +func (s *Server) Start(url config.ConfigURL) { + var ( + addr string + tcpServer getty.Server + ) + + addr = url.Location() + tcpServer = getty.NewTCPServer( + getty.WithLocalAddress(addr), + ) + tcpServer.RunEventLoop(s.newSession) + log.Debug("s bind addr{%s} ok!", addr) + s.tcpServerList = append(s.tcpServerList, tcpServer) + +} + +func (s *Server) Stop() { + list := s.tcpServerList + s.tcpServerList = nil + if list != nil { + for _, tcpServer := range list { + tcpServer.Close() + } + } +} diff --git a/protocol/invoker.go b/protocol/invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..6fc09b9091a3cdfcef515e012b5622d3c1fd624d --- /dev/null +++ b/protocol/invoker.go @@ -0,0 +1,10 @@ +package protocol + +import "github.com/dubbo/dubbo-go/config" + +// Extension - Invoker +type Invoker interface { + Invoke() + GetURL() config.ConfigURL + Destroy() +} diff --git a/protocol/protocol.go b/protocol/protocol.go index ce6ef68bb3ae44617fda13bf2e5ab9095b6e6b99..b02ca71c7eb87ab601913bf141ea117d9fa0bfa0 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -4,7 +4,12 @@ import "github.com/dubbo/dubbo-go/config" // Extension - Protocol type Protocol interface { - Export() - Refer(url config.ConfigURL) + Export(invoker Invoker) Exporter + Refer(url config.ConfigURL) Invoker Destroy() } + +type Exporter interface { + GetInvoker() Invoker + Unexport() +} diff --git a/registry/protocol.go b/registry/protocol.go index 89342d70ea3a7a7899d5541671b39e2db617096c..52f83fb1238be74e56ec6314c96aa7048700dc10 100644 --- a/registry/protocol.go +++ b/registry/protocol.go @@ -37,5 +37,4 @@ func (*RegistryProtocol) Export() { } func (*RegistryProtocol) Destroy() { - } diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index fd59e30bceca1cd43da2bcb9fb4ddd4d66456380..087454fb24c475624b292f60096394b1c009f802 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -114,7 +114,7 @@ func (r *ZkRegistry) getListener(conf config.ConfigURL) (*zkEventListener, error // listen r.cltLock.Lock() for _, svs := range r.services { - if svs.ConfigURLEqual(conf){ + if svs.ConfigURLEqual(conf) { go zkListener.listenServiceEvent(svs) } }