Skip to content
Snippets Groups Projects
Commit 8129bca6 authored by vito.he's avatar vito.he
Browse files

Ftr: new feature implement for config center, config refresh from config center when dubbogo start

parent 6e3c3051
No related branches found
No related tags found
No related merge requests found
Showing
with 995 additions and 404 deletions
......@@ -19,6 +19,7 @@ package config
import (
"container/list"
"strings"
"sync"
)
......@@ -58,31 +59,45 @@ func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) {
}
}
func (env *Environment) Configuration(prefix string) *list.List {
func (env *Environment) Configuration() *list.List {
list := list.New()
memConf := newInmemoryConfiguration(prefix)
memConf := newInmemoryConfiguration()
memConf.setProperties(env.externalConfigMap)
list.PushBack(memConf)
return list
}
type InmemoryConfiguration struct {
prefix string
store sync.Map
store sync.Map
}
func newInmemoryConfiguration(prefix string) *InmemoryConfiguration {
return &InmemoryConfiguration{prefix: prefix}
func newInmemoryConfiguration() *InmemoryConfiguration {
return &InmemoryConfiguration{}
}
func (conf *InmemoryConfiguration) setProperties(p sync.Map) {
conf.store = p
}
func (conf *InmemoryConfiguration) GetProperty(key string) string {
v, ok := conf.store.Load(conf.prefix + "." + key)
func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) {
v, ok := conf.store.Load(key)
if ok {
return v.(string)
return true, v.(string)
} else {
return ""
return false, ""
}
}
func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} {
properties := make(map[string]struct{})
conf.store.Range(func(key, value interface{}) bool {
if idx := strings.Index(key.(string), subKey); idx >= 0 {
after := key.(string)[idx+len(subKey):]
if i := strings.Index(after, "."); i >= 0 {
properties[after[0:strings.Index(after, ".")]] = struct{}{}
}
}
return true
})
return properties
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestGetEnvInstance(t *testing.T) {
GetEnvInstance()
assert.NotNil(t, instance)
}
func TestEnvironment_UpdateExternalConfigMap(t *testing.T) {
GetEnvInstance().UpdateExternalConfigMap(map[string]string{"1": "2"})
v, ok := GetEnvInstance().externalConfigMap.Load("1")
assert.True(t, ok)
assert.Equal(t, "2", v)
}
func TestEnvironment_ConfigurationAndGetProperty(t *testing.T) {
GetEnvInstance().UpdateExternalConfigMap(map[string]string{"1": "2"})
list := GetEnvInstance().Configuration()
ok, v := list.Front().Value.(*InmemoryConfiguration).GetProperty("1")
assert.True(t, ok)
assert.Equal(t, "2", v)
}
func TestInmemoryConfiguration_GetSubProperty(t *testing.T) {
GetEnvInstance().UpdateExternalConfigMap(map[string]string{"123": "2"})
list := GetEnvInstance().Configuration()
m := list.Front().Value.(*InmemoryConfiguration).GetSubProperty("1")
assert.Equal(t, struct{}{}, m["123"])
}
......@@ -72,3 +72,11 @@ const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
)
const (
RegistryConfigPrefix = "dubbo.registries."
ReferenceConfigPrefix = "dubbo.reference."
ServiceConfigPrefix = "dubbo.service."
ProtocolConfigPrefix = "dubbo.protocols."
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
)
......@@ -164,6 +164,13 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
}
//rawUrlString = "//" + rawUrlString
if strings.Index(rawUrlString, "//") < 0 {
t := URL{baseUrl: baseUrl{ctx: ctx}}
for _, opt := range opts {
opt(&t)
}
rawUrlString = t.Protocol + "://" + rawUrlString
}
serviceUrl, err = url.Parse(rawUrlString)
if err != nil {
return s, perrors.Errorf("url.Parse(url string{%s}), error{%v}", rawUrlString, err)
......@@ -197,10 +204,10 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
// s.Timeout = time.Duration(timeout * 1e6) // timeout unit is millisecond
// }
//}
//fmt.Println(s.String())
for _, opt := range opts {
opt(&s)
}
//fmt.Println(s.String())
return s, nil
}
......
......@@ -17,11 +17,23 @@
package config
import "github.com/apache/dubbo-go/common/constant"
type ApplicationConfig struct {
Organization string `yaml:"organization" json:"organization,omitempty"`
Name string `yaml:"name" json:"name,omitempty"`
Module string `yaml:"module" json:"module,omitempty"`
Version string `yaml:"version" json:"version,omitempty"`
Owner string `yaml:"owner" json:"owner,omitempty"`
Environment string `yaml:"environment" json:"environment,omitempty"`
Organization string `yaml:"organization" json:"organization,omitempty" property:"organization"`
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Module string `yaml:"module" json:"module,omitempty" property:"module"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"`
Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
}
func (*ApplicationConfig) Prefix() string {
return constant.DUBBO + ".application."
}
func (c *ApplicationConfig) Id() string {
return ""
}
func (c *ApplicationConfig) SetId(id string) {
}
......@@ -18,10 +18,8 @@ package config
import (
"context"
"github.com/apache/dubbo-go/common/constant"
"reflect"
"strconv"
"strings"
)
import (
perrors "github.com/pkg/errors"
......@@ -34,24 +32,31 @@ import (
"github.com/apache/dubbo-go/config_center"
)
type baseConfig struct {
ConfigCenterConfig ConfigCenterConfig
type multiConfiger interface {
Prefix() string
}
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
}
func (c *baseConfig) startConfigCenter(ctx context.Context) error {
var err error
*c.configCenterUrl, err = common.NewURL(ctx, c.ConfigCenterConfig.Address)
func (c *BaseConfig) startConfigCenter(ctx context.Context) error {
url, err := common.NewURL(ctx, c.ConfigCenterConfig.Address, common.WithProtocol(c.ConfigCenterConfig.Protocol))
if err != nil {
return err
}
c.configCenterUrl = &url
if c.prepareEnvironment() != nil {
return perrors.WithMessagef(err, "start config center error!")
}
c.fresh()
//c.fresh()
return err
}
func (c *baseConfig) prepareEnvironment() error {
func (c *BaseConfig) prepareEnvironment() error {
factory := extension.GetConfigCenterFactory(c.ConfigCenterConfig.Protocol)
dynamicConfig, err := factory.GetDynamicConfiguration(c.configCenterUrl)
......@@ -72,76 +77,184 @@ func (c *baseConfig) prepareEnvironment() error {
return nil
}
func (c *baseConfig) fresh() {
configList := config.GetEnvInstance().Configuration(c.Prefix())
config := configList.Front().Value.(*config.InmemoryConfiguration)
val := reflect.Indirect(reflect.ValueOf(c.fatherConfig))
func getKeyPrefix(val reflect.Value, id reflect.Value) string {
var prefix string
var idStr string
if id.Kind() == reflect.String {
idStr = id.Interface().(string)
}
if val.CanAddr() {
prefix = val.Addr().MethodByName("Prefix").Call(nil)[0].String()
} else {
prefix = val.MethodByName("Prefix").Call(nil)[0].String()
}
if idStr != "" {
return prefix + idStr + "."
} else {
return prefix
}
}
func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryConfiguration) {
for i := 0; i < val.NumField(); i++ {
if key := val.Type().Field(i).Tag.Get("property"); key != "-" && key != "" {
f := val.Field(i)
if f.IsValid() {
value := config.GetProperty(key)
setValue := func(f reflect.Value) {
if f.Kind() == reflect.Int {
x, err := strconv.Atoi(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
} else {
if !f.OverflowInt(int64(x)) {
f.SetInt(int64(x))
} else {
setBaseValue := func(f reflect.Value) {
ok, value := config.GetProperty(getKeyPrefix(val, id) + key)
if ok {
if f.Kind() == reflect.Int64 {
x, err := strconv.Atoi(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the int64 value {%v} from config center is overflow", int64(x)))
val.Type().Name(), val.Type().Field(i).Name, err)
} else {
if !f.OverflowInt(int64(x)) {
f.SetInt(int64(x))
} else {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the int64 value {%v} from config center is overflow", int64(x)))
}
}
}
if f.Kind() == reflect.String {
f.SetString(value)
}
if f.Kind() == reflect.Bool {
x, err := strconv.ParseBool(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
}
f.SetBool(x)
}
if f.Kind() == reflect.Float64 {
x, err := strconv.ParseFloat(value, 64)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
} else {
if !f.OverflowFloat(x) {
f.SetFloat(x)
} else {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the float64 value {%v} from config center is overflow", x))
}
}
}
}
if f.Kind() == reflect.String {
f.SetString(value)
}
setBaseValue(f)
if f.Kind() == reflect.Ptr {
if f.Elem().Kind() == reflect.Struct {
setFieldValue(f.Elem(), reflect.Value{}, config)
} else {
setBaseValue(f.Elem())
}
if f.Kind() == reflect.Bool {
x, err := strconv.ParseBool(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
}
if f.Kind() == reflect.Struct {
setFieldValue(f, reflect.Value{}, config)
}
if f.Kind() == reflect.Slice {
for i := 0; i < f.Len(); i++ {
e := f.Index(i)
if e.Kind() == reflect.Ptr {
if e.Elem().Kind() == reflect.Struct {
setFieldValue(e.Elem(), reflect.Value{}, config)
} else {
setBaseValue(e.Elem())
}
}
f.SetBool(x)
}
if f.Kind() == reflect.Float64 {
x, err := strconv.ParseFloat(value, 64)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
} else {
if !f.OverflowFloat(x) {
f.SetFloat(x)
}
if f.Kind() == reflect.Map {
//initiate config
s := reflect.New(f.Type().Elem().Elem())
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
m := config.GetSubProperty(prefix)
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
}
//iter := f.MapRange()
for _, k := range f.MapKeys() {
v := f.MapIndex(k)
if v.Kind() == reflect.Ptr {
if v.Elem().Kind() == reflect.Struct {
setFieldValue(v.Elem(), k, config)
} else {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the float64 value {%v} from config center is overflow", x))
setBaseValue(v.Elem())
}
}
}
}
setValue(f)
if f.Kind() == reflect.Ptr {
setValue(f.Elem())
}
}
}
}
}
func (c *BaseConfig) fresh() {
configList := config.GetEnvInstance().Configuration()
config := configList.Front().Value.(*config.InmemoryConfiguration)
func (c *baseConfig) SetPrefix(prefix string) {
c.prefix = prefix
}
func (c *baseConfig) Prefix() string {
if c.prefix == "" {
return constant.DUBBO + "." + strings.ToLower(strings.Replace(reflect.Indirect(reflect.ValueOf(c.fatherConfig)).Type().Name(), "Config", "", -1))
} else {
return c.prefix
}
//reflect to init struct
tp := reflect.ValueOf(c.fatherConfig).Elem().Type()
initializeStruct(tp, reflect.ValueOf(c.fatherConfig).Elem())
val := reflect.Indirect(reflect.ValueOf(c.fatherConfig))
setFieldValue(val, reflect.Value{}, config)
}
func (c *baseConfig) SetFatherConfig(fatherConfig interface{}) {
func (c *BaseConfig) SetFatherConfig(fatherConfig interface{}) {
c.fatherConfig = fatherConfig
}
func initializeStruct(t reflect.Type, v reflect.Value) {
if v.Kind() == reflect.Struct {
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
ft := t.Field(i)
if ft.Tag.Get("property") != "" {
switch ft.Type.Kind() {
case reflect.Map:
if f.IsNil() {
f.Set(reflect.MakeMap(ft.Type))
}
case reflect.Slice:
if f.IsNil() {
f.Set(reflect.MakeSlice(ft.Type, 0, 0))
}
case reflect.Chan:
if f.IsNil() {
f.Set(reflect.MakeChan(ft.Type, 0))
}
case reflect.Struct:
if f.IsNil() {
initializeStruct(ft.Type, f)
}
case reflect.Ptr:
if f.IsNil() {
fv := reflect.New(ft.Type.Elem())
initializeStruct(ft.Type.Elem(), fv.Elem())
f.Set(fv)
}
default:
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"github.com/apache/dubbo-go/common/config"
"github.com/stretchr/testify/assert"
"context"
"fmt"
"reflect"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
func Test_refresh(t *testing.T) {
//Id string `required:"true" yaml:"id" json:"id,omitempty"`
//Type string `required:"true" yaml:"type" json:"type,omitempty"`
//TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty"` // unit: second
//Group string `yaml:"group" json:"group,omitempty"`
////for registry
//Address string `yaml:"address" json:"address,omitempty"`
//Username string `yaml:"username" json:"address,omitempty"`
//Password string `yaml:"password" json:"address,omitempty"`
c := &baseConfig{}
c := &BaseConfig{}
mockMap := map[string]string{}
mockMap["dubbo.registry.type"] = "zookeeper"
mockMap["dubbo.registry.timeout"] = "3s"
mockMap["dubbo.registry.group"] = "hangzhou"
mockMap["dubbo.registry.address"] = "zookeeper://172.0.0.1:2181"
mockMap["dubbo.registry.username"] = "admin"
mockMap["dubbo.registry.password"] = "admin"
mockMap["dubbo.registries.shanghai_reg1.protocol"] = "mock100"
mockMap["dubbo.reference.MockService.MockService.retries"] = "10"
mockMap["dubbo.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.consumer.check"] = "false"
mockMap["dubbo.application.name"] = "dubbo"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &RegistryConfig{Type: "1111"}
father := &ConsumerConfig{
Check: &[]bool{true}[0],
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181",
Username: "user1",
Password: "pwd1",
},
"hangzhou_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.3:2181",
Username: "user1",
Password: "pwd1",
},
"hangzhou_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.4:2181",
Username: "user1",
Password: "pwd1",
},
},
References: map[string]*ReferenceConfig{
"MockService": {
InterfaceName: "MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
InterfaceId: "MockService",
InterfaceName: "MockService",
Name: "GetUser",
Retries: 2,
Loadbalance: "random",
},
{InterfaceId: "MockService",
InterfaceName: "MockService",
Name: "GetUser1",
Retries: 2,
Loadbalance: "random",
},
},
},
},
}
c.SetFatherConfig(father)
c.fresh()
assert.Equal(t, "zookeeper", father.Type)
assert.Equal(t, "zookeeper", father.Type)
assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol)
assert.Equal(t, int64(10), father.References["MockService"].Retries)
assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries)
assert.Equal(t, &[]bool{false}[0], father.Check)
assert.Equal(t, "dubbo", father.ApplicationConfig.Name)
}
func Test_refreshWithPrefix(t *testing.T) {
//Id string `required:"true" yaml:"id" json:"id,omitempty"`
//Type string `required:"true" yaml:"type" json:"type,omitempty"`
//TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty"` // unit: second
//Group string `yaml:"group" json:"group,omitempty"`
////for registry
//Address string `yaml:"address" json:"address,omitempty"`
//Username string `yaml:"username" json:"address,omitempty"`
//Password string `yaml:"password" json:"address,omitempty"`
c := &baseConfig{}
func Test_refreshProvider(t *testing.T) {
c := &BaseConfig{}
mockMap := map[string]string{}
mockMap["dubbo.customRegistry.type"] = "zookeeper"
mockMap["dubbo.customRegistry.timeout"] = "3s"
mockMap["dubbo.customRegistry.group"] = "hangzhou"
mockMap["dubbo.customRegistry.address"] = "zookeeper://172.0.0.1:2181"
mockMap["dubbo.customRegistry.username"] = "admin"
mockMap["dubbo.customRegistry.password"] = "admin"
mockMap["dubbo.registries.shanghai_reg1.protocol"] = "mock100"
mockMap["dubbo.service.MockService.MockService.retries"] = "10"
mockMap["dubbo.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.consumer.check"] = "false"
mockMap["dubbo.application.name"] = "dubbo"
mockMap["dubbo.protocols.jsonrpc1.name"] = "jsonrpc"
mockMap["dubbo.protocols.jsonrpc1.ip"] = "127.0.0.1"
mockMap["dubbo.protocols.jsonrpc1.port"] = "20001"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &RegistryConfig{Type: "1111"}
c.SetPrefix("dubbo.customRegistry")
father := &ProviderConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181",
Username: "user1",
Password: "pwd1",
},
"hangzhou_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.3:2181",
Username: "user1",
Password: "pwd1",
},
"hangzhou_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.4:2181",
Username: "user1",
Password: "pwd1",
},
},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
InterfaceId: "MockService",
InterfaceName: "MockService",
Name: "GetUser",
Retries: 2,
Loadbalance: "random",
},
{InterfaceId: "MockService",
InterfaceName: "MockService",
Name: "GetUser1",
Retries: 2,
Loadbalance: "random",
},
},
},
},
}
c.SetFatherConfig(father)
c.fresh()
assert.Equal(t, "zookeeper", father.Type)
assert.Equal(t, "zookeeper", father.Type)
assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol)
assert.Equal(t, int64(10), father.Services["MockService"].Retries)
assert.Equal(t, int64(10), father.Services["MockService"].Methods[0].Retries)
assert.Equal(t, "dubbo", father.ApplicationConfig.Name)
assert.Equal(t, "20001", father.Protocols["jsonrpc1"].Port)
}
func Test_startConfigCenter(t *testing.T) {
extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory {
return &config_center.MockDynamicConfigurationFactory{}
})
c := &BaseConfig{ConfigCenterConfig: &ConfigCenterConfig{
Protocol: "mock",
Address: "172.0.0.1",
Group: "dubbo",
ConfigFile: "mockDubbo.properties",
}}
err := c.startConfigCenter(context.Background())
assert.NoError(t, err)
b, v := config.GetEnvInstance().Configuration().Front().Value.(*config.InmemoryConfiguration).GetProperty("dubbo.application.organization")
assert.True(t, b)
assert.Equal(t, "ikurento.com", v)
}
func Test_initializeStruct(t *testing.T) {
consumerConfig := &ConsumerConfig{}
tp := reflect.TypeOf(ConsumerConfig{})
v := reflect.New(tp)
initializeStruct(tp, v.Elem())
fmt.Println(reflect.ValueOf(consumerConfig).Elem().Type().String())
fmt.Println(v.Elem().Type().String())
reflect.ValueOf(consumerConfig).Elem().Set(v.Elem())
assert.Condition(t, func() (success bool) {
return consumerConfig.ApplicationConfig != nil
})
assert.Condition(t, func() (success bool) {
return consumerConfig.Registries != nil
})
assert.Condition(t, func() (success bool) {
return consumerConfig.References != nil
})
}
......@@ -30,7 +30,6 @@ type ConfigCenterConfig struct {
Group string `default:"dubbo" yaml:"group" json:"group,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"`
TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"`
timeout time.Duration
......
......@@ -19,19 +19,11 @@ package config
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
"time"
)
import (
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
......@@ -47,14 +39,12 @@ var (
// loaded comsumer & provider config from xxx.yml, and log config from xxx.xml
// Namely: dubbo.comsumer.xml & dubbo.provider.xml in java dubbo
func init() {
var (
confConFile, confProFile string
)
confConFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH)
confProFile = os.Getenv(constant.CONF_PROVIDER_FILE_PATH)
if errCon := consumerInit(confConFile); errCon != nil {
log.Printf("[consumerInit] %#v", errCon)
consumerConfig = nil
......@@ -65,148 +55,9 @@ func init() {
}
}
func consumerInit(confConFile string) error {
if confConFile == "" {
return perrors.Errorf("application configure(consumer) file name is nil")
}
if path.Ext(confConFile) != ".yml" {
return perrors.Errorf("application configure file name{%v} suffix must be .yml", confConFile)
}
confFileStream, err := ioutil.ReadFile(confConFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confConFile, perrors.WithStack(err))
}
consumerConfig = &ConsumerConfig{}
err = yaml.Unmarshal(confFileStream, consumerConfig)
if err != nil {
return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err))
}
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
}
if consumerConfig.ConnectTimeout, err = time.ParseDuration(consumerConfig.Connect_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout)
}
logger.Debugf("consumer config{%#v}\n", consumerConfig)
return nil
}
func providerInit(confProFile string) error {
if confProFile == "" {
return perrors.Errorf("application configure(provider) file name is nil")
}
if path.Ext(confProFile) != ".yml" {
return perrors.Errorf("application configure file name{%v} suffix must be .yml", confProFile)
}
confFileStream, err := ioutil.ReadFile(confProFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confProFile, perrors.WithStack(err))
}
providerConfig = &ProviderConfig{}
err = yaml.Unmarshal(confFileStream, providerConfig)
if err != nil {
return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err))
}
logger.Debugf("provider config{%#v}\n", providerConfig)
return nil
}
/////////////////////////
// consumerConfig
/////////////////////////
type ConsumerConfig struct {
baseConfig
Filter string `yaml:"filter" json:"filter,omitempty"`
// application
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
// client
Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"`
ConnectTimeout time.Duration
Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"`
RequestTimeout time.Duration
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
References []ReferenceConfig `yaml:"references" json:"references,omitempty"`
ConfigCenter ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"`
}
type ReferenceConfigTmp struct {
Service string `required:"true" yaml:"service" json:"service,omitempty"`
Registries []RegistryConfig `required:"true" yaml:"registries" json:"registries,omitempty"`
URLs []map[string]string
}
func SetConsumerConfig(c ConsumerConfig) {
consumerConfig = &c
}
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
return ConsumerConfig{}
}
return *consumerConfig
}
/////////////////////////
// providerConfig
/////////////////////////
type ProviderConfig struct {
Filter string `yaml:"filter" json:"filter,omitempty"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"`
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
Services []ServiceConfig `yaml:"services" json:"services,omitempty"`
Protocols []ProtocolConfig `yaml:"protocols" json:"protocols,omitempty"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"`
}
func SetProviderConfig(p ProviderConfig) {
providerConfig = &p
}
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
return ProviderConfig{}
}
return *providerConfig
}
type ProtocolConfig struct {
Name string `required:"true" yaml:"name" json:"name,omitempty"`
Ip string `required:"true" yaml:"ip" json:"ip,omitempty"`
Port string `required:"true" yaml:"port" json:"port,omitempty"`
ContextPath string `required:"true" yaml:"contextPath" json:"contextPath,omitempty"`
}
func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolConfig {
returnProtocols := []ProtocolConfig{}
for _, v := range strings.Split(protocolsIds, ",") {
for _, prot := range protocols {
if v == prot.Name {
returnProtocols = append(returnProtocols, prot)
}
}
}
return returnProtocols
}
// Dubbo Init
func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
var refMap map[string]*ReferenceConfig
var srvMap map[string]*ServiceConfig
......@@ -214,20 +65,20 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
} else {
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
}
refMap = make(map[string]*ReferenceConfig)
length := len(consumerConfig.References)
for index := 0; index < length; index++ {
con := &consumerConfig.References[index]
rpcService := GetConsumerService(con.InterfaceName)
for _, ref := range consumerConfig.References {
rpcService := GetConsumerService(ref.InterfaceName)
if rpcService == nil {
logger.Warnf("%s is not exsist!", con.InterfaceName)
logger.Warnf("%s is not exsist!", ref.InterfaceName)
continue
}
con.Refer()
con.Implement(rpcService)
refMap[con.InterfaceName] = con
ref.Refer()
ref.Implement(rpcService)
refMap[ref.InterfaceName] = ref
}
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
......@@ -263,20 +114,21 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
} else {
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
}
srvMap = make(map[string]*ServiceConfig)
length := len(providerConfig.Services)
for index := 0; index < length; index++ {
pro := &providerConfig.Services[index]
rpcService := GetProviderService(pro.InterfaceName)
for _, svs := range providerConfig.Services {
rpcService := GetProviderService(svs.InterfaceName)
if rpcService == nil {
logger.Warnf("%s is not exsist!", pro.InterfaceName)
logger.Warnf("%s is not exsist!", svs.InterfaceName)
continue
}
pro.Implement(rpcService)
if err := pro.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! ", pro.InterfaceName))
svs.Implement(rpcService)
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! ", svs.InterfaceName))
}
srvMap[pro.InterfaceName] = pro
srvMap[svs.InterfaceName] = svs
}
}
......
......@@ -31,6 +31,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config_center"
)
func TestConfigLoader(t *testing.T) {
......@@ -65,7 +66,6 @@ func TestLoad(t *testing.T) {
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"}
refConfigs, svcConfigs := Load()
assert.NotEqual(t, 0, len(refConfigs))
......@@ -77,3 +77,35 @@ func TestLoad(t *testing.T) {
consumerConfig = nil
providerConfig = nil
}
func TestConfigLoaderWithConfigCenter(t *testing.T) {
extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory {
return &config_center.MockDynamicConfigurationFactory{}
})
conPath, err := filepath.Abs("./testdata/consumer_config_with_configcenter.yml")
assert.NoError(t, err)
proPath, err := filepath.Abs("./testdata/provider_config.yml")
assert.NoError(t, err)
assert.Nil(t, consumerConfig)
assert.Equal(t, ConsumerConfig{}, GetConsumerConfig())
assert.Nil(t, providerConfig)
assert.Equal(t, ProviderConfig{}, GetProviderConfig())
err = consumerInit(conPath)
configCenterRefreshConsumer()
assert.NoError(t, err)
err = providerInit(proPath)
configCenterRefreshProvider()
assert.NoError(t, err)
assert.NotNil(t, consumerConfig)
assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig())
assert.NotNil(t, providerConfig)
assert.NotEqual(t, ProviderConfig{}, GetProviderConfig())
assert.Equal(t, "BDTService", consumerConfig.ApplicationConfig.Name)
assert.Equal(t, "127.0.0.1:2181", consumerConfig.Registries["hangzhouzk"].Address)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
"io/ioutil"
"path"
"time"
)
import (
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
/////////////////////////
// consumerConfig
/////////////////////////
type ConsumerConfig struct {
BaseConfig `yaml:",inline"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
// application
ApplicationConfig *ApplicationConfig `yaml:"application_config" json:"application_config,omitempty" property:"application_config"`
// client
Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"`
ConnectTimeout time.Duration
Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty" property:"request_timeout"`
RequestTimeout time.Duration
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
}
func (*ConsumerConfig) Prefix() string {
return constant.ConsumerConfigPrefix
}
func SetConsumerConfig(c ConsumerConfig) {
consumerConfig = &c
}
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
return ConsumerConfig{}
}
return *consumerConfig
}
func consumerInit(confConFile string) error {
if confConFile == "" {
return perrors.Errorf("application configure(consumer) file name is nil")
}
if path.Ext(confConFile) != ".yml" {
return perrors.Errorf("application configure file name{%v} suffix must be .yml", confConFile)
}
confFileStream, err := ioutil.ReadFile(confConFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confConFile, perrors.WithStack(err))
}
consumerConfig = &ConsumerConfig{}
err = yaml.Unmarshal(confFileStream, consumerConfig)
if err != nil {
return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err))
}
//set method interfaceId & interfaceName
for k, v := range consumerConfig.References {
//set id for reference
for _, n := range consumerConfig.References[k].Methods {
n.InterfaceName = v.InterfaceName
n.InterfaceId = k
}
}
if consumerConfig.Request_Timeout != "" {
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
}
}
if consumerConfig.Connect_Timeout != "" {
if consumerConfig.ConnectTimeout, err = time.ParseDuration(consumerConfig.Connect_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout)
}
}
logger.Debugf("consumer config{%#v}\n", consumerConfig)
return nil
}
func configCenterRefreshConsumer() error {
//fresh it
var err error
if consumerConfig.ConfigCenterConfig != nil {
consumerConfig.SetFatherConfig(consumerConfig)
if err := consumerConfig.startConfigCenter(context.Background()); err != nil {
return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err))
}
consumerConfig.fresh()
}
if consumerConfig.Request_Timeout != "" {
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
}
}
if consumerConfig.Connect_Timeout != "" {
if consumerConfig.ConnectTimeout, err = time.ParseDuration(consumerConfig.Connect_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout)
}
}
return err
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import "github.com/apache/dubbo-go/common/constant"
type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
}
func (c *MethodConfig) Prefix() string {
if c.InterfaceId != "" {
return constant.DUBBO + "." + c.InterfaceName + "." + c.InterfaceId + "." + c.Name + "."
} else {
return constant.DUBBO + "." + c.InterfaceName + "." + c.Name + "."
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"github.com/apache/dubbo-go/common/constant"
"strings"
)
type ProtocolConfig struct {
Name string `required:"true" yaml:"name" json:"name,omitempty" property:"name"`
Ip string `required:"true" yaml:"ip" json:"ip,omitempty" property:"ip"`
Port string `required:"true" yaml:"port" json:"port,omitempty" property:"port"`
}
func (c *ProtocolConfig) Prefix() string {
return constant.ProtocolConfigPrefix
}
func loadProtocol(protocolsIds string, protocols map[string]*ProtocolConfig) []*ProtocolConfig {
returnProtocols := []*ProtocolConfig{}
for _, v := range strings.Split(protocolsIds, ",") {
for _, prot := range protocols {
if v == prot.Name {
returnProtocols = append(returnProtocols, prot)
}
}
}
return returnProtocols
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
"github.com/apache/dubbo-go/common/constant"
"io/ioutil"
"path"
)
import (
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
/////////////////////////
// providerConfig
/////////////////////////
type ProviderConfig struct {
BaseConfig `yaml:",inline"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
ApplicationConfig *ApplicationConfig `yaml:"application_config" json:"application_config,omitempty" property:"application_config"`
Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
}
func (*ProviderConfig) Prefix() string {
return constant.ProviderConfigPrefix
}
func SetProviderConfig(p ProviderConfig) {
providerConfig = &p
}
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
return ProviderConfig{}
}
return *providerConfig
}
func providerInit(confProFile string) error {
if confProFile == "" {
return perrors.Errorf("application configure(provider) file name is nil")
}
if path.Ext(confProFile) != ".yml" {
return perrors.Errorf("application configure file name{%v} suffix must be .yml", confProFile)
}
confFileStream, err := ioutil.ReadFile(confProFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confProFile, perrors.WithStack(err))
}
providerConfig = &ProviderConfig{}
err = yaml.Unmarshal(confFileStream, providerConfig)
if err != nil {
return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err))
}
//set method interfaceId & interfaceName
for k, v := range providerConfig.Services {
//set id for reference
for _, n := range providerConfig.Services[k].Methods {
n.InterfaceName = v.InterfaceName
n.InterfaceId = k
}
}
logger.Debugf("provider config{%#v}\n", providerConfig)
return nil
}
func configCenterRefreshProvider() error {
//fresh it
if providerConfig.ConfigCenterConfig != nil {
providerConfig.fatherConfig = providerConfig
if err := providerConfig.startConfigCenter(context.Background()); err != nil {
return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err))
}
providerConfig.fresh()
}
return nil
}
......@@ -38,25 +38,25 @@ import (
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
Url string `yaml:"url" json:"url,omitempty"`
Filter string `yaml:"filter" json:"filter,omitempty"`
Protocol string `yaml:"protocol" json:"protocol,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Version string `yaml:"version" json:"version,omitempty"`
Methods []struct {
Name string `yaml:"name" json:"name,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
} `yaml:"methods" json:"methods,omitempty"`
async bool `yaml:"async" json:"async,omitempty"`
invoker protocol.Invoker
urls []*common.URL
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
//Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty" property:"registries"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
invoker protocol.Invoker
urls []*common.URL
}
func (c *ReferenceConfig) Prefix() string {
return constant.ReferenceConfigPrefix + c.InterfaceName + "."
}
type ConfigRegistry string
......@@ -102,7 +102,7 @@ func (refconfig *ReferenceConfig) Refer() {
}
} else {
//2. assemble SubURL from register center's configuration模式
refconfig.urls = loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER)
refconfig.urls = loadRegistries(consumerConfig.Registries, common.CONSUMER)
//set url to regUrls
for _, regUrl := range refconfig.urls {
......
......@@ -37,44 +37,40 @@ var regProtocol protocol.Protocol
func doInit() {
consumerConfig = &ConsumerConfig{
ApplicationConfig: ApplicationConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registries: []RegistryConfig{
{
Id: "shanghai_reg1",
Type: "mock",
Registries: map[string]*RegistryConfig{
"shanghai_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "shanghai_reg2",
Type: "mock",
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "hangzhou_reg1",
Type: "mock",
"hangzhou_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.3:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "hangzhou_reg2",
Type: "mock",
"hangzhou_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.4:2181",
......@@ -82,21 +78,16 @@ func doInit() {
Password: "pwd1",
},
},
References: []ReferenceConfig{
{
References: map[string]*ReferenceConfig{
"MockService": {
InterfaceName: "MockService",
Protocol: "mock",
Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"},
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Group: "huadong_idc",
Version: "1.0.0",
Methods: []struct {
Name string `yaml:"name" json:"name,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
}{
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: 2,
......@@ -130,7 +121,6 @@ func Test_Refer(t *testing.T) {
doInit()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"}
for _, reference := range consumerConfig.References {
reference.Refer()
......@@ -142,7 +132,8 @@ func Test_Refer(t *testing.T) {
func Test_ReferP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000"
m := consumerConfig.References["MockService"]
m.Url = "dubbo://127.0.0.1:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
......@@ -154,7 +145,8 @@ func Test_ReferP2P(t *testing.T) {
func Test_ReferMultiP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000"
m := consumerConfig.References["MockService"]
m.Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
......@@ -168,7 +160,8 @@ func Test_ReferMultiP2PWithReg(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
extension.SetProtocol("registry", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000"
m := consumerConfig.References["MockService"]
m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
......
......@@ -30,8 +30,7 @@ import (
)
type RegistryConfig struct {
Id string `required:"true" yaml:"id" json:"id,omitempty"`
Type string `required:"true" yaml:"type" json:"type,omitempty" property:"type"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
......@@ -40,30 +39,30 @@ type RegistryConfig struct {
Password string `yaml:"password" json:"address,omitempty" property:"password"`
}
func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig, roleType common.RoleType) []*common.URL {
var urls []*common.URL
for _, registry := range registriesIds {
for _, registryConf := range registries {
if string(registry) == registryConf.Id {
func (*RegistryConfig) Prefix() string {
return constant.RegistryConfigPrefix
}
url, err := common.NewURL(
context.TODO(),
constant.REGISTRY_PROTOCOL+"://"+registryConf.Address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
)
func loadRegistries(registries map[string]*RegistryConfig, roleType common.RoleType) []*common.URL {
var urls []*common.URL
for k, registryConf := range registries {
if err != nil {
logger.Errorf("The registry id:%s url is invalid ,and will skip the registry, error: %#v", registryConf.Id, err)
} else {
urls = append(urls, &url)
}
url, err := common.NewURL(
context.TODO(),
constant.REGISTRY_PROTOCOL+"://"+registryConf.Address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
)
}
if err != nil {
logger.Errorf("The registry id:%s url is invalid ,and will skip the registry, error: %#v", k, err)
} else {
urls = append(urls, &url)
}
}
return urls
}
......@@ -71,7 +70,7 @@ func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, regconfig.Group)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, regconfig.Type)
urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr)
return urlMap
......
......@@ -41,22 +41,17 @@ import (
type ServiceConfig struct {
context context.Context
Filter string `yaml:"filter" json:"filter,omitempty"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Version string `yaml:"version" json:"version,omitempty"`
Methods []struct {
Name string `yaml:"name" json:"name,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
Weight int64 `yaml:"weight" json:"weight,omitempty"`
} `yaml:"methods" json:"methods,omitempty"`
Warmup string `yaml:"warmup" json:"warmup,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty" property:"registries"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
......@@ -65,6 +60,10 @@ type ServiceConfig struct {
cacheMutex sync.Mutex
}
func (c *ServiceConfig) Prefix() string {
return constant.ServiceConfigPrefix + c.InterfaceName + "."
}
func NewServiceConfig() *ServiceConfig {
return &ServiceConfig{
unexported: atomic.NewBool(false),
......@@ -87,7 +86,7 @@ func (srvconfig *ServiceConfig) Export() error {
return nil
}
regUrls := loadRegistries(srvconfig.Registries, providerConfig.Registries, common.PROVIDER)
regUrls := loadRegistries(providerConfig.Registries, common.PROVIDER)
urlMap := srvconfig.getUrlMap()
for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) {
......
......@@ -31,44 +31,40 @@ import (
func doinit() {
providerConfig = &ProviderConfig{
ApplicationConfig: ApplicationConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registries: []RegistryConfig{
{
Id: "shanghai_reg1",
Type: "mock",
Registries: map[string]*RegistryConfig{
"shanghai_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "shanghai_reg2",
Type: "mock",
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "hangzhou_reg1",
Type: "mock",
"hangzhou_reg1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.3:2181",
Username: "user1",
Password: "pwd1",
},
{
Id: "hangzhou_reg2",
Type: "mock",
"hangzhou_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "hangzhou_idc",
Address: "127.0.0.4:2181",
......@@ -76,8 +72,8 @@ func doinit() {
Password: "pwd1",
},
},
Services: []ServiceConfig{
{
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "MockService",
Protocol: "mock",
Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"},
......@@ -86,12 +82,7 @@ func doinit() {
Retries: 3,
Group: "huadong_idc",
Version: "1.0.0",
Methods: []struct {
Name string `yaml:"name" json:"name,omitempty"`
Retries int64 `yaml:"retries" json:"retries,omitempty"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
Weight int64 `yaml:"weight" json:"weight,omitempty"`
}{
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: 2,
......@@ -107,12 +98,11 @@ func doinit() {
},
},
},
Protocols: []ProtocolConfig{
{
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
ContextPath: "/xxx",
Protocols: map[string]*ProtocolConfig{
"mock": {
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
},
},
}
......
......@@ -17,14 +17,16 @@ application_config:
environment : "dev"
registries :
- id: "hangzhouzk"
"hangzhouzk":
id: "hangzhouzk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
- id: "shanghaizk"
"shanghaizk":
id: "shanghaizk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2182"
......@@ -32,9 +34,7 @@ registries :
password: ""
references:
- registries :
- "hangzhouzk"
- "shanghaizk"
"UserProvider":
filter: ""
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment