Skip to content
Snippets Groups Projects
Select Git revision
  • cc6ef3456a0a22ebd4f557ec314e65bdb48d6565
  • master default protected
  • 3.0
  • develop
  • revert-2069-tripleVersion
  • 3.1
  • rest-protocol
  • feat/remoting_rocketmq
  • dapr-support
  • 1.5
  • 1.4
  • 1.3
  • 1.2
  • 1.1
  • v3.0.3-rc2
  • v3.0.3-rc1
  • v3.0.2
  • v1.5.8
  • v1.5.9-rc1
  • v3.0.1
  • v1.5.8-rc1
  • v3.0.0
  • v3.0.0-rc4-1
  • v3.0.0-rc4
  • v3.0.0-rc3
  • v1.5.7
  • v1.5.7-rc2
  • v3.0.0-rc2
  • remove
  • v1.5.7-rc1
  • v3.0.0-rc1
  • v1.5.7-rc1-tmp
  • 1.5.6
  • v1.5.6
34 results

impl.go

Blame
  • Joe Zou's avatar
    邹毅贤 authored
    cc6ef345
    History
    impl.go 5.50 KiB
    /*
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    */
    
    package apollo
    
    import (
    	"fmt"
    	"github.com/go-errors/errors"
    	"os"
    	"strconv"
    	"strings"
    	"sync"
    
    	"github.com/apache/dubbo-go/common"
    	"github.com/apache/dubbo-go/common/constant"
    	"github.com/apache/dubbo-go/config_center"
    	"github.com/apache/dubbo-go/config_center/parser"
    	"github.com/apache/dubbo-go/remoting"
    
    	"github.com/zouyx/agollo"
    )
    
    // Keys, prefixes and format strings used by the Apollo configuration center.
    const (
    	// apolloEnvKey is used both as a URL parameter name and as a process
    	// environment variable name.
    	apolloEnvKey = "env"
    	// apolloAddrKey is not referenced by the code visible in this file.
    	apolloAddrKey = "apollo.meta"
    	// apolloClusterKey is not referenced by the code visible in this file.
    	apolloClusterKey = "apollo.cluster"
    	// apolloProtocolPrefix is prepended to addresses that do not already
    	// start with it.
    	apolloProtocolPrefix = "http://"
    	// apolloConfigFormat joins a namespace and a file format as
    	// "<namespace>.<format>".
    	apolloConfigFormat = "%s.%s"
    )
    
    // apolloDynamicConfiguration is the Apollo-backed configuration object
    // created by newApolloDynamicConfiguration.
    type apolloDynamicConfiguration struct {
    	// url is the config-center URL passed to the constructor; GetConfig reads
    	// its parameters.
    	url *common.URL
    
    	// listeners is keyed by group+key (see AddListener); stored values are
    	// type-asserted to apolloListener when they are used.
    	listeners sync.Map
    	// appConf holds an agollo AppConfig for this instance.
    	appConf   *agollo.AppConfig
    	// parser is read and replaced through Parser and SetParser.
    	parser    parser.ConfigurationParser
    }
    
    // newApolloDynamicConfiguration builds an Apollo-backed dynamic configuration
    // from url and starts the agollo client.
    //
    // The "env" URL parameter, when non-empty, is exported to the process
    // environment. The cluster is taken from the config group URL parameter, the
    // namespace from the config namespace URL parameter (suffixed with the YML
    // file format), and the application id from the "app.id" environment
    // variable.
    //
    // It returns (nil, err) when the environment variable cannot be set. When
    // agollo.Start fails, the constructed configuration is still returned next to
    // the error.
    func newApolloDynamicConfiguration(url *common.URL) (*apolloDynamicConfiguration, error) {
    	c := &apolloDynamicConfiguration{
    		url: url,
    	}
    	configEnv := url.GetParam(apolloEnvKey, "")
    	configAddr := c.getAddressWithProtocolPrefix(url)
    	configCluster := url.GetParam(constant.CONFIG_GROUP_KEY, "")
    	if len(configEnv) != 0 {
    		// Surface a rejected value instead of silently continuing with a
    		// stale or missing environment setting.
    		if err := os.Setenv(apolloEnvKey, configEnv); err != nil {
    			return nil, err
    		}
    	}

    	// NOTE(review): the address is replaced by the value of the "env"
    	// environment variable whenever that variable is non-empty or the address
    	// equals constant.ANYHOST_VALUE. apolloAddrKey is declared but unused;
    	// confirm whether this override was meant to read apolloAddrKey instead.
    	key := os.Getenv(apolloEnvKey)
    	if len(key) != 0 || constant.ANYHOST_VALUE == configAddr {
    		configAddr = key
    	}

    	appId := os.Getenv("app.id")
    	namespace := url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP)
    	readyConfig := &agollo.AppConfig{
    		AppId:         appId,
    		Cluster:       configCluster,
    		NamespaceName: getNamespaceName(namespace, agollo.YML),
    		Ip:            configAddr,
    	}
    	// Keep the effective client configuration on the instance so the appConf
    	// field reflects what agollo was started with.
    	c.appConf = readyConfig

    	agollo.InitCustomConfig(func() (*agollo.AppConfig, error) {
    		return readyConfig, nil
    	})

    	return c, agollo.Start()
    }
    
    // apolloChangeListener receives agollo change events through OnChange and
    // forwards them to the listeners registered on c.
    type apolloChangeListener struct {
    	// c is the configuration whose listener registry OnChange walks.
    	c *apolloDynamicConfiguration
    }
    
    // OnChange converts every entry of event.Changes into a
    // config_center.ConfigChangeEvent and hands it to each listener found in
    // every entry of the owning configuration's listener registry. All listeners
    // receive the same event pointer for a given change.
    func (a *apolloChangeListener) OnChange(event *agollo.ChangeEvent) {
    	// broadcast delivers one already-built event to every registered listener.
    	broadcast := func(ev *config_center.ConfigChangeEvent) {
    		a.c.listeners.Range(func(_, entry interface{}) bool {
    			for l := range entry.(apolloListener).listeners {
    				l.Process(ev)
    			}
    			// Keep iterating over the remaining registry entries.
    			return true
    		})
    	}

    	for name, change := range event.Changes {
    		broadcast(&config_center.ConfigChangeEvent{
    			Key:        name,
    			Value:      change.NewValue,
    			ConfigType: a.c.getChangeType(change.ChangeType),
    		})
    	}
    }
    
    // start registers a change listener with agollo that forwards change events
    // to the listeners held by c.
    func (c *apolloDynamicConfiguration) start() {
    	// The listener must carry c: OnChange dereferences its c field to walk the
    	// listener registry, so a zero-value listener would panic on the first
    	// change event.
    	agollo.AddChangeListener(&apolloChangeListener{c: c})
    }
    
    // getChangeType translates an agollo change type into the matching
    // remoting.EventType: ADDED -> EventTypeAdd, DELETED -> EventTypeDel,
    // MODIFIED -> EventTypeUpdate. Any other value causes a panic.
    func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeType) remoting.EventType {
    	if change == agollo.ADDED {
    		return remoting.EventTypeAdd
    	}
    	if change == agollo.DELETED {
    		return remoting.EventTypeDel
    	}
    	if change == agollo.MODIFIED {
    		return remoting.EventTypeUpdate
    	}
    	// No mapping exists for this value.
    	panic("unknow type: " + strconv.Itoa(int(change)))
    }
    
    // AddListener registers listener under the registry entry for key. The entry
    // name is the group taken from opts followed by key; a new entry is created
    // when none exists yet.
    func (c *apolloDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
    	options := new(config_center.Options)
    	for i := range opts {
    		opts[i](options)
    	}

    	// Reuse the existing entry for this group+key or store a fresh one.
    	entry, _ := c.listeners.LoadOrStore(options.Group+key, NewApolloListener())
    	entry.(apolloListener).AddListener(listener)
    }
    
    // RemoveListener removes listener from the registry entry for key. The entry
    // name is the group taken from opts followed by key; nothing happens when no
    // such entry exists.
    func (c *apolloDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
    	options := new(config_center.Options)
    	for i := range opts {
    		opts[i](options)
    	}

    	entry, found := c.listeners.Load(options.Group + key)
    	if !found {
    		return
    	}
    	entry.(apolloListener).RemoveListener(listener)
    }
    
    // getNamespaceName formats namespace and configFileFormat with
    // apolloConfigFormat, yielding "<namespace>.<format>".
    func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string {
    	name := fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
    	return name
    }
    
    // GetConfig looks up configuration for key.
    //
    // When opts carry a non-empty group that differs from the group configured
    // on the URL, the whole content of the URL group's ".properties" namespace is
    // returned, or an error when agollo holds nothing for that namespace. In every
    // other case the string value stored under key is returned, with "" as the
    // default.
    func (c *apolloDynamicConfiguration) GetConfig(key string, opts ...config_center.Option) (string, error) {
    	k := &config_center.Options{}
    	for _, opt := range opts {
    		opt(k)
    	}

    	if group := k.Group; len(group) != 0 {
    		// Resolve the URL's group once and reuse it for both the comparison
    		// and the namespace name. The URL is only consulted when a group was
    		// requested.
    		urlGroup := c.url.GetParam(constant.CONFIG_GROUP_KEY, config_center.DEFAULT_GROUP)
    		if urlGroup != group {
    			// NOTE(review): the namespace is derived from the URL's group
    			// parameter, not from the requested group or the namespace
    			// parameter — confirm this is intended.
    			fileNamespace := getNamespaceName(urlGroup, agollo.Properties)
    			config := agollo.GetConfig(fileNamespace)
    			if config == nil {
    				return "", errors.Errorf("nothing in namespace:%s", fileNamespace)
    			}
    			return config.GetContent(), nil
    		}
    	}
    	return agollo.GetStringValue(key, ""), nil
    }
    
    // getAddressWithProtocolPrefix returns the comma-separated address list of
    // url with apolloProtocolPrefix prepended to every entry that does not
    // already start with it. It returns "" when url is nil or carries no
    // address.
    func (c *apolloDynamicConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
    	if url == nil {
    		return ""
    	}
    	// Read the address from the URL. It used to be hard-coded to "", which
    	// made this function ignore its argument and always return "".
    	address := url.Location
    	if len(address) == 0 {
    		return address
    	}

    	parts := strings.Split(address, ",")
    	addrs := make([]string, 0, len(parts))
    	for _, part := range parts {
    		if !strings.HasPrefix(part, apolloProtocolPrefix) {
    			part = apolloProtocolPrefix + part
    		}
    		addrs = append(addrs, part)
    	}
    	return strings.Join(addrs, ",")
    }
    
    // Parser returns the configuration parser currently held by c.
    func (c *apolloDynamicConfiguration) Parser() parser.ConfigurationParser {
    	return c.parser
    }
    // SetParser replaces the configuration parser held by c with p.
    func (c *apolloDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
    	c.parser = p
    }
    
    // GetConfigs delegates to GetConfig with the same key and options and returns
    // its result unchanged.
    func (c *apolloDynamicConfiguration) GetConfigs(key string, opts ...config_center.Option) (string, error) {
    	return c.GetConfig(key, opts...)
    }