Skip to content
Snippets Groups Projects
Commit c1f89b0b authored by vito.he's avatar vito.he
Browse files

rollback add transport interface in client & success to integrate to call dubbo~

parent 4d47771b
No related branches found
No related tags found
No related merge requests found
package client
import (
"context"
"github.com/dubbo/dubbo-go/service"
)
type Transport interface {
Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error
NewRequest(conf service.ServiceConfig, method string, args interface{}) Request
}
//////////////////////////////////////////////
// Request
//////////////////////////////////////////////
type Request struct {
ID int64
Group string
Protocol string
Version string
Service string
Method string
Args interface{}
ContentType string
}
func (r *Request) ServiceConfig() service.ServiceConfigIf {
return &service.ServiceConfig{
Protocol: r.Protocol,
Service: r.Service,
Group: r.Group,
Version: r.Version,
}
}
//
//type Transport interface {
// Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error
// NewRequest(conf service.ServiceConfig, method string, args interface{}) Request
//}
//
////////////////////////////////////////////////
//// Request
////////////////////////////////////////////////
//
//type Request struct {
// ID int64
// Group string
// Protocol string
// Version string
// Service string
// Method string
// Args interface{}
// ContentType string
//}
//
//func (r *Request) ServiceConfig() service.ServiceConfigIf {
// return &service.ServiceConfig{
// Protocol: r.Protocol,
// Service: r.Service,
// Group: r.Group,
// Version: r.Version,
// }
//}
......@@ -2,7 +2,7 @@ package invoker
import (
"context"
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/dubbo"
"sync"
"time"
)
......@@ -14,6 +14,7 @@ import (
import (
"github.com/dubbo/dubbo-go/client/loadBalance"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/service"
)
......@@ -21,7 +22,9 @@ import (
type Options struct {
ServiceTTL time.Duration
selector loadBalance.Selector
Transport client.Transport
//TODO:we should provider a transport client interface
HttpClient *jsonrpc.HTTPClient
DubboClient *dubbo.Client
}
type Option func(*Options)
......@@ -31,9 +34,14 @@ func WithServiceTTL(ttl time.Duration) Option {
}
}
func WithClientTransport(client client.Transport) Option {
func WithHttpClient(client *jsonrpc.HTTPClient) Option {
return func(o *Options) {
o.Transport = client
o.HttpClient = client
}
}
func WithDubboClient(client *dubbo.Client) Option {
return func(o *Options) {
o.DubboClient = client
}
}
......@@ -59,8 +67,8 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) {
for _, opt := range opts {
opt(&options)
}
if options.Transport == nil {
return nil, jerrors.New("Must specify the client transport !")
if options.HttpClient == nil && options.DubboClient == nil {
return nil, jerrors.New("Must specify the transport client!")
}
invoker := &Invoker{
Options: options,
......@@ -152,7 +160,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra
return newSvcArr, nil
}
func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req client.Request, resp interface{}) error {
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
serviceArray, err := ivk.getService(serviceConf)
if err != nil {
......@@ -165,10 +173,32 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.
if err != nil {
return err
}
if err = ivk.Transport.Call(ctx, url, req, resp); err != nil {
if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", resp)
return nil
}
func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
serviceArray, err := ivk.getService(serviceConf)
if err != nil {
return err
}
if len(serviceArray.arr) == 0 {
return jerrors.New("cannot find svc " + serviceConf.String())
}
url, err := ivk.selector.Select(reqId, serviceArray)
if err != nil {
return err
}
//TODO:这里要改一下call方法改为接收指针类型
if err = ivk.DubboClient.Call(url.Ip+":"+url.Port, *url, method, args, reply, opts...); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", reply)
return nil
}
......@@ -6,7 +6,7 @@ import (
)
type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
}
//////////////////////////////////////////
......@@ -37,5 +37,3 @@ type Selector interface {
//
// return ""
//}
......@@ -16,7 +16,7 @@ import (
import (
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
svc "github.com/dubbo/dubbo-go/service"
)
var (
......@@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) {
}
// call one way
func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error {
func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
......@@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method stri
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
......@@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, ar
return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts))
}
func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{},
func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{},
callback AsyncCallback, reply interface{}, opts ...CallOption) error {
var copts CallOptions
......@@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method strin
return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}
func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string,
func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
if opts.RequestTimeout == 0 {
......
......@@ -129,8 +129,12 @@ func initClient(clientConfig *examples.ClientConfig) {
SessionName: "client",
},
})
if err != nil {
log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err))
return
}
clientInvoker, err = invoker.NewInvoker(clientRegistry,
invoker.WithClientTransport(cltD),
invoker.WithDubboClient(cltD),
invoker.WithLBSelector(configClientLB))
}
......
......@@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/dubbo/dubbo-go/examples"
"github.com/dubbo/dubbo-go/service"
"github.com/dubbogo/hessian2"
_ "net/http/pprof"
)
......@@ -16,53 +17,21 @@ import (
import (
"github.com/dubbo/dubbo-go/dubbo"
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
func testDubborpc(clientConfig *examples.ClientConfig,userKey string) {
func testDubborpc(clientConfig *examples.ClientConfig, userKey string) {
var (
err error
service string
svc string
method string
serviceIdx int
user *DubboUser
conf registry.ServiceConfig
serviceURL *registry.ServiceURL
cltD *dubbo.Client
conf service.ServiceConfig
)
cltD, err = dubbo.NewClient(&dubbo.ClientConfig{
PoolSize: 64,
PoolTTL: 600,
ConnectionNum: 2, // 不能太大
FailFastTimeout: "5s",
SessionTimeout: "20s",
HeartbeatPeriod: "5s",
GettySessionParam: dubbo.GettySessionParam{
CompressEncoding: false, // 必须false
TcpNoDelay: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpKeepAlive: true,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 1024,
SessionName: "client",
},
})
if err != nil {
log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err))
return
}
defer cltD.Close()
serviceIdx = -1
service = "com.ikurento.user.UserProvider"
svc = "com.ikurento.user.UserProvider"
for i := range clientConfig.Service_List {
if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() {
if clientConfig.Service_List[i].Service == svc && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() {
serviceIdx = i
break
}
......@@ -73,20 +42,13 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) {
// Create request
method = string("GetUser")
conf = registry.ServiceConfig{
conf = service.ServiceConfig{
Group: clientConfig.Service_List[serviceIdx].Group,
Protocol: public.CodecType(public.CODECTYPE_DUBBO).String(),
Version: clientConfig.Service_List[serviceIdx].Version,
Service: clientConfig.Service_List[serviceIdx].Service,
}
serviceURL, err = clientRegistry.Filter(conf, 1)
if err != nil {
log.Error("registry.Filter(conf:%#v) = error:%s", conf, jerrors.ErrorStack(err))
return
}
log.Debug("got serviceURL: %s", serviceURL)
// registry pojo
hessian.RegisterJavaEnum(Gender(MAN))
hessian.RegisterJavaEnum(Gender(WOMAN))
......@@ -94,7 +56,8 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) {
hessian.RegisterPOJO(&Response{})
user = new(DubboUser)
err = cltD.Call(serviceURL.Ip+":"+serviceURL.Port, *serviceURL, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default))
defer clientInvoker.DubboClient.Close()
err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default))
// Call service
if err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
......
......@@ -13,6 +13,7 @@ connect_timeout : "100ms"
selector : "cache"
selector_ttl : "10m"
registry : "zookeeper"
client_load_balance: "round_robin"
# application config
application_config:
......@@ -23,7 +24,7 @@ application_config:
owner : "ZX"
environment : "dev"
registry_config:
zk_registry_config:
timeout : "3s"
address:
- "127.0.0.1:2181"
......
......@@ -115,7 +115,7 @@ func initClient(clientConfig *examples.ClientConfig) {
)
clientInvoker, err = invoker.NewInvoker(clientRegistry,
invoker.WithClientTransport(clt),
invoker.WithHttpClient(clt),
invoker.WithLBSelector(configClientLB))
}
......
......@@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"github.com/dubbo/dubbo-go/client"
_ "net/http/pprof"
)
......@@ -13,6 +12,7 @@ import (
import (
"github.com/dubbo/dubbo-go/examples"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/service"
)
......@@ -25,7 +25,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str
user *JsonRPCUser
ctx context.Context
conf service.ServiceConfig
req client.Request
req jsonrpc.Request
)
serviceIdx = -1
......@@ -49,7 +49,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str
Service: clientConfig.Service_List[serviceIdx].Service,
}
// Attention the last parameter : []UserKey{userKey}
req = clientInvoker.Transport.NewRequest(conf, method, []string{userKey})
req = clientInvoker.HttpClient.NewRequest(conf, method, []string{userKey})
ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
......@@ -59,7 +59,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str
user = new(JsonRPCUser)
err = clientInvoker.Call(ctx, 1, &conf, req, user)
err = clientInvoker.HttpCall(ctx, 1, &conf, req, user)
if err != nil {
panic(err)
} else {
......
......@@ -5,7 +5,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/service"
"io/ioutil"
"net"
......@@ -25,6 +24,30 @@ import (
"github.com/dubbo/dubbo-go/public"
)
//////////////////////////////////////////////
// Request
//////////////////////////////////////////////
type Request struct {
ID int64
group string
protocol string
version string
service string
method string
args interface{}
contentType string
}
func (r *Request) ServiceConfig() service.ServiceConfigIf {
return &service.ServiceConfig{
Protocol: r.protocol,
Service: r.service,
Group: r.group,
Version: r.version,
}
}
//////////////////////////////////////////////
// HTTP Client
//////////////////////////////////////////////
......@@ -63,19 +86,19 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
}
}
func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) client.Request {
return client.Request{
func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request {
return Request{
ID: atomic.AddInt64(&c.ID, 1),
Group: conf.Group,
Protocol: conf.Protocol,
Version: conf.Version,
Service: conf.Service,
Method: method,
Args: args,
group: conf.Group,
protocol: conf.Protocol,
version: conf.Version,
service: conf.Service,
method: method,
args: args,
}
}
func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req client.Request, rsp interface{}) error {
func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error {
// header
httpHeader := http.Header{}
httpHeader.Set("Content-Type", "application/json")
......@@ -99,8 +122,8 @@ func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req
codec := newJsonClientCodec()
codecData := CodecData{
ID: req.ID,
Method: req.Method,
Args: req.Args,
Method: req.method,
Args: req.args,
}
reqBody, err := codec.Write(&codecData)
if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment