Skip to content
Snippets Groups Projects
Commit ed1488ab authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge branch 'aliiohs-UseRouter' into feature/addRouter

# Conflicts:
#	cluster/directory/base_directory.go
#	common/constant/env.go
#	common/constant/key.go
#	config_center/dynamic_configuration.go
#	go.mod
#	registry/directory/directory.go
parents 14281690 c2ed5907
No related branches found
No related tags found
No related merge requests found
Showing
with 533 additions and 54 deletions
......@@ -83,7 +83,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
if err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
invokers := invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
......
......@@ -18,27 +18,38 @@
package directory
import (
"sync"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/dubbogo/gost/container"
"go.uber.org/atomic"
"sync"
)
import (
"github.com/apache/dubbo-go/common"
)
var RouterUrlSet = container.NewSet()
type BaseDirectory struct {
url *common.URL
destroyed *atomic.Bool
mutex sync.Mutex
url *common.URL
ConsumerUrl *common.URL
destroyed *atomic.Bool
routers []cluster.Router
mutex sync.Mutex
once sync.Once
}
func NewBaseDirectory(url *common.URL) BaseDirectory {
return BaseDirectory{
url: url,
destroyed: atomic.NewBool(false),
url: url,
ConsumerUrl: url,
destroyed: atomic.NewBool(false),
}
}
func (dir *BaseDirectory) Destroyed() bool {
return dir.destroyed.Load()
}
func (dir *BaseDirectory) GetUrl() common.URL {
return *dir.url
}
......@@ -46,6 +57,43 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
return dir.url
}
func (dir *BaseDirectory) SetRouters(routers []cluster.Router) {
dir.mutex.Lock()
defer dir.mutex.Unlock()
routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "")
if len(routerKey) > 0 {
factory := extension.GetRouterFactory(dir.GetUrl().Protocol)
url := dir.GetUrl()
router, err := factory.Router(&url)
if err == nil {
routers = append(routers, router)
}
}
dir.routers = routers
}
func (dir *BaseDirectory) Routers() []cluster.Router {
var routers []cluster.Router
dir.once.Do(func() {
rs := RouterUrlSet.Values()
for _, r := range rs {
factory := extension.GetRouterFactory(r.(*common.URL).GetParam("router", "condition"))
router, err := factory.Router(r.(*common.URL))
if err == nil {
dir.routers = append(dir.routers, router)
}
routers = append(routers, router)
}
})
if len(routers) > 0 {
return append(dir.routers, routers...)
}
return dir.routers
}
func (dir *BaseDirectory) Destroy(doDestroy func()) {
if dir.destroyed.CAS(false, true) {
dir.mutex.Lock()
......
......@@ -18,8 +18,11 @@
package directory
import (
"fmt"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"reflect"
)
type staticDirectory struct {
......@@ -53,8 +56,18 @@ func (dir *staticDirectory) IsAvailable() bool {
}
func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:Here should add router
return dir.invokers
invokers := dir.invokers
localRouters := dir.routers
fmt.Println("========", len(localRouters))
if len(localRouters) > 0 {
for _, router := range localRouters {
if reflect.ValueOf(router.Url()).IsNil() || router.Url().GetParamBool(constant.RUNTIME_KEY, false) {
invokers = router.Route(invokers, *dir.ConsumerUrl, invocation)
}
}
}
return invokers
}
func (dir *staticDirectory) Destroy() {
......
......@@ -41,7 +41,9 @@ func Test_StaticDirList(t *testing.T) {
}
staticDir := NewStaticDirectory(invokers)
assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10)
list := staticDir.List(&invocation.RPCInvocation{})
assert.Len(t, list, 10)
}
func Test_StaticDirDestroy(t *testing.T) {
......
......@@ -30,12 +30,6 @@ type RouterFactory interface {
type Router interface {
Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker
}
type RouterChain struct {
routers []Router
}
func NewRouterChain(url common.URL) {
Priority() int64
Url() common.URL
}
......@@ -45,8 +45,8 @@ const (
//ConditionRouter condition router struct
type ConditionRouter struct {
Pattern string
Url *common.URL
Priority int64
url *common.URL
priority int64
Force bool
WhenCondition map[string]MatchPair
ThenCondition map[string]MatchPair
......@@ -104,6 +104,14 @@ func newConditionRouter(url *common.URL) (*ConditionRouter, error) {
}, nil
}
func (c ConditionRouter) Priority() int64 {
return c.priority
}
func (c ConditionRouter) Url() common.URL {
return *c.url
}
//Router determine the target server list.
func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 {
......@@ -116,7 +124,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv
for _, invo := range invokers {
urls = append(urls, reflect.TypeOf(invo).String())
}
logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err)
logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.url.String(), strings.Join(urls, ","), err)
return invokers
}
if !isMatchWhen {
......@@ -134,7 +142,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv
for _, invo := range invokers {
urls = append(urls, reflect.TypeOf(invo).String())
}
logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err)
logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.url.String(), strings.Join(urls, ","), err)
return invokers
}
if isMatchThen {
......
package cluster
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"sort"
)
type RouterChain struct {
routers []Router
builtinRouters []Router
Invokers []protocol.Invoker
}
func NewRouterChain(url common.URL) *RouterChain {
//var builtinRouters []Router
//factories := extension.GetRouterFactories()
//for _, factory := range factories {
// router, _ := factory.Router(&url)
// builtinRouters = append(builtinRouters, router)
//}
//var routers []Router
//copy(routers, builtinRouters)
//sort.Slice(routers, func(i, j int) bool {
// return routers[i].Priority() < routers[j].Priority()
//})
//return &RouterChain{
// builtinRouters: routers,
// routers: routers,
//}
return nil
}
func (r RouterChain) AddRouters(routers []Router) {
r.routers = append(r.routers, routers...)
sort.Slice(r.routers, func(i, j int) bool {
return routers[i].Priority() < routers[j].Priority()
})
}
func (r RouterChain) SetInvokers(invokers []protocol.Invoker) {
r.Invokers = invokers
/*for _, _ := range r.routers {
//router.Notify(r.Invokers)
}*/
}
......@@ -21,4 +21,5 @@ const (
CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH"
CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH"
APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE"
CONF_ROUTER_FILE_PATH = "CONF_ROUTER_FILE_PATH"
)
......@@ -40,6 +40,7 @@ const (
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
PATH_SEPARATOR = "/"
)
const (
......@@ -74,6 +75,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
ANYHOST_VALUE = "0.0.0.0"
)
const (
......@@ -89,16 +91,24 @@ const (
)
const (
APPLICATION_KEY = "application"
ORGANIZATION_KEY = "organization"
NAME_KEY = "name"
MODULE_KEY = "module"
APP_VERSION_KEY = "app.version"
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
METHOD_KEY = "method"
METHOD_KEYS = "methods"
RULE_KEY = "rule"
APPLICATION_KEY = "application"
ORGANIZATION_KEY = "organization"
NAME_KEY = "name"
MODULE_KEY = "module"
APP_VERSION_KEY = "app.version"
OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment"
METHOD_KEY = "method"
METHOD_KEYS = "methods"
RULE_KEY = "rule"
RUNTIME_KEY = "runtime"
BACKUP_KEY = "backup"
ROUTERS_CATEGORY = "routers"
ROUTE_PROTOCOL = "route"
CATEGORY_KEY = "category"
PROVIDERS_CATEGORY = "providers"
EMPTY_PROTOCOL = "empty"
ROUTER_KEY = "router"
)
const (
......@@ -120,6 +130,7 @@ const (
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
ShutdownConfigPrefix = "dubbo.shutdown."
RouterConfigPrefix = "dubbo.router."
)
const (
......
......@@ -36,3 +36,12 @@ func GetRouterFactory(name string) cluster.RouterFactory {
return routers[name]()
}
func GetRouterFactorys() []cluster.RouterFactory {
var factorys []cluster.RouterFactory
for _, value := range routers {
factorys = append(factorys, value())
}
return factorys
}
......@@ -54,7 +54,7 @@ const (
var (
DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"}
DubboRole = [...]string{"consumer", "", "", "provider"}
DubboRole = [...]string{"consumer", "", "routers", "provider"}
)
type RoleType int
......@@ -218,7 +218,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
if strings.Contains(s.Location, ":") {
s.Ip, s.Port, err = net.SplitHostPort(s.Location)
if err != nil {
return s, perrors.Errorf("net.SplitHostPort(Url.Host{%s}), error{%v}", s.Location, err)
return s, perrors.Errorf("net.SplitHostPort(url.Host{%s}), error{%v}", s.Location, err)
}
}
for _, opt := range opts {
......@@ -290,6 +290,36 @@ func (c URL) Key() string {
//return c.ServiceKey()
}
//todo
func (c *URL) GetBackupUrls() []*URL {
var urls []*URL
var host string
urls = append(urls, c)
backups := strings.Split(c.GetParam(constant.BACKUP_KEY, ""), "")
for _, address := range backups {
index := strings.LastIndex(address, ":")
port := c.Port
if index > 0 {
host = address[:index]
port = address[index+1:]
} else {
host = address
}
//todo
newURL := NewURLWithOptions(
WithProtocol(c.Protocol),
WithPath(c.Path),
WithIp(host),
WithUsername(c.Username),
WithPassword(c.Password),
WithPort(port),
WithParams(c.Params))
urls = append(urls, newURL)
}
return urls
}
func (c URL) ServiceKey() string {
intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if intf == "" {
......@@ -328,7 +358,7 @@ func (c URL) Service() string {
return service
} else if c.SubURL != nil {
service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if service != "" { //if url.path is "" then return suburl's path, special for registry Url
if service != "" { //if url.path is "" then return suburl's path, special for registry url
return service
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"encoding/base64"
"fmt"
"io/ioutil"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
var (
mutex sync.Mutex
)
/////////////////////////
// routerConfig
/////////////////////////
type ConditionRouterConfig struct {
RawRule string `yaml:"rawRule"`
Scope string `yaml:"scope"`
Priority int `yaml:"priority"`
Enabled bool `yaml:"enabled" default:"true"`
Dynamic bool `yaml:"dynamic" default:"false"`
Valid bool `yaml:"valid" default:"true"`
Force bool `yaml:"force" default:"false"`
Runtime bool `yaml:"runtime" default:"true"`
Key string `yaml:"key"`
Conditions []string `yaml:"conditions"`
}
func (*ConditionRouterConfig) Prefix() string {
return constant.RouterConfigPrefix
}
func RouterInit(confRouterFile string) error {
if len(confRouterFile) == 0 {
return perrors.Errorf("application configure(provider) file name is nil")
}
if path.Ext(confRouterFile) != ".yml" {
return perrors.Errorf("application configure file name{%v} suffix must be .yml", confRouterFile)
}
confFileStream, err := ioutil.ReadFile(confRouterFile)
if err != nil {
return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confRouterFile, perrors.WithStack(err))
}
routerConfig = &ConditionRouterConfig{}
err = yaml.Unmarshal(confFileStream, routerConfig)
if err != nil {
return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err))
}
logger.Debugf("provider config{%#v}\n", routerConfig)
directory.RouterUrlSet.Add(initRouterUrl())
fmt.Println("=====", directory.RouterUrlSet.Size())
return nil
}
func initRouterUrl() *common.URL {
mutex.Lock()
if routerConfig == nil {
confRouterFile := os.Getenv(constant.CONF_ROUTER_FILE_PATH)
err := RouterInit(confRouterFile)
if err != nil {
return nil
}
}
mutex.Unlock()
rule := parseCondition(routerConfig.Conditions)
return common.NewURLWithOptions(
common.WithProtocol(constant.ROUTE_PROTOCOL),
common.WithIp(constant.ANYHOST_VALUE),
common.WithParams(url.Values{}),
common.WithParamsValue("enabled", strconv.FormatBool(routerConfig.Enabled)),
common.WithParamsValue("dynamic", strconv.FormatBool(routerConfig.Dynamic)),
common.WithParamsValue("force", strconv.FormatBool(routerConfig.Force)),
common.WithParamsValue("runtime", strconv.FormatBool(routerConfig.Runtime)),
common.WithParamsValue("priority", strconv.Itoa(routerConfig.Priority)),
common.WithParamsValue("scope", routerConfig.Scope),
common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString([]byte(rule))),
common.WithParamsValue("router", "condition"),
common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY))
}
func parseCondition(conditions []string) string {
var when, then string
for _, condition := range conditions {
condition = strings.Trim(condition, " ")
if strings.Contains(condition, "=>") {
array := strings.SplitN(condition, "=>", 2)
consumer := strings.Trim(array[0], " ")
provider := strings.Trim(array[1], " ")
if len(consumer) != 0 {
if len(when) != 0 {
when = strings.Join([]string{when, consumer}, " & ")
} else {
when = consumer
}
}
if len(provider) != 0 {
if len(then) != 0 {
then = strings.Join([]string{then, provider}, " & ")
} else {
then = provider
}
}
}
}
return strings.Join([]string{when, then}, " => ")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"strings"
"testing"
)
func TestString(t *testing.T) {
s := "a1=>a2"
s1 := "=>a2"
s2 := "a1=>"
n := strings.SplitN(s, "=>", 2)
n1 := strings.SplitN(s1, "=>", 2)
n2 := strings.SplitN(s2, "=>", 2)
println(n[0], " ", n[1])
println(n1[0], " ", n1[1])
println(n2[0], " ", n2[1])
}
......@@ -33,6 +33,7 @@ import (
var (
consumerConfig *ConsumerConfig
providerConfig *ProviderConfig
routerConfig *ConditionRouterConfig
maxWait = 3
)
......@@ -40,19 +41,27 @@ var (
// Namely: dubbo.consumer.xml & dubbo.provider.xml in java dubbo
func init() {
var (
confConFile, confProFile string
confConFile, confProFile, confRouterFile string
)
confConFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH)
confProFile = os.Getenv(constant.CONF_PROVIDER_FILE_PATH)
confRouterFile = os.Getenv(constant.CONF_ROUTER_FILE_PATH)
if errCon := ConsumerInit(confConFile); errCon != nil {
log.Printf("[consumerInit] %#v", errCon)
consumerConfig = nil
}
if errPro := ProviderInit(confProFile); errPro != nil {
log.Printf("[providerInit] %#v", errPro)
providerConfig = nil
}
if errPro := RouterInit(confRouterFile); errPro != nil {
log.Printf("[routerConfig] %#v", errPro)
routerConfig = nil
}
}
func checkRegistries(registries map[string]*RegistryConfig, singleRegistry *RegistryConfig) {
......
......@@ -44,6 +44,10 @@ type DynamicConfiguration interface {
//GetInternalProperty get value by key in Default properties file(dubbo.properties)
GetInternalProperty(string, ...Option) (string, error)
GetConfig(string, ...Option) (string, error)
SetConfig(string, string, string) error
GetConfigs(string, ...Option) (string, error)
}
type Options struct {
......
......@@ -97,6 +97,10 @@ func (c *MockDynamicConfiguration) GetConfig(key string, opts ...Option) (string
return c.content, nil
}
func (c *mockDynamicConfiguration) SetConfig(group string, key string, value string) error {
return nil
}
//For zookeeper, getConfig and getConfigs have the same meaning.
func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (string, error) {
return c.GetConfig(key, opts...)
......
......@@ -18,15 +18,20 @@
package directory
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -59,6 +64,8 @@ type registryDirectory struct {
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
Options
serviceKey string
forbidden atomic.Bool
}
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
......@@ -86,9 +93,38 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
//subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
for {
if !dir.registry.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
listener, err := dir.registry.Subscribe(url)
if err != nil {
if !dir.registry.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
go dir.update(serviceEvent)
}
}
}
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
......@@ -121,16 +157,34 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
//TODO: router
}
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
case remoting.EventTypeAdd:
url := dir.GetUrl()
var urls []*common.URL
for _, v := range url.GetBackupUrls() {
p := v.Protocol
category := v.GetParam(constant.CATEGORY_KEY, constant.PROVIDERS_CATEGORY)
if strings.EqualFold(category, constant.ROUTERS_CATEGORY) || strings.EqualFold(constant.ROUTE_PROTOCOL, p) {
urls = append(urls, v)
}
}
if len(urls) > 0 {
routers := toRouters(urls)
if len(routers) > 0 {
dir.SetRouters(routers)
}
}
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
}
}
newInvokers := dir.toGroupInvokers()
......@@ -215,8 +269,19 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
//select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:router
return dir.cacheInvokers
invokers := dir.cacheInvokers
localRouters := dir.Routers()
fmt.Println("========", len(localRouters))
if len(localRouters) > 0 {
for _, router := range localRouters {
if reflect.ValueOf(router.Url()).IsValid() || router.Url().GetParamBool(constant.RUNTIME_KEY, false) {
invokers = router.Route(invokers, *dir.ConsumerUrl, invocation)
}
}
}
return invokers
}
func (dir *registryDirectory) IsAvailable() bool {
......@@ -299,3 +364,16 @@ func (l *consumerConfigurationListener) Process(event *config_center.ConfigChang
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
}
func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = appe
......@@ -118,7 +118,8 @@ func Test_List(t *testing.T) {
registryDirectory, _ := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.List(&invocation.RPCInvocation{}), 3)
invokers, _ := registryDirectory.List(&invocation.RPCInvocation{})
assert.Len(t, invokers, 3)
assert.Equal(t, true, registryDirectory.IsAvailable())
}
......
......@@ -231,7 +231,7 @@ func (r *zkRegistry) Register(conf common.URL) error {
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.cltLock.Unlock()
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
logger.Debugf("(ZkConsumerRegistry)Register(conf{%#v})", conf)
case common.PROVIDER:
......@@ -256,6 +256,25 @@ func (r *zkRegistry) Register(conf common.URL) error {
r.cltLock.Unlock()
logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
case common.ROUTER:
key := conf.String()
r.cltLock.Lock()
_, ok = r.services[key]
r.cltLock.Unlock()
if ok {
return perrors.Errorf("Path{%s} has been registered", conf.Path)
}
err = r.register(conf)
if err != nil {
return perrors.WithMessagef(err, "register(conf:%+v)", conf)
}
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.cltLock.Unlock()
logger.Debugf("(ZkRouterRegistry)Register(conf{%#v})", conf)
}
return nil
......@@ -366,6 +385,23 @@ func (r *zkRegistry) register(c common.URL) error {
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())
logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
case common.ROUTER:
//todo
dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.ROUTER])
r.cltLock.Lock()
err = r.client.Create(dubboPath)
r.cltLock.Unlock()
if err != nil {
logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
return perrors.WithStack(err)
}
params.Add("protocol", c.Protocol)
params.Add("category", (common.RoleType(common.ROUTER)).String())
rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, c.Location, c.Path, params.Encode())
encodedURL = url.QueryEscape(rawURL)
logger.Debugf("router path:%s, url:%s", dubboPath, rawURL)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment