Skip to content
Snippets Groups Projects
Commit f7bdf3aa authored by vito.he's avatar vito.he
Browse files

sepatate the client loadbalance from registry & create invoker/loadbalance module in client folder

parent 33db5917
No related branches found
No related tags found
No related merge requests found
Showing
with 502 additions and 331 deletions
package invoker
import (
"context"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/client/loadBalance"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
jerrors "github.com/juju/errors"
"sync"
"time"
)
type Options struct{
ServiceTTL time.Duration
selector loadBalance.Selector
ctx context.Context
}
type Option func(*Options)
func WithServiceTTL(ttl time.Duration) Option {
return func(o *Options) {
o.ServiceTTL = ttl
}
}
func WithLBSelector(selector loadBalance.Selector ) Option {
return func(o *Options) {
o.selector= selector
}
}
func WithContext(ctx context.Context) Option {
return func(o *Options) {
o.ctx= ctx
}
}
type Invoker struct {
Options
client *jsonrpc.HTTPClient
cacheServiceMap map[string]*ServiceArray
registry registry.Registry
listenerLock sync.Mutex
}
func NewInvoker(registry registry.Registry,client *jsonrpc.HTTPClient, opts ...Option)*Invoker{
options:=Options{
//default 300s
ServiceTTL:time.Duration(300e9),
ctx:context.Background(),
selector:loadBalance.NewRandomSelector(),
}
for _,opt:=range opts{
opt(&options)
}
invoker := &Invoker{
Options:options,
client:client,
cacheServiceMap:make(map[string]*ServiceArray),
registry:registry,
}
invoker.Listen()
return invoker
}
func (ivk * Invoker)Listen(){
go ivk.listen()
}
func (ivk *Invoker)listen(){
ch:=ivk.registry.Listen()
for {
e, isOpen := <-ch
if !isOpen {
log.Warn("registry listen channel closed!")
break
}
log.Warn("registry listen channel not closed!")
ivk.update(e)
}
}
func (ivk * Invoker) update(res *registry.ServiceURLEvent) {
if res == nil || res.Service == nil {
return
}
log.Debug("registry update, result{%s}", res)
serviceKey := res.Service.ServiceConfig().Key()
ivk.listenerLock.Lock()
defer ivk.listenerLock.Unlock()
svcArr, ok := ivk.cacheServiceMap[serviceKey]
log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr)
switch res.Action {
case registry.ServiceURLAdd:
if ok {
svcArr.add(res.Service, ivk.ServiceTTL)
} else {
ivk.cacheServiceMap[serviceKey] = newServiceArray([]*service.ServiceURL{res.Service})
}
case registry.ServiceURLDel:
if ok {
svcArr.del(res.Service, ivk.ServiceTTL)
if len(svcArr.arr) == 0 {
delete(ivk.cacheServiceMap, serviceKey)
log.Warn("delete service %s from service map", serviceKey)
}
}
log.Error("selector delete serviceURL{%s}", *res.Service)
}
}
func (ivk * Invoker) getService(serviceConf *service.ServiceConfig)(*ServiceArray,error){
serviceKey := serviceConf.Key()
ivk.listenerLock.Lock()
svcArr, sok := ivk.cacheServiceMap[serviceKey]
log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr)
if sok && time.Since(svcArr.birth) < ivk.Options.ServiceTTL{
return svcArr,nil
}
ivk.listenerLock.Unlock()
svcs, err := ivk.registry.GetService(serviceConf)
ivk.listenerLock.Lock()
defer ivk.listenerLock.Unlock()
if err != nil {
log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}",
serviceConf, jerrors.ErrorStack(err), svcs)
return nil, jerrors.Trace(err)
}
newSvcArr := newServiceArray(svcs)
ivk.cacheServiceMap[serviceKey] = newSvcArr
return newSvcArr, nil
}
func (ivk * Invoker)Call(reqId int64,serviceConf *service.ServiceConfig,req jsonrpc.Request,resp interface{})error{
serviceArray ,err:= ivk.getService(serviceConf)
if err != nil{
return err
}
url,err := ivk.selector.Select(reqId,serviceArray)
if err != nil{
return err
}
if err = ivk.client.Call(ivk.ctx, url, req, resp); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", resp)
return nil
}
\ No newline at end of file
package invoker
import (
"fmt"
"github.com/dubbo/dubbo-go/service"
jerrors "github.com/juju/errors"
"strings"
"time"
)
//////////////////////////////////////////
// service array
// should be returned by registry ,will be used by client & waiting to loadBalance
//////////////////////////////////////////
var (
ErrServiceArrayEmpty = jerrors.New("serviceArray empty")
ErrServiceArrayTimeout = jerrors.New("serviceArray timeout")
)
type ServiceArray struct {
arr []*service.ServiceURL
birth time.Time
idx int64
}
func newServiceArray(arr []*service.ServiceURL) *ServiceArray {
return &ServiceArray{
arr: arr,
birth: time.Now(),
}
}
func (s *ServiceArray)GetIdx()*int64{
return &s.idx
}
func (s *ServiceArray)GetSize()int{
return len(s.arr)
}
func (s *ServiceArray)GetService(i int)*service.ServiceURL{
return s.arr[i]
}
func (s *ServiceArray) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("birth:%s, idx:%d, arr len:%d, arr:{", s.birth, s.idx, len(s.arr)))
for i := range s.arr {
builder.WriteString(fmt.Sprintf("%d:%s, ", i, s.arr[i]))
}
builder.WriteString("}")
return builder.String()
}
func (s *ServiceArray) add(service *service.ServiceURL, ttl time.Duration) {
s.arr = append(s.arr, service)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) del(service *service.ServiceURL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL == service.PrimitiveURL {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
s.birth = time.Now().Add(ttl)
break
}
}
}
package loadBalance
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
)
type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
}
//////////////////////////////////////////
// load balancer mode
//////////////////////////////////////////
//
//// Mode defines the algorithm of selecting a provider from cluster
//type Mode int
//
//const (
// SM_BEGIN Mode = iota
// SM_Random
// SM_RoundRobin
// SM_END
//)
//
//var modeStrings = [...]string{
// "Begin",
// "Random",
// "RoundRobin",
// "End",
//}
//
//func (s Mode) String() string {
// if SM_BEGIN < s && s < SM_END {
// return modeStrings[s]
// }
//
// return ""
//}
package loadBalance
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
"math/rand"
"sync/atomic"
)
type RandomSelector struct{
}
func NewRandomSelector()Selector{
return &RandomSelector{}
}
func (s *RandomSelector) Select(ID int64,array client.ServiceArrayIf) (*service.ServiceURL, error) {
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = ((int64)(rand.Int()) + ID) % int64(array.GetSize())
return array.GetService(int(idx)), nil
}
package loadBalance
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
"sync/atomic"
)
type RoundRobinSelector struct{
}
func NewRoundRobinSelector()Selector{
return &RoundRobinSelector{}
}
func (s *RoundRobinSelector) Select(ID int64,array client.ServiceArrayIf) (*service.ServiceURL, error) {
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = (ID + idx) % int64(array.GetSize())
//default: // random
// idx = ((int64)(rand.Int()) + ID) % int64(arrSize)
//}
return array.GetService(int(idx)), nil
}
package client
import "github.com/dubbo/dubbo-go/service"
type ServiceArrayIf interface{
GetIdx()*int64
GetSize()int
GetService(i int)*service.ServiceURL
}
......@@ -20,9 +20,9 @@ import (
import (
"github.com/dubbo/dubbo-go/plugins"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
)
var (
......@@ -45,11 +45,12 @@ func main() {
time.Sleep(3e9)
gxlog.CInfo("\n\n\nstart to test jsonrpc")
testJsonrpc("A003")
testJsonrpc("A003","GetUser")
time.Sleep(3e9)
gxlog.CInfo("\n\n\nstart to test jsonrpc illegal method")
testJsonrpcIllegalMethod("A003")
testJsonrpc("A003","GetUser1")
initSignal()
}
......@@ -70,21 +71,12 @@ func initClient() {
registry.WithDubboType(registry.CONSUMER),
registry.WithApplicationConf(clientConfig.Application_Config),
zookeeper.WithRegistryConf(clientConfig.ZkRegistryConfig),
registry.WithBalanceMode(registry.SM_RoundRobin),
registry.WithServiceTTL(300e9),
)
if err != nil {
panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err)))
return
}
for _, service := range clientConfig.Service_List {
err = clientRegistry.ConsumerRegister(service)
if err != nil {
panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err)))
return
}
}
// consumer
clientConfig.requestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout)
......@@ -107,6 +99,14 @@ func initClient() {
}
}
for _, service := range clientConfig.Service_List {
err = clientRegistry.ConsumerRegister(&service)
if err != nil {
panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err)))
return
}
}
}
func uninitClient() {
......
......@@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"os"
"path"
......@@ -51,7 +52,7 @@ type (
Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
// 一个客户端只允许使用一个service的其中一个group和其中一个version
Service_List []registry.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"`
Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"`
}
)
......
......@@ -3,33 +3,29 @@ package main
import (
"context"
"fmt"
"github.com/AlexStocks/goext/log"
"github.com/dubbo/dubbo-go/client/invoker"
"github.com/dubbo/dubbo-go/client/loadBalance"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/service"
_ "net/http/pprof"
)
import (
// "github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
func testJsonrpc(userKey string) {
func testJsonrpc(userKey string,method string) {
var (
err error
service string
method string
svc string
serviceIdx int
user *JsonRPCUser
ctx context.Context
conf registry.ServiceConfig
conf service.ServiceConfig
req jsonrpc.Request
serviceURL *registry.ServiceURL
clt *jsonrpc.HTTPClient
)
......@@ -40,10 +36,11 @@ func testJsonrpc(userKey string) {
},
)
serviceIdx = -1
service = "com.ikurento.user.UserProvider"
svc = "com.ikurento.user.UserProvider"
for i := range clientConfig.Service_List {
if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_JSONRPC.String() {
if clientConfig.Service_List[i].Service == svc && clientConfig.Service_List[i].Protocol == public.CODECTYPE_JSONRPC.String() {
serviceIdx = i
break
}
......@@ -53,111 +50,33 @@ func testJsonrpc(userKey string) {
}
// Create request
method = string("GetUser")
// gxlog.CInfo("jsonrpc selected service %#v", clientConfig.Service_List[serviceIdx])
conf = registry.ServiceConfig{
Group: clientConfig.Service_List[serviceIdx].Group,
Protocol: public.CodecType(public.CODECTYPE_JSONRPC).String(),
Version: clientConfig.Service_List[serviceIdx].Version,
Service: clientConfig.Service_List[serviceIdx].Service,
conf = service.ServiceConfig{
Group: clientConfig.Service_List[serviceIdx].Group,
Protocol: public.CodecType(public.CODECTYPE_JSONRPC).String(),
Version: clientConfig.Service_List[serviceIdx].Version,
Service: clientConfig.Service_List[serviceIdx].Service,
}
// Attention the last parameter : []UserKey{userKey}
req = clt.NewRequest(conf, method, []string{userKey})
clientRegistry = clientRegistry.(*zookeeper.ZkRegistry)
serviceURL, err = clientRegistry.Filter(req.ServiceConfig(), 1)
if err != nil {
log.Error("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err))
// gxlog.CError("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err))
return
}
log.Debug("got serviceURL: %s", serviceURL)
// Set arbitrary headers in context
ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Services": service,
"X-Method": method,
})
user = new(JsonRPCUser)
// Call service
if err = clt.Call(ctx, *serviceURL, req, user); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
// gxlog.CError("client.Call() return error:%+v", jerrors.ErrorStack(err))
return
}
log.Info("response result:%s", user)
// gxlog.CInfo("response result:%s", user)
}
func testJsonrpcIllegalMethod(userKey string) {
var (
err error
service string
method string
serviceIdx int
user *JsonRPCUser
ctx context.Context
conf registry.ServiceConfig
req jsonrpc.Request
serviceURL *registry.ServiceURL
clt *jsonrpc.HTTPClient
)
clt = jsonrpc.NewHTTPClient(
&jsonrpc.HTTPOptions{
HandshakeTimeout: clientConfig.connectTimeout,
HTTPTimeout: clientConfig.requestTimeout,
},
)
serviceIdx = -1
service = "com.ikurento.user.UserProvider"
for i := range clientConfig.Service_List {
if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_JSONRPC.String() {
serviceIdx = i
break
}
}
if serviceIdx == -1 {
panic(fmt.Sprintf("can not find service in config service list:%#v", clientConfig.Service_List))
}
// Create request
method = string("GetUser1")
// gxlog.CInfo("jsonrpc selected service %#v", clientConfig.Service_List[serviceIdx])
conf = registry.ServiceConfig{
Group: clientConfig.Service_List[serviceIdx].Group,
Protocol: public.CodecType(public.CODECTYPE_JSONRPC).String(),
Version: clientConfig.Service_List[serviceIdx].Version,
Service: clientConfig.Service_List[serviceIdx].Service,
}
// Attention the last parameter : []UserKey{userKey}
req = clt.NewRequest(conf, method, []string{userKey})
serviceURL, err = clientRegistry.Filter(req.ServiceConfig(), 1)
if err != nil {
log.Error("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err))
// gxlog.CError("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err))
return
}
log.Debug("got serviceURL: %s", serviceURL)
// Set arbitrary headers in context
ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Services": service,
"X-Services": svc,
"X-Method": method,
})
user = new(JsonRPCUser)
// Call service
if err = clt.Call(ctx, *serviceURL, req, user); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
// gxlog.CError("client.Call() return error:%+v", jerrors.ErrorStack(err))
return
// new invoker to Call service
invoker := invoker.NewInvoker(clientRegistry,clt,
invoker.WithContext(ctx),
invoker.WithLBSelector(loadBalance.NewRandomSelector()))
err = invoker.Call(1,&conf,req,user)
if err !=nil{
jerrors.Errorf("call service err , msg is :",err)
}
log.Info("response result:%s", user)
// gxlog.CInfo("response result:%s", user)
gxlog.CInfo("response result:%s", user)
}
......@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/server"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"os"
"path"
......@@ -42,7 +43,7 @@ type (
// Registry_Address string `default:"192.168.35.3:2181"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
Service_List []registry.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"`
Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"`
Server_List []server.ServerConfig `yaml:"server_list" json:"server_list,omitempty"`
}
)
......
......@@ -2,20 +2,20 @@ package main
import (
"fmt"
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/plugins"
registry2 "github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
jerrors "github.com/juju/errors"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/jsonrpc"
jerrors "github.com/juju/errors"
)
var (
......@@ -62,8 +62,6 @@ func initServer() *jsonrpc.Server {
registry2.WithDubboType(registry2.PROVIDER),
registry2.WithApplicationConf(conf.Application_Config),
zookeeper.WithRegistryConf(conf.ZkRegistryConfig),
//zookeeper.WithBalanceMode(registry.SM_RoundRobin),
registry2.WithServiceTTL(conf.netTimeout),
)
if err != nil || registry == nil {
......
......@@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"net"
"net/http"
......@@ -21,7 +22,6 @@ import (
import (
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
//////////////////////////////////////////////
......@@ -39,8 +39,8 @@ type Request struct {
contentType string
}
func (r *Request) ServiceConfig() registry.ServiceConfigIf {
return &registry.ServiceConfig{
func (r *Request) ServiceConfig() service.ServiceConfigIf {
return &service.ServiceConfig{
Protocol: r.protocol,
Service: r.service,
Group: r.group,
......@@ -86,7 +86,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
}
}
func (c *HTTPClient) NewRequest(conf registry.ServiceConfig, method string, args interface{}) Request {
func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request {
return Request{
ID: atomic.AddInt64(&c.ID, 1),
group: conf.Group,
......@@ -98,7 +98,7 @@ func (c *HTTPClient) NewRequest(conf registry.ServiceConfig, method string, args
}
}
func (c *HTTPClient) Call(ctx context.Context, service registry.ServiceURL, req Request, rsp interface{}) error {
func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error {
// header
httpHeader := http.Header{}
httpHeader.Set("Content-Type", "application/json")
......
......@@ -44,7 +44,7 @@ func (m *serviceMethod) suiteContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ctxType)
}
type service struct {
type svc struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
rcvrType reflect.Type // type of the receiver
......@@ -53,12 +53,12 @@ type service struct {
type serviceMap struct {
mutex sync.Mutex // protects the serviceMap
serviceMap map[string]*service // service name -> service
serviceMap map[string]*svc // service name -> service
}
func initServer() *serviceMap {
return &serviceMap{
serviceMap: make(map[string]*service),
serviceMap: make(map[string]*svc),
}
}
......@@ -134,10 +134,10 @@ func (server *serviceMap) register(rcvr Handler) (string, error) {
server.mutex.Lock()
defer server.mutex.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
server.serviceMap = make(map[string]*svc)
}
s := new(service)
s := new(svc)
s.rcvrType = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
......
......@@ -5,6 +5,7 @@ import (
"bytes"
"context"
"github.com/dubbo/dubbo-go/server"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"net"
"net/http"
......@@ -57,7 +58,7 @@ type Option func(*Options)
type Options struct {
Registry registry.Registry
ConfList []server.ServerConfig
ServiceConfList []registry.ServiceConfig
ServiceConfList []service.ServiceConfig
Timeout time.Duration
}
......@@ -92,7 +93,7 @@ func ConfList(confList []server.ServerConfig) Option {
}
}
func ServiceConfList(confList []registry.ServiceConfig) Option {
func ServiceConfList(confList []service.ServiceConfig) Option {
return func(o *Options) {
o.ServiceConfList = confList
}
......@@ -234,7 +235,7 @@ func (s *Server) Options() Options {
func (s *Server) Handle(h Handler) error {
var (
err error
serviceConf registry.ServiceConfig
serviceConf service.ServiceConfig
)
opts := s.Options()
......
package registry
import (
"fmt"
"github.com/dubbo/dubbo-go/service"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
//////////////////////////////////////////
// service url event type
//////////////////////////////////////////
type ServiceURLEventType int
const (
ServiceURLAdd = iota
ServiceURLDel
)
var serviceURLEventTypeStrings = [...]string{
"add service url",
"delete service url",
}
func (t ServiceURLEventType) String() string {
return serviceURLEventTypeStrings[t]
}
//////////////////////////////////////////
// service url event
//////////////////////////////////////////
type ServiceURLEvent struct {
Action ServiceURLEventType
Service *service.ServiceURL
}
func (e ServiceURLEvent) String() string {
return fmt.Sprintf("ServiceURLEvent{Action{%s}, Service{%s}}", e.Action.String(), e.Service)
}
package registry
import (
"time"
)
const (
CONSUMER = iota
CONFIGURATOR
......@@ -41,8 +37,6 @@ type OptionInf interface{
}
type Options struct{
ApplicationConfig
Mode Mode
ServiceTTL time.Duration
DubboType DubboType
}
......@@ -70,14 +64,3 @@ func WithApplicationConf(conf ApplicationConfig) Option {
o.ApplicationConfig = conf
}
}
func WithServiceTTL(ttl time.Duration) Option {
return func(o *Options) {
o.ServiceTTL = ttl
}
}
func WithBalanceMode(mode Mode) Option {
return func(o *Options) {
o.Mode = mode
}
}
package registry
import (
"fmt"
"github.com/dubbo/dubbo-go/service"
)
//////////////////////////////////////////////
......@@ -14,66 +14,22 @@ import (
type Registry interface {
//used for service provider calling , register services to registry
ProviderRegister(conf ServiceConfigIf) error
ProviderRegister(conf service.ServiceConfigIf) error
//used for service consumer calling , register services cared about ,for dubbo's admin monitoring
ConsumerRegister(conf ServiceConfig) error
ConsumerRegister(conf *service.ServiceConfig) error
//unregister service for service provider
//Unregister(conf interface{}) error
//used for service consumer ,start listen goroutine
Listen()
Listen()chan *ServiceURLEvent
//input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available
GetService(*service.ServiceConfig) ([]*service.ServiceURL, error)
//input service config & request id, should return url which registry used
Filter(ServiceConfigIf, int64) (*ServiceURL, error)
//Filter(ServiceConfigIf, int64) (*ServiceURL, error)
Close()
//new Provider conf
NewProviderServiceConfig(ServiceConfig)ServiceConfigIf
}
//////////////////////////////////////////////
// service config
//////////////////////////////////////////////
type ServiceConfigIf interface {
String() string
ServiceEqual(url *ServiceURL) bool
}
type ServiceConfig struct {
Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"`
Service string `required:"true" yaml:"service" json:"service,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Version string `yaml:"version" json:"version,omitempty"`
//add for provider
Path string `yaml:"path" json:"path,omitempty"`
Methods string `yaml:"methods" json:"methods,omitempty"`
NewProviderServiceConfig(service.ServiceConfig)service.ServiceConfigIf
}
func (c ServiceConfig) Key() string {
return fmt.Sprintf("%s@%s", c.Service, c.Protocol)
}
func (c ServiceConfig) String() string {
return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version)
}
func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool {
if c.Protocol != url.Protocol {
return false
}
if c.Service != url.Query.Get("interface") {
return false
}
if c.Group != url.Group {
return false
}
if c.Version != url.Version {
return false
}
return true
}
......@@ -3,18 +3,18 @@ package zookeeper
import (
"fmt"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go"
"time"
)
func (r *ZkRegistry) ConsumerRegister(conf registry.ServiceConfig) error {
func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error {
var (
ok bool
err error
listener *zkEventListener
)
ok = false
r.Lock()
_, ok = r.services[conf.Key()]
......@@ -29,7 +29,7 @@ func (r *ZkRegistry) ConsumerRegister(conf registry.ServiceConfig) error {
}
r.Lock()
r.services[conf.Key()] = &conf
r.services[conf.Key()] = conf
r.Unlock()
log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
......@@ -44,88 +44,45 @@ func (r *ZkRegistry) ConsumerRegister(conf registry.ServiceConfig) error {
}
func (r *ZkRegistry) Listen() {
go r.listen()
}
func (r *ZkRegistry) Filter(s registry.ServiceConfigIf, reqID int64) (*registry.ServiceURL, error) {
var serviceConf registry.ServiceConfig
if scp, ok := s.(*registry.ServiceConfig); ok {
serviceConf = *scp
} else if sc, ok := s.(registry.ServiceConfig); ok {
serviceConf = sc
} else {
return nil, jerrors.Errorf("illegal @s:%#v", s)
}
serviceKey := serviceConf.Key()
r.listenerLock.Lock()
svcArr, sok := r.listenerServiceMap[serviceKey]
log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr)
if sok {
if serviceURL, err := svcArr.Select(reqID, r.Mode, r.ServiceTTL); err == nil {
r.listenerLock.Unlock()
return serviceURL, nil
}
}
r.listenerLock.Unlock()
svcs, err := r.get(serviceConf)
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
if err != nil {
log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}",
serviceConf, jerrors.ErrorStack(err), svcs)
if sok && len(svcArr.Arr) > 0 {
log.Error("serviceArray{%v} timeout, can not get new, use old instead", svcArr)
service, err := svcArr.Select(reqID, r.Mode, 0)
return service, jerrors.Trace(err)
}
return nil, jerrors.Trace(err)
}
newSvcArr := registry.NewServiceArray(svcs)
service, err := newSvcArr.Select(reqID, r.Mode, 0)
r.listenerServiceMap[serviceKey] = newSvcArr
return service, jerrors.Trace(err)
func (r *ZkRegistry) Listen()chan *registry.ServiceURLEvent {
eventCh := make(chan *registry.ServiceURLEvent,1000)
go r.listen(eventCh)
return eventCh
}
// name: service@protocol
func (r *ZkRegistry) get(sc registry.ServiceConfig) ([]*registry.ServiceURL, error) {
func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.ServiceURL, error) {
var (
ok bool
err error
dubboPath string
nodes []string
listener *zkEventListener
serviceURL *registry.ServiceURL
serviceConfIf registry.ServiceConfigIf
serviceConf *registry.ServiceConfig
serviceURL *service.ServiceURL
serviceConfIf service.ServiceConfigIf
serviceConf *service.ServiceConfig
)
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
listener.listenServiceEvent(sc)
listener.listenServiceEvent(conf)
}
r.Lock()
serviceConfIf, ok = r.services[sc.Key()]
serviceConfIf, ok = r.services[conf.Key()]
r.Unlock()
if !ok {
return nil, jerrors.Errorf("Service{%s} has not been registered", sc.Key())
return nil, jerrors.Errorf("Service{%s} has not been registered", conf.Key())
}
serviceConf, ok = serviceConfIf.(*registry.ServiceConfig)
serviceConf, ok = serviceConfIf.(*service.ServiceConfig)
if !ok {
return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", sc.Key())
return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", conf.Key())
}
dubboPath = fmt.Sprintf("/dubbo/%s/providers", sc.Service)
dubboPath = fmt.Sprintf("/dubbo/%s/providers", conf.Service)
err = r.validateZookeeperClient()
if err != nil {
return nil, jerrors.Trace(err)
......@@ -138,9 +95,9 @@ func (r *ZkRegistry) get(sc registry.ServiceConfig) ([]*registry.ServiceURL, err
return nil, jerrors.Trace(err)
}
var listenerServiceMap = make(map[string]*registry.ServiceURL)
var listenerServiceMap = make(map[string]*service.ServiceURL)
for _, n := range nodes {
serviceURL, err = registry.NewServiceURL(n)
serviceURL, err = service.NewServiceURL(n)
if err != nil {
log.Error("NewServiceURL({%s}) = error{%v}", n, err)
continue
......@@ -157,7 +114,7 @@ func (r *ZkRegistry) get(sc registry.ServiceConfig) ([]*registry.ServiceURL, err
}
}
var services []*registry.ServiceURL
var services []*service.ServiceURL
for _, service := range listenerServiceMap {
services = append(services, service)
}
......@@ -165,7 +122,7 @@ func (r *ZkRegistry) get(sc registry.ServiceConfig) ([]*registry.ServiceURL, err
return services, nil
}
func (r *ZkRegistry) listen() {
func (r *ZkRegistry) listen(ch chan *registry.ServiceURLEvent) {
defer r.wg.Done()
for {
......@@ -185,7 +142,7 @@ func (r *ZkRegistry) listen() {
continue
}
if err = listener.listenEvent(r); err != nil {
if err = listener.listenEvent(r,ch); err != nil {
log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err))
r.listenerLock.Lock()
......@@ -204,7 +161,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
var (
ok bool
zkListener *zkEventListener
serviceConf *registry.ServiceConfig
serviceConf *service.ServiceConfig
)
r.listenerLock.Lock()
......@@ -230,9 +187,9 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
// listen
r.Lock()
for _, service := range r.services {
if serviceConf, ok = service.(*registry.ServiceConfig); ok {
go zkListener.listenServiceEvent(*serviceConf)
for _, svs := range r.services {
if serviceConf, ok = svs.(*service.ServiceConfig); ok {
go zkListener.listenServiceEvent(serviceConf)
}
}
r.Unlock()
......@@ -240,36 +197,3 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
return zkListener, nil
}
func (r *ZkRegistry) update(res *registry.ServiceURLEvent) {
if res == nil || res.Service == nil {
return
}
log.Debug("registry update, result{%s}", res)
serviceKey := res.Service.ServiceConfig().Key()
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
svcArr, ok := r.listenerServiceMap[serviceKey]
log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr)
switch res.Action {
case registry.ServiceURLAdd:
if ok {
svcArr.Add(res.Service, r.Options.ServiceTTL)
} else {
r.listenerServiceMap[serviceKey] = registry.NewServiceArray([]*registry.ServiceURL{res.Service})
}
case registry.ServiceURLDel:
if ok {
svcArr.Del(res.Service, r.Options.ServiceTTL)
if len(svcArr.Arr) == 0 {
delete(r.listenerServiceMap, serviceKey)
log.Warn("delete service %s from service map", serviceKey)
}
}
log.Error("selector delete serviceURL{%s}", *res.Service)
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ package zookeeper
import (
"fmt"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
"path"
"sync"
"time"
......@@ -79,7 +80,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf registry.ServiceConfig) {
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *service.ServiceConfig) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -99,7 +100,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
// a node was added -- listen the new node
var (
newNode string
serviceURL *registry.ServiceURL
serviceURL *service.ServiceURL
)
for _, n := range newChildren {
if contains(children, n) {
......@@ -108,7 +109,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
newNode = path.Join(zkPath, n)
log.Info("add zkNode{%s}", newNode)
serviceURL, err = registry.NewServiceURL(n)
serviceURL, err = service.NewServiceURL(n)
if err != nil {
log.Error("NewServiceURL(%s) = error{%v}", n, jerrors.ErrorStack(err))
continue
......@@ -120,7 +121,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
log.Info("add serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLAdd, serviceURL}, nil}
// listen l service node
go func(node string, serviceURL *registry.ServiceURL) {
go func(node string, serviceURL *service.ServiceURL) {
log.Info("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
log.Info("delete serviceURL{%s}", serviceURL)
......@@ -139,7 +140,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
oldNode = path.Join(zkPath, n)
log.Warn("delete zkPath{%s}", oldNode)
serviceURL, err = registry.NewServiceURL(n)
serviceURL, err = service.NewServiceURL(n)
if !conf.ServiceEqual(serviceURL) {
log.Warn("serviceURL{%s} has been deleted is not compatible with ServiceConfig{%#v}", serviceURL, conf)
continue
......@@ -153,7 +154,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
}
}
func (l *zkEventListener) listenDirEvent(zkPath string, conf registry.ServiceConfig) {
func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceConfig) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -219,13 +220,13 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf registry.ServiceCon
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) {
func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) {
var (
err error
zkPath string
dubboPath string
children []string
serviceURL *registry.ServiceURL
serviceURL *service.ServiceURL
)
zkPath = fmt.Sprintf("/dubbo/%s/providers", conf.Service)
......@@ -250,7 +251,7 @@ func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) {
}
for _, c := range children {
serviceURL, err = registry.NewServiceURL(c)
serviceURL, err = service.NewServiceURL(c)
if err != nil {
log.Error("NewServiceURL(r{%s}) = error{%v}", c, err)
continue
......@@ -265,7 +266,7 @@ func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) {
// listen l service node
dubboPath = path.Join(zkPath, c)
log.Info("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL *registry.ServiceURL) {
go func(zkPath string, serviceURL *service.ServiceURL) {
if l.listenServiceNodeEvent(dubboPath) {
log.Debug("delete serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil}
......@@ -275,23 +276,25 @@ func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) {
}
log.Info("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf registry.ServiceConfig) {
go func(zkPath string, conf *service.ServiceConfig) {
l.listenDirEvent(zkPath, conf)
log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
}
func (l *zkEventListener) listenEvent(r *ZkRegistry) error {
func (l *zkEventListener) listenEvent(r *ZkRegistry,ch chan *registry.ServiceURLEvent) error {
for {
select {
case <-l.client.done():
log.Warn("listener's zk client connection is broken, so zk event listener exit now.")
l.close()
close(ch)
return jerrors.New("listener stopped")
case <-r.done:
log.Warn("zk consumer register has quit, so zk event listener exit asap now.")
l.close()
close(ch)
return jerrors.New("listener stopped")
case e := <-l.events:
......@@ -303,7 +306,9 @@ func (l *zkEventListener) listenEvent(r *ZkRegistry) error {
log.Warn("update @result{%s}. But its connection to registry is invalid", e.res)
continue
}
r.update(e.res)
//r.update(e.res)
//write to invoker
ch <- e.res
}
}
}
......
package zookeeper
import (
"github.com/dubbo/dubbo-go/registry"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/service"
jerrors "github.com/juju/errors"
)
type ProviderServiceConfig struct {
registry.ServiceConfig
service.ServiceConfig
}
func (r *ZkRegistry) NewProviderServiceConfig(config registry.ServiceConfig)registry.ServiceConfigIf{
func (r *ZkRegistry) NewProviderServiceConfig(config service.ServiceConfig)service.ServiceConfigIf{
return ProviderServiceConfig{
config,
}
}
func (r *ZkRegistry) ProviderRegister(c registry.ServiceConfigIf) error {
func (r *ZkRegistry) ProviderRegister(c service.ServiceConfigIf) error {
var (
ok bool
err error
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment