diff --git a/common/url.go b/common/url.go index 9b0c6352ae781a4f6eacb36e3f727e2b7ac55232..9ea3e0b09179a576bd7aac3f4601e6f61c437f47 100644 --- a/common/url.go +++ b/common/url.go @@ -292,3 +292,52 @@ func (c URL) GetMethodParam(method string, key string, d string) string { } return r } + +// configuration > reference config >service config +// in this function we should merge the reference local url config into the service url from registry. +//TODO configuration merge, in the future , the configuration center's config should merge too. +func MergeUrl(serviceUrl URL, referenceUrl *URL) URL { + mergedUrl := serviceUrl + var methodConfigMergeFcn = []func(method string){} + //iterator the referenceUrl if serviceUrl not have the key ,merge in + + for k, v := range referenceUrl.Params { + if _, ok := mergedUrl.Params[k]; !ok { + mergedUrl.Params.Set(k, v[0]) + } + } + //loadBalance strategy config + if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" { + mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v) + } + methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { + if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" { + mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v) + } + }) + + //cluster strategy config + if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" { + mergedUrl.Params.Set(constant.CLUSTER_KEY, v) + } + methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { + if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" { + mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v) + } + }) + + //remote timestamp + if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" { + mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v) + mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY)) + } + + //finally execute methodConfigMergeFcn + for _, method := range referenceUrl.Methods { + for _, fcn := range methodConfigMergeFcn { + fcn("methods." + method) + } + } + + return mergedUrl +} diff --git a/common/url_test.go b/common/url_test.go index cfac4703ef8c2c6443efbc0562c98add8acd5466..ff5463275ebc9e7b79f979d7632c4cac225cdd39 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -16,6 +16,7 @@ package common import ( "context" + "github.com/dubbo/go-for-apache-dubbo/common/constant" "net/url" "testing" ) @@ -133,3 +134,19 @@ func TestURL_GetMethodParam(t *testing.T) { v = u.GetMethodParam("GetValue", "timeout", "1s") assert.Equal(t, "1s", v) } + +func TestMergeUrl(t *testing.T) { + referenceUrlParams := url.Values{} + referenceUrlParams.Set(constant.CLUSTER_KEY, "random") + referenceUrlParams.Set("test3", "1") + serviceUrlParams := url.Values{} + serviceUrlParams.Set("test2", "1") + serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") + referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams)) + serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) + + mergedUrl := MergeUrl(serviceUrl, &referenceUrl) + assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, "")) + assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) + assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) +} diff --git a/common/utils/strings.go b/common/utils/strings.go new file mode 100644 index 0000000000000000000000000000000000000000..1844479fb66e98062e1e3a18dabe6eba14f41d09 --- /dev/null +++ b/common/utils/strings.go @@ -0,0 +1,16 @@ +package utils + +import "regexp" + +func RegSplit(text string, delimeter string) []string { + reg := regexp.MustCompile(delimeter) + indexes := reg.FindAllStringIndex(text, -1) + laststart := 0 + result := make([]string, len(indexes)+1) + for i, element := range indexes { + result[i] = text[laststart:element[0]] + laststart = element[1] + } + result[len(indexes)] = text[laststart:len(text)] + return result +} diff --git a/common/utils/strings_test.go b/common/utils/strings_test.go new file mode 100644 index 0000000000000000000000000000000000000000..37db5700446be999a147a259a25e7c16b4805cbb --- /dev/null +++ b/common/utils/strings_test.go @@ -0,0 +1,14 @@ +package utils + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_RegSplit(t *testing.T) { + strings := RegSplit("dubbo://123.1.2.1;jsonrpc://127.0.0.1;registry://3.2.1.3?registry=zookeeper", "\\s*[;]+\\s*") + assert.Len(t, strings, 3) + assert.Equal(t, "dubbo://123.1.2.1", strings[0]) + assert.Equal(t, "jsonrpc://127.0.0.1", strings[1]) + assert.Equal(t, "registry://3.2.1.3?registry=zookeeper", strings[2]) +} diff --git a/config/reference_config.go b/config/reference_config.go index af57cc5114a744c417848500bfd25784880f33c7..2124cd44e4caf7b585d084841235e6044899dd05 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -16,6 +16,7 @@ package config import ( "context" + "fmt" "net/url" "strconv" "time" @@ -27,6 +28,7 @@ import ( "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/common/extension" "github.com/dubbo/go-for-apache-dubbo/common/proxy" + "github.com/dubbo/go-for-apache-dubbo/common/utils" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -35,6 +37,7 @@ type ReferenceConfig struct { pxy *proxy.Proxy InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"` Check *bool `yaml:"check" json:"check,omitempty"` + Url string `yaml:"url" json:"url,omitempty"` Filter string `yaml:"filter" json:"filter,omitempty"` Protocol string `yaml:"protocol" json:"protocol,omitempty"` Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` @@ -50,6 +53,7 @@ type ReferenceConfig struct { } `yaml:"methods" json:"methods,omitempty"` async bool `yaml:"async" json:"async,omitempty"` invoker protocol.Invoker + urls []*common.URL } type ConfigRegistry string @@ -68,27 +72,57 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro return nil } func (refconfig *ReferenceConfig) Refer() { - //1. user specified SubURL, could be peer-to-peer address, or register center's address. - - //2. assemble SubURL from register center's configuration妯″紡 - regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER) url := common.NewURLWithOptions(refconfig.InterfaceName, common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) - //set url to regUrls - for _, regUrl := range regUrls { - regUrl.SubURL = url - } + //1. user specified URL, could be peer-to-peer address, or register center's address. + if refconfig.Url != "" { + urlStrings := utils.RegSplit(refconfig.Url, "\\s*[;]+\\s*") + for _, urlStr := range urlStrings { + serviceUrl, err := common.NewURL(context.Background(), urlStr) + if err != nil { + panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error())) + } + if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL { + serviceUrl.SubURL = url + refconfig.urls = append(refconfig.urls, &serviceUrl) + } else { + if serviceUrl.Path == "" { + serviceUrl.Path = refconfig.InterfaceName + "/" + } + // merge url need to do + newUrl := common.MergeUrl(serviceUrl, url) + refconfig.urls = append(refconfig.urls, &newUrl) + } + + } + } else { + //2. assemble SubURL from register center's configuration妯″紡 + refconfig.urls = loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER) - if len(regUrls) == 1 { - refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0]) + //set url to regUrls + for _, regUrl := range refconfig.urls { + regUrl.SubURL = url + } + } + if len(refconfig.urls) == 1 { + refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0]) } else { invokers := []protocol.Invoker{} - for _, regUrl := range regUrls { - invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl)) + var regUrl *common.URL = nil + for _, u := range refconfig.urls { + invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u)) + if u.Protocol == constant.REGISTRY_PROTOCOL { + regUrl = u + } + } + if regUrl != nil { + cluster := extension.GetCluster("registryAware") + refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + } else { + cluster := extension.GetCluster(refconfig.Cluster) + refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } - cluster := extension.GetCluster("registryAware") - refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 8d1c718f830923a63b79439cd0ba865d23e9ddca..a43c1e61c7d3b024c1e09db738173d9d3371e0ad 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -136,7 +136,44 @@ func Test_Refer(t *testing.T) { } consumerConfig = nil } +func Test_ReferP2P(t *testing.T) { + doInit() + extension.SetProtocol("dubbo", GetProtocol) + consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000" + + for _, reference := range consumerConfig.References { + reference.Refer() + assert.NotNil(t, reference.invoker) + assert.NotNil(t, reference.pxy) + } + consumerConfig = nil +} +func Test_ReferMultiP2P(t *testing.T) { + doInit() + extension.SetProtocol("dubbo", GetProtocol) + consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000" + + for _, reference := range consumerConfig.References { + reference.Refer() + assert.NotNil(t, reference.invoker) + assert.NotNil(t, reference.pxy) + } + consumerConfig = nil +} + +func Test_ReferMultiP2PWithReg(t *testing.T) { + doInit() + extension.SetProtocol("dubbo", GetProtocol) + extension.SetProtocol("registry", GetProtocol) + consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" + for _, reference := range consumerConfig.References { + reference.Refer() + assert.NotNil(t, reference.invoker) + assert.NotNil(t, reference.pxy) + } + consumerConfig = nil +} func Test_Implement(t *testing.T) { doInit() extension.SetProtocol("registry", GetProtocol) diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index 93ce354c9c65682d393619a6afa1ae449b96660d..4e3acb0043d82cf74099ecbd2c33437478b6c8da 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -35,6 +35,7 @@ references: - "shanghaizk" protocol : "dubbo" +# url: "dubbo://127.0.0.1:20000" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/registry/directory/directory.go b/registry/directory/directory.go index c86bc2697edee6686a2e6af33c3d0554d5439212..635e2283b4b003e22fe3e4a4df743d7d57b06dd1 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -185,7 +185,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) { referenceUrl := dir.GetUrl().SubURL //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { - url = mergeUrl(url, referenceUrl) + url = common.MergeUrl(url, referenceUrl) if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok { logger.Debugf("service will be added in cache invokers: invokers key is %s!", url.Key()) @@ -225,52 +225,3 @@ func (dir *registryDirectory) Destroy() { dir.cacheInvokers = []protocol.Invoker{} }) } - -// configuration > reference config >service config -// in this function we should merge the reference local url config into the service url from registry. -//TODO configuration merge, in the future , the configuration center's config should merge too. -func mergeUrl(serviceUrl common.URL, referenceUrl *common.URL) common.URL { - mergedUrl := serviceUrl - var methodConfigMergeFcn = []func(method string){} - //iterator the referenceUrl if serviceUrl not have the key ,merge in - - for k, v := range referenceUrl.Params { - if _, ok := mergedUrl.Params[k]; !ok { - mergedUrl.Params.Set(k, v[0]) - } - } - //loadBalance strategy config - if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" { - mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v) - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" { - mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v) - } - }) - - //cluster strategy config - if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" { - mergedUrl.Params.Set(constant.CLUSTER_KEY, v) - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" { - mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v) - } - }) - - //remote timestamp - if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" { - mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v) - mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY)) - } - - //finally execute methodConfigMergeFcn - for _, method := range referenceUrl.Methods { - for _, fcn := range methodConfigMergeFcn { - fcn("methods." + method) - } - } - - return mergedUrl -} diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 685e96a685bb3af2c6577d1ab24f1a87bef34a28..169bbfc0acef4cdd44f8e6c3fc3c36c3586379e3 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -128,18 +128,3 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { return registryDirectory, mockRegistry.(*registry.MockRegistry) } -func TestMergeUrl(t *testing.T) { - referenceUrlParams := url.Values{} - referenceUrlParams.Set(constant.CLUSTER_KEY, "random") - referenceUrlParams.Set("test3", "1") - serviceUrlParams := url.Values{} - serviceUrlParams.Set("test2", "1") - serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") - referenceUrl, _ := common.NewURL(context.TODO(), "mock1://127.0.0.1:1111", common.WithParams(referenceUrlParams)) - serviceUrl, _ := common.NewURL(context.TODO(), "mock2://127.0.0.1:20000", common.WithParams(serviceUrlParams)) - - mergedUrl := mergeUrl(serviceUrl, &referenceUrl) - assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, "")) - assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) - assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) -} diff --git a/registry/zookeeper/zk_client.go b/registry/zookeeper/zk_client.go index 3721cc7b3223178f17507f761e9a8add69241188..d11b5f7e5d07573fec2e41fc7eb9d52c51c77d77 100644 --- a/registry/zookeeper/zk_client.go +++ b/registry/zookeeper/zk_client.go @@ -302,7 +302,7 @@ func (z *zookeeperClient) Create(basePath string) error { z.Unlock() if err != nil { if err == zk.ErrNodeExists { - logger.Errorf("zk.create(\"%s\") exists\n", tmpPath) + logger.Infof("zk.create(\"%s\") exists\n", tmpPath) } else { logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err)) return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)