diff --git a/common/constant/key.go b/common/constant/key.go index b7fc4ad846b80ea8a5d2c846c8a0baeace0e75f7..24bd2d20c32deee11bb1102077617bfc2be5c716 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -86,6 +86,7 @@ const ( ) const ( + NACOS_KEY = "nacos" NACOS_DEFAULT_ROLETYPE = 3 NACOS_CACHE_DIR_KEY = "cacheDir" NACOS_LOG_DIR_KEY = "logDir" diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 2d851ecf63e7f26c940389461bd151f966d28a78..b45c3bed58c474acfffea8755ac47b10c497f28c 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -24,17 +24,22 @@ import ( ) type nacosListener struct { - sync.Mutex namingClient naming_client.INamingClient listenUrl common.URL events chan *remoting.ConfigChangeEvent hostMapInstance map[string]model.Instance + cacheLock sync.Mutex done chan struct{} subscribeParam *vo.SubscribeParam } func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { - listener := &nacosListener{namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), hostMapInstance: map[string]model.Instance{}, done: make(chan struct{})} + listener := &nacosListener{ + namingClient: namingClient, + listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), + hostMapInstance: map[string]model.Instance{}, + done: make(chan struct{}), + } err := listener.startListen() return listener, err } @@ -60,15 +65,15 @@ func generateUrl(instance model.Instance) *common.URL { } path := instance.Metadata["path"] myInterface := instance.Metadata["interface"] - if path == "" && myInterface == "" { + if len(path) == 0 && len(myInterface) == 0 { logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance) return nil } - if path == "" && myInterface != "" { + if len(path) == 0 && len(myInterface) != 0 { path = "/" + myInterface } protocol := instance.Metadata["protocol"] - if protocol == "" { + if len(protocol) == 0 { logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance) return nil } @@ -76,65 +81,66 @@ func generateUrl(instance model.Instance) *common.URL { for k, v := range instance.Metadata { urlMap.Set(k, v) } - return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) + return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), + common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) } func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { if err != nil { - logger.Errorf("nacos subscribe callback error:%s ", err.Error()) + logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) return } - nl.Lock() - defer nl.Unlock() - var addInstances []model.Instance - var delInstances []model.Instance - var updateInstances []model.Instance + nl.cacheLock.Lock() + defer nl.cacheLock.Unlock() + addInstances := make([]model.Instance, 0, len(services)) + delInstances := make([]model.Instance, 0, len(services)) + updateInstances := make([]model.Instance, 0, len(services)) - newInstanceMap := map[string]model.Instance{} + newInstanceMap := make(map[string]model.Instance, len(services)) - for _, s := range services { - if !s.Enable || !s.Valid { - //瀹炰緥涓嶅彲浠ョ敤 + for i := range services { + if !services[i].Enable || !services[i].Valid { + // instance is not available,so ignore it continue } - host := s.Ip + ":" + strconv.Itoa(int(s.Port)) - instance := generateInstance(s) + host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port)) + instance := generateInstance(services[i]) newInstanceMap[host] = instance if old, ok := nl.hostMapInstance[host]; !ok { - //鏂板瀹炰緥鑺傜偣 + //instance is not exsit in cache,add it to cache addInstances = append(addInstances, instance) } else { - //瀹炰緥鏇存柊 + //instance is not different from cache,update it to cache if !reflect.DeepEqual(old, instance) { updateInstances = append(updateInstances, instance) } } } - //鍒ゆ柇鏃х殑瀹炰緥鏄惁鍦ㄦ柊瀹炰緥鍒楄〃涓紝涓嶅瓨鍦ㄥ垯浠h〃瀹炰緥宸蹭笅绾� for host, inst := range nl.hostMapInstance { if _, ok := newInstanceMap[host]; !ok { + //cache instance is not exsit in new instance list, remove it from cache delInstances = append(delInstances, inst) } } nl.hostMapInstance = newInstanceMap - for _, add := range addInstances { - newUrl := generateUrl(add) + for i := range addInstances { + newUrl := generateUrl(addInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd}) } } - for _, del := range delInstances { - newUrl := generateUrl(del) + for i := range delInstances { + newUrl := generateUrl(delInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel}) } } - for _, update := range updateInstances { - newUrl := generateUrl(update) + for i := range updateInstances { + newUrl := generateUrl(updateInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate}) } @@ -172,7 +178,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { case <-nl.done: - logger.Warnf("nacos listener is close!") + logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl) return nil, perrors.New("listener stopped") case e := <-nl.events: diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 73656e25a04e49fd9c208e46932d345fa0d25cab..f10e230bc44dba8f8007ea659db3747a237c9de1 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -29,7 +29,7 @@ var ( func init() { localIP, _ = utils.GetLocalIP() - extension.SetRegistry("nacos", newNacosRegistry) + extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry) } type nacosRegistry struct { @@ -41,13 +41,13 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { if url == nil { return nil, perrors.New("url is empty!") } - if url.Location == "" { + if len(url.Location) == 0 { return nil, perrors.New("url.location is empty!") } - configMap := make(map[string]interface{}) + configMap := make(map[string]interface{}, 2) - var serverConfigs []nacosConstant.ServerConfig addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) for _, addr := range addresses { ip, portStr, err := net.SplitHostPort(addr) if err != nil { @@ -119,17 +119,17 @@ func appendParam(target *bytes.Buffer, url common.URL, key string) { func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam { category := getCategory(url) - params := map[string]string{} + params := make(map[string]string, len(url.Params)+3) for k, _ := range url.Params { params[k] = url.Params.Get(k) } params[constant.NACOS_CATEGORY_KEY] = category params[constant.NACOS_PROTOCOL_KEY] = url.Protocol params[constant.NACOS_PATH_KEY] = url.Path - if url.Ip == "" { + if len(url.Ip) == 0 { url.Ip = localIP } - if url.Port == "" || url.Port == "0" { + if len(url.Port) == 0 || url.Port == "0" { url.Port = "80" } port, _ := strconv.Atoi(url.Port) @@ -154,7 +154,7 @@ func (nr *nacosRegistry) Register(url common.URL) error { return err } if !isRegistry { - return perrors.New("registry to nacos failed") + return perrors.New("registry [" + serviceName + "] to nacos failed") } return nil }