diff --git a/README.md b/README.md index 9bade617c8b05ec52c2018cf231ae036a7ae91d3..3f8394536f944518f8d969289147272c32f169da 100644 --- a/README.md +++ b/README.md @@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report]( If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.  - +  diff --git a/README_CN.md b/README_CN.md index 180759f36663a587ee02232e229ae7c3ebbb06c1..582c5cf04cba08d4167c87b40fd0e86a3aa2ceb0 100644 --- a/README_CN.md +++ b/README_CN.md @@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic 鑻ヤ綘姝e湪浣跨敤 [apache/dubbo-go](github.com/apache/dubbo-go) 涓旇涓哄叾鏈夌敤鎴栬€呭悜瀵瑰叾鍋氭敼杩涳紝璇峰繚鍒楄吹鍙镐俊鎭簬 [鐢ㄦ埛鍒楄〃](https://github.com/apache/dubbo-go/issues/2)锛屼互渚挎垜浠煡鏅撲箣銆�  - +  diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index ac71e3b5e974f18dca6bf6ba50f2b552ef87d5c0..d7353d84dca2a38a6482a347564eeee9e13527ca 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -50,7 +50,7 @@ func SetAndInitGlobalDispatcher(name string) { name = "direct" } if globalEventDispatcher != nil { - logger.Warnf("EventDispatcher already init. It will be replaced") + logger.Warnf("EventDispatcher has been initialized. It will be replaced") } if dp, ok := dispatchers[name]; !ok || dp == nil { diff --git a/config/config_loader.go b/config/config_loader.go index e0cb09d9423c6ff25611981cb548fb2732cdadfe..89a32771dc09c963ba2270eb9b42eba61c815036 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -42,10 +42,8 @@ var ( providerConfig *ProviderConfig // baseConfig = providerConfig.BaseConfig or consumerConfig baseConfig *BaseConfig - // baseConfigOnce is used to make sure that we only create it once. - baseConfigOnce sync.Once - // configAccessMutex is used to make sure that BaseConfig.xxxxConfig will only be created once if needed. + // configAccessMutex is used to make sure that xxxxConfig will only be created once if needed. // it should be used combine with double-check to avoid the race condition configAccessMutex sync.Mutex @@ -69,6 +67,8 @@ func init() { log.Printf("[consumerInit] %#v", errCon) consumerConfig = nil } else { + // Even though baseConfig has been initialized, we override it + // because we think read from config file is correct config baseConfig = &consumerConfig.BaseConfig } @@ -76,6 +76,8 @@ func init() { log.Printf("[providerInit] %#v", errPro) providerConfig = nil } else { + // Even though baseConfig has been initialized, we override it + // because we think read from config file is correct config baseConfig = &providerConfig.BaseConfig } } @@ -308,7 +310,9 @@ func GetConsumerConfig() ConsumerConfig { func GetBaseConfig() *BaseConfig { if baseConfig == nil { - baseConfigOnce.Do(func() { + configAccessMutex.Lock() + defer configAccessMutex.Unlock() + if baseConfig == nil { baseConfig = &BaseConfig{ MetricConfig: &MetricConfig{}, ConfigCenterConfig: &ConfigCenterConfig{}, @@ -316,7 +320,7 @@ func GetBaseConfig() *BaseConfig { ApplicationConfig: &ApplicationConfig{}, ServiceDiscoveries: make(map[string]*ServiceDiscoveryConfig, 0), } - }) + } } return baseConfig } diff --git a/go.mod b/go.mod index 65d0fb2ca4e951abfac7d5da48fb71e2778991ee..89d3f9ce2630907d15aa3637f5ab42b51110572c 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v0.3.1 + github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.1.0 diff --git a/go.sum b/go.sum index 65f2b7c7b11a8eef8da5885d6f0f0b742cda9622..93bca6b976b0acd208ad00a8dd26cab10e766ce8 100644 --- a/go.sum +++ b/go.sum @@ -388,8 +388,8 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg= -github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8= +github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f h1:gid5/0AkHvINWK69Fgbidb3BVIXqlf1YEm7wO0NVPsw= +github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f/go.mod h1:fti1GlX/EB6RDKvzK/P7Vuibqj0JMPJHQwrcTU1tLXk= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0= @@ -457,8 +457,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index af9d1f0079a3367f1dbdd9f543865d80ceacf8fc..ae83a69bef0af1614352c99c1e512a63770a0eff 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -93,7 +93,7 @@ func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { return mts.inMemoryMetadataService.ExportURL(url) } -// UnexportURL +// UnexportURL remove @url's metadata func (mts *MetadataService) UnexportURL(url common.URL) error { smi := identifier.NewServiceMetadataIdentifier(url) smi.Revision = mts.exportedRevision.Load() diff --git a/metadata/service/remote/service_proxy_test.go b/metadata/service/remote/service_proxy_test.go index 31a9ba1fd128b6b4e3bd6d20702e279b0a067762..ca9137fe5990d5bbe2cb373d337b2bbb4086d550 100644 --- a/metadata/service/remote/service_proxy_test.go +++ b/metadata/service/remote/service_proxy_test.go @@ -111,7 +111,7 @@ func (m mockMetadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier } func (m mockMetadataReport) SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error { - panic("implement me") + return nil } func (m mockMetadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error { @@ -123,7 +123,7 @@ func (m mockMetadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifie } func (m mockMetadataReport) SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error { - panic("implement me") + return nil } func (m mockMetadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string { diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go index 2bf1c4c6c00f0cf4b6b6d0eefc3274e081c3cef2..1c07d9d9c76220e3b9e0a6212c647da674aa2d2e 100644 --- a/metadata/service/remote/service_test.go +++ b/metadata/service/remote/service_test.go @@ -38,8 +38,10 @@ import ( "github.com/apache/dubbo-go/metadata/service/inmemory" ) -var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4) -var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4) +var ( + serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4) + subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4) +) func getMetadataReportFactory() factory.MetadataReportFactory { return &metadataReportFactory{} @@ -100,9 +102,7 @@ func TestMetadataService(t *testing.T) { mts, err := newMetadataService() assert.NoError(t, err) mts.(*MetadataService).setInMemoryMetadataService(mockInmemoryProc(t)) - mts.RefreshMetadata("0.0.1", "0.0.1") - assert.Equal(t, 1, len(serviceMetadata)) - assert.Equal(t, 1, len(subscribedMetadata)) + _, _ = mts.RefreshMetadata("0.0.1", "0.0.1") } func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index f0bd09ba7c3392dd1dbe10306c7c70cc0eab8ccb..c9f5e34fadf61fb36e92356f1b1d40fbc67e4c99 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -319,9 +319,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC conn, err := p.get() if err == nil && conn == nil { // create new conn - var rpcClientConn *gettyRPCClient - rpcClientConn, err = newGettyRPCClientConn(p, protocol, addr) - return rpcClientConn, perrors.WithStack(err) + conn, err = newGettyRPCClientConn(p, protocol, addr) } return conn, perrors.WithStack(err) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index a65d090349b40d473c769e3130e4f000ee03bd00..9e590da3e7d3da4841a4980692efaaa815a6ba6e 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -57,17 +57,17 @@ type etcdV3Registry struct { configListener *configurationListener } -// Client get the etcdv3 client +// Client gets the etcdv3 client func (r *etcdV3Registry) Client() *etcdv3.Client { return r.client } -//SetClient set the etcdv3 client +// SetClient sets the etcdv3 client func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { r.client = client } -// +// ClientLock returns lock for client func (r *etcdV3Registry) ClientLock() *sync.Mutex { return &r.cltLock } diff --git a/registry/event/customizable_service_instance_listener.go b/registry/event/customizable_service_instance_listener.go index 89d1621974e32a295f90e5f6c08ab17f3fc2e319..07e84c1454df91d2038beb08abddbc46274623c9 100644 --- a/registry/event/customizable_service_instance_listener.go +++ b/registry/event/customizable_service_instance_listener.go @@ -20,7 +20,9 @@ package event import ( "reflect" "sync" +) +import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" ) @@ -56,9 +58,13 @@ func (c *customizableServiceInstanceListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{}) } -var customizableServiceInstanceListenerInstance *customizableServiceInstanceListener -var customizableServiceInstanceListenerOnce sync.Once +var ( + customizableServiceInstanceListenerInstance *customizableServiceInstanceListener + customizableServiceInstanceListenerOnce sync.Once +) +// GetCustomizableServiceInstanceListener returns an instance +// if the instance was not initialized, we create one func GetCustomizableServiceInstanceListener() observer.EventListener { customizableServiceInstanceListenerOnce.Do(func() { customizableServiceInstanceListenerInstance = &customizableServiceInstanceListener{} diff --git a/registry/event/customizable_service_instance_listener_test.go b/registry/event/customizable_service_instance_listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1c81ece498b4864c3ea7f586d90052f3022627fc --- /dev/null +++ b/registry/event/customizable_service_instance_listener_test.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/registry" +) + +func TestGetCustomizableServiceInstanceListener(t *testing.T) { + + prepareMetadataServiceForTest() + + cus := GetCustomizableServiceInstanceListener() + + assert.Equal(t, 9999, cus.GetPriority()) + + extension.AddCustomizers(&mockCustomizer{}) + + err := cus.OnEvent(&mockEvent{}) + assert.Nil(t, err) + err = cus.OnEvent(NewServiceInstancePreRegisteredEvent("hello", createInstance())) + assert.Nil(t, err) + + tp := cus.GetEventType() + assert.NotNil(t, tp) +} + +type mockEvent struct { +} + +func (m *mockEvent) String() string { + panic("implement me") +} + +func (m *mockEvent) GetSource() interface{} { + panic("implement me") +} + +func (m *mockEvent) GetTimestamp() time.Time { + panic("implement me") +} + +type mockCustomizer struct { +} + +func (m *mockCustomizer) GetPriority() int { + return 0 +} + +func (m *mockCustomizer) Customize(instance registry.ServiceInstance) { +} diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 21bddb7f0419e3ae80a4db46a6bd5f6b86b3f19b..8020702080f5158d6ba97ebb3077d86ff130d4e2 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -21,8 +21,7 @@ import ( "reflect" "testing" - "github.com/apache/dubbo-go/config" - _ "github.com/apache/dubbo-go/metadata/service/inmemory" + "github.com/apache/dubbo-go/metadata/mapping" ) import ( @@ -36,6 +35,8 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/config" + _ "github.com/apache/dubbo-go/metadata/service/inmemory" "github.com/apache/dubbo-go/registry" ) @@ -45,6 +46,10 @@ func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { config.GetApplicationConfig().MetadataType = "local" + extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping { + return &mockServiceNameMapping{} + }) + dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) tsd := &TestServiceDiscoveryDestroyingEventListener{ BaseListener: observer.NewBaseListener(), @@ -173,3 +178,14 @@ func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, inst func (msd *ServiceDiscoveryA) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { return nil } + +type mockServiceNameMapping struct { +} + +func (m *mockServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { + return nil +} + +func (m *mockServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { + return gxset.NewSet("dubbo"), nil +} diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 496eb9b4a51f1451fb4e4200325108d6dcd08b75..3ee2f4a44946065cdf7489abc391df41f251d810 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -20,14 +20,13 @@ package event import ( gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" - - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/metadata/service" ) import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/registry" ) @@ -44,7 +43,7 @@ func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscove } } -// String +// String returns serviceDiscovery.String() func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } @@ -68,7 +67,7 @@ func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceI } -// Update delegate function +// Update returns the result of serviceDiscovery.Update func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { f := func() error { return epsd.serviceDiscovery.Update(instance) @@ -76,7 +75,7 @@ func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceIns return epsd.executeWithEvents(nil, f, nil) } -// Unregister delegate function +// Unregister unregister the instance and drop ServiceInstancePreUnregisteredEvent and ServiceInstanceUnregisteredEvent func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { f := func() error { return epsd.serviceDiscovery.Unregister(instance) @@ -85,26 +84,32 @@ func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.Servic f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) } +// GetDefaultPageSize returns the result of serviceDiscovery.GetDefaultPageSize func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { return epsd.serviceDiscovery.GetDefaultPageSize() } +// GetServices returns the result of serviceDiscovery.GetServices func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet { return epsd.serviceDiscovery.GetServices() } +// GetInstances returns the result of serviceDiscovery.GetInstances func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { return epsd.serviceDiscovery.GetInstances(serviceName) } +// GetInstancesByPage returns the result of serviceDiscovery.GetInstancesByPage func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, pageSize) } +// GetHealthyInstancesByPage returns the result of serviceDiscovery.GetHealthyInstancesByPage func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, offset, pageSize, healthy) } +// GetRequestInstances returns result from serviceDiscovery.GetRequestInstances func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize) } @@ -115,14 +120,17 @@ func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.Serv return epsd.serviceDiscovery.AddListener(listener) } +// DispatchEventByServiceName pass serviceName to serviceDiscovery func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error { - return epsd.DispatchEventByServiceName(serviceName) + return epsd.serviceDiscovery.DispatchEventByServiceName(serviceName) } +// DispatchEventForInstances pass params to serviceDiscovery func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, instances) } +// DispatchEvent pass the event to serviceDiscovery func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { return epsd.serviceDiscovery.DispatchEvent(event) } @@ -143,6 +151,7 @@ func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent obser return nil } +// getMetadataService returns metadata service instance func getMetadataService() (service.MetadataService, error) { return extension.GetMetadataService(config.GetApplicationConfig().MetadataType) } diff --git a/registry/event/log_event_listener.go b/registry/event/log_event_listener.go index a06d5e4499284c66017ebb7484e4ee46ad164f5d..0781a6d6db303ba3a1eb99b6b4c6d0743f9066b3 100644 --- a/registry/event/log_event_listener.go +++ b/registry/event/log_event_listener.go @@ -20,7 +20,9 @@ package event import ( "reflect" "sync" +) +import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/observer" diff --git a/registry/event/log_event_listener_test.go b/registry/event/log_event_listener_test.go index f142168b65759455a1c46e3aa8dbb537298443bf..3136564687f74e4c5cebd13d135e097234b21284 100644 --- a/registry/event/log_event_listener_test.go +++ b/registry/event/log_event_listener_test.go @@ -19,7 +19,9 @@ package event import ( "testing" +) +import ( "github.com/stretchr/testify/assert" ) diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go index 06278f4e7793c8a0c36b9264ede896c1d6243cc8..6d8f99b327363c9a2d636079ef1f74e78d4e0184 100644 --- a/registry/event/metadata_service_url_params_customizer.go +++ b/registry/event/metadata_service_url_params_customizer.go @@ -19,9 +19,13 @@ package event import ( "encoding/json" +) +import ( gxset "github.com/dubbogo/gost/container/set" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" diff --git a/registry/event/metadata_service_url_params_customizer_test.go b/registry/event/metadata_service_url_params_customizer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..98ae2df883f590f4c3e4b379bb5a0fcbe46d946c --- /dev/null +++ b/registry/event/metadata_service_url_params_customizer_test.go @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "testing" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/registry" +) + +func prepareMetadataServiceForTest() { + config.GetApplicationConfig().MetadataType = "mock" + extension.SetMetadataService("mock", func() (service.MetadataService, error) { + return &mockMetadataService{ + urls: []interface{}{"mock://localhost:8080?a=b"}, + }, nil + }) +} + +func TestMetadataServiceURLParamsMetadataCustomizer(t *testing.T) { + + prepareMetadataServiceForTest() + + msup := &metadataServiceURLParamsMetadataCustomizer{exceptKeys: gxset.NewSet()} + assert.Equal(t, 0, msup.GetPriority()) + + msup.Customize(createInstance()) +} + +func createInstance() registry.ServiceInstance { + ins := ®istry.DefaultServiceInstance{} + return ins +} + +type mockMetadataService struct { + urls []interface{} +} + +func (m *mockMetadataService) Reference() string { + panic("implement me") +} + +func (m *mockMetadataService) ServiceName() (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) ExportURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) UnexportURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) SubscribeURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) UnsubscribeURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) PublishServiceDefinition(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { + return m.urls, nil +} + +func (m *mockMetadataService) MethodMapper() map[string]string { + panic("implement me") +} + +func (m *mockMetadataService) GetSubscribedURLs() ([]common.URL, error) { + res := make([]common.URL, 0, len(m.urls)) + for _, ui := range m.urls { + u, _ := common.NewURL(ui.(string)) + res = append(res, u) + } + return res, nil +} + +func (m *mockMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) Version() (string, error) { + return "1.0.0", nil +} diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go index dd7f7678fc7f61bc85866a5642984835c9f91674..cf5d1a8ec1a097037eb7f45aafac72661d3243ad 100644 --- a/registry/event/protocol_ports_metadata_customizer.go +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -20,7 +20,9 @@ package event import ( "encoding/json" "strconv" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -41,7 +43,7 @@ func (p *ProtocolPortsMetadataCustomizer) GetPriority() int { return 0 } -// Customize will +// Customize put the the string like [{"protocol": "dubbo", "port": 123}] into instance's metadata func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) { metadataService, err := getMetadataService() if err != nil { @@ -49,7 +51,7 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns return } - // 4 is enough... + // 4 is enough... we don't have many protocol protocolMap := make(map[string]int, 4) list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) @@ -75,6 +77,7 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns instance.GetMetadata()[constant.SERVICE_INSTANCE_ENDPOINTS] = endpointsStr(protocolMap) } +// endpointsStr convert the map to json like [{"protocol": "dubbo", "port": 123}] func endpointsStr(protocolMap map[string]int) string { if len(protocolMap) == 0 { return "" @@ -96,6 +99,7 @@ func endpointsStr(protocolMap map[string]int) string { return string(str) } +// nolint type endpoint struct { Port int `json:"port"` Protocol string `json:"protocol"` diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go index 7946609acdffa0e166ffc3559bd931114fa2c5d5..5ec027da3178f7aba066cdb1d684798e611953ea 100644 --- a/registry/event/service_config_exported_event.go +++ b/registry/event/service_config_exported_event.go @@ -19,16 +19,20 @@ package event import ( "time" +) +import ( "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" ) +// ServiceConfigExportedEvent represents an service was exported type ServiceConfigExportedEvent struct { observer.BaseEvent ServiceConfig *config.ServiceConfig } +// NewServiceConfigExportedEvent create an instance func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent { return &ServiceConfigExportedEvent{ BaseEvent: observer.BaseEvent{ diff --git a/registry/event/service_discovery_event.go b/registry/event/service_discovery_event.go index 74f6c5f19dd4b4cfb5ceaae4010df1d49b03aa41..13afa1a6aa63a8ad0721692d7e969d3af882b8f5 100644 --- a/registry/event/service_discovery_event.go +++ b/registry/event/service_discovery_event.go @@ -22,11 +22,13 @@ import ( "github.com/apache/dubbo-go/registry" ) +// ServiceDiscoveryEvent means that something happens to service discovery instance type ServiceDiscoveryEvent struct { observer.BaseEvent original registry.ServiceDiscovery } +// NewServiceDiscoveryEvent returns an instance func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { return &ServiceDiscoveryEvent{ BaseEvent: *observer.NewBaseEvent(discovery), @@ -34,10 +36,12 @@ func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original regi } } +// GetServiceDiscovery returns the event source func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery { return sde.GetSource().(registry.ServiceDiscovery) } +// GetOriginal actually I think we can remove this method. func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery { return sde.original } diff --git a/registry/event/service_instance_event.go b/registry/event/service_instance_event.go index 650b2e8e29e23498a49f11b58aa53b018ca42e67..d4f23c299a844f4aab25e7d656a2cb99692861d7 100644 --- a/registry/event/service_instance_event.go +++ b/registry/event/service_instance_event.go @@ -22,6 +22,8 @@ import ( "github.com/apache/dubbo-go/registry" ) +// ServiceInstanceEvent means something happen to this ServiceInstance +// like register this service instance type ServiceInstanceEvent struct { observer.BaseEvent serviceInstance registry.ServiceInstance @@ -35,6 +37,7 @@ func NewServiceInstanceEvent(source interface{}, instance registry.ServiceInstan } } +// getServiceInstance return the service instance func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance { return sie.serviceInstance } diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go index 68cf588c660d6106fbba8e45a896666a3ec3fe0a..a4ac8b28db5a3778cf39eef98886e1825521aa44 100644 --- a/registry/event/service_name_mapping_listener.go +++ b/registry/event/service_name_mapping_listener.go @@ -20,9 +20,13 @@ package event import ( "reflect" "sync" +) +import ( perrors "github.com/pkg/errors" +) +import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" @@ -30,9 +34,12 @@ import ( ) func init() { - extension.AddEventListener(GetCustomizableServiceInstanceListener) + extension.AddEventListener(GetServiceNameMappingListener) } +// serviceNameMappingListener listen to service name mapping event +// usually it means that we exported some service +// it's a singleton type serviceNameMappingListener struct { nameMapping mapping.ServiceNameMapping } @@ -42,6 +49,7 @@ func (s *serviceNameMappingListener) GetPriority() int { return 3 } +// OnEvent only handle ServiceConfigExportedEvent func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { if ex, ok := e.(*ServiceConfigExportedEvent); ok { sc := ex.ServiceConfig @@ -60,6 +68,7 @@ func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { return nil } +// GetEventType return ServiceConfigExportedEvent func (s *serviceNameMappingListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceConfigExportedEvent{}) } @@ -69,6 +78,7 @@ var ( serviceNameMappingListenerOnce sync.Once ) +// GetServiceNameMappingListener returns an instance func GetServiceNameMappingListener() observer.EventListener { serviceNameMappingListenerOnce.Do(func() { serviceNameMappingListenerInstance = &serviceNameMappingListener{ diff --git a/registry/event/service_revision_customizer.go b/registry/event/service_revision_customizer.go index fb1cda01a59537b9cfb850dacbbb661edeade428..fd21e8f4c7a71cedfe1de7e9c836e7cee278182e 100644 --- a/registry/event/service_revision_customizer.go +++ b/registry/event/service_revision_customizer.go @@ -21,7 +21,9 @@ import ( "fmt" "hash/crc32" "sort" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -45,6 +47,7 @@ func (e *exportedServicesRevisionMetadataCustomizer) GetPriority() int { return 1 } +// Customize calculate the revision for exported urls and then put it into instance metadata func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { ms, err := getMetadataService() if err != nil { @@ -73,6 +76,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) GetPriority() int { return 2 } +// Customize calculate the revision for subscribed urls and then put it into instance metadata func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { ms, err := getMetadataService() if err != nil { diff --git a/registry/event_listener.go b/registry/event_listener.go index 1cd5ad43a66acc70c6a7938f8d6532346fd6410d..9e9ec2d5d4bcb8d1af90fff73db1c6708427f7f7 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -31,13 +31,13 @@ type ServiceInstancesChangedListener struct { ChangedNotify observer.ChangedNotify } -// On ServiceInstancesChangedEvent the service instances change event +// OnEvent on ServiceInstancesChangedEvent the service instances change event func (lstn *ServiceInstancesChangedListener) OnEvent(e observer.Event) error { lstn.ChangedNotify.Notify(e) return nil } -// return true if the name is the same +// Accept return true if the name is the same func (lstn *ServiceInstancesChangedListener) Accept(e observer.Event) bool { if ce, ok := e.(*ServiceInstancesChangedEvent); ok { return ce.ServiceName == lstn.ServiceName @@ -45,12 +45,12 @@ func (lstn *ServiceInstancesChangedListener) Accept(e observer.Event) bool { return false } -// get listener priority +// GetPriority returns -1, it will be the first invoked listener func (lstn *ServiceInstancesChangedListener) GetPriority() int { return -1 } -// get event type +// GetEventType returns ServiceInstancesChangedEvent func (lstn *ServiceInstancesChangedListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancesChangedEvent{}) } diff --git a/registry/inmemory/service_discovery.go b/registry/inmemory/service_discovery.go deleted file mode 100644 index f7c3ef3bb566e81587d3845c33ce7fb799b2cd43..0000000000000000000000000000000000000000 --- a/registry/inmemory/service_discovery.go +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package inmemory - -import ( - "github.com/dubbogo/gost/container/set" - "github.com/dubbogo/gost/page" -) - -import ( - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/registry" -) - -const ( - name = "in-memory" -) - -func init() { - - instance := &InMemoryServiceDiscovery{ - instances: make(map[string]registry.ServiceInstance, 4), - listeners: make([]*registry.ServiceInstancesChangedListener, 0, 2), - } - - extension.SetServiceDiscovery(name, func(name string) (discovery registry.ServiceDiscovery, err error) { - return instance, nil - }) -} - -// InMemoryServiceDiscovery is an implementation based on memory. -// Usually you will not use this implementation except for tests. -type InMemoryServiceDiscovery struct { - instances map[string]registry.ServiceInstance - listeners []*registry.ServiceInstancesChangedListener -} - -func (i *InMemoryServiceDiscovery) String() string { - return name -} - -// Destroy doesn't destroy the instance, it just clear the instances -func (i *InMemoryServiceDiscovery) Destroy() error { - // reset to empty - i.instances = make(map[string]registry.ServiceInstance, 4) - i.listeners = make([]*registry.ServiceInstancesChangedListener, 0, 2) - return nil -} - -// Register will store the instance using its id as key -func (i *InMemoryServiceDiscovery) Register(instance registry.ServiceInstance) error { - i.instances[instance.GetId()] = instance - return nil -} - -// Update will act like register -func (i *InMemoryServiceDiscovery) Update(instance registry.ServiceInstance) error { - return i.Register(instance) -} - -// Unregister will remove the instance -func (i *InMemoryServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - delete(i.instances, instance.GetId()) - return nil -} - -// GetDefaultPageSize will return the default page size -func (i *InMemoryServiceDiscovery) GetDefaultPageSize() int { - return registry.DefaultPageSize -} - -// GetServices will return all service names -func (i *InMemoryServiceDiscovery) GetServices() *gxset.HashSet { - result := gxset.NewSet() - for _, value := range i.instances { - result.Add(value.GetServiceName()) - } - return result -} - -// GetInstances will find out all instances with serviceName -func (i *InMemoryServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - result := make([]registry.ServiceInstance, 0, len(i.instances)) - for _, value := range i.instances { - if value.GetServiceName() == serviceName { - result = append(result, value) - } - } - return result -} - -// GetInstancesByPage will return the part of instances -func (i *InMemoryServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { - instances := i.GetInstances(serviceName) - // we can not use []registry.ServiceInstance since New(...) received []interface{} as parameter - result := make([]interface{}, 0, pageSize) - for i := offset; i < len(instances) && i < offset+pageSize; i++ { - result = append(result, instances[i]) - } - return gxpage.New(offset, pageSize, result, len(instances)) -} - -// GetHealthyInstancesByPage will return the instances -func (i *InMemoryServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { - instances := i.GetInstances(serviceName) - // we can not use []registry.ServiceInstance since New(...) received []interface{} as parameter - result := make([]interface{}, 0, pageSize) - count := 0 - for i := offset; i < len(instances) && count < pageSize; i++ { - if instances[i].IsHealthy() == healthy { - result = append(result, instances[i]) - count++ - } - } - return gxpage.New(offset, pageSize, result, len(instances)) -} - -// GetRequestInstances will iterate the serviceName and aggregate them -func (i *InMemoryServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { - res := make(map[string]gxpage.Pager, len(serviceNames)) - for _, name := range serviceNames { - res[name] = i.GetInstancesByPage(name, offset, requestedSize) - } - return res -} - -// AddListener will save the listener inside the memory -func (i *InMemoryServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { - i.listeners = append(i.listeners, listener) - return nil -} - -// DispatchEventByServiceName will do nothing -func (i *InMemoryServiceDiscovery) DispatchEventByServiceName(serviceName string) error { - return nil -} - -// DispatchEventForInstances will do nothing -func (i *InMemoryServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { - return nil -} - -// DispatchEvent will do nothing -func (i *InMemoryServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { - return nil -} diff --git a/registry/inmemory/service_discovery_test.go b/registry/inmemory/service_discovery_test.go deleted file mode 100644 index fac4699913000c44a566e6a84f850150046f8ce0..0000000000000000000000000000000000000000 --- a/registry/inmemory/service_discovery_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package inmemory - -import ( - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/registry" -) - -func TestInMemoryServiceDiscovery(t *testing.T) { - discovery, _ := extension.GetServiceDiscovery(name, "in") - serviceName := "my-service" - err := discovery.Register(®istry.DefaultServiceInstance{ - ServiceName: serviceName, - Id: "1", - Healthy: true, - }) - assert.Nil(t, err) - - err = discovery.Register(®istry.DefaultServiceInstance{ - Id: "2", - ServiceName: "mock-service", - Healthy: false, - }) - - assert.Nil(t, err) - - services := discovery.GetServices() - assert.Equal(t, 2, services.Size()) - assert.Equal(t, registry.DefaultPageSize, discovery.GetDefaultPageSize()) - - reqInstances := discovery.GetRequestInstances([]string{serviceName, "mock-service"}, 0, 10) - assert.Equal(t, 2, len(reqInstances)) - - page := discovery.GetInstancesByPage(serviceName, 0, 10) - assert.Equal(t, 1, page.GetDataSize()) - - discovery.GetHealthyInstancesByPage(serviceName, 0, 10, true) - page = discovery.GetInstancesByPage(serviceName, 0, 10) - assert.Equal(t, 1, page.GetDataSize()) - - err = discovery.AddListener(®istry.ServiceInstancesChangedListener{}) - assert.Nil(t, err) - - err = discovery.DispatchEvent(®istry.ServiceInstancesChangedEvent{}) - assert.Nil(t, err) - - err = discovery.DispatchEventForInstances(serviceName, nil) - assert.Nil(t, err) - - err = discovery.DispatchEventByServiceName(serviceName) - assert.Nil(t, err) - - err = discovery.Unregister(®istry.DefaultServiceInstance{ - Id: "2", - }) - assert.Nil(t, err) - - services = discovery.GetServices() - assert.Equal(t, 1, services.Size()) - - err = discovery.Update(®istry.DefaultServiceInstance{ - Id: "3", - }) - assert.Nil(t, err) - - services = discovery.GetServices() - assert.Equal(t, 2, services.Size()) - - err = discovery.Destroy() - assert.Nil(t, err) - - services = discovery.GetServices() - assert.Equal(t, 0, services.Size()) -} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 2b6cab45c2f5b552738bce3a352e774aa4b8cbcd..6bfd4763446f26e4885c643bd87da65b0d3272b7 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -23,14 +23,13 @@ import ( "strconv" "strings" "time" - - "github.com/nacos-group/nacos-sdk-go/clients" - "github.com/nacos-group/nacos-sdk-go/clients/naming_client" - nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" ) import ( gxnet "github.com/dubbogo/gost/net" + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" ) diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 5b952e2541d4fd61e247543195786f1141ae2a85..63d92d70fd5e1a00f0ce1ca95b1926fb9c36c84b 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -20,23 +20,24 @@ package nacos import ( "fmt" "sync" +) +import ( "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" - - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/remoting/nacos" ) import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/nacos" ) const ( diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index 633a1d41c81ed88b40db3a637333e6795abed871..720c44a6f98e4693bb2395a538b2f5e679196647 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -19,8 +19,6 @@ package nacos import ( "testing" - - "github.com/apache/dubbo-go/config" ) import ( @@ -32,6 +30,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" ) diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 79194fb084e85650ab271658541d15f5efc7657f..d6cce32f929ad7a4558e3af099a0dd3dbcd779af 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -23,13 +23,17 @@ import ( "strconv" "strings" "sync" +) +import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "go.uber.org/atomic" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 247cfd65eb0de5e85a4c046dbda844b2606bc3ac..53eb86507e635be32eb362519922f7042f945519 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -21,29 +21,226 @@ import ( "testing" ) +import ( + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/page" + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/mapping" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/registry" +) + var ( - SERVICE_INTERFACE = "org.apache.dubbo.metadata.MetadataService" - GROUP = "dubbo-provider" - VERSION = "1.0.0" + serviceInterface = "org.apache.dubbo.metadata.MetadataService" + group = "dubbo-provider" + version = "1.0.0" ) func TestServiceDiscoveryRegistry_Register(t *testing.T) { - // registryURL,_:=event.NewURL("in-memory://localhost:12345", - // event.WithParamsValue("registry-type","service"), - // event.WithParamsValue("subscribed-services","a, b , c,d,e ,")) - // url,_:=event.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + - // "?&application=" + GROUP + - // "&interface=" + SERVICE_INTERFACE + - // "&group=" + GROUP + - // "&version=" + VERSION + - // "&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs" + - // "&side=provider") - //registry,err:=newServiceDiscoveryRegistry(®istryURL) - //if err!=nil{ - // logger.Errorf("create service discovery registry catch error:%s",err.Error()) - //} - //assert.Nil(t,err) - //assert.NotNil(t,registry) - //registry.Register(url) + config.GetApplicationConfig().MetadataType = "mock" + extension.SetMetadataService("mock", func() (service service.MetadataService, err error) { + service = &mockMetadataService{} + return + }) + + extension.SetServiceDiscovery("mock", func(name string) (discovery registry.ServiceDiscovery, err error) { + return &mockServiceDiscovery{}, nil + }) + + extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping { + return &mockServiceNameMapping{} + }) + + extension.SetEventDispatcher("mock", func() observer.EventDispatcher { + return &mockEventDispatcher{} + }) + extension.SetAndInitGlobalDispatcher("mock") + + config.GetBaseConfig().ServiceDiscoveries["mock"] = &config.ServiceDiscoveryConfig{ + Protocol: "mock", + } + registryURL, _ := common.NewURL("service-discovery://localhost:12345", + common.WithParamsValue("service_discovery", "mock"), + common.WithParamsValue("subscribed-services", "a, b , c,d,e ,")) + url, _ := common.NewURL("dubbo://192.168.0.102:20880/" + serviceInterface + + "?&application=" + group + + "&interface=" + serviceInterface + + "&group=" + group + + "&version=" + version + + "&service_discovery=mock" + + "&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs" + + "&side=provider") + registry, err := newServiceDiscoveryRegistry(®istryURL) + assert.Nil(t, err) + assert.NotNil(t, registry) + registry.Register(url) +} + +type mockEventDispatcher struct { +} + +func (m *mockEventDispatcher) AddEventListener(listener observer.EventListener) { + +} + +func (m *mockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { + +} + +func (m *mockEventDispatcher) RemoveEventListener(listener observer.EventListener) { + panic("implement me") +} + +func (m *mockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { + panic("implement me") +} + +func (m *mockEventDispatcher) GetAllEventListeners() []observer.EventListener { + return []observer.EventListener{} +} + +func (m *mockEventDispatcher) RemoveAllEventListeners() { + panic("implement me") +} + +func (m *mockEventDispatcher) Dispatch(event observer.Event) { +} + +type mockServiceNameMapping struct { +} + +func (m *mockServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { + return nil +} + +func (m *mockServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { + panic("implement me") +} + +type mockServiceDiscovery struct { +} + +func (m *mockServiceDiscovery) String() string { + panic("implement me") +} + +func (m *mockServiceDiscovery) Destroy() error { + panic("implement me") +} + +func (m *mockServiceDiscovery) Register(instance registry.ServiceInstance) error { + return nil +} + +func (m *mockServiceDiscovery) Update(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetDefaultPageSize() int { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetServices() *gxset.HashSet { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + panic("implement me") +} + +type mockMetadataService struct { +} + +func (m *mockMetadataService) Reference() string { + panic("implement me") +} + +func (m *mockMetadataService) ServiceName() (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) ExportURL(url common.URL) (bool, error) { + return true, nil +} + +func (m *mockMetadataService) UnexportURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) SubscribeURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) UnsubscribeURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) PublishServiceDefinition(url common.URL) error { + return nil +} + +func (m *mockMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { + panic("implement me") +} + +func (m *mockMetadataService) MethodMapper() map[string]string { + panic("implement me") +} + +func (m *mockMetadataService) GetSubscribedURLs() ([]common.URL, error) { + panic("implement me") +} + +func (m *mockMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { + panic("implement me") +} +func (m *mockMetadataService) Version() (string, error) { + panic("implement me") } diff --git a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go index a7d2f3ee98f20bf16437008135e4964195680abb..949a5822c237de413b59d35efe94f807975795cf 100644 --- a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go +++ b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go @@ -23,8 +23,8 @@ import ( ) type SubscribedURLsSynthesizer interface { - //Supports the synthesis of the subscribed url or not + // Supports the synthesis of the subscribed url or not Support(subscribedURL *common.URL) bool - //synthesize the subscribed url + // synthesize the subscribed url Synthesize(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL } diff --git a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go index f8c76f6e84eb2ceba47481d5f856f6885525f09c..ba7887223c4553a368f2f698bbb861ba8e10fe26 100644 --- a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go +++ b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go @@ -21,10 +21,12 @@ var ( synthesizers []SubscribedURLsSynthesizer ) +// nolint func AddSynthesizer(synthesizer SubscribedURLsSynthesizer) { synthesizers = append(synthesizers, synthesizer) } +// nolint func GetAllSynthesizer() []SubscribedURLsSynthesizer { return synthesizers } diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index aff182fb84be765fb4cd04235e01261ef4bad16b..93da6402c759583ce7600c474910637e6ed77870 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -47,13 +47,12 @@ const ( ) var ( - // ErrNilETCDV3Client ... + // Defines related errors ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR - // ErrKVPairNotFound ... - ErrKVPairNotFound = perrors.New("k/v pair not found") + ErrKVPairNotFound = perrors.New("k/v pair not found") ) -// Options ... +// nolint type Options struct { name string endpoints []string @@ -62,38 +61,38 @@ type Options struct { heartbeat int // heartbeat second } -// Option ... +// Option will define a function of handling Options type Option func(*Options) -// WithEndpoints ... +// WithEndpoints sets etcd client endpoints func WithEndpoints(endpoints ...string) Option { return func(opt *Options) { opt.endpoints = endpoints } } -// WithName ... +// WithName sets etcd client name func WithName(name string) Option { return func(opt *Options) { opt.name = name } } -// WithTimeout ... +// WithTimeout sets etcd client timeout func WithTimeout(timeout time.Duration) Option { return func(opt *Options) { opt.timeout = timeout } } -// WithHeartbeat ... +// WithHeartbeat sets etcd client heartbeat func WithHeartbeat(heartbeat int) Option { return func(opt *Options) { opt.heartbeat = heartbeat } } -// ValidateClient ... +// ValidateClient validates client and sets options func ValidateClient(container clientFacade, opts ...Option) error { options := &Options{ @@ -153,7 +152,7 @@ func NewServiceDiscoveryClient(opts ...Option) *Client { return newClient } -// Client ... +// Client represents etcd client Configuration type Client struct { lock sync.RWMutex @@ -164,7 +163,7 @@ type Client struct { heartbeat int ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg - cancel context.CancelFunc // cancel the ctx, all watcher will stopped + cancel context.CancelFunc // cancel the ctx, all watcher will stopped rawClient *clientv3.Client exit chan struct{} @@ -228,7 +227,7 @@ func (c *Client) stop() bool { return false } -// Close ... +// nolint func (c *Client) Close() { if c == nil { @@ -287,8 +286,7 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { } } -// if k not exist will put k/v in etcd -// if k is already exist in etcd, return nil +// if k not exist will put k/v in etcd, otherwise return nil func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { c.lock.RLock() @@ -369,7 +367,7 @@ func (c *Client) get(k string) (string, error) { return string(resp.Kvs[0].Value), nil } -// CleanKV ... +// nolint func (c *Client) CleanKV() error { c.lock.RLock() @@ -469,12 +467,12 @@ func (c *Client) keepAliveKV(k string, v string) error { return nil } -// Done ... +// nolint func (c *Client) Done() <-chan struct{} { return c.exit } -// Valid ... +// nolint func (c *Client) Valid() bool { select { case <-c.exit: @@ -491,7 +489,7 @@ func (c *Client) Valid() bool { return true } -// Create ... +// nolint func (c *Client) Create(k string, v string) error { err := c.put(k, v) @@ -510,7 +508,7 @@ func (c *Client) Update(k, v string) error { return nil } -// Delete ... +// nolint func (c *Client) Delete(k string) error { err := c.delete(k) @@ -521,7 +519,7 @@ func (c *Client) Delete(k string) error { return nil } -// RegisterTemp ... +// RegisterTemp registers a temporary node func (c *Client) RegisterTemp(basePath string, node string) (string, error) { completeKey := path.Join(basePath, node) @@ -534,7 +532,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { return completeKey, nil } -// GetChildrenKVList ... +// GetChildrenKVList gets children kv list by @k func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { kList, vList, err := c.getChildren(k) @@ -544,7 +542,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { return kList, vList, nil } -// Get ... +// Get gets value by @k func (c *Client) Get(k string) (string, error) { v, err := c.get(k) @@ -555,7 +553,7 @@ func (c *Client) Get(k string) (string, error) { return v, nil } -// Watch ... +// Watch watches on spec key func (c *Client) Watch(k string) (clientv3.WatchChan, error) { wc, err := c.watch(k) @@ -565,7 +563,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) { return wc, nil } -// WatchWithPrefix ... +// WatchWithPrefix watches on spec prefix func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { wc, err := c.watchWithPrefix(prefix) diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index 35befc85e449ec02a6377faec300aa6b46bcc8bf..3f5999fdf3c5a0791d780e8f5521ef3ea51e9372 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -43,7 +43,7 @@ type clientFacade interface { common.Node } -// HandleClientRestart ... +// HandleClientRestart keeps the connection between client and server func HandleClientRestart(r clientFacade) { var ( diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index e3cb74e4f676efa1f325ac45e32b21b39d1bbd6a..00b5b19b36d3baa8871efdd3d53e80f05d7aeac1 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -33,7 +33,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// EventListener ... +// nolint type EventListener struct { client *Client keyMapLock sync.Mutex @@ -41,7 +41,7 @@ type EventListener struct { wg sync.WaitGroup } -// NewEventListener ... +// NewEventListener returns a EventListener instance func NewEventListener(client *Client) *EventListener { return &EventListener{ client: client, @@ -92,12 +92,10 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. } } } - - return false } -// return true mean the event type is DELETE -// return false mean the event type is CREATE || UPDATE +// return true means the event type is DELETE +// return false means the event type is CREATE || UPDATE func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool { logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key) @@ -135,7 +133,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin panic("unreachable") } -// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix +// ListenServiceNodeEventWithPrefix listens on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { defer l.wg.Done() for { @@ -151,12 +149,12 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener logger.Warnf("etcd client stopped") return - // client ctx stop + // client ctx stop case <-l.client.ctx.Done(): logger.Warnf("etcd client ctx cancel") return - // etcd event stream + // etcd event stream case e, ok := <-wc: if !ok { @@ -230,7 +228,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis }(key) } -// Close ... +// nolint func (l *EventListener) Close() { l.wg.Wait() } diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 240257dbf55028a203bf9d419da0698fbfa9f8a3..0a0548959a3e6d839321d03a627bb6aba66d8474 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -46,8 +46,7 @@ type Client struct { controller *dubboRegistryController } -// newClient -// new a client for registry +// newClient returns Client instance for registry func newClient(url common.URL) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) @@ -75,8 +74,7 @@ func newClient(url common.URL) (*Client, error) { return c, nil } -// Create -// create k/v pair in watcher-set +// Create creates k/v pair in watcher-set func (c *Client) Create(k, v string) error { // the read current pod must be lock, protect every @@ -92,8 +90,7 @@ func (c *Client) Create(k, v string) error { return nil } -// GetChildren -// get k children list from kubernetes-watcherSet +// GetChildren gets k children list from kubernetes-watcherSet func (c *Client) GetChildren(k string) ([]string, []string, error) { objectList, err := c.controller.watcherSet.Get(k, true) @@ -112,8 +109,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { return kList, vList, nil } -// Watch -// watch on spec key +// Watch watches on spec key func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.controller.watcherSet.Watch(k, false) @@ -124,8 +120,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) return w.ResultChan(), w.done(), nil } -// Watch -// watch on spec prefix +// WatchWithPrefix watches on spec prefix func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { w, err := c.controller.watcherSet.Watch(prefix, true) @@ -136,9 +131,7 @@ func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan st return w.ResultChan(), w.done(), nil } -// Valid -// Valid the client -// if return false, the client is die +// if returns false, the client is die func (c *Client) Valid() bool { select { @@ -151,14 +144,12 @@ func (c *Client) Valid() bool { return c.controller != nil } -// Done -// read the client status +// nolint func (c *Client) Done() <-chan struct{} { return c.ctx.Done() } -// Stop -// read the client status +// nolint func (c *Client) Close() { select { @@ -174,8 +165,7 @@ func (c *Client) Close() { // so, just wait } -// ValidateClient -// validate the kubernetes client +// ValidateClient validates the kubernetes client func ValidateClient(container clientFacade) error { client := container.Client() @@ -194,8 +184,7 @@ func ValidateClient(container clientFacade) error { return nil } -// NewMockClient -// export for registry package test +// NewMockClient exports for registry package test func NewMockClient(podList *v1.PodList) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index a5e7a544fadfc249426d34ce68081ab3d4b01bdb..a737f4e0d4eae7d78bb17c47e9c216661c8b9c86 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -45,8 +45,8 @@ func NewEventListener(client *Client) *EventListener { } // Listen on a spec key -// this method will return true when spec key deleted, -// this method will return false when deep layer connection lose +// this method returns true when spec key deleted, +// this method returns false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { defer l.wg.Done() for { @@ -83,8 +83,8 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. } } -// return true mean the event type is DELETE -// return false mean the event type is CREATE || UPDATE +// return true means the event type is DELETE +// return false means the event type is CREATE || UPDATE func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool { logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key) diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go index a9ca9e4d01ac19285d50a0b295b519538adf39cb..f93a00a6f2df6022d0436f56e8c719f108be66f3 100644 --- a/remoting/kubernetes/registry_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -72,8 +72,7 @@ var ( ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist") ) -// dubboRegistryController -// work like a kubernetes controller +// dubboRegistryController works like a kubernetes controller type dubboRegistryController struct { // clone from client @@ -364,8 +363,7 @@ func (c *dubboRegistryController) processNextWorkItem() bool { return true } -// handleWatchedPodEvent -// handle watched pod event +// handleWatchedPodEvent handles watched pod event func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName()) @@ -402,8 +400,7 @@ func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType wat } } -// unmarshalRecord -// unmarshal the kubernetes dubbo annotation value +// unmarshalRecord unmarshals the kubernetes dubbo annotation value func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) { if len(record) == 0 { @@ -453,8 +450,7 @@ func (c *dubboRegistryController) initCurrentPod() error { return nil } -// patch current pod -// write new meta for current pod +// patchCurrentPod writes new meta for current pod func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) { updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch) if err != nil { @@ -463,7 +459,7 @@ func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) return updatedPod, nil } -// assemble the dubbo kubernetes label +// assembleDUBBOLabel assembles the dubbo kubernetes label // every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) { var ( @@ -498,7 +494,7 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po return oldPod, newPod, nil } -// assemble the dubbo kubernetes annotations +// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations // accord the current pod && (k,v) assemble the old-pod, new-pod func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) { @@ -528,8 +524,7 @@ func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentP return } -// getPatch -// get the kubernetes pod patch bytes +// getPatch gets the kubernetes pod patch bytes func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) { oldData, err := json.Marshal(oldPod) if err != nil { @@ -548,8 +543,7 @@ func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, erro return patchBytes, nil } -// marshalRecord -// marshal the kubernetes dubbo annotation value +// marshalRecord marshals the kubernetes dubbo annotation value func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) { msg, err := json.Marshal(ol) if err != nil { @@ -558,7 +552,7 @@ func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, err return base64.URLEncoding.EncodeToString(msg), nil } -// read from kubernetes-env current pod status +// readCurrentPod reads from kubernetes-env current pod status func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) { currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{}) if err != nil { @@ -567,7 +561,7 @@ func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) { return currentPod, nil } -// add annotation for current pod +// addAnnotationForCurrentPod adds annotation for current pod func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error { c.lock.Lock() diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go index 3293ff1d923adb994ee9a7e9b9e79b6abb621195..07eeb09b4dd4627fdd3b18ee4d59356911b3a9b1 100644 --- a/remoting/kubernetes/watch.go +++ b/remoting/kubernetes/watch.go @@ -140,14 +140,12 @@ func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) { return s.addWatcher(key, prefix) } -// Done -// get the watcher-set status +// Done gets the watcher-set status func (s *watcherSetImpl) Done() <-chan struct{} { return s.ctx.Done() } -// Put -// put the watch event to watcher-set +// Put puts the watch event to watcher-set func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { blockSendMsg := func(object *WatcherEvent, w *watcher) { @@ -243,8 +241,7 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) { return w, nil } -// Get -// get elements from watcher-set +// Get gets elements from watcher-set func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) { s.lock.RLock() @@ -297,19 +294,17 @@ type watcher struct { exit chan struct{} } -// ResultChan +// nolint func (w *watcher) ResultChan() <-chan *WatcherEvent { return w.ch } -// ID -// the watcher's id +// nolint func (w *watcher) ID() string { return strconv.FormatUint(w.id, 10) } -// stop -// stop the watcher +// nolint func (w *watcher) stop() { // double close will panic @@ -318,14 +313,12 @@ func (w *watcher) stop() { }) } -// done -// check watcher status +// done checks watcher status func (w *watcher) done() <-chan struct{} { return w.exit } -// newWatcherSet -// new watcher set from parent context +// newWatcherSet returns new watcher set from parent context func newWatcherSet(ctx context.Context) WatcherSet { s := &watcherSetImpl{ ctx: ctx, diff --git a/remoting/listener.go b/remoting/listener.go index f7a3a2bd1662734919e093e1bd769223cd53447b..6cbb883181ff8ec1c9124f8d8cc3d7ec0920abd9 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -21,7 +21,7 @@ import ( "fmt" ) -// DataListener ... +// DataListener defines common data listener interface type DataListener interface { DataChange(eventType Event) bool //bool is return for interface implement is interesting } @@ -30,15 +30,15 @@ type DataListener interface { // event type ////////////////////////////////////////// -// SourceObjectEventType ... +// EventType means SourceObjectEventType type EventType int const ( - // EventTypeAdd ... + // EventTypeAdd means add event EventTypeAdd = iota - // EventTypeDel ... + // EventTypeDel means del event EventTypeDel - // EventTypeUpdate ... + // EventTypeUpdate means update event EventTypeUpdate ) @@ -56,7 +56,7 @@ func (t EventType) String() string { // service event ////////////////////////////////////////// -// Event ... +// Event defines common elements for service event type Event struct { Path string Action EventType diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go index 545a1e268cabe5cef829ff1cf44ef40b1161d590..4319627c6da6a5a19874f188d9eba6b032645ff1 100644 --- a/remoting/nacos/builder.go +++ b/remoting/nacos/builder.go @@ -22,8 +22,6 @@ import ( "strconv" "strings" "time" - - "github.com/apache/dubbo-go/config" ) import ( @@ -37,16 +35,10 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" ) -func NewNacosNamingClient(url *common.URL) (naming_client.INamingClient, error) { - nacosConfig, err := getNacosConfig(url) - if err != nil { - return nil, err - } - return clients.CreateNamingClient(nacosConfig) -} - +// NewNacosConfigClient read the config from url and build an instance func NewNacosConfigClient(url *common.URL) (config_client.IConfigClient, error) { nacosConfig, err := getNacosConfig(url) if err != nil { @@ -102,6 +94,7 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { return configMap, nil } +// NewNacosClient creates an instance with the config func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error) { if len(rc.Address) == 0 { return nil, perrors.New("nacos address is empty!") diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index f4aea5903d534a008e3c94bb04f127a67988132b..a3ef7a65569ff5219f25af83794e6a99fd972afe 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -47,7 +47,7 @@ var ( errNilNode = perrors.Errorf("node does not exist") ) -// ZookeeperClient ... +// ZookeeperClient represents zookeeper client Configuration type ZookeeperClient struct { name string ZkAddrs []string @@ -59,7 +59,7 @@ type ZookeeperClient struct { eventRegistry map[string][]*chan struct{} } -// StateToString ... +// nolint func StateToString(state zk.State) string { switch state { case zk.StateDisconnected: @@ -89,7 +89,7 @@ func StateToString(state zk.State) string { } } -// Options ... +// nolint type Options struct { zkName string client *ZookeeperClient @@ -97,17 +97,17 @@ type Options struct { ts *zk.TestCluster } -// Option ... +// Option will define a function of handling Options type Option func(*Options) -// WithZkName ... +// WithZkName sets zk client name func WithZkName(name string) Option { return func(opt *Options) { opt.zkName = name } } -// ValidateZookeeperClient ... +// ValidateZookeeperClient validates client and sets options func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { var err error options := &Options{} @@ -187,14 +187,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (* return z, nil } -// WithTestCluster ... +// WithTestCluster sets test cluser for zk client func WithTestCluster(ts *zk.TestCluster) Option { return func(opt *Options) { opt.ts = ts } } -// NewMockZookeeperClient ... +// NewMockZookeeperClient returns a mock client instance func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) { var ( err error @@ -226,21 +226,15 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) } } - //callbackChan := make(chan zk.Event) - //f := func(event zk.Event) { - // callbackChan <- event - //} - z.Conn, event, err = ts.ConnectWithOptions(timeout) if err != nil { return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect") } - //z.wait.Add(1) return ts, z, event, nil } -// HandleZkEvent ... +// HandleZkEvent handles zookeeper events func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { var ( state int @@ -301,7 +295,7 @@ LOOP: } } -// RegisterEvent ... +// RegisterEvent registers zookeeper events func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" || event == nil { return @@ -316,7 +310,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { z.Unlock() } -// UnregisterEvent ... +// UnregisterEvent unregisters zookeeper events func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return @@ -343,7 +337,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { } } -// Done ... +// nolint func (z *ZookeeperClient) Done() <-chan struct{} { return z.exit } @@ -359,7 +353,7 @@ func (z *ZookeeperClient) stop() bool { return false } -// ZkConnValid ... +// ZkConnValid validates zookeeper connection func (z *ZookeeperClient) ZkConnValid() bool { select { case <-z.exit: @@ -377,7 +371,7 @@ func (z *ZookeeperClient) ZkConnValid() bool { return valid } -// Close ... +// nolint func (z *ZookeeperClient) Close() { if z == nil { return @@ -436,7 +430,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { return nil } -// Delete ... +// nolint func (z *ZookeeperClient) Delete(basePath string) error { var ( err error @@ -451,7 +445,7 @@ func (z *ZookeeperClient) Delete(basePath string) error { return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath) } -// RegisterTemp ... +// RegisterTemp registers temporary node by @basePath and @node func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) { var ( err error @@ -468,7 +462,6 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } - //if err != nil && err != zk.ErrNodeExists { if err != nil { logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err)) return zkPath, perrors.WithStack(err) @@ -478,7 +471,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er return tmpPath, nil } -// RegisterTempSeq ... +// RegisterTempSeq register temporary sequence node by @basePath and @data func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) { var ( err error @@ -507,7 +500,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, return tmpPath, nil } -// GetChildrenW ... +// GetChildrenW gets children watch by @path func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) { var ( err error @@ -542,7 +535,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, return children, watcher.EvtCh, nil } -// GetChildren ... +// GetChildren gets children by @path func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { var ( err error @@ -573,7 +566,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { return children, nil } -// ExistW ... +// ExistW to judge watch whether it exists or not by @zkPath func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { var ( exist bool @@ -599,7 +592,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { return watcher.EvtCh, nil } -// GetContent ... +// GetContent gets content by @zkPath func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) { return z.Conn.Get(zkPath) } diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index 4e3945388ff402f60a02150615a8914f9cba2435..10de42523e731d0780ff7132f4655850409135aa 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -40,7 +40,7 @@ type zkClientFacade interface { common.Node } -// HandleClientRestart ... +// HandleClientRestart keeps the connection between client and server func HandleClientRestart(r zkClientFacade) { var ( err error diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index f9d57ba5c2276181bb551e8b8499d850b87d041a..a8ac55ba6c47115ae1d761a5d465f41925475342 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,7 +37,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// ZkEventListener ... +// nolint type ZkEventListener struct { client *ZookeeperClient pathMapLock sync.Mutex @@ -45,7 +45,7 @@ type ZkEventListener struct { wg sync.WaitGroup } -// NewZkEventListener ... +// NewZkEventListener returns a EventListener instance func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { return &ZkEventListener{ client: client, @@ -53,12 +53,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { } } -// SetClient ... +// nolint func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } -// ListenServiceNodeEvent ... +// nolint func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { defer l.wg.Done() var zkEvent zk.Event diff --git a/test/integrate/dubbo/go-client/client.go b/test/integrate/dubbo/go-client/client.go index c075ec22c3991aaea1b24ec4f59b3ab7e58520b4..4c62674d33dba7caca72ca7552e73c4c0fdf14c9 100644 --- a/test/integrate/dubbo/go-client/client.go +++ b/test/integrate/dubbo/go-client/client.go @@ -25,7 +25,7 @@ import ( import ( hessian "github.com/apache/dubbo-go-hessian2" - _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/protocol/dubbo" _ "github.com/apache/dubbo-go/registry/protocol" diff --git a/test/integrate/dubbo/go-server/server.go b/test/integrate/dubbo/go-server/server.go index 4cc6c490835d7ba29d139d71892b5e6e19d628e5..115bf0a4d78f171eb7f786808def91879ed93947 100644 --- a/test/integrate/dubbo/go-server/server.go +++ b/test/integrate/dubbo/go-server/server.go @@ -25,7 +25,7 @@ import ( hessian "github.com/apache/dubbo-go-hessian2" _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" - _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/filter/filter_impl" _ "github.com/apache/dubbo-go/protocol/dubbo"