diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md index 2983afbf7b8ffff5489617ef3b03c8ae8051fa13..5f4312ad103bb5d8d02d0b66b98fd431e79bb079 100644 --- a/docs/developer_guides/chap04_message_stream.md +++ b/docs/developer_guides/chap04_message_stream.md @@ -74,6 +74,8 @@ type DescribeChannelResponse struct { * Interface ``` go +// Msg + type MsgType uint32 const { kInsert MsgType = 400 @@ -88,12 +90,12 @@ const { } type TsMsg interface { - SetTs(ts Timestamp) - BeginTs() Timestamp - EndTs() Timestamp - Type() MsgType - Marshal(*TsMsg) []byte - Unmarshal([]byte) *TsMsg + SetTs(ts Timestamp) + BeginTs() Timestamp + EndTs() Timestamp + Type() MsgType + Marshal(*TsMsg) interface{} + Unmarshal(interface{}) *TsMsg } type MsgPosition { @@ -110,15 +112,71 @@ type MsgPack struct { EndPositions []MsgPosition } +type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack +``` + + + +```go +// Unmarshal + +// Interface +type UnmarshalFunc func(interface{}) *TsMsg + +type UnmarshalDispatcher interface { + Unmarshal(interface{}, msgType commonpb.MsgType) (msgstream.TsMsg, error) +} + +type UnmarshalDispatcherFactory interface { + NewUnmarshalDispatcher() *UnmarshalDispatcher +} + +// Proto & Mem Implementation +type ProtoUDFactory struct {} +func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher + +type MemUDFactory struct {} +func (mudf *MemUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher +``` + + + + +```go +// MsgStream + +// Interface type MsgStream interface { - Produce(*MsgPack) error - Broadcast(*MsgPack) error - Consume() *MsgPack // message can be consumed exactly once - ShowChannelNames() []string - Seek(offset MsgPosition) error + Start() + Close() + AsProducer(channels []string) + AsConsumer(channels []string, subName string) + Produce(*MsgPack) error + Broadcast(*MsgPack) error + Consume() *MsgPack // message can be consumed exactly once + Seek(mp *MsgPosition) error } 
-type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack +type MsgStreamFactory interface { + NewMsgStream() *MsgStream + NewTtMsgStream() *MsgStream +} + +// Pulsar +type PulsarMsgStreamFactory interface {} +func (pmsf *PulsarMsgStreamFactory) NewMsgStream() *MsgStream +func (pmsf *PulsarMsgStreamFactory) NewTtMsgStream() *MsgStream + +// RocksMQ +type RmqMsgStreamFactory interface {} +func (rmsf *RmqMsgStreamFactory) NewMsgStream() *MsgStream +func (rmsf *RmqMsgStreamFactory) NewTtMsgStream() *MsgStream +``` + + + +```go +// PulsarMsgStream type PulsarMsgStream struct { client *pulsar.Client @@ -128,16 +186,17 @@ type PulsarMsgStream struct { unmarshal *UnmarshalDispatcher } -func (ms *PulsarMsgStream) CreatePulsarProducers(topics []string) -func (ms *PulsarMsgStream) CreatePulsarConsumers(subname string, topics []string, unmarshal *UnmarshalDispatcher) -func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) +func (ms *PulsarMsgStream) Start() error +func (ms *PulsarMsgStream) Close() error +func (ms *PulsarMsgStream) AsProducer(channels []string) +func (ms *PulsarMsgStream) AsConsumer(channels []string, subName string) func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error func (ms *PulsarMsgStream) Broadcast(msgs *MsgPack) error func (ms *PulsarMsgStream) Consume() (*MsgPack, error) -func (ms *PulsarMsgStream) Start() error -func (ms *PulsarMsgStream) Close() error +func (ms *PulsarMsgStream) Seek(mp *MsgPosition) error +func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) -func NewPulsarMsgStream(ctx context.Context, pulsarAddr string) *PulsarMsgStream +func NewPulsarMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarMsgStream type PulsarTtMsgStream struct { @@ -151,38 +210,67 @@ type PulsarTtMsgStream struct { msgPacks []*MsgPack } -func (ms *PulsarTtMsgStream) CreatePulsarProducers(topics []string) -func (ms *PulsarTtMsgStream) CreatePulsarConsumers(subname string, topics []string, unmarshal 
*UnmarshalDispatcher) -func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc) +func (ms *PulsarTtMsgStream) Start() error +func (ms *PulsarTtMsgStream) Close() error +func (ms *PulsarTtMsgStream) AsProducer(channels []string) +func (ms *PulsarTtMsgStream) AsConsumer(channels []string, subName string) func (ms *PulsarTtMsgStream) Produce(msgs *MsgPack) error func (ms *PulsarTtMsgStream) Broadcast(msgs *MsgPack) error func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick -func (ms *PulsarTtMsgStream) Start() error -func (ms *PulsarTtMsgStream) Close() error - -func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string) *PulsarTtMsgStream -``` - - +func (ms *PulsarTtMsgStream) Seek(mp *MsgPosition) error +func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc) -```go -type MarshalFunc func(*TsMsg) []byte -type UnmarshalFunc func([]byte) *TsMsg +func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarTtMsgStream +// RmqMsgStream -type UnmarshalDispatcher struct { - tempMap map[ReqType]UnmarshalFunc +type RmqMsgStream struct { + client *rockermq.RocksMQ + repackFunc RepackFunc + producers []string + consumers []string + subName string + unmarshal *UnmarshalDispatcher } -func (dispatcher *MarshalDispatcher) Unmarshal([]byte) *TsMsg -func (dispatcher *MarshalDispatcher) AddMsgTemplate(msgType MsgType, marshal MarshalFunc) -func (dispatcher *MarshalDispatcher) addDefaultMsgTemplates() +func (ms *RmqMsgStream) Start() error +func (ms *RmqMsgStream) Close() error +func (ms *RmqMsgStream) AsProducer(channels []string) +func (ms *RmqMsgStream) AsConsumer(channels []string, subName string) +func (ms *RmqMsgStream) Produce(msgs *MsgPack) error +func (ms *RmqMsgStream) Broadcast(msgs *MsgPack) error +func (ms *RmqMsgStream) Consume() (*MsgPack, error) +func (ms *RmqMsgStream) Seek(mp *MsgPosition) error +func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc) + +func 
NewRmqMsgStream(ctx context.Context) *RmqMsgStream + +type RmqTtMsgStream struct { + client *rockermq.RocksMQ + repackFunc RepackFunc + producers []string + consumers []string + subName string + unmarshal *UnmarshalDispatcher +} -func NewUnmarshalDispatcher() *UnmarshalDispatcher +func (ms *RmqTtMsgStream) Start() error +func (ms *RmqTtMsgStream) Close() error +func (ms *RmqTtMsgStream) AsProducer(channels []string) +func (ms *RmqTtMsgStream) AsConsumer(channels []string, subName string) +func (ms *RmqTtMsgStream) Produce(msgs *MsgPack) error +func (ms *RmqTtMsgStream) Broadcast(msgs *MsgPack) error +func (ms *RmqTtMsgStream) Consume() (*MsgPack, error) +func (ms *RmqTtMsgStream) Seek(mp *MsgPosition) error +func (ms *RmqTtMsgStream) SetRepackFunc(repackFunc RepackFunc) + +func NewRmqTtMsgStream(ctx context.Context) *RmqTtMsgStream ``` + + #### A.4 RocksMQ RocksMQ is a RocksDB-based messaging/streaming library. diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index 73465f9928a5705d036e1fdfc12b64cd39989a20..c480319d1ed4be268abaaab147769db2d826623b 100644 --- a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -75,7 +75,6 @@ type InsertChannelsMap struct { insertChannels [][]string // it's a little confusing to use []string as the key of map insertMsgStreams []msgstream.MsgStream // maybe there's a better way to implement Set, just agilely now droppedBitMap []int // 0 -> normal, 1 -> dropped - usageHistogram []int // message stream can be closed only when the use count is zero mtx sync.RWMutex nodeInstance *NodeImpl } @@ -94,7 +93,6 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st for loc, existedChannels := range m.insertChannels { if m.droppedBitMap[loc] == 0 && SortedSliceEqual(existedChannels, channels) { m.collectionID2InsertChannels[collID] = loc - m.usageHistogram[loc]++ return nil } } @@ -110,7 +108,6 @@ func (m *InsertChannelsMap) 
createInsertMsgStream(collID UniqueID, channels []st stream.Start() m.insertMsgStreams = append(m.insertMsgStreams, stream) m.droppedBitMap = append(m.droppedBitMap, 0) - m.usageHistogram = append(m.usageHistogram, 1) return nil } @@ -126,14 +123,7 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error { if m.droppedBitMap[loc] != 0 { return errors.New("insert message stream already closed") } - if m.usageHistogram[loc] <= 0 { - return errors.New("insert message stream already closed") - } - - m.usageHistogram[loc]-- - if m.usageHistogram[loc] <= 0 { - m.insertMsgStreams[loc].Close() - } + m.insertMsgStreams[loc].Close() log.Print("close insert message stream ...") m.droppedBitMap[loc] = 1 @@ -174,28 +164,11 @@ func (m *InsertChannelsMap) getInsertMsgStream(collID UniqueID) (msgstream.MsgSt return m.insertMsgStreams[loc], nil } -func (m *InsertChannelsMap) closeAllMsgStream() { - m.mtx.Lock() - defer m.mtx.Unlock() - - for _, stream := range m.insertMsgStreams { - stream.Close() - } - - m.collectionID2InsertChannels = make(map[UniqueID]int) - m.insertChannels = make([][]string, 0) - m.insertMsgStreams = make([]msgstream.MsgStream, 0) - m.droppedBitMap = make([]int, 0) - m.usageHistogram = make([]int, 0) -} - func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap { return &InsertChannelsMap{ collectionID2InsertChannels: make(map[UniqueID]int), insertChannels: make([][]string, 0), insertMsgStreams: make([]msgstream.MsgStream, 0), - droppedBitMap: make([]int, 0), - usageHistogram: make([]int, 0), nodeInstance: node, } } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 62b06b79db43c1dff6482acb84bdce6909eaa4d5..a871558c55266c618d896122166dba7fa53913bb 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -285,7 +285,6 @@ func (node *NodeImpl) Start() error { func (node *NodeImpl) Stop() error { node.cancel() - globalInsertChannelsMap.closeAllMsgStream() 
node.tsoAllocator.Close() node.idAllocator.Close() node.segAssigner.Close()