Skip to content
Snippets Groups Projects
Commit 47247be8 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge pull request #722 from dubbo-x/read

fix exporter append
parent a16259db
No related branches found
No related tags found
No related merge requests found
...@@ -169,7 +169,7 @@ func (sm *serviceMap) GetService(protocol, name string) *Service { ...@@ -169,7 +169,7 @@ func (sm *serviceMap) GetService(protocol, name string) *Service {
return nil return nil
} }
// GetInterface gets an interface defination by interface name // GetInterface gets an interface definition by interface name
func (sm *serviceMap) GetInterface(interfaceName string) []*Service { func (sm *serviceMap) GetInterface(interfaceName string) []*Service {
sm.mutex.RLock() sm.mutex.RLock()
defer sm.mutex.RUnlock() defer sm.mutex.RUnlock()
......
...@@ -141,19 +141,18 @@ func loadConsumerConfig() { ...@@ -141,19 +141,18 @@ func loadConsumerConfig() {
// wait for invoker is available, if wait over default 3s, then panic // wait for invoker is available, if wait over default 3s, then panic
var count int var count int
checkok := true
for { for {
checkok := true
for _, refconfig := range consumerConfig.References { for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) || if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) || (refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true (refconfig.Check == nil && consumerConfig.Check == nil) { // default to true
if refconfig.invoker != nil && if refconfig.invoker != nil && !refconfig.invoker.IsAvailable() {
!refconfig.invoker.IsAvailable() {
checkok = false checkok = false
count++ count++
if count > maxWait { if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg) logger.Error(errMsg)
panic(errMsg) panic(errMsg)
} }
...@@ -161,14 +160,13 @@ func loadConsumerConfig() { ...@@ -161,14 +160,13 @@ func loadConsumerConfig() {
break break
} }
if refconfig.invoker == nil { if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName) logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
} }
} }
} }
if checkok { if checkok {
break break
} }
checkok = true
} }
} }
......
...@@ -60,8 +60,8 @@ type ConsumerConfig struct { ...@@ -60,8 +60,8 @@ type ConsumerConfig struct {
References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"` References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"` ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
} }
......
...@@ -43,9 +43,9 @@ type ProviderConfig struct { ...@@ -43,9 +43,9 @@ type ProviderConfig struct {
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"` Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"`
......
...@@ -133,7 +133,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { ...@@ -133,7 +133,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip) tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil { if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err))) panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
} }
defer tcp.Close() defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1]) ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
...@@ -145,14 +145,14 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { ...@@ -145,14 +145,14 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
func (c *ServiceConfig) Export() error { func (c *ServiceConfig) Export() error {
// TODO: config center start here // TODO: config center start here
// TODO:delay export // TODO: delay export
if c.unexported != nil && c.unexported.Load() { if c.unexported != nil && c.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported! ", c.InterfaceName) err := perrors.Errorf("The service %v has already unexported!", c.InterfaceName)
logger.Errorf(err.Error()) logger.Errorf(err.Error())
return err return err
} }
if c.unexported != nil && c.exported.Load() { if c.unexported != nil && c.exported.Load() {
logger.Warnf("The service %v has already exported! ", c.InterfaceName) logger.Warnf("The service %v has already exported!", c.InterfaceName)
return nil return nil
} }
...@@ -160,23 +160,23 @@ func (c *ServiceConfig) Export() error { ...@@ -160,23 +160,23 @@ func (c *ServiceConfig) Export() error {
urlMap := c.getUrlMap() urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, c.Protocols) protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 { if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol) logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs", c.InterfaceName, c.Protocol)
return nil return nil
} }
ports := getRandomPort(protocolConfigs) ports := getRandomPort(protocolConfigs)
nextPort := ports.Front() nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs { for _, proto := range protocolConfigs {
// registry the service reflect // registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService) methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
if err != nil { if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error()) formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
logger.Errorf(formatErr.Error()) logger.Errorf(formatErr.Error())
return formatErr return formatErr
} }
port := proto.Port port := proto.Port
if len(proto.Port) == 0 { if len(proto.Port) == 0 {
port = nextPort.Value.(string) port = nextPort.Value.(string)
nextPort = nextPort.Next() nextPort = nextPort.Next()
...@@ -196,33 +196,31 @@ func (c *ServiceConfig) Export() error { ...@@ -196,33 +196,31 @@ func (c *ServiceConfig) Export() error {
ivkURL.AddParam(constant.Tagkey, c.Tag) ivkURL.AddParam(constant.Tagkey, c.Tag)
} }
var exporter protocol.Exporter
if len(regUrls) > 0 { if len(regUrls) > 0 {
c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()
for _, regUrl := range regUrls { for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL regUrl.SubURL = ivkURL
invoker := proxyFactory.GetInvoker(*regUrl)
c.cacheMutex.Lock() exporter := c.cacheProtocol.Export(invoker)
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter = c.cacheProtocol.Export(invoker)
if exporter == nil { if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL))) return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
} }
c.exporters = append(c.exporters, exporter)
} }
} else { } else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL) invoker := proxyFactory.GetInvoker(*ivkURL)
exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil { if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL))) return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
} }
c.exporters = append(c.exporters, exporter)
} }
c.exporters = append(c.exporters, exporter)
} }
c.exported.Store(true) c.exported.Store(true)
return nil return nil
...@@ -314,7 +312,6 @@ func (c *ServiceConfig) getUrlMap() url.Values { ...@@ -314,7 +312,6 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit) urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler) urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)
} }
return urlMap return urlMap
......
...@@ -48,7 +48,6 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte ...@@ -48,7 +48,6 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte
// Export will export the metadataService // Export will export the metadataService
func (exporter *MetadataServiceExporter) Export() error { func (exporter *MetadataServiceExporter) Export() error {
if !exporter.IsExported() { if !exporter.IsExported() {
serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background()) serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background())
serviceConfig.Protocol = constant.DEFAULT_PROTOCOL serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
serviceConfig.Protocols = map[string]*config.ProtocolConfig{ serviceConfig.Protocols = map[string]*config.ProtocolConfig{
......
...@@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol { ...@@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol {
} }
} }
// Export JSON RPC service for remote invocation // Export JSON RPC service for remote invocation
func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl() url := invoker.GetUrl()
serviceKey := strings.TrimPrefix(url.Path, "/") serviceKey := strings.TrimPrefix(url.Path, "/")
......
...@@ -127,10 +127,10 @@ func (s *Server) handlePkg(conn net.Conn) { ...@@ -127,10 +127,10 @@ func (s *Server) handlePkg(conn net.Conn) {
} }
reqBody, err := ioutil.ReadAll(r.Body) reqBody, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil { if err != nil {
return return
} }
r.Body.Close()
reqHeader := make(map[string]string) reqHeader := make(map[string]string)
for k := range r.Header { for k := range r.Header {
...@@ -263,8 +263,7 @@ func (s *Server) Stop() { ...@@ -263,8 +263,7 @@ func (s *Server) Stop() {
}) })
} }
func serveRequest(ctx context.Context, func serveRequest(ctx context.Context, header map[string]string, body []byte, conn net.Conn) error {
header map[string]string, body []byte, conn net.Conn) error {
sendErrorResp := func(header map[string]string, body []byte) error { sendErrorResp := func(header map[string]string, body []byte) error {
rsp := &http.Response{ rsp := &http.Response{
Header: make(http.Header), Header: make(http.Header),
...@@ -324,13 +323,12 @@ func serveRequest(ctx context.Context, ...@@ -324,13 +323,12 @@ func serveRequest(ctx context.Context,
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
return perrors.WithStack(err) return perrors.WithStack(err)
} }
return perrors.New("server cannot decode request: " + err.Error()) return perrors.New("server cannot decode request: " + err.Error())
} }
path := header["Path"] path := header["Path"]
methodName := codec.req.Method methodName := codec.req.Method
if len(path) == 0 || len(methodName) == 0 { if len(path) == 0 || len(methodName) == 0 {
codec.ReadBody(nil)
return perrors.New("service/method request ill-formed: " + path + "/" + methodName) return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
} }
......
...@@ -68,21 +68,19 @@ func (pfw *ProtocolFilterWrapper) Destroy() { ...@@ -68,21 +68,19 @@ func (pfw *ProtocolFilterWrapper) Destroy() {
} }
func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
filtName := invoker.GetUrl().GetParam(key, "") filterName := invoker.GetUrl().GetParam(key, "")
if filtName == "" { if filterName == "" {
return invoker return invoker
} }
filtNames := strings.Split(filtName, ",") filterNames := strings.Split(filterName, ",")
next := invoker
// The order of filters is from left to right, so loading from right to left // The order of filters is from left to right, so loading from right to left
next := invoker
for i := len(filtNames) - 1; i >= 0; i-- { for i := len(filterNames) - 1; i >= 0; i-- {
flt := extension.GetFilter(filtNames[i]) flt := extension.GetFilter(filterNames[i])
fi := &FilterInvoker{next: next, invoker: invoker, filter: flt} fi := &FilterInvoker{next: next, invoker: invoker, filter: flt}
next = fi next = fi
} }
return next return next
} }
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"strings" "strings"
"sync" "sync"
) )
import ( import (
gxset "github.com/dubbogo/gost/container/set" gxset "github.com/dubbogo/gost/container/set"
) )
...@@ -54,9 +55,10 @@ var ( ...@@ -54,9 +55,10 @@ var (
type registryProtocol struct { type registryProtocol struct {
invokers []protocol.Invoker invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry> // Registry Map<RegistryAddress, Registry>
registries *sync.Map registries *sync.Map
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. // To solve the problem of RMI repeated exposure port conflicts,
// the services that have been exposed are no longer exposed.
// providerurl <--> exporter // providerurl <--> exporter
bounds *sync.Map bounds *sync.Map
overrideListeners *sync.Map overrideListeners *sync.Map
...@@ -100,7 +102,6 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common. ...@@ -100,7 +102,6 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.
// filterHideKey filter the parameters that do not need to be output in url(Starting with .) // filterHideKey filter the parameters that do not need to be output in url(Starting with .)
func filterHideKey(url *common.URL) *common.URL { func filterHideKey(url *common.URL) *common.URL {
// be careful params maps in url is map type // be careful params maps in url is map type
removeSet := gxset.NewSet() removeSet := gxset.NewSet()
for k := range url.GetParams() { for k := range url.GetParams() {
...@@ -139,7 +140,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { ...@@ -139,7 +140,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
} }
var reg registry.Registry var reg registry.Registry
if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
reg = getRegistry(&registryUrl) reg = getRegistry(&registryUrl)
proto.registries.Store(registryUrl.Key(), reg) proto.registries.Store(registryUrl.Key(), reg)
...@@ -150,7 +150,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { ...@@ -150,7 +150,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
// new registry directory for store service url from registry // new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg) directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
if err != nil { if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error()) serviceUrl.String(), err.Error())
return nil return nil
} }
...@@ -163,7 +163,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { ...@@ -163,7 +163,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
// new cluster invoker // new cluster invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory) invoker := cluster.Join(directory)
proto.invokers = append(proto.invokers, invoker) proto.invokers = append(proto.invokers, invoker)
return invoker return invoker
...@@ -204,7 +203,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte ...@@ -204,7 +203,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
} }
key := getCacheKey(providerUrl) key := getCacheKey(providerUrl)
logger.Infof("The cached exporter keys is %v !", key) logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key) cachedExporter, loaded := proto.bounds.Load(key)
if loaded { if loaded {
logger.Infof("The exporter has been cached, and will return cached exporter!") logger.Infof("The exporter has been cached, and will return cached exporter!")
...@@ -217,7 +216,6 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte ...@@ -217,7 +216,6 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
go reg.Subscribe(overriderUrl, overrideSubscribeListener) go reg.Subscribe(overriderUrl, overrideSubscribeListener)
return cachedExporter.(protocol.Exporter) return cachedExporter.(protocol.Exporter)
} }
func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
...@@ -229,7 +227,6 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common ...@@ -229,7 +227,6 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common
proto.bounds.Delete(key) proto.bounds.Delete(key)
proto.Export(wrappedNewInvoker) proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe // TODO: unregister & unsubscribe
} }
} }
...@@ -366,7 +363,7 @@ func (proto *registryProtocol) Destroy() { ...@@ -366,7 +363,7 @@ func (proto *registryProtocol) Destroy() {
func getRegistryUrl(invoker protocol.Invoker) *common.URL { func getRegistryUrl(invoker protocol.Invoker) *common.URL {
// here add * for return a new url // here add * for return a new url
url := invoker.GetUrl() url := invoker.GetUrl()
// if the protocol == registry ,set protocol the registry value in url.params // if the protocol == registry, set protocol the registry value in url.params
if url.Protocol == constant.REGISTRY_PROTOCOL { if url.Protocol == constant.REGISTRY_PROTOCOL {
protocol := url.GetParam(constant.REGISTRY_KEY, "") protocol := url.GetParam(constant.REGISTRY_KEY, "")
url.Protocol = protocol url.Protocol = protocol
......
...@@ -30,30 +30,38 @@ import ( ...@@ -30,30 +30,38 @@ import (
// Registry Extension - Registry // Registry Extension - Registry
type Registry interface { type Registry interface {
common.Node common.Node
//used for service provider calling , register services to registry
//And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. // Register is used for service provider calling, register services
// to registry. And it is also used for service consumer calling, register
// services cared about, for dubbo's admin monitoring.
Register(url common.URL) error Register(url common.URL) error
// UnRegister is required to support the contract: // UnRegister is required to support the contract:
// 1. If it is the persistent stored data of dynamic=false, the registration data can not be found, then the IllegalStateException is thrown, otherwise it is ignored. // 1. If it is the persistent stored data of dynamic=false, the
// registration data can not be found, then the IllegalStateException
// is thrown, otherwise it is ignored.
// 2. Unregister according to the full url match. // 2. Unregister according to the full url match.
// url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin // url Registration information, is not allowed to be empty, e.g:
// dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
UnRegister(url common.URL) error UnRegister(url common.URL) error
//When creating new registry extension,pls select one of the following modes. // Subscribe is required to support the contract:
//Will remove in dubbogo version v1.1.0 // When creating new registry extension, pls select one of the
//mode1 : return Listener with Next function which can return subscribe service event from registry // following modes.
//Deprecated! // Will remove in dubbogo version v1.1.0
//subscribe(event.URL) (Listener, error) // mode1: return Listener with Next function which can return
// subscribe service event from registry
//Will replace mode1 in dubbogo version v1.1.0 // Deprecated!
//mode2 : callback mode, subscribe with notify(notify listener). // subscribe(event.URL) (Listener, error)
// Will replace mode1 in dubbogo version v1.1.0
// mode2: callback mode, subscribe with notify(notify listener).
Subscribe(*common.URL, NotifyListener) error Subscribe(*common.URL, NotifyListener) error
// UnSubscribe is required to support the contract: // UnSubscribe is required to support the contract:
// 1. If don't subscribe, ignore it directly. // 1. If don't subscribe, ignore it directly.
// 2. Unsubscribe by full URL match. // 2. Unsubscribe by full URL match.
// url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin // url Subscription condition, not allowed to be empty, e.g.
// consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
// listener A listener of the change event, not allowed to be empty // listener A listener of the change event, not allowed to be empty
UnSubscribe(*common.URL, NotifyListener) error UnSubscribe(*common.URL, NotifyListener) error
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment