Skip to content
Snippets Groups Projects
Commit 92d04926 authored by flycash's avatar flycash
Browse files

Merge 2.7.5

parents 62ac4748 cb3f5570
No related branches found
No related tags found
No related merge requests found
Showing
with 93 additions and 56 deletions
......@@ -25,6 +25,8 @@ import (
"net/url"
"strconv"
"strings"
"github.com/apache/dubbo-go/common/logger"
)
import (
......@@ -177,7 +179,11 @@ func WithToken(token string) option {
if len(token) > 0 {
value := token
if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" {
value = uuid.NewV4().String()
u, err := uuid.NewV4()
if err != nil {
logger.Errorf("could not generator UUID: %v", err)
}
value = u.String()
}
url.SetParam(constant.TOKEN_KEY, value)
}
......
......@@ -43,8 +43,11 @@ type multiConfiger interface {
// BaseConfig is the event configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
// since 1.5.0 version
Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"`
ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"`
MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"`
Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"`
Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
......
......@@ -146,9 +146,9 @@ func loadConsumerConfig() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
// errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
// logger.Error(errMsg)
// panic(errMsg)
}
time.Sleep(time.Second * 1)
break
......@@ -223,7 +223,7 @@ func Load() {
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
// start the metadata report if config set
if err := startMetadataReport(GetApplicationConfig().MetadataType, GetProviderConfig().MetadataReportConfig); err != nil {
if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
......
......@@ -40,8 +40,6 @@ type ProviderConfig struct {
BaseConfig `yaml:",inline"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
// metadata-report
MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"`
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
......
......@@ -125,7 +125,7 @@ func (n *nacosDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.H
return result, perrors.WithMessage(err, "can not find the client config")
}
for _, itm := range page.PageItems {
result.Add(itm.Appname)
result.Add(itm.DataId)
}
return result, nil
}
......
......@@ -46,7 +46,9 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent
go callback(listener, namespace, group, dataId, data)
},
})
logger.Errorf("nacos : listen config fail, error:%v ", err)
if err != nil {
logger.Errorf("nacos : listen config fail, error:%v ", err)
}
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel
n.keyListeners.Store(key, newListener)
......
......@@ -36,7 +36,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.5.1
......@@ -56,3 +56,5 @@ require (
)
go 1.13
replace github.com/nacos-group/nacos-sdk-go => /Users/mindeng/go-workspace/src/nacos-sdk-go
......@@ -459,6 +459,8 @@ github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1T
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880 h1:1Ge4j/3uB2rxzPWD3TC+daeCw+w91z8UCUL/7WH5gn8=
......
......@@ -97,13 +97,21 @@ func BuildServiceDefinition(service common.Service, url common.URL) *ServiceDefi
for k, m := range service.Method() {
var paramTypes []string
for _, t := range m.ArgsType() {
paramTypes = append(paramTypes, t.Kind().String())
if len(m.ArgsType()) > 0 {
for _, t := range m.ArgsType() {
paramTypes = append(paramTypes, t.Kind().String())
}
}
var returnType string
if m.ReplyType() != nil {
returnType = m.ReplyType().Kind().String()
}
methodD := MethodDefinition{
Name: k,
ParameterTypes: paramTypes,
ReturnType: m.ReplyType().Kind().String(),
ReturnType: returnType,
}
sd.Methods = append(sd.Methods, methodD)
}
......
......@@ -41,7 +41,7 @@ func NewServiceMetadataIdentifier(url common.URL) *ServiceMetadataIdentifier {
}
}
// GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision"+Revision
// GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision-"+Revision
func (mdi *ServiceMetadataIdentifier) GetIdentifierKey() string {
return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Protocol, constant.KEY_REVISON_PREFIX+mdi.Revision)
}
......
......@@ -268,7 +268,7 @@ func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifie
}
for e := range metadataMap {
if common.RoleType(common.PROVIDER).Role() == e.Side {
mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition))
mr.StoreProviderMetadata(e, metadataMap[e].(*definition.ServiceDefinition))
} else if common.RoleType(common.CONSUMER).Role() == e.Side {
mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string))
}
......
......@@ -51,6 +51,7 @@ type nacosMetadataReport struct {
}
// StoreProviderMetadata will store the metadata
// metadata including the basic info of the server, provider info, and other user custom info
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
......@@ -60,6 +61,7 @@ func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifi
}
// StoreConsumerMetadata will store the metadata
// metadata including the basic info of the server, consumer info, and other user custom info
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
......@@ -69,6 +71,7 @@ func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *
}
// SaveServiceMetadata will store the metadata
// metadata including the basic info of the server, service info, and other user custom info
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
......
......@@ -20,9 +20,13 @@ package nacos
import (
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......
......@@ -121,7 +121,8 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
// Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP),
Group: url.GetParam(constant.GROUP_KEY, "test"),
Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
Side: url.GetParam(constant.SIDE_KEY, "provider"),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
......
......@@ -54,7 +54,7 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns
list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil || len(list) == 0 {
logger.Errorf("Could not find exported urls", err)
logger.Debugf("Could not find exported urls", err)
return
}
......
......@@ -177,10 +177,17 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
if err != nil {
return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
}
err = s.serviceDiscovery.Register(ins)
if err != nil {
return perrors.WithMessage(err, "register the service failed")
}
err = s.metaDataService.PublishServiceDefinition(url)
if err != nil {
return perrors.WithMessage(err, "publish the service definition failed. ")
}
return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.Version, ""),
......
......@@ -72,7 +72,10 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
port, err := strconv.Atoi(portStr)
if err != nil {
return configMap, perrors.WithMessage(err, "the port string is invalid. "+portStr)
}
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
......@@ -80,18 +83,21 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
}
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig
timeoutMs := uint64(timeout.Nanoseconds() / constant.MsToNanoRate)
configMap["clientConfig"] = nacosConstant.ClientConfig{
TimeoutMs: timeoutMs,
ListenInterval: 2 * timeoutMs,
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
NotLoadCacheAtStart: true,
}
return configMap, nil
}
......
......@@ -51,7 +51,7 @@ var (
type ZookeeperClient struct {
name string
ZkAddrs []string
sync.Mutex // for conn
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
......@@ -275,7 +275,7 @@ LOOP:
break LOOP
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.Lock()
z.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
......@@ -285,7 +285,7 @@ LOOP:
}
}
}
z.Unlock()
z.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
......@@ -368,11 +368,11 @@ func (z *ZookeeperClient) ZkConnValid() bool {
}
valid := true
z.Lock()
z.RLock()
if z.Conn == nil {
valid = false
}
z.Unlock()
z.RUnlock()
return valid
}
......@@ -413,15 +413,15 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
)
logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)
conn := z.getConn()
err = errNilZkClientConn
if conn == nil {
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
}
for _, str := range strings.Split(basePath, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
if conn != nil {
_, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll))
}
_, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll))
if err != nil {
if err == zk.ErrNodeExists {
......@@ -443,9 +443,7 @@ func (z *ZookeeperClient) Delete(basePath string) error {
)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
err = conn.Delete(basePath, -1)
}
......@@ -465,9 +463,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
err = errNilZkClientConn
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
......@@ -490,9 +486,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(
path.Join(basePath)+"/",
......@@ -523,9 +517,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
children, stat, watcher, err = conn.ChildrenW(path)
}
......@@ -559,9 +551,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
children, stat, err = conn.Children(path)
}
......@@ -592,9 +582,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
exist, _, watcher, err = conn.ExistsW(zkPath)
}
......@@ -615,3 +603,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
z.RLock()
defer z.RUnlock()
return z.Conn
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment