Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package getty
import (
"math/rand"
"time"
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
errSessionNotExist = perrors.New("session not exist")
errClientClosed = perrors.New("client closed")
errClientReadTimeoutOrDecoedFail = perrors.New("maybe the client read timeout or fail to decode tcp stream in Writer.Write")
clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
)
// it is init client for single protocol.
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
if protocol == "" {
return
}
// load clientconfig from consumer_config
// default use dubbo
consumerConfig := config.GetConsumerConfig()
if consumerConfig.ApplicationConfig == nil {
return
}
protocolConf := config.GetConsumerConfig().ProtocolConf
defaultClientConfig := GetDefaultClientConfig()
if protocolConf == nil {
logger.Info("protocol_conf default use dubbo config")
} else {
dubboConf := protocolConf.(map[interface{}]interface{})[protocol]
if dubboConf == nil {
logger.Warnf("dubboConf is nil")
return
}
dubboConfByte, err := yaml.Marshal(dubboConf)
if err != nil {
panic(err)
}
err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig)
if err != nil {
panic(err)
}
}
clientConf = &defaultClientConfig
if err := clientConf.CheckValidity(); err != nil {
logger.Warnf("[CheckValidity] error: %v", err)
return
}
setClientGrpool()
rand.Seed(time.Now().UnixNano())
}
func SetClientConf(c ClientConfig) {
clientConf = &c
err := clientConf.CheckValidity()
if err != nil {
logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
return
}
setClientGrpool()
}
func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
}
}
// Options : param config
// remove request timeout, it will be calculate for every request
// Client : some configuration for network communication.
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
ExchangeClient *remoting.ExchangeClient
func NewClient(opt Options) *Client {
switch {
case opt.ConnectTimeout == 0:
opt.ConnectTimeout = 3 * time.Second
fallthrough
case opt.RequestTimeout == 0:
opt.RequestTimeout = 3 * time.Second
}
c := &Client{
opts: opt,
}
return c
}
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client
}
initClient(url.Protocol)
c.conf = *clientConf
// new client
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false)
if err != nil {
logger.Errorf("try to connect server %v failed for : %v", url.Location, err)
}
// close network connection
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
}
c.pool = nil
}
func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error {
if err != nil {
return perrors.WithStack(err)
}
if session == nil {
return errSessionNotExist
}
return nil
}
select {
case <-getty.GetTimeWheel().After(timeout):
return perrors.WithStack(errClientReadTimeoutOrDecoedFail)
case <-response.Done:
err = response.Err
}
return perrors.WithStack(err)
}
// isAvailable returns true if the connection is available, or it can be re-established.
client, _, err := c.selectSession(c.addr)
return err == nil &&
// defensive check
client != nil
func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
rpcClient, err := c.pool.getGettyRpcClient(addr)
if err != nil {
return nil, nil, perrors.WithStack(err)
}
return rpcClient, rpcClient.selectSession(), nil
}
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout)
return perrors.WithStack(err)
}