diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000000000000000000000000000000000..8c45cda106605cf19f3bff1e894572837f5eb61c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "go.formatTool": "goimports" +} \ No newline at end of file diff --git a/cluster/router.go b/cluster/router.go index 688ab58bdda75dd5f93864692472ab6a6c0cc87f..2e6097046029ace7b422e2aed7332297967790c9 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -25,11 +25,12 @@ import ( // Extension - Router type RouterFactory interface { - Router(common.URL) Router + GetRouter(common.URL) (Router, error) } type Router interface { - Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker + Route([]protocol.Invoker, common.URL, protocol.Invocation) ([]protocol.Invoker, error) + CompareTo(router Router) int } type RouterChain struct { diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go new file mode 100644 index 0000000000000000000000000000000000000000..ceeca8e010c0cdc11a67f9f968ccbfea6fce5699 --- /dev/null +++ b/cluster/router/condition_router.go @@ -0,0 +1,208 @@ +package router + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "regexp" + "strings" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + perrors "github.com/pkg/errors" +) + +const ( + RoutePattern = `([&!=,]*)\\s*([^&!=,\\s]+)` +) + +type ConditionRouter struct { + Pattern string + Url common.URL + Priority int64 + Force bool + WhenCondition map[string]MatchPair + ThenCondition map[string]MatchPair +} + +func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) ([]protocol.Invoker, error) { + if len(invokers) == 0 { + return invokers, nil + } + if !c.matchWhen(url, invocation) { + return invokers, nil + } + var result []protocol.Invoker + if len(c.ThenCondition) == 0 { + return result, nil + } + for _, invoker := range invokers { + if c.matchThen(invoker.GetUrl(), url) { + result = append(result, invoker) + } + } + if len(result) > 0 { + return result, nil + } else if c.Force { + //todo 日志 + return result, nil + } + return result, nil +} + +func (c ConditionRouter) CompareTo(r cluster.Router) int { + var result int + router, ok := r.(*ConditionRouter) + if r == nil || !ok { + return 1 + } + if c.Priority == router.Priority { + result = strings.Compare(c.Url.String(), router.Url.String()) + } else { + if c.Priority > router.Priority { + result = 1 + } else { + result = -1 + } + } + return result +} + +func newConditionRouter(url common.URL) (*ConditionRouter, error) { + var whenRule string + var thenRule string + //rule := url.GetParam("rule", "") + w, err := parseRule(whenRule) + if err != nil { + return nil, perrors.Errorf("%s", "") + } + + t, err := parseRule(thenRule) + if err != nil { + return nil, perrors.Errorf("%s", "") + } + when := If(whenRule == "" || "true" == whenRule, make(map[string]MatchPair), w).(map[string]MatchPair) + then := If(thenRule == "" || "false" == thenRule, make(map[string]MatchPair), t).(map[string]MatchPair) + + return &ConditionRouter{ + RoutePattern, + url, + url.GetParamInt("Priority", 0), + url.GetParamBool("Force", false), + when, + then, + }, nil +} + +func parseRule(rule string) (map[string]MatchPair, error) { + + condition := make(map[string]MatchPair) + if rule == "" { + return condition, nil + } + var pair MatchPair + values := make(map[string]interface{}) + + reg := regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`) + + startIndex := reg.FindIndex([]byte(rule)) + matches := reg.FindAllSubmatch([]byte(rule), -1) + for _, groups := range matches { + separator := string(groups[1]) + content := string(groups[2]) + + switch separator { + case "": + pair = MatchPair{} + condition[content] = pair + case "&": + if r, ok := condition[content]; ok { + pair = r + } else { + pair = MatchPair{} + condition[content] = pair + } + case "=": + if &pair == nil { + return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex[0], startIndex[0]) + } + values = pair.Matches + values[content] = "" + case "!=": + if &pair == nil { + return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex[0], startIndex[0]) + } + values = pair.Matches + values[content] = "" + case ",": + if len(values) == 0 { + return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex[0], startIndex[0]) + } + values[content] = "" + default: + return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex[0], startIndex[0]) + + } + } + + return condition, nil + //var pair MatchPair + +} + +func (c *ConditionRouter) matchWhen(url common.URL, invocation protocol.Invocation) bool { + + return len(c.WhenCondition) == 0 || len(c.WhenCondition) == 0 || matchCondition(c.WhenCondition, &url, nil, invocation) +} +func (c *ConditionRouter) matchThen(url common.URL, param common.URL) bool { + + return !(len(c.ThenCondition) == 0) && matchCondition(c.ThenCondition, &url, ¶m, nil) +} + +func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) bool { + sample := url.ToMap() + result := false + for key, matchPair := range pairs { + var sampleValue string + + if invocation != nil && ((constant.METHOD_KEY == key) || (constant.METHOD_KEYS == key)) { + sampleValue = invocation.MethodName() + } else { + sampleValue = sample[key] + if &sampleValue == nil { + sampleValue = sample[constant.DEFAULT_KEY_PREFIX+key] + } + } + if &sampleValue != nil { + if !matchPair.isMatch(sampleValue, param) { + return false + } else { + result = true + } + } else { + if !(len(matchPair.Matches) == 0) { + return false + } else { + result = true + } + } + + } + return result +} + +func If(b bool, t, f interface{}) interface{} { + if b { + return t + } + return f +} + +type MatchPair struct { + Matches map[string]interface{} + Mismatches map[string]interface{} +} + +func (pair MatchPair) isMatch(s string, param *common.URL) bool { + + return false +} diff --git a/cluster/router/condition_router_test.go b/cluster/router/condition_router_test.go new file mode 100644 index 0000000000000000000000000000000000000000..83cb25e582f534652d06ac1b3c6b2e4dcd560520 --- /dev/null +++ b/cluster/router/condition_router_test.go @@ -0,0 +1,108 @@ +package router + +import ( + "context" + perrors "errors" + "fmt" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/stretchr/testify/assert" + "net" + "testing" +) + +type MockInvoker struct { + url common.URL + available bool + destroyed bool + + successCount int +} + +func NewMockInvoker(url common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + available: true, + destroyed: false, + successCount: successCount, + } +} + +func (bi *MockInvoker) GetUrl() common.URL { + return bi.url +} + +func (bi *MockInvoker) IsAvailable() bool { + return bi.available +} + +func (bi *MockInvoker) IsDestroyed() bool { + return bi.destroyed +} + +type rest struct { + tried int + success bool +} + +var count int + +func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + count++ + var success bool + var err error = nil + if count >= bi.successCount { + success = true + } else { + err = perrors.New("error") + } + result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} + + return result +} + +func (bi *MockInvoker) Destroy() { + logger.Infof("Destroy invoker: %v", bi.GetUrl().String()) + bi.destroyed = true + bi.available = false +} +func Test_parseRule(t *testing.T) { + parseRule("host = 10.20.153.10 => host = 10.20.153.11") + +} +func LocalIp() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + fmt.Println(err) + } + var ip string = "localhost" + for _, address := range addrs { + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + ip = ipnet.IP.String() + } + } + } + return ip +} +func TestRoute_matchWhen(t *testing.T) { + +} +func TestRoute_matchFilter(t *testing.T) { + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", LocalIp())) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", LocalIp())) + invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)} + option := common.WithParamsValue("rule", "host = "+LocalIp()+" => "+" host = 10.20.3.3") + option1 := common.WithParamsValue("force", "true") + sUrl, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService", option, option1) + + router1, _ := NewConditionRouterFactory().GetRouter(sUrl) + cUrl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") + + routers, _ := router1.Route(invokers, cUrl, &invocation.RPCInvocation{}) + assert.Equal(t, 1, len(routers)) + +} diff --git a/cluster/router/router_factory.go b/cluster/router/router_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..3134804e2787abca31e1e8b68182bfc23264ce5a --- /dev/null +++ b/cluster/router/router_factory.go @@ -0,0 +1,21 @@ +package router + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory("conditionRouterFactory", NewConditionRouterFactory) +} + +type ConditionRouterFactory struct { +} + +func NewConditionRouterFactory() cluster.RouterFactory { + return ConditionRouterFactory{} +} +func (c ConditionRouterFactory) GetRouter(url common.URL) (cluster.Router, error) { + return newConditionRouter(url) +} diff --git a/common/constant/default.go b/common/constant/default.go index f67f2158fd814f00b52c867eb2be5a15d5e94dec..c17ff4a43250288649053e64d47db408a5bd4699 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -32,6 +32,7 @@ const ( const ( DEFAULT_KEY = "default" + DEFAULT_KEY_PREFIX = "default." DEFAULT_SERVICE_FILTERS = "echo" DEFAULT_REFERENCE_FILTERS = "" ECHO = "$echo" diff --git a/common/constant/key.go b/common/constant/key.go index e82a41aa4a686aa55ed48ad859bfc8458ebdc1b0..f81588379e288f0f1d99319bba5308e0f10df413 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -68,8 +68,14 @@ const ( OWNER_KEY = "owner" ENVIRONMENT_KEY = "environment" ) +<<<<<<< HEAD const ( CONFIG_NAMESPACE_KEY = "config.namespace" CONFIG_TIMEOUT_KET = "config.timeout" +======= +const ( + METHOD_KEY = "method" + METHOD_KEYS = "methods" +>>>>>>> Add routing-related features ) diff --git a/common/extension/engine.go b/common/extension/engine.go new file mode 100644 index 0000000000000000000000000000000000000000..8013091b924a46db5da20526c13bcb3c4cf4f1d6 --- /dev/null +++ b/common/extension/engine.go @@ -0,0 +1,22 @@ +package extension + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" +) + +var ( + engines = make(map[string]func(config *common.URL) (cluster.Router, error)) +) + +func SetEngine(name string, v func(config *common.URL) (cluster.Router, error)) { + engines[name] = v +} + +func GetEngine(name string, config *common.URL) (cluster.Router, error) { + if engines[name] == nil { + panic("registry for " + name + " is not existing, make sure you have import the package.") + } + return engines[name](config) + +} diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..f364292b07850d1ef48f008eeb12261bf4b274a5 --- /dev/null +++ b/common/extension/router_factory.go @@ -0,0 +1,21 @@ +package extension + +import ( + "github.com/apache/dubbo-go/cluster" +) + +var ( + routers = make(map[string]func() cluster.RouterFactory) +) + +func SetRouterFactory(name string, fun func() cluster.RouterFactory) { + routers[name] = fun +} + +func GetRouterFactory(name string) cluster.RouterFactory { + if routers[name] == nil { + panic("router_factory for " + name + " is not existing, make sure you have import the package.") + } + return routers[name]() + +} diff --git a/common/url.go b/common/url.go index 9249dd055acedc67a0494e97571468bd3ed67cb0..fd1c71def17fc0c6da7794227e3a8ac43d445c4b 100644 --- a/common/url.go +++ b/common/url.go @@ -288,6 +288,17 @@ func (c URL) GetParam(s string, d string) string { return r } +// GetParamBool +func (c URL) GetParamBool(s string, d bool) bool { + + var r bool + var err error + if r, err = strconv.ParseBool(c.Params.Get(s)); err != nil { + return d + } + return r +} + func (c URL) GetParamInt(s string, d int64) int64 { var r int var err error @@ -323,6 +334,31 @@ func (c URL) GetMethodParam(method string, key string, d string) string { return r } +// ToMap transfer URL to Map +func (c URL) ToMap() map[string]string { + + paramsMap := make(map[string]string) + if c.Protocol != "" { + paramsMap["protocol"] = c.Protocol + } + if c.Username != "" { + paramsMap["username"] = c.Username + } + if c.Password != "" { + paramsMap["password"] = c.Password + } + if c.Ip != "" { + paramsMap["host"] = c.Ip + } + if c.Protocol != "" { + paramsMap["protocol"] = c.Protocol + } + if c.Path != "" { + paramsMap["path"] = c.Path + } + return paramsMap +} + // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. diff --git a/examples/dubbo/go-server/app/app b/examples/dubbo/go-server/app/app new file mode 100755 index 0000000000000000000000000000000000000000..43e170c50ba2e8ac2dbe6e5efa0e2bdff1b3194a Binary files /dev/null and b/examples/dubbo/go-server/app/app differ