diff --git a/common/constant/key.go b/common/constant/key.go index 5769ccfe6c8d2e296de190259d85821dd92dd8f4..324c0259e8a900c7bc13b9393c00c92c4f26d1e0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -238,3 +238,9 @@ const ( // The default time window of circuit-tripped in millisecond if not specfied MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 ) + +// service discovery + +const ( + NACOS_GROUP = "nacos.group" +) \ No newline at end of file diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..355e922d6bb1e0d897e6dc7a8f69e75498fb321a --- /dev/null +++ b/common/extension/service_discovery.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +var( + discoveryCreatorMap = make(map[string]func(url *common.URL)(registry.ServiceDiscovery, error), 4) +) + +func SetServiceDiscovery(name string, creator func(url *common.URL)(registry.ServiceDiscovery, error)) { + discoveryCreatorMap[name] = creator +} + +func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) { + creator, ok := discoveryCreatorMap[name] + if !ok { + panic("Could not find the service discovery with name: " + name) + } + return creator(url) +} diff --git a/registry/event_listener.go b/registry/event_listener.go index 810420319b8cb83edeb82203142d7207a5a6bb6b..cca32382159c8caf3feaf4e7a6d32cb78420b216 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -38,4 +38,6 @@ type ConditionalEventListener interface { // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { + ServiceName string } + diff --git a/registry/nacos/base_registry.go b/registry/nacos/base_registry.go new file mode 100644 index 0000000000000000000000000000000000000000..b6ce09bc715b3b0775820629ff8d07863dbd6675 --- /dev/null +++ b/registry/nacos/base_registry.go @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" + perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +// baseRegistry is the parent of both interface-level registry +// and service discovery(related to application-level registry) +type baseRegistry struct { + *common.URL + namingClient naming_client.INamingClient +} + +func newBaseRegistry(url *common.URL) (baseRegistry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return baseRegistry{}, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return baseRegistry{}, err + } + registry := baseRegistry{ + URL: url, + namingClient: client, + } + return registry, nil +} + +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 965e91e894ac61562bfd25c8f564f789afd6c8a1..e7e6b39bd7a818ffde4a439cae568049cd5d83e4 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -19,7 +19,6 @@ package nacos import ( "bytes" - "net" "strconv" "strings" "time" @@ -27,9 +26,6 @@ import ( import ( gxnet "github.com/dubbogo/gost/net" - "github.com/nacos-group/nacos-sdk-go/clients" - "github.com/nacos-group/nacos-sdk-go/clients/naming_client" - nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" ) @@ -57,64 +53,17 @@ func init() { } type nacosRegistry struct { - *common.URL - namingClient naming_client.INamingClient -} - -func getNacosConfig(url *common.URL) (map[string]interface{}, error) { - if url == nil { - return nil, perrors.New("url is empty!") - } - if len(url.Location) == 0 { - return nil, perrors.New("url.location is empty!") - } - configMap := make(map[string]interface{}, 2) - - addresses := strings.Split(url.Location, ",") - serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) - for _, addr := range addresses { - ip, portStr, err := net.SplitHostPort(addr) - if err != nil { - return nil, perrors.WithMessagef(err, "split [%s] ", addr) - } - port, _ := strconv.Atoi(portStr) - serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ - IpAddr: ip, - Port: uint64(port), - }) - } - configMap["serverConfigs"] = serverConfigs - - var clientConfig nacosConstant.ClientConfig - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - return nil, err - } - clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) - clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs - clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") - clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") - clientConfig.NotLoadCacheAtStart = true - configMap["clientConfig"] = clientConfig - - return configMap, nil + baseRegistry } func newNacosRegistry(url *common.URL) (registry.Registry, error) { - nacosConfig, err := getNacosConfig(url) + base, err := newBaseRegistry(url) if err != nil { - return nil, err + return nil, perrors.WithStack(err) } - client, err := clients.CreateNamingClient(nacosConfig) - if err != nil { - return nil, err - } - registry := nacosRegistry{ - URL: url, - namingClient: client, - } - return ®istry, nil + return &nacosRegistry{ + base, + }, nil } func getCategory(url common.URL) string { @@ -234,4 +183,4 @@ func (nr *nacosRegistry) IsAvailable() bool { func (nr *nacosRegistry) Destroy() { return -} +} \ No newline at end of file diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..9f22b8fab868c03334e94fcd0d41c10838ce34a9 --- /dev/null +++ b/registry/nacos/service_discovery.go @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/page" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" +) + +const ( + defaultGroup = "DEFAULT_GROUP" + idKey = "id" +) + +func init() { + extension.SetServiceDiscovery(constant.NACOS_KEY, newNacosServiceDiscovery) +} + +// nacosServiceDiscovery is the implementation of service discovery based on nacos. +// There is a problem, the go client for nacos does not support the id field. +// we will use the metadata to store the id of ServiceInstance +type nacosServiceDiscovery struct { + baseRegistry + group string +} + +// Destroy will close the service discovery. +// Actually, it only marks the naming client as null and then return +func (n *nacosServiceDiscovery) Destroy() error { + n.namingClient = nil + return nil +} + +// Register will register the service to nacos +func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) error { + ins := n.toRegisterInstance(instance) + ok, err := n.namingClient.RegisterInstance(ins) + if err != nil || !ok { + return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName()) + } + return nil +} + +// Update will update the information +// However, because nacos client doesn't support the update API, +// so we should unregister the instance and then register it again. +// the error handling is hard to implement +func (n *nacosServiceDiscovery) Update(instance registry.ServiceInstance) error { + // The + err := n.Unregister(instance) + if err != nil { + return perrors.WithStack(err) + } + return n.Register(instance) +} + +// Unregister will unregister the instance +func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + ok, err := n.namingClient.DeregisterInstance(n.toDeregisterInstance(instance)) + if err != nil || !ok { + return perrors.WithMessage(err, "Could not unregister the instance. "+instance.GetServiceName()) + } + return nil +} + +// GetDefaultPageSize will return the constant registry.DefaultPageSize +func (n *nacosServiceDiscovery) GetDefaultPageSize() int { + return registry.DefaultPageSize +} + +// GetServices will return the all services +func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet { + services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: n.group, + }) + + res := gxset.NewSet() + if err != nil { + logger.Errorf("Could not query the services: %v", err) + return res + } + + for _, e := range services { + res.Add(e) + } + return res +} + +// GetInstances will return the instances of serviceName and the group +func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + instances, err := n.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{ + ServiceName: serviceName, + GroupName: n.group, + }) + if err != nil { + logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group) + return make([]registry.ServiceInstance, 0, 0) + } + res := make([]registry.ServiceInstance, 0, len(instances)) + for _, ins := range instances { + metadata := ins.Metadata + id := metadata[idKey] + + delete(metadata, idKey) + + res = append(res, ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: ins.ServiceName, + Host: ins.Ip, + Port: int(ins.Port), + Enable: ins.Enable, + Healthy: ins.Healthy, + Metadata: metadata, + }) + } + + return res +} + +// GetInstancesByPage will return the instances +// Due to nacos client does not support pagination, so we have to query all instances and then return part of them +func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + all := n.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + for i := offset; i < len(all) && i < offset+pageSize; i++ { + res = append(res, all[i]) + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +// GetHealthyInstancesByPage will return the instance +// The nacos client has an API SelectInstances, which has a parameter call HealthyOnly. +// However, the healthy parameter in this method maybe false. So we can not use that API. +// Thus, we must query all instances and then do filter +func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + all := n.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + var ( + i = offset + count = 0 + ) + for ; i < len(all) && count < pageSize; { + ins := all[i] + if ins.IsHealthy() == healthy { + res = append(res, all[i]) + count++ + } + i++ + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +// GetRequestInstances will return the instances +// The nacos client doesn't have batch API, so we should query those serviceNames one by one. +func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + res := make(map[string]gxpage.Pager, len(serviceNames)) + for _, name := range serviceNames { + res[name] = n.GetInstancesByPage(name, offset, requestedSize) + } + return res +} + +// AddListener will add a listener +func (n *nacosServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + // return n.namingClient.Subscribe(&vo.SubscribeParam{ + // ServiceName:listener.ServiceName, + // SubscribeCallback: func(services []model.SubscribeService, err error) { + // services[0].InstanceId + // n.DispatchEventForInstances() + // }, + // }) +} + +func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + panic("implement me") +} + +func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + panic("implement me") +} + +func (n *nacosServiceDiscovery) DispatchEvent(event registry.ServiceInstancesChangedEvent) error { + panic("implement me") +} + +func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInstance) vo.RegisterInstanceParam { + metadata := instance.GetMetadata() + metadata[idKey] = instance.GetId() + return vo.RegisterInstanceParam{ + ServiceName: instance.GetServiceName(), + Ip: instance.GetHost(), + Port: uint64(instance.GetPort()), + Metadata: metadata, + Enable: instance.IsEnable(), + Healthy: instance.IsHealthy(), + GroupName: n.group, + } +} + +func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceInstance) vo.DeregisterInstanceParam { + return vo.DeregisterInstanceParam{ + ServiceName: instance.GetServiceName(), + Ip: instance.GetHost(), + Port: uint64(instance.GetPort()), + GroupName: n.group, + } +} + +func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { + base, err := newBaseRegistry(url) + if err != nil { + return nil, perrors.WithStack(err) + } + return &nacosServiceDiscovery{ + baseRegistry: base, + group: url.GetParam(constant.NACOS_GROUP, defaultGroup), + }, nil +} diff --git a/registry/service_discovery.go b/registry/service_discovery.go index 5577f52930c7639afca851f7390779ff6e690c6e..c38d48d44e2373f3ba15eb2fcc0a64bab484fd81 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -26,19 +26,13 @@ import ( gxpage "github.com/dubbogo/gost/page" ) -import ( - "github.com/apache/dubbo-go/common" -) +const DefaultPageSize = 100 type ServiceDiscovery interface { fmt.Stringer // ----------------- lifecycle ------------------- - // Initialize will initialize the service discovery instance - // if initialize failed, it will return the error - Initialize(url common.URL) error - // Destroy will destroy the service discovery. // If the discovery cannot be destroy, it will return an error. Destroy() error @@ -59,7 +53,7 @@ type ServiceDiscovery interface { GetDefaultPageSize() int // GetServices will return the all service names. - GetServices() gxset.HashSet + GetServices() *gxset.HashSet // GetInstances will return all service instances with serviceName GetInstances(serviceName string) []ServiceInstance @@ -89,3 +83,5 @@ type ServiceDiscovery interface { // DispatchEvent dispatches the event DispatchEvent(event ServiceInstancesChangedEvent) error } + + diff --git a/registry/service_instance.go b/registry/service_instance.go index d27ed8accfc4a415dc9474b22433a62779b59662..2cc229ee3b056da2d9f1a1b70d3e0f5858c9da5f 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -29,8 +29,7 @@ type ServiceInstance interface { GetHost() string // GetPort will return the port. - // if the port is not present, return error - GetPort() (int, error) + GetPort() int // IsEnable will return the enable status of this instance IsEnable() bool @@ -41,3 +40,50 @@ type ServiceInstance interface { // GetMetadata will return the metadata GetMetadata() map[string]string } + +// DefaultServiceInstance the default implementation of ServiceInstance +// or change the ServiceInstance to be struct??? +type DefaultServiceInstance struct { + Id string + ServiceName string + Host string + Port int + Enable bool + Healthy bool + Metadata map[string]string +} + +// GetId will return this instance's id. It should be unique. +func (d *DefaultServiceInstance) GetId() string { + return d.Id +} + +// GetServiceName will return the serviceName +func (d *DefaultServiceInstance) GetServiceName() string { + return d.ServiceName +} + +// GetHost will return the hostname +func (d *DefaultServiceInstance) GetHost() string { + return d.Host +} + +// GetPort will return the port. +func (d *DefaultServiceInstance) GetPort() int { + return d.Port +} + +// IsEnable will return the enable status of this instance +func (d *DefaultServiceInstance) IsEnable() bool { + return d.Enable +} + +// IsHealthy will return the value represent the instance whether healthy or not +func (d *DefaultServiceInstance) IsHealthy() bool { + return d.Healthy +} + +// GetMetadata will return the metadata +func (d *DefaultServiceInstance) GetMetadata() map[string]string { + return d.Metadata +}