diff --git a/common/constant/key.go b/common/constant/key.go index 324c0259e8a900c7bc13b9393c00c92c4f26d1e0..9f2fe14786d5704e9fbc00b925c12e5ad916e122 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -243,4 +243,4 @@ const ( const ( NACOS_GROUP = "nacos.group" -) \ No newline at end of file +) diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 355e922d6bb1e0d897e6dc7a8f69e75498fb321a..4398dec39a8077f03faf51589ac3b9371b071dc4 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -22,11 +22,11 @@ import ( "github.com/apache/dubbo-go/registry" ) -var( - discoveryCreatorMap = make(map[string]func(url *common.URL)(registry.ServiceDiscovery, error), 4) +var ( + discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4) ) -func SetServiceDiscovery(name string, creator func(url *common.URL)(registry.ServiceDiscovery, error)) { +func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) { discoveryCreatorMap[name] = creator } diff --git a/registry/event.go b/registry/event.go index 0500cc70188b5cf59a2058fee224106fea06001a..da66ccb33dda6eac197d50ba0a2b908b9069a837 100644 --- a/registry/event.go +++ b/registry/event.go @@ -86,6 +86,22 @@ func newBaseEvent(source interface{}) *baseEvent { // ServiceInstancesChangedEvent represents service instances make some changing type ServiceInstancesChangedEvent struct { - fmt.Stringer baseEvent + ServiceName string + Instances []ServiceInstance +} + +func (s *ServiceInstancesChangedEvent) String() string { + return fmt.Sprintf("ServiceInstancesChangedEvent[source=%s]", s.ServiceName) +} + +func NewServiceInstancesChangedEvent(serviceName string, instances []ServiceInstance) *ServiceInstancesChangedEvent { + return &ServiceInstancesChangedEvent{ + baseEvent: baseEvent{ + source: serviceName, + timestamp: time.Now(), + }, + ServiceName: serviceName, + Instances: instances, + } } diff --git a/registry/event_listener.go b/registry/event_listener.go index cca32382159c8caf3feaf4e7a6d32cb78420b216..b8d6148442d9e10e210958dead690c4a95b33fb6 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -40,4 +40,3 @@ type ConditionalEventListener interface { type ServiceInstancesChangedListener struct { ServiceName string } - diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index e7e6b39bd7a818ffde4a439cae568049cd5d83e4..697cbbeb83971e28f44044d016362e20f78d2bd7 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -183,4 +183,4 @@ func (nr *nacosRegistry) IsAvailable() bool { func (nr *nacosRegistry) Destroy() { return -} \ No newline at end of file +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 9f22b8fab868c03334e94fcd0d41c10838ce34a9..2f8eb7706fd50bf7884556f8e230f8425038aa14 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -165,7 +165,7 @@ func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, of i = offset count = 0 ) - for ; i < len(all) && count < pageSize; { + for i < len(all) && count < pageSize { ins := all[i] if ins.IsHealthy() == healthy { res = append(res, all[i]) @@ -188,25 +188,51 @@ func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offse // AddListener will add a listener func (n *nacosServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { - // return n.namingClient.Subscribe(&vo.SubscribeParam{ - // ServiceName:listener.ServiceName, - // SubscribeCallback: func(services []model.SubscribeService, err error) { - // services[0].InstanceId - // n.DispatchEventForInstances() - // }, - // }) + return n.namingClient.Subscribe(&vo.SubscribeParam{ + ServiceName: listener.ServiceName, + SubscribeCallback: func(services []model.SubscribeService, err error) { + if err != nil { + logger.Errorf("Could not handle the subscribe notification because the err is not nil."+ + " service name: %s, err: %v", listener.ServiceName, err) + } + instances := make([]registry.ServiceInstance, 0, len(services)) + for _, service := range services { + // we won't use the nacos instance id here but use our instance id + metadata := service.Metadata + id := metadata[idKey] + + delete(metadata, idKey) + + instances = append(instances, ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: service.ServiceName, + Host: service.Ip, + Port: int(service.Port), + Enable: service.Enable, + Healthy: true, + Metadata: metadata, + }) + } + + e := n.DispatchEventForInstances(listener.ServiceName, instances) + if e != nil { + logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) + } + }, + }) } func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) error { - panic("implement me") + return n.DispatchEventForInstances(serviceName, n.GetInstances(serviceName)) } func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { - panic("implement me") + return n.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) } -func (n *nacosServiceDiscovery) DispatchEvent(event registry.ServiceInstancesChangedEvent) error { - panic("implement me") +func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + // TODO(waiting for event dispatcher, another task) + return nil } func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInstance) vo.RegisterInstanceParam { diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dbe9b88fe0ac42993132f3ca00114105f80f5afd --- /dev/null +++ b/registry/nacos/service_discovery_test.go @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "net/url" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func TestNacosServiceDiscovery_Destroy(t *testing.T) { + serviceDiscovry, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovry) +} + +func mockUrl() *common.URL { + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + return &url +} diff --git a/registry/service_discovery.go b/registry/service_discovery.go index c38d48d44e2373f3ba15eb2fcc0a64bab484fd81..a8228a4abe8ed07e3c5afda300702f778daea4ae 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -72,6 +72,7 @@ type ServiceDiscovery interface { // ----------------- event ---------------------- // AddListener adds a new ServiceInstancesChangedListener + // see addServiceInstancesChangedListener in Java AddListener(listener *ServiceInstancesChangedListener) error // DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName @@ -81,7 +82,5 @@ type ServiceDiscovery interface { DispatchEventForInstances(serviceName string, instances []ServiceInstance) error // DispatchEvent dispatches the event - DispatchEvent(event ServiceInstancesChangedEvent) error + DispatchEvent(event *ServiceInstancesChangedEvent) error } - -