Skip to content
Snippets Groups Projects
Commit 4e2382b4 authored by vito.he's avatar vito.he
Browse files

Merge remote-tracking branch 'apache/develop' into config_center

parents 52bb0e17 7636e965
No related branches found
No related tags found
No related merge requests found
# Release Notes
## 1.1.0
### New Features
- Support Java bigdecimal<https://github.com/apache/dubbo-go/pull/126>
- Support all JDK exceptions<https://github.com/apache/dubbo-go/pull/120>
- Support multi-version of service<https://github.com/apache/dubbo-go/pull/119>
- Allow user set custom params for registry<https://github.com/apache/dubbo-go/pull/117>
- Support zookeeper config center<https://github.com/apache/dubbo-go/pull/99>
- Failsafe/Failback Cluster Strategy<https://github.com/apache/dubbo-go/pull/136>;
### Enhancement
- Use time wheel instead of time.After to defeat timer object memory leakage<https://github.com/apache/dubbo-go/pull/130>
### Bugfixes
- Preventing dead loop when got zookeeper unregister event<https://github.com/apache/dubbo-go/pull/129>
- Delete ineffassign<https://github.com/apache/dubbo-go/pull/127>
- Add wg.Done() for mockDataListener<https://github.com/apache/dubbo-go/pull/118>
- Delete wrong spelling words<https://github.com/apache/dubbo-go/pull/107>
- Use sync.Map to defeat from gettyClientPool deadlock<https://github.com/apache/dubbo-go/pull/106>
- Handle panic when function args list is empty<https://github.com/apache/dubbo-go/pull/98>
- url.Values is not safe map<https://github.com/apache/dubbo-go/pull/172>;
...@@ -57,7 +57,7 @@ func initSignal() { ...@@ -57,7 +57,7 @@ func initSignal() {
case syscall.SIGHUP: case syscall.SIGHUP:
// reload() // reload()
default: default:
go time.AfterFunc(time.Duration(float64(survivalTimeout)*float64(time.Second)), func() { time.AfterFunc(time.Duration(float64(survivalTimeout)*float64(time.Second)), func() {
logger.Warnf("app exit now by force...") logger.Warnf("app exit now by force...")
os.Exit(1) os.Exit(1)
}) })
......
...@@ -35,10 +35,6 @@ import ( ...@@ -35,10 +35,6 @@ import (
_ "github.com/apache/dubbo-go/registry/zookeeper" _ "github.com/apache/dubbo-go/registry/zookeeper"
) )
var (
survivalTimeout int = 10e9
)
// they are necessary: // they are necessary:
// export CONF_CONSUMER_FILE_PATH="xxx" // export CONF_CONSUMER_FILE_PATH="xxx"
// export APP_LOG_CONF_FILE="xxx" // export APP_LOG_CONF_FILE="xxx"
......
...@@ -20,9 +20,6 @@ package main ...@@ -20,9 +20,6 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"os/signal"
"syscall"
"time" "time"
) )
...@@ -44,10 +41,6 @@ import ( ...@@ -44,10 +41,6 @@ import (
_ "github.com/apache/dubbo-go/registry/zookeeper" _ "github.com/apache/dubbo-go/registry/zookeeper"
) )
var (
survivalTimeout int = 10e9
)
// they are necessary: // they are necessary:
// export CONF_CONSUMER_FILE_PATH="xxx" // export CONF_CONSUMER_FILE_PATH="xxx"
// export APP_LOG_CONF_FILE="xxx" // export APP_LOG_CONF_FILE="xxx"
...@@ -102,29 +95,4 @@ func main() { ...@@ -102,29 +95,4 @@ func main() {
resGot := <-getUser1Chan resGot := <-getUser1Chan
logger.Infof("[GetUser1] %v", resGot) logger.Infof("[GetUser1] %v", resGot)
} }
//initSignal()
}
func initSignal() {
signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP,
syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
logger.Infof("get signal %s", sig.String())
switch sig {
case syscall.SIGHUP:
// reload()
default:
go time.AfterFunc(time.Duration(survivalTimeout)*time.Second, func() {
logger.Warnf("app exit now by force...")
os.Exit(1)
})
// 要么fastFailTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
fmt.Println("app exit now...")
return
}
}
} }
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
...@@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge ...@@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge
time.Sleep(1e6) time.Sleep(1e6)
} }
logger.Infof("client init ok") logger.Infof("client init ok")
c.created = time.Now().Unix() c.updateActive(time.Now().Unix())
return c, nil return c, nil
} }
func (c *gettyRPCClient) updateActive(active int64) {
atomic.StoreInt64(&c.created, active)
}
func (c *gettyRPCClient) getActive() int64 {
return atomic.LoadInt64(&c.created)
}
func (c *gettyRPCClient) newSession(session getty.Session) error { func (c *gettyRPCClient) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
...@@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { ...@@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
} }
logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 { if len(c.sessions) == 0 {
c.pool.Lock() c.pool.safeRemove(c)
c.close() // -> pool.remove(c) c.close()
c.pool.Unlock()
} }
} }
...@@ -225,10 +233,8 @@ func (c *gettyRPCClient) isAvailable() bool { ...@@ -225,10 +233,8 @@ func (c *gettyRPCClient) isAvailable() bool {
} }
func (c *gettyRPCClient) close() error { func (c *gettyRPCClient) close() error {
err := perrors.Errorf("close gettyRPCClient{%#v} again", c) closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() { c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
c.gettyClient.Close() c.gettyClient.Close()
c.gettyClient = nil c.gettyClient = nil
for _, s := range c.sessions { for _, s := range c.sessions {
...@@ -238,10 +244,17 @@ func (c *gettyRPCClient) close() error { ...@@ -238,10 +244,17 @@ func (c *gettyRPCClient) close() error {
} }
c.sessions = c.sessions[:0] c.sessions = c.sessions[:0]
c.created = 0 c.updateActive(0)
err = nil closeErr = nil
}) })
return err return closeErr
}
func (c *gettyRPCClient) safeClose() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.close()
} }
type gettyRPCClientPool struct { type gettyRPCClientPool struct {
...@@ -268,7 +281,7 @@ func (p *gettyRPCClientPool) close() { ...@@ -268,7 +281,7 @@ func (p *gettyRPCClientPool) close() {
p.conns = nil p.conns = nil
p.Unlock() p.Unlock()
for _, conn := range conns { for _, conn := range conns {
conn.close() conn.safeClose()
} }
} }
...@@ -286,11 +299,14 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC ...@@ -286,11 +299,14 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC
conn := p.conns[len(p.conns)-1] conn := p.conns[len(p.conns)-1]
p.conns = p.conns[:len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1]
if d := now - conn.created; d > p.ttl { if d := now - conn.getActive(); d > p.ttl {
conn.close() // -> pool.remove(c) if closeErr := conn.safeClose(); closeErr == nil {
p.remove(conn)
}
continue continue
} }
conn.created = now //update created time conn.updateActive(now) //update created time
return conn, nil return conn, nil
} }
// create new conn // create new conn
...@@ -298,34 +314,37 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC ...@@ -298,34 +314,37 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC
} }
func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.created == 0 { if conn == nil || conn.getActive() == 0 {
return return
} }
if err != nil { if err != nil {
conn.close() conn.safeClose()
return return
} }
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
if p.conns == nil { if p.conns == nil {
return return
} }
if len(p.conns) >= p.size { if len(p.conns) >= p.size {
conn.close() if closeErr := conn.safeClose(); closeErr == nil {
// delete @conn from client pool
p.remove(conn)
}
return return
} }
p.conns = append(p.conns, conn) p.conns = append(p.conns, conn)
} }
func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.created == 0 { if conn == nil || conn.getActive() == 0 {
return return
} }
//p.Lock()
//defer p.Unlock()
if p.conns == nil { if p.conns == nil {
return return
} }
...@@ -339,3 +358,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { ...@@ -339,3 +358,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
} }
} }
} }
func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
p.Lock()
defer p.Unlock()
p.remove(conn)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment