Skip to content
Snippets Groups Projects
Commit d95f61ad authored by fangyincheng's avatar fangyincheng
Browse files

Meg:upstream/feature/dubbo

parents 4d872e64 29180ff0
No related branches found
No related tags found
No related merge requests found
Showing
with 275 additions and 212 deletions
......@@ -2,7 +2,6 @@ package invoker
import (
"context"
"github.com/dubbo/dubbo-go/dubbo"
"sync"
"time"
)
......@@ -13,15 +12,15 @@ import (
)
import (
"github.com/dubbo/dubbo-go/client/loadBalance"
"github.com/dubbo/dubbo-go/client/selector"
"github.com/dubbo/dubbo-go/dubbo"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
)
type Options struct {
ServiceTTL time.Duration
selector loadBalance.Selector
selector selector.Selector
//TODO:we should provider a transport client interface
HttpClient *jsonrpc.HTTPClient
DubboClient *dubbo.Client
......@@ -45,7 +44,7 @@ func WithDubboClient(client *dubbo.Client) Option {
}
}
func WithLBSelector(selector loadBalance.Selector) Option {
func WithLBSelector(selector selector.Selector) Option {
return func(o *Options) {
o.selector = selector
}
......@@ -62,7 +61,7 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) {
options := Options{
//default 300s
ServiceTTL: time.Duration(300e9),
selector: loadBalance.NewRandomSelector(),
selector: selector.NewRandomSelector(),
}
for _, opt := range opts {
opt(&options)
......@@ -99,77 +98,77 @@ func (ivk *Invoker) listen() {
}
}
func (ivk *Invoker) update(res *registry.ServiceURLEvent) {
func (ivk *Invoker) update(res *registry.ServiceEvent) {
if res == nil || res.Service == nil {
return
}
log.Debug("registry update, result{%s}", res)
serviceKey := res.Service.ServiceConfig().Key()
registryKey := res.Service.ServiceConfig().Key()
ivk.listenerLock.Lock()
defer ivk.listenerLock.Unlock()
svcArr, ok := ivk.cacheServiceMap[serviceKey]
log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr)
svcArr, ok := ivk.cacheServiceMap[registryKey]
log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr)
switch res.Action {
case registry.ServiceURLAdd:
case registry.ServiceAdd:
if ok {
svcArr.add(res.Service, ivk.ServiceTTL)
} else {
ivk.cacheServiceMap[serviceKey] = newServiceArray([]*service.ServiceURL{res.Service})
ivk.cacheServiceMap[registryKey] = newServiceArray([]*registry.ServiceURL{res.Service})
}
case registry.ServiceURLDel:
case registry.ServiceDel:
if ok {
svcArr.del(res.Service, ivk.ServiceTTL)
if len(svcArr.arr) == 0 {
delete(ivk.cacheServiceMap, serviceKey)
log.Warn("delete service %s from service map", serviceKey)
delete(ivk.cacheServiceMap, registryKey)
log.Warn("delete registry %s from registry map", registryKey)
}
}
log.Error("selector delete serviceURL{%s}", *res.Service)
log.Error("selector delete registryURL{%s}", *res.Service)
}
}
func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArray, error) {
func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceArray, error) {
defer ivk.listenerLock.Unlock()
serviceKey := serviceConf.Key()
registryKey := registryConf.Key()
ivk.listenerLock.Lock()
svcArr, sok := ivk.cacheServiceMap[serviceKey]
log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr)
svcArr, sok := ivk.cacheServiceMap[registryKey]
log.Debug("r.svcArr[registryString{%v}] = svcArr{%s}", registryKey, svcArr)
if sok && time.Since(svcArr.birth) < ivk.Options.ServiceTTL {
return svcArr, nil
}
ivk.listenerLock.Unlock()
svcs, err := ivk.registry.GetService(serviceConf)
svcs, err := ivk.registry.GetService(registryConf)
ivk.listenerLock.Lock()
if err != nil {
log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}",
serviceConf, jerrors.ErrorStack(err), svcs)
registryConf, jerrors.ErrorStack(err), svcs)
return nil, jerrors.Trace(err)
}
newSvcArr := newServiceArray(svcs)
ivk.cacheServiceMap[serviceKey] = newSvcArr
ivk.cacheServiceMap[registryKey] = newSvcArr
return newSvcArr, nil
}
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
serviceArray, err := ivk.getService(serviceConf)
registryArray, err := ivk.getService(registryConf)
if err != nil {
return err
}
if len(serviceArray.arr) == 0 {
return jerrors.New("cannot find svc " + serviceConf.String())
if len(registryArray.arr) == 0 {
return jerrors.New("cannot find svc " + registryConf.String())
}
url, err := ivk.selector.Select(reqId, serviceArray)
url, err := ivk.selector.Select(reqId, registryArray)
if err != nil {
return err
}
......@@ -181,16 +180,16 @@ func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *serv
return nil
}
func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
serviceArray, err := ivk.getService(serviceConf)
registryArray, err := ivk.getService(registryConf)
if err != nil {
return err
}
if len(serviceArray.arr) == 0 {
return jerrors.New("cannot find svc " + serviceConf.String())
if len(registryArray.arr) == 0 {
return jerrors.New("cannot find svc " + registryConf.String())
}
url, err := ivk.selector.Select(reqId, serviceArray)
url, err := ivk.selector.Select(reqId, registryArray)
if err != nil {
return err
}
......
......@@ -11,26 +11,26 @@ import (
)
import (
"github.com/dubbo/dubbo-go/service"
"github.com/dubbo/dubbo-go/registry"
)
//////////////////////////////////////////
// service array
// should be returned by registry ,will be used by client & waiting to loadBalance
// registry array
// should be returned by registry ,will be used by client & waiting to selector
//////////////////////////////////////////
var (
ErrServiceArrayEmpty = jerrors.New("serviceArray empty")
ErrServiceArrayTimeout = jerrors.New("serviceArray timeout")
ErrServiceArrayEmpty = jerrors.New("registryArray empty")
ErrServiceArrayTimeout = jerrors.New("registryArray timeout")
)
type ServiceArray struct {
arr []*service.ServiceURL
arr []*registry.ServiceURL
birth time.Time
idx int64
}
func newServiceArray(arr []*service.ServiceURL) *ServiceArray {
func newServiceArray(arr []*registry.ServiceURL) *ServiceArray {
return &ServiceArray{
arr: arr,
birth: time.Now(),
......@@ -40,10 +40,12 @@ func newServiceArray(arr []*service.ServiceURL) *ServiceArray {
func (s *ServiceArray) GetIdx() *int64 {
return &s.idx
}
func (s *ServiceArray) GetSize() int {
return len(s.arr)
func (s *ServiceArray) GetSize() int64 {
return int64(len(s.arr))
}
func (s *ServiceArray) GetService(i int) *service.ServiceURL {
func (s *ServiceArray) GetService(i int64) *registry.ServiceURL {
return s.arr[i]
}
......@@ -58,14 +60,14 @@ func (s *ServiceArray) String() string {
return builder.String()
}
func (s *ServiceArray) add(service *service.ServiceURL, ttl time.Duration) {
s.arr = append(s.arr, service)
func (s *ServiceArray) add(registry *registry.ServiceURL, ttl time.Duration) {
s.arr = append(s.arr, registry)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) del(service *service.ServiceURL, ttl time.Duration) {
func (s *ServiceArray) del(registry *registry.ServiceURL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL == service.PrimitiveURL {
if svc.PrimitiveURL == registry.PrimitiveURL {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
s.birth = time.Now().Add(ttl)
break
......
package loadBalance
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
)
type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
}
//////////////////////////////////////////
// load balancer mode
//////////////////////////////////////////
//
//// Mode defines the algorithm of selecting a provider from cluster
//type Mode int
//
//const (
// SM_BEGIN Mode = iota
// SM_Random
// SM_RoundRobin
// SM_END
//)
//
//var modeStrings = [...]string{
// "Begin",
// "Random",
// "RoundRobin",
// "End",
//}
//
//func (s Mode) String() string {
// if SM_BEGIN < s && s < SM_END {
// return modeStrings[s]
// }
//
// return ""
//}
package loadBalance
package selector
import (
"math/rand"
......@@ -7,21 +7,21 @@ import (
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
"github.com/dubbo/dubbo-go/registry"
)
type RandomSelector struct {
}
type RandomSelector struct{}
func NewRandomSelector() Selector {
return &RandomSelector{}
}
func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) {
func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error) {
if array.GetSize() == 0 {
return nil, ServiceArrayEmpty
}
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = ((int64)(rand.Int()) + ID) % int64(array.GetSize())
return array.GetService(int(idx)), nil
idx = ((int64)(rand.Int()) + ID) % array.GetSize()
return array.GetService(idx), nil
}
package loadBalance
package selector
import (
"sync/atomic"
......@@ -6,24 +6,21 @@ import (
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
"github.com/dubbo/dubbo-go/registry"
)
type RoundRobinSelector struct {
}
type RoundRobinSelector struct{}
func NewRoundRobinSelector() Selector {
return &RoundRobinSelector{}
}
func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) {
func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error) {
if array.GetSize() == 0 {
return nil, ServiceArrayEmpty
}
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = (ID + idx) % int64(array.GetSize())
//default: // random
// idx = ((int64)(rand.Int()) + ID) % int64(arrSize)
//}
return array.GetService(int(idx)), nil
idx = (ID + idx) % array.GetSize()
return array.GetService(idx), nil
}
package selector
import (
"fmt"
)
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/registry"
)
var (
ServiceArrayEmpty = fmt.Errorf("emtpy service array")
)
type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error)
}
package client
import "github.com/dubbo/dubbo-go/service"
import (
"github.com/dubbo/dubbo-go/registry"
)
type ServiceArrayIf interface {
GetIdx() *int64
GetSize() int
GetService(i int) *service.ServiceURL
GetSize() int64
GetService(i int64) *registry.ServiceURL
}
......@@ -16,7 +16,7 @@ import (
import (
"github.com/dubbo/dubbo-go/public"
svc "github.com/dubbo/dubbo-go/service"
"github.com/dubbo/dubbo-go/registry"
)
var (
......@@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) {
}
// call one way
func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error {
func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
......@@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, a
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
......@@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, r
return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts))
}
func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{},
func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{},
callback AsyncCallback, reply interface{}, opts ...CallOption) error {
var copts CallOptions
......@@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, ar
return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}
func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string,
func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
if opts.RequestTimeout == 0 {
......
......@@ -50,7 +50,7 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal packge")
log.Error("illegal package")
return
}
......
package dubbo
import (
"math/rand"
"sync"
"time"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
// The golang rand generators are *not* intrinsically thread-safe.
seededIDLock sync.Mutex
)
func randomID() uint64 {
seededIDLock.Lock()
defer seededIDLock.Unlock()
return uint64(seededIDGen.Int63())
}
......@@ -93,7 +93,7 @@ func initClient(clientConfig *examples.ClientConfig) {
}
for _, service := range clientConfig.Service_List {
err = clientRegistry.ConsumerRegister(&service)
err = clientRegistry.RegisterConsumer(&service)
if err != nil {
panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err)))
return
......
......@@ -96,7 +96,7 @@ func initClient(clientConfig *examples.ClientConfig) {
}
for _, service := range clientConfig.Service_List {
err = clientRegistry.ConsumerRegister(&service)
err = clientRegistry.RegisterConsumer(&service)
if err != nil {
panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err)))
return
......
......@@ -5,7 +5,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"net"
"net/http"
......@@ -22,6 +21,7 @@ import (
import (
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
//////////////////////////////////////////////
......@@ -39,8 +39,8 @@ type Request struct {
contentType string
}
func (r *Request) ServiceConfig() service.ServiceConfigIf {
return &service.ServiceConfig{
func (r *Request) ServiceConfig() registry.ServiceConfigIf {
return &registry.ServiceConfig{
Protocol: r.protocol,
Service: r.service,
Group: r.group,
......@@ -86,7 +86,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
}
}
func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request {
func (c *HTTPClient) NewRequest(conf registry.ServiceConfig, method string, args interface{}) Request {
return Request{
ID: atomic.AddInt64(&c.ID, 1),
group: conf.Group,
......@@ -98,7 +98,7 @@ func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args
}
}
func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error {
func (c *HTTPClient) Call(ctx context.Context, service *registry.ServiceURL, req Request, rsp interface{}) error {
// header
httpHeader := http.Header{}
httpHeader.Set("Content-Type", "application/json")
......
......@@ -4,8 +4,6 @@ import (
"bufio"
"bytes"
"context"
"github.com/dubbo/dubbo-go/server"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"net"
"net/http"
......@@ -22,6 +20,10 @@ import (
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/server"
)
const (
DefaultMaxSleepTime = 1 * time.Second // accept中间最大sleep interval
DefaultHTTPRspBufferSize = 1024
......@@ -58,7 +60,7 @@ type Option func(*Options)
type Options struct {
Registry registry.Registry
ConfList []server.ServerConfig
ServiceConfList []service.ServiceConfig
ServiceConfList []registry.ServiceConfig
Timeout time.Duration
}
......@@ -93,7 +95,7 @@ func ConfList(confList []server.ServerConfig) Option {
}
}
func ServiceConfList(confList []service.ServiceConfig) Option {
func ServiceConfList(confList []registry.ServiceConfig) Option {
return func(o *Options) {
o.ServiceConfList = confList
}
......@@ -235,7 +237,7 @@ func (s *Server) Options() Options {
func (s *Server) Handle(h Handler) error {
var (
err error
serviceConf service.ServiceConfig
serviceConf registry.ProviderServiceConfig
)
opts := s.Options()
......@@ -263,7 +265,7 @@ func (s *Server) Handle(h Handler) error {
}
serviceConf.Path = opts.ConfList[j].Address()
err = opts.Registry.ProviderRegister(s.opts.Registry.NewProviderServiceConfig(serviceConf))
err = opts.Registry.RegisterProvider(registry.ProviderServiceConfig{serviceConf})
if err != nil {
return err
}
......
package plugins
import (
"github.com/dubbo/dubbo-go/client/loadBalance"
"github.com/dubbo/dubbo-go/client/selector"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
)
......@@ -10,7 +10,7 @@ var PluggableRegistries = map[string]func(...registry.RegistryOption) (registry.
"zookeeper": zookeeper.NewZkRegistry,
}
var PluggableLoadbalance = map[string]func() loadBalance.Selector{
"round_robin": loadBalance.NewRoundRobinSelector,
"random": loadBalance.NewRandomSelector,
var PluggableLoadbalance = map[string]func() selector.Selector{
"round_robin": selector.NewRoundRobinSelector,
"random": selector.NewRandomSelector,
}
......@@ -6,10 +6,6 @@ import (
"time"
)
import (
"github.com/dubbo/dubbo-go/service"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
......@@ -18,31 +14,31 @@ func init() {
// service url event type
//////////////////////////////////////////
type ServiceURLEventType int
type ServiceEventType int
const (
ServiceURLAdd = iota
ServiceURLDel
ServiceAdd = iota
ServiceDel
)
var serviceURLEventTypeStrings = [...]string{
"add service url",
"delete service url",
var serviceEventTypeStrings = [...]string{
"add service",
"delete service",
}
func (t ServiceURLEventType) String() string {
return serviceURLEventTypeStrings[t]
func (t ServiceEventType) String() string {
return serviceEventTypeStrings[t]
}
//////////////////////////////////////////
// service url event
// service event
//////////////////////////////////////////
type ServiceURLEvent struct {
Action ServiceURLEventType
Service *service.ServiceURL
type ServiceEvent struct {
Action ServiceEventType
Service *ServiceURL
}
func (e ServiceURLEvent) String() string {
return fmt.Sprintf("ServiceURLEvent{Action{%s}, Service{%s}}", e.Action, e.Service)
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Service{%s}}", e.Action, e.Service)
}
package registry
import (
"github.com/dubbo/dubbo-go/service"
)
//////////////////////////////////////////////
// Registry Interface
//////////////////////////////////////////////
......@@ -12,16 +8,14 @@ import (
type Registry interface {
//used for service provider calling , register services to registry
ProviderRegister(conf service.ServiceConfigIf) error
RegisterProvider(ServiceConfigIf) error
//used for service consumer calling , register services cared about ,for dubbo's admin monitoring
ConsumerRegister(conf *service.ServiceConfig) error
RegisterConsumer(ServiceConfigIf) error
//used for service consumer ,start listen goroutine
GetListenEvent() chan *ServiceURLEvent
GetListenEvent() chan *ServiceEvent
//input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available
GetService(*service.ServiceConfig) ([]*service.ServiceURL, error)
GetService(*ServiceConfig) ([]*ServiceURL, error)
Close()
//new Provider conf
NewProviderServiceConfig(service.ServiceConfig) service.ServiceConfigIf
}
package service
package registry
import (
"fmt"
......@@ -13,6 +13,57 @@ import (
jerrors "github.com/juju/errors"
)
//////////////////////////////////////////////
// service config
//////////////////////////////////////////////
type ServiceConfigIf interface {
Key() string
String() string
ServiceEqual(url *ServiceURL) bool
}
type ServiceConfig struct {
Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"`
Service string `required:"true" yaml:"service" json:"service,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Version string `yaml:"version" json:"version,omitempty"`
}
func (c ServiceConfig) Key() string {
return fmt.Sprintf("%s@%s", c.Service, c.Protocol)
}
func (c ServiceConfig) String() string {
return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version)
}
func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool {
if c.Protocol != url.Protocol {
return false
}
if c.Service != url.Query.Get("interface") {
return false
}
if c.Group != url.Group {
return false
}
if c.Version != url.Version {
return false
}
return true
}
type ProviderServiceConfig struct {
ServiceConfig
Path string `yaml:"path" json:"path,omitempty"`
Methods string `yaml:"methods" json:"methods,omitempty"`
}
//////////////////////////////////////////
// service url
//////////////////////////////////////////
......@@ -31,14 +82,6 @@ type ServiceURL struct {
PrimitiveURL string
}
func (s ServiceURL) String() string {
return fmt.Sprintf(
"ServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+
"Timeout:%s, Version:%s, Group:%s, Weight:%d, Query:%+v}",
s.Protocol, s.Location, s.Path, s.Ip, s.Port,
s.Timeout, s.Version, s.Group, s.Weight, s.Query)
}
func NewServiceURL(urlString string) (*ServiceURL, error) {
var (
err error
......@@ -88,6 +131,14 @@ func NewServiceURL(urlString string) (*ServiceURL, error) {
return s, nil
}
func (s ServiceURL) String() string {
return fmt.Sprintf(
"ServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+
"Timeout:%s, Version:%s, Group:%s, Weight:%d, Query:%+v}",
s.Protocol, s.Location, s.Path, s.Ip, s.Port,
s.Timeout, s.Version, s.Group, s.Weight, s.Query)
}
func (s *ServiceURL) ServiceConfig() ServiceConfig {
interfaceName := s.Query.Get("interface")
return ServiceConfig{
......
......@@ -12,19 +12,24 @@ import (
import (
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
)
func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error {
func (r *ZkRegistry) RegisterConsumer(regConf registry.ServiceConfigIf) error {
var (
ok bool
err error
listener *zkEventListener
conf *registry.ServiceConfig
)
if conf, ok = regConf.(*registry.ServiceConfig); !ok {
return jerrors.Errorf("the type of @regConf %T is not registry.ServiceConfig", regConf)
}
ok = false
r.Lock()
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
r.Unlock()
r.cltLock.Unlock()
if ok {
return jerrors.Errorf("Service{%s} has been registered", conf.Service)
}
......@@ -34,9 +39,9 @@ func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error {
return jerrors.Trace(err)
}
r.Lock()
r.cltLock.Lock()
r.services[conf.Key()] = conf
r.Unlock()
r.cltLock.Unlock()
log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
r.listenerLock.Lock()
......@@ -49,21 +54,21 @@ func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error {
return nil
}
func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceURLEvent {
func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceEvent {
return r.outerEventCh
}
// name: service@protocol
func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.ServiceURL, error) {
func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.ServiceURL, error) {
var (
ok bool
err error
dubboPath string
nodes []string
listener *zkEventListener
serviceURL *service.ServiceURL
serviceConfIf service.ServiceConfigIf
serviceConf *service.ServiceConfig
serviceURL *registry.ServiceURL
serviceConfIf registry.ServiceConfigIf
serviceConf *registry.ServiceConfig
)
r.listenerLock.Lock()
listener = r.listener
......@@ -73,13 +78,13 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service
listener.listenServiceEvent(conf)
}
r.Lock()
r.cltLock.Lock()
serviceConfIf, ok = r.services[conf.Key()]
r.Unlock()
r.cltLock.Unlock()
if !ok {
return nil, jerrors.Errorf("Service{%s} has not been registered", conf.Key())
}
serviceConf, ok = serviceConfIf.(*service.ServiceConfig)
serviceConf, ok = serviceConfIf.(*registry.ServiceConfig)
if !ok {
return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", conf.Key())
}
......@@ -89,17 +94,17 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service
if err != nil {
return nil, jerrors.Trace(err)
}
r.Lock()
r.cltLock.Lock()
nodes, err = r.client.getChildren(dubboPath)
r.Unlock()
r.cltLock.Unlock()
if err != nil {
log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err)
return nil, jerrors.Trace(err)
}
var listenerServiceMap = make(map[string]*service.ServiceURL)
var listenerServiceMap = make(map[string]*registry.ServiceURL)
for _, n := range nodes {
serviceURL, err = service.NewServiceURL(n)
serviceURL, err = registry.NewServiceURL(n)
if err != nil {
log.Error("NewServiceURL({%s}) = error{%v}", n, err)
continue
......@@ -116,7 +121,7 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service
}
}
var services []*service.ServiceURL
var services []*registry.ServiceURL
for _, service := range listenerServiceMap {
services = append(services, service)
}
......@@ -162,7 +167,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
var (
ok bool
zkListener *zkEventListener
serviceConf *service.ServiceConfig
serviceConf *registry.ServiceConfig
)
r.listenerLock.Lock()
......@@ -172,9 +177,9 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
return zkListener, nil
}
r.Lock()
r.cltLock.Lock()
client := r.client
r.Unlock()
r.cltLock.Unlock()
if client == nil {
return nil, jerrors.New("zk connection broken")
}
......@@ -187,13 +192,13 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
r.listenerLock.Unlock()
// listen
r.Lock()
r.cltLock.Lock()
for _, svs := range r.services {
if serviceConf, ok = svs.(*service.ServiceConfig); ok {
if serviceConf, ok = svs.(*registry.ServiceConfig); ok {
go zkListener.listenServiceEvent(serviceConf)
}
}
r.Unlock()
r.cltLock.Unlock()
return zkListener, nil
}
......@@ -15,7 +15,6 @@ import (
import (
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
)
const (
......@@ -23,7 +22,7 @@ const (
)
type zkEvent struct {
res *registry.ServiceURLEvent
res *registry.ServiceEvent
err error
}
......@@ -81,7 +80,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *service.ServiceConfig) {
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *registry.ServiceConfig) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -101,7 +100,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
// a node was added -- listen the new node
var (
newNode string
serviceURL *service.ServiceURL
serviceURL *registry.ServiceURL
)
for _, n := range newChildren {
if contains(children, n) {
......@@ -110,7 +109,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
newNode = path.Join(zkPath, n)
log.Info("add zkNode{%s}", newNode)
serviceURL, err = service.NewServiceURL(n)
serviceURL, err = registry.NewServiceURL(n)
if err != nil {
log.Error("NewServiceURL(%s) = error{%v}", n, jerrors.ErrorStack(err))
continue
......@@ -120,13 +119,13 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
continue
}
log.Info("add serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLAdd, serviceURL}, nil}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
go func(node string, serviceURL *service.ServiceURL) {
go func(node string, serviceURL *registry.ServiceURL) {
log.Info("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
log.Info("delete serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
log.Warn("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, serviceURL)
......@@ -141,7 +140,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
oldNode = path.Join(zkPath, n)
log.Warn("delete zkPath{%s}", oldNode)
serviceURL, err = service.NewServiceURL(n)
serviceURL, err = registry.NewServiceURL(n)
if !conf.ServiceEqual(serviceURL) {
log.Warn("serviceURL{%s} has been deleted is not compatible with ServiceConfig{%#v}", serviceURL, conf)
continue
......@@ -151,11 +150,11 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
log.Error("NewServiceURL(i{%s}) = error{%v}", n, jerrors.ErrorStack(err))
continue
}
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
}
func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceConfig) {
func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceConfig) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -221,13 +220,13 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceCon
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) {
func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) {
var (
err error
zkPath string
dubboPath string
children []string
serviceURL *service.ServiceURL
serviceURL *registry.ServiceURL
)
zkPath = fmt.Sprintf("/dubbo/%s/providers", conf.Service)
......@@ -253,7 +252,7 @@ func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) {
for _, c := range children {
serviceURL, err = service.NewServiceURL(c)
serviceURL, err = registry.NewServiceURL(c)
if err != nil {
log.Error("NewServiceURL(r{%s}) = error{%v}", c, err)
continue
......@@ -263,22 +262,22 @@ func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) {
continue
}
log.Debug("add serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLAdd, serviceURL}, nil}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
dubboPath = path.Join(zkPath, c)
log.Info("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL *service.ServiceURL) {
go func(zkPath string, serviceURL *registry.ServiceURL) {
if l.listenServiceNodeEvent(dubboPath) {
log.Debug("delete serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
log.Warn("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
log.Info("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf *service.ServiceConfig) {
go func(zkPath string, conf *registry.ServiceConfig) {
l.listenDirEvent(zkPath, conf)
log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
......@@ -302,7 +301,7 @@ func (l *zkEventListener) listenEvent(r *ZkRegistry) error {
if e.err != nil {
return jerrors.Trace(e.err)
}
if e.res.Action == registry.ServiceURLDel && !l.valid() {
if e.res.Action == registry.ServiceDel && !l.valid() {
log.Warn("update @result{%s}. But its connection to registry is invalid", e.res)
continue
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment