diff --git a/README.md b/README.md index ce73ead94dae55d50a51906f8488c944c455ebe0..cc90069fca28955d944d59ba3249a2883a8a4a8d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Apache License, Version 2.0 + 4 Registry: ZooKeeper(√), Etcd(X), Redis(X) + 5 Strategy: Failover(√), Failfast(√) + 6 Load Balance: Random(√), RoundRobin(√) -+ 7 Role: Consumer(√), Provider(X) ++ 7 Role: Consumer(√), Provider(√) ## Code Example diff --git a/examples/go-client/app/client.go b/examples/go-client/app/client.go index cbb46987d21f1e9eaef59f76c94cc4f5ee1e687c..8f470ad3e363700430ff2f1c137afb49908297cd 100644 --- a/examples/go-client/app/client.go +++ b/examples/go-client/app/client.go @@ -43,10 +43,11 @@ func main() { time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test jsonrpc") - for i := 0; i < 100; i++ { - testJsonrpc("A003") - time.Sleep(1e9) - } + testJsonrpc("A003") + time.Sleep(3e9) + + gxlog.CInfo("\n\n\nstart to test jsonrpc illegal method") + testJsonrpcIllegalMethod("A003") initSignal() } diff --git a/examples/go-client/app/config.go b/examples/go-client/app/config.go index a4a58d6634e98ba31df9680f9e7450c01d68f396..0eb3ffd48380db4e4571f5e7cca984c0fc2ada0e 100644 --- a/examples/go-client/app/config.go +++ b/examples/go-client/app/config.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io/ioutil" "os" "path" "time" @@ -10,7 +11,8 @@ import ( import ( "github.com/AlexStocks/goext/log" log "github.com/AlexStocks/log4go" - config "github.com/koding/multiconfig" + jerrors "github.com/juju/errors" + yaml "gopkg.in/yaml.v2" ) import ( @@ -30,25 +32,25 @@ type ( // Client holds supported types by the multiconfig package ClientConfig struct { // pprof - Pprof_Enabled bool `default:"false"` - Pprof_Port int `default:"10086"` + Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` + Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` // client - Connect_Timeout string `default:"100ms"` + Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"` connectTimeout time.Duration - Request_Timeout string `default:"5s"` // 500ms, 1m + Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m requestTimeout time.Duration // codec & selector & transport & registry - Selector string `default:"cache"` - Selector_TTL string `default:"10m"` - Registry string `default:"zookeeper"` + Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"` + Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"` + Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` // application - Application_Config registry.ApplicationConfig - Registry_Config registry.RegistryConfig + Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` + Registry_Config registry.RegistryConfig `yaml:"registry_config" json:"registry_config,omitempty"` // 一个客户端只允许使用一个service的其中一个group和其中一个version - Service_List []registry.ServiceConfig + Service_List []registry.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` } ) @@ -63,12 +65,22 @@ func initClientConfig() error { panic(fmt.Sprintf("application configure file name is nil")) return nil // I know it is of no usage. Just Err Protection. } - if path.Ext(confFile) != ".toml" { - panic(fmt.Sprintf("application configure file name{%v} suffix must be .toml", confFile)) + if path.Ext(confFile) != ".yml" { + panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile)) return nil } clientConfig = new(ClientConfig) - config.MustLoadWithPath(confFile, clientConfig) + + confFileStream, err := ioutil.ReadFile(confFile) + if err != nil { + panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err))) + return nil + } + err = yaml.Unmarshal(confFileStream, clientConfig) + if err != nil { + panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) + return nil + } gxlog.CInfo("config{%#v}\n", clientConfig) // log diff --git a/examples/go-client/app/test.go b/examples/go-client/app/test.go index 7f486c0aa6c5c82c0f0a9151cc51311e6d065228..2868e5d395026ad7c3013f5dc4c0bd4efd4d67d5 100644 --- a/examples/go-client/app/test.go +++ b/examples/go-client/app/test.go @@ -88,3 +88,74 @@ func testJsonrpc(userKey string) { log.Info("response result:%s", user) // gxlog.CInfo("response result:%s", user) } + +func testJsonrpcIllegalMethod(userKey string) { + var ( + err error + service string + method string + serviceIdx int + user *JsonRPCUser + ctx context.Context + conf registry.ServiceConfig + req jsonrpc.Request + serviceURL *registry.ServiceURL + clt *jsonrpc.HTTPClient + ) + + clt = jsonrpc.NewHTTPClient( + &jsonrpc.HTTPOptions{ + HandshakeTimeout: clientConfig.connectTimeout, + HTTPTimeout: clientConfig.requestTimeout, + }, + ) + + serviceIdx = -1 + service = "com.ikurento.user.UserProvider" + for i := range clientConfig.Service_List { + if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_JSONRPC.String() { + serviceIdx = i + break + } + } + if serviceIdx == -1 { + panic(fmt.Sprintf("can not find service in config service list:%#v", clientConfig.Service_List)) + } + + // Create request + method = string("GetUser1") + // gxlog.CInfo("jsonrpc selected service %#v", clientConfig.Service_List[serviceIdx]) + conf = registry.ServiceConfig{ + Group: clientConfig.Service_List[serviceIdx].Group, + Protocol: public.CodecType(public.CODECTYPE_JSONRPC).String(), + Version: clientConfig.Service_List[serviceIdx].Version, + Service: clientConfig.Service_List[serviceIdx].Service, + } + // Attention the last parameter : []UserKey{userKey} + req = clt.NewRequest(conf, method, []string{userKey}) + + serviceURL, err = clientRegistry.Filter(req.ServiceConfig(), 1) + if err != nil { + log.Error("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err)) + // gxlog.CError("registry.Filter(conf:%#v) = error:%s", req.ServiceConfig(), jerrors.ErrorStack(err)) + return + } + log.Debug("got serviceURL: %s", serviceURL) + // Set arbitrary headers in context + ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ + "X-Proxy-Id": "dubbogo", + "X-Services": service, + "X-Method": method, + }) + + user = new(JsonRPCUser) + // Call service + if err = clt.Call(ctx, *serviceURL, req, user); err != nil { + log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) + // gxlog.CError("client.Call() return error:%+v", jerrors.ErrorStack(err)) + return + } + + log.Info("response result:%s", user) + // gxlog.CInfo("response result:%s", user) +} diff --git a/examples/go-client/assembly/common/app.properties b/examples/go-client/assembly/common/app.properties index c772e4bf5c784e117c0f676c5b4a0a104aadfdb8..a4fe0dc49c83e7c180408b02010ebf4bbefc98a9 100644 --- a/examples/go-client/assembly/common/app.properties +++ b/examples/go-client/assembly/common/app.properties @@ -13,5 +13,5 @@ export TARGET_EXEC_NAME="user_info_client" # BUILD_PACKAGE="dubbogo-examples/user-info/client/app" export BUILD_PACKAGE="app" -export TARGET_CONF_FILE="conf/client.toml" +export TARGET_CONF_FILE="conf/client.yml" export TARGET_LOG_CONF_FILE="conf/log.xml" diff --git a/examples/go-client/profiles/dev/client.toml b/examples/go-client/profiles/dev/client.toml deleted file mode 100644 index e9b86a2c4121e2217a813efe0670f6a848b25630..0000000000000000000000000000000000000000 --- a/examples/go-client/profiles/dev/client.toml +++ /dev/null @@ -1,32 +0,0 @@ -# dubbo client toml configure file - -# pprof -Pprof_Enabled = true -Pprof_Port = 10086 - -# client -Request_Timeout = "3500ms" -NET_IO_Timeout = "2s" -Retries = 1 -# connect timeout -Connect_Timeout = "100ms" -Selector = "cache" -Selector_TTL = "10m" -Registry = "zookeeper" - -# application config -[Application_Config] -Organization = "ikurento.com" -Name = "Pusher" -Module = "dubbogo user-info client" -Version = "0.0.1" -Owner = "ZX" - -[Registry_Config] - # You can indent as you please. Tabs or spaces. TOML don't care. - Address = ["127.0.0.1:2181"] - Timeout = 3 - -[[Service_List]] - Protocol = "jsonrpc" - Service = "com.ikurento.user.UserProvider" diff --git a/examples/go-client/profiles/dev/client.yml b/examples/go-client/profiles/dev/client.yml new file mode 100644 index 0000000000000000000000000000000000000000..6ea7e6596291e9e0d3bc344cd7c0020354ceb97e --- /dev/null +++ b/examples/go-client/profiles/dev/client.yml @@ -0,0 +1,33 @@ +# dubbo client yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 10086 + +# client +request_timeout : "3500ms" +net_io_timeout : "2s" +retries : 1 +# connect timeout +connect_timeout : "100ms" +selector : "cache" +selector_ttl : "10m" +registry : "zookeeper" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + service : "com.ikurento.user.UserProvider" diff --git a/examples/go-client/profiles/release/client.toml b/examples/go-client/profiles/release/client.toml deleted file mode 100644 index 60e2d2f26b7fcb9b972ce9eeb2b16ca804a91642..0000000000000000000000000000000000000000 --- a/examples/go-client/profiles/release/client.toml +++ /dev/null @@ -1,42 +0,0 @@ -# dubbo client toml configure file - -# pprof -Pprof_Enabled = true -Pprof_Port = 10086 - -# client -Request_Timeout = "350ms" -NET_IO_Timeout = "2s" -Retries = 2 -# 连接池中每个地址的最大连接数 -Pool_Size = 32 -# 连接池中每个连接的有效时间 -Pool_TTL = "10m" -# connect timeout -Connect_Timeout = "100ms" -Selector = "cache" -Selector_TTL = "10m" -Registry = "zookeeper" - -# application config -[Application_Config] -Organization = "ikurento.com" -Name = "Pusher" -Module = "dubbogo user-info client" -Version = "0.0.1" -Owner = "ZX" - -[Registry_Config] - # You can indent as you please. Tabs or spaces. TOML don't care. - Address = ["192.168.35.3:2181"] - Timeout = 3 - -[[Service_List]] - Protocol = "jsonrpc" - Service = "com.ikurento.user.UserProvider" - -[[Service_List]] - Protocol = "dubbo" - Service = "com.ikurento.user.UserProvider" - Version = "2.0" - diff --git a/examples/go-client/profiles/release/client.yml b/examples/go-client/profiles/release/client.yml new file mode 100644 index 0000000000000000000000000000000000000000..6ea7e6596291e9e0d3bc344cd7c0020354ceb97e --- /dev/null +++ b/examples/go-client/profiles/release/client.yml @@ -0,0 +1,33 @@ +# dubbo client yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 10086 + +# client +request_timeout : "3500ms" +net_io_timeout : "2s" +retries : 1 +# connect timeout +connect_timeout : "100ms" +selector : "cache" +selector_ttl : "10m" +registry : "zookeeper" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + service : "com.ikurento.user.UserProvider" diff --git a/examples/go-client/profiles/test/client.toml b/examples/go-client/profiles/test/client.toml deleted file mode 100644 index 60e2d2f26b7fcb9b972ce9eeb2b16ca804a91642..0000000000000000000000000000000000000000 --- a/examples/go-client/profiles/test/client.toml +++ /dev/null @@ -1,42 +0,0 @@ -# dubbo client toml configure file - -# pprof -Pprof_Enabled = true -Pprof_Port = 10086 - -# client -Request_Timeout = "350ms" -NET_IO_Timeout = "2s" -Retries = 2 -# 连接池中每个地址的最大连接数 -Pool_Size = 32 -# 连接池中每个连接的有效时间 -Pool_TTL = "10m" -# connect timeout -Connect_Timeout = "100ms" -Selector = "cache" -Selector_TTL = "10m" -Registry = "zookeeper" - -# application config -[Application_Config] -Organization = "ikurento.com" -Name = "Pusher" -Module = "dubbogo user-info client" -Version = "0.0.1" -Owner = "ZX" - -[Registry_Config] - # You can indent as you please. Tabs or spaces. TOML don't care. - Address = ["192.168.35.3:2181"] - Timeout = 3 - -[[Service_List]] - Protocol = "jsonrpc" - Service = "com.ikurento.user.UserProvider" - -[[Service_List]] - Protocol = "dubbo" - Service = "com.ikurento.user.UserProvider" - Version = "2.0" - diff --git a/examples/go-client/profiles/test/client.yml b/examples/go-client/profiles/test/client.yml new file mode 100644 index 0000000000000000000000000000000000000000..6ea7e6596291e9e0d3bc344cd7c0020354ceb97e --- /dev/null +++ b/examples/go-client/profiles/test/client.yml @@ -0,0 +1,33 @@ +# dubbo client yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 10086 + +# client +request_timeout : "3500ms" +net_io_timeout : "2s" +retries : 1 +# connect timeout +connect_timeout : "100ms" +selector : "cache" +selector_ttl : "10m" +registry : "zookeeper" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + service : "com.ikurento.user.UserProvider" diff --git a/examples/go-server/app/config.go b/examples/go-server/app/config.go new file mode 100644 index 0000000000000000000000000000000000000000..6f22aee3a283ec8d783e2a13e0f73cd95b01487c --- /dev/null +++ b/examples/go-server/app/config.go @@ -0,0 +1,115 @@ +/****************************************************** +# DESC : env var & configure +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-21 16:41 +# FILE : config.go +******************************************************/ + +package main + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "time" + + "github.com/AlexStocks/goext/log" + + log "github.com/AlexStocks/log4go" + + jerrors "github.com/juju/errors" + + "github.com/dubbo/dubbo-go/registry" + yaml "gopkg.in/yaml.v2" +) + +const ( + APP_CONF_FILE string = "APP_CONF_FILE" + APP_LOG_CONF_FILE string = "APP_LOG_CONF_FILE" +) + +var ( + conf *ServerConfig +) + +type ( + ServerConfig struct { + // pprof + Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` + Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` + + // transport & registry + Transport string `default:"http" yaml:"transport" json:"transport,omitempty"` + NetTimeout string `default:"100ms" yaml:"net_timeout" json:"net_timeout,omitempty"` // in ms + netTimeout time.Duration + // application + Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` + // Registry_Address string `default:"192.168.35.3:2181"` + Registry_Config registry.RegistryConfig `yaml:"registry_config" json:"registry_config,omitempty"` + Service_List []registry.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` + Server_List []registry.ServerConfig `yaml:"server_list" json:"server_list,omitempty"` + } +) + +func initServerConf() *ServerConfig { + var ( + err error + confFile string + ) + + confFile = os.Getenv(APP_CONF_FILE) + if confFile == "" { + panic(fmt.Sprintf("application configure file name is nil")) + return nil + } + if path.Ext(confFile) != ".yml" { + panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile)) + return nil + } + + conf = &ServerConfig{} + confFileStream, err := ioutil.ReadFile(confFile) + if err != nil { + panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err))) + return nil + } + err = yaml.Unmarshal(confFileStream, conf) + if err != nil { + panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) + return nil + } + if conf.netTimeout, err = time.ParseDuration(conf.NetTimeout); err != nil { + panic(fmt.Sprintf("time.ParseDuration(NetTimeout:%#v) = error:%s", conf.NetTimeout, err)) + return nil + } + + gxlog.CInfo("config{%#v}\n", conf) + + return conf +} + +func configInit() error { + var ( + confFile string + ) + + initServerConf() + + confFile = os.Getenv(APP_LOG_CONF_FILE) + if confFile == "" { + panic(fmt.Sprintf("log configure file name is nil")) + return nil + } + if path.Ext(confFile) != ".xml" { + panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile)) + return nil + } + + log.LoadConfiguration(confFile) + + return nil +} diff --git a/examples/go-server/app/server.go b/examples/go-server/app/server.go new file mode 100644 index 0000000000000000000000000000000000000000..cc788cef10785f7ded25f3fa3af94add0ea38c36 --- /dev/null +++ b/examples/go-server/app/server.go @@ -0,0 +1,148 @@ +/****************************************************** +# DESC : provider example +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-21 16:41 +# FILE : server.go +******************************************************/ + +package main + +import ( + "fmt" + "net/http" + _ "net/http/pprof" + "os" + "os/signal" + "strconv" + "syscall" + + // "github.com/AlexStocks/gocolor" + + "github.com/AlexStocks/goext/net" + "github.com/AlexStocks/goext/time" + log "github.com/AlexStocks/log4go" + "github.com/dubbo/dubbo-go/jsonrpc" + "github.com/dubbo/dubbo-go/registry" + jerrors "github.com/juju/errors" +) + +var ( + survivalTimeout int = 3e9 + servo *jsonrpc.Server +) + +func main() { + var ( + err error + ) + + err = configInit() + if err != nil { + log.Error("configInit() = error{%#v}", err) + return + } + initProfiling() + + servo = initServer() + err = servo.Handle(&UserProvider{}) + if err != nil { + panic(err) + return + } + servo.Start() + + initSignal() +} + +func initServer() *jsonrpc.Server { + var ( + err error + serverRegistry *registry.ZkProviderRegistry + srv *jsonrpc.Server + ) + + if conf == nil { + panic(fmt.Sprintf("conf is nil")) + return nil + } + + // registry + serverRegistry, err = registry.NewZkProviderRegistry( + registry.ApplicationConf(conf.Application_Config), + registry.RegistryConf(conf.Registry_Config), + registry.BalanceMode(registry.SM_RoundRobin), + registry.ServiceTTL(conf.netTimeout), + ) + if err != nil || serverRegistry == nil { + panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err))) + return nil + } + + // provider + srv = jsonrpc.NewServer( + jsonrpc.Registry(serverRegistry), + jsonrpc.ConfList(conf.Server_List), + jsonrpc.ServiceConfList(conf.Service_List), + ) + + return srv +} + +func uninitServer() { + if servo != nil { + servo.Stop() + } + log.Close() +} + +func initProfiling() { + if !conf.Pprof_Enabled { + return + } + const ( + PprofPath = "/debug/pprof/" + ) + var ( + err error + ip string + addr string + ) + + ip, err = gxnet.GetLocalIP() + if err != nil { + panic("cat not get local ip!") + } + addr = ip + ":" + strconv.Itoa(conf.Pprof_Port) + log.Info("App Profiling startup on address{%v}", addr+PprofPath) + + go func() { + log.Info(http.ListenAndServe(addr, nil)) + }() +} + +func initSignal() { + signals := make(chan os.Signal, 1) + // It is not possible to block SIGKILL or syscall.SIGSTOP + signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + for { + sig := <-signals + log.Info("get signal %s", sig.String()) + switch sig { + case syscall.SIGHUP: + // reload() + default: + go gxtime.Future(survivalTimeout, func() { + log.Warn("app exit now by force...") + os.Exit(1) + }) + + // 要么fastFailTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出 + uninitServer() + fmt.Println("provider app exit now...") + return + } + } +} diff --git a/examples/go-server/app/user.go b/examples/go-server/app/user.go new file mode 100644 index 0000000000000000000000000000000000000000..65aa049174a1e22416e31df50229d322c7957919 --- /dev/null +++ b/examples/go-server/app/user.go @@ -0,0 +1,173 @@ +/***************************************************** +# DESC : UserProvider Service +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-21 19:22 +# FILE : user.go +******************************************************/ + +package main + +import ( + // "encoding/json" + "context" + "fmt" + "time" +) + +import ( + "github.com/AlexStocks/goext/log" + "github.com/AlexStocks/goext/time" +) + +type Gender int + +const ( + MAN = iota + WOMAN +) + +var genderStrings = [...]string{ + "MAN", + "WOMAN", +} + +func (g Gender) String() string { + return genderStrings[g] +} + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + Age int `json:"age"` + sex Gender + Birth int `json:"time"` + Sex string `json:"sex"` + } + + UserId struct { + Id string + } + + UserProvider struct { + user map[string]User + } +) + +var ( + DefaultUser = User{ + Id: "0", Name: "Alex Stocks", Age: 31, + // Birth: int(time.Date(1985, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + Birth: gxtime.YMD(1985, 11, 24, 15, 15, 0), + sex: Gender(MAN), + } + + userMap = UserProvider{user: make(map[string]User)} +) + +func init() { + DefaultUser.Sex = DefaultUser.sex.String() + userMap.user["A000"] = DefaultUser + userMap.user["A001"] = User{Id: "001", Name: "ZhangSheng", Age: 18, sex: MAN} + userMap.user["A002"] = User{Id: "002", Name: "Lily", Age: 20, sex: WOMAN} + userMap.user["A003"] = User{Id: "113", Name: "Moorse", Age: 30, sex: MAN} + for k, v := range userMap.user { + v.Birth = int(time.Now().AddDate(-1*v.Age, 0, 0).Unix()) + v.Sex = userMap.user[k].sex.String() + userMap.user[k] = v + } +} + +/* +// you can define your json unmarshal function here +func (this *UserId) UnmarshalJSON(value []byte) error { + this.Id = string(value) + this.Id = strings.TrimPrefix(this.Id, "\"") + this.Id = strings.TrimSuffix(this.Id, `"`) + + return nil +} +*/ + +func (this *UserProvider) getUser(userId string) (*User, error) { + if user, ok := userMap.user[userId]; ok { + return &user, nil + } + + return nil, fmt.Errorf("invalid user id:%s", userId) +} + +/* +// can not work +func (this *UserProvider) GetUser(ctx context.Context, req *UserId, rsp *User) error { + var ( + err error + user *User + ) + user, err = this.getUser(req.Id) + if err == nil { + *rsp = *user + gxlog.CInfo("rsp:%#v", rsp) + // s, _ := json.Marshal(rsp) + // fmt.Println(string(s)) + + // s, _ = json.Marshal(*rsp) + // fmt.Println(string(s)) + } + return err +} +*/ + +/* +// work +func (this *UserProvider) GetUser(ctx context.Context, req *string, rsp *User) error { + var ( + err error + user *User + ) + + gxlog.CInfo("req:%#v", *req) + user, err = this.getUser(*req) + if err == nil { + *rsp = *user + gxlog.CInfo("rsp:%#v", rsp) + // s, _ := json.Marshal(rsp) + // fmt.Println(string(s)) + + // s, _ = json.Marshal(*rsp) + // fmt.Println(string(s)) + } + return err +} +*/ + +func (this *UserProvider) GetUser(ctx context.Context, req []string, rsp *User) error { + var ( + err error + user *User + ) + + gxlog.CInfo("req:%#v", req) + user, err = this.getUser(req[0]) + if err == nil { + *rsp = *user + gxlog.CInfo("rsp:%#v", rsp) + // s, _ := json.Marshal(rsp) + // fmt.Println(string(s)) + + // s, _ = json.Marshal(*rsp) + // fmt.Println(string(s)) + } + return err +} + +func (this *UserProvider) Service() string { + return "com.ikurento.user.UserProvider" +} + +func (this *UserProvider) Version() string { + return "" +} diff --git a/examples/go-server/app/version.go b/examples/go-server/app/version.go new file mode 100644 index 0000000000000000000000000000000000000000..520c7dc6b9946aad8996b6da2af0b4fbd6f727bc --- /dev/null +++ b/examples/go-server/app/version.go @@ -0,0 +1,15 @@ +/****************************************************** +# DESC : client version +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-21 16:41 +# FILE : version.go +******************************************************/ + +package main + +var ( + Version string = "0.2.0" +) diff --git a/examples/go-server/assembly/bin/load.sh b/examples/go-server/assembly/bin/load.sh new file mode 100644 index 0000000000000000000000000000000000000000..e202ff65f436f08191ae5364378f659de858777a --- /dev/null +++ b/examples/go-server/assembly/bin/load.sh @@ -0,0 +1,134 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : dubbogo app devops script +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-05-13 02:01 +# FILE : load.sh +# ****************************************************** + +APP_NAME="APPLICATION_NAME" +APP_ARGS="" + + +PROJECT_HOME="" +OS_NAME=`uname` +if [[ ${OS_NAME} != "Windows" ]]; then + PROJECT_HOME=`pwd` + PROJECT_HOME=${PROJECT_HOME}"/" +fi + +export APP_CONF_FILE=${PROJECT_HOME}"TARGET_CONF_FILE" +export APP_LOG_CONF_FILE=${PROJECT_HOME}"TARGET_LOG_CONF_FILE" + +usage() { + echo "Usage: $0 start" + echo " $0 stop" + echo " $0 term" + echo " $0 restart" + echo " $0 list" + echo " $0 monitor" + echo " $0 crontab" + exit +} + +start() { + APP_LOG_PATH="${PROJECT_HOME}logs/" + mkdir -p ${APP_LOG_PATH} + APP_BIN=${PROJECT_HOME}sbin/${APP_NAME} + chmod u+x ${APP_BIN} + # CMD="nohup ${APP_BIN} ${APP_ARGS} >>${APP_NAME}.nohup.out 2>&1 &" + CMD="${APP_BIN}" + eval ${CMD} + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'` + if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'` + fi + CUR=`date +%FT%T` + if [ "${PID}" != "" ]; then + for p in ${PID} + do + echo "start ${APP_NAME} ( pid =" ${p} ") at " ${CUR} + done + fi +} + +stop() { + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'` + if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'` + fi + if [ "${PID}" != "" ]; + then + for ps in ${PID} + do + echo "kill -SIGINT ${APP_NAME} ( pid =" ${ps} ")" + kill -2 ${ps} + done + fi +} + + +term() { + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'` + if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'` + fi + if [ "${PID}" != "" ]; + then + for ps in ${PID} + do + echo "kill -9 ${APP_NAME} ( pid =" ${ps} ")" + kill -9 ${ps} + done + fi +} + +list() { + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{printf("%s,%s,%s,%s\n", $1, $2, $9, $10)}'` + if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then + PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{printf("%s,%s,%s,%s,%s\n", $1, $4, $6, $7, $8)}'` + fi + + if [ "${PID}" != "" ]; then + echo "list ${APP_NAME}" + + if [[ ${OS_NAME} == "Linux" || ${OS_NAME} == "Darwin" ]]; then + echo "index: user, pid, start, duration" + else + echo "index: PID, WINPID, UID, STIME, COMMAND" + fi + idx=0 + for ps in ${PID} + do + echo "${idx}: ${ps}" + ((idx ++)) + done + fi +} + +opt=$1 +case C"$opt" in + Cstart) + start + ;; + Cstop) + stop + ;; + Cterm) + term + ;; + Crestart) + term + start + ;; + Clist) + list + ;; + C*) + usage + ;; +esac + diff --git a/examples/go-server/assembly/common/app.properties b/examples/go-server/assembly/common/app.properties new file mode 100644 index 0000000000000000000000000000000000000000..d230d5efc4ee84c4a99e1b27e7b49d97046d91a3 --- /dev/null +++ b/examples/go-server/assembly/common/app.properties @@ -0,0 +1,17 @@ +# dubbogo application configure script +# ****************************************************** +# DESC : application environment variable +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:29 +# FILE : app.properties +# ****************************************************** + +TARGET_EXEC_NAME="user_info_server" +# BUILD_PACKAGE="dubbogo-examples/user-info/server/app" +BUILD_PACKAGE="app" + +TARGET_CONF_FILE="conf/server.yml" +TARGET_LOG_CONF_FILE="conf/log.xml" diff --git a/examples/go-server/assembly/common/build.sh b/examples/go-server/assembly/common/build.sh new file mode 100644 index 0000000000000000000000000000000000000000..a41fbbac321b74849d71b28a65f8b7c5de13cf0f --- /dev/null +++ b/examples/go-server/assembly/common/build.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:28 +# FILE : build.sh +# ****************************************************** + +rm -rf target/ + +PROJECT_HOME=`pwd` +TARGET_FOLDER=${PROJECT_HOME}/target/${GOOS} + +TARGET_SBIN_NAME=${TARGET_EXEC_NAME} +version=`cat app/version.go | grep Version | awk -F '=' '{print $2}' | awk -F '"' '{print $2}'` +if [[ ${GOOS} == "windows" ]]; then + TARGET_SBIN_NAME=${TARGET_SBIN_NAME}.exe +fi +TARGET_NAME=${TARGET_FOLDER}/${TARGET_SBIN_NAME} +if [[ $PROFILE = "test" ]]; then + # GFLAGS=-gcflags "-N -l" -race -x -v # -x会把go build的详细过程输出 + # GFLAGS=-gcflags "-N -l" -race -v + # GFLAGS="-gcflags \"-N -l\" -v" + cd ${BUILD_PACKAGE} && go build -gcflags "-N -l" -x -v -i -o ${TARGET_NAME} && cd - +else + # -s去掉符号表(然后panic时候的stack trace就没有任何文件名/行号信息了,这个等价于普通C/C++程序被strip的效果), + # -w去掉DWARF调试信息,得到的程序就不能用gdb调试了。-s和-w也可以分开使用,一般来说如果不打算用gdb调试, + # -w基本没啥损失。-s的损失就有点大了。 + cd ${BUILD_PACKAGE} && go build -ldflags "-w" -x -v -i -o ${TARGET_NAME} && cd - +fi + +TAR_NAME=${TARGET_EXEC_NAME}-${version}-`date "+%Y%m%d-%H%M"`-${PROFILE} + +mkdir -p ${TARGET_FOLDER}/${TAR_NAME} + +SBIN_DIR=${TARGET_FOLDER}/${TAR_NAME}/sbin +BIN_DIR=${TARGET_FOLDER}/${TAR_NAME} +CONF_DIR=${TARGET_FOLDER}/${TAR_NAME}/conf + +mkdir -p ${SBIN_DIR} +mkdir -p ${CONF_DIR} + +mv ${TARGET_NAME} ${SBIN_DIR} +cp -r assembly/bin ${BIN_DIR} +# modify APPLICATION_NAME +# OS=`uname` +# if [[ $OS=="Darwin" ]]; then +if [ "$(uname)" == "Darwin" ]; then + sed -i "" "s~APPLICATION_NAME~${TARGET_EXEC_NAME}~g" ${BIN_DIR}/bin/* +else + sed -i "s~APPLICATION_NAME~${TARGET_EXEC_NAME}~g" ${BIN_DIR}/bin/* +fi +# modify TARGET_CONF_FILE +if [ "$(uname)" == "Darwin" ]; then + sed -i "" "s~TARGET_CONF_FILE~${TARGET_CONF_FILE}~g" ${BIN_DIR}/bin/* +else + sed -i "s~TARGET_CONF_FILE~${TARGET_CONF_FILE}~g" ${BIN_DIR}/bin/* +fi +# modify TARGET_LOG_CONF_FILE +if [ "$(uname)" == "Darwin" ]; then + sed -i "" "s~TARGET_LOG_CONF_FILE~${TARGET_LOG_CONF_FILE}~g" ${BIN_DIR}/bin/* +else + sed -i "s~TARGET_LOG_CONF_FILE~${TARGET_LOG_CONF_FILE}~g" ${BIN_DIR}/bin/* +fi + +cp -r profiles/${PROFILE}/* ${CONF_DIR} + +cd ${TARGET_FOLDER} + +tar czf ${TAR_NAME}.tar.gz ${TAR_NAME}/* + diff --git a/examples/go-server/assembly/linux/dev.sh b/examples/go-server/assembly/linux/dev.sh new file mode 100644 index 0000000000000000000000000000000000000000..55886f09fb4873be84cfa46aae592f5f000120a4 --- /dev/null +++ b/examples/go-server/assembly/linux/dev.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for dev env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2018-06-24 17:32 +# FILE : dev.sh +# ****************************************************** + + +set -e + +export GOOS=linux +export GOARCH=amd64 + +PROFILE=dev + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/linux/release.sh b/examples/go-server/assembly/linux/release.sh new file mode 100644 index 0000000000000000000000000000000000000000..9772ad9614583917d62beba2db9fcaefea63e1ed --- /dev/null +++ b/examples/go-server/assembly/linux/release.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for release env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:25 +# FILE : release.sh +# ****************************************************** + + +set -e + +export GOOS=linux +export GOARCH=amd64 + +PROFILE=release + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/linux/test.sh b/examples/go-server/assembly/linux/test.sh new file mode 100644 index 0000000000000000000000000000000000000000..2fc4a98862bee5ef11a23e1b74058627a899181d --- /dev/null +++ b/examples/go-server/assembly/linux/test.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for test env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:34 +# FILE : test.sh +# ****************************************************** + + +set -e + +export GOOS=linux +export GOARCH=amd64 + +PROFILE=test + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/mac/dev.sh b/examples/go-server/assembly/mac/dev.sh new file mode 100644 index 0000000000000000000000000000000000000000..5dfa78490b895ce556c809ead32b6f517a5f1450 --- /dev/null +++ b/examples/go-server/assembly/mac/dev.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for dev env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2018-06-24 17:32 +# FILE : dev.sh +# ****************************************************** + + +set -e + +export GOOS=darwin +export GOARCH=amd64 + +PROFILE=dev + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/mac/release.sh b/examples/go-server/assembly/mac/release.sh new file mode 100644 index 0000000000000000000000000000000000000000..1ec21c7b511ccce9eddfac22a2374b57a7a697bf --- /dev/null +++ b/examples/go-server/assembly/mac/release.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for release env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:25 +# FILE : release.sh +# ****************************************************** + + +set -e + +export GOOS=darwin +export GOARCH=amd64 + +PROFILE=release + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/mac/test.sh b/examples/go-server/assembly/mac/test.sh new file mode 100644 index 0000000000000000000000000000000000000000..d34914c7dbed0e442b4accf51a3ecdf7c2984db8 --- /dev/null +++ b/examples/go-server/assembly/mac/test.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for release env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:25 +# FILE : release.sh +# ****************************************************** + +set -e + +export GOOS=darwin +export GOARCH=amd64 + +PROFILE=test + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi + diff --git a/examples/go-server/assembly/windows/dev.sh b/examples/go-server/assembly/windows/dev.sh new file mode 100644 index 0000000000000000000000000000000000000000..97fbb6f698e500ad08d971b13cc1ffd00cd97803 --- /dev/null +++ b/examples/go-server/assembly/windows/dev.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for dev env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2018-06-24 17:34 +# FILE : dev.sh +# ****************************************************** + + +set -e + +export GOOS=windows +export GOARCH=amd64 + +PROFILE=dev + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/windows/release.sh b/examples/go-server/assembly/windows/release.sh new file mode 100644 index 0000000000000000000000000000000000000000..782cb10c7828eb277b5905f10f8dd6ad1c2d6bed --- /dev/null +++ b/examples/go-server/assembly/windows/release.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for release env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:25 +# FILE : release.sh +# ****************************************************** + + +set -e + +export GOOS=windows +export GOARCH=amd64 + +PROFILE=release + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/assembly/windows/test.sh b/examples/go-server/assembly/windows/test.sh new file mode 100644 index 0000000000000000000000000000000000000000..2037ddecf2545f1543d5d28be728fb0899722098 --- /dev/null +++ b/examples/go-server/assembly/windows/test.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# ****************************************************** +# DESC : build script for test env +# AUTHOR : Alex Stocks +# VERSION : 1.0 +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2016-07-12 16:34 +# FILE : test.sh +# ****************************************************** + + +set -e + +export GOOS=windows +export GOARCH=amd64 + +PROFILE=test + +PROJECT_HOME=`pwd` + +if [ -f "${PROJECT_HOME}/assembly/common/app.properties" ]; then +. ${PROJECT_HOME}/assembly/common/app.properties +fi + + +if [ -f "${PROJECT_HOME}/assembly/common/build.sh" ]; then +. ${PROJECT_HOME}/assembly/common/build.sh +fi diff --git a/examples/go-server/profiles/dev/log.xml b/examples/go-server/profiles/dev/log.xml new file mode 100644 index 0000000000000000000000000000000000000000..d2a0d89394aa2b5a882924752d9b7bab7f424dc7 --- /dev/null +++ b/examples/go-server/profiles/dev/log.xml @@ -0,0 +1,73 @@ +<logging> + <filter enabled="true"> + <tag>stdout</tag> + <type>console</type> + <!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) --> + <level>DEBUG</level> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] (%S) %M</property> <!-- log format, if json is false this option is enable --> + </filter> + <filter enabled="true"> + <tag>debug_file</tag> + <type>file</type> + <level>DEBUG</level> + <property name="filename">logs/debug.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>info_file</tag> + <type>file</type> + <level>INFO</level> + <property name="filename">logs/info.log</property> + <!-- + %T - Time (15:04:05 MST) + %t - Time (15:04) + %D - Date (2006/01/02) + %d - Date (01/02/06) + %L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT) + %S - Source + %M - Message + It ignores unknown format strings (and removes them) + Recommended: "[%D %T] [%L] (%S) %M" + --> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>warn_file</tag> + <type>file</type> + <level>WARNING</level> + <property name="filename">logs/warn.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>error_file</tag> + <type>file</type> + <level>ERROR</level> + <property name="filename">logs/error.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> +</logging> diff --git a/examples/go-server/profiles/dev/server.yml b/examples/go-server/profiles/dev/server.yml new file mode 100644 index 0000000000000000000000000000000000000000..2bff4202a9c35ad15e61d135d23e2626df6fa349 --- /dev/null +++ b/examples/go-server/profiles/dev/server.yml @@ -0,0 +1,35 @@ +# dubbo server yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 20080 + +# server +transport : "http" +net_timeout : "3s" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + # 相当于dubbo.xml中的interface + service : "com.ikurento.user.UserProvider" + +server_list: + - + ip : "127.0.0.1" + port : 20000 + # 本server能够提供所有支持同样的Protocol的servicelist的服务 + protocol : "jsonrpc" diff --git a/examples/go-server/profiles/release/log.xml b/examples/go-server/profiles/release/log.xml new file mode 100644 index 0000000000000000000000000000000000000000..834bab5b07e72f1c250d500b60fe3af25e74cfc1 --- /dev/null +++ b/examples/go-server/profiles/release/log.xml @@ -0,0 +1,73 @@ +<logging> + <filter enabled="false"> + <tag>stdout</tag> + <type>console</type> + <!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) --> + <level>DEBUG</level> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] (%S) %M</property> <!-- log format, if json is false this option is enable --> + </filter> + <filter enabled="false"> + <tag>debug_file</tag> + <type>file</type> + <level>DEBUG</level> + <property name="filename">logs/debug.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="false"> + <tag>info_file</tag> + <type>file</type> + <level>INFO</level> + <property name="filename">logs/info.log</property> + <!-- + %T - Time (15:04:05 MST) + %t - Time (15:04) + %D - Date (2006/01/02) + %d - Date (01/02/06) + %L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT) + %S - Source + %M - Message + It ignores unknown format strings (and removes them) + Recommended: "[%D %T] [%L] (%S) %M" + --> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>warn_file</tag> + <type>file</type> + <level>WARNING</level> + <property name="filename">logs/warn.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>error_file</tag> + <type>file</type> + <level>ERROR</level> + <property name="filename">logs/error.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> +</logging> diff --git a/examples/go-server/profiles/release/server.yml b/examples/go-server/profiles/release/server.yml new file mode 100644 index 0000000000000000000000000000000000000000..2bff4202a9c35ad15e61d135d23e2626df6fa349 --- /dev/null +++ b/examples/go-server/profiles/release/server.yml @@ -0,0 +1,35 @@ +# dubbo server yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 20080 + +# server +transport : "http" +net_timeout : "3s" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + # 相当于dubbo.xml中的interface + service : "com.ikurento.user.UserProvider" + +server_list: + - + ip : "127.0.0.1" + port : 20000 + # 本server能够提供所有支持同样的Protocol的servicelist的服务 + protocol : "jsonrpc" diff --git a/examples/go-server/profiles/test/log.xml b/examples/go-server/profiles/test/log.xml new file mode 100644 index 0000000000000000000000000000000000000000..834bab5b07e72f1c250d500b60fe3af25e74cfc1 --- /dev/null +++ b/examples/go-server/profiles/test/log.xml @@ -0,0 +1,73 @@ +<logging> + <filter enabled="false"> + <tag>stdout</tag> + <type>console</type> + <!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) --> + <level>DEBUG</level> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] (%S) %M</property> <!-- log format, if json is false this option is enable --> + </filter> + <filter enabled="false"> + <tag>debug_file</tag> + <type>file</type> + <level>DEBUG</level> + <property name="filename">logs/debug.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="false"> + <tag>info_file</tag> + <type>file</type> + <level>INFO</level> + <property name="filename">logs/info.log</property> + <!-- + %T - Time (15:04:05 MST) + %t - Time (15:04) + %D - Date (2006/01/02) + %d - Date (01/02/06) + %L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT) + %S - Source + %M - Message + It ignores unknown format strings (and removes them) + Recommended: "[%D %T] [%L] (%S) %M" + --> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>warn_file</tag> + <type>file</type> + <level>WARNING</level> + <property name="filename">logs/warn.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> + <filter enabled="true"> + <tag>error_file</tag> + <type>file</type> + <level>ERROR</level> + <property name="filename">logs/error.log</property> + <property name="json">false</property> <!-- true enables json log format, its priority is high than format --> + <property name="format">[%D %T] [%L] [%S] %M</property> + <property name="rotate">true</property> <!-- true enables log rotation, otherwise append --> + <property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 --> + <property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands --> + <property name="maxbackup">16</property> <!-- \d+ --> + <property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight --> + </filter> +</logging> diff --git a/examples/go-server/profiles/test/server.yml b/examples/go-server/profiles/test/server.yml new file mode 100644 index 0000000000000000000000000000000000000000..2bff4202a9c35ad15e61d135d23e2626df6fa349 --- /dev/null +++ b/examples/go-server/profiles/test/server.yml @@ -0,0 +1,35 @@ +# dubbo server yaml configure file + +# pprof +pprof_enabled : true +pprof_port : 20080 + +# server +transport : "http" +net_timeout : "3s" + +# application config +application_config: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + +registry_config: + timeout : 3 + address: + - "127.0.0.1:2181" + +service_list: + - + protocol : "jsonrpc" + # 相当于dubbo.xml中的interface + service : "com.ikurento.user.UserProvider" + +server_list: + - + ip : "127.0.0.1" + port : 20000 + # 本server能够提供所有支持同样的Protocol的servicelist的服务 + protocol : "jsonrpc" diff --git a/jsonrpc/http.go b/jsonrpc/http.go index eb871e06767c23873ddc1e6b883afac7dbb6d15f..b93fb5dc33b4c0929d0d9fea30fe6096477f73cd 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -173,9 +173,7 @@ func (c *HTTPClient) Do(addr, path string, httpHeader http.Header, body []byte) conn.SetReadDeadline(t) } - setNetConnTimeout(tcpConn, c.options.HTTPTimeout) - // defer setNetConnTimeout(tcpConn, 0) if _, err := reqBuf.WriteTo(tcpConn); err != nil { return nil, jerrors.Trace(err) diff --git a/jsonrpc/json.go b/jsonrpc/json.go index 4fa0b79839d496e5fc25f722197e9996c568309f..8ff48882f46bbaa303ca388fe6ef5c2aa7c481e7 100644 --- a/jsonrpc/json.go +++ b/jsonrpc/json.go @@ -29,6 +29,7 @@ import ( "fmt" "io" "reflect" + "strings" ) import ( @@ -40,10 +41,6 @@ const ( VERSION = "2.0" ) -////////////////////////////////////////// -// json codec -////////////////////////////////////////// - type CodecData struct { ID int64 Method string @@ -51,7 +48,19 @@ type CodecData struct { Error string } -// response Error +const ( + // Errors defined in the JSON-RPC spec. See + // http://www.jsonrpc.org/specification#error_object. + CodeParseError = -32700 + CodeInvalidRequest = -32600 + CodeMethodNotFound = -32601 + CodeInvalidParams = -32602 + CodeInternalError = -32603 + codeServerErrorStart = -32099 + codeServerErrorEnd = -32000 +) + +// rsponse Error type Error struct { Code int `json:"code"` Message string `json:"message"` @@ -70,6 +79,10 @@ func (e *Error) Error() string { return string(buf) } +////////////////////////////////////////// +// json client codec +////////////////////////////////////////// + type clientRequest struct { Version string `json:"jsonrpc"` Method string `json:"method"` @@ -147,7 +160,7 @@ func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) { c.req.Method = d.Method c.req.Params = param c.req.ID = d.ID & MAX_JSONRPC_ID - // can not use d.ID. otherwise you will get error: can not find method of rsponse id 280698512 + // can not use d.ID. otherwise you will get error: can not find method of response id 280698512 c.pending[c.req.ID] = d.Method buf := bytes.NewBuffer(nil) @@ -187,3 +200,211 @@ func (c *jsonClientCodec) Read(streamBytes []byte, x interface{}) error { return jerrors.Trace(json.Unmarshal(*c.rsp.Result, x)) } + +////////////////////////////////////////// +// json server codec +////////////////////////////////////////// + +type serverRequest struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + ID *json.RawMessage `json:"id"` +} + +func (r *serverRequest) reset() { + r.Version = "" + r.Method = "" + if r.Params != nil { + *r.Params = (*r.Params)[:0] + } + if r.ID != nil { + *r.ID = (*r.ID)[:0] + } +} + +func (r *serverRequest) UnmarshalJSON(raw []byte) error { + r.reset() + + type req *serverRequest + // Attention: if do not define a new struct named @req, the json.Unmarshal will invoke + // (*serverRequest)UnmarshalJSON recursively. + if err := json.Unmarshal(raw, req(r)); err != nil { + return jerrors.New("bad request") + } + + var o = make(map[string]*json.RawMessage) + if err := json.Unmarshal(raw, &o); err != nil { + return jerrors.New("bad request") + } + if o["jsonrpc"] == nil || o["method"] == nil { + return jerrors.New("bad request") + } + _, okID := o["id"] + _, okParams := o["params"] + if len(o) == 3 && !(okID || okParams) || len(o) == 4 && !(okID && okParams) || len(o) > 4 { + return jerrors.New("bad request") + } + if r.Version != Version { + return jerrors.New("bad request") + } + if okParams { + if r.Params == nil || len(*r.Params) == 0 { + return jerrors.New("bad request") + } + switch []byte(*r.Params)[0] { + case '[', '{': + default: + return jerrors.New("bad request") + } + } + if okID && r.ID == nil { + r.ID = &null + } + if okID { + if len(*r.ID) == 0 { + return jerrors.New("bad request") + } + switch []byte(*r.ID)[0] { + case 't', 'f', '{', '[': + return jerrors.New("bad request") + } + } + + return nil +} + +type serverResponse struct { + Version string `json:"jsonrpc"` + ID *json.RawMessage `json:"id"` + Result interface{} `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` +} + +type ServerCodec struct { + req serverRequest +} + +var ( + null = json.RawMessage([]byte("null")) + Version = "2.0" +) + +func newServerCodec() *ServerCodec { + return &ServerCodec{} +} + +func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error { + if header["HttpMethod"] != "POST" { + return &Error{Code: -32601, Message: "Method not found"} + } + + // If return error: + // - codec will be closed + // So, try to send error reply to client before returning error. + + buf := bytes.NewBuffer(body) + defer buf.Reset() + dec := json.NewDecoder(buf) + + var raw json.RawMessage + c.req.reset() + if err := dec.Decode(&raw); err != nil { + // rspError := &Error{Code: -32700, Message: "Parse error"} + // c.resp = serverResponse{Version: Version, ID: &null, Error: rspError} + return err + } + if err := json.Unmarshal(raw, &c.req); err != nil { + // if err.Error() == "bad request" { + // rspError := &Error{Code: -32600, Message: "Invalid request"} + // c.resp = serverResponse{Version: Version, ID: &null, Error: rspError} + // } + return err + } + + return nil +} + +func (c *ServerCodec) ReadBody(x interface{}) error { + // If x!=nil and return error e: + // - Write() will be called with e.Error() in r.Error + if x == nil { + return nil + } + if c.req.Params == nil { + return nil + } + + // 在这里把请求参数json 字符串转换成了相应的struct + params := []byte(*c.req.Params) + if err := json.Unmarshal(*c.req.Params, x); err != nil { + // Note: if c.request.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + + if 2 < len(params) && params[0] == '[' && params[len(params)-1] == ']' { + // Clearly JSON params is not a structured object, + // fallback and attempt an unmarshal with JSON params as + // array value and RPC params is struct. Unmarshal into + // array containing the request struct. + params := [1]interface{}{x} + if err = json.Unmarshal(*c.req.Params, ¶ms); err != nil { + return &Error{Code: -32602, Message: "Invalid params, error:" + err.Error()} + } + } else { + return &Error{Code: -32602, Message: "Invalid params, error:" + err.Error()} + } + } + + return nil +} + +func NewError(code int, message string) *Error { + return &Error{Code: code, Message: message} +} + +func newError(message string) *Error { + switch { + case strings.HasPrefix(message, "rpc: service/method request ill-formed"): + return NewError(-32601, message) + case strings.HasPrefix(message, "rpc: can't find service"): + return NewError(-32601, message) + case strings.HasPrefix(message, "rpc: can't find method"): + return NewError(-32601, message) + default: + return NewError(-32000, message) + } +} + +func (c *ServerCodec) Write(errMsg string, x interface{}) ([]byte, error) { + // If return error: nothing happens. + // In r.Error will be "" or .Error() of error returned by: + // - ReadBody() + // - called RPC method + resp := serverResponse{Version: Version, ID: c.req.ID, Result: x} + if len(errMsg) == 0 { + if x == nil { + resp.Result = &null + } else { + resp.Result = x + } + } else if errMsg[0] == '{' && errMsg[len(errMsg)-1] == '}' { + // Well& this check for '{'&'}' isn't too strict, but I + // suppose we're trusting our own RPC methods (this way they + // can force sending wrong reply or many replies instead + // of one) and normal errors won't be formatted this way. + raw := json.RawMessage(errMsg) + resp.Error = &raw + } else { + raw := json.RawMessage(newError(errMsg).Error()) + resp.Error = &raw + } + + buf := bytes.NewBuffer(nil) + defer buf.Reset() + enc := json.NewEncoder(buf) + if err := enc.Encode(resp); err != nil { + return nil, jerrors.Trace(err) + } + + return buf.Bytes(), nil +} diff --git a/jsonrpc/map.go b/jsonrpc/map.go new file mode 100644 index 0000000000000000000000000000000000000000..d56aa75a449bacd433e24480489c9008f18ec8a6 --- /dev/null +++ b/jsonrpc/map.go @@ -0,0 +1,283 @@ +package jsonrpc + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "net" + "net/http" + "reflect" + "strings" + "sync" + "unicode" + "unicode/utf8" +) + +import ( + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +var ( + // A value sent as a placeholder for the server's response value when the server + // receives an invalid request. It is never decoded by the client since the Response + // contains an error when it is used. + invalidRequest = struct{}{} + + // Precompute the reflect type for error. Can't use error directly + // because Typeof takes an empty interface value. This is annoying. + typeOfError = reflect.TypeOf((*error)(nil)).Elem() +) + +type serviceMethod struct { + method reflect.Method // receiver method + ctxType reflect.Type // type of the request context + argsType reflect.Type // type of the request argument + replyType reflect.Type // type of the response argument +} + +func (m *serviceMethod) suiteContext(ctx context.Context) reflect.Value { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + return contextv + } + return reflect.Zero(m.ctxType) +} + +type service struct { + name string // name of service + rcvr reflect.Value // receiver of methods for the service + rcvrType reflect.Type // type of the receiver + methods map[string]*serviceMethod // registered methods, function name -> reflect.function +} + +type serviceMap struct { + mutex sync.Mutex // protects the serviceMap + serviceMap map[string]*service // service name -> service +} + +func initServer() *serviceMap { + return &serviceMap{ + serviceMap: make(map[string]*service), + } +} + +// isExported returns true of a string is an exported (upper case) name. +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// isExportedOrBuiltin returns true if a type is exported or a builtin. +func isExportedOrBuiltin(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +func suiteMethod(method reflect.Method) *serviceMethod { + mtype := method.Type + mname := method.Name + + // Method must be exported. + if method.PkgPath != "" { + return nil + } + + var replyType, argType, ctxType reflect.Type + switch mtype.NumIn() { + case 3: + argType = mtype.In(1) + replyType = mtype.In(2) + case 4: + ctxType = mtype.In(1) + argType = mtype.In(2) + replyType = mtype.In(3) + default: + log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", + mname, mtype, mtype.NumIn()) + return nil + } + // First arg need not be a pointer. + if !isExportedOrBuiltin(argType) { + log.Error("argument type of method %q is not exported %v", mname, argType) + return nil + } + // Second arg must be a pointer. + if replyType.Kind() != reflect.Ptr { + log.Error("reply type of method %q is not a pointer %v", mname, replyType) + return nil + } + // Reply type must be exported. + if !isExportedOrBuiltin(replyType) { + log.Error("reply type of method %s not exported{%v}", mname, replyType) + return nil + } + // Method needs one out. + if mtype.NumOut() != 1 { + log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut()) + return nil + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + log.Error("return type %s of method %q is not error", returnType, mname) + return nil + } + + return &serviceMethod{method: method, argsType: argType, replyType: replyType, ctxType: ctxType} +} + +func (server *serviceMap) register(rcvr Handler) (string, error) { + server.mutex.Lock() + defer server.mutex.Unlock() + if server.serviceMap == nil { + server.serviceMap = make(map[string]*service) + } + + s := new(service) + s.rcvrType = reflect.TypeOf(rcvr) + s.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(s.rcvr).Type().Name() + if sname == "" { + s := "no service name for type " + s.rcvrType.String() + log.Error(s) + return "", jerrors.New(s) + } + if !isExported(sname) { + s := "type " + sname + " is not exported" + log.Error(s) + return "", jerrors.New(s) + } + + sname = rcvr.Service() + if _, dup := server.serviceMap[sname]; dup { + return "", jerrors.New("service already defined: " + sname) + } + s.name = sname + s.methods = make(map[string]*serviceMethod) + + // Install the methods + methods := "" + num := s.rcvrType.NumMethod() + for m := 0; m < num; m++ { + method := s.rcvrType.Method(m) + if mt := suiteMethod(method); mt != nil { + s.methods[method.Name] = mt + methods += method.Name + "," + } + } + + if len(s.methods) == 0 { + s := "type " + sname + " has no exported methods of suitable type" + log.Error(s) + return "", jerrors.New(s) + } + server.serviceMap[s.name] = s + + return strings.TrimSuffix(methods, ","), nil +} + +func (server *serviceMap) serveRequest(ctx context.Context, + header map[string]string, body []byte, conn net.Conn) error { + + // read request header + codec := newServerCodec() + err := codec.ReadHeader(header, body) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return jerrors.Trace(err) + } + + return jerrors.New("server cannot decode request: " + err.Error()) + } + serviceName := header["Path"] + methodName := codec.req.Method + if len(serviceName) == 0 || len(methodName) == 0 { + codec.ReadBody(nil) + return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName) + } + + // get method + server.mutex.Lock() + svc := server.serviceMap[serviceName] + server.mutex.Unlock() + if svc == nil { + codec.ReadBody(nil) + return jerrors.New("cannot find svc " + serviceName) + } + mtype := svc.methods[methodName] + if mtype == nil { + codec.ReadBody(nil) + return jerrors.New("cannot find method " + methodName + " of svc " + serviceName) + } + + // get args + var argv reflect.Value + argIsValue := false + if mtype.argsType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.argsType.Elem()) + } else { + argv = reflect.New(mtype.argsType) + argIsValue = true + } + // argv guaranteed to be a pointer now. + if err = codec.ReadBody(argv.Interface()); err != nil { + return jerrors.Trace(err) + } + if argIsValue { + argv = argv.Elem() + } + + replyv := reflect.New(mtype.replyType.Elem()) + + // call service.method(args) + var errMsg string + returnValues := mtype.method.Func.Call([]reflect.Value{ + svc.rcvr, + mtype.suiteContext(ctx), + reflect.ValueOf(argv.Interface()), + reflect.ValueOf(replyv.Interface()), + }) + // The return value for the method is an error. + if retErr := returnValues[0].Interface(); retErr != nil { + errMsg = retErr.(error).Error() + } + + // write response + code := 200 + rspReply := replyv.Interface() + if len(errMsg) != 0 { + code = 500 + rspReply = invalidRequest + } + rspStream, err := codec.Write(errMsg, rspReply) + if err != nil { + return jerrors.Trace(err) + } + rsp := &http.Response{ + Header: make(http.Header), + StatusCode: code, + ContentLength: int64(len(rspStream)), + Body: ioutil.NopCloser(bytes.NewReader(rspStream)), + } + delete(header, "Content-Type") + delete(header, "Content-Length") + delete(header, "Timeout") + for k, v := range header { + rsp.Header.Set(k, v) + } + + rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) + rspBuf.Reset() + if err = rsp.Write(rspBuf); err != nil { + log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err) + return nil + } + if _, err = rspBuf.WriteTo(conn); err != nil { + log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) + } + return nil +} diff --git a/jsonrpc/server.go b/jsonrpc/server.go new file mode 100644 index 0000000000000000000000000000000000000000..6be864d27f5f0a9d74fcd1ed6c6071a51aeb33dc --- /dev/null +++ b/jsonrpc/server.go @@ -0,0 +1,376 @@ +package jsonrpc + +import ( + "bufio" + "bytes" + "context" + "io/ioutil" + "net" + "net/http" + "runtime" + "runtime/debug" + "strconv" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/goext/net" + log "github.com/AlexStocks/log4go" + "github.com/dubbo/dubbo-go/registry" + jerrors "github.com/juju/errors" +) + +const ( + DefaultMaxSleepTime = 1 * time.Second // accept中间最大sleep interval + DefaultHTTPRspBufferSize = 1024 + PathPrefix = byte('/') +) + +// Handler interface represents a Service request handler. It's generated +// by passing any type of public concrete object with methods into server.NewHandler. +// Most will pass in a struct. +// +// Example: +// +// type Hello struct {} +// +// func (s *Hello) Method(context, request, response) error { +// return nil +// } +// +// func (s *Hello) Service() string { +// return "com.youni.service" +// } +// +// func (s *Hello) Version() string { +// return "1.0.0" +// } + +type Handler interface { + Service() string // Service Interface + Version() string +} + +type Option func(*Options) + +type Options struct { + Registry registry.Registry + ConfList []registry.ServerConfig + ServiceConfList []registry.ServiceConfig + Timeout time.Duration +} + +func newOptions(opt ...Option) Options { + opts := Options{} + for _, o := range opt { + o(&opts) + } + + if opts.Registry == nil { + panic("server.Options.Registry is nil") + } + + return opts +} + +// Registry used for discovery +func Registry(r registry.Registry) Option { + return func(o *Options) { + o.Registry = r + } +} + +func ConfList(confList []registry.ServerConfig) Option { + return func(o *Options) { + o.ConfList = confList + for i := 0; i < len(o.ConfList); i++ { + if o.ConfList[i].IP == "" { + o.ConfList[i].IP, _ = gxnet.GetLocalIP() + } + } + } +} + +func ServiceConfList(confList []registry.ServiceConfig) Option { + return func(o *Options) { + o.ServiceConfList = confList + } +} + +type Server struct { + rpc []*serviceMap + done chan struct{} + once sync.Once + + sync.RWMutex + opts Options + handlers map[string]Handler + wg sync.WaitGroup +} + +func NewServer(opts ...Option) *Server { + var ( + num int + ) + options := newOptions(opts...) + Servers := make([]*serviceMap, len(options.ConfList)) + num = len(options.ConfList) + for i := 0; i < num; i++ { + Servers[i] = initServer() + } + return &Server{ + opts: options, + rpc: Servers, + handlers: make(map[string]Handler), + done: make(chan struct{}), + } +} + +func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) { + defer func() { + if r := recover(); r != nil { + log.Warn("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s", + conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack())) + } + + conn.Close() + }() + + setReadTimeout := func(conn net.Conn, timeout time.Duration) { + t := time.Time{} + if timeout > time.Duration(0) { + t = time.Now().Add(timeout) + } + + conn.SetDeadline(t) + } + + sendErrorResp := func(header http.Header, body []byte) error { + rsp := &http.Response{ + Header: header, + StatusCode: 500, + ContentLength: int64(len(body)), + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + rsp.Header.Del("Content-Type") + rsp.Header.Del("Content-Length") + rsp.Header.Del("Timeout") + + rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) + rspBuf.Reset() + err := rsp.Write(rspBuf) + if err != nil { + return jerrors.Trace(err) + } + _, err = rspBuf.WriteTo(conn) + return jerrors.Trace(err) + } + + for { + bufReader := bufio.NewReader(conn) + r, err := http.ReadRequest(bufReader) + if err != nil { + return + } + + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + return + } + r.Body.Close() + + reqHeader := make(map[string]string) + for k := range r.Header { + reqHeader[k] = r.Header.Get(k) + } + reqHeader["Path"] = r.URL.Path[1:] // to get service name + if r.URL.Path[0] != PathPrefix { + reqHeader["Path"] = r.URL.Path + } + reqHeader["HttpMethod"] = r.Method + + httpTimeout := s.Options().Timeout + contentType := reqHeader["Content-Type"] + if contentType != "application/json" && contentType != "application/json-rpc" { + setReadTimeout(conn, httpTimeout) + r.Header.Set("Content-Type", "text/plain") + if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { + log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", + r.Header, jerrors.ErrorStack(err), errRsp) + } + return + } + + ctx := context.Background() + if len(reqHeader["Timeout"]) > 0 { + timeout, err := strconv.ParseUint(reqHeader["Timeout"], 10, 64) + if err == nil { + httpTimeout = time.Duration(timeout) + ctx, _ = context.WithTimeout(ctx, httpTimeout) + } + delete(reqHeader, "Timeout") + } + setReadTimeout(conn, httpTimeout) + + if err := rpc.serveRequest(ctx, reqHeader, reqBody, conn); err != nil { + if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { + log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", + r.Header, jerrors.ErrorStack(err), errRsp) + } + + log.Info("Unexpected error serving request, closing socket: %v", err) + return + } + } +} + +func (s *Server) Options() Options { + s.RLock() + defer s.RUnlock() + return s.opts +} + +func (s *Server) Handle(h Handler) error { + var ( + err error + serviceConf registry.ProviderServiceConfig + ) + + opts := s.Options() + + serviceConf.Service = h.Service() + serviceConf.Version = h.Version() + + flag := 0 + serviceNum := len(opts.ServiceConfList) + ServerNum := len(opts.ConfList) + for i := 0; i < serviceNum; i++ { + if opts.ServiceConfList[i].Service == serviceConf.Service && + opts.ServiceConfList[i].Version == serviceConf.Version { + + serviceConf.Protocol = opts.ServiceConfList[i].Protocol + serviceConf.Group = opts.ServiceConfList[i].Group + // serviceConf.Version = opts.ServiceConfList[i].Version + for j := 0; j < ServerNum; j++ { + if opts.ConfList[j].Protocol == serviceConf.Protocol { + s.Lock() + serviceConf.Methods, err = s.rpc[j].register(h) + s.Unlock() + if err != nil { + return err + } + + serviceConf.Path = opts.ConfList[j].Address() + err = opts.Registry.Register(serviceConf) + if err != nil { + return err + } + flag = 1 + } + } + } + } + + if flag == 0 { + return jerrors.Errorf("fail to register Handler{service:%s, version:%s}", + serviceConf.Service, serviceConf.Version) + } + + s.Lock() + s.handlers[h.Service()] = h + s.Unlock() + + return nil +} + +func accept(listener net.Listener, fn func(net.Conn)) error { + var ( + err error + c net.Conn + ok bool + ne net.Error + tmpDelay time.Duration + ) + + for { + c, err = listener.Accept() + if err != nil { + if ne, ok = err.(net.Error); ok && ne.Temporary() { + if tmpDelay != 0 { + tmpDelay <<= 1 + } else { + tmpDelay = 5 * time.Millisecond + } + if tmpDelay > DefaultMaxSleepTime { + tmpDelay = DefaultMaxSleepTime + } + log.Info("http: Accept error: %v; retrying in %v\n", err, tmpDelay) + time.Sleep(tmpDelay) + continue + } + return jerrors.Trace(err) + } + + go func() { + defer func() { + if r := recover(); r != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + log.Error("http: panic serving %v: %v\n%s", c.RemoteAddr(), r, buf) + c.Close() + } + }() + + fn(c) + }() + } +} + +func (s *Server) Start() error { + config := s.Options() + + ServerNum := len(config.ConfList) + for i := 0; i < ServerNum; i++ { + listener, err := net.Listen("tcp", config.ConfList[i].Address()) + if err != nil { + return err + } + log.Info("rpc server start to listen on %s", listener.Addr()) + + s.Lock() + rpc := s.rpc[i] + s.Unlock() + + s.wg.Add(1) + go func(servo *serviceMap) { + accept(listener, func(conn net.Conn) { s.handlePkg(rpc, conn) }) + s.wg.Done() + }(rpc) + + s.wg.Add(1) + go func(servo *serviceMap) { // Server done goroutine + var err error + <-s.done // step1: block to wait for done channel(wait Server.Stop step2) + err = listener.Close() // step2: and then close listener + if err != nil { + log.Warn("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err) + } + s.wg.Done() + }(rpc) + } + + return nil +} + +func (s *Server) Stop() { + s.once.Do(func() { + close(s.done) + s.wg.Wait() + if s.opts.Registry != nil { + s.opts.Registry.Close() + s.opts.Registry = nil + } + }) +} diff --git a/registry/config.go b/registry/config.go deleted file mode 100644 index 444cb622f882fe510b34bffc170dd1bc149ca126..0000000000000000000000000000000000000000 --- a/registry/config.go +++ /dev/null @@ -1,73 +0,0 @@ -package registry - -import ( - "fmt" -) - -////////////////////////////////////////////// -// application config -////////////////////////////////////////////// - -type ApplicationConfig struct { - Organization string - Name string - Module string - Version string - Owner string -} - -func (c *ApplicationConfig) ToString() string { - return fmt.Sprintf("ApplicationConfig is {name:%s, version:%s, owner:%s, module:%s, organization:%s}", - c.Name, c.Version, c.Owner, c.Module, c.Organization) -} - -type RegistryConfig struct { - Address []string `required:"true"` - UserName string - Password string - Timeout int `default:"5"` // unit: second -} - -////////////////////////////////////////////// -// service config -////////////////////////////////////////////// - -type ServiceConfigIf interface { - String() string - ServiceEqual(url *ServiceURL) bool -} - -type ServiceConfig struct { - Protocol string `required:"true",default:"dubbo"` - Service string `required:"true"` - Group string - Version string -} - -func (c ServiceConfig) Key() string { - return fmt.Sprintf("%s@%s", c.Service, c.Protocol) -} - -func (c ServiceConfig) String() string { - return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version) -} - -func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool { - if c.Protocol != url.Protocol { - return false - } - - if c.Service != url.Query.Get("interface") { - return false - } - - if c.Group != url.Group { - return false - } - - if c.Version != url.Version { - return false - } - - return true -} diff --git a/registry/consumer.go b/registry/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..b63abb8941e9234d52bee0966c184ebabef7f8f9 --- /dev/null +++ b/registry/consumer.go @@ -0,0 +1,636 @@ +package registry + +import ( + "fmt" + "net/url" + "os" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/goext/net" + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/version" +) + +const ( + REGISTRY_CONN_DELAY = 3 +) + +var ( + ErrorRegistryNotFound = jerrors.New("registry not found") +) + +////////////////////////////////////////////// +// DubboType +////////////////////////////////////////////// + +type DubboType int + +const ( + CONSUMER = iota + CONFIGURATOR + ROUTER + PROVIDER +) + +var ( + DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} + DubboRole = [...]string{"consumer", "", "", "provider"} + RegistryZkClient = "zk registry" + processID = "" + localIP = "" +) + +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = gxnet.GetLocalIP() +} + +func (t DubboType) String() string { + return DubboNodes[t] +} + +func (t DubboType) Role() string { + return DubboRole[t] +} + +////////////////////////////////////////////// +// ZkConsumerRegistry +////////////////////////////////////////////// + +const ( + DEFAULT_REGISTRY_TIMEOUT = 1 + ConsumerRegistryZkClient = "consumer zk registry" +) + +type Options struct { + ApplicationConfig + RegistryConfig // ZooKeeperServers []string + mode Mode + serviceTTL time.Duration +} + +type Option func(*Options) + +func ApplicationConf(conf ApplicationConfig) Option { + return func(o *Options) { + o.ApplicationConfig = conf + } +} + +func RegistryConf(conf RegistryConfig) Option { + return func(o *Options) { + o.RegistryConfig = conf + } +} + +func BalanceMode(mode Mode) Option { + return func(o *Options) { + o.mode = mode + } +} + +func ServiceTTL(ttl time.Duration) Option { + return func(o *Options) { + o.serviceTTL = ttl + } +} + +type ZkConsumerRegistry struct { + Options + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + + sync.Mutex + client *zookeeperClient + services map[string]ServiceConfigIf // service name + protocol -> service config + listenerLock sync.Mutex + listener *zkEventListener + listenerServiceMap map[string]*serviceArray +} + +func NewZkConsumerRegistry(opts ...Option) (*ZkConsumerRegistry, error) { + var ( + err error + r *ZkConsumerRegistry + ) + + r = &ZkConsumerRegistry{ + birth: time.Now().Unix(), + done: make(chan struct{}), + services: make(map[string]ServiceConfigIf), + listenerServiceMap: make(map[string]*serviceArray), + } + + for _, opt := range opts { + opt(&r.Options) + } + + if len(r.Name) == 0 { + r.Name = ConsumerRegistryZkClient + } + if len(r.Version) == 0 { + r.Version = version.Version + } + if r.RegistryConfig.Timeout == 0 { + r.RegistryConfig.Timeout = DEFAULT_REGISTRY_TIMEOUT + } + err = r.validateZookeeperClient() + if err != nil { + return nil, jerrors.Trace(err) + } + + r.wg.Add(1) + go r.handleZkRestart() + r.wg.Add(1) + go r.listen() + + return r, nil +} + +func (r *ZkConsumerRegistry) isClosed() bool { + select { + case <-r.done: + return true + default: + return false + } +} + +func (r *ZkConsumerRegistry) handleZkRestart() { + var ( + err error + flag bool + failTimes int + confIf ServiceConfigIf + services []ServiceConfigIf + ) + + defer r.wg.Done() +LOOP: + for { + select { + case <-r.done: + log.Warn("(consumerZkConsumerRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.client.done(): + r.Lock() + r.client.Close() + r.client = nil + r.Unlock() + + failTimes = 0 + for { + select { + case <-r.done: + log.Warn("(consumerZkConsumerRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-time.After(timeSecondDuration(failTimes * REGISTRY_CONN_DELAY)): + } + err = r.validateZookeeperClient() + if err == nil { + // copy r.services + r.Lock() + for _, confIf = range r.services { + services = append(services, confIf) + } + r.Unlock() + + flag = true + for _, confIf = range services { + err = r.register(confIf.(*ServiceConfig)) + if err != nil { + log.Error("in (consumerZkConsumerRegistry)reRegister, (consumerZkConsumerRegistry)register(conf{%#v}) = error{%#v}", + confIf.(*ServiceConfig), jerrors.ErrorStack(err)) + flag = false + break + } + } + if flag { + break + } + } + failTimes++ + if MAX_TIMES <= failTimes { + failTimes = MAX_TIMES + } + } + } + } +} + +func (r *ZkConsumerRegistry) validateZookeeperClient() error { + var ( + err error + ) + + err = nil + r.Lock() + defer r.Unlock() + if r.client == nil { + r.client, err = newZookeeperClient(ConsumerRegistryZkClient, r.Address, r.RegistryConfig.Timeout) + if err != nil { + log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}", + ConsumerRegistryZkClient, r.Address, r.Timeout, err) + } + } + + return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Address) +} + +func (r *ZkConsumerRegistry) registerZookeeperNode(root string, data []byte) error { + var ( + err error + zkPath string + ) + + r.Lock() + defer r.Unlock() + err = r.client.Create(root) + if err != nil { + log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err)) + return jerrors.Annotatef(err, "zkclient.Create(root:%s)", root) + } + zkPath, err = r.client.RegisterTempSeq(root, data) + if err != nil { + log.Error("createTempSeqNode(root{%s}) = error{%v}", root, jerrors.ErrorStack(err)) + return jerrors.Annotatef(err, "createTempSeqNode(root{%s})", root) + } + log.Debug("create a zookeeper node:%s", zkPath) + + return nil +} + +func (r *ZkConsumerRegistry) registerTempZookeeperNode(root string, node string) error { + var ( + err error + zkPath string + ) + + r.Lock() + defer r.Unlock() + err = r.client.Create(root) + if err != nil { + log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + zkPath, err = r.client.RegisterTemp(root, node) + if err != nil { + log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err)) + return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node) + } + log.Debug("create a zookeeper node:%s", zkPath) + + return nil +} + +func (r *ZkConsumerRegistry) register(conf *ServiceConfig) error { + var ( + err error + params url.Values + revision string + rawURL string + encodedURL string + dubboPath string + ) + + err = r.validateZookeeperClient() + if err != nil { + log.Error("client.validateZookeeperClient() = err:%#v", err) + return jerrors.Trace(err) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, DubboNodes[CONSUMER]) + r.Lock() + err = r.client.Create(dubboPath) + r.Unlock() + if err != nil { + log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, DubboNodes[PROVIDER]) + r.Lock() + err = r.client.Create(dubboPath) + r.Unlock() + if err != nil { + log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + + params = url.Values{} + params.Add("interface", conf.Service) + params.Add("application", r.ApplicationConfig.Name) + revision = r.ApplicationConfig.Version + if revision == "" { + revision = "0.1.0" + } + params.Add("revision", revision) + if conf.Group != "" { + params.Add("group", conf.Group) + } + params.Add("category", (DubboType(CONSUMER)).String()) + params.Add("dubbo", "dubbogo-consumer-"+version.Version) + params.Add("org", r.Organization) + params.Add("module", r.Module) + params.Add("owner", r.Owner) + params.Add("side", (DubboType(CONSUMER)).Role()) + params.Add("pid", processID) + params.Add("ip", localIP) + params.Add("timeout", fmt.Sprintf("%v", r.Timeout)) + params.Add("timestamp", fmt.Sprintf("%d", r.birth)) + if conf.Version != "" { + params.Add("version", conf.Version) + } + rawURL = fmt.Sprintf("%s://%s/%s?%s", conf.Protocol, localIP, conf.Service+conf.Version, params.Encode()) + encodedURL = url.QueryEscape(rawURL) + + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, (DubboType(CONSUMER)).String()) + log.Debug("consumer path:%s, url:%s", dubboPath, rawURL) + err = r.registerTempZookeeperNode(dubboPath, encodedURL) + if err != nil { + return jerrors.Trace(err) + } + + return nil +} + +func (r *ZkConsumerRegistry) Register(conf ServiceConfig) error { + var ( + ok bool + err error + listener *zkEventListener + ) + + ok = false + r.Lock() + _, ok = r.services[conf.Key()] + r.Unlock() + if ok { + return jerrors.Errorf("Service{%s} has been registered", conf.Service) + } + + err = r.register(&conf) + if err != nil { + return jerrors.Trace(err) + } + + r.Lock() + r.services[conf.Key()] = &conf + r.Unlock() + log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) + + r.listenerLock.Lock() + listener = r.listener + r.listenerLock.Unlock() + if listener != nil { + go listener.listenServiceEvent(conf) + } + + return nil +} + +// name: service@protocol +func (r *ZkConsumerRegistry) get(sc ServiceConfig) ([]*ServiceURL, error) { + var ( + ok bool + err error + dubboPath string + nodes []string + listener *zkEventListener + serviceURL *ServiceURL + serviceConfIf ServiceConfigIf + serviceConf *ServiceConfig + ) + + r.listenerLock.Lock() + listener = r.listener + r.listenerLock.Unlock() + + if listener != nil { + listener.listenServiceEvent(sc) + } + + r.Lock() + serviceConfIf, ok = r.services[sc.Key()] + r.Unlock() + if !ok { + return nil, jerrors.Errorf("Service{%s} has not been registered", sc.Key()) + } + serviceConf, ok = serviceConfIf.(*ServiceConfig) + if !ok { + return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", sc.Key()) + } + + dubboPath = fmt.Sprintf("/dubbo/%s/providers", sc.Service) + err = r.validateZookeeperClient() + if err != nil { + return nil, jerrors.Trace(err) + } + r.Lock() + nodes, err = r.client.getChildren(dubboPath) + r.Unlock() + if err != nil { + log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err) + return nil, jerrors.Trace(err) + } + + var listenerServiceMap = make(map[string]*ServiceURL) + for _, n := range nodes { + serviceURL, err = NewServiceURL(n) + if err != nil { + log.Error("NewServiceURL({%s}) = error{%v}", n, err) + continue + } + if !serviceConf.ServiceEqual(serviceURL) { + log.Warn("serviceURL{%s} is not compatible with ServiceConfig{%#v}", serviceURL, serviceConf) + continue + } + + _, ok := listenerServiceMap[serviceURL.Query.Get(serviceURL.Location)] + if !ok { + listenerServiceMap[serviceURL.Location] = serviceURL + continue + } + } + + var services []*ServiceURL + for _, service := range listenerServiceMap { + services = append(services, service) + } + + return services, nil +} + +func (r *ZkConsumerRegistry) Filter(s ServiceConfigIf, reqID int64) (*ServiceURL, error) { + var serviceConf ServiceConfig + if scp, ok := s.(*ServiceConfig); ok { + serviceConf = *scp + } else if sc, ok := s.(ServiceConfig); ok { + serviceConf = sc + } else { + return nil, jerrors.Errorf("illegal @s:%#v", s) + } + + serviceKey := serviceConf.Key() + + r.listenerLock.Lock() + svcArr, sok := r.listenerServiceMap[serviceKey] + log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr) + if sok { + if serviceURL, err := svcArr.Select(reqID, r.Options.mode, r.Options.serviceTTL); err == nil { + r.listenerLock.Unlock() + return serviceURL, nil + } + } + r.listenerLock.Unlock() + + svcs, err := r.get(serviceConf) + r.listenerLock.Lock() + defer r.listenerLock.Unlock() + if err != nil { + log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}", + serviceConf, jerrors.ErrorStack(err), svcs) + if sok && len(svcArr.arr) > 0 { + log.Error("serviceArray{%v} timeout, can not get new, use old instead", svcArr) + service, err := svcArr.Select(reqID, r.Options.mode, 0) + return service, jerrors.Trace(err) + } + + return nil, jerrors.Trace(err) + } + + newSvcArr := newServiceArray(svcs) + service, err := newSvcArr.Select(reqID, r.Options.mode, 0) + r.listenerServiceMap[serviceKey] = newSvcArr + return service, jerrors.Trace(err) +} + +func (r *ZkConsumerRegistry) getListener() (*zkEventListener, error) { + var ( + ok bool + zkListener *zkEventListener + serviceConf *ServiceConfig + ) + + r.listenerLock.Lock() + zkListener = r.listener + r.listenerLock.Unlock() + if zkListener != nil { + return zkListener, nil + } + + r.Lock() + client := r.client + r.Unlock() + if client == nil { + return nil, jerrors.New("zk connection broken") + } + + // new client & listener + zkListener = newZkEventListener(client) + + r.listenerLock.Lock() + r.listener = zkListener + r.listenerLock.Unlock() + + // listen + r.Lock() + for _, service := range r.services { + if serviceConf, ok = service.(*ServiceConfig); ok { + go zkListener.listenServiceEvent(*serviceConf) + } + } + r.Unlock() + + return zkListener, nil +} + +func (r *ZkConsumerRegistry) update(res *ServiceURLEvent) { + if res == nil || res.Service == nil { + return + } + + log.Debug("registry update, result{%s}", res) + serviceKey := res.Service.ServiceConfig().Key() + + r.listenerLock.Lock() + defer r.listenerLock.Unlock() + + svcArr, ok := r.listenerServiceMap[serviceKey] + log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr) + + switch res.Action { + case ServiceURLAdd: + if ok { + svcArr.Add(res.Service, r.Options.serviceTTL) + } else { + r.listenerServiceMap[serviceKey] = newServiceArray([]*ServiceURL{res.Service}) + } + case ServiceURLDel: + if ok { + svcArr.Del(res.Service, r.Options.serviceTTL) + if len(svcArr.arr) == 0 { + delete(r.listenerServiceMap, serviceKey) + log.Warn("delete service %s from service map", serviceKey) + } + } + log.Error("selector delete serviceURL{%s}", *res.Service) + } +} + +func (r *ZkConsumerRegistry) listen() { + defer r.wg.Done() + + for { + if r.isClosed() { + log.Warn("event listener game over.") + return + } + + listener, err := r.getListener() + if err != nil { + if r.isClosed() { + log.Warn("event listener game over.") + return + } + log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) + time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) + continue + } + + if err = listener.listenEvent(r); err != nil { + log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) + + r.listenerLock.Lock() + r.listener = nil + r.listenerLock.Unlock() + + listener.close() + + time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) + continue + } + } +} + +func (r *ZkConsumerRegistry) closeRegisters() { + r.Lock() + log.Info("begin to close zk client") + r.client.Close() + r.client = nil + r.services = nil + r.Unlock() +} + +func (r *ZkConsumerRegistry) Close() { + close(r.done) + r.wg.Wait() + r.closeRegisters() +} diff --git a/registry/provider.go b/registry/provider.go new file mode 100644 index 0000000000000000000000000000000000000000..e32bc24516719bfc06ea3b9df5d652eb3b26099b --- /dev/null +++ b/registry/provider.go @@ -0,0 +1,357 @@ +package registry + +import ( + "fmt" + "net/url" + "strconv" + "sync" + "time" +) + +import ( + log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" +) + +import ( + "github.com/dubbo/dubbo-go/version" +) + +////////////////////////////////////////////// +// ZkProviderRegistry +////////////////////////////////////////////// + +const ( + ProviderRegistryZkClient = "provider zk registry" +) + +type ProviderServiceConfig struct { + ServiceConfig + Path string + Methods string +} + +func (c ProviderServiceConfig) String() string { + return fmt.Sprintf( + "%s@%s-%s-%s-%s/%s", + c.ServiceConfig.Service, + c.ServiceConfig.Protocol, + c.ServiceConfig.Group, + c.ServiceConfig.Version, + c.Path, + c.Methods, + ) +} + +func (c ProviderServiceConfig) ServiceEqual(url *ServiceURL) bool { + if c.ServiceConfig.Protocol != url.Protocol { + return false + } + + if c.ServiceConfig.Service != url.Query.Get("interface") { + return false + } + + if c.Group != "" && c.ServiceConfig.Group != url.Group { + return false + } + + if c.ServiceConfig.Version != "" && c.ServiceConfig.Version != url.Version { + return false + } + + if c.Path != url.Path { + return false + } + + if c.Methods != url.Query.Get("methods") { + return false + } + + return true +} + +type ZkProviderRegistry struct { + Options + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + sync.Mutex // lock for client + services + client *zookeeperClient + services map[string]ServiceConfigIf // service name + protocol -> service config + zkPath map[string]int // key = protocol://ip:port/interface +} + +func NewZkProviderRegistry(opts ...Option) (*ZkProviderRegistry, error) { + r := &ZkProviderRegistry{ + birth: time.Now().Unix(), + done: make(chan struct{}), + services: make(map[string]ServiceConfigIf), + zkPath: make(map[string]int), + } + + for _, o := range opts { + o(&r.Options) + } + + if r.Name == "" { + r.Name = ProviderRegistryZkClient + } + if r.Version == "" { + r.Version = version.Version + } + if r.RegistryConfig.Timeout == 0 { + r.RegistryConfig.Timeout = DEFAULT_REGISTRY_TIMEOUT + } + err := r.validateZookeeperClient() + if err != nil { + return nil, jerrors.Trace(err) + } + r.wg.Add(1) + go r.handleZkRestart() + + return r, nil +} + +func (r *ZkProviderRegistry) validateZookeeperClient() error { + var ( + err error + ) + + err = nil + r.Lock() + if r.client == nil { + r.client, err = newZookeeperClient(ProviderRegistryZkClient, r.Address, r.RegistryConfig.Timeout) + if err != nil { + log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%#v}", + ProviderRegistryZkClient, r.Address, r.Timeout, jerrors.ErrorStack(err)) + } + } + r.Unlock() + + return jerrors.Annotatef(err, "newZookeeperClient(ProviderRegistryZkClient, addr:%+v)", r.Address) +} + +func (r *ZkProviderRegistry) Register(c interface{}) error { + var ( + ok bool + err error + conf ProviderServiceConfig + ) + + if conf, ok = c.(ProviderServiceConfig); !ok { + return jerrors.Errorf("@c{%v} type is not ServiceConfig", c) + } + + // 检验服务是否已经注册过 + ok = false + r.Lock() + // 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Service, + // 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version + _, ok = r.services[conf.String()] + r.Unlock() + if ok { + return jerrors.Errorf("Service{%s} has been registered", conf.String()) + } + + err = r.register(&conf) + if err != nil { + return jerrors.Annotatef(err, "register(conf:%+v)", conf) + } + + r.Lock() + r.services[conf.String()] = &conf + log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf) + r.Unlock() + + return nil +} + +func (r *ZkProviderRegistry) registerTempZookeeperNode(root string, node string) error { + var ( + err error + zkPath string + ) + + r.Lock() + defer r.Unlock() + err = r.client.Create(root) + if err != nil { + log.Error("zk.Create(root{%s}) = err{%s}", root, jerrors.ErrorStack(err)) + return jerrors.Trace(err) + } + zkPath, err = r.client.RegisterTemp(root, node) + if err != nil { + log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err)) + return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node) + } + log.Debug("create a zookeeper node:%s", zkPath) + + return nil +} + +func (r *ZkProviderRegistry) register(conf *ProviderServiceConfig) error { + var ( + err error + revision string + params url.Values + urlPath string + rawURL string + encodedURL string + dubboPath string + ) + + if conf.ServiceConfig.Service == "" || conf.Methods == "" { + return jerrors.Errorf("conf{Service:%s, Methods:%s}", conf.ServiceConfig.Service, conf.Methods) + } + + err = r.validateZookeeperClient() + if err != nil { + return jerrors.Trace(err) + } + // 先创建服务下面的provider node + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, DubboNodes[PROVIDER]) + r.Lock() + err = r.client.Create(dubboPath) + r.Unlock() + if err != nil { + log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err)) + return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath) + } + + params = url.Values{} + params.Add("interface", conf.ServiceConfig.Service) + params.Add("application", r.ApplicationConfig.Name) + revision = r.ApplicationConfig.Version + if revision == "" { + revision = "0.1.0" + } + params.Add("revision", revision) // revision是pox.xml中application的version属性的值 + if conf.ServiceConfig.Group != "" { + params.Add("group", conf.ServiceConfig.Group) + } + // dubbo java consumer来启动找provider url时,因为category不匹配,会找不到provider,导致consumer启动不了,所以使用consumers&providers + // DubboRole = [...]string{"consumer", "", "", "provider"} + // params.Add("category", (DubboType(PROVIDER)).Role()) + params.Add("category", (DubboType(PROVIDER)).String()) + params.Add("dubbo", "dubbo-provider-golang-"+version.Version) + params.Add("org", r.ApplicationConfig.Organization) + params.Add("module", r.ApplicationConfig.Module) + params.Add("owner", r.ApplicationConfig.Owner) + params.Add("side", (DubboType(PROVIDER)).Role()) + params.Add("pid", processID) + params.Add("ip", localIP) + params.Add("timeout", fmt.Sprintf("%v", r.Timeout)) + // params.Add("timestamp", time.Now().Format("20060102150405")) + params.Add("timestamp", fmt.Sprintf("%d", r.birth)) + if conf.ServiceConfig.Version != "" { + params.Add("version", conf.ServiceConfig.Version) + } + if conf.Methods != "" { + params.Add("methods", conf.Methods) + } + log.Debug("provider zk url params:%#v", params) + if conf.Path == "" { + conf.Path = localIP + } + + urlPath = conf.Service + if r.zkPath[urlPath] != 0 { + urlPath += strconv.Itoa(r.zkPath[urlPath]) + } + r.zkPath[urlPath]++ + rawURL = fmt.Sprintf("%s://%s/%s?%s", conf.Protocol, conf.Path, urlPath, params.Encode()) + encodedURL = url.QueryEscape(rawURL) + + // 把自己注册service providers + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, (DubboType(PROVIDER)).String()) + err = r.registerTempZookeeperNode(dubboPath, encodedURL) + log.Debug("provider path:%s, url:%s", dubboPath, rawURL) + if err != nil { + return jerrors.Annotatef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL) + } + + return nil +} + +func (r *ZkProviderRegistry) handleZkRestart() { + var ( + err error + flag bool + failTimes int + confIf ServiceConfigIf + services []ServiceConfigIf + ) + + defer r.wg.Done() +LOOP: + for { + select { + case <-r.done: + log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.client.done(): + r.Lock() + r.client.Close() + r.client = nil + r.Unlock() + + // 接zk,直至成功 + failTimes = 0 + for { + select { + case <-r.done: + log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-time.After(time.Duration(1e9 * failTimes * REGISTRY_CONN_DELAY)): // 防止疯狂重连zk + } + err = r.validateZookeeperClient() + log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", + r.client.zkAddrs, jerrors.ErrorStack(err)) + if err == nil { + // copy r.services + r.Lock() + for _, confIf = range r.services { + services = append(services, confIf) + } + r.Unlock() + + flag = true + for _, confIf = range services { + err = r.register(confIf.(*ProviderServiceConfig)) + if err != nil { + log.Error("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf.(*ProviderServiceConfig), jerrors.ErrorStack(err)) + flag = false + break + } + } + if flag { + break + } + } + failTimes++ + if MAX_TIMES <= failTimes { + failTimes = MAX_TIMES + } + } + } + } +} + +func (r *ZkProviderRegistry) closeRegisters() { + r.Lock() + defer r.Unlock() + log.Info("begin to close provider zk client") + // 先关闭旧client,以关闭tmp node + r.client.Close() + r.client = nil + r.services = nil +} + +func (r *ZkProviderRegistry) Close() { + close(r.done) + r.wg.Wait() + r.closeRegisters() +} diff --git a/registry/registry.go b/registry/registry.go index 1e4fdd554d62d84dc4579dcdc24924a17be1abb3..cf97e1c38ca8f6e3eb50e2250289ffb66951bbef 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -2,635 +2,96 @@ package registry import ( "fmt" - "net/url" - "os" - "sync" - "time" ) import ( "github.com/AlexStocks/goext/net" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/version" -) - -const ( - REGISTRY_CONN_DELAY = 3 -) - -var ( - ErrorRegistryNotFound = jerrors.New("registry not found") ) ////////////////////////////////////////////// -// DubboType +// Registry Interface ////////////////////////////////////////////// -type DubboType int - -const ( - CONSUMER = iota - CONFIGURATOR - ROUTER - PROVIDER -) - -var ( - DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} - DubboRole = [...]string{"consumer", "", "", "provider"} - RegistryZkClient = "zk registry" - processID = "" - localIP = "" -) - -func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() -} - -func (t DubboType) String() string { - return DubboNodes[t] -} - -func (t DubboType) Role() string { - return DubboRole[t] +// for service discovery/registry +type Registry interface { + Register(conf interface{}) error + Close() } ////////////////////////////////////////////// -// ZkConsumerRegistry +// application config ////////////////////////////////////////////// -const ( - DEFAULT_REGISTRY_TIMEOUT = 1 - ConsumerRegistryZkClient = "consumer zk registry" -) - -type Options struct { - ApplicationConfig - RegistryConfig // ZooKeeperServers []string - mode Mode - serviceTTL time.Duration -} - -type Option func(*Options) - -func ApplicationConf(conf ApplicationConfig) Option { - return func(o *Options) { - o.ApplicationConfig = conf - } -} - -func RegistryConf(conf RegistryConfig) Option { - return func(o *Options) { - o.RegistryConfig = conf - } -} - -func BalanceMode(mode Mode) Option { - return func(o *Options) { - o.mode = mode - } -} - -func ServiceTTL(ttl time.Duration) Option { - return func(o *Options) { - o.serviceTTL = ttl - } -} - -type ZkConsumerRegistry struct { - Options - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - - sync.Mutex - client *zookeeperClient - services map[string]ServiceConfigIf // service name + protocol -> service config - listenerLock sync.Mutex - listener *zkEventListener - listenerServiceMap map[string]*serviceArray +type ApplicationConfig struct { + Organization string `yaml:"organization" json:"organization,omitempty"` + Name string `yaml:"name" json:"name,omitempty"` + Module string `yaml:"module" json:"module,omitempty"` + Version string `yaml:"version" json:"version,omitempty"` + Owner string `yaml:"owner" json:"owner,omitempty"` } -func NewZkConsumerRegistry(opts ...Option) (*ZkConsumerRegistry, error) { - var ( - err error - r *ZkConsumerRegistry - ) - - r = &ZkConsumerRegistry{ - birth: time.Now().Unix(), - done: make(chan struct{}), - services: make(map[string]ServiceConfigIf), - listenerServiceMap: make(map[string]*serviceArray), - } - - for _, opt := range opts { - opt(&r.Options) - } - - if len(r.Name) == 0 { - r.Name = ConsumerRegistryZkClient - } - if len(r.Version) == 0 { - r.Version = version.Version - } - if r.RegistryConfig.Timeout == 0 { - r.RegistryConfig.Timeout = DEFAULT_REGISTRY_TIMEOUT - } - err = r.validateZookeeperClient() - if err != nil { - return nil, jerrors.Trace(err) - } - - r.wg.Add(1) - go r.handleZkRestart() - r.wg.Add(1) - go r.listen() - - return r, nil -} - -func (r *ZkConsumerRegistry) isClosed() bool { - select { - case <-r.done: - return true - default: - return false - } -} - -func (r *ZkConsumerRegistry) handleZkRestart() { - var ( - err error - flag bool - failTimes int - confIf ServiceConfigIf - services []ServiceConfigIf - ) - - defer r.wg.Done() -LOOP: - for { - select { - case <-r.done: - log.Warn("(consumerZkConsumerRegistry)reconnectZkRegistry goroutine exit now...") - break LOOP - // re-register all services - case <-r.client.done(): - r.Lock() - r.client.Close() - r.client = nil - r.Unlock() - - failTimes = 0 - for { - select { - case <-r.done: - log.Warn("(consumerZkConsumerRegistry)reconnectZkRegistry goroutine exit now...") - break LOOP - case <-time.After(timeSecondDuration(failTimes * REGISTRY_CONN_DELAY)): - } - err = r.validateZookeeperClient() - if err == nil { - // copy r.services - r.Lock() - for _, confIf = range r.services { - services = append(services, confIf) - } - r.Unlock() - - flag = true - for _, confIf = range services { - err = r.register(confIf.(*ServiceConfig)) - if err != nil { - log.Error("in (consumerZkConsumerRegistry)reRegister, (consumerZkConsumerRegistry)register(conf{%#v}) = error{%#v}", - confIf.(*ServiceConfig), jerrors.ErrorStack(err)) - flag = false - break - } - } - if flag { - break - } - } - failTimes++ - if MAX_TIMES <= failTimes { - failTimes = MAX_TIMES - } - } - } - } +func (c *ApplicationConfig) ToString() string { + return fmt.Sprintf("ApplicationConfig is {name:%s, version:%s, owner:%s, module:%s, organization:%s}", + c.Name, c.Version, c.Owner, c.Module, c.Organization) } -func (r *ZkConsumerRegistry) validateZookeeperClient() error { - var ( - err error - ) - - err = nil - r.Lock() - defer r.Unlock() - if r.client == nil { - r.client, err = newZookeeperClient(ConsumerRegistryZkClient, r.Address, r.RegistryConfig.Timeout) - if err != nil { - log.Warn("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}", - ConsumerRegistryZkClient, r.Address, r.Timeout, err) - } - } - - return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.Address) +type RegistryConfig struct { + Address []string `required:"true" yaml:"address" json:"address,omitempty"` + UserName string `yaml:"user_name" json:"user_name,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` + Timeout int `yaml:"timeout" default:"5" json:"timeout,omitempty"` // unit: second } -func (r *ZkConsumerRegistry) registerZookeeperNode(root string, data []byte) error { - var ( - err error - zkPath string - ) - - r.Lock() - defer r.Unlock() - err = r.client.Create(root) - if err != nil { - log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err)) - return jerrors.Annotatef(err, "zkclient.Create(root:%s)", root) - } - zkPath, err = r.client.RegisterTempSeq(root, data) - if err != nil { - log.Error("createTempSeqNode(root{%s}) = error{%v}", root, jerrors.ErrorStack(err)) - return jerrors.Annotatef(err, "createTempSeqNode(root{%s})", root) - } - log.Debug("create a zookeeper node:%s", zkPath) - - return nil -} - -func (r *ZkConsumerRegistry) registerTempZookeeperNode(root string, node string) error { - var ( - err error - zkPath string - ) - - r.Lock() - defer r.Unlock() - err = r.client.Create(root) - if err != nil { - log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err)) - return jerrors.Trace(err) - } - zkPath, err = r.client.RegisterTemp(root, node) - if err != nil { - log.Error("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, jerrors.ErrorStack(err)) - return jerrors.Annotatef(err, "RegisterTempNode(root{%s}, node{%s})", root, node) - } - log.Debug("create a zookeeper node:%s", zkPath) - - return nil -} - -func (r *ZkConsumerRegistry) register(conf *ServiceConfig) error { - var ( - err error - params url.Values - revision string - rawURL string - encodedURL string - dubboPath string - ) - - err = r.validateZookeeperClient() - if err != nil { - log.Error("client.validateZookeeperClient() = err:%#v", err) - return jerrors.Trace(err) - } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, DubboNodes[CONSUMER]) - r.Lock() - err = r.client.Create(dubboPath) - r.Unlock() - if err != nil { - log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) - return jerrors.Trace(err) - } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, DubboNodes[PROVIDER]) - r.Lock() - err = r.client.Create(dubboPath) - r.Unlock() - if err != nil { - log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) - return jerrors.Trace(err) - } - - params = url.Values{} - params.Add("interface", conf.Service) - params.Add("application", r.ApplicationConfig.Name) - revision = r.ApplicationConfig.Version - if revision == "" { - revision = "0.1.0" - } - params.Add("revision", revision) - if conf.Group != "" { - params.Add("group", conf.Group) - } - params.Add("category", (DubboType(CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+version.Version) - params.Add("org", r.Organization) - params.Add("module", r.Module) - params.Add("owner", r.Owner) - params.Add("side", (DubboType(CONSUMER)).Role()) - params.Add("pid", processID) - params.Add("ip", localIP) - params.Add("timeout", fmt.Sprintf("%v", r.Timeout)) - params.Add("timestamp", fmt.Sprintf("%d", r.birth)) - if conf.Version != "" { - params.Add("version", conf.Version) - } - rawURL = fmt.Sprintf("%s://%s/%s?%s", conf.Protocol, localIP, conf.Service+conf.Version, params.Encode()) - encodedURL = url.QueryEscape(rawURL) - - dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, (DubboType(CONSUMER)).String()) - log.Debug("consumer path:%s, url:%s", dubboPath, rawURL) - err = r.registerTempZookeeperNode(dubboPath, encodedURL) - if err != nil { - return jerrors.Trace(err) - } +////////////////////////////////////////////// +// service config +////////////////////////////////////////////// - return nil +type ServiceConfigIf interface { + String() string + ServiceEqual(url *ServiceURL) bool } -func (r *ZkConsumerRegistry) Register(conf ServiceConfig) error { - var ( - ok bool - err error - listener *zkEventListener - ) - - ok = false - r.Lock() - _, ok = r.services[conf.Key()] - r.Unlock() - if ok { - return jerrors.Errorf("Service{%s} has been registered", conf.Service) - } - - err = r.register(&conf) - if err != nil { - return jerrors.Trace(err) - } - - r.Lock() - r.services[conf.Key()] = &conf - r.Unlock() - log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) - - r.listenerLock.Lock() - listener = r.listener - r.listenerLock.Unlock() - if listener != nil { - go listener.listenServiceEvent(conf) - } - - return nil +type ServiceConfig struct { + Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"` + Service string `required:"true" yaml:"service" json:"service,omitempty"` + Group string `yaml:"group" json:"group,omitempty"` + Version string `yaml:"version" json:"version,omitempty"` } -// name: service@protocol -func (r *ZkConsumerRegistry) get(sc ServiceConfig) ([]*ServiceURL, error) { - var ( - ok bool - err error - dubboPath string - nodes []string - listener *zkEventListener - serviceURL *ServiceURL - serviceConfIf ServiceConfigIf - serviceConf *ServiceConfig - ) - - r.listenerLock.Lock() - listener = r.listener - r.listenerLock.Unlock() - - if listener != nil { - listener.listenServiceEvent(sc) - } - - r.Lock() - serviceConfIf, ok = r.services[sc.Key()] - r.Unlock() - if !ok { - return nil, jerrors.Errorf("Service{%s} has not been registered", sc.Key()) - } - serviceConf, ok = serviceConfIf.(*ServiceConfig) - if !ok { - return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", sc.Key()) - } - - dubboPath = fmt.Sprintf("/dubbo/%s/providers", sc.Service) - err = r.validateZookeeperClient() - if err != nil { - return nil, jerrors.Trace(err) - } - r.Lock() - nodes, err = r.client.getChildren(dubboPath) - r.Unlock() - if err != nil { - log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err) - return nil, jerrors.Trace(err) - } - - var listenerServiceMap = make(map[string]*ServiceURL) - for _, n := range nodes { - serviceURL, err = NewServiceURL(n) - if err != nil { - log.Error("NewServiceURL({%s}) = error{%v}", n, err) - continue - } - if !serviceConf.ServiceEqual(serviceURL) { - log.Warn("serviceURL{%s} is not compatible with ServiceConfig{%#v}", serviceURL, serviceConf) - continue - } - - _, ok := listenerServiceMap[serviceURL.Query.Get(serviceURL.Location)] - if !ok { - listenerServiceMap[serviceURL.Location] = serviceURL - continue - } - } - - var services []*ServiceURL - for _, service := range listenerServiceMap { - services = append(services, service) - } - - return services, nil +func (c ServiceConfig) Key() string { + return fmt.Sprintf("%s@%s", c.Service, c.Protocol) } -func (r *ZkConsumerRegistry) Filter(s ServiceConfigIf, reqID int64) (*ServiceURL, error) { - var serviceConf ServiceConfig - if scp, ok := s.(*ServiceConfig); ok { - serviceConf = *scp - } else if sc, ok := s.(ServiceConfig); ok { - serviceConf = sc - } else { - return nil, jerrors.Errorf("illegal @s:%#v", s) - } - - serviceKey := serviceConf.Key() - - r.listenerLock.Lock() - svcArr, sok := r.listenerServiceMap[serviceKey] - log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr) - if sok { - if serviceURL, err := svcArr.Select(reqID, r.Options.mode, r.Options.serviceTTL); err == nil { - r.listenerLock.Unlock() - return serviceURL, nil - } - } - r.listenerLock.Unlock() - - svcs, err := r.get(serviceConf) - r.listenerLock.Lock() - defer r.listenerLock.Unlock() - if err != nil { - log.Error("Registry.get(conf:%+v) = {err:%r, svcs:%+v}", - serviceConf, jerrors.ErrorStack(err), svcs) - if sok && len(svcArr.arr) > 0 { - log.Error("serviceArray{%v} timeout, can not get new, use old instead", svcArr) - service, err := svcArr.Select(reqID, r.Options.mode, 0) - return service, jerrors.Trace(err) - } - - return nil, jerrors.Trace(err) - } - - newSvcArr := newServiceArray(svcs) - service, err := newSvcArr.Select(reqID, r.Options.mode, 0) - r.listenerServiceMap[serviceKey] = newSvcArr - return service, jerrors.Trace(err) +func (c ServiceConfig) String() string { + return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version) } -func (r *ZkConsumerRegistry) getListener() (*zkEventListener, error) { - var ( - ok bool - zkListener *zkEventListener - serviceConf *ServiceConfig - ) - - r.listenerLock.Lock() - zkListener = r.listener - r.listenerLock.Unlock() - if zkListener != nil { - return zkListener, nil - } - - r.Lock() - client := r.client - r.Unlock() - if client == nil { - return nil, jerrors.New("zk connection broken") +func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool { + if c.Protocol != url.Protocol { + return false } - // new client & listener - zkListener = newZkEventListener(client) - - r.listenerLock.Lock() - r.listener = zkListener - r.listenerLock.Unlock() - - // listen - r.Lock() - for _, service := range r.services { - if serviceConf, ok = service.(*ServiceConfig); ok { - go zkListener.listenServiceEvent(*serviceConf) - } + if c.Service != url.Query.Get("interface") { + return false } - r.Unlock() - - return zkListener, nil -} -func (r *ZkConsumerRegistry) update(res *ServiceURLEvent) { - if res == nil || res.Service == nil { - return + if c.Group != url.Group { + return false } - log.Debug("registry update, result{%s}", res) - serviceKey := res.Service.ServiceConfig().Key() - - r.listenerLock.Lock() - defer r.listenerLock.Unlock() - - svcArr, ok := r.listenerServiceMap[serviceKey] - log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr) - - switch res.Action { - case ServiceURLAdd: - if ok { - svcArr.Add(res.Service, r.Options.serviceTTL) - } else { - r.listenerServiceMap[serviceKey] = newServiceArray([]*ServiceURL{res.Service}) - } - case ServiceURLDel: - if ok { - svcArr.Del(res.Service, r.Options.serviceTTL) - if len(svcArr.arr) == 0 { - delete(r.listenerServiceMap, serviceKey) - log.Warn("delete service %s from service map", serviceKey) - } - } - log.Error("selector delete serviceURL{%s}", *res.Service) + if c.Version != url.Version { + return false } -} - -func (r *ZkConsumerRegistry) listen() { - defer r.wg.Done() - - for { - if r.isClosed() { - log.Warn("event listener game over.") - return - } - - listener, err := r.getListener() - if err != nil { - if r.isClosed() { - log.Warn("event listener game over.") - return - } - log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) - time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) - continue - } - if err = listener.listenEvent(r); err != nil { - log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) - - r.listenerLock.Lock() - r.listener = nil - r.listenerLock.Unlock() - - listener.close() - - time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) - continue - } - } + return true } -func (r *ZkConsumerRegistry) closeRegisters() { - r.Lock() - log.Info("begin to close zk client") - r.client.Close() - r.client = nil - r.services = nil - r.Unlock() +type ServerConfig struct { + Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"` // codec string, jsonrpc etc + IP string `yaml:"ip" json:"ip,omitempty"` + Port int `required:"true" yaml:"port" json:"port,omitempty"` } -func (r *ZkConsumerRegistry) Close() { - close(r.done) - r.wg.Wait() - r.closeRegisters() +func (c *ServerConfig) Address() string { + return gxnet.HostAddress(c.IP, c.Port) } diff --git a/registry/zk_client.go b/registry/zk_client.go index 05ab907fe1f5140d8bc5e8fac1b9c550660d4e41..5a97cce19e4b7b1b87b039b05bd750bc0a769eef 100644 --- a/registry/zk_client.go +++ b/registry/zk_client.go @@ -6,10 +6,9 @@ import ( "strings" "sync" "time" -) -import ( log "github.com/AlexStocks/log4go" + jerrors "github.com/juju/errors" "github.com/samuel/go-zookeeper/zk" )