Skip to content
Snippets Groups Projects
Unverified Commit bfb89b50 authored by lzp0412's avatar lzp0412 Committed by GitHub
Browse files

Merge pull request #8 from flycash/2.7.5

2.7.5
parents 7e912e7f cc4d5639
No related branches found
No related tags found
No related merge requests found
......@@ -68,11 +68,15 @@ func init() {
if errCon := ConsumerInit(confConFile); errCon != nil {
log.Printf("[consumerInit] %#v", errCon)
consumerConfig = nil
} else {
baseConfig = &consumerConfig.BaseConfig
}
if errPro := ProviderInit(confProFile); errPro != nil {
log.Printf("[providerInit] %#v", errPro)
providerConfig = nil
} else {
baseConfig = &providerConfig.BaseConfig
}
}
......@@ -282,7 +286,7 @@ func GetProviderConfig() ProviderConfig {
defer configAccessMutex.Unlock()
if providerConfig == nil {
logger.Warnf("creating empty provider config. You should see this log only once.")
providerConfig = &ProviderConfig{}
return ProviderConfig{}
}
}
return *providerConfig
......@@ -300,13 +304,14 @@ func GetConsumerConfig() ConsumerConfig {
defer configAccessMutex.Unlock()
if consumerConfig == nil {
logger.Warnf("creating empty consumer config. You should see this log only once.")
consumerConfig = &ConsumerConfig{}
return ConsumerConfig{}
}
}
return *consumerConfig
}
func GetBaseConfig() *BaseConfig {
if baseConfig == nil {
baseConfigOnce.Do(func() {
baseConfig = &BaseConfig{
......
......@@ -34,13 +34,10 @@ import (
// MethodConfig ...
type MetadataReportConfig struct {
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
RemoteRef string `required:"true" yaml:"remote_ref" json:"remote_ref,omitempty"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
}
// Prefix ...
......@@ -70,18 +67,24 @@ func (c *MetadataReportConfig) ToUrl() (*common.URL, error) {
}
}
url, err := common.NewURL(c.Address,
rc, ok := GetBaseConfig().GetRemoteConfig(c.RemoteRef)
if !ok {
return nil, perrors.New("Could not find out the remote ref config, name: " + c.RemoteRef)
}
res, err := common.NewURL(rc.Address,
common.WithParams(urlMap),
common.WithUsername(c.Username),
common.WithPassword(c.Password),
common.WithLocation(c.Address),
common.WithUsername(rc.Username),
common.WithPassword(rc.Password),
common.WithLocation(rc.Address),
common.WithProtocol(c.Protocol),
)
if err != nil || len(url.Protocol) == 0 {
if err != nil || len(res.Protocol) == 0 {
return nil, perrors.New("Invalid MetadataReportConfig.")
}
url.SetParam("metadata", url.Protocol)
return &url, nil
res.SetParam("metadata", res.Protocol)
return &res, nil
}
func (c *MetadataReportConfig) IsValid() bool {
......@@ -94,10 +97,8 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo
return nil
}
if metadataType == constant.METACONFIG_REMOTE {
return perrors.New("No MetadataConfig found, you must specify the remote Metadata Center address when 'metadata=remote' is enabled.")
} else if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.Address) == 0 {
return perrors.New("MetadataConfig address can not be empty.")
if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.RemoteRef) == 0 {
return perrors.New("MetadataConfig remote ref can not be empty.")
}
if url, err := metadataReportConfig.ToUrl(); err == nil {
......
......@@ -24,12 +24,15 @@ import (
)
func TestMetadataReportConfig_ToUrl(t *testing.T) {
metadataReportConfig := MetadataReportConfig{
Protocol: "mock",
GetBaseConfig().Remotes["mock"] = &RemoteConfig{
Address: "127.0.0.1:2181",
Username: "test",
Password: "test",
TimeoutStr: "3s",
}
metadataReportConfig := MetadataReportConfig{
Protocol: "mock",
RemoteRef: "mock",
Params: map[string]string{
"k": "v",
},
......
......@@ -87,15 +87,15 @@ func ProviderInit(confProFile string) error {
}
providerConfig.fileStream = bytes.NewBuffer(fileStream)
//set method interfaceId & interfaceName
// set method interfaceId & interfaceName
for k, v := range providerConfig.Services {
//set id for reference
// set id for reference
for _, n := range providerConfig.Services[k].Methods {
n.InterfaceName = v.InterfaceName
n.InterfaceId = k
}
}
//start the metadata report if config set
// start the metadata report if config set
if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil {
return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err)
}
......@@ -105,7 +105,7 @@ func ProviderInit(confProFile string) error {
}
func configCenterRefreshProvider() error {
//fresh it
// fresh it
if providerConfig.ConfigCenterConfig != nil {
providerConfig.fatherConfig = providerConfig
if err := providerConfig.startConfigCenter(); err != nil {
......
......@@ -19,12 +19,24 @@ package config
import (
"time"
"github.com/apache/dubbo-go/common/logger"
)
type RemoteConfig struct {
Address string `yaml:"address" json:"address,omitempty"`
Timeout time.Duration `default:"10s" yaml:"timeout" json:"timeout,omitempty"`
Params map[string]string `yaml:"params" json:"address,omitempty"`
Address string `yaml:"address" json:"address,omitempty"`
TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"address,omitempty"`
}
func (rc *RemoteConfig) Timeout() time.Duration {
if res, err := time.ParseDuration(rc.TimeoutStr); err == nil {
return res
}
logger.Errorf("Could not parse the timeout string to Duration: %s, the default value will be returned", rc.TimeoutStr)
return 5 * time.Second
}
// GetParam will return the value of the key. If not found, def will be return;
......
......@@ -18,6 +18,7 @@
package dynamic
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/mapping"
"strconv"
"sync"
......@@ -41,6 +42,10 @@ const (
slash = "/"
)
func init() {
extension.SetServiceNameMapping("dynamic", GetServiceNameMappingInstance)
}
// DynamicConfigurationServiceNameMapping is the implementation based on config center
type DynamicConfigurationServiceNameMapping struct {
dc config_center.DynamicConfiguration
......
......@@ -130,7 +130,7 @@ func NewMetadataReport() (*MetadataReport, error) {
scheduler := gocron.NewScheduler(time.UTC)
_, err := scheduler.Every(1).Day().Do(
func() {
logger.Info("start to publish all metadata in metadata report %v.", url)
logger.Info("start to publish all metadata in metadata report %s.", url.String())
bmr.allMetadataReportsLock.RLock()
bmr.doHandlerMetadataCollection(bmr.allMetadataReports)
bmr.allMetadataReportsLock.RUnlock()
......
......@@ -65,7 +65,7 @@ func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.Metada
// ExportURL will be implemented by in memory service
func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
return true, nil
return mts.inMemoryMetadataService.ExportURL(url)
}
// UnexportURL
......@@ -76,13 +76,13 @@ func (mts *MetadataService) UnexportURL(url common.URL) error {
}
// SubscribeURL will be implemented by in memory service
func (MetadataService) SubscribeURL(url common.URL) (bool, error) {
return true, nil
func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) {
return mts.inMemoryMetadataService.SubscribeURL(url)
}
// UnsubscribeURL will be implemented by in memory service
func (MetadataService) UnsubscribeURL(url common.URL) error {
return nil
func (mts *MetadataService) UnsubscribeURL(url common.URL) error {
return mts.UnsubscribeURL(url)
}
// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition
......
......@@ -19,7 +19,6 @@ package nacos
import (
"testing"
"time"
"github.com/apache/dubbo-go/config"
)
......@@ -59,8 +58,8 @@ func Test_newNacosServiceDiscovery(t *testing.T) {
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "console.nacos.io:80",
Timeout: 10 * time.Second,
Address: "console.nacos.io:80",
TimeoutStr: "10s",
}
res, err := newNacosServiceDiscovery(name)
......@@ -168,7 +167,7 @@ func prepareData() {
}
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: "console.nacos.io:80",
Timeout: 10 * time.Second,
Address: "console.nacos.io:80",
TimeoutStr: "10s",
}
}
......@@ -33,9 +33,10 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/servicediscovery/proxy"
"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
......@@ -76,8 +77,10 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, ""))
// TODO it's need to get implement by factory
metaDataService := inmemory.NewMetadataService()
metaDataService, err := remote.NewMetadataService()
if err != nil {
return nil, perrors.WithMessage(err, "could not init metadata service")
}
return &serviceDiscoveryRegistry{
url: url,
serviceDiscovery: serviceDiscovery,
......@@ -91,15 +94,26 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
}
func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
panic("implement me")
if !shouldRegister(url) {
return nil
}
return s.metaDataService.UnexportURL(url)
}
func (s *serviceDiscoveryRegistry) UnSubscribe(*common.URL, registry.NotifyListener) error {
panic("implement me")
func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
if !shouldSubscribe(*url) {
return nil
}
return s.metaDataService.UnsubscribeURL(*url)
}
func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
return extension.GetServiceDiscovery(url.Protocol, "TODO")
sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
if !ok {
return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
}
return extension.GetServiceDiscovery(sdc.Protocol, sdcName)
}
func parseServices(literalServices string) *gxset.HashSet {
......@@ -125,6 +139,7 @@ func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
}
func (s *serviceDiscoveryRegistry) IsAvailable() bool {
// TODO(whether available depends on metadata service and service discovery)
return true
}
......
......@@ -118,7 +118,7 @@ func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout := rc.Timeout
timeout := rc.Timeout()
clientConfig.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
......
......@@ -19,7 +19,6 @@ package nacos
import (
"testing"
"time"
)
import (
......@@ -43,7 +42,7 @@ func TestNewNacosClient(t *testing.T) {
assert.NotNil(t, err)
rc.Address = "console.nacos.io:80"
rc.Timeout = 10 * time.Second
rc.TimeoutStr = "10s"
client, err = NewNacosClient(rc)
assert.NotNil(t, client)
assert.Nil(t, err)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment