Skip to content
Snippets Groups Projects
Commit be7294c8 authored by fangyincheng's avatar fangyincheng
Browse files

Ftr:complete protocol(dubbo & jsonrpc)

parent ad8a392e
No related branches found
No related tags found
No related merge requests found
Showing
with 791 additions and 422 deletions
......@@ -3,3 +3,16 @@ package constant
const (
ASYNC_KEY = "async" // it's value should be "true" or "false" of string type
)
const (
GROUP_KEY = "group"
VERSION_KEY = "Version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
)
const (
SERVICE_FILTER_KEY = "service.filter"
REFERENCE_FILTER_KEY = "reference.filter"
)
package extension
import (
"github.com/dubbo/dubbo-go/filter"
)
var (
filters map[string]func() filter.Filter
)
func init() {
filters = make(map[string]func() filter.Filter)
}
func SetFilter(name string, v func() filter.Filter) {
filters[name] = v
}
func GetFilterExtension(name string) filter.Filter {
return filters[name]()
}
package config
import (
"context"
"reflect"
"strings"
"sync"
"unicode"
"unicode/utf8"
......@@ -9,6 +11,7 @@ import (
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
// rpc service interface
......@@ -18,11 +21,6 @@ type RPCService interface {
}
var (
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
invalidRequest = struct{}{}
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
......@@ -32,7 +30,10 @@ var (
}
)
//////////////////////////
// info of method
//////////////////////////
type MethodType struct {
method reflect.Method
ctxType reflect.Type // type of the request context
......@@ -52,17 +53,26 @@ func (m *MethodType) ArgType() reflect.Type {
func (m *MethodType) ReplyType() reflect.Type {
return m.replyType
}
func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
return contextv
}
return reflect.Zero(m.ctxType)
}
//////////////////////////
// info of service interface
//////////////////////////
type Service struct {
name string
rcvr reflect.Value
rcvrType reflect.Type
method map[string]*MethodType
methods map[string]*MethodType
}
func (s *Service) Method() map[string]*MethodType {
return s.method
return s.methods
}
func (s *Service) RcvrType() reflect.Type {
return s.rcvrType
......@@ -71,13 +81,66 @@ func (s *Service) Rcvr() reflect.Value {
return s.rcvr
}
//////////////////////////
// serviceMap
// todo: use sync.Map?
//////////////////////////
type serviceMap struct {
mutex sync.Mutex // protects the serviceMap
mutex sync.RWMutex // protects the serviceMap
serviceMap map[string]*Service // service name -> service
}
func (sm *serviceMap) GetService(name string) *Service {
return sm.serviceMap[name]
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if s, ok := sm.serviceMap[name]; ok {
return s
}
return nil
}
func (sm *serviceMap) Register(rcvr RPCService) (string, error) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
if sm.serviceMap == nil {
sm.serviceMap = make(map[string]*Service)
}
s := new(Service)
s.rcvrType = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" {
s := "no service name for type " + s.rcvrType.String()
log.Error(s)
return "", jerrors.New(s)
}
if !isExported(sname) {
s := "type " + sname + " is not exported"
log.Error(s)
return "", jerrors.New(s)
}
sname = rcvr.Service()
if server := sm.GetService(sname); server == nil {
return "", jerrors.New("service already defined: " + sname)
}
s.name = sname
s.methods = make(map[string]*MethodType)
// Install the methods
methods := ""
methods, s.methods = suitableMethods(s.rcvrType)
if len(s.methods) == 0 {
s := "type " + sname + " has no exported methods of suitable type"
log.Error(s)
return "", jerrors.New(s)
}
sm.serviceMap[s.name] = s
return strings.TrimSuffix(methods, ","), nil
}
// Is this an exported - upper case - name
......
package filter
import (
"github.com/dubbo/dubbo-go/protocol"
)
// Extension - Protocol
type Filter interface {
Invoke(protocol.Invoker, protocol.Invocation) protocol.Result
OnResponse(protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result
}
package jsonrpc
import (
"bytes"
"context"
"io"
"io/ioutil"
"net"
"net/http"
"reflect"
"strings"
"sync"
"unicode"
"unicode/utf8"
)
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
var (
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
invalidRequest = struct{}{}
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type serviceMethod struct {
method reflect.Method // receiver method
ctxType reflect.Type // type of the request context
argsType reflect.Type // type of the request argument
replyType reflect.Type // type of the response argument
}
func (m *serviceMethod) suiteContext(ctx context.Context) reflect.Value {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
return contextv
}
return reflect.Zero(m.ctxType)
}
type svc struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
rcvrType reflect.Type // type of the receiver
methods map[string]*serviceMethod // registered methods, function name -> reflect.function
}
type serviceMap struct {
mutex sync.Mutex // protects the serviceMap
serviceMap map[string]*svc // service name -> service
}
func initServer() *serviceMap {
return &serviceMap{
serviceMap: make(map[string]*svc),
}
}
// isExported returns true of a string is an exported (upper case) name.
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// isExportedOrBuiltin returns true if a type is exported or a builtin.
func isExportedOrBuiltin(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
func suiteMethod(method reflect.Method) *serviceMethod {
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
return nil
}
var replyType, argType, ctxType reflect.Type
switch mtype.NumIn() {
case 3:
argType = mtype.In(1)
replyType = mtype.In(2)
case 4:
ctxType = mtype.In(1)
argType = mtype.In(2)
replyType = mtype.In(3)
default:
log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4",
mname, mtype, mtype.NumIn())
return nil
}
// First arg need not be a pointer.
if !isExportedOrBuiltin(argType) {
log.Error("argument type of method %q is not exported %v", mname, argType)
return nil
}
// Second arg must be a pointer.
if replyType.Kind() != reflect.Ptr {
log.Error("reply type of method %q is not a pointer %v", mname, replyType)
return nil
}
// Reply type must be exported.
if !isExportedOrBuiltin(replyType) {
log.Error("reply type of method %s not exported{%v}", mname, replyType)
return nil
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method %q has %d out parameters; needs exactly 1", mname, mtype.NumOut())
return nil
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("return type %s of method %q is not error", returnType, mname)
return nil
}
return &serviceMethod{method: method, argsType: argType, replyType: replyType, ctxType: ctxType}
}
func (server *serviceMap) register(rcvr Handler) (string, error) {
server.mutex.Lock()
defer server.mutex.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*svc)
}
s := new(svc)
s.rcvrType = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" {
s := "no service name for type " + s.rcvrType.String()
log.Error(s)
return "", jerrors.New(s)
}
if !isExported(sname) {
s := "type " + sname + " is not exported"
log.Error(s)
return "", jerrors.New(s)
}
sname = rcvr.Service()
if _, dup := server.serviceMap[sname]; dup {
return "", jerrors.New("service already defined: " + sname)
}
s.name = sname
s.methods = make(map[string]*serviceMethod)
// Install the methods
methods := ""
num := s.rcvrType.NumMethod()
for m := 0; m < num; m++ {
method := s.rcvrType.Method(m)
if mt := suiteMethod(method); mt != nil {
s.methods[method.Name] = mt
methods += method.Name + ","
}
}
if len(s.methods) == 0 {
s := "type " + sname + " has no exported methods of suitable type"
log.Error(s)
return "", jerrors.New(s)
}
server.serviceMap[s.name] = s
return strings.TrimSuffix(methods, ","), nil
}
func (server *serviceMap) serveRequest(ctx context.Context,
header map[string]string, body []byte, conn net.Conn) error {
// read request header
codec := newServerCodec()
err := codec.ReadHeader(header, body)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return jerrors.Trace(err)
}
return jerrors.New("server cannot decode request: " + err.Error())
}
serviceName := header["Path"]
methodName := codec.req.Method
if len(serviceName) == 0 || len(methodName) == 0 {
codec.ReadBody(nil)
return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName)
}
// get method
server.mutex.Lock()
svc := server.serviceMap[serviceName]
server.mutex.Unlock()
if svc == nil {
codec.ReadBody(nil)
return jerrors.New("cannot find svc " + serviceName)
}
mtype := svc.methods[methodName]
if mtype == nil {
codec.ReadBody(nil)
return jerrors.New("cannot find method " + methodName + " of svc " + serviceName)
}
// get args
var argv reflect.Value
argIsValue := false
if mtype.argsType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.argsType.Elem())
} else {
argv = reflect.New(mtype.argsType)
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadBody(argv.Interface()); err != nil {
return jerrors.Trace(err)
}
if argIsValue {
argv = argv.Elem()
}
replyv := reflect.New(mtype.replyType.Elem())
// call service.method(args)
var errMsg string
returnValues := mtype.method.Func.Call([]reflect.Value{
svc.rcvr,
mtype.suiteContext(ctx),
reflect.ValueOf(argv.Interface()),
reflect.ValueOf(replyv.Interface()),
})
// The return value for the method is an error.
if retErr := returnValues[0].Interface(); retErr != nil {
errMsg = retErr.(error).Error()
}
// write response
code := 200
rspReply := replyv.Interface()
if len(errMsg) != 0 {
code = 500
rspReply = invalidRequest
}
rspStream, err := codec.Write(errMsg, rspReply)
if err != nil {
return jerrors.Trace(err)
}
rsp := &http.Response{
StatusCode: code,
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
ContentLength: int64(len(rspStream)),
Body: ioutil.NopCloser(bytes.NewReader(rspStream)),
}
delete(header, "Content-Type")
delete(header, "Content-Length")
delete(header, "Timeout")
for k, v := range header {
rsp.Header.Set(k, v)
}
rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
rspBuf.Reset()
if err = rsp.Write(rspBuf); err != nil {
log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err)
return nil
}
if _, err = rspBuf.WriteTo(conn); err != nil {
log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err)
}
return nil
}
package dubbo
import (
"context"
)
import (
log "github.com/AlexStocks/log4go"
)
import (
"github.com/dubbo/dubbo-go/protocol"
)
// wrapping invoker
type DubboExporter struct {
ctx context.Context
key string
invoker protocol.Invoker
exporterMap map[string]protocol.Exporter
protocol.BaseExporter
}
func NewDubboExporter(ctx context.Context, key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter {
func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *DubboExporter {
return &DubboExporter{
ctx: ctx,
key: key,
invoker: invoker,
exporterMap: exporterMap,
BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap),
}
}
func (de *DubboExporter) GetInvoker() protocol.Invoker {
return de.invoker
}
func (de *DubboExporter) Unexport() {
log.Info("DubboExporter unexport.")
de.invoker.Destroy()
delete(de.exporterMap, de.key)
}
package dubbo
import (
"context"
"errors"
"strconv"
"sync"
......@@ -18,24 +17,18 @@ import (
"github.com/dubbo/dubbo-go/protocol"
)
var Err_No_Reply = errors.New("no reply")
var Err_No_Reply = errors.New("request need @reply")
type DubboInvoker struct {
ctx context.Context
url config.URL
protocol.BaseInvoker
client *Client
available bool
destroyed bool
destroyLock sync.Mutex
}
func NewDubboInvoker(ctx context.Context, url config.URL, client *Client) *DubboInvoker {
func NewDubboInvoker(url config.IURL, client *Client) *DubboInvoker {
return &DubboInvoker{
ctx: ctx,
url: url,
client: client,
available: true,
destroyed: false,
BaseInvoker: protocol.NewBaseInvoker(url),
client: client,
}
}
......@@ -56,13 +49,13 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
if async {
if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Reply(), callBack,
result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{})))
} else {
result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(),
result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), inv.Arguments(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
......@@ -72,7 +65,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
if inv.Reply() == nil {
result.Err = Err_No_Reply
} else {
result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Reply(),
result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Arguments(), inv.Reply(),
WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)),
WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)),
WithCallSerialID(inv.Params()["serialID"].(SerialID)),
......@@ -81,26 +74,23 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
}
return result
}
func (di *DubboInvoker) GetUrl() config.IURL {
return &di.url
}
func (di *DubboInvoker) IsAvailable() bool {
return di.available
return &result
}
func (di *DubboInvoker) Destroy() {
if di.destroyed {
if di.IsDestroyed() {
return
}
di.destroyLock.Lock()
defer di.destroyLock.Unlock()
di.destroyed = true
di.available = false
if di.IsDestroyed() {
return
}
di.client.Close() // close client
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close() // close client
}
}
package dubbo
import (
"context"
log "github.com/AlexStocks/log4go"
)
......@@ -20,20 +19,22 @@ func init() {
var dubboProtocol *DubboProtocol
type DubboProtocol struct {
ctx context.Context
exporterMap map[string]protocol.Exporter
invokers []protocol.Invoker
protocol.BaseProtocol
serverMap map[string]*Server
}
func NewDubboProtocol(ctx context.Context) *DubboProtocol {
return &DubboProtocol{ctx: ctx, exporterMap: make(map[string]protocol.Exporter)}
func NewDubboProtocol() *DubboProtocol {
return &DubboProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
serverMap: make(map[string]*Server),
}
}
func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl().(*config.URL)
serviceKey := url.Key()
exporter := NewDubboExporter(nil, serviceKey, invoker, dp.exporterMap)
dp.exporterMap[serviceKey] = exporter
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap())
dp.SetExporterMap(serviceKey, exporter)
log.Info("Export service: ", url.String())
// start server
......@@ -41,19 +42,28 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}
func (dp *DubboProtocol) Refer(url config.URL) protocol.Invoker {
invoker := NewDubboInvoker(nil, url, getClient())
dp.invokers = append(dp.invokers, invoker)
log.Info("Refer service: ", url.String())
func (dp *DubboProtocol) Refer(url config.IURL) protocol.Invoker {
invoker := NewDubboInvoker(url, NewClient())
dp.SetInvokers(invoker)
log.Info("Refer service: ", url.(*config.URL).String())
return invoker
}
func (dp *DubboProtocol) Destroy() {
log.Info("DubboProtocol destroy.")
srv.Stop() // stop server
dp.BaseProtocol.Destroy()
// stop server
for key, server := range dp.serverMap {
delete(dp.serverMap, key)
server.Stop()
}
}
func (dp *DubboProtocol) openServer(url config.URL) {
srv := NewServer(dp.ExporterMap()[url.Key()])
dp.serverMap[url.Location] = srv
srv.Start(url)
}
......@@ -61,9 +71,5 @@ func GetProtocol() protocol.Protocol {
if dubboProtocol != nil {
return dubboProtocol
}
return NewDubboProtocol(nil)
}
func getClient() *Client {
return NewClient()
return NewDubboProtocol()
}
......@@ -10,11 +10,16 @@ import (
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbogo/hessian2"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
// todo: WritePkg_Timeout will entry *.yml
const WritePkg_Timeout = 5 * time.Second
......@@ -107,14 +112,16 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
////////////////////////////////////////////
type RpcServerHandler struct {
exporter protocol.Exporter
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
func NewRpcServerHandler(exporter protocol.Exporter, maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
exporter: exporter,
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
......@@ -167,6 +174,23 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
p.Header.ResponseStatus = hessian.Response_OK
invoker := h.exporter.GetInvoker()
if invoker != nil {
attchments := map[string]string{}
// todo: use them followingly if need
url := invoker.GetUrl().(*config.URL)
attchments[constant.PATH_KEY] = url.Path
attchments[constant.GROUP_KEY] = url.Group
attchments[constant.SERVICE_KEY] = url.Service
attchments[constant.VERSION_KEY] = url.Version
result := invoker.Invoke(protocol.NewRPCInvocationForProvider(attchments))
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
p.Body = err
return
}
}
// heartbeat
if p.Header.Type&hessian.Heartbeat != 0x00 {
log.Debug("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
......@@ -174,7 +198,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
// twoway
// not twoway
if p.Header.Type&hessian.Request_TwoWay == 0x00 {
h.reply(session, p, hessian.Response)
h.callService(p, nil)
......@@ -228,7 +252,14 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) {
}
}()
svc := req.Body.(map[string]interface{})["service"].(*config.Service)
svcIf := req.Body.(map[string]interface{})["service"]
if svcIf == nil {
log.Error("service not found!")
req.Header.ResponseStatus = hessian.Response_SERVICE_NOT_FOUND
req.Body = nil
return
}
svc := svcIf.(*config.Service)
method := svc.Method()[req.Service.Method]
// prepare argv
......
......@@ -2,6 +2,7 @@ package dubbo
import (
"fmt"
"github.com/dubbo/dubbo-go/protocol"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
......@@ -18,45 +19,51 @@ import (
"github.com/dubbo/dubbo-go/config"
)
var srv = NewServer()
var srvConf *ServerConfig
const CONF_SERVER_FILE_PATH = "CONF_SERVER_FILE_PATH"
type Server struct {
conf ServerConfig
tcpServerList []getty.Server
}
func NewServer() *Server {
s := &Server{}
const CONF_DUBBO_SERVER_FILE_PATH = "CONF_SERVER_FILE_PATH"
func init() {
// load serverconfig from *.yml
path := os.Getenv(CONF_SERVER_FILE_PATH)
path := os.Getenv(CONF_DUBBO_SERVER_FILE_PATH)
if path == "" {
log.Info("CONF_SERVER_FILE_PATH is null")
return s
return
}
file, err := ioutil.ReadFile(path)
if err != nil {
log.Error(jerrors.Trace(err))
return s
return
}
conf := &ServerConfig{}
err = yaml.Unmarshal(file, conf)
if err != nil {
log.Error(jerrors.Trace(err))
return s
return
}
if err := conf.CheckValidity(); err != nil {
log.Error("ServerConfig check failed: ", err)
return s
return
}
s.conf = *conf
srvConf = conf
}
type Server struct {
conf ServerConfig
tcpServer getty.Server
exporter protocol.Exporter
}
func NewServer(exporter protocol.Exporter) *Server {
s := &Server{
exporter: exporter,
conf: *srvConf,
}
return s
}
......@@ -86,8 +93,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler()) // TODO: now, a server will bind all service
session.SetEventListener(NewRpcServerHandler(conf.SessionNumber, conf.sessionTimeout))
session.SetPkgHandler(NewRpcServerPackageHandler())
session.SetEventListener(NewRpcServerHandler(s.exporter, conf.SessionNumber, conf.sessionTimeout))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
......@@ -111,16 +118,10 @@ func (s *Server) Start(url config.URL) {
)
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
s.tcpServer = tcpServer
}
func (s *Server) Stop() {
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
}
s.tcpServer.Close()
}
......@@ -30,7 +30,7 @@ type RPCInvocation struct {
}
// todo: arguments table is too many
func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, arguments []interface{},
func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Type, arguments []interface{},
reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation {
return &RPCInvocation{
methodName: methodName,
......@@ -44,6 +44,12 @@ func NewRPCInvocation(methodName string, parameterTypes []reflect.Type, argument
}
}
func NewRPCInvocationForProvider(attachments map[string]string) *RPCInvocation {
return &RPCInvocation{
attachments: attachments,
}
}
func (r *RPCInvocation) MethodName() string {
return r.methodName
}
......
package protocol
import "github.com/dubbo/dubbo-go/common"
import (
log "github.com/AlexStocks/log4go"
)
import (
"github.com/dubbo/dubbo-go/common"
"github.com/dubbo/dubbo-go/config"
)
// Extension - Invoker
type Invoker interface {
common.Node
Invoke(Invocation) Result
}
/////////////////////////////
// base invoker
/////////////////////////////
type BaseInvoker struct {
url config.IURL
available bool
destroyed bool
}
func NewBaseInvoker(url config.IURL) BaseInvoker {
return BaseInvoker{
url: url,
available: true,
destroyed: false,
}
}
func (bi *BaseInvoker) GetUrl() config.IURL {
return bi.url
}
func (bi *BaseInvoker) IsAvailable() bool {
return bi.available
}
func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed
}
func (bi *BaseInvoker) Invoke(invocation Invocation) Result {
return nil
}
func (bi *BaseInvoker) Destroy() {
log.Info("Destroy invoker: %s", bi.GetUrl().(*config.URL).String())
bi.destroyed = true
bi.available = false
}
......@@ -20,9 +20,8 @@ import (
)
import (
"github.com/dubbo/dubbo-go/client"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
)
//////////////////////////////////////////////
......@@ -38,11 +37,6 @@ type Request struct {
method string
args interface{}
contentType string
conf registry.ReferenceConfig
}
func (r *Request) ServiceConfig() registry.ReferenceConfig {
return r.conf
}
//////////////////////////////////////////////
......@@ -83,30 +77,28 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
}
}
func (c *HTTPClient) NewRequest(conf registry.ReferenceConfig, method string, args interface{}) (client.Request, error) {
func (c *HTTPClient) NewRequest(service config.URL, method string, args interface{}) *Request {
return &Request{
ID: atomic.AddInt64(&c.ID, 1),
group: conf.Group(),
protocol: conf.Protocol(),
version: conf.Version(),
service: conf.Service(),
group: service.Group,
protocol: service.Protocol,
version: service.Version,
service: service.Service,
method: method,
args: args,
conf: conf,
}, nil
}
}
func (c *HTTPClient) Call(ctx context.Context, service config.URL, request client.Request, rsp interface{}) error {
func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request, rsp interface{}) error {
// header
req := request.(*Request)
httpHeader := http.Header{}
httpHeader.Set("Content-Type", "application/json")
httpHeader.Set("Accept", "application/json")
reqTimeout := c.options.HTTPTimeout
if service.Timeout() != 0 && service.Timeout() < reqTimeout {
reqTimeout = time.Duration(service.Timeout())
if service.Timeout != 0 && service.Timeout < reqTimeout {
reqTimeout = time.Duration(service.Timeout)
}
if reqTimeout <= 0 {
reqTimeout = 1e8
......@@ -130,7 +122,7 @@ func (c *HTTPClient) Call(ctx context.Context, service config.URL, request clien
return jerrors.Trace(err)
}
rspBody, err := c.Do(service.Location(), service.Query().Get("interface"), httpHeader, reqBody)
rspBody, err := c.Do(service.Location, service.Params.Get("interface"), httpHeader, reqBody)
if err != nil {
return jerrors.Trace(err)
}
......
File moved
package jsonrpc
import (
"github.com/dubbo/dubbo-go/protocol"
)
type JsonrpcExporter struct {
protocol.BaseExporter
}
func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap map[string]protocol.Exporter) *JsonrpcExporter {
return &JsonrpcExporter{
BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap),
}
}
package jsonrpc
import (
"context"
)
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
"github.com/dubbo/dubbo-go/public"
)
type JsonrpcInvoker struct {
protocol.BaseInvoker
client *HTTPClient
}
func NewJsonrpcInvoker(url config.IURL, client *HTTPClient) *JsonrpcInvoker {
return &JsonrpcInvoker{
BaseInvoker: protocol.NewBaseInvoker(url),
client: client,
}
}
func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)
inv := invocation.(*protocol.RPCInvocation)
url := inv.Invoker().GetUrl().(*config.URL)
req := ji.client.NewRequest(*url, inv.MethodName(), inv.Arguments())
ctx := context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Services": url.Service,
"X-Method": inv.MethodName(),
})
if err := ji.client.Call(ctx, *url, req, inv.Reply()); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
result.Err = err
} else {
result.Rest = inv.Reply()
}
return &result
}
package jsonrpc
import (
log "github.com/AlexStocks/log4go"
)
import (
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
const JSONRPC = "jsonrpc"
func init() {
extension.SetProtocol(JSONRPC, GetProtocol)
}
var jsonrpcProtocol *JsonrpcProtocol
type JsonrpcProtocol struct {
protocol.BaseProtocol
serverMap map[string]*Server
}
func NewDubboProtocol() *JsonrpcProtocol {
return &JsonrpcProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
serverMap: make(map[string]*Server),
}
}
func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl().(*config.URL)
serviceKey := url.Key()
exporter := NewJsonrpcExporter(serviceKey, invoker, jp.ExporterMap())
jp.SetExporterMap(serviceKey, exporter)
log.Info("Export service: ", url.String())
// start server
jp.openServer(*url)
return exporter
}
func (jp *JsonrpcProtocol) Refer(url config.IURL) protocol.Invoker {
invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{
HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout,
HTTPTimeout: config.GetConsumerConfig().RequestTimeout,
}))
jp.SetInvokers(invoker)
log.Info("Refer service: ", url.(*config.URL).String())
return invoker
}
func (jp *JsonrpcProtocol) Destroy() {
log.Info("jsonrpcProtocol destroy.")
jp.BaseProtocol.Destroy()
// stop server
for key, server := range jp.serverMap {
delete(jp.serverMap, key)
server.Stop()
}
}
func (jp *JsonrpcProtocol) openServer(url config.URL) {
srv := NewServer(jp.ExporterMap()[url.Key()])
jp.serverMap[url.Location] = srv
srv.Start(url)
}
func GetProtocol() protocol.Protocol {
if jsonrpcProtocol != nil {
return jsonrpcProtocol
}
return NewDubboProtocol()
}
......@@ -4,10 +4,12 @@ import (
"bufio"
"bytes"
"context"
"github.com/dubbo/dubbo-go/plugins"
"github.com/dubbo/dubbo-go/common/constant"
"io"
"io/ioutil"
"net"
"net/http"
"reflect"
"runtime"
"runtime/debug"
"sync"
......@@ -15,14 +17,20 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
"github.com/dubbo/dubbo-go/registry"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/dubbo-go/server"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
var (
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
invalidRequest = struct{}{}
)
const (
......@@ -31,107 +39,24 @@ const (
PathPrefix = byte('/')
)
// Handler interface represents a Service request handler. It's generated
// by passing any type of public concrete object with methods into server.NewHandler.
// Most will pass in a struct.
//
// Example:
//
// type Hello struct {}
//
// func (s *Hello) Method(context, request, response) error {
// return nil
// }
//
// func (s *Hello) Service() string {
// return "com.youni.service"
// }
//
// func (s *Hello) Version() string {
// return "1.0.0"
// }
type Handler interface {
Service() string // Service Interface
Version() string
}
type Option func(*Options)
type Options struct {
Registry registry.Registry
ConfList []server.ServerConfig
ServiceConfList []registry.ProviderServiceConfig
Timeout time.Duration
}
func newOptions(opt ...Option) Options {
opts := Options{}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
panic("server.Options.Registry is nil")
}
return opts
}
// Registry used for discovery
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
func ConfList(confList []server.ServerConfig) Option {
return func(o *Options) {
o.ConfList = confList
for i := 0; i < len(o.ConfList); i++ {
if o.ConfList[i].IP == "" {
o.ConfList[i].IP, _ = gxnet.GetLocalIP()
}
}
}
}
func ServiceConfList(confList []registry.ProviderServiceConfig) Option {
return func(o *Options) {
o.ServiceConfList = confList
}
}
type Server struct {
rpc []*serviceMap
done chan struct{}
once sync.Once
exporter protocol.Exporter
done chan struct{}
once sync.Once
sync.RWMutex
opts Options
handlers map[string]Handler
wg sync.WaitGroup
wg sync.WaitGroup
timeout time.Duration
}
func NewServer(opts ...Option) *Server {
var (
num int
)
options := newOptions(opts...)
Servers := make([]*serviceMap, len(options.ConfList))
num = len(options.ConfList)
for i := 0; i < num; i++ {
Servers[i] = initServer()
}
func NewServer(exporter protocol.Exporter) *Server {
return &Server{
opts: options,
rpc: Servers,
handlers: make(map[string]Handler),
exporter: exporter,
done: make(chan struct{}),
}
}
func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) {
func (s *Server) handlePkg(conn net.Conn) {
defer func() {
if r := recover(); r != nil {
log.Warn("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s",
......@@ -194,7 +119,7 @@ func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) {
}
reqHeader["HttpMethod"] = r.Method
httpTimeout := s.Options().Timeout
httpTimeout := s.timeout
contentType := reqHeader["Content-Type"]
if contentType != "application/json" && contentType != "application/json-rpc" {
setReadTimeout(conn, httpTimeout)
......@@ -206,6 +131,26 @@ func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) {
return
}
// exporter invoke
invoker := s.exporter.GetInvoker()
if invoker != nil {
attchments := map[string]string{}
// todo: use them followingly if need
url := invoker.GetUrl().(*config.URL)
attchments[constant.PATH_KEY] = url.Path
attchments[constant.GROUP_KEY] = url.Group
attchments[constant.SERVICE_KEY] = url.Service
attchments[constant.VERSION_KEY] = url.Version
result := invoker.Invoke(protocol.NewRPCInvocationForProvider(attchments))
if err := result.Error(); err != nil {
if errRsp := sendErrorResp(r.Header, []byte(err.Error())); errRsp != nil {
log.Warn("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s",
r.Header, err, errRsp)
}
return
}
}
ctx := context.Background()
if len(reqHeader["Timeout"]) > 0 {
timeout, err := time.ParseDuration(reqHeader["Timeout"])
......@@ -217,7 +162,7 @@ func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) {
}
setReadTimeout(conn, httpTimeout)
if err := rpc.serveRequest(ctx, reqHeader, reqBody, conn); err != nil {
if err := serveRequest(ctx, reqHeader, reqBody, conn); err != nil {
if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil {
log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s",
r.Header, jerrors.ErrorStack(err), errRsp)
......@@ -229,68 +174,6 @@ func (s *Server) handlePkg(rpc *serviceMap, conn net.Conn) {
}
}
func (s *Server) Options() Options {
s.RLock()
defer s.RUnlock()
return s.opts
}
func (s *Server) Handle(h Handler) error {
var (
err error
)
opts := s.Options()
serviceConf := plugins.DefaultProviderServiceConfig()()
serviceConf.SetService(h.Service())
serviceConf.SetVersion(h.Version())
flag := 0
serviceNum := len(opts.ServiceConfList)
ServerNum := len(opts.ConfList)
for i := 0; i < serviceNum; i++ {
if opts.ServiceConfList[i].Service() == serviceConf.Service() &&
opts.ServiceConfList[i].Version() == serviceConf.Version() {
serviceConf.SetProtocol(opts.ServiceConfList[i].Protocol())
serviceConf.SetGroup(opts.ServiceConfList[i].Group())
// serviceConf.Version = opts.ServiceConfList[i].Version
var methods, path string
for j := 0; j < ServerNum; j++ {
if opts.ConfList[j].Protocol == serviceConf.Protocol() {
s.Lock()
methods, err = s.rpc[j].register(h)
s.Unlock()
if err != nil {
return err
}
serviceConf.SetMethods(methods)
path = opts.ConfList[j].Address()
serviceConf.SetPath(path)
err = opts.Registry.Register(serviceConf)
if err != nil {
return err
}
flag = 1
}
}
}
}
if flag == 0 {
return jerrors.Errorf("fail to register Handler{service:%s, version:%s}",
serviceConf.Service(), serviceConf.Version())
}
s.Lock()
s.handlers[h.Service()] = h
s.Unlock()
return nil
}
func accept(listener net.Listener, fn func(net.Conn)) error {
var (
err error
......@@ -335,49 +218,137 @@ func accept(listener net.Listener, fn func(net.Conn)) error {
}
}
func (s *Server) Start() error {
config := s.Options()
func (s *Server) Start(url config.URL) {
listener, err := net.Listen("tcp", url.Location)
if err != nil {
log.Error("jsonrpc server [%s] start failed: %v", url.Service, err)
return
}
log.Info("rpc server start to listen on %s", listener.Addr())
ServerNum := len(config.ConfList)
for i := 0; i < ServerNum; i++ {
listener, err := net.Listen("tcp", config.ConfList[i].Address())
s.wg.Add(1)
go func() {
accept(listener, func(conn net.Conn) { s.handlePkg(conn) })
s.wg.Done()
}()
s.wg.Add(1)
go func() { // Server done goroutine
var err error
<-s.done // step1: block to wait for done channel(wait Server.Stop step2)
err = listener.Close() // step2: and then close listener
if err != nil {
return err
log.Warn("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err)
}
log.Info("rpc server start to listen on %s", listener.Addr())
s.Lock()
rpc := s.rpc[i]
s.Unlock()
s.wg.Add(1)
go func(servo *serviceMap) {
accept(listener, func(conn net.Conn) { s.handlePkg(rpc, conn) })
s.wg.Done()
}(rpc)
s.wg.Add(1)
go func(servo *serviceMap) { // Server done goroutine
var err error
<-s.done // step1: block to wait for done channel(wait Server.Stop step2)
err = listener.Close() // step2: and then close listener
if err != nil {
log.Warn("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err)
}
s.wg.Done()
}(rpc)
}
return nil
s.wg.Done()
}()
}
func (s *Server) Stop() {
s.once.Do(func() {
close(s.done)
s.wg.Wait()
if s.opts.Registry != nil {
s.opts.Registry.Close()
s.opts.Registry = nil
})
}
func serveRequest(ctx context.Context,
header map[string]string, body []byte, conn net.Conn) error {
// read request header
codec := newServerCodec()
err := codec.ReadHeader(header, body)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return jerrors.Trace(err)
}
return jerrors.New("server cannot decode request: " + err.Error())
}
serviceName := header["Path"]
methodName := codec.req.Method
if len(serviceName) == 0 || len(methodName) == 0 {
codec.ReadBody(nil)
return jerrors.New("service/method request ill-formed: " + serviceName + "/" + methodName)
}
// get method
svc := config.ServiceMap.GetService(serviceName)
if svc == nil {
codec.ReadBody(nil)
return jerrors.New("cannot find svc " + serviceName)
}
mtype := svc.Method()[methodName]
if mtype == nil {
codec.ReadBody(nil)
return jerrors.New("cannot find method " + methodName + " of svc " + serviceName)
}
// get args
var argv reflect.Value
argIsValue := false
if mtype.ArgType().Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType().Elem())
} else {
argv = reflect.New(mtype.ArgType())
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadBody(argv.Interface()); err != nil {
return jerrors.Trace(err)
}
if argIsValue {
argv = argv.Elem()
}
replyv := reflect.New(mtype.ReplyType().Elem())
// call service.method(args)
var errMsg string
returnValues := mtype.Method().Func.Call([]reflect.Value{
svc.Rcvr(),
mtype.SuiteContext(ctx),
reflect.ValueOf(argv.Interface()),
reflect.ValueOf(replyv.Interface()),
})
// The return value for the method is an error.
if retErr := returnValues[0].Interface(); retErr != nil {
errMsg = retErr.(error).Error()
}
// write response
code := 200
rspReply := replyv.Interface()
if len(errMsg) != 0 {
code = 500
rspReply = invalidRequest
}
rspStream, err := codec.Write(errMsg, rspReply)
if err != nil {
return jerrors.Trace(err)
}
rsp := &http.Response{
StatusCode: code,
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
ContentLength: int64(len(rspStream)),
Body: ioutil.NopCloser(bytes.NewReader(rspStream)),
}
delete(header, "Content-Type")
delete(header, "Content-Length")
delete(header, "Timeout")
for k, v := range header {
rsp.Header.Set(k, v)
}
rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
rspBuf.Reset()
if err = rsp.Write(rspBuf); err != nil {
log.Warn("rsp.Write(rsp:%#v) = error:%s", rsp, err)
return nil
}
if _, err = rspBuf.WriteTo(conn); err != nil {
log.Warn("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err)
}
return nil
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment