Skip to content
Snippets Groups Projects
Commit fc49df7d authored by vito.he's avatar vito.he
Browse files

Ftr:new feature about p2p to resolve issue #56

parent 51ea9600
No related branches found
No related tags found
No related merge requests found
......@@ -292,3 +292,52 @@ func (c URL) GetMethodParam(method string, key string, d string) string {
}
return r
}
// configuration > reference config >service config
// in this function we should merge the reference local url config into the service url from registry.
//TODO configuration merge, in the future , the configuration center's config should merge too.
func MergeUrl(serviceUrl URL, referenceUrl *URL) URL {
mergedUrl := serviceUrl
var methodConfigMergeFcn = []func(method string){}
//iterator the referenceUrl if serviceUrl not have the key ,merge in
for k, v := range referenceUrl.Params {
if _, ok := mergedUrl.Params[k]; !ok {
mergedUrl.Params.Set(k, v[0])
}
}
//loadBalance strategy config
if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v)
}
})
//cluster strategy config
if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(constant.CLUSTER_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v)
}
})
//remote timestamp
if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" {
mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v)
mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY))
}
//finally execute methodConfigMergeFcn
for _, method := range referenceUrl.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
}
}
return mergedUrl
}
......@@ -16,6 +16,7 @@ package common
import (
"context"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"net/url"
"testing"
)
......@@ -133,3 +134,19 @@ func TestURL_GetMethodParam(t *testing.T) {
v = u.GetMethodParam("GetValue", "timeout", "1s")
assert.Equal(t, "1s", v)
}
func TestMergeUrl(t *testing.T) {
referenceUrlParams := url.Values{}
referenceUrlParams.Set(constant.CLUSTER_KEY, "random")
referenceUrlParams.Set("test3", "1")
serviceUrlParams := url.Values{}
serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams))
serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams))
mergedUrl := MergeUrl(serviceUrl, &referenceUrl)
assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, ""))
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
}
package utils
import "regexp"
func RegSplit(text string, delimeter string) []string {
reg := regexp.MustCompile(delimeter)
indexes := reg.FindAllStringIndex(text, -1)
laststart := 0
result := make([]string, len(indexes)+1)
for i, element := range indexes {
result[i] = text[laststart:element[0]]
laststart = element[1]
}
result[len(indexes)] = text[laststart:len(text)]
return result
}
package utils
import (
"github.com/stretchr/testify/assert"
"testing"
)
func Test_RegSplit(t *testing.T) {
strings := RegSplit("dubbo://123.1.2.1;jsonrpc://127.0.0.1;registry://3.2.1.3?registry=zookeeper", "\\s*[;]+\\s*")
assert.Len(t, strings, 3)
assert.Equal(t, "dubbo://123.1.2.1", strings[0])
assert.Equal(t, "jsonrpc://127.0.0.1", strings[1])
assert.Equal(t, "registry://3.2.1.3?registry=zookeeper", strings[2])
}
......@@ -16,6 +16,7 @@ package config
import (
"context"
"fmt"
"net/url"
"strconv"
"time"
......@@ -27,6 +28,7 @@ import (
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/common/proxy"
"github.com/dubbo/go-for-apache-dubbo/common/utils"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
......@@ -35,6 +37,7 @@ type ReferenceConfig struct {
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
Url string `yaml:"url" json:"url,omitempty"`
Filter string `yaml:"filter" json:"filter,omitempty"`
Protocol string `yaml:"protocol" json:"protocol,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
......@@ -50,6 +53,7 @@ type ReferenceConfig struct {
} `yaml:"methods" json:"methods,omitempty"`
async bool `yaml:"async" json:"async,omitempty"`
invoker protocol.Invoker
urls []*common.URL
}
type ConfigRegistry string
......@@ -68,27 +72,57 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
return nil
}
func (refconfig *ReferenceConfig) Refer() {
//1. user specified SubURL, could be peer-to-peer address, or register center's address.
//2. assemble SubURL from register center's configuration模式
regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER)
url := common.NewURLWithOptions(refconfig.InterfaceName, common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
//set url to regUrls
for _, regUrl := range regUrls {
regUrl.SubURL = url
}
//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
urlStrings := utils.RegSplit(refconfig.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceUrl, err := common.NewURL(context.Background(), urlStr)
if err != nil {
panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
}
if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL {
serviceUrl.SubURL = url
refconfig.urls = append(refconfig.urls, &serviceUrl)
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = refconfig.InterfaceName + "/"
}
// merge url need to do
newUrl := common.MergeUrl(serviceUrl, url)
refconfig.urls = append(refconfig.urls, &newUrl)
}
}
} else {
//2. assemble SubURL from register center's configuration模式
refconfig.urls = loadRegistries(refconfig.Registries, consumerConfig.Registries, common.CONSUMER)
if len(regUrls) == 1 {
refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0])
//set url to regUrls
for _, regUrl := range refconfig.urls {
regUrl.SubURL = url
}
}
if len(refconfig.urls) == 1 {
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0])
} else {
invokers := []protocol.Invoker{}
for _, regUrl := range regUrls {
invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl))
var regUrl *common.URL = nil
for _, u := range refconfig.urls {
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u))
if u.Protocol == constant.REGISTRY_PROTOCOL {
regUrl = u
}
}
if regUrl != nil {
cluster := extension.GetCluster("registryAware")
refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
} else {
cluster := extension.GetCluster(refconfig.Cluster)
refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
cluster := extension.GetCluster("registryAware")
refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
//create proxy
......
......@@ -136,7 +136,44 @@ func Test_Refer(t *testing.T) {
}
consumerConfig = nil
}
func Test_ReferP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
consumerConfig = nil
}
func Test_ReferMultiP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
consumerConfig = nil
}
func Test_ReferMultiP2PWithReg(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
extension.SetProtocol("registry", GetProtocol)
consumerConfig.References[0].Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000"
for _, reference := range consumerConfig.References {
reference.Refer()
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
consumerConfig = nil
}
func Test_Implement(t *testing.T) {
doInit()
extension.SetProtocol("registry", GetProtocol)
......
......@@ -35,6 +35,7 @@ references:
- "shanghaizk"
protocol : "dubbo"
# url: "dubbo://127.0.0.1:20000"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
......
......@@ -185,7 +185,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) {
referenceUrl := dir.GetUrl().SubURL
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
url = mergeUrl(url, referenceUrl)
url = common.MergeUrl(url, referenceUrl)
if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers key is %s!", url.Key())
......@@ -225,52 +225,3 @@ func (dir *registryDirectory) Destroy() {
dir.cacheInvokers = []protocol.Invoker{}
})
}
// configuration > reference config >service config
// in this function we should merge the reference local url config into the service url from registry.
//TODO configuration merge, in the future , the configuration center's config should merge too.
func mergeUrl(serviceUrl common.URL, referenceUrl *common.URL) common.URL {
mergedUrl := serviceUrl
var methodConfigMergeFcn = []func(method string){}
//iterator the referenceUrl if serviceUrl not have the key ,merge in
for k, v := range referenceUrl.Params {
if _, ok := mergedUrl.Params[k]; !ok {
mergedUrl.Params.Set(k, v[0])
}
}
//loadBalance strategy config
if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v)
}
})
//cluster strategy config
if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(constant.CLUSTER_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v)
}
})
//remote timestamp
if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" {
mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v)
mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY))
}
//finally execute methodConfigMergeFcn
for _, method := range referenceUrl.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
}
}
return mergedUrl
}
......@@ -128,18 +128,3 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
func TestMergeUrl(t *testing.T) {
referenceUrlParams := url.Values{}
referenceUrlParams.Set(constant.CLUSTER_KEY, "random")
referenceUrlParams.Set("test3", "1")
serviceUrlParams := url.Values{}
serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
referenceUrl, _ := common.NewURL(context.TODO(), "mock1://127.0.0.1:1111", common.WithParams(referenceUrlParams))
serviceUrl, _ := common.NewURL(context.TODO(), "mock2://127.0.0.1:20000", common.WithParams(serviceUrlParams))
mergedUrl := mergeUrl(serviceUrl, &referenceUrl)
assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, ""))
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
}
......@@ -302,7 +302,7 @@ func (z *zookeeperClient) Create(basePath string) error {
z.Unlock()
if err != nil {
if err == zk.ErrNodeExists {
logger.Errorf("zk.create(\"%s\") exists\n", tmpPath)
logger.Infof("zk.create(\"%s\") exists\n", tmpPath)
} else {
logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err))
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment