diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 70db6d4c1cdf5150d607d6c5250dfb3da631e95a..3595a04c711eca37b72ceed6f84c55e4dc23bad0 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -72,6 +72,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr invoked := []protocol.Invoker{} providers := []string{} var result protocol.Result + if retries > len(invokers) { + retries = len(invokers) + } for i := 0; i <= retries; i++ { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. @@ -87,6 +90,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr } } ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + if ivk == nil { + continue + } invoked = append(invoked, ivk) //DO INVOKE result = ivk.Invoke(invocation) diff --git a/go.mod b/go.mod index 09948c93c582feca5a3c8ee8df379e03bb6a4e84..36ce95545228981734e7de6877b3e8e70203f8f9 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 + github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -12,7 +12,7 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creasty/defaults v1.3.0 - github.com/dubbogo/getty v1.2.2 + github.com/dubbogo/getty v1.3.0 github.com/dubbogo/gost v1.1.1 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect diff --git a/go.sum b/go.sum index 1452a762985e615d49a40df33d8ee8166940a4c7..f8900569b0eca5d39a98291e0d7e515dfb8bfbcb 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,10 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 h1:Ku+3LFRYVelgo/INS9893QOUeIiKNeNKzK3CzDcqt/4= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190921023740-335b8c601359 h1:ti5HOgxW/aKonsBe4Sj/W3+RMq4Jxl/EAAblROneggg= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190921023740-335b8c601359/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5 h1:p85EqnwOfcqqayW7OPREn0YJxIPIuEmuBJPezzhtO/M= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -102,8 +104,8 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= -github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= +github.com/dubbogo/getty v1.3.0 h1:GImOCANdts7dlRqi9GMVsZJnfst9EPyjTVTR1AesOD8= +github.com/dubbogo/getty v1.3.0/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M= @@ -184,7 +186,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:1yOKgt0XYKUg1HOKunGOSt2ocU4bxLCjmIHt0vRtVHM= +github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= @@ -553,7 +555,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gotest.tools v2.2.0+incompatible h1:y0IMTfclpMdsdIbr6uwmJn5/WZ7vFuObxDMdrylFM3A= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index a57c29f890cc76aa57b316aba8bead1bb76cf6ff..930382cca8bac6955b516a88e93ce26d73e235fe 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -68,20 +68,20 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } -func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { +func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { req, ok := pkg.(*DubboPackage) if !ok { logger.Errorf("illegal pkg:%+v\n", pkg) - return perrors.New("invalid rpc request") + return nil, perrors.New("invalid rpc request") } buf, err := req.Marshal() if err != nil { logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) - return perrors.WithStack(err) + return nil, perrors.WithStack(err) } - return perrors.WithStack(ss.WriteBytes(buf.Bytes())) + return buf.Bytes(), nil } //////////////////////////////////////////// @@ -164,18 +164,18 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } -func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { +func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { res, ok := pkg.(*DubboPackage) if !ok { logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) - return perrors.New("invalid rpc response") + return nil, perrors.New("invalid rpc response") } buf, err := res.Marshal() if err != nil { logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) - return perrors.WithStack(err) + return nil, perrors.WithStack(err) } - return perrors.WithStack(ss.WriteBytes(buf.Bytes())) + return buf.Bytes(), nil } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index d1e02b11b21396d7963d2cfffb8e1211b6ee6666..c746cf8ef291bd198274971aad23e91ba6443e36 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -174,7 +174,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte logger.Infof("The exporter has not been cached, and will return a new exporter!") } - reg.Subscribe(overriderUrl, overrideSubscribeListener) + go reg.Subscribe(overriderUrl, overrideSubscribeListener) return cachedExporter.(protocol.Exporter) }