diff --git a/cluster/cluster.go b/cluster/cluster.go index 8e9da20763347e1500b454e6e3a670cfdaddf4a9..bb9f89d0fc54352672995de3442c78bf77dbb2e8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,5 +3,5 @@ package cluster import "github.com/dubbo/dubbo-go/protocol" type Cluster interface { - Join(Directory)protocol.Invoker + Join(Directory) protocol.Invoker } diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go index 1f2cfbc59add599284aa07afa475e82333ce2558..e920051683da68d0fe215ba6d4b5eca2ea0b519c 100644 --- a/cluster/support/failover_cluster.go +++ b/cluster/support/failover_cluster.go @@ -10,10 +10,11 @@ import ( type FailoverCluster struct { context context.Context } + const name = "failover" -func init(){ - extension.SetCluster(name,NewFailoverCluster) +func init() { + extension.SetCluster(name, NewFailoverCluster) } func NewFailoverCluster(ctx context.Context) cluster.Cluster { diff --git a/dubbo/client.go b/dubbo/client.go deleted file mode 100644 index ac508edea915ecee5ca3d59a45d4c664d07e2cfd..0000000000000000000000000000000000000000 --- a/dubbo/client.go +++ /dev/null @@ -1,279 +0,0 @@ -package dubbo - -import ( - "math/rand" - "strings" - "sync" - "time" -) - -import ( - "github.com/AlexStocks/getty" - "github.com/AlexStocks/goext/sync/atomic" - "github.com/dubbogo/hessian2" - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" -) - -var ( - errInvalidCodecType = jerrors.New("illegal CodecType") - errInvalidAddress = jerrors.New("remote address invalid or empty") - errSessionNotExist = jerrors.New("session not exist") - errClientClosed = jerrors.New("client closed") - errClientReadTimeout = jerrors.New("client read timeout") -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -type CallOptions struct { - // request timeout - RequestTimeout time.Duration - // response timeout - ResponseTimeout time.Duration - // serial ID - SerialID SerialID - Meta map[interface{}]interface{} -} - -type CallOption func(*CallOptions) - -func WithCallRequestTimeout(d time.Duration) CallOption { - return func(o *CallOptions) { - o.RequestTimeout = d - } -} - -func WithCallResponseTimeout(d time.Duration) CallOption { - return func(o *CallOptions) { - o.ResponseTimeout = d - } -} - -func WithCallSerialID(s SerialID) CallOption { - return func(o *CallOptions) { - o.SerialID = s - } -} - -func WithCallMeta(k, v interface{}) CallOption { - return func(o *CallOptions) { - if o.Meta == nil { - o.Meta = make(map[interface{}]interface{}) - } - o.Meta[k] = v - } -} - -type CallResponse struct { - Opts CallOptions - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} -} - -type AsyncCallback func(response CallResponse) - -type Client struct { - conf ClientConfig - pool *gettyRPCClientPool - sequence gxatomic.Uint64 - - pendingLock sync.RWMutex - pendingResponses map[SequenceType]*PendingResponse -} - -func NewClient(conf *ClientConfig) (*Client, error) { - if err := conf.CheckValidity(); err != nil { - return nil, jerrors.Trace(err) - } - - c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), - conf: *conf, - } - c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL)) - - return c, nil -} - -// call one way -func (c *Client) CallOneway(addr string, svcUrl config.URL, method string, args interface{}, opts ...CallOption) error { - var copts CallOptions - - for _, o := range opts { - o(&copts) - } - - return jerrors.Trace(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil, copts)) -} - -// if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl config.URL, method string, args, reply interface{}, opts ...CallOption) error { - var copts CallOptions - - for _, o := range opts { - o(&copts) - } - - ct := CT_TwoWay - if reply == nil { - ct = CT_OneWay - } - - return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) -} - -func (c *Client) AsyncCall(addr string, svcUrl config.URL, method string, args interface{}, - callback AsyncCallback, reply interface{}, opts ...CallOption) error { - - var copts CallOptions - for _, o := range opts { - o(&copts) - } - - return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) -} - -func (c *Client) call(ct CallType, addr string, svcUrl config.URL, method string, - args, reply interface{}, callback AsyncCallback, opts CallOptions) error { - - if opts.RequestTimeout == 0 { - opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout - } - if opts.ResponseTimeout == 0 { - opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout - } - - p := &DubboPackage{} - p.Service.Path = strings.TrimPrefix(svcUrl.Path(), "/") - p.Service.Target = strings.TrimPrefix(svcUrl.Path(), "/") - p.Service.Version = svcUrl.Version() - p.Service.Method = method - p.Service.Timeout = opts.RequestTimeout - if opts.SerialID == 0 { - p.Header.SerialID = byte(S_Dubbo) - } else { - p.Header.SerialID = byte(opts.SerialID) - } - p.Body = args - - var rsp *PendingResponse - if ct != CT_OneWay { - rsp = NewPendingResponse() - rsp.reply = reply - rsp.callback = callback - rsp.opts = opts - } - - var ( - err error - session getty.Session - conn *gettyRPCClient - ) - conn, session, err = c.selectSession(addr) - if err != nil || session == nil { - return errSessionNotExist - } - defer c.pool.release(conn, err) - - if err = c.transfer(session, p, rsp, opts); err != nil { - return jerrors.Trace(err) - } - - if ct == CT_OneWay || callback != nil { - return nil - } - - select { - case <-getty.GetTimeWheel().After(opts.ResponseTimeout): - err = errClientReadTimeout - c.removePendingResponse(SequenceType(rsp.seq)) - case <-rsp.done: - err = rsp.err - } - - return jerrors.Trace(err) -} - -func (c *Client) Close() { - if c.pool != nil { - c.pool.close() - } - c.pool = nil -} - -func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { - rpcClient, err := c.pool.getGettyRpcClient(public.CODECTYPE_DUBBO.String(), addr) - if err != nil { - return nil, nil, jerrors.Trace(err) - } - return rpcClient, rpcClient.selectSession(), nil -} - -func (c *Client) heartbeat(session getty.Session) error { - return c.transfer(session, nil, NewPendingResponse(), CallOptions{}) -} - -func (c *Client) transfer(session getty.Session, pkg *DubboPackage, - rsp *PendingResponse, opts CallOptions) error { - - var ( - sequence uint64 - err error - ) - - sequence = c.sequence.Add(1) - - if pkg == nil { - pkg = &DubboPackage{} - pkg.Body = []interface{}{} - pkg.Header.Type = hessian.Heartbeat - pkg.Header.SerialID = byte(S_Dubbo) - } else { - pkg.Header.Type = hessian.Request - } - pkg.Header.ID = int64(sequence) - - // cond1 - if rsp != nil { - rsp.seq = sequence - c.addPendingResponse(rsp) - } - - err = session.WritePkg(pkg, opts.RequestTimeout) - if err != nil { - c.removePendingResponse(SequenceType(rsp.seq)) - } else if rsp != nil { // cond2 - // cond2 should not merged with cond1. cause the response package may be returned very - // soon and it will be handled by other goroutine. - rsp.readStart = time.Now() - } - - return jerrors.Trace(err) -} - -func (c *Client) addPendingResponse(pr *PendingResponse) { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() - c.pendingResponses[SequenceType(pr.seq)] = pr -} - -func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() - if c.pendingResponses == nil { - return nil - } - if presp, ok := c.pendingResponses[seq]; ok { - delete(c.pendingResponses, seq) - return presp - } - return nil -} diff --git a/dubbo/codec.go b/dubbo/codec.go deleted file mode 100644 index d1a96940a4606377f6e42a122bbb8f91c789c556..0000000000000000000000000000000000000000 --- a/dubbo/codec.go +++ /dev/null @@ -1,107 +0,0 @@ -package dubbo - -import ( - "bufio" - "bytes" - "fmt" - "time" -) - -import ( - "github.com/dubbogo/hessian2" - jerrors "github.com/juju/errors" -) - -// serial ID -type SerialID byte - -const ( - S_Dubbo SerialID = 2 -) - -// call type -type CallType int32 - -const ( - CT_UNKOWN CallType = 0 - CT_OneWay CallType = 1 - CT_TwoWay CallType = 2 -) - -//////////////////////////////////////////// -// dubbo package -//////////////////////////////////////////// - -type SequenceType int64 - -type DubboPackage struct { - Header hessian.DubboHeader - Service hessian.Service - Body interface{} - Err error -} - -func (p DubboPackage) String() string { - return fmt.Sprintf("DubboPackage: Header-%v, Service-%v, Body-%v", p.Header, p.Service, p.Body) -} - -func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { - codec := hessian.NewHessianCodec(nil) - - pkg, err := codec.Write(p.Service, p.Header, p.Body) - if err != nil { - return nil, jerrors.Trace(err) - } - - return bytes.NewBuffer(pkg), nil -} - -func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error { - codec := hessian.NewHessianCodec(bufio.NewReader(buf)) - - // read header - err := codec.ReadHeader(&p.Header) - if err != nil { - return jerrors.Trace(err) - } - - if p.Header.Type&hessian.Heartbeat != 0x00 { - return nil - } - - // read body - err = codec.ReadBody(p.Body) - return jerrors.Trace(err) -} - -//////////////////////////////////////////// -// PendingResponse -//////////////////////////////////////////// - -type PendingResponse struct { - seq uint64 - err error - start time.Time - readStart time.Time - callback AsyncCallback - reply interface{} - opts CallOptions - done chan struct{} -} - -func NewPendingResponse() *PendingResponse { - return &PendingResponse{ - start: time.Now(), - done: make(chan struct{}), - } -} - -func (r PendingResponse) GetCallResponse() CallResponse { - return CallResponse{ - Opts: r.opts, - Cause: r.err, - Start: r.start, - ReadStart: r.readStart, - Reply: r.reply, - } -} diff --git a/dubbo/config.go b/dubbo/config.go deleted file mode 100644 index 8405d0522772dda6be6190810a2b0a31a6c88531..0000000000000000000000000000000000000000 --- a/dubbo/config.go +++ /dev/null @@ -1,138 +0,0 @@ -package dubbo - -import ( - "time" -) - -import ( - "github.com/dubbo/dubbo-go/server" - jerrors "github.com/juju/errors" -) - -type ( - GettySessionParam struct { - CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` - TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` - TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` - KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` - keepAlivePeriod time.Duration - TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` - TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` - PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"` - PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` - TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` - tcpReadTimeout time.Duration - TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` - tcpWriteTimeout time.Duration - WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` - waitTimeout time.Duration - MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` - SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"` - } - - // Config holds supported types by the multiconfig package - ServerConfig struct { - server.ServerConfig - // local address - //AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"` - //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` - //Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` - //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` - - // session - SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` - sessionTimeout time.Duration - SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"` - - // app - FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` - failFastTimeout time.Duration - - // session tcp parameters - GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` - } - - // Config holds supported types by the multiconfig package - ClientConfig struct { - // local address - //AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"` - //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` - //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` - - // session pool - ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"` - - // heartbeat - HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"` - heartbeatPeriod time.Duration - - // session - SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` - sessionTimeout time.Duration - - // app - FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` - failFastTimeout time.Duration - - // Connection Pool - PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"` - PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"` - - // session tcp parameters - GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` - } -) - -func (c *GettySessionParam) CheckValidity() error { - var err error - - if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod) - } - - if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout) - } - - if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout) - } - - if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout) - } - - return nil -} - -func (c *ClientConfig) CheckValidity() error { - var err error - - if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod) - } - - if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) - } - - if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) - } - - return jerrors.Trace(c.GettySessionParam.CheckValidity()) -} - -func (c *ServerConfig) CheckValidity() error { - var err error - - if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) - } - - if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { - return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) - } - - return jerrors.Trace(c.GettySessionParam.CheckValidity()) -} diff --git a/dubbo/listener.go b/dubbo/listener.go deleted file mode 100644 index d97d88b00015111bfaf9778a1d31d00bc8dfe3cc..0000000000000000000000000000000000000000 --- a/dubbo/listener.go +++ /dev/null @@ -1,289 +0,0 @@ -package dubbo - -import ( - "context" - "reflect" - "sync" - "time" -) - -import ( - "github.com/AlexStocks/getty" - log "github.com/AlexStocks/log4go" - "github.com/dubbogo/hessian2" - jerrors "github.com/juju/errors" -) - -// todo: WritePkg_Timeout will entry *.yml -const WritePkg_Timeout = 5 * time.Second - -var ( - errTooManySessions = jerrors.New("too many sessions") -) - -type rpcSession struct { - session getty.Session - reqNum int32 -} - -//////////////////////////////////////////// -// RpcClientHandler -//////////////////////////////////////////// - -type RpcClientHandler struct { - conn *gettyRPCClient -} - -func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { - return &RpcClientHandler{conn: client} -} - -func (h *RpcClientHandler) OnOpen(session getty.Session) error { - h.conn.addSession(session) - return nil -} - -func (h *RpcClientHandler) OnError(session getty.Session, err error) { - log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err) - h.conn.removeSession(session) -} - -func (h *RpcClientHandler) OnClose(session getty.Session) { - log.Info("session{%s} is closing......", session.Stat()) - h.conn.removeSession(session) -} - -func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { - p, ok := pkg.(*DubboPackage) - if !ok { - log.Error("illegal package") - return - } - - if p.Header.Type&hessian.Heartbeat != 0x00 { - log.Debug("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - return - } - log.Debug("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) - - h.conn.updateSession(session) - - pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) - if pendingResponse == nil { - return - } - - if p.Err != nil { - pendingResponse.err = p.Err - } - - if pendingResponse.callback == nil { - pendingResponse.done <- struct{}{} - } else { - pendingResponse.callback(pendingResponse.GetCallResponse()) - } -} - -func (h *RpcClientHandler) OnCron(session getty.Session) { - rpcSession, err := h.conn.getClientRpcSession(session) - if err != nil { - log.Error("client.getClientSession(session{%s}) = error{%s}", - session.Stat(), jerrors.ErrorStack(err)) - return - } - if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { - log.Warn("session{%s} timeout{%s}, reqNum{%d}", - session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum) - h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn) - return - } - - h.conn.pool.rpcClient.heartbeat(session) -} - -//////////////////////////////////////////// -// RpcServerHandler -//////////////////////////////////////////// - -type RpcServerHandler struct { - maxSessionNum int - sessionTimeout time.Duration - sessionMap map[getty.Session]*rpcSession - rwlock sync.RWMutex -} - -func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { - return &RpcServerHandler{ - maxSessionNum: maxSessionNum, - sessionTimeout: sessionTimeout, - sessionMap: make(map[getty.Session]*rpcSession), - } -} - -func (h *RpcServerHandler) OnOpen(session getty.Session) error { - var err error - h.rwlock.RLock() - if h.maxSessionNum <= len(h.sessionMap) { - err = errTooManySessions - } - h.rwlock.RUnlock() - if err != nil { - return jerrors.Trace(err) - } - - log.Info("got session:%s", session.Stat()) - h.rwlock.Lock() - h.sessionMap[session] = &rpcSession{session: session} - h.rwlock.Unlock() - return nil -} - -func (h *RpcServerHandler) OnError(session getty.Session, err error) { - log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err) - h.rwlock.Lock() - delete(h.sessionMap, session) - h.rwlock.Unlock() -} - -func (h *RpcServerHandler) OnClose(session getty.Session) { - log.Info("session{%s} is closing......", session.Stat()) - h.rwlock.Lock() - delete(h.sessionMap, session) - h.rwlock.Unlock() -} - -func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { - h.rwlock.Lock() - if _, ok := h.sessionMap[session]; ok { - h.sessionMap[session].reqNum++ - } - h.rwlock.Unlock() - - p, ok := pkg.(*DubboPackage) - if !ok { - log.Error("illegal packge{%#v}", pkg) - return - } - p.Header.ResponseStatus = hessian.Response_OK - - // heartbeat - if p.Header.Type&hessian.Heartbeat != 0x00 { - log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - h.reply(session, p, hessian.Heartbeat) - return - } - - // twoway - if p.Header.Type&hessian.Request_TwoWay == 0x00 { - h.reply(session, p, hessian.Response) - h.callService(p, nil) - return - } - - h.callService(p, nil) - h.reply(session, p, hessian.Response) -} - -func (h *RpcServerHandler) OnCron(session getty.Session) { - var ( - flag bool - active time.Time - ) - - h.rwlock.RLock() - if _, ok := h.sessionMap[session]; ok { - active = session.GetActive() - if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() { - flag = true - log.Warn("session{%s} timeout{%s}, reqNum{%d}", - session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum) - } - } - h.rwlock.RUnlock() - - if flag { - h.rwlock.Lock() - delete(h.sessionMap, session) - h.rwlock.Unlock() - session.Close() - } -} - -func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { - - defer func() { - if e := recover(); e != nil { - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - if err, ok := e.(error); ok { - log.Error("callService panic: %#v", err) - req.Body = e.(error) - } else if err, ok := e.(string); ok { - log.Error("callService panic: %#v", jerrors.New(err)) - req.Body = jerrors.New(err) - } else { - log.Error("callService panic: %#v", e) - req.Body = e - } - } - }() - - svc := req.Body.(map[string]interface{})["service"].(*service) - method := svc.method[req.Service.Method] - - // prepare argv - var argv reflect.Value - argIsValue := false // if true, need to indirect before calling. - if method.ArgType.Kind() == reflect.Ptr { - argv = reflect.New(method.ArgType.Elem()) - } else { - argv = reflect.New(method.ArgType) - argIsValue = true - } - argvTmp := argv.Interface() - argvTmp = req.Body.(map[string]interface{})["args"] // type is []interface - if argIsValue { - argv = argv.Elem() - } - - // prepare replyv - replyv := reflect.New(method.ReplyType.Elem()) - var returnValues []reflect.Value - if method.CtxType == nil { - returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) - } else { - if contextv := reflect.ValueOf(ctx); contextv.IsValid() { - returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, contextv, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) - } else { - returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.Zero(method.CtxType), reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())}) - } - } - - // The return value for the method is an error. - if retErr := returnValues[0].Interface(); retErr != nil { - req.Header.ResponseStatus = hessian.Response_SERVER_ERROR - req.Body = retErr.(error) - } else { - req.Body = replyv.Interface() - } -} - -func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackgeType) { - resp := &DubboPackage{ - Header: hessian.DubboHeader{ - SerialID: req.Header.SerialID, - Type: tp, - ID: req.Header.ID, - ResponseStatus: req.Header.ResponseStatus, - }, - } - - if req.Header.Type&hessian.Request != 0x00 { - resp.Body = req.Body - } else { - resp.Body = nil - } - - if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { - log.Error("WritePkg error: %#v, %#v", jerrors.Trace(err), req.Header) - } -} diff --git a/dubbo/pool.go b/dubbo/pool.go deleted file mode 100644 index 2cc9cc7ac99142b2ad32beea6cc39722d0dd9a39..0000000000000000000000000000000000000000 --- a/dubbo/pool.go +++ /dev/null @@ -1,346 +0,0 @@ -package dubbo - -import ( - "fmt" - "math/rand" - "net" - "strings" - "sync" - "time" -) - -import ( - "github.com/AlexStocks/getty" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -type gettyRPCClient struct { - once sync.Once - protocol string - addr string - created int64 // 为0,则说明没有被创建或者被销毁了 - - pool *gettyRPCClientPool - - lock sync.RWMutex - gettyClient getty.Client - sessions []*rpcSession -} - -var ( - errClientPoolClosed = jerrors.New("client pool closed") -) - -func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) { - c := &gettyRPCClient{ - protocol: protocol, - addr: addr, - pool: pool, - gettyClient: getty.NewTCPClient( - getty.WithServerAddress(addr), - getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)), - ), - } - c.gettyClient.RunEventLoop(c.newSession) - idx := 1 - for { - idx++ - if c.isAvailable() { - break - } - - if idx > 5000 { - return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr)) - } - time.Sleep(1e6) - } - log.Info("client init ok") - c.created = time.Now().Unix() - - return c, nil -} - -func (c *gettyRPCClient) newSession(session getty.Session) error { - var ( - ok bool - tcpConn *net.TCPConn - conf ClientConfig - ) - - conf = c.pool.rpcClient.conf - if conf.GettySessionParam.CompressEncoding { - session.SetCompressType(getty.CompressZip) - } - - if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { - panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) - } - - tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) - tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) - if conf.GettySessionParam.TcpKeepAlive { - tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) - } - tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) - tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) - - session.SetName(conf.GettySessionParam.SessionName) - session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient)) - session.SetEventListener(NewRpcClientHandler(c)) - session.SetRQLen(conf.GettySessionParam.PkgRQSize) - session.SetWQLen(conf.GettySessionParam.PkgWQSize) - session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) - session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) - session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6)) - session.SetWaitTime(conf.GettySessionParam.waitTimeout) - log.Debug("client new session:%s\n", session.Stat()) - - return nil -} - -func (c *gettyRPCClient) selectSession() getty.Session { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.sessions == nil { - return nil - } - - count := len(c.sessions) - if count == 0 { - return nil - } - return c.sessions[rand.Int31n(int32(count))].session -} - -func (c *gettyRPCClient) addSession(session getty.Session) { - log.Debug("add session{%s}", session.Stat()) - if session == nil { - return - } - - c.lock.Lock() - c.sessions = append(c.sessions, &rpcSession{session: session}) - c.lock.Unlock() -} - -func (c *gettyRPCClient) removeSession(session getty.Session) { - if session == nil { - return - } - - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } - - for i, s := range c.sessions { - if s.session == session { - c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) - log.Debug("delete session{%s}, its index{%d}", session.Stat(), i) - break - } - } - log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) - if len(c.sessions) == 0 { - c.close() // -> pool.remove(c) - } -} - -func (c *gettyRPCClient) updateSession(session getty.Session) { - if session == nil { - return - } - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } - - for i, s := range c.sessions { - if s.session == session { - c.sessions[i].reqNum++ - break - } - } -} - -func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) { - var ( - err error - rpcSession rpcSession - ) - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return rpcSession, errClientClosed - } - - err = errSessionNotExist - for _, s := range c.sessions { - if s.session == session { - rpcSession = *s - err = nil - break - } - } - - return rpcSession, jerrors.Trace(err) -} - -func (c *gettyRPCClient) isAvailable() bool { - if c.selectSession() == nil { - return false - } - - return true -} - -func (c *gettyRPCClient) close() error { - err := jerrors.Errorf("close gettyRPCClient{%#v} again", c) - c.once.Do(func() { - // delete @c from client pool - c.pool.remove(c) - for _, s := range c.sessions { - log.Info("close client session{%s, last active:%s, request number:%d}", - s.session.Stat(), s.session.GetActive().String(), s.reqNum) - s.session.Close() - } - c.gettyClient.Close() - c.gettyClient = nil - c.sessions = c.sessions[:0] - - c.created = 0 - err = nil - }) - return err -} - -type gettyRPCClientPool struct { - rpcClient *Client - size int // []*gettyRPCClient数组的size - ttl int64 // 每个gettyRPCClient的有效期时间. pool对象会在getConn时执行ttl检查 - - sync.Mutex - connMap map[string][]*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组 -} - -func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool { - return &gettyRPCClientPool{ - rpcClient: rpcClient, - size: size, - ttl: int64(ttl.Seconds()), - connMap: make(map[string][]*gettyRPCClient), - } -} - -func (p *gettyRPCClientPool) close() { - p.Lock() - connMap := p.connMap - p.connMap = nil - p.Unlock() - for _, connArray := range connMap { - for _, conn := range connArray { - conn.close() - } - } -} - -func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { - var builder strings.Builder - - builder.WriteString(addr) - builder.WriteString("@") - builder.WriteString(protocol) - - key := builder.String() - - p.Lock() - defer p.Unlock() - if p.connMap == nil { - return nil, errClientPoolClosed - } - - connArray := p.connMap[key] - now := time.Now().Unix() - - for len(connArray) > 0 { - conn := connArray[len(connArray)-1] - connArray = connArray[:len(connArray)-1] - p.connMap[key] = connArray - - if d := now - conn.created; d > p.ttl { - conn.close() // -> pool.remove(c) - continue - } - - return conn, nil - } - - // create new conn - return newGettyRPCClientConn(p, protocol, addr) -} - -func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.created == 0 { - return - } - if err != nil { - conn.close() - return - } - - var builder strings.Builder - - builder.WriteString(conn.addr) - builder.WriteString("@") - builder.WriteString(conn.protocol) - - key := builder.String() - - p.Lock() - defer p.Unlock() - if p.connMap == nil { - return - } - - connArray := p.connMap[key] - if len(connArray) >= p.size { - conn.close() - return - } - p.connMap[key] = append(connArray, conn) -} - -func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { - if conn == nil || conn.created == 0 { - return - } - - var builder strings.Builder - - builder.WriteString(conn.addr) - builder.WriteString("@") - builder.WriteString(conn.protocol) - - key := builder.String() - - p.Lock() - defer p.Unlock() - if p.connMap == nil { - return - } - - connArray := p.connMap[key] - if len(connArray) > 0 { - for idx, c := range connArray { - if conn == c { - p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...) - break - } - } - } -} diff --git a/dubbo/readwriter.go b/dubbo/readwriter.go deleted file mode 100644 index e14cf201009a98992d57ed1e714a7cb3252bd862..0000000000000000000000000000000000000000 --- a/dubbo/readwriter.go +++ /dev/null @@ -1,136 +0,0 @@ -package dubbo - -import ( - "bytes" - "reflect" -) - -import ( - "github.com/AlexStocks/getty" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -//////////////////////////////////////////// -// RpcClientPackageHandler -//////////////////////////////////////////// - -type RpcClientPackageHandler struct { - client *Client -} - -func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { - return &RpcClientPackageHandler{client: client} -} - -func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{ - Body: p.client.pendingResponses[SequenceType(int64(p.client.sequence.Load()))].reply, - } - - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf) - if err != nil { - pkg.Err = jerrors.Trace(err) // client will get this err - return pkg, len(data), nil - } - - return pkg, len(data), nil -} - -func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { - req, ok := pkg.(*DubboPackage) - if !ok { - log.Error("illegal pkg:%+v\n", pkg) - return jerrors.New("invalid rpc request") - } - - buf, err := req.Marshal() - if err != nil { - log.Warn("binary.Write(req{%#v}) = err{%#v}", req, jerrors.ErrorStack(err)) - return jerrors.Trace(err) - } - - return jerrors.Trace(ss.WriteBytes(buf.Bytes())) -} - -//////////////////////////////////////////// -// RpcServerPackageHandler -//////////////////////////////////////////// - -type RpcServerPackageHandler struct { - server *Server - srvMap serviceMap -} - -func NewRpcServerPackageHandler(server *Server, srvMap serviceMap) *RpcServerPackageHandler { - return &RpcServerPackageHandler{ - server: server, - srvMap: srvMap, - } -} - -func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{ - Body: make([]interface{}, 7), - } - - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf) - if err != nil { - return nil, 0, jerrors.Trace(err) - } - // convert params of request - req := pkg.Body.([]interface{}) // length of body should be 7 - if len(req) > 0 { - var dubboVersion, argsTypes string - var args []interface{} - var attachments map[interface{}]interface{} - if req[0] != nil { - dubboVersion = req[0].(string) - } - if req[1] != nil { - pkg.Service.Target = req[1].(string) - } - if req[2] != nil { - pkg.Service.Version = req[2].(string) - } - if req[3] != nil { - pkg.Service.Method = req[3].(string) - } - if req[4] != nil { - argsTypes = req[4].(string) - } - if req[5] != nil { - args = req[5].([]interface{}) - } - if req[6] != nil { - attachments = req[6].(map[interface{}]interface{}) - } - pkg.Body = map[string]interface{}{ - "dubboVersion": dubboVersion, - "argsTypes": argsTypes, - "args": args, - "service": p.srvMap[pkg.Service.Target], - "attachments": attachments, - } - } - - return pkg, len(data), nil -} - -func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { - res, ok := pkg.(*DubboPackage) - if !ok { - log.Error("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) - return jerrors.New("invalid rpc response") - } - - buf, err := res.Marshal() - if err != nil { - log.Warn("binary.Write(res{%#v}) = err{%#v}", res, jerrors.ErrorStack(err)) - return jerrors.Trace(err) - } - - return jerrors.Trace(ss.WriteBytes(buf.Bytes())) -} diff --git a/dubbo/rpc.go b/dubbo/rpc.go deleted file mode 100644 index f20fc9603eeb630a525ee8a87e638eec95aead96..0000000000000000000000000000000000000000 --- a/dubbo/rpc.go +++ /dev/null @@ -1,119 +0,0 @@ -package dubbo - -import ( - "reflect" - "sync" - "unicode" - "unicode/utf8" -) - -import ( - log "github.com/AlexStocks/log4go" -) - -var ( - typeOfError = reflect.TypeOf((*error)(nil)).Elem() -) - -type GettyRPCService interface { - Service() string // Service Interface - Version() string -} - -type methodType struct { - sync.Mutex - method reflect.Method - CtxType reflect.Type // type of the request context - ArgType reflect.Type - ReplyType reflect.Type -} - -type service struct { - name string - rcvr reflect.Value - rcvrType reflect.Type - method map[string]*methodType -} - -// Is this an exported - upper case - name -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// Is this type exported or a builtin? -func isExportedOrBuiltinType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - return isExported(t.Name()) || t.PkgPath() == "" -} - -// suitableMethods returns suitable Rpc methods of typ -func suitableMethods(typ reflect.Type) (string, map[string]*methodType) { - methods := make(map[string]*methodType) - mts := "" - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - if mt := suiteMethod(method); mt != nil { - methods[method.Name] = mt - mts += method.Name + "," - } - } - return mts, methods -} - -// suiteMethod returns a suitable Rpc methodType -func suiteMethod(method reflect.Method) *methodType { - mtype := method.Type - mname := method.Name - - // Method must be exported. - if method.PkgPath != "" { - return nil - } - - var replyType, argType, ctxType reflect.Type - switch mtype.NumIn() { - case 3: - argType = mtype.In(1) - replyType = mtype.In(2) - case 4: - ctxType = mtype.In(1) - argType = mtype.In(2) - replyType = mtype.In(3) - default: - log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", - mname, mtype, mtype.NumIn()) - return nil - } - // First arg need not be a pointer. - if !isExportedOrBuiltinType(argType) { - log.Error("argument type of method %q is not exported %v", mname, argType) - return nil - } - // Second arg must be a pointer. - if replyType.Kind() != reflect.Ptr { - log.Error("reply type of method %q is not a pointer %v", mname, replyType) - return nil - } - // Reply type must be exported. - if !isExportedOrBuiltinType(replyType) { - log.Error("reply type of method %s not exported{%v}", mname, replyType) - return nil - } - // Method needs one out. - if mtype.NumOut() != 1 { - log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut()) - return nil - } - // The return type of the method must be error. - if returnType := mtype.Out(0); returnType != typeOfError { - log.Error("return type %s of method %q is not error", returnType, mname) - return nil - } - - return &methodType{method: method, ArgType: argType, ReplyType: replyType, CtxType: ctxType} -} diff --git a/dubbo/server.go b/dubbo/server.go deleted file mode 100644 index 79cc4d51e0059ffb0ed2a537626d63dabe32edcf..0000000000000000000000000000000000000000 --- a/dubbo/server.go +++ /dev/null @@ -1,252 +0,0 @@ -package dubbo - -import ( - "fmt" - "github.com/dubbo/dubbo-go/plugins" - "net" - "reflect" - "strconv" -) - -import ( - "github.com/AlexStocks/getty" - "github.com/AlexStocks/goext/net" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/registry" -) - -type Option func(*Options) - -type Options struct { - Registry registry.Registry - ConfList []ServerConfig - ServiceConfList []registry.ReferenceConfig -} - -func newOptions(opt ...Option) Options { - opts := Options{} - for _, o := range opt { - o(&opts) - } - - if opts.Registry == nil { - panic("server.Options.Registry is nil") - } - - return opts -} - -// Registry used for discovery -func Registry(r registry.Registry) Option { - return func(o *Options) { - o.Registry = r - } -} - -func ConfList(confList []ServerConfig) Option { - return func(o *Options) { - o.ConfList = confList - for i := 0; i < len(o.ConfList); i++ { - if err := o.ConfList[i].CheckValidity(); err != nil { - log.Error("ServerConfig check failed: ", err) - o.ConfList = []ServerConfig{} - return - } - if o.ConfList[i].IP == "" { - o.ConfList[i].IP, _ = gxnet.GetLocalIP() - } - } - } -} - -func ServiceConfList(confList []registry.ReferenceConfig) Option { - return func(o *Options) { - o.ServiceConfList = confList - if o.ServiceConfList == nil { - o.ServiceConfList = []registry.ReferenceConfig{} - } - } -} - -type serviceMap map[string]*service - -type Server struct { - opts Options - indexOfConfList int - srvs []serviceMap - tcpServerList []getty.Server -} - -func NewServer(opts ...Option) *Server { - options := newOptions(opts...) - num := len(options.ConfList) - servers := make([]serviceMap, len(options.ConfList)) - - for i := 0; i < num; i++ { - servers[i] = map[string]*service{} - } - - s := &Server{ - opts: options, - srvs: servers, - } - - return s -} - -// Register export services and register with the registry -func (s *Server) Register(rcvr GettyRPCService) error { - - serviceConf := plugins.DefaultProviderServiceConfig()() - - opts := s.opts - - serviceConf.SetService(rcvr.Service()) - serviceConf.SetVersion(rcvr.Version()) - - flag := false - serviceNum := len(opts.ServiceConfList) - serverNum := len(opts.ConfList) - for i := 0; i < serviceNum; i++ { - if opts.ServiceConfList[i].Service() == serviceConf.Service() && - opts.ServiceConfList[i].Version() == serviceConf.Version() { - - serviceConf.SetProtocol(opts.ServiceConfList[i].Protocol()) - serviceConf.SetGroup(opts.ServiceConfList[i].Group()) - - for j := 0; j < serverNum; j++ { - if opts.ConfList[j].Protocol == serviceConf.Protocol() { - rcvrName := reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name() - svc := &service{ - rcvrType: reflect.TypeOf(rcvr), - rcvr: reflect.ValueOf(rcvr), - } - if rcvrName == "" { - s := "rpc.Register: no service name for type " + svc.rcvrType.String() - log.Error(s) - return jerrors.New(s) - } - if !isExported(rcvrName) { - s := "rpc.Register: type " + rcvrName + " is not exported" - log.Error(s) - return jerrors.New(s) - } - - svc.name = rcvr.Service() // service name is from 'Service()' - if _, present := s.srvs[j][svc.name]; present { - return jerrors.New("rpc: service already defined: " + svc.name) - } - - // Install the methods - mts, methods := suitableMethods(svc.rcvrType) - svc.method = methods - - if len(svc.method) == 0 { - // To help the user, see if a pointer receiver would work. - mts, methods = suitableMethods(reflect.PtrTo(svc.rcvrType)) - str := "rpc.Register: type " + rcvrName + " has no exported methods of suitable type" - if len(methods) != 0 { - str = "rpc.Register: type " + rcvrName + " has no exported methods of suitable type (" + - "hint: pass a pointer to value of that type)" - } - log.Error(str) - - return jerrors.New(str) - } - - s.srvs[j][svc.name] = svc - - serviceConf.SetMethods(mts) - serviceConf.SetPath(opts.ConfList[j].Address()) - - err := opts.Registry.Register(serviceConf) - if err != nil { - return err - } - flag = true - } - } - } - } - - if !flag { - return jerrors.Errorf("fail to register Handler{service:%s, version:%s}", - serviceConf.Service, serviceConf.Version) - } - return nil -} - -func (s *Server) newSession(session getty.Session) error { - var ( - ok bool - tcpConn *net.TCPConn - ) - conf := s.opts.ConfList[s.indexOfConfList] - - if conf.GettySessionParam.CompressEncoding { - session.SetCompressType(getty.CompressZip) - } - - if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { - panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) - } - - tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) - tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) - if conf.GettySessionParam.TcpKeepAlive { - tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) - } - tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) - tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) - - session.SetName(conf.GettySessionParam.SessionName) - session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(NewRpcServerPackageHandler(s, s.srvs[s.indexOfConfList])) - session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout)) - session.SetRQLen(conf.GettySessionParam.PkgRQSize) - session.SetWQLen(conf.GettySessionParam.PkgWQSize) - session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) - session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) - session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) - session.SetWaitTime(conf.GettySessionParam.waitTimeout) - log.Debug("app accepts new session:%s\n", session.Stat()) - - return nil -} - -func (s *Server) Start() { - var ( - addr string - tcpServer getty.Server - ) - - if len(s.opts.ConfList) == 0 { - panic("ConfList is nil") - } - - for i := 0; i < len(s.opts.ConfList); i++ { - addr = gxnet.HostAddress2(s.opts.ConfList[i].IP, strconv.Itoa(s.opts.ConfList[i].Port)) - tcpServer = getty.NewTCPServer( - getty.WithLocalAddress(addr), - ) - s.indexOfConfList = i - tcpServer.RunEventLoop(s.newSession) - log.Debug("s bind addr{%s} ok!", addr) - s.tcpServerList = append(s.tcpServerList, tcpServer) - } - -} - -func (s *Server) Stop() { - list := s.tcpServerList - s.tcpServerList = nil - if list != nil { - for _, tcpServer := range list { - tcpServer.Close() - } - } -} diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go index 6e761b50ea59c38beceb30fd655a58226d44ace5..4a122f8b4fa170394bfaf7c5dfcbfd98666f23f6 100644 --- a/protocol/dubbo/dubbo_exporter.go +++ b/protocol/dubbo/dubbo_exporter.go @@ -4,22 +4,28 @@ import ( "context" ) +import ( + log "github.com/AlexStocks/log4go" +) + import ( "github.com/dubbo/dubbo-go/protocol" ) // wrapping invoker type DubboExporter struct { - ctx context.Context - key string - invoker protocol.Invoker + ctx context.Context + key string + invoker protocol.Invoker + exporterMap map[string]protocol.Exporter } -func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker) *DubboExporter { +func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter { return &DubboExporter{ - ctx: ctx, - key: key, - invoker: invoker, + ctx: ctx, + key: key, + invoker: invoker, + exporterMap: exporterMap, } } @@ -29,5 +35,7 @@ func (de *DubboExporter) GetInvoker() protocol.Invoker { } func (de *DubboExporter) Unexport() { + log.Info("DubboExporter unexport.") de.invoker.Destroy() + delete(de.exporterMap, de.key) } diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index f99d7034b6840dcff711b32f45f347195add40a8..434de69a33efd26aa86bca7857997a93e7aa31ff 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -4,9 +4,14 @@ import ( "context" "errors" "strconv" + "sync" "time" ) +import ( + log "github.com/AlexStocks/log4go" +) + import ( "github.com/dubbo/dubbo-go/common/constant" "github.com/dubbo/dubbo-go/config" @@ -16,16 +21,21 @@ import ( var Err_No_Reply = errors.New("no reply") type DubboInvoker struct { - ctx context.Context - url config.URL - client *Client + ctx context.Context + url config.URL + client *Client + available bool + destroyed bool + destroyLock sync.Mutex } func NewDubboInvoker(ctx context.Context, url config.URL, client *Client) *DubboInvoker { return &DubboInvoker{ - ctx: ctx, - url: url, - client: client, + ctx: ctx, + url: url, + client: client, + available: true, + destroyed: false, } } @@ -37,34 +47,60 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ) inv := invocation.(*protocol.RPCInvocation) - url := inv.Invoker().GetURL() + url := inv.Invoker().GetUrl().(*config.URL) // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) if err != nil { - async = true + log.Error("ParseBool - error: %v", err) + async = false } if async { - err = di.client.CallOneway(url.Location, url, inv.MethodName(), - WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), - WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), - WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { + result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Reply(), callBack, + WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), + WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), + WithCallSerialID(inv.Params()["serialID"].(SerialID)), + WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + } else { + result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), + WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), + WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), + WithCallSerialID(inv.Params()["serialID"].(SerialID)), + WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + } } else { if inv.Reply() == nil { result.Err = Err_No_Reply + } else { + result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Reply(), + WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), + WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), + WithCallSerialID(inv.Params()["serialID"].(SerialID)), + WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + result.Rest = inv.Reply() // reply should be set to result.Rest when sync } - err = di.client.Call(url.Location, url, inv.MethodName(), inv.Reply(), - WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), - WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), WithCallSerialID(inv.Params()["serialID"].(SerialID)), - WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) } return result } -func (di *DubboInvoker) GetURL() config.URL { - return di.url +func (di *DubboInvoker) GetUrl() config.IURL { + return &di.url +} + +func (di *DubboInvoker) IsAvailable() bool { + return di.available } func (di *DubboInvoker) Destroy() { - //todo: + if di.destroyed { + return + } + di.destroyLock.Lock() + defer di.destroyLock.Unlock() + + di.destroyed = true + di.available = false + + di.client.Close() // close client } diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index f65a7bbda7bf36ffa70e7d12c3ab91cc3de48bef..496c9185f624cf78efff538736ce7d7a813aa3ee 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -30,14 +30,14 @@ func NewDubboProtocol(ctx context.Context) *DubboProtocol { } func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { - url := invoker.GetURL() + url := invoker.GetUrl().(*config.URL) serviceKey := url.Key() - exporter := NewDubboExporter(nil, serviceKey, invoker) + exporter := NewDubboExporter(nil, serviceKey, invoker, dp.exporterMap) dp.exporterMap[serviceKey] = exporter log.Info("Export service: ", url.String()) // start server - dp.openServer(url) + dp.openServer(*url) return exporter } @@ -49,7 +49,8 @@ func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker { } func (dp *DubboProtocol) Destroy() { - + log.Info("DubboProtocol destroy.") + srv.Stop() // stop server } func (dp *DubboProtocol) openServer(url config.URL) { diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 2cc9cc7ac99142b2ad32beea6cc39722d0dd9a39..1f9828d7b7f28bbe0758a3c0a686fe992fa67133 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -250,13 +250,8 @@ func (p *gettyRPCClientPool) close() { } func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { - var builder strings.Builder - builder.WriteString(addr) - builder.WriteString("@") - builder.WriteString(protocol) - - key := builder.String() + key := GenerateEndpointAddr(protocol, addr) p.Lock() defer p.Unlock() @@ -293,13 +288,7 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { return } - var builder strings.Builder - - builder.WriteString(conn.addr) - builder.WriteString("@") - builder.WriteString(conn.protocol) - - key := builder.String() + key := GenerateEndpointAddr(conn.protocol, conn.addr) p.Lock() defer p.Unlock() @@ -320,13 +309,7 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { return } - var builder strings.Builder - - builder.WriteString(conn.addr) - builder.WriteString("@") - builder.WriteString(conn.protocol) - - key := builder.String() + key := GenerateEndpointAddr(conn.protocol, conn.addr) p.Lock() defer p.Unlock() @@ -344,3 +327,13 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { } } } + +func GenerateEndpointAddr(protocol, addr string) string { + var builder strings.Builder + + builder.WriteString(protocol) + builder.WriteString("://") + builder.WriteString(addr) + + return builder.String() +} diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 69027c690c1880f3f9d71d7c6fc514092d3289e5..f2c90f35bcf9fb1b0d77e3025661254f461cd512 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -105,7 +105,7 @@ func (s *Server) Start(url config.URL) { tcpServer getty.Server ) - addr = url.Location() + addr = url.Location tcpServer = getty.NewTCPServer( getty.WithLocalAddress(addr), ) diff --git a/protocol/invocation.go b/protocol/invocation.go index cfd133cdf96663fb2ea479091de60ebe9bf7a6d4..a6d86a3d8bea752d3c110ff5d04245406a6e46c1 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -17,24 +17,27 @@ type Invocation interface { ///////////////////////////// // Invocation Impletment of RPC ///////////////////////////// - +// todo: is it necessary to separate fields of consumer(provider) from RPCInvocation type RPCInvocation struct { methodName string parameterTypes []reflect.Type arguments []interface{} reply interface{} + callBack interface{} attachments map[string]string invoker Invoker params map[string]interface{} // Store some parameters that are not easy to refine } +// todo: arguments table is too many func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, arguments []interface{}, - reply interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation { + reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation { return &RPCInvocation{ methodName: methodName, parameterTypes: parameterTypes, arguments: arguments, reply: reply, + callBack: callBack, attachments: attachments, invoker: invoker, params: params, @@ -79,3 +82,7 @@ func (r *RPCInvocation) SetInvoker() Invoker { func (r *RPCInvocation) Params() map[string]interface{} { return r.params } + +func (r *RPCInvocation) CallBack() interface{} { + return r.callBack +} diff --git a/protocol/invoker.go b/protocol/invoker.go index c18042b53c717a8a416f618fb9bffc5325b2cd16..1b974abc421aec9e5cf82aa6710674379d04eef9 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -5,5 +5,5 @@ import "github.com/dubbo/dubbo-go/common" // Extension - Invoker type Invoker interface { common.Node - Invoke() + Invoke(Invocation) Result } diff --git a/registry/directory.go b/registry/directory.go index a2f352a6327581ca21a44b6499d018aec17b67f3..9e9fd9b936beb647812f86dfc4a4d8b0d9ea5f61 100644 --- a/registry/directory.go +++ b/registry/directory.go @@ -179,10 +179,7 @@ func (dir *RegistryDirectory) cacheInvoker(url config.URL) sync.Map { if _, ok := newCacheInvokers.Load(url.ToFullString()); !ok { log.Debug("service will be added in cache invokers: invokers key is %s!", url.ToFullString()) - newInvoker,err := extension.GetProtocolExtension(url.Protocol).Refer(url) - if err!=nil{ - - } + newInvoker := extension.GetProtocolExtension(url.Protocol).Refer(&url) newCacheInvokers.Store(url.ToFullString(), newInvoker) } }