Skip to content
Snippets Groups Projects
Commit 0ec3556d authored by fangyincheng's avatar fangyincheng
Browse files

Add: support complete attachments

parent a1b5db0a
No related branches found
No related tags found
No related merge requests found
Showing
with 190 additions and 57 deletions
......@@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
*.jar
# Test binary, build with `go test -c`
*.test
......
......@@ -21,4 +21,6 @@ const (
CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH"
CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH"
APP_LOG_CONF_FILE string = "APP_LOG_CONF_FILE"
DUBBO_VERSION_KEY = "dubbo"
DUBBO_VERSION = "2.0.2"
)
......@@ -69,8 +69,9 @@ type ProxyInvoker struct {
func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
result.SetAttachments(invocation.Attachments())
url := pi.GetUrl().SubURL
url := pi.GetUrl()
methodName := invocation.MethodName()
proto := url.Protocol
......
......@@ -95,18 +95,18 @@ func println(format string, args ...interface{}) {
}
func test() {
//println("\n\n\necho")
//res, err := userProvider.Echo(context.TODO(), "OK")
//if err != nil {
// panic(err)
//}
//println("res: %v\n", res)
println("\n\n\necho")
res, err := userProvider.Echo(context.TODO(), "OK")
if err != nil {
panic(err)
}
println("res: %v\n", res)
time.Sleep(3e9)
println("\n\n\nstart to test dubbo")
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user)
err = userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user)
if err != nil {
panic(err)
}
......
......@@ -5,6 +5,7 @@ require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
......@@ -47,3 +48,5 @@ require (
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)
replace github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 => ./../hessian2
......@@ -151,30 +151,30 @@ func NewClient(opt Options) *Client {
}
// call one way
func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}) error {
func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) error {
return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil))
return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil, atta))
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}) error {
func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}, atta map[string]string) error {
ct := CT_TwoWay
if reply == nil {
ct = CT_OneWay
}
return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil))
return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil, atta))
}
func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args interface{},
callback AsyncCallback, reply interface{}) error {
callback AsyncCallback, reply interface{}, atta map[string]string) error {
return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback))
return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, atta))
}
func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string,
args, reply interface{}, callback AsyncCallback) error {
args, reply interface{}, callback AsyncCallback, atta map[string]string) error {
p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/")
......@@ -183,7 +183,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string
p.Service.Method = method
p.Service.Timeout = c.opts.RequestTimeout
p.Header.SerialID = byte(S_Dubbo)
p.Body = args
p.Body = hessian.NewRequest(args, atta)
var rsp *PendingResponse
if ct != CT_OneWay {
......@@ -259,6 +259,7 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
if pkg == nil {
pkg = &DubboPackage{}
pkg.Body = hessian.NewRequest([]interface{}{}, nil)
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
......
......@@ -51,7 +51,7 @@ func TestClient_CallOneway(t *testing.T) {
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
//user := &User{}
err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"})
err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil)
assert.NoError(t, err)
// destroy
......@@ -77,50 +77,50 @@ func TestClient_Call(t *testing.T) {
)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user)
err = c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user, nil)
assert.NoError(t, err)
assert.NotEqual(t, "", user.Id)
assert.NotEqual(t, "", user.Name)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user, nil)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, user, nil)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{}, user, nil)
assert.NoError(t, err)
err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{}, user, nil)
assert.EqualError(t, err, "error")
user2 := []interface{}{}
err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{}, &user2)
err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{}, &user2, nil)
assert.NoError(t, err)
assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
user2 = []interface{}{}
err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, &user2)
err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, &user2, nil)
assert.NoError(t, err)
assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
user3 := map[interface{}]interface{}{}
err = c.Call("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3)
err = c.Call("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3, nil)
assert.NoError(t, err)
assert.NotNil(t, user3)
assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"])
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, user, nil)
assert.NoError(t, err)
assert.Equal(t, User{Id: "", Name: ""}, *user)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, user)
err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, user, nil)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: ""}, *user)
......@@ -147,7 +147,7 @@ func TestClient_AsyncCall(t *testing.T) {
err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User))
lock.Unlock()
}, user)
}, user, nil)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
......
......@@ -70,5 +70,5 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
assert.Equal(t, "Method", pkgres.Body.([]interface{})[3])
assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4])
assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5])
assert.Equal(t, map[interface{}]interface{}{"group": "", "interface": "Service", "path": "path", "timeout": "1000"}, pkgres.Body.([]interface{})[6])
assert.Equal(t, map[string]string{"group": "", "interface": "Service", "path": "path", "timeout": "1000"}, pkgres.Body.([]interface{})[6])
}
......@@ -56,6 +56,10 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result protocol.RPCResult
)
if invocation.Attachments() != nil {
invocation.Attachments()[constant.DUBBO_VERSION_KEY] = constant.DUBBO_VERSION
}
inv := invocation.(*invocation_impl.RPCInvocation)
url := di.GetUrl()
// async
......@@ -66,15 +70,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
if async {
if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
result.Err = di.client.AsyncCall(url.Location, url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply())
result.Err = di.client.AsyncCall(url.Location, url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply(), inv.Attachments())
} else {
result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments())
result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())
}
} else {
if inv.Reply() == nil {
result.Err = Err_No_Reply
} else {
result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply())
result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply(), inv.Attachments())
}
}
if result.Err == nil {
......
......@@ -244,19 +244,15 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{
constant.PATH_KEY: p.Service.Path,
constant.GROUP_KEY: p.Service.Group,
constant.INTERFACE_KEY: p.Service.Interface,
constant.VERSION_KEY: p.Service.Version,
}))
result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}),
p.Body.(map[string]interface{})["attachments"].(map[string]string)))
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = err
p.Body = hessian.NewResponse(nil, err, result.Attachments())
} else {
res := result.Result()
p.Header.ResponseStatus = hessian.Response_OK
p.Body = res
p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
}
......
......@@ -23,7 +23,7 @@ import (
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
......@@ -118,7 +118,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
var attachments map[interface{}]interface{}
var attachments map[string]string
if req[0] != nil {
dubboVersion = req[0].(string)
}
......@@ -138,14 +138,14 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
args = req[5].([]interface{})
}
if req[6] != nil {
attachments = req[6].(map[interface{}]interface{})
attachments = req[6].(map[string]string)
}
pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string)
if pkg.Service.Path == "" && attachments[constant.PATH_KEY] != nil {
pkg.Service.Path = attachments[constant.PATH_KEY].(string)
pkg.Service.Interface = attachments[constant.INTERFACE_KEY]
if pkg.Service.Path == "" && attachments[constant.PATH_KEY] != "" {
pkg.Service.Path = attachments[constant.PATH_KEY]
}
if attachments[constant.GROUP_KEY] != nil {
pkg.Service.Group = attachments[constant.GROUP_KEY].(string)
if attachments[constant.GROUP_KEY] != "" {
pkg.Service.Group = attachments[constant.GROUP_KEY]
}
pkg.Body = map[string]interface{}{
"dubboVersion": dubboVersion,
......
As of Consul 0.7.0, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version. Please see the agent configuration
page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter.
For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster, like
this:
[
"10.1.0.1:8300",
"10.1.0.2:8300",
"10.1.0.3:8300"
]
For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Consul server in the cluster, like this:
[
{
"id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"address": "10.1.0.1:8300",
"non_voter": false
},
{
"id": "8b6dda82-3103-11e7-93ae-92361f002671",
"address": "10.1.0.2:8300",
"non_voter": false
},
{
"id": "97e17742-3103-11e7-93ae-92361f002671",
"address": "10.1.0.3:8300",
"non_voter": false
}
]
The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.
The "address" field is the address and port of the server.
The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations, please see
https://www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.
Under normal operation, the peers.json file will not be present.
When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.
Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.
Please see https://www.consul.io/docs/guides/outage.html for more information.
File added
As of Consul 0.7.0, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version. Please see the agent configuration
page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter.
For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster, like
this:
[
"10.1.0.1:8300",
"10.1.0.2:8300",
"10.1.0.3:8300"
]
For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Consul server in the cluster, like this:
[
{
"id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"address": "10.1.0.1:8300",
"non_voter": false
},
{
"id": "8b6dda82-3103-11e7-93ae-92361f002671",
"address": "10.1.0.2:8300",
"non_voter": false
},
{
"id": "97e17742-3103-11e7-93ae-92361f002671",
"address": "10.1.0.3:8300",
"non_voter": false
}
]
The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.
The "address" field is the address and port of the server.
The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations, please see
https://www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.
Under normal operation, the peers.json file will not be present.
When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.
Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.
Please see https://www.consul.io/docs/guides/outage.html for more information.
File added
......@@ -18,6 +18,7 @@
package protocol
import (
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"sync"
)
......@@ -183,23 +184,17 @@ func GetProtocol() protocol.Protocol {
type wrappedInvoker struct {
invoker protocol.Invoker
url common.URL
protocol.BaseInvoker
}
func newWrappedInvoker(invoker protocol.Invoker, url common.URL) *wrappedInvoker {
return &wrappedInvoker{
invoker: invoker,
url: url,
BaseInvoker: *protocol.NewBaseInvoker(common.URL{}),
BaseInvoker: *protocol.NewBaseInvoker(url),
}
}
func (ivk *wrappedInvoker) GetUrl() common.URL {
return ivk.url
}
func (ivk *wrappedInvoker) getInvoker() protocol.Invoker {
return ivk.invoker
}
func (ivk *wrappedInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
// get right url
ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
return ivk.invoker.Invoke(invocation)
}
File added
File added
File added
File added
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment