diff --git a/client/client_transport.go b/client/client_transport.go index 14544cc92d2ce9b119cb7d7dff9896bb13dca6c1..e3d31b60efb91c4d095782614f1d5705f5fc9d81 100644 --- a/client/client_transport.go +++ b/client/client_transport.go @@ -1,35 +1,31 @@ package client -import ( - "context" - "github.com/dubbo/dubbo-go/service" -) - -type Transport interface { - Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error - NewRequest(conf service.ServiceConfig, method string, args interface{}) Request -} - -////////////////////////////////////////////// -// Request -////////////////////////////////////////////// - -type Request struct { - ID int64 - Group string - Protocol string - Version string - Service string - Method string - Args interface{} - ContentType string -} - -func (r *Request) ServiceConfig() service.ServiceConfigIf { - return &service.ServiceConfig{ - Protocol: r.Protocol, - Service: r.Service, - Group: r.Group, - Version: r.Version, - } -} +// +//type Transport interface { +// Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error +// NewRequest(conf service.ServiceConfig, method string, args interface{}) Request +//} +// +//////////////////////////////////////////////// +//// Request +//////////////////////////////////////////////// +// +//type Request struct { +// ID int64 +// Group string +// Protocol string +// Version string +// Service string +// Method string +// Args interface{} +// ContentType string +//} +// +//func (r *Request) ServiceConfig() service.ServiceConfigIf { +// return &service.ServiceConfig{ +// Protocol: r.Protocol, +// Service: r.Service, +// Group: r.Group, +// Version: r.Version, +// } +//} diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index eac4f16119134afe39888bf75613164fa98ac9bb..704de9231bc31156d9fa0e1c124987799a4aa566 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -2,7 +2,7 @@ package invoker import ( "context" - "github.com/dubbo/dubbo-go/client" + "github.com/dubbo/dubbo-go/dubbo" "sync" "time" ) @@ -14,6 +14,7 @@ import ( import ( "github.com/dubbo/dubbo-go/client/loadBalance" + "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/registry" "github.com/dubbo/dubbo-go/service" ) @@ -21,7 +22,9 @@ import ( type Options struct { ServiceTTL time.Duration selector loadBalance.Selector - Transport client.Transport + //TODO:we should provider a transport client interface + HttpClient *jsonrpc.HTTPClient + DubboClient *dubbo.Client } type Option func(*Options) @@ -31,9 +34,14 @@ func WithServiceTTL(ttl time.Duration) Option { } } -func WithClientTransport(client client.Transport) Option { +func WithHttpClient(client *jsonrpc.HTTPClient) Option { return func(o *Options) { - o.Transport = client + o.HttpClient = client + } +} +func WithDubboClient(client *dubbo.Client) Option { + return func(o *Options) { + o.DubboClient = client } } @@ -59,8 +67,8 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { for _, opt := range opts { opt(&options) } - if options.Transport == nil { - return nil, jerrors.New("Must specify the client transport !") + if options.HttpClient == nil && options.DubboClient == nil { + return nil, jerrors.New("Must specify the transport client!") } invoker := &Invoker{ Options: options, @@ -152,7 +160,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra return newSvcArr, nil } -func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req client.Request, resp interface{}) error { +func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { serviceArray, err := ivk.getService(serviceConf) if err != nil { @@ -165,10 +173,32 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service. if err != nil { return err } - if err = ivk.Transport.Call(ctx, url, req, resp); err != nil { + if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) return err } log.Info("response result:%s", resp) return nil } + +func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { + + serviceArray, err := ivk.getService(serviceConf) + if err != nil { + return err + } + if len(serviceArray.arr) == 0 { + return jerrors.New("cannot find svc " + serviceConf.String()) + } + url, err := ivk.selector.Select(reqId, serviceArray) + if err != nil { + return err + } + //TODO:杩欓噷瑕佹敼涓€涓媍all鏂规硶鏀逛负鎺ユ敹鎸囬拡绫诲瀷 + if err = ivk.DubboClient.Call(url.Ip+":"+url.Port, *url, method, args, reply, opts...); err != nil { + log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) + return err + } + log.Info("response result:%s", reply) + return nil +} diff --git a/client/loadBalance/load_balance.go b/client/loadBalance/load_balance.go index 3e8137a7d8d13fba8dcda977e7f1fe3c5e12bb35..61a5473d3f6423aa1fee3fb6bd76c30c7e8eb557 100644 --- a/client/loadBalance/load_balance.go +++ b/client/loadBalance/load_balance.go @@ -6,7 +6,7 @@ import ( ) type Selector interface { - Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) + Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) } ////////////////////////////////////////// @@ -37,5 +37,3 @@ type Selector interface { // // return "" //} - - diff --git a/dubbo/client.go b/dubbo/client.go index 08e0a72a3607c07c85c34553f82a15cba7e16567..958706a6b73561e4e85535b67a4bfbb32228a8cd 100644 --- a/dubbo/client.go +++ b/dubbo/client.go @@ -16,7 +16,7 @@ import ( import ( "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" + svc "github.com/dubbo/dubbo-go/service" ) var ( @@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) { } // call one way -func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error { +func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method stri } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { +func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, ar return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) } -func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{}, +func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{}, callback AsyncCallback, reply interface{}, opts ...CallOption) error { var copts CallOptions @@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method strin return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string, +func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { if opts.RequestTimeout == 0 { diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index d0b092c38da1d2f65fe3e465ee2d18d6b1d9d949..43c3bae1e0df8ed6eca91b4af06dc0cb4e079edf 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -129,8 +129,12 @@ func initClient(clientConfig *examples.ClientConfig) { SessionName: "client", }, }) + if err != nil { + log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) + return + } clientInvoker, err = invoker.NewInvoker(clientRegistry, - invoker.WithClientTransport(cltD), + invoker.WithDubboClient(cltD), invoker.WithLBSelector(configClientLB)) } diff --git a/examples/dubbo/go-client/app/test.go b/examples/dubbo/go-client/app/test.go index 5e78c983eb44e88a23bbfd3cf77c2a6917cf2d11..804b294fa332ccf64579f696037f64ef019cb461 100644 --- a/examples/dubbo/go-client/app/test.go +++ b/examples/dubbo/go-client/app/test.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/dubbo/dubbo-go/examples" + "github.com/dubbo/dubbo-go/service" "github.com/dubbogo/hessian2" _ "net/http/pprof" ) @@ -16,53 +17,21 @@ import ( import ( "github.com/dubbo/dubbo-go/dubbo" "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" ) -func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { +func testDubborpc(clientConfig *examples.ClientConfig, userKey string) { var ( err error - service string + svc string method string serviceIdx int user *DubboUser - conf registry.ServiceConfig - serviceURL *registry.ServiceURL - cltD *dubbo.Client + conf service.ServiceConfig ) - - cltD, err = dubbo.NewClient(&dubbo.ClientConfig{ - PoolSize: 64, - PoolTTL: 600, - ConnectionNum: 2, // 涓嶈兘澶ぇ - FailFastTimeout: "5s", - SessionTimeout: "20s", - HeartbeatPeriod: "5s", - GettySessionParam: dubbo.GettySessionParam{ - CompressEncoding: false, // 蹇呴』false - TcpNoDelay: true, - KeepAlivePeriod: "120s", - TcpRBufSize: 262144, - TcpKeepAlive: true, - TcpWBufSize: 65536, - PkgRQSize: 1024, - PkgWQSize: 512, - TcpReadTimeout: "1s", - TcpWriteTimeout: "5s", - WaitTimeout: "1s", - MaxMsgLen: 1024, - SessionName: "client", - }, - }) - if err != nil { - log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) - return - } - defer cltD.Close() serviceIdx = -1 - service = "com.ikurento.user.UserProvider" + svc = "com.ikurento.user.UserProvider" for i := range clientConfig.Service_List { - if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { + if clientConfig.Service_List[i].Service == svc && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { serviceIdx = i break } @@ -73,20 +42,13 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { // Create request method = string("GetUser") - conf = registry.ServiceConfig{ + conf = service.ServiceConfig{ Group: clientConfig.Service_List[serviceIdx].Group, Protocol: public.CodecType(public.CODECTYPE_DUBBO).String(), Version: clientConfig.Service_List[serviceIdx].Version, Service: clientConfig.Service_List[serviceIdx].Service, } - serviceURL, err = clientRegistry.Filter(conf, 1) - if err != nil { - log.Error("registry.Filter(conf:%#v) = error:%s", conf, jerrors.ErrorStack(err)) - return - } - log.Debug("got serviceURL: %s", serviceURL) - // registry pojo hessian.RegisterJavaEnum(Gender(MAN)) hessian.RegisterJavaEnum(Gender(WOMAN)) @@ -94,7 +56,8 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { hessian.RegisterPOJO(&Response{}) user = new(DubboUser) - err = cltD.Call(serviceURL.Ip+":"+serviceURL.Port, *serviceURL, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) + defer clientInvoker.DubboClient.Close() + err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) // Call service if err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index d3332bade9c3b305f509de520584f2993b3cdae6..de296c7c0574bf3c37d35f4c715854530941d5a6 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "dev" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index bb3e853e2d51c2564ab3e490662aa929c47f2e21..28f4f267e0881f63d87734fcfd3b72aded2932c6 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -115,7 +115,7 @@ func initClient(clientConfig *examples.ClientConfig) { ) clientInvoker, err = invoker.NewInvoker(clientRegistry, - invoker.WithClientTransport(clt), + invoker.WithHttpClient(clt), invoker.WithLBSelector(configClientLB)) } diff --git a/examples/jsonrpc/go-client/app/test.go b/examples/jsonrpc/go-client/app/test.go index c649e6ea7fdf212afb32dba373b05588007f3e96..fb3d008b04d39b02888e46f605907de7f75265fb 100644 --- a/examples/jsonrpc/go-client/app/test.go +++ b/examples/jsonrpc/go-client/app/test.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "github.com/dubbo/dubbo-go/client" _ "net/http/pprof" ) @@ -13,6 +12,7 @@ import ( import ( "github.com/dubbo/dubbo-go/examples" + "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/service" ) @@ -25,7 +25,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str user *JsonRPCUser ctx context.Context conf service.ServiceConfig - req client.Request + req jsonrpc.Request ) serviceIdx = -1 @@ -49,7 +49,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str Service: clientConfig.Service_List[serviceIdx].Service, } // Attention the last parameter : []UserKey{userKey} - req = clientInvoker.Transport.NewRequest(conf, method, []string{userKey}) + req = clientInvoker.HttpClient.NewRequest(conf, method, []string{userKey}) ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", @@ -59,7 +59,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str user = new(JsonRPCUser) - err = clientInvoker.Call(ctx, 1, &conf, req, user) + err = clientInvoker.HttpCall(ctx, 1, &conf, req, user) if err != nil { panic(err) } else { diff --git a/jsonrpc/http.go b/jsonrpc/http.go index fb6050cf6024896092a990aa1b5a95e6232d17a3..053a135bd0f9ba2d8793290776933e83d42f6df2 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "fmt" - "github.com/dubbo/dubbo-go/client" "github.com/dubbo/dubbo-go/service" "io/ioutil" "net" @@ -25,6 +24,30 @@ import ( "github.com/dubbo/dubbo-go/public" ) +////////////////////////////////////////////// +// Request +////////////////////////////////////////////// + +type Request struct { + ID int64 + group string + protocol string + version string + service string + method string + args interface{} + contentType string +} + +func (r *Request) ServiceConfig() service.ServiceConfigIf { + return &service.ServiceConfig{ + Protocol: r.protocol, + Service: r.service, + Group: r.group, + Version: r.version, + } +} + ////////////////////////////////////////////// // HTTP Client ////////////////////////////////////////////// @@ -63,19 +86,19 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) client.Request { - return client.Request{ +func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request { + return Request{ ID: atomic.AddInt64(&c.ID, 1), - Group: conf.Group, - Protocol: conf.Protocol, - Version: conf.Version, - Service: conf.Service, - Method: method, - Args: args, + group: conf.Group, + protocol: conf.Protocol, + version: conf.Version, + service: conf.Service, + method: method, + args: args, } } -func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req client.Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error { // header httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") @@ -99,8 +122,8 @@ func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req codec := newJsonClientCodec() codecData := CodecData{ ID: req.ID, - Method: req.Method, - Args: req.Args, + Method: req.method, + Args: req.args, } reqBody, err := codec.Write(&codecData) if err != nil {