Skip to content
Snippets Groups Projects
Commit a35e160e authored by fangyincheng's avatar fangyincheng
Browse files

Rft:protocol/dubbo done.

parent 88af7eb4
No related branches found
No related tags found
No related merge requests found
package dubbo
import (
"math/rand"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/sync/atomic"
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
var (
errInvalidCodecType = jerrors.New("illegal CodecType")
errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed")
errClientReadTimeout = jerrors.New("client read timeout")
)
func init() {
rand.Seed(time.Now().UnixNano())
}
type CallOptions struct {
// request timeout
RequestTimeout time.Duration
// response timeout
ResponseTimeout time.Duration
// serial ID
SerialID SerialID
Meta map[interface{}]interface{}
}
type CallOption func(*CallOptions)
func WithCallRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.RequestTimeout = d
}
}
func WithCallResponseTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.ResponseTimeout = d
}
}
func WithCallSerialID(s SerialID) CallOption {
return func(o *CallOptions) {
o.SerialID = s
}
}
func WithCallMeta(k, v interface{}) CallOption {
return func(o *CallOptions) {
if o.Meta == nil {
o.Meta = make(map[interface{}]interface{})
}
o.Meta[k] = v
}
}
type CallResponse struct {
Opts CallOptions
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}
type AsyncCallback func(response CallResponse)
type Client struct {
conf ClientConfig
pool *gettyRPCClientPool
sequence gxatomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
}
func NewClient(conf *ClientConfig) (*Client, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *conf,
}
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
return c, nil
}
// call one way
func (c *Client) CallOneway(addr string, svcUrl config.URL, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil, copts))
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(addr string, svcUrl config.URL, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
ct := CT_TwoWay
if reply == nil {
ct = CT_OneWay
}
return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts))
}
func (c *Client) AsyncCall(addr string, svcUrl config.URL, method string, args interface{},
callback AsyncCallback, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}
func (c *Client) call(ct CallType, addr string, svcUrl config.URL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
if opts.RequestTimeout == 0 {
opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout
}
if opts.ResponseTimeout == 0 {
opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout
}
p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(svcUrl.Path(), "/")
p.Service.Target = strings.TrimPrefix(svcUrl.Path(), "/")
p.Service.Version = svcUrl.Version()
p.Service.Method = method
p.Service.Timeout = opts.RequestTimeout
if opts.SerialID == 0 {
p.Header.SerialID = byte(S_Dubbo)
} else {
p.Header.SerialID = byte(opts.SerialID)
}
p.Body = args
var rsp *PendingResponse
if ct != CT_OneWay {
rsp = NewPendingResponse()
rsp.reply = reply
rsp.callback = callback
rsp.opts = opts
}
var (
err error
session getty.Session
conn *gettyRPCClient
)
conn, session, err = c.selectSession(addr)
if err != nil || session == nil {
return errSessionNotExist
}
defer c.pool.release(conn, err)
if err = c.transfer(session, p, rsp, opts); err != nil {
return jerrors.Trace(err)
}
if ct == CT_OneWay || callback != nil {
return nil
}
select {
case <-getty.GetTimeWheel().After(opts.ResponseTimeout):
err = errClientReadTimeout
c.removePendingResponse(SequenceType(rsp.seq))
case <-rsp.done:
err = rsp.err
}
return jerrors.Trace(err)
}
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
}
c.pool = nil
}
func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
rpcClient, err := c.pool.getGettyRpcClient(public.CODECTYPE_DUBBO.String(), addr)
if err != nil {
return nil, nil, jerrors.Trace(err)
}
return rpcClient, rpcClient.selectSession(), nil
}
func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, NewPendingResponse(), CallOptions{})
}
func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
rsp *PendingResponse, opts CallOptions) error {
var (
sequence uint64
err error
)
sequence = c.sequence.Add(1)
if pkg == nil {
pkg = &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.Heartbeat
pkg.Header.SerialID = byte(S_Dubbo)
} else {
pkg.Header.Type = hessian.Request
}
pkg.Header.ID = int64(sequence)
// cond1
if rsp != nil {
rsp.seq = sequence
c.addPendingResponse(rsp)
}
err = session.WritePkg(pkg, opts.RequestTimeout)
if err != nil {
c.removePendingResponse(SequenceType(rsp.seq))
} else if rsp != nil { // cond2
// cond2 should not merged with cond1. cause the response package may be returned very
// soon and it will be handled by other goroutine.
rsp.readStart = time.Now()
}
return jerrors.Trace(err)
}
func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
}
func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
}
return nil
}
package dubbo
import (
"bufio"
"bytes"
"fmt"
"time"
)
import (
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
// serial ID
type SerialID byte
const (
S_Dubbo SerialID = 2
)
// call type
type CallType int32
const (
CT_UNKOWN CallType = 0
CT_OneWay CallType = 1
CT_TwoWay CallType = 2
)
////////////////////////////////////////////
// dubbo package
////////////////////////////////////////////
type SequenceType int64
type DubboPackage struct {
Header hessian.DubboHeader
Service hessian.Service
Body interface{}
Err error
}
func (p DubboPackage) String() string {
return fmt.Sprintf("DubboPackage: Header-%v, Service-%v, Body-%v", p.Header, p.Service, p.Body)
}
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
codec := hessian.NewHessianCodec(nil)
pkg, err := codec.Write(p.Service, p.Header, p.Body)
if err != nil {
return nil, jerrors.Trace(err)
}
return bytes.NewBuffer(pkg), nil
}
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error {
codec := hessian.NewHessianCodec(bufio.NewReader(buf))
// read header
err := codec.ReadHeader(&p.Header)
if err != nil {
return jerrors.Trace(err)
}
if p.Header.Type&hessian.Heartbeat != 0x00 {
return nil
}
// read body
err = codec.ReadBody(p.Body)
return jerrors.Trace(err)
}
////////////////////////////////////////////
// PendingResponse
////////////////////////////////////////////
type PendingResponse struct {
seq uint64
err error
start time.Time
readStart time.Time
callback AsyncCallback
reply interface{}
opts CallOptions
done chan struct{}
}
func NewPendingResponse() *PendingResponse {
return &PendingResponse{
start: time.Now(),
done: make(chan struct{}),
}
}
func (r PendingResponse) GetCallResponse() CallResponse {
return CallResponse{
Opts: r.opts,
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
Reply: r.reply,
}
}
package dubbo
import (
"time"
)
import (
"github.com/dubbo/dubbo-go/server"
jerrors "github.com/juju/errors"
)
type (
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
// Config holds supported types by the multiconfig package
ServerConfig struct {
server.ServerConfig
// local address
//AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
//Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
//Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
//ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
//AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
//Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
//ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
)
func (c *GettySessionParam) CheckValidity() error {
var err error
if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod)
}
if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout)
}
if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout)
}
if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout)
}
return nil
}
func (c *ClientConfig) CheckValidity() error {
var err error
if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
func (c *ServerConfig) CheckValidity() error {
var err error
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
package dubbo
import (
"context"
"reflect"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
// todo: WritePkg_Timeout will entry *.yml
const WritePkg_Timeout = 5 * time.Second
var (
errTooManySessions = jerrors.New("too many sessions")
)
type rpcSession struct {
session getty.Session
reqNum int32
}
////////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
type RpcClientHandler struct {
conn *gettyRPCClient
}
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal package")
return
}
if p.Header.Type&hessian.Heartbeat != 0x00 {
log.Debug("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
return
}
log.Debug("get rpc response{header: %#v, body: %#v}", p.Header, p.Body)
h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
if pendingResponse == nil {
return
}
if p.Err != nil {
pendingResponse.err = p.Err
}
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
pendingResponse.callback(pendingResponse.GetCallResponse())
}
}
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%s}",
session.Stat(), jerrors.ErrorStack(err))
return
}
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return
}
h.conn.pool.rpcClient.heartbeat(session)
}
////////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
}
}
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if h.maxSessionNum <= len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return jerrors.Trace(err)
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
p, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
p.Header.ResponseStatus = hessian.Response_OK
// heartbeat
if p.Header.Type&hessian.Heartbeat != 0x00 {
log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
h.reply(session, p, hessian.Heartbeat)
return
}
// twoway
if p.Header.Type&hessian.Request_TwoWay == 0x00 {
h.reply(session, p, hessian.Response)
h.callService(p, nil)
return
}
h.callService(p, nil)
h.reply(session, p, hessian.Response)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) {
defer func() {
if e := recover(); e != nil {
req.Header.ResponseStatus = hessian.Response_BAD_REQUEST
if err, ok := e.(error); ok {
log.Error("callService panic: %#v", err)
req.Body = e.(error)
} else if err, ok := e.(string); ok {
log.Error("callService panic: %#v", jerrors.New(err))
req.Body = jerrors.New(err)
} else {
log.Error("callService panic: %#v", e)
req.Body = e
}
}
}()
svc := req.Body.(map[string]interface{})["service"].(*service)
method := svc.method[req.Service.Method]
// prepare argv
var argv reflect.Value
argIsValue := false // if true, need to indirect before calling.
if method.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(method.ArgType.Elem())
} else {
argv = reflect.New(method.ArgType)
argIsValue = true
}
argvTmp := argv.Interface()
argvTmp = req.Body.(map[string]interface{})["args"] // type is []interface
if argIsValue {
argv = argv.Elem()
}
// prepare replyv
replyv := reflect.New(method.ReplyType.Elem())
var returnValues []reflect.Value
if method.CtxType == nil {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
} else {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, contextv, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
} else {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.Zero(method.CtxType), reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
}
}
// The return value for the method is an error.
if retErr := returnValues[0].Interface(); retErr != nil {
req.Header.ResponseStatus = hessian.Response_SERVER_ERROR
req.Body = retErr.(error)
} else {
req.Body = replyv.Interface()
}
}
func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackgeType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
SerialID: req.Header.SerialID,
Type: tp,
ID: req.Header.ID,
ResponseStatus: req.Header.ResponseStatus,
},
}
if req.Header.Type&hessian.Request != 0x00 {
resp.Body = req.Body
} else {
resp.Body = nil
}
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
log.Error("WritePkg error: %#v, %#v", jerrors.Trace(err), req.Header)
}
}
package dubbo
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type gettyRPCClient struct {
once sync.Once
protocol string
addr string
created int64 // 为0,则说明没有被创建或者被销毁了
pool *gettyRPCClientPool
lock sync.RWMutex
gettyClient getty.Client
sessions []*rpcSession
}
var (
errClientPoolClosed = jerrors.New("client pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) {
c := &gettyRPCClient{
protocol: protocol,
addr: addr,
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 5000 {
return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr))
}
time.Sleep(1e6)
}
log.Info("client init ok")
c.created = time.Now().Unix()
return c, nil
}
func (c *gettyRPCClient) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
conf ClientConfig
)
conf = c.pool.rpcClient.conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient))
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *gettyRPCClient) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *gettyRPCClient) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClient) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.close() // -> pool.remove(c)
}
}
func (c *gettyRPCClient) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
}
func (c *gettyRPCClient) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *gettyRPCClient) close() error {
err := jerrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
c.created = 0
err = nil
})
return err
}
type gettyRPCClientPool struct {
rpcClient *Client
size int // []*gettyRPCClient数组的size
ttl int64 // 每个gettyRPCClient的有效期时间. pool对象会在getConn时执行ttl检查
sync.Mutex
connMap map[string][]*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
return &gettyRPCClientPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClient),
}
}
func (p *gettyRPCClientPool) close() {
p.Lock()
connMap := p.connMap
p.connMap = nil
p.Unlock()
for _, connArray := range connMap {
for _, conn := range connArray {
conn.close()
}
}
}
func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
var builder strings.Builder
builder.WriteString(addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return nil, errClientPoolClosed
}
connArray := p.connMap[key]
now := time.Now().Unix()
for len(connArray) > 0 {
conn := connArray[len(connArray)-1]
connArray = connArray[:len(connArray)-1]
p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl {
conn.close() // -> pool.remove(c)
continue
}
return conn, nil
}
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.created == 0 {
return
}
if err != nil {
conn.close()
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) >= p.size {
conn.close()
return
}
p.connMap[key] = append(connArray, conn)
}
func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.created == 0 {
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) > 0 {
for idx, c := range connArray {
if conn == c {
p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...)
break
}
}
}
}
package dubbo
import (
"bytes"
"reflect"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
////////////////////////////////////////////
// RpcClientPackageHandler
////////////////////////////////////////////
type RpcClientPackageHandler struct {
client *Client
}
func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
return &RpcClientPackageHandler{client: client}
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &DubboPackage{
Body: p.client.pendingResponses[SequenceType(int64(p.client.sequence.Load()))].reply,
}
buf := bytes.NewBuffer(data)
err := pkg.Unmarshal(buf)
if err != nil {
pkg.Err = jerrors.Trace(err) // client will get this err
return pkg, len(data), nil
}
return pkg, len(data), nil
}
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
req, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request")
}
buf, err := req.Marshal()
if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
////////////////////////////////////////////
// RpcServerPackageHandler
////////////////////////////////////////////
type RpcServerPackageHandler struct {
server *Server
srvMap serviceMap
}
func NewRpcServerPackageHandler(server *Server, srvMap serviceMap) *RpcServerPackageHandler {
return &RpcServerPackageHandler{
server: server,
srvMap: srvMap,
}
}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &DubboPackage{
Body: make([]interface{}, 7),
}
buf := bytes.NewBuffer(data)
err := pkg.Unmarshal(buf)
if err != nil {
return nil, 0, jerrors.Trace(err)
}
// convert params of request
req := pkg.Body.([]interface{}) // length of body should be 7
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
var attachments map[interface{}]interface{}
if req[0] != nil {
dubboVersion = req[0].(string)
}
if req[1] != nil {
pkg.Service.Target = req[1].(string)
}
if req[2] != nil {
pkg.Service.Version = req[2].(string)
}
if req[3] != nil {
pkg.Service.Method = req[3].(string)
}
if req[4] != nil {
argsTypes = req[4].(string)
}
if req[5] != nil {
args = req[5].([]interface{})
}
if req[6] != nil {
attachments = req[6].(map[interface{}]interface{})
}
pkg.Body = map[string]interface{}{
"dubboVersion": dubboVersion,
"argsTypes": argsTypes,
"args": args,
"service": p.srvMap[pkg.Service.Target],
"attachments": attachments,
}
}
return pkg, len(data), nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
res, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return jerrors.New("invalid rpc response")
}
buf, err := res.Marshal()
if err != nil {
log.Warn("binary.Write(res{%#v}) = err{%#v}", res, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
package dubbo
import (
"reflect"
"sync"
"unicode"
"unicode/utf8"
)
import (
log "github.com/AlexStocks/log4go"
)
var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type GettyRPCService interface {
Service() string // Service Interface
Version() string
}
type methodType struct {
sync.Mutex
method reflect.Method
CtxType reflect.Type // type of the request context
ArgType reflect.Type
ReplyType reflect.Type
}
type service struct {
name string
rcvr reflect.Value
rcvrType reflect.Type
method map[string]*methodType
}
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// suitableMethods returns suitable Rpc methods of typ
func suitableMethods(typ reflect.Type) (string, map[string]*methodType) {
methods := make(map[string]*methodType)
mts := ""
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
if mt := suiteMethod(method); mt != nil {
methods[method.Name] = mt
mts += method.Name + ","
}
}
return mts, methods
}
// suiteMethod returns a suitable Rpc methodType
func suiteMethod(method reflect.Method) *methodType {
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
return nil
}
var replyType, argType, ctxType reflect.Type
switch mtype.NumIn() {
case 3:
argType = mtype.In(1)
replyType = mtype.In(2)
case 4:
ctxType = mtype.In(1)
argType = mtype.In(2)
replyType = mtype.In(3)
default:
log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4",
mname, mtype, mtype.NumIn())
return nil
}
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
log.Error("argument type of method %q is not exported %v", mname, argType)
return nil
}
// Second arg must be a pointer.
if replyType.Kind() != reflect.Ptr {
log.Error("reply type of method %q is not a pointer %v", mname, replyType)
return nil
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Error("reply type of method %s not exported{%v}", mname, replyType)
return nil
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut())
return nil
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("return type %s of method %q is not error", returnType, mname)
return nil
}
return &methodType{method: method, ArgType: argType, ReplyType: replyType, CtxType: ctxType}
}
package dubbo
import (
"fmt"
"github.com/dubbo/dubbo-go/plugins"
"net"
"reflect"
"strconv"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/registry"
)
type Option func(*Options)
type Options struct {
Registry registry.Registry
ConfList []ServerConfig
ServiceConfList []registry.ReferenceConfig
}
func newOptions(opt ...Option) Options {
opts := Options{}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
panic("server.Options.Registry is nil")
}
return opts
}
// Registry used for discovery
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
func ConfList(confList []ServerConfig) Option {
return func(o *Options) {
o.ConfList = confList
for i := 0; i < len(o.ConfList); i++ {
if err := o.ConfList[i].CheckValidity(); err != nil {
log.Error("ServerConfig check failed: ", err)
o.ConfList = []ServerConfig{}
return
}
if o.ConfList[i].IP == "" {
o.ConfList[i].IP, _ = gxnet.GetLocalIP()
}
}
}
}
func ServiceConfList(confList []registry.ReferenceConfig) Option {
return func(o *Options) {
o.ServiceConfList = confList
if o.ServiceConfList == nil {
o.ServiceConfList = []registry.ReferenceConfig{}
}
}
}
type serviceMap map[string]*service
type Server struct {
opts Options
indexOfConfList int
srvs []serviceMap
tcpServerList []getty.Server
}
func NewServer(opts ...Option) *Server {
options := newOptions(opts...)
num := len(options.ConfList)
servers := make([]serviceMap, len(options.ConfList))
for i := 0; i < num; i++ {
servers[i] = map[string]*service{}
}
s := &Server{
opts: options,
srvs: servers,
}
return s
}
// Register export services and register with the registry
func (s *Server) Register(rcvr GettyRPCService) error {
serviceConf := plugins.DefaultProviderServiceConfig()()
opts := s.opts
serviceConf.SetService(rcvr.Service())
serviceConf.SetVersion(rcvr.Version())
flag := false
serviceNum := len(opts.ServiceConfList)
serverNum := len(opts.ConfList)
for i := 0; i < serviceNum; i++ {
if opts.ServiceConfList[i].Service() == serviceConf.Service() &&
opts.ServiceConfList[i].Version() == serviceConf.Version() {
serviceConf.SetProtocol(opts.ServiceConfList[i].Protocol())
serviceConf.SetGroup(opts.ServiceConfList[i].Group())
for j := 0; j < serverNum; j++ {
if opts.ConfList[j].Protocol == serviceConf.Protocol() {
rcvrName := reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name()
svc := &service{
rcvrType: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr),
}
if rcvrName == "" {
s := "rpc.Register: no service name for type " + svc.rcvrType.String()
log.Error(s)
return jerrors.New(s)
}
if !isExported(rcvrName) {
s := "rpc.Register: type " + rcvrName + " is not exported"
log.Error(s)
return jerrors.New(s)
}
svc.name = rcvr.Service() // service name is from 'Service()'
if _, present := s.srvs[j][svc.name]; present {
return jerrors.New("rpc: service already defined: " + svc.name)
}
// Install the methods
mts, methods := suitableMethods(svc.rcvrType)
svc.method = methods
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
mts, methods = suitableMethods(reflect.PtrTo(svc.rcvrType))
str := "rpc.Register: type " + rcvrName + " has no exported methods of suitable type"
if len(methods) != 0 {
str = "rpc.Register: type " + rcvrName + " has no exported methods of suitable type (" +
"hint: pass a pointer to value of that type)"
}
log.Error(str)
return jerrors.New(str)
}
s.srvs[j][svc.name] = svc
serviceConf.SetMethods(mts)
serviceConf.SetPath(opts.ConfList[j].Address())
err := opts.Registry.Register(serviceConf)
if err != nil {
return err
}
flag = true
}
}
}
}
if !flag {
return jerrors.Errorf("fail to register Handler{service:%s, version:%s}",
serviceConf.Service, serviceConf.Version)
}
return nil
}
func (s *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
conf := s.opts.ConfList[s.indexOfConfList]
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s, s.srvs[s.indexOfConfList]))
session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat())
return nil
}
func (s *Server) Start() {
var (
addr string
tcpServer getty.Server
)
if len(s.opts.ConfList) == 0 {
panic("ConfList is nil")
}
for i := 0; i < len(s.opts.ConfList); i++ {
addr = gxnet.HostAddress2(s.opts.ConfList[i].IP, strconv.Itoa(s.opts.ConfList[i].Port))
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
s.indexOfConfList = i
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
}
}
func (s *Server) Stop() {
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
}
}
...@@ -4,22 +4,28 @@ import ( ...@@ -4,22 +4,28 @@ import (
"context" "context"
) )
import (
log "github.com/AlexStocks/log4go"
)
import ( import (
"github.com/dubbo/dubbo-go/protocol" "github.com/dubbo/dubbo-go/protocol"
) )
// wrapping invoker // wrapping invoker
type DubboExporter struct { type DubboExporter struct {
ctx context.Context ctx context.Context
key string key string
invoker protocol.Invoker invoker protocol.Invoker
exporterMap map[string]protocol.Exporter
} }
func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker) *DubboExporter { func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter {
return &DubboExporter{ return &DubboExporter{
ctx: ctx, ctx: ctx,
key: key, key: key,
invoker: invoker, invoker: invoker,
exporterMap: exporterMap,
} }
} }
...@@ -29,5 +35,7 @@ func (de *DubboExporter) GetInvoker() protocol.Invoker { ...@@ -29,5 +35,7 @@ func (de *DubboExporter) GetInvoker() protocol.Invoker {
} }
func (de *DubboExporter) Unexport() { func (de *DubboExporter) Unexport() {
log.Info("DubboExporter unexport.")
de.invoker.Destroy() de.invoker.Destroy()
delete(de.exporterMap, de.key)
} }
...@@ -4,9 +4,14 @@ import ( ...@@ -4,9 +4,14 @@ import (
"context" "context"
"errors" "errors"
"strconv" "strconv"
"sync"
"time" "time"
) )
import (
log "github.com/AlexStocks/log4go"
)
import ( import (
"github.com/dubbo/dubbo-go/common/constant" "github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/config"
...@@ -16,16 +21,21 @@ import ( ...@@ -16,16 +21,21 @@ import (
var Err_No_Reply = errors.New("no reply") var Err_No_Reply = errors.New("no reply")
type DubboInvoker struct { type DubboInvoker struct {
ctx context.Context ctx context.Context
url config.URL url config.URL
client *Client client *Client
available bool
destroyed bool
destroyLock sync.Mutex
} }
func NewDubboInvoker(ctx context.Context, url config.URL, client *Client) *DubboInvoker { func NewDubboInvoker(ctx context.Context, url config.URL, client *Client) *DubboInvoker {
return &DubboInvoker{ return &DubboInvoker{
ctx: ctx, ctx: ctx,
url: url, url: url,
client: client, client: client,
available: true,
destroyed: false,
} }
} }
...@@ -41,21 +51,34 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ...@@ -41,21 +51,34 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
// async // async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
if err != nil { if err != nil {
async = true log.Error("ParseBool - error: %v", err)
async = false
} }
if async { if async {
err = di.client.CallOneway(url.Location, *url, inv.MethodName(), if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Reply(), callBack,
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{})))
} else {
result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{})))
}
} else { } else {
if inv.Reply() == nil { if inv.Reply() == nil {
result.Err = Err_No_Reply result.Err = Err_No_Reply
} else {
result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Reply(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{})))
result.Rest = inv.Reply() // reply should be set to result.Rest when sync
} }
err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Reply(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{})))
} }
return result return result
...@@ -66,9 +89,18 @@ func (di *DubboInvoker) GetUrl() config.IURL { ...@@ -66,9 +89,18 @@ func (di *DubboInvoker) GetUrl() config.IURL {
} }
func (di *DubboInvoker) IsAvailable() bool { func (di *DubboInvoker) IsAvailable() bool {
return true return di.available
} }
func (di *DubboInvoker) Destroy() { func (di *DubboInvoker) Destroy() {
//todo: if di.destroyed {
return
}
di.destroyLock.Lock()
defer di.destroyLock.Unlock()
di.destroyed = true
di.available = false
di.client.Close() // close client
} }
...@@ -32,7 +32,7 @@ func NewDubboProtocol(ctx context.Context) *DubboProtocol { ...@@ -32,7 +32,7 @@ func NewDubboProtocol(ctx context.Context) *DubboProtocol {
func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl().(*config.URL) url := invoker.GetUrl().(*config.URL)
serviceKey := url.Key() serviceKey := url.Key()
exporter := NewDubboExporter(nil, serviceKey, invoker) exporter := NewDubboExporter(nil, serviceKey, invoker, dp.exporterMap)
dp.exporterMap[serviceKey] = exporter dp.exporterMap[serviceKey] = exporter
log.Info("Export service: ", url.String()) log.Info("Export service: ", url.String())
...@@ -49,7 +49,8 @@ func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker { ...@@ -49,7 +49,8 @@ func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker {
} }
func (dp *DubboProtocol) Destroy() { func (dp *DubboProtocol) Destroy() {
log.Info("DubboProtocol destroy.")
srv.Stop() // stop server
} }
func (dp *DubboProtocol) openServer(url config.URL) { func (dp *DubboProtocol) openServer(url config.URL) {
......
...@@ -250,13 +250,8 @@ func (p *gettyRPCClientPool) close() { ...@@ -250,13 +250,8 @@ func (p *gettyRPCClientPool) close() {
} }
func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
var builder strings.Builder
builder.WriteString(addr) key := GenerateEndpointAddr(protocol, addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
...@@ -293,13 +288,7 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { ...@@ -293,13 +288,7 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
return return
} }
var builder strings.Builder key := GenerateEndpointAddr(conn.protocol, conn.addr)
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
...@@ -320,13 +309,7 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { ...@@ -320,13 +309,7 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
return return
} }
var builder strings.Builder key := GenerateEndpointAddr(conn.protocol, conn.addr)
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
...@@ -344,3 +327,13 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { ...@@ -344,3 +327,13 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
} }
} }
} }
func GenerateEndpointAddr(protocol, addr string) string {
var builder strings.Builder
builder.WriteString(protocol)
builder.WriteString("://")
builder.WriteString(addr)
return builder.String()
}
...@@ -105,7 +105,7 @@ func (s *Server) Start(url config.URL) { ...@@ -105,7 +105,7 @@ func (s *Server) Start(url config.URL) {
tcpServer getty.Server tcpServer getty.Server
) )
addr = url.Location() addr = url.Location
tcpServer = getty.NewTCPServer( tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr), getty.WithLocalAddress(addr),
) )
......
...@@ -17,24 +17,27 @@ type Invocation interface { ...@@ -17,24 +17,27 @@ type Invocation interface {
///////////////////////////// /////////////////////////////
// Invocation Impletment of RPC // Invocation Impletment of RPC
///////////////////////////// /////////////////////////////
// todo: is it necessary to separate fields of consumer(provider) from RPCInvocation
type RPCInvocation struct { type RPCInvocation struct {
methodName string methodName string
parameterTypes []reflect.Type parameterTypes []reflect.Type
arguments []interface{} arguments []interface{}
reply interface{} reply interface{}
callBack interface{}
attachments map[string]string attachments map[string]string
invoker Invoker invoker Invoker
params map[string]interface{} // Store some parameters that are not easy to refine params map[string]interface{} // Store some parameters that are not easy to refine
} }
// todo: arguments table is too many
func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, arguments []interface{}, func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, arguments []interface{},
reply interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation { reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation {
return &RPCInvocation{ return &RPCInvocation{
methodName: methodName, methodName: methodName,
parameterTypes: parameterTypes, parameterTypes: parameterTypes,
arguments: arguments, arguments: arguments,
reply: reply, reply: reply,
callBack: callBack,
attachments: attachments, attachments: attachments,
invoker: invoker, invoker: invoker,
params: params, params: params,
...@@ -79,3 +82,7 @@ func (r *RPCInvocation) SetInvoker() Invoker { ...@@ -79,3 +82,7 @@ func (r *RPCInvocation) SetInvoker() Invoker {
func (r *RPCInvocation) Params() map[string]interface{} { func (r *RPCInvocation) Params() map[string]interface{} {
return r.params return r.params
} }
func (r *RPCInvocation) CallBack() interface{} {
return r.callBack
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment