diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md index 3aa66a0c4fdcdc650883616fadaa9b3204f8e084..2983afbf7b8ffff5489617ef3b03c8ae8051fa13 100644 --- a/docs/developer_guides/chap04_message_stream.md +++ b/docs/developer_guides/chap04_message_stream.md @@ -187,64 +187,55 @@ func NewUnmarshalDispatcher() *UnmarshalDispatcher RocksMQ is a RocksDB-based messaging/streaming library. -```go +```GO +// All the following UniqueIDs are 64-bit integer, which is combined with timestamp and increasing number + type ProducerMessage struct { payload []byte } -``` - - -```go type ConsumerMessage struct { - msgID MessageID + msgID UniqueID payload []byte } -``` - - -```GO -type Channel struct { - beginOffset MessageID - endOffset MessageID -} - -type ConsumerGroupContext struct { - currentOffset MessageID +type IDAllocator interface { + Alloc(count uint32) (UniqueID, UniqueID, error) + AllocOne() (UniqueID, error) + UpdateID() error } // Every collection has its RocksMQ type RocksMQ struct { - channels map[string]Channel - cgCtxs map[string]ConsumerGroupContext - mu sync.Mutex -} - -func (rmq *RocksMQ) CreateChannel(channelName string) error // create channel, add record in meta-store -func (rmq *RocksMQ) DestroyChannel(channelName string) error // drop channel, delete record in meta-store -func (rmq *RocksMQ) CreateConsumerGroup(groupName string) error // create consumer group, add record in meta-store -func (rmq *RocksMQ) DestroyConsumerGroup(groupName string) error // drop consumer group, delete record in meta-store -func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error // produce a batch of message, insert into rocksdb -func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) // comsume up to n messages, modify current_id in Etcd -func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID MessageID) error // modify current_id in Etcd + store *gorocksdb.DB + kv kv.Base + idAllocator IDAllocator + produceMu sync.Mutex + consumeMu sync.Mutex +} + +func (rmq *RocksMQ) CreateChannel(channelName string) error +func (rmq *RocksMQ) DestroyChannel(channelName string) error +func (rmq *RocksMQ) CreateConsumerGroup(groupName string) error +func (rmq *RocksMQ) DestroyConsumerGroup(groupName string) error +func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error +func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) +func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID MessageID) error + +func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) ``` ##### A.4.1 Meta (stored in Etcd) -* channel meta - ```go -"$(channel_name)/begin_id", MessageID -"$(channel_name)/end_id", MessageID -``` +// channel meta +"$(channel_name)/begin_id", UniqueID +"$(channel_name)/end_id", UniqueID -* consumer group meta - -```go -"$(group_name)/$(channel_name)/current_id", MessageID +// consumer group meta +"$(group_name)/$(channel_name)/current_id", UniqueID ```