/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package getty import ( "math/rand" "time" ) import ( "github.com/apache/dubbo-getty" gxsync "github.com/dubbogo/gost/sync" perrors "github.com/pkg/errors" "gopkg.in/yaml.v2" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/remoting" ) var ( errSessionNotExist = perrors.New("session not exist") errClientClosed = perrors.New("client closed") errClientReadTimeoutOrDecoedFail = perrors.New("maybe the client read timeout or fail to decode tcp stream in Writer.Write") clientConf *ClientConfig clientGrpool *gxsync.TaskPool ) // it is init client for single protocol. func initClient(protocol string) { if protocol == "" { return } // load clientconfig from consumer_config // default use dubbo consumerConfig := config.GetConsumerConfig() if consumerConfig.ApplicationConfig == nil { return } protocolConf := config.GetConsumerConfig().ProtocolConf defaultClientConfig := GetDefaultClientConfig() if protocolConf == nil { logger.Info("protocol_conf default use dubbo config") } else { dubboConf := protocolConf.(map[interface{}]interface{})[protocol] if dubboConf == nil { logger.Warnf("dubboConf is nil") return } dubboConfByte, err := yaml.Marshal(dubboConf) if err != nil { panic(err) } err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig) if err != nil { panic(err) } } clientConf = &defaultClientConfig if err := clientConf.CheckValidity(); err != nil { logger.Warnf("[CheckValidity] error: %v", err) return } setClientGrpool() rand.Seed(time.Now().UnixNano()) } // Config ClientConf func SetClientConf(c ClientConfig) { clientConf = &c err := clientConf.CheckValidity() if err != nil { logger.Warnf("[ClientConfig CheckValidity] error: %v", err) return } setClientGrpool() } func setClientGrpool() { if clientConf.GrPoolSize > 1 { clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) } } // Options : param config type Options struct { // connect timeout // remove request timeout, it will be calculate for every request ConnectTimeout time.Duration // request timeout RequestTimeout time.Duration } // Client : some configuration for network communication. type Client struct { addr string opts Options conf ClientConfig pool *gettyRPCClientPool codec remoting.Codec ExchangeClient *remoting.ExchangeClient } // create client func NewClient(opt Options) *Client { switch { case opt.ConnectTimeout == 0: opt.ConnectTimeout = 3 * time.Second fallthrough case opt.RequestTimeout == 0: opt.RequestTimeout = 3 * time.Second } c := &Client{ opts: opt, } return c } func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) { c.ExchangeClient = client } // init client and try to connection. func (c *Client) Connect(url *common.URL) error { initClient(url.Protocol) c.conf = *clientConf // new client c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false) // codec c.codec = remoting.GetCodec(url.Protocol) c.addr = url.Location _, _, err := c.selectSession(c.addr) if err != nil { logger.Errorf("try to connect server %v failed for : %v", url.Location, err) } return err } // close network connection func (c *Client) Close() { if c.pool != nil { c.pool.close() } c.pool = nil } // send request func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error { _, session, err := c.selectSession(c.addr) if err != nil { return perrors.WithStack(err) } if session == nil { return errSessionNotExist } if err = c.transfer(session, request, timeout); err != nil { return perrors.WithStack(err) } if !request.TwoWay || response.Callback != nil { return nil } select { case <-getty.GetTimeWheel().After(timeout): return perrors.WithStack(errClientReadTimeoutOrDecoedFail) case <-response.Done: err = response.Err } return perrors.WithStack(err) } // isAvailable returns true if the connection is available, or it can be re-established. func (c *Client) IsAvailable() bool { client, _, err := c.selectSession(c.addr) return err == nil && // defensive check client != nil } func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { rpcClient, err := c.pool.getGettyRpcClient(addr) if err != nil { return nil, nil, perrors.WithStack(err) } return rpcClient, rpcClient.selectSession(), nil } func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error { err := session.WritePkg(request, timeout) return perrors.WithStack(err) }