diff --git a/cluster/support/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go similarity index 96% rename from cluster/support/base_cluster_invoker.go rename to cluster/cluster_impl/base_cluster_invoker.go index 1ae288697dc2b4139494d2ae8460eade1b8789d2..913ffe313c8dee18ef1558210104f6cbe6557cf3 100644 --- a/cluster/support/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -2,13 +2,13 @@ package cluster import ( gxnet "github.com/AlexStocks/goext/net" + "github.com/dubbo/go-for-apache-dubbo/common" jerrors "github.com/juju/errors" "github.com/tevino/abool" ) import ( "github.com/dubbo/go-for-apache-dubbo/cluster" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/version" ) @@ -26,7 +26,7 @@ func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { destroyed: abool.NewBool(false), } } -func (invoker *baseClusterInvoker) GetUrl() config.URL { +func (invoker *baseClusterInvoker) GetUrl() common.URL { return invoker.directory.GetUrl() } diff --git a/cluster/support/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go similarity index 100% rename from cluster/support/failover_cluster.go rename to cluster/cluster_impl/failover_cluster.go diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go similarity index 100% rename from cluster/support/failover_cluster_invoker.go rename to cluster/cluster_impl/failover_cluster_invoker.go diff --git a/cluster/support/mock_cluster.go b/cluster/cluster_impl/mock_cluster.go similarity index 100% rename from cluster/support/mock_cluster.go rename to cluster/cluster_impl/mock_cluster.go diff --git a/cluster/support/registry_aware_cluster.go b/cluster/cluster_impl/registry_aware_cluster.go similarity index 100% rename from cluster/support/registry_aware_cluster.go rename to cluster/cluster_impl/registry_aware_cluster.go diff --git a/cluster/support/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go similarity index 100% rename from cluster/support/registry_aware_cluster_invoker.go rename to cluster/cluster_impl/registry_aware_cluster_invoker.go diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index a0a69304096a02314e91d6bc17bc0448af2a9996..5794c6852c6808ed6a5168da1c9d0ae59a4b15ec 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -1,26 +1,24 @@ package directory import ( + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/tevino/abool" "sync" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config" -) type BaseDirectory struct { - url *config.URL + url *common.URL destroyed *abool.AtomicBool mutex sync.Mutex } -func NewBaseDirectory(url *config.URL) BaseDirectory { +func NewBaseDirectory(url *common.URL) BaseDirectory { return BaseDirectory{ url: url, destroyed: abool.NewBool(false), } } -func (dir *BaseDirectory) GetUrl() config.URL { +func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } diff --git a/cluster/directory/service_array.go b/cluster/directory/service_array.go index d7d8821e4e1ba492c5f090e1b2bcae9a42e7f3b5..f510edc0abda334c26f6812115d0e64657d0de2e 100644 --- a/cluster/directory/service_array.go +++ b/cluster/directory/service_array.go @@ -3,6 +3,7 @@ package directory import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" "strings" "time" ) @@ -11,8 +12,6 @@ import ( jerrors "github.com/juju/errors" ) -import "github.com/dubbo/go-for-apache-dubbo/config" - ////////////////////////////////////////// // registry array // should be returned by registry ,will be used by client & waiting to selector @@ -25,12 +24,12 @@ var ( type ServiceArray struct { context context.Context - arr []config.URL + arr []common.URL birth time.Time idx int64 } -func NewServiceArray(ctx context.Context, arr []config.URL) *ServiceArray { +func NewServiceArray(ctx context.Context, arr []common.URL) *ServiceArray { return &ServiceArray{ context: ctx, arr: arr, @@ -46,7 +45,7 @@ func (s *ServiceArray) GetSize() int64 { return int64(len(s.arr)) } -func (s *ServiceArray) GetService(i int64) config.URL { +func (s *ServiceArray) GetService(i int64) common.URL { return s.arr[i] } @@ -61,12 +60,12 @@ func (s *ServiceArray) String() string { return builder.String() } -func (s *ServiceArray) Add(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Add(url common.URL, ttl time.Duration) { s.arr = append(s.arr, url) s.birth = time.Now().Add(ttl) } -func (s *ServiceArray) Del(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Del(url common.URL, ttl time.Duration) { for i, svc := range s.arr { if svc.PrimitiveURL == url.PrimitiveURL { s.arr = append(s.arr[:i], s.arr[i+1:]...) diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index d3117c97d4208410bf0a7c3e2e4fc828bbe903b3..bab58e3c94192947a68e848b70bc20db52495865 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -1,7 +1,7 @@ package directory import ( - "github.com/dubbo/go-for-apache-dubbo/config" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -12,7 +12,7 @@ type StaticDirectory struct { func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory { return &StaticDirectory{ - BaseDirectory: NewBaseDirectory(&config.URL{}), + BaseDirectory: NewBaseDirectory(&common.URL{}), invokers: invokers, } } diff --git a/cluster/loadbalance.go b/cluster/loadbalance.go index 18f183e937ebd1caeb22b1814ea5190296011e0b..68a09f43b2278c5d77030ff297a74741b2ee6219 100644 --- a/cluster/loadbalance.go +++ b/cluster/loadbalance.go @@ -1,18 +1,18 @@ package cluster import ( + "github.com/dubbo/go-for-apache-dubbo/common" "time" ) import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) // Extension - LoadBalance type LoadBalance interface { - Select([]protocol.Invoker, config.URL, protocol.Invocation) protocol.Invoker + Select([]protocol.Invoker, common.URL, protocol.Invocation) protocol.Invoker } func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { diff --git a/cluster/loadbalance/random.go b/cluster/loadbalance/random.go index 6251438f9666d7285eb6e6f25a0741390df3c70f..09c9ff337e10d18cc4d377c770075fe140f829fb 100644 --- a/cluster/loadbalance/random.go +++ b/cluster/loadbalance/random.go @@ -1,13 +1,13 @@ package loadbalance import ( + "github.com/dubbo/go-for-apache-dubbo/common" "math/rand" ) import ( "github.com/dubbo/go-for-apache-dubbo/cluster" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -24,7 +24,7 @@ func NewRandomLoadBalance() cluster.LoadBalance { return &RandomLoadBalance{} } -func (lb *RandomLoadBalance) Select(invokers []protocol.Invoker, url config.URL, invocation protocol.Invocation) protocol.Invoker { +func (lb *RandomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker { var length int if length = len(invokers); length == 1 { return invokers[0] diff --git a/cluster/router.go b/cluster/router.go index 828af3a180896d19d98e3af6b1ff70997230789e..0d4b98d79627e1ab0a374cc7a95fdd7028155d55 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -1,24 +1,24 @@ package cluster import ( - "github.com/dubbo/go-for-apache-dubbo/config" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/protocol" ) // Extension - Router type RouterFactory interface { - Router(config.URL) Router + Router(common.URL) Router } type Router interface { - Route([]protocol.Invoker, config.URL, protocol.Invocation) []protocol.Invoker + Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker } type RouterChain struct { routers []Router } -func NewRouterChain(url config.URL) { +func NewRouterChain(url common.URL) { } diff --git a/common/extension/registry.go b/common/extension/registry.go index 82291c84b37dea6a62f913bbdd37d3ed43ec17a4..a3238d65816c0f7242918db673b1c4696967375c 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -1,12 +1,12 @@ package extension import ( - "github.com/dubbo/go-for-apache-dubbo/config" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/registry" ) var ( - registrys map[string]func(config *config.URL) (registry.Registry, error) + registrys map[string]func(config *common.URL) (registry.Registry, error) ) /* @@ -14,14 +14,14 @@ it must excute first */ func init() { // init map - registrys = make(map[string]func(config *config.URL) (registry.Registry, error)) + registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) } -func SetRegistry(name string, v func(config *config.URL) (registry.Registry, error)) { +func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } -func GetRegistryExtension(name string, config *config.URL) (registry.Registry, error) { +func GetRegistryExtension(name string, config *common.URL) (registry.Registry, error) { return registrys[name](config) } diff --git a/common/node.go b/common/node.go index 4090f8cdacbe84c5fcd89611a1d987a4482171d1..c788a73665e223fc1f021c6589aa841c1f1edfe7 100644 --- a/common/node.go +++ b/common/node.go @@ -1,9 +1,7 @@ package common -import "github.com/dubbo/go-for-apache-dubbo/config" - type Node interface { - GetUrl() config.URL + GetUrl() URL IsAvailable() bool Destroy() } diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 19a27eb4a3f916dc9fdc1d5c9369cbbd88dafd32..fd876da6bebb2ae2501d1eea43e3e3e82dde40bb 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -1,6 +1,7 @@ package proxy import ( + "github.com/dubbo/go-for-apache-dubbo/common" "reflect" ) import ( @@ -8,14 +9,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) // Proxy struct type Proxy struct { - rpc config.RPCService + rpc common.RPCService invoke protocol.Invoker callBack interface{} attachments map[string]string @@ -36,7 +36,7 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str // type XxxProvider struct { // Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error // } -func (p *Proxy) Implement(v config.RPCService) { +func (p *Proxy) Implement(v common.RPCService) { // check parameters, incoming interface must be a elem's pointer. valueOf := reflect.ValueOf(v) @@ -54,7 +54,7 @@ func (p *Proxy) Implement(v config.RPCService) { makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { log.Info("call method!") - inv := support.NewRPCInvocationForConsumer(methodName, nil, in[1].Interface().([]interface{}), in[2].Interface(), p.callBack, config.URL{}, nil) + inv := invocation.NewRPCInvocationForConsumer(methodName, nil, in[1].Interface().([]interface{}), in[2].Interface(), p.callBack, common.URL{}, nil) for k, v := range p.attachments { inv.SetAttachments(k, v) } @@ -113,6 +113,6 @@ func (p *Proxy) Implement(v config.RPCService) { } -func (p *Proxy) Get() config.RPCService { +func (p *Proxy) Get() common.RPCService { return p.rpc } diff --git a/common/proxy/proxy_test.go b/common/proxy/proxy_test.go index 10a051652b5e749ccbfe3f4f70ee63d142230ccc..6d5422ffd4fe340765fbb09157afc791852d3f19 100644 --- a/common/proxy/proxy_test.go +++ b/common/proxy/proxy_test.go @@ -3,6 +3,7 @@ package proxy import ( "context" "errors" + "github.com/dubbo/go-for-apache-dubbo/common" "testing" ) @@ -11,7 +12,6 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -28,7 +28,7 @@ func (s *TestService) Version() string { func TestProxy_Implement(t *testing.T) { - invoker := protocol.NewBaseInvoker(config.URL{}) + invoker := protocol.NewBaseInvoker(common.URL{}) p := NewProxy(invoker, nil, nil) s := &TestService{MethodOne: func(i context.Context, i2 []interface{}, i3 *struct{}) error { return errors.New("errors") diff --git a/config/rpc_service.go b/common/rpc_service.go similarity index 99% rename from config/rpc_service.go rename to common/rpc_service.go index d3607775305e23a9234b882898bc576452114fcc..8b1243794e480d2ff9323feb54d715f95925a1ef 100644 --- a/config/rpc_service.go +++ b/common/rpc_service.go @@ -1,4 +1,4 @@ -package config +package common import ( "context" diff --git a/config/rpc_service_test.go b/common/rpc_service_test.go similarity index 97% rename from config/rpc_service_test.go rename to common/rpc_service_test.go index ce4edf48f39662e2d1f90ab4c8c9abf1a7f4f6b6..768c83ce47f2fa4915a47a216ed2b79bf3e813bd 100644 --- a/config/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -1,4 +1,4 @@ -package config +package common import ( "context" diff --git a/config/url.go b/common/url.go similarity index 99% rename from config/url.go rename to common/url.go index 54cc24056b5dbf6f20b7442ad5d6ad67545c8e1f..a24a03c1f10e447d66a6a5f0bdef5e878d58807d 100644 --- a/config/url.go +++ b/common/url.go @@ -1,4 +1,4 @@ -package config +package common import ( "context" diff --git a/config/support/application_config.go b/config/application_config.go similarity index 96% rename from config/support/application_config.go rename to config/application_config.go index 367bc20468c19f4d3e70eb83aa06ab60d7b6688e..81d255a345541e5cb414f23c41e93340e0ae010a 100644 --- a/config/support/application_config.go +++ b/config/application_config.go @@ -1,4 +1,4 @@ -package support +package config type ApplicationConfig struct { Organization string `yaml:"organization" json:"organization,omitempty"` diff --git a/config/support/config_loader.go b/config/config_loader.go similarity index 99% rename from config/support/config_loader.go rename to config/config_loader.go index b223615f89aa3912ed63ae8876921e48d9e1b5c2..22c940870e89d99ab533f7a1acc8c25c7389a32f 100644 --- a/config/support/config_loader.go +++ b/config/config_loader.go @@ -1,4 +1,4 @@ -package support +package config import ( "fmt" diff --git a/config/support/config_loader_test.go b/config/config_loader_test.go similarity index 97% rename from config/support/config_loader_test.go rename to config/config_loader_test.go index c96c782207934370ec3be60dc23d638bb932028f..1b3e9a36cb28e7e5e65923bbb44e894f7ef07705 100644 --- a/config/support/config_loader_test.go +++ b/config/config_loader_test.go @@ -1,4 +1,4 @@ -package support +package config import ( "path/filepath" diff --git a/config/support/consumer_config.yml b/config/consumer_config.yml similarity index 100% rename from config/support/consumer_config.yml rename to config/consumer_config.yml diff --git a/config/support/provider_config.yml b/config/provider_config.yml similarity index 100% rename from config/support/provider_config.yml rename to config/provider_config.yml diff --git a/config/support/reference_config.go b/config/reference_config.go similarity index 92% rename from config/support/reference_config.go rename to config/reference_config.go index 7c781223f46ef86f54e4d5521650d733d97b7d23..f61b70d4aa270fffb82bbe84d4784eb8616e60a4 100644 --- a/config/support/reference_config.go +++ b/config/reference_config.go @@ -1,7 +1,8 @@ -package support +package config import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "net/url" "strconv" "time" @@ -12,7 +13,6 @@ import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" "github.com/dubbo/go-for-apache-dubbo/common/proxy" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -46,8 +46,8 @@ func (refconfig *ReferenceConfig) Refer() { //首先是user specified SubURL, could be peer-to-peer address, or register center's address. //其次是assemble SubURL from register center's configuration模式 - regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, config.CONSUMER) - url := config.NewURLWithOptions(refconfig.InterfaceName, config.WithProtocol(refconfig.Protocol), config.WithParams(refconfig.getUrlMap())) + regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER) + url := common.NewURLWithOptions(refconfig.InterfaceName, common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) //set url to regUrls for _, regUrl := range regUrls { @@ -72,11 +72,11 @@ func (refconfig *ReferenceConfig) Refer() { } // @v is service provider implemented RPCService -func (refconfig *ReferenceConfig) Implement(v config.RPCService) { +func (refconfig *ReferenceConfig) Implement(v common.RPCService) { refconfig.pxy.Implement(v) } -func (refconfig *ReferenceConfig) GetRPCService() config.RPCService { +func (refconfig *ReferenceConfig) GetRPCService() common.RPCService { return refconfig.pxy.Get() } diff --git a/config/support/registry_config.go b/config/registry_config.go similarity index 77% rename from config/support/registry_config.go rename to config/registry_config.go index d77e1a8c39499d59f2b181f6b837ede132ab2e18..fb40a9a108662678cba14bc2b17df057a72e0116 100644 --- a/config/support/registry_config.go +++ b/config/registry_config.go @@ -1,7 +1,8 @@ -package support +package config import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/common/constant" "net/url" "strconv" @@ -9,7 +10,6 @@ import ( import ( log "github.com/AlexStocks/log4go" ) -import "github.com/dubbo/go-for-apache-dubbo/config" type RegistryConfig struct { Id string `required:"true" yaml:"id" json:"id,omitempty"` @@ -22,14 +22,14 @@ type RegistryConfig struct { Password string `yaml:"password" json:"address,omitempty"` } -func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig, roleType config.RoleType) []*config.URL { - var urls []*config.URL +func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig, roleType common.RoleType) []*common.URL { + var urls []*common.URL for _, registry := range registriesIds { for _, registryConf := range registries { if string(registry) == registryConf.Id { - url, err := config.NewURL(context.TODO(), constant.REGISTRY_PROTOCOL+"://"+registryConf.Address, config.WithParams(registryConf.getUrlMap(roleType)), - config.WithUsername(registryConf.Username), config.WithPassword(registryConf.Password), + url, err := common.NewURL(context.TODO(), constant.REGISTRY_PROTOCOL+"://"+registryConf.Address, common.WithParams(registryConf.getUrlMap(roleType)), + common.WithUsername(registryConf.Username), common.WithPassword(registryConf.Password), ) if err != nil { @@ -45,7 +45,7 @@ func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig, return urls } -func (regconfig *RegistryConfig) getUrlMap(roleType config.RoleType) url.Values { +func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap := url.Values{} urlMap.Set(constant.GROUP_KEY, regconfig.Group) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) diff --git a/config/service.go b/config/service.go new file mode 100644 index 0000000000000000000000000000000000000000..e7b2070a2560ab7702fef82b35223894f0943325 --- /dev/null +++ b/config/service.go @@ -0,0 +1,28 @@ +package config + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" +) + +var ( + conServices = map[string]common.RPCService{} // service name -> service + proServices = map[string]common.RPCService{} // service name -> service +) + +// SetConService is called by init() of implement of RPCService +func SetConService(service common.RPCService) { + conServices[service.Service()] = service +} + +// SetProService is called by init() of implement of RPCService +func SetProService(service common.RPCService) { + proServices[service.Service()] = service +} + +func GetConService(name string) common.RPCService { + return conServices[name] +} + +func GetProService(name string) common.RPCService { + return proServices[name] +} diff --git a/config/support/service_config.go b/config/service_config.go similarity index 90% rename from config/support/service_config.go rename to config/service_config.go index b5e9f16c48603cc896288d4ea334ca0c44fd7eae..2700c3babc5ffe28a3b5018dab878c9197d21a41 100644 --- a/config/support/service_config.go +++ b/config/service_config.go @@ -1,7 +1,8 @@ -package support +package config import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "net/url" "strconv" "strings" @@ -16,7 +17,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -39,7 +39,7 @@ type ServiceConfig struct { Retries int64 `yaml:"retries" json:"retries,omitempty"` unexported *atomic.Bool exported *atomic.Bool - rpcService config.RPCService + rpcService common.RPCService exporters []protocol.Exporter cacheProtocol protocol.Protocol cacheMutex sync.Mutex @@ -67,12 +67,12 @@ func (srvconfig *ServiceConfig) Export() error { return nil } - regUrls := loadRegistries(srvconfig.Registries, providerConfig.Registries, config.PROVIDER) + regUrls := loadRegistries(srvconfig.Registries, providerConfig.Registries, common.PROVIDER) urlMap := srvconfig.getUrlMap() for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) { //registry the service reflect - methods, err := config.ServiceMap.Register(proto.Name, srvconfig.rpcService) + methods, err := common.ServiceMap.Register(proto.Name, srvconfig.rpcService) if err != nil { err := jerrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.InterfaceName, proto.Name, err.Error()) log.Error(err.Error()) @@ -82,12 +82,12 @@ func (srvconfig *ServiceConfig) Export() error { //if contextPath == "" { // contextPath = providerConfig.Path //} - url := config.NewURLWithOptions(srvconfig.InterfaceName, - config.WithProtocol(proto.Name), - config.WithIp(proto.Ip), - config.WithPort(proto.Port), - config.WithParams(urlMap), - config.WithMethods(strings.Split(methods, ","))) + url := common.NewURLWithOptions(srvconfig.InterfaceName, + common.WithProtocol(proto.Name), + common.WithIp(proto.Ip), + common.WithPort(proto.Port), + common.WithParams(urlMap), + common.WithMethods(strings.Split(methods, ","))) for _, regUrl := range regUrls { regUrl.SubURL = url @@ -109,7 +109,7 @@ func (srvconfig *ServiceConfig) Export() error { } -func (srvconfig *ServiceConfig) Implement(s config.RPCService) { +func (srvconfig *ServiceConfig) Implement(s common.RPCService) { srvconfig.rpcService = s } diff --git a/config/support/service.go b/config/support/service.go deleted file mode 100644 index 1e4a9d4a1addc77d7bed9ab28d0b0cba2ce2604e..0000000000000000000000000000000000000000 --- a/config/support/service.go +++ /dev/null @@ -1,26 +0,0 @@ -package support - -import "github.com/dubbo/go-for-apache-dubbo/config" - -var ( - conServices = map[string]config.RPCService{} // service name -> service - proServices = map[string]config.RPCService{} // service name -> service -) - -// SetConService is called by init() of implement of RPCService -func SetConService(service config.RPCService) { - conServices[service.Service()] = service -} - -// SetProService is called by init() of implement of RPCService -func SetProService(service config.RPCService) { - proServices[service.Service()] = service -} - -func GetConService(name string) config.RPCService { - return conServices[name] -} - -func GetProService(name string) config.RPCService { - return proServices[name] -} diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 9206448ed7157fb2e1dcedf06de73b7af65d6c16..f984bd8f198b6d56be51ff104da07426347b8aa7 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "net/http" _ "net/http/pprof" "os" @@ -26,10 +27,8 @@ import ( _ "github.com/dubbo/go-for-apache-dubbo/filter/imp" _ "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" - _ "github.com/dubbo/go-for-apache-dubbo/cluster/support" + _ "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" _ "github.com/dubbo/go-for-apache-dubbo/registry/zookeeper" - - "github.com/dubbo/go-for-apache-dubbo/config/support" ) var ( @@ -45,7 +44,7 @@ func main() { hessian.RegisterJavaEnum(Gender(WOMAN)) hessian.RegisterPOJO(&User{}) - conMap, _ := support.Load() + conMap, _ := config.Load() if conMap == nil { panic("conMap is nil") } @@ -80,7 +79,7 @@ func main() { } func initProfiling() { - if !support.GetConsumerConfig().Pprof_Enabled { + if !config.GetConsumerConfig().Pprof_Enabled { return } const ( @@ -96,7 +95,7 @@ func initProfiling() { if err != nil { panic("cat not get local ip!") } - addr = ip + ":" + strconv.Itoa(support.GetConsumerConfig().Pprof_Port) + addr = ip + ":" + strconv.Itoa(config.GetConsumerConfig().Pprof_Port) log.Info("App Profiling startup on address{%v}", addr+PprofPath) go func() { diff --git a/examples/dubbo/go-client/app/user.go b/examples/dubbo/go-client/app/user.go index 341196b084932befc3cc693a6087655a6c166167..ed2c0822c03f495f49c6c5d8e5a6665d6b6a286f 100644 --- a/examples/dubbo/go-client/app/user.go +++ b/examples/dubbo/go-client/app/user.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "strconv" "time" ) @@ -11,14 +12,10 @@ import ( "github.com/dubbogo/hessian2" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" -) - type Gender hessian.JavaEnum func init() { - support.SetConService(new(UserProvider)) + config.SetConService(new(UserProvider)) } const ( diff --git a/examples/dubbo/go-server/app/server.go b/examples/dubbo/go-server/app/server.go index 504b36262cb01d52d33b2e6e6768c60aa1b3bf43..4b25dc1f66fd058bf08362d3c83b9769122dbbd4 100644 --- a/examples/dubbo/go-server/app/server.go +++ b/examples/dubbo/go-server/app/server.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "net/http" _ "net/http/pprof" "os" @@ -18,15 +19,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" - _ "github.com/dubbo/go-for-apache-dubbo/protocol/dubbo" _ "github.com/dubbo/go-for-apache-dubbo/registry/protocol" _ "github.com/dubbo/go-for-apache-dubbo/filter/imp" _ "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" - _ "github.com/dubbo/go-for-apache-dubbo/cluster/support" + _ "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" _ "github.com/dubbo/go-for-apache-dubbo/registry/zookeeper" ) @@ -45,7 +44,7 @@ func main() { hessian.RegisterPOJO(&User{}) // ------------ - _, proMap := support.Load() + _, proMap := config.Load() if proMap == nil { panic("proMap is nil") } @@ -56,7 +55,7 @@ func main() { } func initProfiling() { - if !support.GetProviderConfig().Pprof_Enabled { + if !config.GetProviderConfig().Pprof_Enabled { return } const ( @@ -72,7 +71,7 @@ func initProfiling() { if err != nil { panic("can not get local ip!") } - addr = ip + ":" + strconv.Itoa(support.GetProviderConfig().Pprof_Port) + addr = ip + ":" + strconv.Itoa(config.GetProviderConfig().Pprof_Port) log.Info("App Profiling startup on address{%v}", addr+PprofPath) go func() { diff --git a/examples/dubbo/go-server/app/user.go b/examples/dubbo/go-server/app/user.go index a6a51bb9b92a2cf69359fab69b5a444e3bb06a34..8565f8071929a2cc194039df00bc7f9fe36e4b3a 100644 --- a/examples/dubbo/go-server/app/user.go +++ b/examples/dubbo/go-server/app/user.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "strconv" "time" ) @@ -12,14 +13,10 @@ import ( "github.com/dubbogo/hessian2" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" -) - type Gender hessian.JavaEnum func init() { - support.SetProService(new(UserProvider)) + config.SetProService(new(UserProvider)) } const ( diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index aacb37e4006204ec69ff5a148f12b47e2e76b29f..a315eef5c0d68a71af4e64f7f580939bae64b79d 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "net/http" _ "net/http/pprof" "os" @@ -25,10 +26,8 @@ import ( _ "github.com/dubbo/go-for-apache-dubbo/filter/imp" _ "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" - _ "github.com/dubbo/go-for-apache-dubbo/cluster/support" + _ "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" _ "github.com/dubbo/go-for-apache-dubbo/registry/zookeeper" - - "github.com/dubbo/go-for-apache-dubbo/config/support" ) var ( @@ -40,7 +39,7 @@ var ( // export APP_LOG_CONF_FILE="xxx" func main() { - conMap, _ := support.Load() + conMap, _ := config.Load() if conMap == nil { panic("conMap is nil") } @@ -75,7 +74,7 @@ func main() { } func initProfiling() { - if !support.GetConsumerConfig().Pprof_Enabled { + if !config.GetConsumerConfig().Pprof_Enabled { return } const ( @@ -91,7 +90,7 @@ func initProfiling() { if err != nil { panic("cat not get local ip!") } - addr = ip + ":" + strconv.Itoa(support.GetConsumerConfig().Pprof_Port) + addr = ip + ":" + strconv.Itoa(config.GetConsumerConfig().Pprof_Port) log.Info("App Profiling startup on address{%v}", addr+PprofPath) go func() { diff --git a/examples/jsonrpc/go-client/app/user.go b/examples/jsonrpc/go-client/app/user.go index ed5f11f700fd83c4387080ca523478ad3dc089cd..a6010939e34a15fd2a8585f0c5da1805c4e66410 100644 --- a/examples/jsonrpc/go-client/app/user.go +++ b/examples/jsonrpc/go-client/app/user.go @@ -3,18 +3,15 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" ) import ( "github.com/AlexStocks/goext/time" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" -) - func init() { - support.SetConService(new(UserProvider)) + config.SetConService(new(UserProvider)) } type JsonRPCUser struct { diff --git a/examples/jsonrpc/go-client/benchmark/benchmark.go b/examples/jsonrpc/go-client/benchmark/benchmark.go index 766df5a57069aa85c393f463ff8ad1033bdc661a..ecf631f32818933231a128d88a9823d8383ec123 100644 --- a/examples/jsonrpc/go-client/benchmark/benchmark.go +++ b/examples/jsonrpc/go-client/benchmark/benchmark.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "github.com/dubbo/go-for-apache-dubbo/config" "log" "sync" "sync/atomic" @@ -16,14 +17,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" _ "github.com/dubbo/go-for-apache-dubbo/protocol/jsonrpc" _ "github.com/dubbo/go-for-apache-dubbo/registry/protocol" _ "github.com/dubbo/go-for-apache-dubbo/filter/imp" _ "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" - _ "github.com/dubbo/go-for-apache-dubbo/cluster/support" + _ "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" _ "github.com/dubbo/go-for-apache-dubbo/registry/zookeeper" ) @@ -51,7 +51,7 @@ func main() { log.Printf("sent total %d messages, %d message per client", n*m, m) - conMap, _ := support.Load() + conMap, _ := config.Load() if conMap == nil { panic("conMap is nil") } diff --git a/examples/jsonrpc/go-client/benchmark/user.go b/examples/jsonrpc/go-client/benchmark/user.go index b5d189adcabbcb345157129654f80bfb138ffabd..eecb5e6b7826b02f84e6484e9b207ddc431cfe00 100644 --- a/examples/jsonrpc/go-client/benchmark/user.go +++ b/examples/jsonrpc/go-client/benchmark/user.go @@ -3,18 +3,15 @@ package main import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" ) import ( "github.com/AlexStocks/goext/time" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" -) - func init() { - support.SetConService(new(UserProvider)) + config.SetConService(new(UserProvider)) } type JsonRPCUser struct { diff --git a/examples/jsonrpc/go-server/app/server.go b/examples/jsonrpc/go-server/app/server.go index 8e4ec3bc62ddfcde643f2b2b767c7b1b90817a6b..ebaed7426da5e2bc8bb42cd62dc12fb77c0152bf 100644 --- a/examples/jsonrpc/go-server/app/server.go +++ b/examples/jsonrpc/go-server/app/server.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "net/http" _ "net/http/pprof" "os" @@ -17,15 +18,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" - _ "github.com/dubbo/go-for-apache-dubbo/protocol/jsonrpc" _ "github.com/dubbo/go-for-apache-dubbo/registry/protocol" _ "github.com/dubbo/go-for-apache-dubbo/filter/imp" _ "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" - _ "github.com/dubbo/go-for-apache-dubbo/cluster/support" + _ "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" _ "github.com/dubbo/go-for-apache-dubbo/registry/zookeeper" ) @@ -38,7 +37,7 @@ var ( // export APP_LOG_CONF_FILE="xxx" func main() { - _, proMap := support.Load() + _, proMap := config.Load() if proMap == nil { panic("proMap is nil") } @@ -49,7 +48,7 @@ func main() { } func initProfiling() { - if !support.GetProviderConfig().Pprof_Enabled { + if !config.GetProviderConfig().Pprof_Enabled { return } const ( @@ -65,7 +64,7 @@ func initProfiling() { if err != nil { panic("cat not get local ip!") } - addr = ip + ":" + strconv.Itoa(support.GetProviderConfig().Pprof_Port) + addr = ip + ":" + strconv.Itoa(config.GetProviderConfig().Pprof_Port) log.Info("App Profiling startup on address{%v}", addr+PprofPath) go func() { diff --git a/examples/jsonrpc/go-server/app/user.go b/examples/jsonrpc/go-server/app/user.go index d8fe1f818585949aa12e61c3af37f0ac8c5fa4c2..5a0b5be99f629b00ae2d064be6a13125d3434913 100644 --- a/examples/jsonrpc/go-server/app/user.go +++ b/examples/jsonrpc/go-server/app/user.go @@ -4,6 +4,7 @@ import ( // "encoding/json" "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/config" "time" ) @@ -12,14 +13,10 @@ import ( "github.com/AlexStocks/goext/time" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config/support" -) - type Gender int func init() { - support.SetProService(new(UserProvider)) + config.SetProService(new(UserProvider)) } const ( diff --git a/filter/imp/echo_filter_test.go b/filter/imp/echo_filter_test.go index ab7653caf3d2626fa46c010b900e0706113b9d4a..ba2f52c79a34b189a77947adfcd95f7a86165284 100644 --- a/filter/imp/echo_filter_test.go +++ b/filter/imp/echo_filter_test.go @@ -1,6 +1,7 @@ package imp import ( + "github.com/dubbo/go-for-apache-dubbo/common" "testing" ) @@ -9,19 +10,18 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) func TestEchoFilter_Invoke(t *testing.T) { filter := GetFilter() - result := filter.Invoke(protocol.NewBaseInvoker(config.URL{}), - support.NewRPCInvocationForProvider("Echo", []interface{}{"OK"}, nil)) + result := filter.Invoke(protocol.NewBaseInvoker(common.URL{}), + invocation.NewRPCInvocationForProvider("Echo", []interface{}{"OK"}, nil)) assert.Equal(t, "OK", result.Result()) - result = filter.Invoke(protocol.NewBaseInvoker(config.URL{}), - support.NewRPCInvocationForProvider("MethodName", []interface{}{"OK"}, nil)) + result = filter.Invoke(protocol.NewBaseInvoker(common.URL{}), + invocation.NewRPCInvocationForProvider("MethodName", []interface{}{"OK"}, nil)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 7528571d497fa03f29a45a03b5b90bb7163708b5..8daec979176663bf3caaf3d82b632d297d51ba50 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -1,6 +1,8 @@ package dubbo import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/config" "strings" "sync" "time" @@ -17,8 +19,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" - "github.com/dubbo/go-for-apache-dubbo/config/support" ) var ( @@ -34,7 +34,7 @@ var ( func init() { // load clientconfig from consumer_config - protocolConf := support.GetConsumerConfig().ProtocolConf + protocolConf := config.GetConsumerConfig().ProtocolConf if protocolConf == nil { log.Warn("protocol_conf is nil") return @@ -147,7 +147,7 @@ func NewClient() *Client { } // call one way -func (c *Client) CallOneway(addr string, svcUrl config.URL, method string, args interface{}, opts ...CallOption) error { +func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -158,7 +158,7 @@ func (c *Client) CallOneway(addr string, svcUrl config.URL, method string, args } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl config.URL, method string, args, reply interface{}, opts ...CallOption) error { +func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -173,7 +173,7 @@ func (c *Client) Call(addr string, svcUrl config.URL, method string, args, reply return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) } -func (c *Client) AsyncCall(addr string, svcUrl config.URL, method string, args interface{}, +func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args interface{}, callback AsyncCallback, reply interface{}, opts ...CallOption) error { var copts CallOptions @@ -184,7 +184,7 @@ func (c *Client) AsyncCall(addr string, svcUrl config.URL, method string, args i return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) call(ct CallType, addr string, svcUrl config.URL, method string, +func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { if opts.RequestTimeout == 0 { diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 51b9ab095849307fc5ccc7edccca8f9ae663f492..8f844be02b17b467b6849a40e56d5e361ab240fd 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -2,6 +2,7 @@ package dubbo import ( "errors" + "github.com/dubbo/go-for-apache-dubbo/common" "strconv" "sync" ) @@ -12,9 +13,8 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + invocation_impl "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) var Err_No_Reply = errors.New("request need @reply") @@ -25,7 +25,7 @@ type DubboInvoker struct { destroyLock sync.Mutex } -func NewDubboInvoker(url config.URL, client *Client) *DubboInvoker { +func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, @@ -39,7 +39,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { result protocol.RPCResult ) - inv := invocation.(*support.RPCInvocation) + inv := invocation.(*invocation_impl.RPCInvocation) url := di.GetUrl() // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 3ab1bcd21c7418470cafc599839b76e85e80b673..483bac387c22c7a943295dc781570a10ed65bec9 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -2,11 +2,11 @@ package dubbo import ( log "github.com/AlexStocks/log4go" + "github.com/dubbo/go-for-apache-dubbo/common" ) import ( "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -42,7 +42,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker { +func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { invoker := NewDubboInvoker(url, NewClient()) dp.SetInvokers(invoker) log.Info("Refer service: %s", url.String()) @@ -61,7 +61,7 @@ func (dp *DubboProtocol) Destroy() { } } -func (dp *DubboProtocol) openServer(url config.URL) { +func (dp *DubboProtocol) openServer(url common.URL) { exporter, ok := dp.ExporterMap().Load(url.Key()) if !ok { panic("[DubboProtocol]" + url.Key() + "is not existing") diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 2ec45733b05371b4b289f5442f51e8666cd100e3..0379a4b19224405b33e9b5445be0d10c806fb59a 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -2,6 +2,7 @@ package dubbo import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "testing" ) @@ -10,14 +11,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) func TestDubboProtocol_Export(t *testing.T) { // Export proto := GetProtocol() - url, err := config.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ @@ -48,7 +48,7 @@ func TestDubboProtocol_Export(t *testing.T) { func TestDubboProtocol_Refer(t *testing.T) { // Refer proto := GetProtocol() - url, err := config.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index df21515687c04559d03fedb79bbef30da05a01b1..bba2e11e63d191b1f4aca614b34f9bbf40f7404a 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -2,6 +2,7 @@ package dubbo import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/common/constant" "reflect" "sync" @@ -16,9 +17,8 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) // todo: WritePkg_Timeout will entry *.yml @@ -191,7 +191,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { invoker := h.exporter.GetInvoker() if invoker != nil { - result := invoker.Invoke(support.NewRPCInvocationForProvider(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{ + result := invoker.Invoke(invocation.NewRPCInvocationForProvider(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{ constant.PATH_KEY: p.Service.Path, //attachments[constant.GROUP_KEY] = url.GetParam(constant.GROUP_KEY, "") constant.INTERFACE_KEY: p.Service.Interface, @@ -265,7 +265,7 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { req.Body = nil return } - svc := svcIf.(*config.Service) + svc := svcIf.(*common.Service) method := svc.Method()[req.Service.Method] if method == nil { log.Error("method not found!") diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 4ff68025c38ee4c24331d59be883554f1114c11a..0dcec72d6e455cdff6b7c9a2d93546f66494565a 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -2,6 +2,7 @@ package dubbo import ( "bytes" + "github.com/dubbo/go-for-apache-dubbo/common" "reflect" ) @@ -10,9 +11,6 @@ import ( log "github.com/AlexStocks/log4go" jerrors "github.com/juju/errors" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config" -) //////////////////////////////////////////// // RpcClientPackageHandler @@ -109,7 +107,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface "dubboVersion": dubboVersion, "argsTypes": argsTypes, "args": args, - "service": config.ServiceMap.GetService(DUBBO, pkg.Service.Target), + "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Target), "attachments": attachments, } } diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 01b39e8517d1e9f31e43acf8d639486b16752917..8a47650700406013d4907b29c8853c326339b9a2 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -2,6 +2,8 @@ package dubbo import ( "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/config" "net" ) @@ -12,8 +14,6 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" - "github.com/dubbo/go-for-apache-dubbo/config/support" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -22,7 +22,7 @@ var srvConf *ServerConfig func init() { // load clientconfig from provider_config - protocolConf := support.GetProviderConfig().ProtocolConf + protocolConf := config.GetProviderConfig().ProtocolConf if protocolConf == nil { log.Warn("protocol_conf is nil") return @@ -112,7 +112,7 @@ func (s *Server) newSession(session getty.Session) error { return nil } -func (s *Server) Start(url config.URL) { +func (s *Server) Start(url common.URL) { var ( addr string tcpServer getty.Server diff --git a/protocol/support/rpcinvocation.go b/protocol/invocation/rpcinvocation.go similarity index 95% rename from protocol/support/rpcinvocation.go rename to protocol/invocation/rpcinvocation.go index 9fdb7e38cd7af2b425eac64e488dd7fe716f24b7..89143856b0b93500ac45fbceea0e38c8aca4b4e9 100644 --- a/protocol/support/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -1,12 +1,12 @@ -package support +package invocation import ( + "github.com/dubbo/go-for-apache-dubbo/common" "reflect" ) import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -25,7 +25,7 @@ type RPCInvocation struct { } func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Type, arguments []interface{}, - reply interface{}, callBack interface{}, url config.URL, invoker protocol.Invoker) *RPCInvocation { + reply interface{}, callBack interface{}, url common.URL, invoker protocol.Invoker) *RPCInvocation { attachments := map[string]string{} attachments[constant.PATH_KEY] = url.Path diff --git a/protocol/invoker.go b/protocol/invoker.go index 5611bb31f759c669617157a472613beffe4571ac..46a1782fe4a44bffa37a6253d285017d429b3a04 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -6,7 +6,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common" - "github.com/dubbo/go-for-apache-dubbo/config" ) // Extension - Invoker @@ -20,12 +19,12 @@ type Invoker interface { ///////////////////////////// type BaseInvoker struct { - url config.URL + url common.URL available bool destroyed bool } -func NewBaseInvoker(url config.URL) *BaseInvoker { +func NewBaseInvoker(url common.URL) *BaseInvoker { return &BaseInvoker{ url: url, available: true, @@ -33,7 +32,7 @@ func NewBaseInvoker(url config.URL) *BaseInvoker { } } -func (bi *BaseInvoker) GetUrl() config.URL { +func (bi *BaseInvoker) GetUrl() common.URL { return bi.url } diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 3bdaae56ec846bde5c6ed5f7c7fbb7cc2ee7a313..1dbe012a03e2082972d51c01a11a7e84714bb79a 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" "io/ioutil" "net" "net/http" @@ -21,7 +22,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" ) ////////////////////////////////////////////// @@ -77,7 +77,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(service config.URL, method string, args interface{}) *Request { +func (c *HTTPClient) NewRequest(service common.URL, method string, args interface{}) *Request { return &Request{ ID: atomic.AddInt64(&c.ID, 1), @@ -90,7 +90,7 @@ func (c *HTTPClient) NewRequest(service config.URL, method string, args interfac } } -func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, rsp interface{}) error { // header httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index 8f2e197b2e2b09705b3aac1866346c5bc15d43bb..3ded3ef813c88e338e24336b7cbee43954f155c5 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -2,6 +2,7 @@ package jsonrpc import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" ) import ( @@ -11,9 +12,8 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + invocation_impl "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) type JsonrpcInvoker struct { @@ -21,7 +21,7 @@ type JsonrpcInvoker struct { client *HTTPClient } -func NewJsonrpcInvoker(url config.URL, client *HTTPClient) *JsonrpcInvoker { +func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { return &JsonrpcInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, @@ -34,7 +34,7 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result result protocol.RPCResult ) - inv := invocation.(*support.RPCInvocation) + inv := invocation.(*invocation_impl.RPCInvocation) url := ji.GetUrl() req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments()) ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index 98276e8c568394ec54ff588bc958b45471851eaf..bbd4fc1dd7f0462b2dd9e2aac23e2e10446d4276 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -2,12 +2,12 @@ package jsonrpc import ( log "github.com/AlexStocks/log4go" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/config" ) import ( "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" - "github.com/dubbo/go-for-apache-dubbo/config/support" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -44,10 +44,10 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -func (jp *JsonrpcProtocol) Refer(url config.URL) protocol.Invoker { +func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ - HandshakeTimeout: support.GetConsumerConfig().ConnectTimeout, - HTTPTimeout: support.GetConsumerConfig().RequestTimeout, + HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, + HTTPTimeout: config.GetConsumerConfig().RequestTimeout, })) jp.SetInvokers(invoker) log.Info("Refer service: %s", url.String()) @@ -66,7 +66,7 @@ func (jp *JsonrpcProtocol) Destroy() { } } -func (jp *JsonrpcProtocol) openServer(url config.URL) { +func (jp *JsonrpcProtocol) openServer(url common.URL) { exporter, ok := jp.ExporterMap().Load(url.Key()) if !ok { panic("[JsonrpcProtocol]" + url.Key() + "is not existing") diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go index e576b0716727bee5251def2461fda26d5ffb5d31..619d3f640ea6b3cb9ab7e70db266ddac8cf41f23 100644 --- a/protocol/jsonrpc/jsonrpc_protocol_test.go +++ b/protocol/jsonrpc/jsonrpc_protocol_test.go @@ -2,6 +2,8 @@ package jsonrpc import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/config" "testing" "time" ) @@ -11,15 +13,13 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" - "github.com/dubbo/go-for-apache-dubbo/config/support" "github.com/dubbo/go-for-apache-dubbo/protocol" ) func TestJsonrpcProtocol_Export(t *testing.T) { // Export proto := GetProtocol() - url, err := config.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ @@ -49,17 +49,17 @@ func TestJsonrpcProtocol_Export(t *testing.T) { func TestJsonrpcProtocol_Refer(t *testing.T) { // Refer proto := GetProtocol() - url, err := config.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) - con := support.ConsumerConfig{ + con := config.ConsumerConfig{ ConnectTimeout: 5 * time.Second, RequestTimeout: 5 * time.Second, } - support.SetConsumerConfig(con) + config.SetConsumerConfig(con) invoker := proto.Refer(url) // make sure url diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 9c924c20c511054c3f6cc7af1df5d7bf199dd90b..f64c6c102664c3eda07f04fb793d0860f759ac0e 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "github.com/dubbo/go-for-apache-dubbo/common" "io" "io/ioutil" "net" @@ -22,9 +23,8 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" - "github.com/dubbo/go-for-apache-dubbo/protocol/support" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" ) var ( @@ -198,7 +198,7 @@ func accept(listener net.Listener, fn func(net.Conn)) error { } } -func (s *Server) Start(url config.URL) { +func (s *Server) Start(url common.URL) { listener, err := net.Listen("tcp", url.Location) if err != nil { log.Error("jsonrpc server [%s] start failed: %v", url.Path, err) @@ -309,7 +309,7 @@ func serveRequest(ctx context.Context, // exporter invoke invoker := exporter.GetInvoker() if invoker != nil { - result := invoker.Invoke(support.NewRPCInvocationForProvider(methodName, args.([]interface{}), map[string]string{ + result := invoker.Invoke(invocation.NewRPCInvocationForProvider(methodName, args.([]interface{}), map[string]string{ //attachments[constant.PATH_KEY] = url.Path //attachments[constant.GROUP_KEY] = url.GetParam(constant.GROUP_KEY, "") //attachments[constant.INTERFACE_KEY] = url.GetParam(constant.INTERFACE_KEY, "") @@ -336,7 +336,7 @@ func serveRequest(ctx context.Context, } // get method - svc := config.ServiceMap.GetService(JSONRPC, serviceName) + svc := common.ServiceMap.GetService(JSONRPC, serviceName) if svc == nil { return jerrors.New("cannot find svc " + serviceName) } diff --git a/protocol/protocol.go b/protocol/protocol.go index 413b2e61f693a000806c6837bc2cefd8a7c254fe..4df2e609d47df97b69fd368956ab27ee0d64802e 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,6 +1,7 @@ package protocol import ( + "github.com/dubbo/go-for-apache-dubbo/common" "sync" ) @@ -8,14 +9,10 @@ import ( log "github.com/AlexStocks/log4go" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config" -) - // Extension - protocol type Protocol interface { Export(invoker Invoker) Exporter - Refer(url config.URL) Invoker + Refer(url common.URL) Invoker Destroy() } @@ -60,7 +57,7 @@ func (bp *BaseProtocol) Export(invoker Invoker) Exporter { return nil } -func (bp *BaseProtocol) Refer(url config.URL) Invoker { +func (bp *BaseProtocol) Refer(url common.URL) Invoker { return nil } diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go index 2a59ac420024c5833d6822495755e13f9aa44976..7852cfbf14b059a7d743c9f0ff28e29fd62f79b3 100644 --- a/protocol/protocolwrapper/mock_protocol_filter.go +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -1,7 +1,7 @@ package protocolwrapper import ( - "github.com/dubbo/go-for-apache-dubbo/config" + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/protocol" "sync" ) @@ -17,7 +17,7 @@ func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporte return protocol.NewBaseExporter("key", invoker, &sync.Map{}) } -func (pfw *mockProtocolFilter) Refer(url config.URL) protocol.Invoker { +func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker { return protocol.NewBaseInvoker(url) } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index a0d1e009d82266c4e9c41e876f217ffaac585a22..b8cccec0cf507eca9584f64ee0c65e4243055ec7 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -1,6 +1,7 @@ package protocolwrapper import ( + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/filter/imp" "strings" ) @@ -8,7 +9,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/filter" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -32,7 +32,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo return pfw.protocol.Export(invoker) } -func (pfw *ProtocolFilterWrapper) Refer(url config.URL) protocol.Invoker { +func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { pfw.protocol = extension.GetProtocolExtension(url.Protocol) } @@ -77,7 +77,7 @@ type FilterInvoker struct { filter filter.Filter } -func (fi *FilterInvoker) GetUrl() config.URL { +func (fi *FilterInvoker) GetUrl() common.URL { return fi.invoker.GetUrl() } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index ca959b82a42c1086d40f5123c1fdadbb2f9894e7..57e63599454d44469b155929961036559c955b4d 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -1,6 +1,7 @@ package directory import ( + "github.com/dubbo/go-for-apache-dubbo/common" "sync" "time" ) @@ -14,7 +15,6 @@ import ( "github.com/dubbo/go-for-apache-dubbo/cluster/directory" "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper" "github.com/dubbo/go-for-apache-dubbo/registry" @@ -38,7 +38,7 @@ type RegistryDirectory struct { Options } -func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) { +func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) { options := Options{ //default 300s serviceTTL: time.Duration(300e9), @@ -60,7 +60,7 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O } //subscibe from registry -func (dir *RegistryDirectory) Subscribe(url config.URL) { +func (dir *RegistryDirectory) Subscribe(url common.URL) { for { if !dir.registry.IsAvailable() { log.Warn("event listener game over.") @@ -163,14 +163,14 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc return groupInvokersList } -func (dir *RegistryDirectory) uncacheInvoker(url config.URL) *sync.Map { +func (dir *RegistryDirectory) uncacheInvoker(url common.URL) *sync.Map { log.Debug("service will be deleted in cache invokers: invokers key is %s!", url.Key()) newCacheInvokers := dir.cacheInvokersMap newCacheInvokers.Delete(url.Key()) return newCacheInvokers } -func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map { +func (dir *RegistryDirectory) cacheInvoker(url common.URL) *sync.Map { referenceUrl := dir.GetUrl().SubURL newCacheInvokers := dir.cacheInvokersMap //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol @@ -211,7 +211,7 @@ func (dir *RegistryDirectory) Destroy() { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. -func mergeUrl(serviceUrl config.URL, referenceUrl *config.URL) config.URL { +func mergeUrl(serviceUrl common.URL, referenceUrl *common.URL) common.URL { mergedUrl := serviceUrl var methodConfigMergeFcn = []func(method string){} diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 5d7c2b253d2520751f468e256140e54b77577812..0f5371f070e83e0c0d45fa58887e92ad6341b839 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -2,6 +2,8 @@ package directory import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" "net/url" "strconv" "testing" @@ -11,55 +13,32 @@ import ( "github.com/stretchr/testify/assert" ) import ( - "github.com/dubbo/go-for-apache-dubbo/cluster/support" + "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper" "github.com/dubbo/go-for-apache-dubbo/registry" ) func TestSubscribe(t *testing.T) { - extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) - - url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") - url.SubURL = &suburl - mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) - - go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) - for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) - } + registryDirectory, _ := normalRegistryDir() time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) } func TestSubscribe_Delete(t *testing.T) { - extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) - - url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") - url.SubURL = &suburl - mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) - - go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) - for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) - } + registryDirectory, mockRegistry := normalRegistryDir() time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))}) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } func TestSubscribe_InvalidUrl(t *testing.T) { - url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) + url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) _, err := NewRegistryDirectory(&url, mockRegistry) assert.Error(t, err) } @@ -68,32 +47,69 @@ func TestSubscribe_Group(t *testing.T) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) extension.SetCluster("mock", cluster.NewMockCluster) - regurl, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + regurl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") suburl.Params.Set(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl - mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) + mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) + go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) //for group1 urlmap := url.Values{} urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), - config.WithParams(urlmap))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + common.WithParams(urlmap))}) } //for group2 urlmap2 := url.Values{} urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), - config.WithParams(urlmap2))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + common.WithParams(urlmap2))}) } time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } + +func Test_Destory(t *testing.T) { + registryDirectory, _ := normalRegistryDir() + + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 3) + assert.Equal(t, true, registryDirectory.IsAvailable()) + + registryDirectory.Destroy() + assert.Len(t, registryDirectory.cacheInvokers, 0) + assert.Equal(t, false, registryDirectory.IsAvailable()) +} + +func Test_List(t *testing.T) { + registryDirectory, _ := normalRegistryDir() + + time.Sleep(1e9) + assert.Len(t, registryDirectory.List(protocol.Invocation()), 3) + assert.Equal(t, true, registryDirectory.IsAvailable()) + +} + +func normalRegistryDir() (*RegistryDirectory, *registry.MockRegistry) { + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + + url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + url.SubURL = &suburl + mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) + registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + + go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) + for i := 0; i < 3; i++ { + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) + } + return registryDirectory, mockRegistry.(*registry.MockRegistry) +} diff --git a/registry/event.go b/registry/event.go index 976f7f9875ce4da864c2cc3775a5dcd3df060238..36877939426560cc7a347dd54d9d37e4bc0f9976 100644 --- a/registry/event.go +++ b/registry/event.go @@ -2,12 +2,10 @@ package registry import ( "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" "math/rand" "time" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config" -) func init() { rand.Seed(time.Now().UnixNano()) @@ -39,7 +37,7 @@ func (t ServiceEventType) String() string { type ServiceEvent struct { Action ServiceEventType - Service config.URL + Service common.URL } func (e ServiceEvent) String() string { diff --git a/registry/mock_registry.go b/registry/mock_registry.go index e2fca36bbf0e4ecca192a6eaf0bf26e5747e28a6..f112306d3698f960a13463f83c17cfaf6406c2d0 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -1,18 +1,16 @@ package registry import ( + "github.com/dubbo/go-for-apache-dubbo/common" "github.com/tevino/abool" ) -import ( - "github.com/dubbo/go-for-apache-dubbo/config" -) type MockRegistry struct { listener *listener destroyed *abool.AtomicBool } -func NewMockRegistry(url *config.URL) (Registry, error) { +func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ destroyed: abool.NewBool(false), } @@ -20,7 +18,7 @@ func NewMockRegistry(url *config.URL) (Registry, error) { registry.listener = listener return registry, nil } -func (*MockRegistry) Register(url config.URL) error { +func (*MockRegistry) Register(url common.URL) error { return nil } @@ -31,11 +29,11 @@ func (r *MockRegistry) Destroy() { func (r *MockRegistry) IsAvailable() bool { return !r.destroyed.IsSet() } -func (r *MockRegistry) GetUrl() config.URL { - return config.URL{} +func (r *MockRegistry) GetUrl() common.URL { + return common.URL{} } -func (r *MockRegistry) Subscribe(config.URL) (Listener, error) { +func (r *MockRegistry) Subscribe(common.URL) (Listener, error) { return r.listener, nil } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 8a61cec5bba7bc00d796df5a8c985cdabdb6ee1d..2052d244e0dd0da24264bf1dd039a072c5d6c04f 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -1,6 +1,7 @@ package protocol import ( + "github.com/dubbo/go-for-apache-dubbo/common" "sync" ) @@ -11,7 +12,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper" "github.com/dubbo/go-for-apache-dubbo/registry" @@ -39,7 +39,7 @@ func NewRegistryProtocol() *RegistryProtocol { bounds: sync.Map{}, } } -func getRegistry(regUrl *config.URL) registry.Registry { +func getRegistry(regUrl *common.URL) registry.Registry { reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl) if err != nil { log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) @@ -47,7 +47,7 @@ func getRegistry(regUrl *config.URL) registry.Registry { } return reg } -func (proto *RegistryProtocol) Refer(url config.URL) protocol.Invoker { +func (proto *RegistryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL @@ -142,7 +142,7 @@ func (proto *RegistryProtocol) Destroy() { }) } -func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { +func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL { //here add * for return a new url url := invoker.GetUrl() //if the protocol == registry ,set protocol the registry value in url.params @@ -153,7 +153,7 @@ func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { return url } -func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) config.URL { +func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) common.URL { url := invoker.GetUrl() return *url.SubURL } @@ -167,18 +167,18 @@ func GetProtocol() protocol.Protocol { type wrappedInvoker struct { invoker protocol.Invoker - url config.URL + url common.URL protocol.BaseInvoker } -func newWrappedInvoker(invoker protocol.Invoker, url config.URL) *wrappedInvoker { +func newWrappedInvoker(invoker protocol.Invoker, url common.URL) *wrappedInvoker { return &wrappedInvoker{ invoker: invoker, url: url, - BaseInvoker: *protocol.NewBaseInvoker(config.URL{}), + BaseInvoker: *protocol.NewBaseInvoker(common.URL{}), } } -func (ivk *wrappedInvoker) GetUrl() config.URL { +func (ivk *wrappedInvoker) GetUrl() common.URL { return ivk.url } func (ivk *wrappedInvoker) getInvoker() protocol.Invoker { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 6f3786a60672e5cd590db05745a5e9afa88abaa5..6b99d544fb8ef54da2393bfa0a5d4c735a6d306e 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -2,16 +2,16 @@ package protocol import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "testing" ) import ( "github.com/stretchr/testify/assert" ) import ( - cluster "github.com/dubbo/go-for-apache-dubbo/cluster/support" + cluster "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper" "github.com/dubbo/go-for-apache-dubbo/registry" @@ -23,8 +23,8 @@ func referNormal(t *testing.T, regProtocol *RegistryProtocol) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) extension.SetCluster("mock", cluster.NewMockCluster) - url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url.SubURL = &suburl @@ -40,8 +40,8 @@ func TestRefer(t *testing.T) { func TestMultiRegRefer(t *testing.T) { regProtocol := NewRegistryProtocol() referNormal(t, regProtocol) - url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222") - suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:2222") + suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url2.SubURL = &suburl2 @@ -58,8 +58,8 @@ func TestOneRegRefer(t *testing.T) { regProtocol := NewRegistryProtocol() referNormal(t, regProtocol) - url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url2.SubURL = &suburl2 @@ -75,8 +75,8 @@ func exporterNormal(t *testing.T, regProtocol *RegistryProtocol) { extension.SetProtocol("registry", GetProtocol) extension.SetRegistry("mock", registry.NewMockRegistry) extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) - url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url.SubURL = &suburl invoker := protocol.NewBaseInvoker(url) @@ -95,8 +95,8 @@ func TestMultiRegAndMultiProtoExporter(t *testing.T) { regProtocol := NewRegistryProtocol() exporterNormal(t, regProtocol) - url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222") - suburl2, _ := config.NewURL(context.TODO(), "jsonrpc://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:2222") + suburl2, _ := common.NewURL(context.TODO(), "jsonrpc://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url2.SubURL = &suburl2 invoker2 := protocol.NewBaseInvoker(url2) @@ -121,8 +121,8 @@ func TestOneRegAndProtoExporter(t *testing.T) { regProtocol := NewRegistryProtocol() exporterNormal(t, regProtocol) - url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock")) url2.SubURL = &suburl2 invoker2 := protocol.NewBaseInvoker(url2) diff --git a/registry/registry.go b/registry/registry.go index 7b724daa2afbd904b93484a49a5dc75f6b3a10cc..8bc384c215f6caded028e348e436341e201139ba 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -2,7 +2,6 @@ package registry import ( "github.com/dubbo/go-for-apache-dubbo/common" - "github.com/dubbo/go-for-apache-dubbo/config" ) // Extension - Registry @@ -10,10 +9,10 @@ type Registry interface { common.Node //used for service provider calling , register services to registry //And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. - Register(url config.URL) error + Register(url common.URL) error //used for service consumer ,start subscribe service event from registry - Subscribe(config.URL) (Listener, error) + Subscribe(common.URL) (Listener, error) } type Listener interface { diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 54e48b901f70fb9a5d08ebeabbaff98a2d4800e2..740f9c00984d9e89d4e4448262f0cbfddb310179 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -3,6 +3,7 @@ package zookeeper import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" "path" "sync" "time" @@ -15,7 +16,6 @@ import ( ) import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/registry" ) @@ -84,7 +84,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { return false } -func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf config.URL) { +func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) { contains := func(s []string, e string) bool { for _, a := range s { if a == e { @@ -104,7 +104,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co // a node was added -- listen the new node var ( newNode string - serviceURL config.URL + serviceURL common.URL ) for _, n := range newChildren { if contains(children, n) { @@ -114,7 +114,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co newNode = path.Join(zkPath, n) log.Info("add zkNode{%s}", newNode) //context.TODO - serviceURL, err = config.NewURL(context.TODO(), n) + serviceURL, err = common.NewURL(context.TODO(), n) if err != nil { log.Error("NewURL(%s) = error{%v}", n, jerrors.ErrorStack(err)) continue @@ -126,7 +126,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co log.Info("add serviceURL{%s}", serviceURL) l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil} // listen l service node - go func(node string, serviceURL config.URL) { + go func(node string, serviceURL common.URL) { log.Info("delete zkNode{%s}", node) if l.listenServiceNodeEvent(node) { log.Info("delete serviceURL{%s}", serviceURL) @@ -145,7 +145,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co oldNode = path.Join(zkPath, n) log.Warn("delete zkPath{%s}", oldNode) - serviceURL, err = config.NewURL(context.TODO(), n) + serviceURL, err = common.NewURL(context.TODO(), n) if !conf.URLEqual(serviceURL) { log.Warn("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key()) continue @@ -159,7 +159,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co } } -func (l *zkEventListener) listenDirEvent(zkPath string, conf config.URL) { +func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { l.wg.Add(1) defer l.wg.Done() @@ -225,13 +225,13 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf config.URL) { // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent -func (l *zkEventListener) listenServiceEvent(conf config.URL) { +func (l *zkEventListener) listenServiceEvent(conf common.URL) { var ( err error zkPath string dubboPath string children []string - serviceURL config.URL + serviceURL common.URL ) zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path) @@ -256,7 +256,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { } for _, c := range children { - serviceURL, err = config.NewURL(context.TODO(), c) + serviceURL, err = common.NewURL(context.TODO(), c) if err != nil { log.Error("NewURL(r{%s}) = error{%v}", c, err) continue @@ -271,7 +271,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { // listen l service node dubboPath = path.Join(zkPath, c) log.Info("listen dubbo service key{%s}", dubboPath) - go func(zkPath string, serviceURL config.URL) { + go func(zkPath string, serviceURL common.URL) { if l.listenServiceNodeEvent(dubboPath) { log.Debug("delete serviceUrl{%s}", serviceURL) l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} @@ -281,7 +281,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { } log.Info("listen dubbo path{%s}", zkPath) - go func(zkPath string, conf config.URL) { + go func(zkPath string, conf common.URL) { l.listenDirEvent(zkPath, conf) log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, conf) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 4a0780a3f7c7512268c46775cc6eba27a8132eca..6545b73e6d19c98a3b82709c9a3d26a7961391a0 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -3,6 +3,7 @@ package zookeeper import ( "context" "fmt" + "github.com/dubbo/go-for-apache-dubbo/common" "net/url" "os" "strconv" @@ -21,7 +22,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/registry" "github.com/dubbo/go-for-apache-dubbo/version" ) @@ -50,14 +50,14 @@ func init() { type ZkRegistry struct { context context.Context - *config.URL + *common.URL birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart done chan struct{} cltLock sync.Mutex client *zookeeperClient - services map[string]config.URL // service name + protocol -> service config + services map[string]common.URL // service name + protocol -> service config listenerLock sync.Mutex listener *zkEventListener @@ -66,7 +66,7 @@ type ZkRegistry struct { zkPath map[string]int // key = protocol://ip:port/interface } -func NewZkRegistry(url *config.URL) (registry.Registry, error) { +func NewZkRegistry(url *common.URL) (registry.Registry, error) { var ( err error r *ZkRegistry @@ -76,7 +76,7 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { URL: url, birth: time.Now().UnixNano(), done: make(chan struct{}), - services: make(map[string]config.URL), + services: make(map[string]common.URL), zkPath: make(map[string]int), } @@ -103,7 +103,7 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { return r, nil } -func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) { +func NewMockZkRegistry(url *common.URL) (*zk.TestCluster, *ZkRegistry, error) { var ( err error r *ZkRegistry @@ -115,7 +115,7 @@ func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) { URL: url, birth: time.Now().UnixNano(), done: make(chan struct{}), - services: make(map[string]config.URL), + services: make(map[string]common.URL), zkPath: make(map[string]int), } @@ -134,7 +134,7 @@ func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) { return c, r, nil } -func (r *ZkRegistry) GetUrl() config.URL { +func (r *ZkRegistry) GetUrl() common.URL { return *r.URL } @@ -187,7 +187,7 @@ func (r *ZkRegistry) handleZkRestart() { err error flag bool failTimes int - confIf config.URL + confIf common.URL ) defer r.wg.Done() @@ -218,7 +218,7 @@ LOOP: r.client.zkAddrs, jerrors.ErrorStack(err)) if err == nil { // copy r.services - services := []config.URL{} + services := []common.URL{} for _, confIf = range r.services { services = append(services, confIf) } @@ -247,7 +247,7 @@ LOOP: } } -func (r *ZkRegistry) Register(conf config.URL) error { +func (r *ZkRegistry) Register(conf common.URL) error { var ( ok bool err error @@ -255,7 +255,7 @@ func (r *ZkRegistry) Register(conf config.URL) error { ) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { - case config.CONSUMER: + case common.CONSUMER: ok = false r.cltLock.Lock() _, ok = r.services[conf.Key()] @@ -280,7 +280,7 @@ func (r *ZkRegistry) Register(conf config.URL) error { if listener != nil { go listener.listenServiceEvent(conf) } - case config.PROVIDER: + case common.PROVIDER: // 检验服务是否已经注册过 ok = false @@ -308,7 +308,7 @@ func (r *ZkRegistry) Register(conf config.URL) error { return nil } -func (r *ZkRegistry) register(c config.URL) error { +func (r *ZkRegistry) register(c common.URL) error { var ( err error //revision string @@ -336,13 +336,13 @@ func (r *ZkRegistry) register(c config.URL) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { - case config.PROVIDER: + case common.PROVIDER: if c.Path == "" || len(c.Methods) == 0 { return jerrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } // 先创建服务下面的provider node - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -355,10 +355,10 @@ func (r *ZkRegistry) register(c config.URL) error { // dubbo java consumer来启动找provider url时,因为category不匹配,会找不到provider,导致consumer启动不了,所以使用consumers&providers // DubboRole = [...]string{"consumer", "", "", "provider"} // params.Add("category", (RoleType(PROVIDER)).Role()) - params.Add("category", (config.RoleType(config.PROVIDER)).String()) + params.Add("category", (common.RoleType(common.PROVIDER)).String()) params.Add("dubbo", "dubbo-provider-golang-"+version.Version) - params.Add("side", (config.RoleType(config.PROVIDER)).Role()) + params.Add("side", (common.RoleType(common.PROVIDER)).Role()) if len(c.Methods) == 0 { params.Add("methods", strings.Join(c.Methods, ",")) @@ -380,11 +380,11 @@ func (r *ZkRegistry) register(c config.URL) error { encodedURL = url.QueryEscape(rawURL) // 把自己注册service providers - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.PROVIDER)).String()) + dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.PROVIDER)).String()) log.Debug("provider path:%s, url:%s", dubboPath, rawURL) - case config.CONSUMER: - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.CONSUMER]) + case common.CONSUMER: + dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.CONSUMER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -392,7 +392,7 @@ func (r *ZkRegistry) register(c config.URL) error { log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) return jerrors.Trace(err) } - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, config.DubboNodes[config.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -403,13 +403,13 @@ func (r *ZkRegistry) register(c config.URL) error { params.Add("protocol", c.Protocol) - params.Add("category", (config.RoleType(config.CONSUMER)).String()) + params.Add("category", (common.RoleType(common.CONSUMER)).String()) params.Add("dubbo", "dubbogo-consumer-"+version.Version) rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) encodedURL = url.QueryEscape(rawURL) - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (config.RoleType(config.CONSUMER)).String()) + dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String()) log.Debug("consumer path:%s, url:%s", dubboPath, rawURL) default: return jerrors.Errorf("@c{%v} type is not referencer or provider", c) @@ -446,12 +446,12 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error { return nil } -func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) { +func (r *ZkRegistry) Subscribe(conf common.URL) (registry.Listener, error) { r.wg.Add(1) return r.getListener(conf) } -func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) { +func (r *ZkRegistry) getListener(conf common.URL) (*zkEventListener, error) { var ( zkListener *zkEventListener ) diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index c7214f1af7b3d5f9be2b404de73c863e0379c794..b6e7adbc755c17be01c0650154f65d7997fe50b8 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -2,6 +2,7 @@ package zookeeper import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common" "strconv" "testing" "time" @@ -11,12 +12,11 @@ import ( ) import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" - "github.com/dubbo/go-for-apache-dubbo/config" ) func Test_Register(t *testing.T) { - regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) - url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := NewMockZkRegistry(®url) defer ts.Stop() @@ -27,8 +27,8 @@ func Test_Register(t *testing.T) { } func Test_Subscribe(t *testing.T) { - regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) - url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := NewMockZkRegistry(®url) defer ts.Stop() @@ -41,7 +41,7 @@ func Test_Subscribe(t *testing.T) { } //consumer register - regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER)) + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) _, reg2, err := NewMockZkRegistry(®url) reg2.client = reg.client err = reg2.Register(url) @@ -57,8 +57,8 @@ func Test_Subscribe(t *testing.T) { } func Test_ConsumerDestory(t *testing.T) { - regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER))) - url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := NewMockZkRegistry(®url) defer ts.Stop() @@ -77,8 +77,8 @@ func Test_ConsumerDestory(t *testing.T) { } func Test_ProviderDestory(t *testing.T) { - regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) - url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := NewMockZkRegistry(®url) defer ts.Stop()