Skip to content
Snippets Groups Projects
Commit 3e9cd120 authored by vito.he's avatar vito.he
Browse files

Mod:cluster some rename & fix a bug in registryDirectory

parent 25b5f9b0
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol"
) )
type FailoverCluster struct { type failoverCluster struct {
} }
const name = "failover" const name = "failover"
...@@ -16,9 +16,9 @@ func init() { ...@@ -16,9 +16,9 @@ func init() {
} }
func NewFailoverCluster() cluster.Cluster { func NewFailoverCluster() cluster.Cluster {
return &FailoverCluster{} return &failoverCluster{}
} }
func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory) return newFailoverClusterInvoker(directory)
} }
...@@ -5,13 +5,13 @@ import ( ...@@ -5,13 +5,13 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol"
) )
type MockCluster struct { type mockCluster struct {
} }
func NewMockCluster() cluster.Cluster { func NewMockCluster() cluster.Cluster {
return &MockCluster{} return &mockCluster{}
} }
func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker { func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetUrl()) return protocol.NewBaseInvoker(directory.GetUrl())
} }
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol"
) )
type RegistryAwareCluster struct { type registryAwareCluster struct {
} }
func init() { func init() {
...@@ -14,9 +14,9 @@ func init() { ...@@ -14,9 +14,9 @@ func init() {
} }
func NewRegistryAwareCluster() cluster.Cluster { func NewRegistryAwareCluster() cluster.Cluster {
return &RegistryAwareCluster{} return &registryAwareCluster{}
} }
func (cluster *RegistryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory) return newRegistryAwareClusterInvoker(directory)
} }
package directory
import (
"context"
"fmt"
"strings"
"time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/common"
)
//////////////////////////////////////////
// registry array
// should be returned by registry ,will be used by client & waiting to selector
//////////////////////////////////////////
var (
ErrServiceArrayEmpty = jerrors.New("registryArray empty")
ErrServiceArrayTimeout = jerrors.New("registryArray timeout")
)
type ServiceArray struct {
context context.Context
arr []common.URL
birth time.Time
idx int64
}
func NewServiceArray(ctx context.Context, arr []common.URL) *ServiceArray {
return &ServiceArray{
context: ctx,
arr: arr,
birth: time.Now(),
}
}
func (s *ServiceArray) GetIdx() *int64 {
return &s.idx
}
func (s *ServiceArray) GetSize() int64 {
return int64(len(s.arr))
}
func (s *ServiceArray) GetService(i int64) common.URL {
return s.arr[i]
}
func (s *ServiceArray) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("birth:%s, idx:%d, arr len:%d, arr:{", s.birth, s.idx, len(s.arr)))
for i := range s.arr {
builder.WriteString(fmt.Sprintf("%d:%s, ", i, s.arr[i]))
}
builder.WriteString("}")
return builder.String()
}
func (s *ServiceArray) Add(url common.URL, ttl time.Duration) {
s.arr = append(s.arr, url)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) Del(url common.URL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL == url.PrimitiveURL {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
s.birth = time.Now().Add(ttl)
break
}
}
}
...@@ -5,20 +5,20 @@ import ( ...@@ -5,20 +5,20 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol" "github.com/dubbo/go-for-apache-dubbo/protocol"
) )
type StaticDirectory struct { type staticDirectory struct {
BaseDirectory BaseDirectory
invokers []protocol.Invoker invokers []protocol.Invoker
} }
func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory { func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
return &StaticDirectory{ return &staticDirectory{
BaseDirectory: NewBaseDirectory(&common.URL{}), BaseDirectory: NewBaseDirectory(&common.URL{}),
invokers: invokers, invokers: invokers,
} }
} }
//for-loop invokers ,if all invokers is available ,then it means directory is available //for-loop invokers ,if all invokers is available ,then it means directory is available
func (dir *StaticDirectory) IsAvailable() bool { func (dir *staticDirectory) IsAvailable() bool {
for _, invoker := range dir.invokers { for _, invoker := range dir.invokers {
if !invoker.IsAvailable() { if !invoker.IsAvailable() {
return false return false
...@@ -27,12 +27,12 @@ func (dir *StaticDirectory) IsAvailable() bool { ...@@ -27,12 +27,12 @@ func (dir *StaticDirectory) IsAvailable() bool {
return true return true
} }
func (dir *StaticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:Here should add router //TODO:Here should add router
return dir.invokers return dir.invokers
} }
func (dir *StaticDirectory) Destroy() { func (dir *staticDirectory) Destroy() {
dir.BaseDirectory.Destroy(func() { dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.invokers { for _, ivk := range dir.invokers {
ivk.Destroy() ivk.Destroy()
......
...@@ -17,14 +17,14 @@ func init() { ...@@ -17,14 +17,14 @@ func init() {
extension.SetLoadbalance(name, NewRandomLoadBalance) extension.SetLoadbalance(name, NewRandomLoadBalance)
} }
type RandomLoadBalance struct { type randomLoadBalance struct {
} }
func NewRandomLoadBalance() cluster.LoadBalance { func NewRandomLoadBalance() cluster.LoadBalance {
return &RandomLoadBalance{} return &randomLoadBalance{}
} }
func (lb *RandomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker { func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker {
var length int var length int
if length = len(invokers); length == 1 { if length = len(invokers); length == 1 {
return invokers[0] return invokers[0]
......
...@@ -27,7 +27,7 @@ type Options struct { ...@@ -27,7 +27,7 @@ type Options struct {
} }
type Option func(*Options) type Option func(*Options)
type RegistryDirectory struct { type registryDirectory struct {
directory.BaseDirectory directory.BaseDirectory
cacheInvokers []protocol.Invoker cacheInvokers []protocol.Invoker
listenerLock sync.Mutex listenerLock sync.Mutex
...@@ -38,7 +38,7 @@ type RegistryDirectory struct { ...@@ -38,7 +38,7 @@ type RegistryDirectory struct {
Options Options
} }
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) { func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{ options := Options{
//default 300s //default 300s
serviceTTL: time.Duration(300e9), serviceTTL: time.Duration(300e9),
...@@ -49,7 +49,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O ...@@ -49,7 +49,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
if url.SubURL == nil { if url.SubURL == nil {
return nil, jerrors.Errorf("url is invalid, suburl can not be nil") return nil, jerrors.Errorf("url is invalid, suburl can not be nil")
} }
return &RegistryDirectory{ return &registryDirectory{
BaseDirectory: directory.NewBaseDirectory(url), BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{}, cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{}, cacheInvokersMap: &sync.Map{},
...@@ -60,7 +60,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O ...@@ -60,7 +60,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
} }
//subscibe from registry //subscibe from registry
func (dir *RegistryDirectory) Subscribe(url common.URL) { func (dir *registryDirectory) Subscribe(url common.URL) {
for { for {
if !dir.registry.IsAvailable() { if !dir.registry.IsAvailable() {
log.Warn("event listener game over.") log.Warn("event listener game over.")
...@@ -95,7 +95,7 @@ func (dir *RegistryDirectory) Subscribe(url common.URL) { ...@@ -95,7 +95,7 @@ func (dir *RegistryDirectory) Subscribe(url common.URL) {
} }
//subscribe service from registry , and update the cacheServices //subscribe service from registry , and update the cacheServices
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { func (dir *registryDirectory) update(res *registry.ServiceEvent) {
if res == nil { if res == nil {
return return
} }
...@@ -107,35 +107,34 @@ func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { ...@@ -107,35 +107,34 @@ func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
dir.refreshInvokers(res) dir.refreshInvokers(res)
} }
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var newCacheInvokersMap sync.Map
switch res.Action { switch res.Action {
case registry.ServiceAdd: case registry.ServiceAdd:
//dir.cacheService.Add(res.Path, dir.serviceTTL) //dir.cacheService.Add(res.Path, dir.serviceTTL)
newCacheInvokersMap = *dir.cacheInvoker(res.Service) dir.cacheInvoker(res.Service)
case registry.ServiceDel: case registry.ServiceDel:
//dir.cacheService.Del(res.Path, dir.serviceTTL) //dir.cacheService.Del(res.Path, dir.serviceTTL)
newCacheInvokersMap = *dir.uncacheInvoker(res.Service) dir.uncacheInvoker(res.Service)
log.Info("selector delete service url{%s}", res.Service) log.Info("selector delete service url{%s}", res.Service)
default: default:
return return
} }
newInvokers := dir.toGroupInvokers(&newCacheInvokersMap) newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock() dir.listenerLock.Lock()
defer dir.listenerLock.Unlock() defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers dir.cacheInvokers = newInvokers
} }
func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protocol.Invoker { func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{} newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker) groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{} groupInvokersList := []protocol.Invoker{}
newInvokersMap.Range(func(key, value interface{}) bool { dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
newInvokersList = append(newInvokersList, value.(protocol.Invoker)) newInvokersList = append(newInvokersList, value.(protocol.Invoker))
return true return true
}) })
...@@ -163,42 +162,38 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc ...@@ -163,42 +162,38 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc
return groupInvokersList return groupInvokersList
} }
func (dir *RegistryDirectory) uncacheInvoker(url common.URL) *sync.Map { func (dir *registryDirectory) uncacheInvoker(url common.URL) {
log.Debug("service will be deleted in cache invokers: invokers key is %s!", url.Key()) log.Debug("service will be deleted in cache invokers: invokers key is %s!", url.Key())
newCacheInvokers := dir.cacheInvokersMap dir.cacheInvokersMap.Delete(url.Key())
newCacheInvokers.Delete(url.Key())
return newCacheInvokers
} }
func (dir *RegistryDirectory) cacheInvoker(url common.URL) *sync.Map { func (dir *registryDirectory) cacheInvoker(url common.URL) {
referenceUrl := dir.GetUrl().SubURL referenceUrl := dir.GetUrl().SubURL
newCacheInvokers := dir.cacheInvokersMap
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
url = mergeUrl(url, referenceUrl) url = mergeUrl(url, referenceUrl)
if _, ok := newCacheInvokers.Load(url.Key()); !ok { if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok {
log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key()) log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key())
newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url) newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
if newInvoker != nil { if newInvoker != nil {
newCacheInvokers.Store(url.Key(), newInvoker) dir.cacheInvokersMap.Store(url.Key(), newInvoker)
} }
} }
} }
return newCacheInvokers
} }
//select the protocol invokers from the directory //select the protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:router //TODO:router
return dir.cacheInvokers return dir.cacheInvokers
} }
func (dir *RegistryDirectory) IsAvailable() bool { func (dir *registryDirectory) IsAvailable() bool {
return dir.BaseDirectory.IsAvailable() return dir.BaseDirectory.IsAvailable()
} }
func (dir *RegistryDirectory) Destroy() { func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe //TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() { dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers { for _, ivk := range dir.cacheInvokers {
......
...@@ -98,7 +98,7 @@ func Test_List(t *testing.T) { ...@@ -98,7 +98,7 @@ func Test_List(t *testing.T) {
} }
func normalRegistryDir() (*RegistryDirectory, *registry.MockRegistry) { func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment