Skip to content
Snippets Groups Projects
getty_client.go 5.74 KiB
Newer Older
cvictory's avatar
cvictory committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package getty

import (
	"math/rand"
	"time"
cvictory's avatar
cvictory committed

fangyincheng's avatar
fangyincheng committed
	"github.com/apache/dubbo-getty"
cvictory's avatar
cvictory committed
	gxsync "github.com/dubbogo/gost/sync"
	perrors "github.com/pkg/errors"
	"gopkg.in/yaml.v2"
)
cvictory's avatar
cvictory committed

cvictory's avatar
cvictory committed
	"github.com/apache/dubbo-go/common"
fangyincheng's avatar
fangyincheng committed
	"github.com/apache/dubbo-go/common/constant"
cvictory's avatar
cvictory committed
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/remoting"
cvictory's avatar
cvictory committed
)

var (
yakecanlee's avatar
yakecanlee committed
	errSessionNotExist               = perrors.New("session not exist")
	errClientClosed                  = perrors.New("client closed")
yakecanlee's avatar
yakecanlee committed
	errClientReadTimeoutOrDecoedFail = perrors.New("maybe the client read timeout or fail to decode tcp stream in Writer.Write")
cvictory's avatar
cvictory committed

	clientConf   *ClientConfig
	clientGrpool *gxsync.TaskPool
)

// it is init client for single protocol.
cvictory's avatar
cvictory committed
func initClient(protocol string) {
cvictory's avatar
cvictory committed
	if protocol == "" {
		return
	}

	// load clientconfig from consumer_config
	// default use dubbo
	consumerConfig := config.GetConsumerConfig()
	if consumerConfig.ApplicationConfig == nil {
		return
	}
	protocolConf := config.GetConsumerConfig().ProtocolConf
	defaultClientConfig := GetDefaultClientConfig()
	if protocolConf == nil {
		logger.Info("protocol_conf default use dubbo config")
	} else {
		dubboConf := protocolConf.(map[interface{}]interface{})[protocol]
		if dubboConf == nil {
			logger.Warnf("dubboConf is nil")
			return
		}
		dubboConfByte, err := yaml.Marshal(dubboConf)
		if err != nil {
			panic(err)
		}
		err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig)
		if err != nil {
			panic(err)
		}
	}
	clientConf = &defaultClientConfig
	if err := clientConf.CheckValidity(); err != nil {
		logger.Warnf("[CheckValidity] error: %v", err)
		return
	}
	setClientGrpool()

	rand.Seed(time.Now().UnixNano())
}

cvictory's avatar
cvictory committed
// Config ClientConf
cvictory's avatar
cvictory committed
func SetClientConf(c ClientConfig) {
	clientConf = &c
	err := clientConf.CheckValidity()
	if err != nil {
		logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
		return
	}
	setClientGrpool()
}

// setClientGrpool creates the package-level task pool from the sizes held in
// clientConf. Nothing is created unless GrPoolSize is greater than 1.
func setClientGrpool() {
	if clientConf.GrPoolSize <= 1 {
		return
	}
	clientGrpool = gxsync.NewTaskPool(
		gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize),
		gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
		gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber),
	)
}

// Options holds the timeouts passed to NewClient.
type Options struct {
	// ConnectTimeout is the connect timeout. NewClient substitutes 3 seconds
	// when it is zero.
	ConnectTimeout time.Duration
	// RequestTimeout is the request timeout. NewClient substitutes 3 seconds
	// when it is zero. Note that Request takes its own timeout argument for
	// every call.
	RequestTimeout time.Duration
}

// Client : some configuration for network communication.
cvictory's avatar
cvictory committed
type Client struct {
fangyincheng's avatar
fangyincheng committed
	addr           string
	opts           Options
	conf           ClientConfig
	pool           *gettyRPCClientPool
	codec          remoting.Codec
	ExchangeClient *remoting.ExchangeClient
cvictory's avatar
cvictory committed
}

cvictory's avatar
cvictory committed
func NewClient(opt Options) *Client {
	switch {
	case opt.ConnectTimeout == 0:
		opt.ConnectTimeout = 3 * time.Second
fangyincheng's avatar
fangyincheng committed
		fallthrough
	case opt.RequestTimeout == 0:
		opt.RequestTimeout = 3 * time.Second
cvictory's avatar
cvictory committed
	}

	c := &Client{
		opts: opt,
	}
	return c
}

cvictory's avatar
cvictory committed
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
	c.ExchangeClient = client
}
cvictory's avatar
cvictory committed

// init client and try to connection.
haohongfan's avatar
haohongfan committed
func (c *Client) Connect(url *common.URL) error {
cvictory's avatar
cvictory committed
	initClient(url.Protocol)
	c.conf = *clientConf
	// new client
	c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
fangyincheng's avatar
fangyincheng committed
	c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false)

cvictory's avatar
cvictory committed
	// codec
	c.codec = remoting.GetCodec(url.Protocol)
cvictory's avatar
cvictory committed
	c.addr = url.Location
cvictory's avatar
cvictory committed
	_, _, err := c.selectSession(c.addr)
cvictory's avatar
cvictory committed
	if err != nil {
		logger.Errorf("try to connect server %v failed for : %v", url.Location, err)
	}
cvictory's avatar
cvictory committed
	return err
cvictory's avatar
cvictory committed
}
cvictory's avatar
cvictory committed

// close network connection
cvictory's avatar
cvictory committed
func (c *Client) Close() {
	if c.pool != nil {
		c.pool.close()
	}
	c.pool = nil
}

cvictory's avatar
cvictory committed
func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error {
cvictory's avatar
cvictory committed
	_, session, err := c.selectSession(c.addr)
cvictory's avatar
cvictory committed
	if err != nil {
		return perrors.WithStack(err)
	}
	if session == nil {
		return errSessionNotExist
	}

cvictory's avatar
cvictory committed
	if err = c.transfer(session, request, timeout); err != nil {
cvictory's avatar
cvictory committed
		return perrors.WithStack(err)
	}

cvictory's avatar
cvictory committed
	if !request.TwoWay || response.Callback != nil {
cvictory's avatar
cvictory committed
		return nil
	}

	select {
	case <-getty.GetTimeWheel().After(timeout):
yakecanlee's avatar
yakecanlee committed
		return perrors.WithStack(errClientReadTimeoutOrDecoedFail)
cvictory's avatar
cvictory committed
	case <-response.Done:
		err = response.Err
	}

	return perrors.WithStack(err)
}

// isAvailable returns true if the connection is available, or it can be re-established.
Ian Luo's avatar
Ian Luo committed
func (c *Client) IsAvailable() bool {
	client, _, err := c.selectSession(c.addr)
	return err == nil &&
		// defensive check
		client != nil
cvictory's avatar
cvictory committed
func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
	rpcClient, err := c.pool.getGettyRpcClient(addr)
	if err != nil {
		return nil, nil, perrors.WithStack(err)
	}
	return rpcClient, rpcClient.selectSession(), nil
}

cvictory's avatar
cvictory committed
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
cvictory's avatar
cvictory committed
	err := session.WritePkg(request, timeout)
	return perrors.WithStack(err)
}