Skip to content
Snippets Groups Projects
Unverified Commit b8be65fc authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #29 from fangyincheng/feature/dubbotcpserver

Add:complete dubbo tcp server
parents fa4d69b9 52eec16d
No related branches found
No related tags found
No related merge requests found
Showing
with 1113 additions and 180 deletions
......@@ -10,7 +10,7 @@ import (
type Transport interface {
Call(ctx context.Context, url registry.ServiceURL, request Request, resp interface{}) error
NewRequest(conf registry.ServiceConfig, method string, args interface{}) (Request,error)
NewRequest(conf registry.ServiceConfig, method string, args interface{}) (Request, error)
}
//////////////////////////////////////////////
......
......@@ -12,11 +12,11 @@ import (
)
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/client/selector"
"github.com/dubbo/dubbo-go/dubbo"
"github.com/dubbo/dubbo-go/jsonrpc"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/client"
)
const RegistryConnDelay = 3
......
......@@ -157,12 +157,11 @@ func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, meth
p.Service.Version = svcUrl.Version()
p.Service.Method = method
p.Service.Timeout = opts.RequestTimeout
//if opts.SerialID == 0 || opts.SerialID == 1 || opts.SerialID == 3 || opts.SerialID == 4 || opts.SerialID == 5 || opts.SerialID == 6 ||
// opts.SerialID == 7 || pts.SerialID == 8 {
// p.Header.SerialID = byte(S_Dubbo)
//} else {
p.Header.SerialID = byte(opts.SerialID)
//}
if opts.SerialID == 0 {
p.Header.SerialID = byte(S_Dubbo)
} else {
p.Header.SerialID = byte(opts.SerialID)
}
p.Body = args
var rsp *PendingResponse
......@@ -236,7 +235,7 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
pkg = &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.Heartbeat
//pkg.Header.SerialID = byte(4)
pkg.Header.SerialID = byte(S_Dubbo)
} else {
pkg.Header.Type = hessian.Request
}
......
......@@ -56,15 +56,19 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
return bytes.NewBuffer(pkg), nil
}
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, pkgType hessian.PackgeType) error {
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error {
codec := hessian.NewHessianCodec(bufio.NewReader(buf))
// read header
err := codec.ReadHeader(&p.Header, pkgType)
err := codec.ReadHeader(&p.Header)
if err != nil {
return jerrors.Trace(err)
}
if p.Header.Type&hessian.Heartbeat != 0x00 {
return nil
}
// read body
err = codec.ReadBody(p.Body)
return jerrors.Trace(err)
......
......@@ -5,6 +5,7 @@ import (
)
import (
"github.com/dubbo/dubbo-go/server"
jerrors "github.com/juju/errors"
)
......@@ -31,11 +32,12 @@ type (
// Config holds supported types by the multiconfig package
ServerConfig struct {
server.ServerConfig
// local address
AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
//AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
//Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
//Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
//ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
......@@ -53,9 +55,9 @@ type (
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
//AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
//Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
//ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
......
package dubbo
import (
"context"
"reflect"
"sync"
"time"
)
......@@ -11,6 +14,9 @@ import (
jerrors "github.com/juju/errors"
)
// todo: WritePkg_Timeout will entry *.yml
const WritePkg_Timeout = 5 * time.Second
var (
errTooManySessions = jerrors.New("too many sessions")
)
......@@ -54,11 +60,11 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
if p.Header.Type == hessian.Heartbeat {
if p.Header.Type&hessian.Heartbeat != 0x00 {
log.Debug("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
return
}
log.Debug("get rpc response{header: %#v, header: %#v}", p.Header, p.Body)
log.Debug("get rpc response{header: %#v, body: %#v}", p.Header, p.Body)
h.conn.updateSession(session)
......@@ -94,3 +100,190 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
h.conn.pool.rpcClient.heartbeat(session)
}
////////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
}
}
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if h.maxSessionNum <= len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return jerrors.Trace(err)
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
p, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
p.Header.ResponseStatus = hessian.Response_OK
// heartbeat
if p.Header.Type&hessian.Heartbeat != 0x00 {
log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
h.reply(session, p, hessian.Heartbeat)
return
}
// twoway
if p.Header.Type&hessian.Request_TwoWay == 0x00 {
h.reply(session, p, hessian.Response)
h.callService(p, nil)
return
}
h.callService(p, nil)
h.reply(session, p, hessian.Response)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) {
defer func() {
if e := recover(); e != nil {
req.Header.ResponseStatus = hessian.Response_BAD_REQUEST
if err, ok := e.(error); ok {
log.Error("callService panic: %#v", err)
req.Body = e.(error)
} else if err, ok := e.(string); ok {
log.Error("callService panic: %#v", jerrors.New(err))
req.Body = jerrors.New(err)
} else {
log.Error("callService panic: %#v", e)
req.Body = e
}
}
}()
svc := req.Body.(map[string]interface{})["service"].(*service)
method := svc.method[req.Service.Method]
// prepare argv
var argv reflect.Value
argIsValue := false // if true, need to indirect before calling.
if method.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(method.ArgType.Elem())
} else {
argv = reflect.New(method.ArgType)
argIsValue = true
}
argvTmp := argv.Interface()
argvTmp = req.Body.(map[string]interface{})["args"] // type is []interface
if argIsValue {
argv = argv.Elem()
}
// prepare replyv
replyv := reflect.New(method.ReplyType.Elem())
var returnValues []reflect.Value
if method.CtxType == nil {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
} else {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, contextv, reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
} else {
returnValues = method.method.Func.Call([]reflect.Value{svc.rcvr, reflect.Zero(method.CtxType), reflect.ValueOf(argvTmp), reflect.ValueOf(replyv.Interface())})
}
}
// The return value for the method is an error.
if retErr := returnValues[0].Interface(); retErr != nil {
req.Header.ResponseStatus = hessian.Response_SERVER_ERROR
req.Body = retErr.(error)
} else {
req.Body = replyv.Interface()
}
}
func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackgeType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
SerialID: req.Header.SerialID,
Type: tp,
ID: req.Header.ID,
ResponseStatus: req.Header.ResponseStatus,
},
}
if req.Header.Type&hessian.Request != 0x00 {
resp.Body = req.Body
} else {
resp.Body = nil
}
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
log.Error("WritePkg error: %#v, %#v", jerrors.Trace(err), req.Header)
}
}
......@@ -2,12 +2,12 @@ package dubbo
import (
"bytes"
"reflect"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
......@@ -29,9 +29,9 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
}
buf := bytes.NewBuffer(data)
err := pkg.Unmarshal(buf, hessian.Response)
err := pkg.Unmarshal(buf)
if err != nil {
pkg.Err = jerrors.Trace(err)
pkg.Err = jerrors.Trace(err) // client will get this err
return pkg, len(data), nil
}
......@@ -60,24 +60,77 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error
type RpcServerPackageHandler struct {
server *Server
srvMap serviceMap
}
func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
func NewRpcServerPackageHandler(server *Server, srvMap serviceMap) *RpcServerPackageHandler {
return &RpcServerPackageHandler{
server: server,
srvMap: srvMap,
}
}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &DubboPackage{
Body: nil,
Body: make([]interface{}, 7),
}
buf := bytes.NewBuffer(data)
err := pkg.Unmarshal(buf)
if err != nil {
return nil, 0, jerrors.Trace(err)
}
// convert params of request
req := pkg.Body.([]interface{}) // length of body should be 7
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
var attachments map[interface{}]interface{}
if req[0] != nil {
dubboVersion = req[0].(string)
}
if req[1] != nil {
pkg.Service.Target = req[1].(string)
}
if req[2] != nil {
pkg.Service.Version = req[2].(string)
}
if req[3] != nil {
pkg.Service.Method = req[3].(string)
}
if req[4] != nil {
argsTypes = req[4].(string)
}
if req[5] != nil {
args = req[5].([]interface{})
}
if req[6] != nil {
attachments = req[6].(map[interface{}]interface{})
}
pkg.Body = map[string]interface{}{
"dubboVersion": dubboVersion,
"argsTypes": argsTypes,
"args": args,
"service": p.srvMap[pkg.Service.Target],
"attachments": attachments,
}
}
// todo:
return pkg, len(data), nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
// todo:
return nil
res, ok := pkg.(*DubboPackage)
if !ok {
log.Error("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return jerrors.New("invalid rpc response")
}
buf, err := res.Marshal()
if err != nil {
log.Warn("binary.Write(res{%#v}) = err{%#v}", res, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
......@@ -23,15 +23,16 @@ type GettyRPCService interface {
type methodType struct {
sync.Mutex
method reflect.Method
CtxType reflect.Type // type of the request context
ArgType reflect.Type
ReplyType reflect.Type
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
name string
rcvr reflect.Value
rcvrType reflect.Type
method map[string]*methodType
}
// Is this an exported - upper case - name
......@@ -51,56 +52,68 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
}
// suitableMethods returns suitable Rpc methods of typ
func suitableMethods(typ reflect.Type) map[string]*methodType {
func suitableMethods(typ reflect.Type) (string, map[string]*methodType) {
methods := make(map[string]*methodType)
mts := ""
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// service Method needs three ins: receiver, *args, *reply.
// notify Method needs two ins: receiver, *args.
mInNum := mtype.NumIn()
if mInNum != 2 && mInNum != 3 {
log.Warn("method %s has wrong number of ins %d which should be "+
"2(notify method) or 3(serive method)", mname, mtype.NumIn())
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
log.Error("method{%s} argument type not exported{%v}", mname, argType)
continue
if mt := suiteMethod(method); mt != nil {
methods[method.Name] = mt
mts += method.Name + ","
}
}
return mts, methods
}
var replyType reflect.Type
if mInNum == 3 {
// Second arg must be a pointer.
replyType = mtype.In(2)
if replyType.Kind() != reflect.Ptr {
log.Error("method{%s} reply type not a pointer{%v}", mname, replyType)
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Error("method{%s} reply type not exported{%v}", mname, replyType)
continue
}
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method{%s} has wrong number of out parameters{%d}", mname, mtype.NumOut())
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("method{%s}'s return type{%s} is not error", mname, returnType.String())
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
// suiteMethod returns a suitable Rpc methodType
func suiteMethod(method reflect.Method) *methodType {
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
return nil
}
return methods
var replyType, argType, ctxType reflect.Type
switch mtype.NumIn() {
case 3:
argType = mtype.In(1)
replyType = mtype.In(2)
case 4:
ctxType = mtype.In(1)
argType = mtype.In(2)
replyType = mtype.In(3)
default:
log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4",
mname, mtype, mtype.NumIn())
return nil
}
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
log.Error("argument type of method %q is not exported %v", mname, argType)
return nil
}
// Second arg must be a pointer.
if replyType.Kind() != reflect.Ptr {
log.Error("reply type of method %q is not a pointer %v", mname, replyType)
return nil
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Error("reply type of method %s not exported{%v}", mname, replyType)
return nil
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut())
return nil
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("return type %s of method %q is not error", returnType, mname)
return nil
}
return &methodType{method: method, ArgType: argType, ReplyType: replyType, CtxType: ctxType}
}
......@@ -2,8 +2,10 @@ package dubbo
import (
"fmt"
"github.com/dubbo/dubbo-go/plugins"
"net"
"reflect"
"strconv"
)
import (
......@@ -13,62 +15,168 @@ import (
jerrors "github.com/juju/errors"
)
type Server struct {
conf ServerConfig
serviceMap map[string]*service
tcpServerList []getty.Server
import (
"github.com/dubbo/dubbo-go/registry"
)
type Option func(*Options)
type Options struct {
Registry registry.Registry
ConfList []ServerConfig
ServiceConfList []registry.ServiceConfig
}
func NewServer(conf *ServerConfig) (*Server, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
func newOptions(opt ...Option) Options {
opts := Options{}
for _, o := range opt {
o(&opts)
}
s := &Server{
serviceMap: make(map[string]*service),
conf: *conf,
if opts.Registry == nil {
panic("server.Options.Registry is nil")
}
return s, nil
return opts
}
func (s *Server) Register(rcvr GettyRPCService) error {
svc := &service{
typ: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr),
name: reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name(),
// Install the methods
method: suitableMethods(reflect.TypeOf(rcvr)),
}
if svc.name == "" {
s := "rpc.Register: no service name for type " + svc.typ.String()
log.Error(s)
return jerrors.New(s)
// Registry used for discovery
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
if !isExported(svc.name) {
s := "rpc.Register: type " + svc.name + " is not exported"
log.Error(s)
return jerrors.New(s)
}
if _, present := s.serviceMap[svc.name]; present {
return jerrors.New("rpc: service already defined: " + svc.name)
}
func ConfList(confList []ServerConfig) Option {
return func(o *Options) {
o.ConfList = confList
for i := 0; i < len(o.ConfList); i++ {
if err := o.ConfList[i].CheckValidity(); err != nil {
log.Error("ServerConfig check failed: ", err)
o.ConfList = []ServerConfig{}
return
}
if o.ConfList[i].IP == "" {
o.ConfList[i].IP, _ = gxnet.GetLocalIP()
}
}
}
}
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(svc.typ))
str := "rpc.Register: type " + svc.name + " has no exported methods of suitable type"
if len(method) != 0 {
str = "rpc.Register: type " + svc.name + " has no exported methods of suitable type (" +
"hint: pass a pointer to value of that type)"
func ServiceConfList(confList []registry.ServiceConfig) Option {
return func(o *Options) {
o.ServiceConfList = confList
if o.ServiceConfList == nil {
o.ServiceConfList = []registry.ServiceConfig{}
}
log.Error(str)
}
}
type serviceMap map[string]*service
return jerrors.New(str)
type Server struct {
opts Options
indexOfConfList int
srvs []serviceMap
tcpServerList []getty.Server
}
func NewServer(opts ...Option) *Server {
options := newOptions(opts...)
num := len(options.ConfList)
servers := make([]serviceMap, len(options.ConfList))
for i := 0; i < num; i++ {
servers[i] = map[string]*service{}
}
s.serviceMap[svc.name] = svc
s := &Server{
opts: options,
srvs: servers,
}
return s
}
// Register export services and register with the registry
func (s *Server) Register(rcvr GettyRPCService) error {
serviceConf := plugins.DefaultProviderServiceConfig()()
opts := s.opts
serviceConf.SetService(rcvr.Service())
serviceConf.SetVersion(rcvr.Version())
flag := false
serviceNum := len(opts.ServiceConfList)
serverNum := len(opts.ConfList)
for i := 0; i < serviceNum; i++ {
if opts.ServiceConfList[i].Service() == serviceConf.Service() &&
opts.ServiceConfList[i].Version() == serviceConf.Version() {
serviceConf.SetProtocol(opts.ServiceConfList[i].Protocol())
serviceConf.SetGroup(opts.ServiceConfList[i].Group())
for j := 0; j < serverNum; j++ {
if opts.ConfList[j].Protocol == serviceConf.Protocol() {
rcvrName := reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name()
svc := &service{
rcvrType: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr),
}
if rcvrName == "" {
s := "rpc.Register: no service name for type " + svc.rcvrType.String()
log.Error(s)
return jerrors.New(s)
}
if !isExported(rcvrName) {
s := "rpc.Register: type " + rcvrName + " is not exported"
log.Error(s)
return jerrors.New(s)
}
svc.name = rcvr.Service() // service name is from 'Service()'
if _, present := s.srvs[j][svc.name]; present {
return jerrors.New("rpc: service already defined: " + svc.name)
}
// Install the methods
mts, methods := suitableMethods(svc.rcvrType)
svc.method = methods
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
mts, methods = suitableMethods(reflect.PtrTo(svc.rcvrType))
str := "rpc.Register: type " + rcvrName + " has no exported methods of suitable type"
if len(methods) != 0 {
str = "rpc.Register: type " + rcvrName + " has no exported methods of suitable type (" +
"hint: pass a pointer to value of that type)"
}
log.Error(str)
return jerrors.New(str)
}
s.srvs[j][svc.name] = svc
serviceConf.SetMethods(mts)
serviceConf.SetPath(opts.ConfList[j].Address())
err := opts.Registry.Register(serviceConf)
if err != nil {
return err
}
flag = true
}
}
}
}
if !flag {
return jerrors.Errorf("fail to register Handler{service:%s, version:%s}",
serviceConf.Service, serviceConf.Version)
}
return nil
}
......@@ -77,8 +185,9 @@ func (s *Server) newSession(session getty.Session) error {
ok bool
tcpConn *net.TCPConn
)
conf := s.opts.ConfList[s.indexOfConfList]
if s.conf.GettySessionParam.CompressEncoding {
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
......@@ -86,24 +195,24 @@ func (s *Server) newSession(session getty.Session) error {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(s.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(s.conf.GettySessionParam.TcpKeepAlive)
if s.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(s.conf.GettySessionParam.keepAlivePeriod)
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(s.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(s.conf.GettySessionParam.TcpWBufSize)
session.SetName(s.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(s.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
//session.SetEventListener(NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout))
session.SetRQLen(s.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(s.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(s.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(s.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(s.conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(s.conf.GettySessionParam.waitTimeout)
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s, s.srvs[s.indexOfConfList]))
session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat())
return nil
......@@ -112,23 +221,24 @@ func (s *Server) newSession(session getty.Session) error {
func (s *Server) Start() {
var (
addr string
portList []string
tcpServer getty.Server
)
portList = s.conf.Ports
if len(portList) == 0 {
panic("portList is nil")
if len(s.opts.ConfList) == 0 {
panic("ConfList is nil")
}
for _, port := range portList {
addr = gxnet.HostAddress2(s.conf.Host, port)
for i := 0; i < len(s.opts.ConfList); i++ {
addr = gxnet.HostAddress2(s.opts.ConfList[i].IP, strconv.Itoa(s.opts.ConfList[i].Port))
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
s.indexOfConfList = i
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
}
}
func (s *Server) Stop() {
......
......@@ -45,7 +45,6 @@ func testDubborpc(clientConfig *examples.ClientConfig, userKey string) {
hessian.RegisterJavaEnum(Gender(MAN))
hessian.RegisterJavaEnum(Gender(WOMAN))
hessian.RegisterPOJO(&DubboUser{})
hessian.RegisterPOJO(&Response{})
user = new(DubboUser)
defer clientInvoker.DubboClient.Close()
......
......@@ -2,30 +2,14 @@ package main
import (
"fmt"
"github.com/dubbogo/hessian2"
"strconv"
"time"
)
import (
"github.com/AlexStocks/goext/time"
"github.com/dubbogo/hessian2"
)
type JsonRPCUser struct {
ID string `json:"id"`
Name string `json:"name"`
Age int64 `json:"age"`
Time int64 `json:"time"`
Sex string `json:"sex"`
}
func (u JsonRPCUser) String() string {
return fmt.Sprintf(
"User{ID:%s, Name:%s, Age:%d, Time:%s, Sex:%s}",
u.ID, u.Name, u.Age, gxtime.YMDPrint(int(u.Time), 0), u.Sex,
)
}
type Gender hessian.JavaEnum
const (
......@@ -83,20 +67,3 @@ func (u DubboUser) String() string {
func (DubboUser) JavaClassName() string {
return "com.ikurento.user.User"
}
type Response struct {
Status int
Err string
Data int
}
func (r Response) String() string {
return fmt.Sprintf(
"Response{Status:%d, Err:%s, Data:%d}",
r.Status, r.Err, r.Data,
)
}
func (Response) JavaClassName() string {
return "com.ikurento.user.Response"
}
......@@ -28,7 +28,7 @@ zk_registry_config:
timeout : "3s"
address:
- "127.0.0.1:2181"
service_config_type: "default"
service_list:
-
protocol : "dubbo"
......
......@@ -28,7 +28,7 @@ zk_registry_config:
timeout : "3s"
address:
- "127.0.0.1:2181"
service_config_type: "default"
service_list:
-
protocol : "dubbo"
......
......@@ -28,7 +28,7 @@ zk_registry_config:
timeout : "3s"
address:
- "127.0.0.1:2181"
service_config_type: "default"
service_list:
-
protocol : "dubbo"
......
package main
import (
"fmt"
"github.com/dubbo/dubbo-go/plugins"
"io/ioutil"
"os"
"path"
"time"
)
import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
yaml "gopkg.in/yaml.v2"
)
import (
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/server"
)
const (
APP_CONF_FILE string = "APP_CONF_FILE"
APP_LOG_CONF_FILE string = "APP_LOG_CONF_FILE"
)
var (
conf *ServerConfig
)
type (
ServerConfig struct {
// pprof
Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"`
Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"`
// transport & registry
Transport string `default:"http" yaml:"transport" json:"transport,omitempty"`
NetTimeout string `default:"100ms" yaml:"net_timeout" json:"net_timeout,omitempty"` // in ms
netTimeout time.Duration
// application
Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
// Registry_Address string `default:"192.168.35.3:2181"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
ServiceConfigType string `default:"default" yaml:"service_config_type" json:"service_config_type,omitempty"`
ServiceConfigList []registry.ServiceConfig `yaml:"-"`
ServiceConfigMapList []map[string]string `yaml:"service_list" json:"service_list,omitempty"`
Server_List []server.ServerConfig `yaml:"server_list" json:"server_list,omitempty"`
}
)
func initServerConf() *ServerConfig {
var (
err error
confFile string
)
confFile = os.Getenv(APP_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("application configure file name is nil"))
return nil
}
if path.Ext(confFile) != ".yml" {
panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile))
return nil
}
conf = &ServerConfig{}
confFileStream, err := ioutil.ReadFile(confFile)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err)))
return nil
}
err = yaml.Unmarshal(confFileStream, conf)
if err != nil {
panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err)))
return nil
}
if conf.netTimeout, err = time.ParseDuration(conf.NetTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(NetTimeout:%#v) = error:%s", conf.NetTimeout, err))
return nil
}
if conf.ZkRegistryConfig.Timeout, err = time.ParseDuration(conf.ZkRegistryConfig.TimeoutStr); err != nil {
panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s",
conf.ZkRegistryConfig.TimeoutStr, err))
return nil
}
// set designated service_config_type to default
plugins.SetDefaultProviderServiceConfig(conf.ServiceConfigType)
for _, service := range conf.ServiceConfigMapList {
svc := plugins.DefaultProviderServiceConfig()()
svc.SetProtocol(service["protocol"])
svc.SetService(service["service"])
conf.ServiceConfigList = append(conf.ServiceConfigList, svc)
}
gxlog.CInfo("config{%#v}\n", conf)
return conf
}
func configInit() error {
var (
confFile string
)
initServerConf()
confFile = os.Getenv(APP_LOG_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("log configure file name is nil"))
return nil
}
if path.Ext(confFile) != ".xml" {
panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile))
return nil
}
log.LoadConfiguration(confFile)
return nil
}
package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"syscall"
)
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/dubbo"
"github.com/dubbo/dubbo-go/plugins"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
)
var (
survivalTimeout = int(3e9)
servo *dubbo.Server
)
func main() {
var (
err error
)
err = configInit()
if err != nil {
log.Error("configInit() = error{%#v}", err)
return
}
initProfiling()
hessian.RegisterJavaEnum(Gender(MAN))
hessian.RegisterJavaEnum(Gender(WOMAN))
hessian.RegisterPOJO(&DubboUser{})
servo = initServer()
err = servo.Register(&UserProvider{})
if err != nil {
panic(err)
return
}
servo.Start()
initSignal()
}
func initServer() *dubbo.Server {
var (
srv *dubbo.Server
)
if conf == nil {
panic(fmt.Sprintf("conf is nil"))
return nil
}
// registry
regs, err := plugins.PluggableRegistries[conf.Registry](
registry.WithDubboType(registry.PROVIDER),
registry.WithApplicationConf(conf.Application_Config),
zookeeper.WithRegistryConf(conf.ZkRegistryConfig),
)
if err != nil || regs == nil {
panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err)))
return nil
}
// generate server config
serverConfig := make([]dubbo.ServerConfig, len(conf.Server_List))
for i := 0; i < len(conf.Server_List); i++ {
serverConfig[i] = dubbo.ServerConfig{
SessionNumber: 700,
FailFastTimeout: "5s",
SessionTimeout: "20s",
GettySessionParam: dubbo.GettySessionParam{
CompressEncoding: false, // 必须false
TcpNoDelay: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpKeepAlive: true,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 1024,
SessionName: "server",
},
}
serverConfig[i].IP = conf.Server_List[i].IP
serverConfig[i].Port = conf.Server_List[i].Port
serverConfig[i].Protocol = conf.Server_List[i].Protocol
}
// provider
srv = dubbo.NewServer(
dubbo.Registry(regs),
dubbo.ConfList(serverConfig),
dubbo.ServiceConfList(conf.ServiceConfigList),
)
return srv
}
func uninitServer() {
if servo != nil {
servo.Stop()
}
log.Close()
}
func initProfiling() {
if !conf.Pprof_Enabled {
return
}
const (
PprofPath = "/debug/pprof/"
)
var (
err error
ip string
addr string
)
ip, err = gxnet.GetLocalIP()
if err != nil {
panic("cat not get local ip!")
}
addr = ip + ":" + strconv.Itoa(conf.Pprof_Port)
log.Info("App Profiling startup on address{%v}", addr+PprofPath)
go func() {
log.Info(http.ListenAndServe(addr, nil))
}()
}
func initSignal() {
signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
log.Info("get signal %s", sig.String())
switch sig {
case syscall.SIGHUP:
// reload()
default:
go gxtime.Future(survivalTimeout, func() {
log.Warn("app exit now by force...")
os.Exit(1)
})
// 要么fastFailTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
uninitServer()
fmt.Println("provider app exit now...")
return
}
}
}
package main
import (
// "encoding/json"
"context"
"fmt"
"strconv"
"time"
)
import (
"github.com/AlexStocks/goext/log"
"github.com/dubbogo/hessian2"
)
type Gender hessian.JavaEnum
const (
MAN hessian.JavaEnum = iota
WOMAN
)
var genderName = map[hessian.JavaEnum]string{
MAN: "MAN",
WOMAN: "WOMAN",
}
var genderValue = map[string]hessian.JavaEnum{
"MAN": MAN,
"WOMAN": WOMAN,
}
func (g Gender) JavaClassName() string {
return "com.ikurento.user.Gender"
}
func (g Gender) String() string {
s, ok := genderName[hessian.JavaEnum(g)]
if ok {
return s
}
return strconv.Itoa(int(g))
}
func (g Gender) EnumValue(s string) hessian.JavaEnum {
v, ok := genderValue[s]
if ok {
return v
}
return hessian.InvalidJavaEnum
}
type (
DubboUser struct {
// !!! Cannot define lowercase names of variable
Id string
Name string
Age int32
Time time.Time
Sex Gender // 注意此处,java enum Object <--> go string
}
UserProvider struct {
user map[string]DubboUser
}
)
func (u DubboUser) String() string {
return fmt.Sprintf(
"User{Id:%s, Name:%s, Age:%d, Time:%s, Sex:%s}",
u.Id, u.Name, u.Age, u.Time, u.Sex,
)
}
func (DubboUser) JavaClassName() string {
return "com.ikurento.user.User"
}
var (
DefaultUser = DubboUser{
Id: "0", Name: "Alex Stocks", Age: 31,
Sex: Gender(MAN),
}
userMap = UserProvider{user: make(map[string]DubboUser)}
)
func init() {
//DefaultUser.Sex = DefaultUser.sex.String()
userMap.user["A000"] = DefaultUser
userMap.user["A001"] = DubboUser{Id: "001", Name: "ZhangSheng", Age: 18, Sex: Gender(MAN)}
userMap.user["A002"] = DubboUser{Id: "002", Name: "Lily", Age: 20, Sex: Gender(WOMAN)}
userMap.user["A003"] = DubboUser{Id: "113", Name: "Moorse", Age: 30, Sex: Gender(WOMAN)}
for k, v := range userMap.user {
userMap.user[k] = v
}
}
func (u *UserProvider) getUser(userId string) (*DubboUser, error) {
if user, ok := userMap.user[userId]; ok {
return &user, nil
}
return nil, fmt.Errorf("invalid user id:%s", userId)
}
/*
!!! req must be []interface{}
*/
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *DubboUser) error {
var (
err error
user *DubboUser
)
gxlog.CInfo("req:%#v", req)
user, err = u.getUser(req[0].(string))
if err == nil {
*rsp = *user
gxlog.CInfo("rsp:%#v", rsp)
}
return err
}
func (u *UserProvider) Service() string {
return "com.ikurento.user.UserProvider"
}
func (u *UserProvider) Version() string {
return ""
}
package main
var (
Version string = "0.3.1"
)
#!/usr/bin/env bash
# ******************************************************
# DESC : dubbogo app devops script
# AUTHOR : Alex Stocks
# VERSION : 1.0
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-05-13 02:01
# FILE : load.sh
# ******************************************************
APP_NAME="APPLICATION_NAME"
APP_ARGS=""
PROJECT_HOME=""
OS_NAME=`uname`
if [[ ${OS_NAME} != "Windows" ]]; then
PROJECT_HOME=`pwd`
PROJECT_HOME=${PROJECT_HOME}"/"
fi
export APP_CONF_FILE=${PROJECT_HOME}"TARGET_CONF_FILE"
export APP_LOG_CONF_FILE=${PROJECT_HOME}"TARGET_LOG_CONF_FILE"
usage() {
echo "Usage: $0 start"
echo " $0 stop"
echo " $0 term"
echo " $0 restart"
echo " $0 list"
echo " $0 monitor"
echo " $0 crontab"
exit
}
start() {
APP_LOG_PATH="${PROJECT_HOME}logs/"
mkdir -p ${APP_LOG_PATH}
APP_BIN=${PROJECT_HOME}sbin/${APP_NAME}
chmod u+x ${APP_BIN}
# CMD="nohup ${APP_BIN} ${APP_ARGS} >>${APP_NAME}.nohup.out 2>&1 &"
CMD="${APP_BIN}"
eval ${CMD}
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'`
if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'`
fi
CUR=`date +%FT%T`
if [ "${PID}" != "" ]; then
for p in ${PID}
do
echo "start ${APP_NAME} ( pid =" ${p} ") at " ${CUR}
done
fi
}
stop() {
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'`
if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'`
fi
if [ "${PID}" != "" ];
then
for ps in ${PID}
do
echo "kill -SIGINT ${APP_NAME} ( pid =" ${ps} ")"
kill -2 ${ps}
done
fi
}
term() {
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $2}'`
if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{print $1}'`
fi
if [ "${PID}" != "" ];
then
for ps in ${PID}
do
echo "kill -9 ${APP_NAME} ( pid =" ${ps} ")"
kill -9 ${ps}
done
fi
}
list() {
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{printf("%s,%s,%s,%s\n", $1, $2, $9, $10)}'`
if [[ ${OS_NAME} != "Linux" && ${OS_NAME} != "Darwin" ]]; then
PID=`ps aux | grep -w ${APP_NAME} | grep -v grep | awk '{printf("%s,%s,%s,%s,%s\n", $1, $4, $6, $7, $8)}'`
fi
if [ "${PID}" != "" ]; then
echo "list ${APP_NAME}"
if [[ ${OS_NAME} == "Linux" || ${OS_NAME} == "Darwin" ]]; then
echo "index: user, pid, start, duration"
else
echo "index: PID, WINPID, UID, STIME, COMMAND"
fi
idx=0
for ps in ${PID}
do
echo "${idx}: ${ps}"
((idx ++))
done
fi
}
opt=$1
case C"$opt" in
Cstart)
start
;;
Cstop)
stop
;;
Cterm)
term
;;
Crestart)
term
start
;;
Clist)
list
;;
C*)
usage
;;
esac
# dubbogo application configure script
# ******************************************************
# DESC : application environment variable
# AUTHOR : Alex Stocks
# VERSION : 1.0
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-07-12 16:29
# FILE : app.properties
# ******************************************************
TARGET_EXEC_NAME="user_info_server"
# BUILD_PACKAGE="dubbogo-examples/user-info/server/app"
BUILD_PACKAGE="app"
TARGET_CONF_FILE="conf/server.yml"
TARGET_LOG_CONF_FILE="conf/log.xml"
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment