Skip to content
Snippets Groups Projects
Commit fa0e7016 authored by fangyincheng's avatar fangyincheng
Browse files

Mrg:merge upstream

parents 7b3609f1 cf94b2f9
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ Finished List:
Working List:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash
- Load Balance: ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul
......
......@@ -37,7 +37,7 @@ Apache License, Version 2.0
开发中列表:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash
- Load Balance: ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul
......
......@@ -19,6 +19,7 @@ package config
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
......@@ -37,6 +38,7 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/protocolwrapper"
)
type ServiceConfig struct {
......@@ -109,23 +111,33 @@ func (srvconfig *ServiceConfig) Export() error {
common.WithParams(urlMap),
common.WithMethods(strings.Split(methods, ",")))
for _, regUrl := range regUrls {
regUrl.SubURL = url
srvconfig.cacheMutex.Lock()
if srvconfig.cacheProtocol == nil {
logger.Infof("First load the registry protocol!")
srvconfig.cacheProtocol = extension.GetProtocol("registry")
if len(regUrls) > 0 {
for _, regUrl := range regUrls {
regUrl.SubURL = url
srvconfig.cacheMutex.Lock()
if srvconfig.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", url))
srvconfig.cacheProtocol = extension.GetProtocol("registry")
}
srvconfig.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := srvconfig.cacheProtocol.Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, url)))
}
srvconfig.exporters = append(srvconfig.exporters, exporter)
}
srvconfig.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := srvconfig.cacheProtocol.Export(invoker)
} else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*url)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
panic(perrors.New("New exporter error"))
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", url)))
}
srvconfig.exporters = append(srvconfig.exporters, exporter)
}
}
return nil
......
......@@ -2,7 +2,7 @@ module github.com/apache/dubbo-go
require (
github.com/dubbogo/getty v1.0.7
github.com/dubbogo/hessian2 v1.0.1
github.com/dubbogo/hessian2 v1.0.2
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/stretchr/testify v1.3.0
......
......@@ -24,7 +24,7 @@ import (
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/hessian2"
hessian "github.com/dubbogo/hessian2"
perrors "github.com/pkg/errors"
)
import (
......@@ -89,12 +89,11 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error
// RpcServerPackageHandler
////////////////////////////////////////////
type RpcServerPackageHandler struct {
}
var (
rpcServerPkgHandler = &RpcServerPackageHandler{}
)
func NewRpcServerPackageHandler() *RpcServerPackageHandler {
return &RpcServerPackageHandler{}
}
type RpcServerPackageHandler struct{}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &DubboPackage{
......
......@@ -79,6 +79,8 @@ type Server struct {
conf ServerConfig
tcpServer getty.Server
exporter protocol.Exporter
rpcHandler *RpcServerHandler
}
func NewServer(exporter protocol.Exporter) *Server {
......@@ -88,6 +90,8 @@ func NewServer(exporter protocol.Exporter) *Server {
conf: *srvConf,
}
s.rpcHandler = NewRpcServerHandler(s.exporter, s.conf.SessionNumber, s.conf.sessionTimeout)
return s
}
......@@ -116,8 +120,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler())
session.SetEventListener(NewRpcServerHandler(s.exporter, conf.SessionNumber, conf.sessionTimeout))
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment