Commit fd1989f3 authored by lni, committed by GitHub

hakeeper: added the ability to set initial cluster info (#3741)

parent 51da9b87
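Before the diff itself, a hedged orientation sketch of how the new cmds drive the HAKeeper state machine. It applies the cmds directly through Update, exactly as the new tests below do, and like them it would have to live inside the hakeeper package (stateMachine is unexported); sm is the dragonboat statemachine package and pb is pkg/pb/logservice, as imported in the diff. How the cmds are proposed through the log service in production is outside this commit.

func bootstrapFlowSketch() {
	// same constructor arguments as the tests below
	rsm := NewStateMachine(0, 1).(*stateMachine)

	// request 2 log shards, 2 DN shards and 3 replicas per log shard
	cmd := GetInitialClusterRequestCmd(2, 2, 3)
	if _, err := rsm.Update(sm.Entry{Cmd: cmd}); err != nil {
		panic(err)
	}
	// the RSM is now in the HAKeeperBootstrapping state; once bootstrap
	// completes (or fails) the state is advanced with a set-state cmd
	if _, err := rsm.Update(sm.Entry{Cmd: GetSetStateCmd(pb.HAKeeperRunning)}); err != nil {
		panic(err)
	}
}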
@@ -25,6 +25,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
)
var (
@@ -53,6 +54,8 @@ const (
logHeartbeatTag
getIDTag
updateScheduleCommandTag
setStateTag
initialClusterRequestTag
)
type StateQuery struct{}
@@ -69,6 +72,39 @@ func parseCmdTag(cmd []byte) uint16 {
return binaryEnc.Uint16(cmd)
}
func GetInitialClusterRequestCmd(numOfLogShards uint64,
numOfDNShards uint64, numOfLogReplicas uint64) []byte {
req := pb.InitialClusterRequest{
NumOfLogShards: numOfLogShards,
NumOfDNShards: numOfDNShards,
NumOfLogReplicas: numOfLogReplicas,
}
payload, err := req.Marshal()
if err != nil {
panic(err)
}
cmd := make([]byte, headerSize+len(payload))
binaryEnc.PutUint16(cmd, initialClusterRequestTag)
copy(cmd[headerSize:], payload)
return cmd
}
func isInitialClusterRequestCmd(cmd []byte) bool {
return parseCmdTag(cmd) == initialClusterRequestTag
}
func parseInitialClusterRequestCmd(cmd []byte) pb.InitialClusterRequest {
if parseCmdTag(cmd) != initialClusterRequestTag {
panic("not a initialClusterRequestTag")
}
payload := cmd[headerSize:]
var result pb.InitialClusterRequest
if err := result.Unmarshal(payload); err != nil {
panic(err)
}
return result
}
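All HAKeeper cmds in this file share the same framing: a fixed-size tag header written with binaryEnc, followed by the marshaled protobuf payload. Below is a minimal standalone sketch of that framing; headerSize = 2 and big-endian byte order are assumptions for illustration (the file's binaryEnc may use a different byte order), and the tag and payload values are placeholders.

package main

import (
	"encoding/binary"
	"fmt"
)

const headerSize = 2 // assumed: one uint16 tag, as written by binaryEnc.PutUint16

// frame prepends a uint16 tag to an already-marshaled payload, mirroring
// GetInitialClusterRequestCmd and GetSetStateCmd above.
func frame(tag uint16, payload []byte) []byte {
	cmd := make([]byte, headerSize+len(payload))
	binary.BigEndian.PutUint16(cmd, tag) // byte order is an assumption here
	copy(cmd[headerSize:], payload)
	return cmd
}

func main() {
	cmd := frame(42, []byte("payload")) // placeholder tag and payload
	fmt.Println(binary.BigEndian.Uint16(cmd), string(cmd[headerSize:]))
}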
func GetUpdateCommandsCmd(term uint64, cmds []pb.ScheduleCommand) []byte {
b := pb.CommandBatch{
Term: term,
@@ -149,6 +185,21 @@ func parseGetIDCmd(cmd []byte) uint64 {
return binaryEnc.Uint64(cmd[headerSize:])
}
func isSetStateCmd(cmd []byte) bool {
return len(cmd) == headerSize+4 && binaryEnc.Uint16(cmd) == setStateTag
}
func parseSetStateCmd(cmd []byte) pb.HAKeeperState {
return pb.HAKeeperState(binaryEnc.Uint32(cmd[headerSize:]))
}
func GetSetStateCmd(state pb.HAKeeperState) []byte {
cmd := make([]byte, headerSize+4)
binaryEnc.PutUint16(cmd, setStateTag)
binaryEnc.PutUint32(cmd[headerSize:], uint32(state))
return cmd
}
func GetTickCmd() []byte {
cmd := make([]byte, headerSize)
binaryEnc.PutUint16(cmd, tickTag)
@@ -263,7 +314,73 @@ func (s *stateMachine) handleGetIDCmd(cmd []byte) sm.Result {
return sm.Result{Value: v}
}
func (s *stateMachine) handleSetStateCmd(cmd []byte) sm.Result {
re := func() sm.Result {
data := make([]byte, 4)
binaryEnc.PutUint32(data, uint32(s.state.State))
return sm.Result{Data: data}
}
state := parseSetStateCmd(cmd)
switch s.state.State {
case pb.HAKeeperCreated:
return re()
case pb.HAKeeperBootstrapping:
if state == pb.HAKeeperBootstrapFailed || state == pb.HAKeeperRunning {
s.state.State = state
return sm.Result{}
}
return re()
case pb.HAKeeperBootstrapFailed:
return re()
case pb.HAKeeperRunning:
return re()
default:
panic("unknown HAKeeper state")
}
}
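The switch above boils down to a single rule: only a bootstrapping HAKeeper may change state, and only to HAKeeperBootstrapFailed or HAKeeperRunning; every other request leaves the state untouched and returns the current state in Result.Data. A hedged one-function summary of that rule, where transitionAllowed is a hypothetical helper and not part of this commit:

// transitionAllowed mirrors handleSetStateCmd's switch: only a bootstrapping
// HAKeeper may move, and only to the bootstrap-failed or running state.
func transitionAllowed(from, to pb.HAKeeperState) bool {
	return from == pb.HAKeeperBootstrapping &&
		(to == pb.HAKeeperBootstrapFailed || to == pb.HAKeeperRunning)
}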
func (s *stateMachine) handleInitialClusterRequestCmd(cmd []byte) sm.Result {
result := sm.Result{Value: uint64(s.state.State)}
if s.state.State != pb.HAKeeperCreated {
return result
}
req := parseInitialClusterRequestCmd(cmd)
if req.NumOfLogShards != req.NumOfDNShards {
panic("DN:Log 1:1 mode is the only supported mode")
}
// FIXME: NextID should be initialized to 1, as 0 is already statically
// assigned to HAKeeper itself
s.state.NextID++
dnShards := make([]metadata.DNShardRecord, 0)
logShards := make([]metadata.LogShardRecord, 0)
for i := uint64(0); i < req.NumOfLogShards; i++ {
rec := metadata.LogShardRecord{
ShardID: s.state.NextID,
NumberOfReplicas: req.NumOfLogReplicas,
}
s.state.NextID++
logShards = append(logShards, rec)
drec := metadata.DNShardRecord{
ShardID: s.state.NextID,
LogShardID: rec.ShardID,
}
s.state.NextID++
dnShards = append(dnShards, drec)
}
s.state.ClusterInfo = pb.ClusterInfo{
DNShards: dnShards,
LogShards: logShards,
}
plog.Infof("HAKeeper set to the BOOTSTRAPPING state")
s.state.State = pb.HAKeeperBootstrapping
return result
}
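The loop above allocates IDs in alternation: each LogShardRecord takes the next ID and its paired DNShardRecord takes the one after. A standalone sketch of just that pattern follows; starting nextID at 1 is an assumption for illustration (it matches the value reached after the initial increment flagged by the FIXME), and with two log shards it reproduces the IDs checked by TestHandleInitialClusterRequestCmd below: log shards 1 and 3, DN shards 2 and 4, next unused ID 5.

package main

import "fmt"

// A sketch of the alternating ID allocation in handleInitialClusterRequestCmd.
func main() {
	nextID := uint64(1)
	numOfLogShards := uint64(2)
	for i := uint64(0); i < numOfLogShards; i++ {
		logShardID := nextID // the log shard record takes the next ID
		nextID++
		dnShardID := nextID // its paired DN shard record takes the following ID
		nextID++
		fmt.Printf("log shard %d paired with DN shard %d\n", logShardID, dnShardID)
	}
	fmt.Println("next unused ID:", nextID) // 5 when two log shards are requested
}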
func (s *stateMachine) Update(e sm.Entry) (sm.Result, error) {
// TODO: we need to make sure InitialClusterRequestCmd is the
// first user cmd added to the Raft log
cmd := e.Cmd
if _, ok := isCreateLogShardCmd(cmd); ok {
return s.handleCreateLogShardCmd(cmd), nil
@@ -277,13 +394,17 @@ func (s *stateMachine) Update(e sm.Entry) (sm.Result, error) {
return s.handleGetIDCmd(cmd), nil
} else if isUpdateCommandsCmd(cmd) {
return s.handleUpdateCommandsCmd(cmd), nil
} else if isSetStateCmd(cmd) {
return s.handleSetStateCmd(cmd), nil
} else if isInitialClusterRequestCmd(cmd) {
return s.handleInitialClusterRequestCmd(cmd), nil
}
panic(moerr.NewError(moerr.INVALID_INPUT, "unexpected haKeeper cmd"))
}
func (s *stateMachine) handleStateQuery() interface{} {
// FIXME: pretty sure we need to deepcopy here
return &pb.CheckerState{
Tick: s.state.Tick,
ClusterInfo: s.state.ClusterInfo,
DNState: s.state.DNState,
......
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
)
func TestAssignID(t *testing.T) {
@@ -339,3 +340,89 @@ func TestScheduleCommandQuery(t *testing.T) {
}
assert.Equal(t, b, *cb)
}
func TestInitialState(t *testing.T) {
rsm := NewStateMachine(0, 1).(*stateMachine)
assert.Equal(t, pb.HAKeeperCreated, rsm.state.State)
}
func TestSetState(t *testing.T) {
tests := []struct {
initialState pb.HAKeeperState
newState pb.HAKeeperState
result pb.HAKeeperState
}{
{pb.HAKeeperCreated, pb.HAKeeperBootstrapping, pb.HAKeeperCreated},
{pb.HAKeeperCreated, pb.HAKeeperBootstrapFailed, pb.HAKeeperCreated},
{pb.HAKeeperCreated, pb.HAKeeperRunning, pb.HAKeeperCreated},
{pb.HAKeeperCreated, pb.HAKeeperCreated, pb.HAKeeperCreated},
{pb.HAKeeperBootstrapping, pb.HAKeeperCreated, pb.HAKeeperBootstrapping},
{pb.HAKeeperBootstrapping, pb.HAKeeperBootstrapFailed, pb.HAKeeperBootstrapFailed},
{pb.HAKeeperBootstrapping, pb.HAKeeperRunning, pb.HAKeeperRunning},
{pb.HAKeeperBootstrapping, pb.HAKeeperBootstrapping, pb.HAKeeperBootstrapping},
{pb.HAKeeperBootstrapFailed, pb.HAKeeperBootstrapFailed, pb.HAKeeperBootstrapFailed},
{pb.HAKeeperBootstrapFailed, pb.HAKeeperCreated, pb.HAKeeperBootstrapFailed},
{pb.HAKeeperBootstrapFailed, pb.HAKeeperBootstrapping, pb.HAKeeperBootstrapFailed},
{pb.HAKeeperBootstrapFailed, pb.HAKeeperRunning, pb.HAKeeperBootstrapFailed},
{pb.HAKeeperRunning, pb.HAKeeperRunning, pb.HAKeeperRunning},
{pb.HAKeeperRunning, pb.HAKeeperCreated, pb.HAKeeperRunning},
{pb.HAKeeperRunning, pb.HAKeeperBootstrapping, pb.HAKeeperRunning},
{pb.HAKeeperRunning, pb.HAKeeperBootstrapFailed, pb.HAKeeperRunning},
}
for _, tt := range tests {
rsm := stateMachine{
state: pb.HAKeeperRSMState{
State: tt.initialState,
},
}
cmd := GetSetStateCmd(tt.newState)
rsm.Update(sm.Entry{Cmd: cmd})
assert.Equal(t, tt.result, rsm.state.State)
}
}
func TestInitialClusterRequestCmd(t *testing.T) {
cmd := GetInitialClusterRequestCmd(2, 2, 3)
assert.True(t, isInitialClusterRequestCmd(cmd))
assert.False(t, isInitialClusterRequestCmd(GetTickCmd()))
req := parseInitialClusterRequestCmd(cmd)
assert.Equal(t, uint64(2), req.NumOfLogShards)
assert.Equal(t, uint64(2), req.NumOfDNShards)
assert.Equal(t, uint64(3), req.NumOfLogReplicas)
}
func TestHandleInitialClusterRequestCmd(t *testing.T) {
cmd := GetInitialClusterRequestCmd(2, 2, 3)
rsm := NewStateMachine(0, 1).(*stateMachine)
result, err := rsm.Update(sm.Entry{Cmd: cmd})
require.NoError(t, err)
assert.Equal(t, sm.Result{Value: 0}, result)
expected := pb.ClusterInfo{
LogShards: []metadata.LogShardRecord{
{
ShardID: 1,
NumberOfReplicas: 3,
},
{
ShardID: 3,
NumberOfReplicas: 3,
},
},
DNShards: []metadata.DNShardRecord{
{
ShardID: 2,
LogShardID: 1,
},
{
ShardID: 4,
LogShardID: 3,
},
},
}
assert.Equal(t, expected, rsm.state.ClusterInfo)
assert.Equal(t, pb.HAKeeperBootstrapping, rsm.state.State)
assert.Equal(t, uint64(5), rsm.state.NextID)
}
@@ -100,7 +100,7 @@ func (l *store) healthCheck() {
// TODO: check whether this is temp error
return
}
state := s.(*pb.CheckerState)
cmds := l.checker.Check(l.alloc,
state.ClusterInfo, state.DNState, state.LogState, state.Tick)
if len(cmds) > 0 {
......
@@ -31,11 +31,7 @@ type DNShardRecord struct {
// ShardID the id of the DN shard.
ShardID uint64 `protobuf:"varint,1,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
// LogShardID a DN corresponds to a unique Shard of LogService.
LogShardID uint64 `protobuf:"varint,2,opt,name=LogShardID,proto3" json:"LogShardID,omitempty"`
// Start A DN manages data within a range [Start, end)
Start []byte `protobuf:"bytes,3,opt,name=Start,proto3" json:"Start,omitempty"`
// End A DN manages data within a range [Start, end)
End []byte `protobuf:"bytes,4,opt,name=End,proto3" json:"End,omitempty"`
LogShardID uint64 `protobuf:"varint,2,opt,name=LogShardID,proto3" json:"LogShardID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -88,20 +84,6 @@ func (m *DNShardRecord) GetLogShardID() uint64 {
return 0
}
func (m *DNShardRecord) GetStart() []byte {
if m != nil {
return m.Start
}
return nil
}
func (m *DNShardRecord) GetEnd() []byte {
if m != nil {
return m.End
}
return nil
}
// DNShard
type DNShard struct {
// DNShard extends DNShardRecord
@@ -242,27 +224,25 @@ func init() {
func init() { proto.RegisterFile("metadata.proto", fileDescriptor_56d9f74966f40d04) }
var fileDescriptor_56d9f74966f40d04 = []byte{
// 306 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0xd1, 0x4a, 0xc3, 0x30,
0x14, 0x86, 0x8d, 0x9b, 0x6e, 0x3b, 0xea, 0x18, 0x41, 0xb0, 0x88, 0x74, 0x63, 0x57, 0x43, 0xb0,
0x45, 0x7d, 0x00, 0x71, 0x4c, 0x44, 0x90, 0x0a, 0xd9, 0x9d, 0x77, 0x69, 0x93, 0x65, 0x55, 0xdb,
0xd4, 0x34, 0x05, 0x9f, 0xc1, 0x27, 0xdb, 0xe5, 0x9e, 0x60, 0x48, 0x9f, 0x44, 0x96, 0x35, 0x73,
0xce, 0x0b, 0xef, 0xce, 0xf7, 0x9f, 0x36, 0xff, 0xf9, 0xcf, 0x81, 0x76, 0xc2, 0x35, 0x65, 0x54,
0x53, 0x2f, 0x53, 0x52, 0x4b, 0xdc, 0xb4, 0x7c, 0x7a, 0x21, 0x62, 0x3d, 0x2d, 0x42, 0x2f, 0x92,
0x89, 0x2f, 0xa4, 0x90, 0xbe, 0xf9, 0x20, 0x2c, 0x26, 0x86, 0x0c, 0x98, 0x6a, 0xf5, 0x63, 0xff,
0x1d, 0x8e, 0x46, 0xc1, 0x78, 0x4a, 0x15, 0x23, 0x3c, 0x92, 0x8a, 0x61, 0x07, 0x1a, 0x06, 0x1f,
0x46, 0x0e, 0xea, 0xa1, 0x41, 0x9d, 0x58, 0xc4, 0x2e, 0xc0, 0xa3, 0x14, 0xb6, 0xb9, 0x6b, 0x9a,
0x1b, 0x0a, 0x3e, 0x86, 0xbd, 0xb1, 0xa6, 0x4a, 0x3b, 0xb5, 0x1e, 0x1a, 0x1c, 0x92, 0x15, 0xe0,
0x0e, 0xd4, 0xee, 0x52, 0xe6, 0xd4, 0x8d, 0xb6, 0x2c, 0xfb, 0x9f, 0x08, 0x1a, 0x95, 0x27, 0xbe,
0xdf, 0xb2, 0x37, 0x9e, 0x07, 0x57, 0x27, 0xde, 0x3a, 0xdf, 0xaf, 0xf6, 0xb0, 0x39, 0x5b, 0x74,
0x77, 0xe6, 0x8b, 0x2e, 0x22, 0x5b, 0x63, 0x9f, 0x41, 0x8b, 0xf0, 0xec, 0x2d, 0x8e, 0xe8, 0x7a,
0xb6, 0x1f, 0x61, 0x19, 0xea, 0x96, 0x31, 0xc5, 0xf3, 0xdc, 0x0c, 0xd7, 0x22, 0x16, 0xfb, 0x2f,
0xd0, 0xb6, 0x11, 0xfe, 0x5d, 0xc0, 0x39, 0x74, 0x82, 0x22, 0x09, 0xb9, 0x7a, 0x9a, 0x54, 0x4f,
0xe7, 0x95, 0xd5, 0x1f, 0x1d, 0x63, 0xa8, 0x07, 0x34, 0xe1, 0x95, 0x9d, 0xa9, 0x87, 0x37, 0xb3,
0xd2, 0x45, 0xf3, 0xd2, 0x45, 0x5f, 0xa5, 0x8b, 0x9e, 0x2f, 0x37, 0x0e, 0x95, 0x50, 0xad, 0xe2,
0x0f, 0xa9, 0x62, 0x11, 0xa7, 0x16, 0x52, 0xee, 0x67, 0xaf, 0xc2, 0xcf, 0x42, 0xdf, 0xee, 0x22,
0xdc, 0x37, 0x37, 0xbb, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x44, 0xd1, 0x9e, 0x99, 0xfe, 0x01,
0x00, 0x00,
// 279 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4d, 0x2d, 0x49,
0x4c, 0x49, 0x2c, 0x49, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0xa5, 0x74,
0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5,
0xc1, 0x0a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, 0x54, 0xf2, 0xe4, 0xe2,
0x75, 0xf1, 0x0b, 0xce, 0x48, 0x2c, 0x4a, 0x09, 0x4a, 0x4d, 0xce, 0x2f, 0x4a, 0x11, 0x92, 0xe0,
0x62, 0x07, 0x73, 0x3d, 0x5d, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x60, 0x5c, 0x21, 0x39,
0x2e, 0x2e, 0x9f, 0xfc, 0x74, 0x98, 0x24, 0x13, 0x58, 0x12, 0x49, 0x44, 0xa9, 0x8b, 0x91, 0x8b,
0x1d, 0x6a, 0x96, 0x90, 0x3b, 0x9a, 0xb1, 0x60, 0xb3, 0xb8, 0x8d, 0xc4, 0xf5, 0xe0, 0xee, 0x46,
0x91, 0x76, 0xe2, 0x38, 0x71, 0x4f, 0x9e, 0xe1, 0xc2, 0x3d, 0x79, 0xc6, 0x20, 0x34, 0xe7, 0xc8,
0x70, 0x71, 0x06, 0xa5, 0x16, 0xe4, 0x64, 0x26, 0x27, 0xc2, 0xed, 0x44, 0x08, 0x80, 0x1c, 0xeb,
0x98, 0x92, 0x52, 0x94, 0x5a, 0x5c, 0x2c, 0xc1, 0xac, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0xe3, 0x2a,
0x65, 0x71, 0xf1, 0xc1, 0x9c, 0x46, 0xd0, 0x63, 0x5a, 0x5c, 0x02, 0x7e, 0xa5, 0xb9, 0x49, 0xa9,
0x45, 0xfe, 0x69, 0x50, 0xa3, 0x8b, 0xa1, 0x56, 0x61, 0x88, 0x0b, 0x09, 0x71, 0xb1, 0xf8, 0x25,
0xe6, 0xa6, 0x42, 0xad, 0x03, 0xb3, 0x9d, 0xec, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e,
0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x28, 0x43, 0xa4, 0x08, 0xc8, 0x4d, 0x2c, 0x29, 0xca, 0xac, 0xc8,
0x2f, 0xca, 0x4c, 0xcf, 0xcc, 0x83, 0x71, 0xf2, 0x52, 0xf5, 0x0b, 0xb2, 0xd3, 0xf5, 0x0b, 0x92,
0xf4, 0x61, 0x61, 0x91, 0xc4, 0x06, 0x8e, 0x0b, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x56,
0xc5, 0x4d, 0x2e, 0xd6, 0x01, 0x00, 0x00,
}
func (m *DNShardRecord) Marshal() (dAtA []byte, err error) {
@@ -289,20 +269,6 @@ func (m *DNShardRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.End) > 0 {
i -= len(m.End)
copy(dAtA[i:], m.End)
i = encodeVarintMetadata(dAtA, i, uint64(len(m.End)))
i--
dAtA[i] = 0x22
}
if len(m.Start) > 0 {
i -= len(m.Start)
copy(dAtA[i:], m.Start)
i = encodeVarintMetadata(dAtA, i, uint64(len(m.Start)))
i--
dAtA[i] = 0x1a
}
if m.LogShardID != 0 {
i = encodeVarintMetadata(dAtA, i, uint64(m.LogShardID))
i--
@@ -432,14 +398,6 @@ func (m *DNShardRecord) Size() (n int) {
if m.LogShardID != 0 {
n += 1 + sovMetadata(uint64(m.LogShardID))
}
l = len(m.Start)
if l > 0 {
n += 1 + l + sovMetadata(uint64(l))
}
l = len(m.End)
if l > 0 {
n += 1 + l + sovMetadata(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@@ -562,74 +520,6 @@ func (m *DNShardRecord) Unmarshal(dAtA []byte) error {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetadata
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMetadata
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMetadata
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Start = append(m.Start[:0], dAtA[iNdEx:postIndex]...)
if m.Start == nil {
m.Start = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field End", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetadata
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMetadata
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMetadata
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.End = append(m.End[:0], dAtA[iNdEx:postIndex]...)
if m.End == nil {
m.End = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetadata(dAtA[iNdEx:])
......
@@ -179,120 +179,132 @@ message LogRecordResponse {
// HAKeeper related pb
//
enum HAKeeperState {
HAKeeperCreated = 0;
HAKeeperBootstrapping = 1;
HAKeeperBootstrapFailed = 2;
HAKeeperRunning = 3;
}
// Replica of the shard
message Replica {
// UUID which store the Replica is located in
string UUID = 1;
uint64 ShardID = 2;
uint64 ReplicaID = 3;
uint64 Epoch = 4;
}
// ConfigChangeType indicates config change command type.
enum ConfigChangeType {
AddReplica = 0;
RemoveReplica = 1;
StartReplica = 2;
StopReplica = 3;
}
// ConfigChange is the detail of a config change.
message ConfigChange {
Replica Replica = 1 [(gogoproto.nullable) = false];
ConfigChangeType ChangeType = 2;
// only used for bootstrap
map<uint64, string> InitialMembers = 3;
}
// ShutdownStore would stop store.
message ShutdownStore {
string StoreID = 1;
}
// ServiceType specifies type of service
enum ServiceType {
LogService = 0;
DnService = 1;
}
// ScheduleCommand contains a shard schedule command.
message ScheduleCommand {
// UUID which store the ScheduleCommand is sent to
string UUID = 1;
ConfigChange ConfigChange = 2;
ServiceType ServiceType = 3;
ShutdownStore ShutdownStore = 4;
}
message CommandBatch {
uint64 Term = 1;
repeated ScheduleCommand Commands = 2 [(gogoproto.nullable) = false];
}
// DNStoreInfo contains information on a list of shards.
message DNStoreInfo {
uint64 Tick = 1;
repeated DNShardInfo Shards = 2 [(gogoproto.nullable) = false];
}
// DNState contains all DN details known to the HAKeeper.
message DNState {
// Stores is keyed by DN store UUID, it contains details found on each DN
// store. Each DNStoreInfo reflects what was last reported by each DN store.
map<string, DNStoreInfo> Stores = 1 [(gogoproto.nullable) = false];
}
// ClusterInfo provides a global view of all shards in the cluster. It
// describes the logical sharding of the system, rather than physical
// distribution of all replicas that belong to those shards.
message ClusterInfo {
repeated metadata.DNShardRecord DNShards = 1 [(gogoproto.nullable) = false];
repeated metadata.LogShardRecord LogShards = 2 [(gogoproto.nullable) = false];
}
message InitialClusterRequest {
uint64 NumOfLogShards = 1;
uint64 NumOfDNShards = 2;
uint64 NumOfLogReplicas = 3;
}
// LogStoreInfo contains information of all replicas found on a Log store.
message LogStoreInfo {
uint64 Tick = 1;
string RaftAddress = 2;
string ServiceAddress = 3;
string GossipAddress = 4;
repeated LogReplicaInfo Replicas = 5 [(gogoproto.nullable) = false];
}
message LogState {
// Shards is keyed by ShardID, it contains details aggregated from all Log
// stores. Each pb.LogShardInfo here contains data aggregated from
// different replicas and thus reflects a more accurate description of each
// shard.
map<uint64, LogShardInfo> Shards = 1 [(gogoproto.nullable) = false];
// Stores is keyed by log store UUID, it contains details found on each
// store. Each LogStoreInfo here reflects what was last reported by each Log
// store.
map<string, LogStoreInfo> Stores = 2 [(gogoproto.nullable) = false];
}
// CheckerState contains all HAKeeper state required for making schedule
// commands.
message CheckerState {
uint64 Tick = 1;
ClusterInfo ClusterInfo = 2 [(gogoproto.nullable) = false];
DNState DNState = 3 [(gogoproto.nullable) = false];
LogState LogState = 4 [(gogoproto.nullable) = false];
}
// HAKeeperRSMState contains state maintained by HAKeeper's RSM.
message HAKeeperRSMState {
uint64 Tick = 1;
uint64 NextID = 2;
uint64 Term = 3;
HAKeeperState State = 4;
map<string, CommandBatch> ScheduleCommands = 5 [(gogoproto.nullable) = false];
map<string, uint64> LogShards = 6;
DNState DNState = 7 [(gogoproto.nullable) = false];
LogState LogState = 8 [(gogoproto.nullable) = false];
ClusterInfo ClusterInfo = 9 [(gogoproto.nullable) = false];
}
@@ -25,10 +25,6 @@ message DNShardRecord {
uint64 ShardID = 1;
// LogShardID a DN corresponds to a unique Shard of LogService.
uint64 LogShardID = 2;
// Start A DN manages data within a range [Start, end)
bytes Start = 3;
// End A DN manages data within a range [Start, end)
bytes End = 4;
};
// DNShard
......