Skip to content
Snippets Groups Projects
Commit 26ca61e0 authored by aliiohs's avatar aliiohs
Browse files

use router in dubbo-go

parent dabfdedb
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,8 @@ package directory
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"go.uber.org/atomic"
"sync"
)
......@@ -32,6 +34,7 @@ type BaseDirectory struct {
destroyed *atomic.Bool
routers []cluster.Router
mutex sync.Mutex
once sync.Once
}
func NewBaseDirectory(url *common.URL) BaseDirectory {
......@@ -47,6 +50,21 @@ func (dir *BaseDirectory) GetUrl() common.URL {
return *dir.url
}
func (dir *BaseDirectory) SetRouters(routers []cluster.Router) {
dir.mutex.Lock()
defer dir.mutex.Unlock()
dir.once.Do(func() {
routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "")
if len(routerKey) > 0 {
factory := extension.GetRouterFactory(dir.GetUrl().Protocol)
url := dir.GetUrl()
router, err := factory.Router(&url)
if err == nil {
routers = append(routers, router)
}
}
})
dir.routers = routers
}
func (dir *BaseDirectory) Routers() []cluster.Router {
......
......@@ -41,7 +41,9 @@ func Test_StaticDirList(t *testing.T) {
}
staticDir := NewStaticDirectory(invokers)
assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10)
list, _ := staticDir.List(&invocation.RPCInvocation{})
assert.Len(t, list, 10)
}
func Test_StaticDirDestroy(t *testing.T) {
......
......@@ -63,18 +63,24 @@ const (
)
const (
APPLICATION_KEY = "application"
ORGANIZATION_KEY = "organization"
NAME_KEY = "name"
MODULE_KEY = "module"
APP_VERSION_KEY = "app.version"
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
METHOD_KEY = "method"
METHOD_KEYS = "methods"
RULE_KEY = "rule"
RUNTIME_KEY = "runtime"
BACKUP_KEY = "backup"
APPLICATION_KEY = "application"
ORGANIZATION_KEY = "organization"
NAME_KEY = "name"
MODULE_KEY = "module"
APP_VERSION_KEY = "app.version"
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
METHOD_KEY = "method"
METHOD_KEYS = "methods"
RULE_KEY = "rule"
RUNTIME_KEY = "runtime"
BACKUP_KEY = "backup"
ROUTERS_CATEGORY = "routers"
ROUTE_PROTOCOL = "route"
CATEGORY_KEY = "category"
PROVIDERS_CATEGORY = "providers"
EMPTY_PROTOCOL = "empty"
ROUTER_KEY = "router"
)
const (
......
......@@ -243,8 +243,8 @@ func (c URL) Key() string {
}
//todo
func (c URL) GetBackupUrls() []URL {
var urls []URL
func (c *URL) GetBackupUrls() []*URL {
var urls []*URL
var host string
urls = append(urls, c)
backups := strings.Split(c.GetParam(constant.BACKUP_KEY, ""), "")
......@@ -255,10 +255,18 @@ func (c URL) GetBackupUrls() []URL {
host = address[:index]
port = address[index+1:]
} else {
host = string(append([]byte(host), []byte(port)...))
host = address
}
//todo
newURL, _ := NewURL(c.ctx, address)
newURL := NewURLWithOptions(
WithProtocol(c.Protocol),
WithPath(c.Path),
WithIp(host),
WithUsername(c.Username),
WithPassword(c.Password),
WithPort(port),
WithParams(c.Params))
urls = append(urls, newURL)
}
return urls
......
......@@ -20,6 +20,7 @@ package directory
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
)
......@@ -30,6 +31,7 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -101,22 +103,23 @@ func (dir *registryDirectory) Subscribe(url common.URL) {
logger.Warnf("event listener game over.")
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
if serviceEvent, err := listener.Next(); err != nil {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
go dir.update(serviceEvent)
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
go dir.update(serviceEvent)
}
}
......@@ -132,9 +135,22 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
logger.Debugf("update service name: %s!", res.Service)
//todo
_ = dir.GetUrl()
url := dir.GetUrl()
var urls []*common.URL
for _, v := range url.GetBackupUrls() {
p := v.Protocol
if len(dir.Routers()) > 0 {
category := v.GetParam(constant.CATEGORY_KEY, constant.PROVIDERS_CATEGORY)
if strings.EqualFold(category, constant.ROUTERS_CATEGORY) || strings.EqualFold(constant.ROUTE_PROTOCOL, p) {
urls = append(urls, v)
}
}
if len(urls) > 0 {
routers := toRouters(urls)
if len(routers) > 0 {
dir.SetRouters(routers)
}
}
dir.refreshInvokers(res)
}
......@@ -222,8 +238,8 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) ([]protocol.I
if dir.Destroyed() {
return nil, fmt.Errorf("directory already destroyed .url: %s", dir.GetUrl().String())
}
if dir.forbidden.Load() {
//todo error
localIP, _ := utils.GetLocalIP()
return nil, fmt.Errorf("no provider available from registry %s for service %s on consumer %s use dubbo version %s, please check status of providers(disabled, not registered or in blacklist)", dir.GetUrl().Location, dir.ConsumerUrl.ServiceKey(), localIP, version.Version)
}
......@@ -238,6 +254,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) ([]protocol.I
}
}
}
return invokers, nil
}
......@@ -264,3 +281,34 @@ func (dir *registryDirectory) Destroy() {
dir.cacheInvokers = []protocol.Invoker{}
})
}
func toRouters(urls []*common.URL) []cluster.Router {
var routers = []cluster.Router{}
if len(urls) == 0 {
return routers
}
for _, url := range urls {
if strings.EqualFold(constant.EMPTY_PROTOCOL, url.Protocol) {
continue
}
routerType := url.GetParam(constant.ROUTER_KEY, "")
if len(routerType) > 0 {
url = common.NewURLWithOptions(
common.WithProtocol(routerType),
common.WithPath(url.Path),
common.WithIp(url.Ip),
common.WithUsername(url.Username),
common.WithPassword(url.Password),
common.WithPort(url.Port),
common.WithParams(url.Params))
routerFactory := extension.GetRouterFactory(url.Protocol)
router, err := routerFactory.Router(url)
if err != nil {
logger.Errorf("convert router url to router error, url:%s", url.String())
continue
}
routers = append(routers, router)
}
}
return routers
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment