diff --git a/README.md b/README.md index dcca56ce4b7c47b3f08caff241b0da1be4709b57..617eb03bdc03827fe882f47c46ae62bf3fbb026e 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Finished List: Working List: - Cluster Strategy: Failfast/Failsafe/Failback/Forking -- Load Balance: RoundRobin/LeastActive/ConsistentHash +- Load Balance: ConsistentHash - Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter - Registry: etcd/k8s/consul diff --git a/README_CN.md b/README_CN.md index 20a9b41e209b487abb9103385e149f2417d79fc1..14ba3dd8fed983699e06fd113d6d4f6f5e90aee6 100644 --- a/README_CN.md +++ b/README_CN.md @@ -37,7 +37,7 @@ Apache License, Version 2.0 开发中列表: - Cluster Strategy: Failfast/Failsafe/Failback/Forking -- Load Balance: RoundRobin/LeastActive/ConsistentHash +- Load Balance: ConsistentHash - Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter - Registry: etcd/k8s/consul diff --git a/config/service_config.go b/config/service_config.go index 9db0f99facbc69671153a713b5ff4a347e32d749..958737ca2d77045f23e33880d2c5767f8c6f4f2c 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -19,6 +19,7 @@ package config import ( "context" + "fmt" "net/url" "strconv" "strings" @@ -37,6 +38,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/protocolwrapper" ) type ServiceConfig struct { @@ -109,23 +111,33 @@ func (srvconfig *ServiceConfig) Export() error { common.WithParams(urlMap), common.WithMethods(strings.Split(methods, ","))) - for _, regUrl := range regUrls { - regUrl.SubURL = url - - srvconfig.cacheMutex.Lock() - if srvconfig.cacheProtocol == nil { - logger.Infof("First load the registry protocol!") - srvconfig.cacheProtocol = extension.GetProtocol("registry") + if len(regUrls) > 0 { + for _, regUrl := range regUrls { + regUrl.SubURL = url + + srvconfig.cacheMutex.Lock() + if srvconfig.cacheProtocol == nil { + logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", url)) + srvconfig.cacheProtocol = extension.GetProtocol("registry") + } + srvconfig.cacheMutex.Unlock() + + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) + exporter := srvconfig.cacheProtocol.Export(invoker) + if exporter == nil { + panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, url))) + } + srvconfig.exporters = append(srvconfig.exporters, exporter) } - srvconfig.cacheMutex.Unlock() - - invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) - exporter := srvconfig.cacheProtocol.Export(invoker) + } else { + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*url) + exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) if exporter == nil { - panic(perrors.New("New exporter error")) + panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", url))) } srvconfig.exporters = append(srvconfig.exporters, exporter) } + } return nil diff --git a/go.mod b/go.mod index 4c2276429ae315a44c282a8af9fb7e6d7ee1be8e..213d7ee8e8e530c26e1f80fe9deecf5fa0a6b714 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/apache/dubbo-go require ( github.com/dubbogo/getty v1.0.7 - github.com/dubbogo/hessian2 v1.0.1 + github.com/dubbogo/hessian2 v1.0.2 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index a4fd7f5099b5521b03e03cf5958c641991433d39..f39e814c86a6d7b32e699ac36198f4965937f44c 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/hessian2 v1.0.1 h1:ztI7gJxR3Isxrrl2jE1IZKX61eNR93eRKGhn49vPEX8= -github.com/dubbogo/hessian2 v1.0.1/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/hessian2 v1.0.2 h1:Ka9Z32ZszGAdCpgrGuZQmwkT0qe1pd3o9r7ERCDnSlQ= +github.com/dubbogo/hessian2 v1.0.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 6d789ae13e458249747f660c406ab2fe4c6463f0..529aa759a5c39a467a1f72560d56c1b48738a9a6 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -24,7 +24,7 @@ import ( import ( "github.com/dubbogo/getty" - "github.com/dubbogo/hessian2" + hessian "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" ) import ( @@ -89,12 +89,11 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error // RpcServerPackageHandler //////////////////////////////////////////// -type RpcServerPackageHandler struct { -} +var ( + rpcServerPkgHandler = &RpcServerPackageHandler{} +) -func NewRpcServerPackageHandler() *RpcServerPackageHandler { - return &RpcServerPackageHandler{} -} +type RpcServerPackageHandler struct{} func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { pkg := &DubboPackage{ diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 80568c61bb44b540df0d0689273f22d206411ca4..8bed30f7465bbe22e8c61b5ca887da1c668a3ba9 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -79,6 +79,8 @@ type Server struct { conf ServerConfig tcpServer getty.Server exporter protocol.Exporter + + rpcHandler *RpcServerHandler } func NewServer(exporter protocol.Exporter) *Server { @@ -88,6 +90,8 @@ func NewServer(exporter protocol.Exporter) *Server { conf: *srvConf, } + s.rpcHandler = NewRpcServerHandler(s.exporter, s.conf.SessionNumber, s.conf.sessionTimeout) + return s } @@ -116,8 +120,8 @@ func (s *Server) newSession(session getty.Session) error { session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(NewRpcServerPackageHandler()) - session.SetEventListener(NewRpcServerHandler(s.exporter, conf.SessionNumber, conf.sessionTimeout)) + session.SetPkgHandler(rpcServerPkgHandler) + session.SetEventListener(s.rpcHandler) session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)