Skip to content
Snippets Groups Projects
Commit 48e6b918 authored by lzp0412's avatar lzp0412 Committed by GitHub
Browse files

Merge pull request #1 from apache/feature/dubbo-2.7.5

Feature/dubbo 2.7.5
parents bdd57ef5 95a5a127
No related branches found
No related tags found
No related merge requests found
Showing
with 711 additions and 107 deletions
......@@ -26,6 +26,7 @@ const (
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
PROTOCOL_KEY = "protocol"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
......@@ -40,6 +41,7 @@ const (
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
PATH_SEPARATOR = "/"
DUBBO_KEY = "dubbo"
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)
var (
globalEventDispatcher observer.EventDispatcher
initEventListeners []observer.EventListener
)
var (
dispatchers = make(map[string]func() observer.EventDispatcher, 8)
)
// SetEventDispatcher by name
func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
dispatchers[name] = v
}
// SetAndInitGlobalDispatcher
func SetAndInitGlobalDispatcher(name string) {
if len(name) == 0 {
name = "direct"
}
if globalEventDispatcher != nil {
logger.Warnf("EventDispatcher already init. It will be replaced")
}
if dp, ok := dispatchers[name]; !ok || dp == nil {
panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
}
globalEventDispatcher = dispatchers[name]()
globalEventDispatcher.AddEventListeners(initEventListeners)
}
// GetGlobalDispatcher
func GetGlobalDispatcher() observer.EventDispatcher {
return globalEventDispatcher
}
// AddEventListener it will be added in global event dispatcher
func AddEventListener(listener observer.EventListener) {
initEventListeners = append(initEventListeners, listener)
}
......@@ -18,20 +18,20 @@
package extension
import (
"github.com/apache/dubbo-go/metadata"
"github.com/apache/dubbo-go/metadata/report/factory"
)
var (
metaDataReportFactories = make(map[string]func() metadata.MetadataReportFactory, 8)
metaDataReportFactories = make(map[string]func() factory.MetadataReportFactory, 8)
)
// SetMetadataReportFactory ...
func SetMetadataReportFactory(name string, v func() metadata.MetadataReportFactory) {
func SetMetadataReportFactory(name string, v func() factory.MetadataReportFactory) {
metaDataReportFactories[name] = v
}
// GetMetadataReportFactory ...
func GetMetadataReportFactory(name string) metadata.MetadataReportFactory {
func GetMetadataReportFactory(name string) factory.MetadataReportFactory {
if metaDataReportFactories[name] == nil {
panic("metadata report for " + name + " is not existing, make sure you have import the package.")
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dispatcher
import (
"reflect"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)
func init() {
extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
}
// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
// Dispatcher event to listener direct
type DirectEventDispatcher struct {
observer.BaseListenable
}
// NewDirectEventDispatcher ac constructor of DirectEventDispatcher
func NewDirectEventDispatcher() observer.EventDispatcher {
return &DirectEventDispatcher{}
}
// Dispatch event directly
func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
if event == nil {
logger.Warnf("[DirectEventDispatcher] dispatch event nil")
return
}
eventType := reflect.TypeOf(event).Elem()
value, loaded := ded.ListenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]observer.EventListener)
for _, listener := range listenersSlice {
if err := listener.OnEvent(event); err != nil {
logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dispatcher
import (
"fmt"
"reflect"
"testing"
)
import (
"github.com/apache/dubbo-go/common/observer"
)
func TestDirectEventDispatcher_Dispatch(t *testing.T) {
ded := NewDirectEventDispatcher()
ded.AddEventListener(&TestEventListener{})
ded.AddEventListener(&TestEventListener1{})
ded.Dispatch(&TestEvent{})
ded.Dispatch(nil)
}
type TestEvent struct {
observer.BaseEvent
}
type TestEventListener struct {
observer.BaseListenable
observer.EventListener
}
func (tel *TestEventListener) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener")
return nil
}
func (tel *TestEventListener) GetPriority() int {
return -1
}
func (tel *TestEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(&TestEvent{})
}
type TestEventListener1 struct {
observer.EventListener
}
func (tel *TestEventListener1) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener1")
return nil
}
func (tel *TestEventListener1) GetPriority() int {
return 1
}
func (tel *TestEventListener1) GetEventType() reflect.Type {
return reflect.TypeOf(TestEvent{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}
// BaseEvent is the base implementation of Event
// You should never use it directly
type BaseEvent struct {
Source interface{}
Timestamp time.Time
}
// GetSource return the source
func (b *BaseEvent) GetSource() interface{} {
return b.Source
}
// GetTimestamp return the Timestamp when the event is created
func (b *BaseEvent) GetTimestamp() time.Time {
return b.Timestamp
}
// String return a human readable string representing this event
func (b *BaseEvent) String() string {
return fmt.Sprintf("BaseEvent[source = %#v]", b.Source)
}
func newBaseEvent(source interface{}) *BaseEvent {
return &BaseEvent{
Source: source,
Timestamp: time.Now(),
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
// EventDispatcher is align with EventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
type EventDispatcher interface {
Listenable
// Dispatch event
Dispatch(event Event)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
)
import (
gxsort "github.com/dubbogo/gost/sort"
)
// EventListener is an new interface used to align with dubbo 2.7.5
// It contains the Prioritized means that the listener has its priority
type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
GetEventType() reflect.Type
}
// ConditionalEventListener only handle the event which it can handle
type ConditionalEventListener interface {
EventListener
// Accept will make the decision whether it should handle this event
Accept(e Event) bool
}
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
"sort"
"sync"
)
// Listenable could add and remove the event listener
type Listenable interface {
AddEventListener(listener EventListener)
AddEventListeners(listenersSlice []EventListener)
RemoveEventListener(listener EventListener)
RemoveEventListeners(listenersSlice []EventListener)
GetAllEventListeners() []EventListener
RemoveAllEventListeners()
}
// BaseListenable base listenable
type BaseListenable struct {
Listenable
ListenersCache sync.Map
Mutex sync.Mutex
}
// NewBaseListenable a constructor of base listenable
func NewBaseListenable() Listenable {
return &BaseListenable{}
}
// AddEventListener add event listener
func (bl *BaseListenable) AddEventListener(listener EventListener) {
eventType := listener.GetEventType()
if eventType.Kind() == reflect.Ptr {
eventType = eventType.Elem()
}
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
value, loaded := bl.ListenersCache.LoadOrStore(eventType, make([]EventListener, 0, 8))
listenersSlice := value.([]EventListener)
// return if listenersSlice already has this listener
if loaded && containListener(listenersSlice, listener) {
return
}
listenersSlice = append(listenersSlice, listener)
sort.Slice(listenersSlice, func(i, j int) bool {
return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority()
})
bl.ListenersCache.Store(eventType, listenersSlice)
}
// AddEventListeners add the slice of event listener
func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) {
for _, listener := range listenersSlice {
bl.AddEventListener(listener)
}
}
// RemoveEventListener remove the event listener
func (bl *BaseListenable) RemoveEventListener(listener EventListener) {
eventType := listener.GetEventType()
if eventType.Kind() == reflect.Ptr {
eventType = eventType.Elem()
}
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
value, loaded := bl.ListenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]EventListener)
for i, l := range listenersSlice {
if l == listener {
listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...)
}
}
bl.ListenersCache.Store(eventType, listenersSlice)
}
// RemoveEventListeners remove the slice of event listener
func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) {
for _, listener := range listenersSlice {
bl.RemoveEventListener(listener)
}
}
// RemoveAllEventListeners remove all
func (bl *BaseListenable) RemoveAllEventListeners() {
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
bl.ListenersCache = sync.Map{}
}
// GetAllEventListeners get all
func (bl *BaseListenable) GetAllEventListeners() []EventListener {
allListenersSlice := make([]EventListener, 0, 16)
bl.ListenersCache.Range(func(_, value interface{}) bool {
listenersSlice := value.([]EventListener)
allListenersSlice = append(allListenersSlice, listenersSlice...)
return true
})
sort.Slice(allListenersSlice, func(i, j int) bool {
return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority()
})
return allListenersSlice
}
// containListener true if contain listener
func containListener(listenersSlice []EventListener, listener EventListener) bool {
for _, loadListener := range listenersSlice {
if loadListener == listener {
return true
}
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestListenable(t *testing.T) {
el := &TestEventListener{}
b := &BaseListenable{}
b.AddEventListener(el)
b.AddEventListener(el)
al := b.GetAllEventListeners()
assert.Equal(t, len(al), 1)
assert.Equal(t, al[0].GetEventType(), reflect.TypeOf(TestEvent{}))
b.RemoveEventListener(el)
assert.Equal(t, len(b.GetAllEventListeners()), 0)
var ts []EventListener
ts = append(ts, el)
b.AddEventListeners(ts)
assert.Equal(t, len(al), 1)
}
type TestEvent struct {
BaseEvent
}
type TestEventListener struct {
EventListener
}
func (tel *TestEventListener) OnEvent(e Event) error {
return nil
}
func (tel *TestEventListener) GetPriority() int {
return -1
}
func (tel *TestEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(TestEvent{})
}
......@@ -133,6 +133,11 @@ func (s *Service) Method() map[string]*MethodType {
return s.methods
}
// Name will return service name
func (s *Service) Name() string {
return s.name
}
// RcvrType ...
func (s *Service) RcvrType() reflect.Type {
return s.rcvrType
......
......@@ -42,14 +42,13 @@ type multiConfiger interface {
// BaseConfig is the common configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"`
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
}
// startConfigCenter will start the config center.
......
......@@ -53,15 +53,6 @@ func Test_refresh(t *testing.T) {
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
......@@ -156,15 +147,6 @@ func Test_appExternal_refresh(t *testing.T) {
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
......@@ -251,15 +233,6 @@ func Test_appExternalWithoutId_refresh(t *testing.T) {
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
......@@ -408,15 +381,6 @@ func Test_refreshProvider(t *testing.T) {
Owner: "dubbo",
Environment: "test"},
Registries: map[string]*RegistryConfig{
//"shanghai_reg1": {
// id: "shanghai_reg1",
// Protocol: "mock",
// TimeoutStr: "2s",
// Group: "shanghai_idc",
// Address: "127.0.0.1:2181",
// Username: "user1",
// Password: "pwd1",
//},
"shanghai_reg2": {
Protocol: "mock",
TimeoutStr: "2s",
......
......@@ -33,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
_ "github.com/apache/dubbo-go/common/observer/dispatcher"
)
var (
......@@ -91,6 +92,17 @@ func Load() {
}
}
var eventDispatcherType string
if consumerConfig != nil {
eventDispatcherType = consumerConfig.eventDispatcherType
}
// notice consumerConfig.eventDispatcherType will be replaced
if providerConfig != nil {
eventDispatcherType = providerConfig.eventDispatcherType
}
// init EventDispatcher should before everything
extension.SetAndInitGlobalDispatcher(eventDispatcherType)
// reference config
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
......@@ -198,6 +210,7 @@ func Load() {
}
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
......
......@@ -24,6 +24,7 @@ import (
import (
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
import (
......@@ -90,7 +91,7 @@ func TestLoad(t *testing.T) {
func TestLoadWithSingleReg(t *testing.T) {
doInitConsumerWithSingleRegistry()
doInitProviderWithSingleRegistry()
mockInitProviderWithSingleRegistry()
ms := &MockService{}
SetConsumerService(ms)
......@@ -233,3 +234,55 @@ func TestConfigLoaderWithConfigCenterSingleRegistry(t *testing.T) {
assert.Equal(t, "mock://127.0.0.1:2182", consumerConfig.Registries[constant.DEFAULT_KEY].Address)
}
// mockInitProviderWithSingleRegistry will init a mocked providerConfig
func mockInitProviderWithSingleRegistry() {
providerConfig = &ProviderConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "1.0.0",
Owner: "dubbo",
Environment: "test"},
Registry: &RegistryConfig{
Address: "mock://127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
Registries: map[string]*RegistryConfig{},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
{
Name: "GetUser1",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
},
exported: new(atomic.Bool),
},
},
Protocols: map[string]*ProtocolConfig{
"mock": {
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
},
},
}
}
......@@ -24,16 +24,16 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata"
"github.com/apache/dubbo-go/metadata/report"
)
var (
instance metadata.MetadataReport
instance report.MetadataReport
once sync.Once
)
// GetMetadataReportInstance ...
func GetMetadataReportInstance(url *common.URL) metadata.MetadataReport {
func GetMetadataReportInstance(url *common.URL) report.MetadataReport {
once.Do(func() {
instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url)
})
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import "testing"
......
......@@ -71,11 +71,15 @@ type ServiceConfig struct {
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
Protocols map[string]*ProtocolConfig
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
cacheProtocol protocol.Protocol
exportersLock sync.Mutex
exporters []protocol.Exporter
}
// Prefix ...
......@@ -92,6 +96,8 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := unmarshal((*plain)(c)); err != nil {
return err
}
c.exported = atomic.NewBool(false)
c.unexported = atomic.NewBool(false)
return nil
}
......@@ -105,6 +111,16 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig {
}
}
// InitExported will set exported as false atom bool
func (c *ServiceConfig) InitExported() {
c.exported = atomic.NewBool(false)
}
// IsExport will return whether the service config is exported or not
func (c *ServiceConfig) IsExport() bool {
return c.exported.Load()
}
// Export ...
func (c *ServiceConfig) Export() error {
// TODO: config center start here
......@@ -122,7 +138,7 @@ func (c *ServiceConfig) Export() error {
regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, providerConfig.Protocols)
protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol)
return nil
......@@ -148,6 +164,9 @@ func (c *ServiceConfig) Export() error {
if len(c.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, c.Tag)
}
var exporter protocol.Exporter
if len(regUrls) > 0 {
for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL
......@@ -160,22 +179,46 @@ func (c *ServiceConfig) Export() error {
c.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := c.cacheProtocol.Export(invoker)
exporter = c.cacheProtocol.Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL)))
}
}
} else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL)))
}
}
c.exporters = append(c.exporters, exporter)
}
c.exported.Store(true)
return nil
}
// Unexport will call unexport of all exporters service config exported
func (c *ServiceConfig) Unexport() {
if !c.exported.Load() {
return
}
if c.unexported.Load() {
return
}
func() {
c.exportersLock.Lock()
defer c.exportersLock.Unlock()
for _, exporter := range c.exporters {
exporter.Unexport()
}
c.exporters = nil
}()
c.exported.Store(false)
c.unexported.Store(true)
}
// Implement ...
func (c *ServiceConfig) Implement(s common.RPCService) {
c.rpcService = s
......@@ -245,3 +288,16 @@ func (c *ServiceConfig) getUrlMap() url.Values {
return urlMap
}
// GetExportedUrls will return the url in service config's exporter
func (c *ServiceConfig) GetExportedUrls() []*common.URL {
if c.exported.Load() {
var urls []*common.URL
for _, exporter := range c.exporters {
url := exporter.GetInvoker().GetUrl()
urls = append(urls, &url)
}
return urls
}
return nil
}
......@@ -21,6 +21,10 @@ import (
"testing"
)
import (
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common/extension"
)
......@@ -92,6 +96,7 @@ func doInitProvider() {
Weight: 200,
},
},
exported: new(atomic.Bool),
},
"MockServiceNoRightProtocol": {
InterfaceName: "com.MockService",
......@@ -116,56 +121,7 @@ func doInitProvider() {
Weight: 200,
},
},
},
},
Protocols: map[string]*ProtocolConfig{
"mock": {
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
},
},
}
}
func doInitProviderWithSingleRegistry() {
providerConfig = &ProviderConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registry: &RegistryConfig{
Address: "mock://127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
Registries: map[string]*RegistryConfig{},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
{
Name: "GetUser1",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
},
exported: new(atomic.Bool),
},
},
Protocols: map[string]*ProtocolConfig{
......
module github.com/apache/dubbo-go
require (
github.com/Workiva/go-datastructures v1.0.50
github.com/Workiva/go-datastructures v1.0.52
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.4.0
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment