Skip to content
Snippets Groups Projects
Commit 0c94049d authored by cvictory's avatar cvictory
Browse files

Merge branch 'develop' of github.com:apache/dubbo-go into notify_all

parents 5f31e3a7 ab44015e
No related branches found
No related tags found
No related merge requests found
<!-- Thanks for sending a pull request! <!-- Thanks for sending a pull request!
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
--> -->
**What this PR does**: **What this PR does**:
......
...@@ -19,10 +19,6 @@ jobs: ...@@ -19,10 +19,6 @@ jobs:
os: os:
- ubuntu-latest - ubuntu-latest
env:
DING_TOKEN: "6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4"
DING_SIGN: "SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6"
steps: steps:
- name: Set up Go 1.x - name: Set up Go 1.x
...@@ -68,45 +64,4 @@ jobs: ...@@ -68,45 +64,4 @@ jobs:
chmod +x integrate_test.sh && ./integrate_test.sh chmod +x integrate_test.sh && ./integrate_test.sh
- name: Coverage - name: Coverage
run: bash <(curl -s https://codecov.io/bash) run: bash <(curl -s https://codecov.io/bash)
\ No newline at end of file
# Because the contexts of push and PR are different, there are two Notify.
# Notifications are triggered only in the apache/dubbo-go repository.
- name: DingTalk Message Notify only Push
uses: zcong1993/actions-ding@v3.0.1
# Whether job is successful or not, always () is always true.
if: |
always() &&
github.event_name == 'push' &&
github.repository == 'apache/dubbo-go'
with:
# DingDing bot token
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
# Post Body to send
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})"
}
}
- name: DingTalk Message Notify only PR
uses: zcong1993/actions-ding@v3.0.1
if: |
always() &&
github.event_name == 'pull_request' &&
github.repository == 'apache/dubbo-go'
with:
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - pr_title: ${{ github.event.pull_request.title }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: [${{ github.ref }}](${{ github.event.pull_request._links.html.href }}) \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n > SHA: [${{ github.sha }}](${{ github.event.pull_request._links.html.href }})"
}
}
...@@ -36,8 +36,8 @@ xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/" ...@@ -36,8 +36,8 @@ xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/" xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar md cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar xcopy /f "%zkJar%" "cluster/router/tag/zookeeper-4unittest/contrib/fatjar/"
md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/" xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
) )
import ( import (
...@@ -77,10 +78,12 @@ func (t RoleType) Role() string { ...@@ -77,10 +78,12 @@ func (t RoleType) Role() string {
} }
type baseUrl struct { type baseUrl struct {
Protocol string Protocol string
Location string // ip+port Location string // ip+port
Ip string Ip string
Port string Port string
//url.Values is not safe map, add to avoid concurrent map read and map write error
paramsLock sync.RWMutex
params url.Values params url.Values
PrimitiveURL string PrimitiveURL string
} }
...@@ -393,16 +396,16 @@ func (c URL) Service() string { ...@@ -393,16 +396,16 @@ func (c URL) Service() string {
} }
// AddParam will add the key-value pair // AddParam will add the key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParam(key string, value string) { func (c *URL) AddParam(key string, value string) {
c.paramsLock.Lock()
defer c.paramsLock.Unlock()
c.params.Add(key, value) c.params.Add(key, value)
} }
// AddParamAvoidNil will add key-value pair // AddParamAvoidNil will add key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParamAvoidNil(key string, value string) { func (c *URL) AddParamAvoidNil(key string, value string) {
c.paramsLock.Lock()
defer c.paramsLock.Unlock()
if c.params == nil { if c.params == nil {
c.params = url.Values{} c.params = url.Values{}
} }
...@@ -411,16 +414,17 @@ func (c *URL) AddParamAvoidNil(key string, value string) { ...@@ -411,16 +414,17 @@ func (c *URL) AddParamAvoidNil(key string, value string) {
} }
// SetParam will put the key-value pair into url // SetParam will put the key-value pair into url
// it's not thread safe.
// think twice before you want to use this method
// usually it should only be invoked when you want to initialized an url // usually it should only be invoked when you want to initialized an url
func (c *URL) SetParam(key string, value string) { func (c *URL) SetParam(key string, value string) {
c.paramsLock.Lock()
defer c.paramsLock.Unlock()
c.params.Set(key, value) c.params.Set(key, value)
} }
// RangeParams will iterate the params // RangeParams will iterate the params
// it's not thread-safe
func (c *URL) RangeParams(f func(key, value string) bool) { func (c *URL) RangeParams(f func(key, value string) bool) {
c.paramsLock.RLock()
defer c.paramsLock.RUnlock()
for k, v := range c.params { for k, v := range c.params {
if !f(k, v[0]) { if !f(k, v[0]) {
break break
...@@ -430,6 +434,8 @@ func (c *URL) RangeParams(f func(key, value string) bool) { ...@@ -430,6 +434,8 @@ func (c *URL) RangeParams(f func(key, value string) bool) {
// GetParam gets value by key // GetParam gets value by key
func (c URL) GetParam(s string, d string) string { func (c URL) GetParam(s string, d string) string {
c.paramsLock.RLock()
defer c.paramsLock.RUnlock()
r := c.params.Get(s) r := c.params.Get(s)
if len(r) == 0 { if len(r) == 0 {
r = d r = d
......
...@@ -89,15 +89,15 @@ func TestDubboInvokerInvoke(t *testing.T) { ...@@ -89,15 +89,15 @@ func TestDubboInvokerInvoke(t *testing.T) {
// destroy // destroy
lock.Lock() lock.Lock()
defer lock.Unlock()
proto.Destroy() proto.Destroy()
lock.Unlock()
} }
func InitTest(t *testing.T) (protocol.Protocol, common.URL) { func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
hessian.RegisterPOJO(&User{}) hessian.RegisterPOJO(&User{})
methods, err := common.ServiceMap.Register("", "dubbo", &UserProvider{}) methods, err := common.ServiceMap.Register("com.ikurento.user.UserProvider", "dubbo", &UserProvider{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods) assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods)
...@@ -176,10 +176,9 @@ type ( ...@@ -176,10 +176,9 @@ type (
// size:4801228 // size:4801228
func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error {
argBuf := new(bytes.Buffer) argBuf := new(bytes.Buffer)
for i := 0; i < 400; i++ { for i := 0; i < 800; i++ {
// use chinese for test // use chinese for test
argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
} }
rsp.Id = argBuf.String() rsp.Id = argBuf.String()
rsp.Name = argBuf.String() rsp.Name = argBuf.String()
......
...@@ -36,7 +36,8 @@ import ( ...@@ -36,7 +36,8 @@ import (
) )
const ( const (
registryConnDelay = 3 registryConnDelay = 3
registryDestroyDefaultTimeout = time.Second * 3
) )
func init() { func init() {
...@@ -187,5 +188,25 @@ func (r *consulRegistry) IsAvailable() bool { ...@@ -187,5 +188,25 @@ func (r *consulRegistry) IsAvailable() bool {
// Destroy consul registry center // Destroy consul registry center
func (r *consulRegistry) Destroy() { func (r *consulRegistry) Destroy() {
if r.URL != nil {
done := make(chan struct{}, 1)
go func() {
defer func() {
if e := recover(); e != nil {
logger.Errorf("consulRegistry destory with panic: %v", e)
}
done <- struct{}{}
}()
if err := r.UnRegister(*r.URL); err != nil {
logger.Errorf("consul registry unregister with err: %s", err.Error())
}
}()
select {
case <-done:
logger.Infof("consulRegistry unregister done")
case <-time.After(registryDestroyDefaultTimeout):
logger.Errorf("consul unregister timeout")
}
}
close(r.done) close(r.done)
} }
...@@ -55,3 +55,19 @@ func (suite *consulRegistryTestSuite) testSubscribe() { ...@@ -55,3 +55,19 @@ func (suite *consulRegistryTestSuite) testSubscribe() {
assert.NoError(suite.t, err) assert.NoError(suite.t, err)
suite.listener = listener suite.listener = listener
} }
func (suite *consulRegistryTestSuite) testDestroy() {
consumerRegistryUrl := newConsumerRegistryUrl(registryHost, registryPort)
consumerRegistry, _ := newConsulRegistry(consumerRegistryUrl)
consulRegistryImp := consumerRegistry.(*consulRegistry)
assert.True(suite.t, consulRegistryImp.IsAvailable())
consulRegistryImp.Destroy()
assert.False(suite.t, consulRegistryImp.IsAvailable())
consumerRegistry, _ = newConsulRegistry(consumerRegistryUrl)
consulRegistryImp = consumerRegistry.(*consulRegistry)
consulRegistryImp.URL = nil
assert.True(suite.t, consulRegistryImp.IsAvailable())
consulRegistryImp.Destroy()
assert.False(suite.t, consulRegistryImp.IsAvailable())
}
...@@ -163,6 +163,7 @@ func test1(t *testing.T) { ...@@ -163,6 +163,7 @@ func test1(t *testing.T) {
suite.testListener(remoting.EventTypeAdd) suite.testListener(remoting.EventTypeAdd)
suite.testUnregister() suite.testUnregister()
suite.testListener(remoting.EventTypeDel) suite.testListener(remoting.EventTypeDel)
suite.testDestroy()
} }
// subscribe -> register -> unregister // subscribe -> register -> unregister
...@@ -183,6 +184,7 @@ func test2(t *testing.T) { ...@@ -183,6 +184,7 @@ func test2(t *testing.T) {
suite.testListener(remoting.EventTypeAdd) suite.testListener(remoting.EventTypeAdd)
suite.testUnregister() suite.testUnregister()
suite.testListener(remoting.EventTypeDel) suite.testListener(remoting.EventTypeDel)
suite.testDestroy()
} }
func TestConsulRegistry(t *testing.T) { func TestConsulRegistry(t *testing.T) {
......
...@@ -288,6 +288,7 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { ...@@ -288,6 +288,7 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")
clientConfig.NotLoadCacheAtStart = true clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig configMap["clientConfig"] = clientConfig
......
...@@ -81,19 +81,26 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClie ...@@ -81,19 +81,26 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClie
gettyClient: gettyClient, gettyClient: gettyClient,
} }
go c.gettyClient.RunEventLoop(c.newSession) go c.gettyClient.RunEventLoop(c.newSession)
idx := 1 idx := 1
times := int(pool.rpcClient.opts.ConnectTimeout / 1e6) start := time.Now()
connectTimeout := pool.rpcClient.opts.ConnectTimeout
for { for {
idx++ idx++
if c.isAvailable() { if c.isAvailable() {
break break
} }
if idx > times { if time.Now().Sub(start) > connectTimeout {
c.gettyClient.Close() c.gettyClient.Close()
return nil, perrors.New(fmt.Sprintf("failed to create client connection to %s in %f seconds", addr, float32(times)/1000)) return nil, perrors.New(fmt.Sprintf("failed to create client connection to %s in %s", addr, connectTimeout))
}
interval := time.Millisecond * time.Duration(idx)
if interval > time.Duration(100e6) {
interval = 100e6 // 100 ms
} }
time.Sleep(time.Millisecond * time.Duration(times)) time.Sleep(interval)
} }
logger.Debug("client init ok") logger.Debug("client init ok")
c.updateActive(time.Now().Unix()) c.updateActive(time.Now().Unix())
......
github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w= github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/dubbo-go-hessian2 v1.7.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......
github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w= github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/dubbo-go-hessian2 v1.7.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment