Skip to content
Snippets Groups Projects
Commit 5d51d2a0 authored by lizhipeng1's avatar lizhipeng1
Browse files

coding standard

parent 8bdc29f7
No related branches found
No related tags found
No related merge requests found
......@@ -86,6 +86,7 @@ const (
)
const (
NACOS_KEY = "nacos"
NACOS_DEFAULT_ROLETYPE = 3
NACOS_CACHE_DIR_KEY = "cacheDir"
NACOS_LOG_DIR_KEY = "logDir"
......
......@@ -24,17 +24,22 @@ import (
)
type nacosListener struct {
sync.Mutex
namingClient naming_client.INamingClient
listenUrl common.URL
events chan *remoting.ConfigChangeEvent
hostMapInstance map[string]model.Instance
cacheLock sync.Mutex
done chan struct{}
subscribeParam *vo.SubscribeParam
}
func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), hostMapInstance: map[string]model.Instance{}, done: make(chan struct{})}
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32),
hostMapInstance: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
}
......@@ -60,15 +65,15 @@ func generateUrl(instance model.Instance) *common.URL {
}
path := instance.Metadata["path"]
myInterface := instance.Metadata["interface"]
if path == "" && myInterface == "" {
if len(path) == 0 && len(myInterface) == 0 {
logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance)
return nil
}
if path == "" && myInterface != "" {
if len(path) == 0 && len(myInterface) != 0 {
path = "/" + myInterface
}
protocol := instance.Metadata["protocol"]
if protocol == "" {
if len(protocol) == 0 {
logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance)
return nil
}
......@@ -76,65 +81,66 @@ func generateUrl(instance model.Instance) *common.URL {
for k, v := range instance.Metadata {
urlMap.Set(k, v)
}
return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path))
return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))),
common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path))
}
func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s ", err.Error())
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
return
}
nl.Lock()
defer nl.Unlock()
var addInstances []model.Instance
var delInstances []model.Instance
var updateInstances []model.Instance
nl.cacheLock.Lock()
defer nl.cacheLock.Unlock()
addInstances := make([]model.Instance, 0, len(services))
delInstances := make([]model.Instance, 0, len(services))
updateInstances := make([]model.Instance, 0, len(services))
newInstanceMap := map[string]model.Instance{}
newInstanceMap := make(map[string]model.Instance, len(services))
for _, s := range services {
if !s.Enable || !s.Valid {
//实例不可以用
for i := range services {
if !services[i].Enable || !services[i].Valid {
// instance is not available,so ignore it
continue
}
host := s.Ip + ":" + strconv.Itoa(int(s.Port))
instance := generateInstance(s)
host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
instance := generateInstance(services[i])
newInstanceMap[host] = instance
if old, ok := nl.hostMapInstance[host]; !ok {
//新增实例节点
//instance is not exsit in cache,add it to cache
addInstances = append(addInstances, instance)
} else {
//实例更新
//instance is not different from cache,update it to cache
if !reflect.DeepEqual(old, instance) {
updateInstances = append(updateInstances, instance)
}
}
}
//判断旧的实例是否在新实例列表中,不存在则代表实例已下线
for host, inst := range nl.hostMapInstance {
if _, ok := newInstanceMap[host]; !ok {
//cache instance is not exsit in new instance list, remove it from cache
delInstances = append(delInstances, inst)
}
}
nl.hostMapInstance = newInstanceMap
for _, add := range addInstances {
newUrl := generateUrl(add)
for i := range addInstances {
newUrl := generateUrl(addInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd})
}
}
for _, del := range delInstances {
newUrl := generateUrl(del)
for i := range delInstances {
newUrl := generateUrl(delInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel})
}
}
for _, update := range updateInstances {
newUrl := generateUrl(update)
for i := range updateInstances {
newUrl := generateUrl(updateInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate})
}
......@@ -172,7 +178,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-nl.done:
logger.Warnf("nacos listener is close!")
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl)
return nil, perrors.New("listener stopped")
case e := <-nl.events:
......
......@@ -29,7 +29,7 @@ var (
func init() {
localIP, _ = utils.GetLocalIP()
extension.SetRegistry("nacos", newNacosRegistry)
extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry)
}
type nacosRegistry struct {
......@@ -41,13 +41,13 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if url.Location == "" {
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{})
configMap := make(map[string]interface{}, 2)
var serverConfigs []nacosConstant.ServerConfig
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
......@@ -119,17 +119,17 @@ func appendParam(target *bytes.Buffer, url common.URL, key string) {
func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam {
category := getCategory(url)
params := map[string]string{}
params := make(map[string]string, len(url.Params)+3)
for k, _ := range url.Params {
params[k] = url.Params.Get(k)
}
params[constant.NACOS_CATEGORY_KEY] = category
params[constant.NACOS_PROTOCOL_KEY] = url.Protocol
params[constant.NACOS_PATH_KEY] = url.Path
if url.Ip == "" {
if len(url.Ip) == 0 {
url.Ip = localIP
}
if url.Port == "" || url.Port == "0" {
if len(url.Port) == 0 || url.Port == "0" {
url.Port = "80"
}
port, _ := strconv.Atoi(url.Port)
......@@ -154,7 +154,7 @@ func (nr *nacosRegistry) Register(url common.URL) error {
return err
}
if !isRegistry {
return perrors.New("registry to nacos failed")
return perrors.New("registry [" + serviceName + "] to nacos failed")
}
return nil
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment