From b911c7e3510a2ee02752e39c435402af7226d55a Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Sun, 18 Aug 2019 14:56:16 +0400 Subject: [PATCH] feat: Move event storage to BuntDB --- go.mod | 6 +++ go.sum | 16 ++++++ internal/backends/memory/backend.go | 80 +++++++++++++++++++++++------ 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 88646cd..786e2d5 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,11 @@ go 1.12 require ( github.com/gorilla/mux v1.7.3 github.com/stretchr/testify v1.3.0 + github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0 // indirect + github.com/tidwall/buntdb v1.1.0 + github.com/tidwall/gjson v1.3.2 // indirect + github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb // indirect + github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e // indirect + github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563 // indirect github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 ) diff --git a/go.sum b/go.sum index ad0aca6..d2462dc 100644 --- a/go.sum +++ b/go.sum @@ -7,5 +7,21 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0 h1:QnyrPZZvPmR0AtJCxxfCtI1qN+fYpKTKJ/5opWmZ34k= +github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8= +github.com/tidwall/buntdb v1.1.0 h1:H6LzK59KiNjf1nHVPFrYj4Qnl8d8YLBsYamdL8N+Bao= +github.com/tidwall/buntdb v1.1.0/go.mod h1:Y39xhcDW10WlyYXeLgGftXVbjtM0QP+/kpz8xl9cbzE= +github.com/tidwall/gjson v1.3.2 h1:+7p3qQFaH3fOMXAJSrdZwGKcOO/lYdGS0HqGhPqDdTI= +github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= +github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb h1:5NSYaAdrnblKByzd7XByQEJVT8+9v0W/tIY0Oo4OwrE= +github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb/go.mod h1:lKYYLFIr9OIgdgrtgkZ9zgRxRdvPYsExnYBsEAd8W5M= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e h1:+NL1GDIUOKxVfbp2KoJQD9cTQ6dyP2co9q4yzmT9FZo= +github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e/go.mod h1:/h+UnNGt0IhNNJLkGikcdcJqm66zGD/uJGMRxK/9+Ao= +github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563 h1:Otn9S136ELckZ3KKDyCkxapfufrqDqwmGjcHfAyXRrE= +github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563/go.mod h1:mLqSmt7Dv/CNneF2wfcChfN1rvapyQr01LGKnKex0DQ= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= diff --git a/internal/backends/memory/backend.go b/internal/backends/memory/backend.go index 1f92103..82a6f4c 100644 --- a/internal/backends/memory/backend.go +++ b/internal/backends/memory/backend.go @@ -1,6 +1,7 @@ package memory import ( + "encoding/json" "fmt" "regexp" "sort" @@ -13,13 +14,14 @@ import ( "github.com/signaller-matrix/signaller/internal/models/common" "github.com/signaller-matrix/signaller/internal/models/createroom" "github.com/signaller-matrix/signaller/internal/models/events" + "github.com/tidwall/buntdb" "github.com/wangjia184/sortedset" ) type Backend struct { data map[string]internal.User rooms map[string]internal.Room - events *sortedset.SortedSet + events *buntdb.DB roomAliases map[string]internal.Room hostname string validateUsernameFunc func(string) error // TODO: create ability to redefine validation func @@ -31,12 +33,18 @@ type Token struct { } func NewBackend(hostname string) *Backend { + eventDB, err := buntdb.Open(":memory:") + if err != nil { + panic(err) + } + eventDB.CreateIndex("origin_server_ts", "*", buntdb.IndexJSON("origin_server_ts")) + eventDB.CreateIndex("room_id", "*", buntdb.IndexJSON("room_id")) return &Backend{ hostname: hostname, validateUsernameFunc: defaultValidationUsernameFunc, rooms: make(map[string]internal.Room), roomAliases: make(map[string]internal.Room), - events: sortedset.New(), + events: eventDB, data: make(map[string]internal.User)} } @@ -181,34 +189,76 @@ func (backend *Backend) GetEventByID(id string) events.Event { backend.mutex.RLock() defer backend.mutex.RUnlock() - return backend.events.GetByKey(id).Value.(events.Event) + var event events.Event + + backend.events.View(func(tx *buntdb.Tx) error { + val, err := tx.Get(id, true) + if err == nil { + json.Unmarshal([]byte(val), event) + } + return nil + }) + return event } func (backend *Backend) PutEvent(event events.Event) error { backend.mutex.Lock() defer backend.mutex.Unlock() - backend.events.AddOrUpdate(event.ID(), sortedset.SCORE(time.Now().Unix()), event) + marshalledEvent, err := json.Marshal(event) + if err != nil { + return err + } + + err = backend.events.Update(func(tx *buntdb.Tx) error { + _, _, err := tx.Set(event.ID(), string(marshalledEvent), nil) + return err + }) + + if err != nil { + return err + } return nil } func (backend *Backend) GetEventsSince(user internal.User, sinceToken string, limit int) []events.Event { - sinceEventNode := backend.events.GetByKey(sinceToken) - sEvents := backend.events.GetByScoreRange(sinceEventNode.Score(), -1, &sortedset.GetByScoreRangeOptions{ - Limit: limit, - }) + if sinceToken != "" { + var sinceEvent events.Event + var eventSlice []events.Event + err := backend.events.View(func(tx *buntdb.Tx) error { + // handler error + val, err := tx.Get(sinceToken, true) + if err == nil { + json.Unmarshal([]byte(val), sinceEvent) + } + sinceRoomEvent := sinceEvent.(events.RoomEvent) + tx.AscendRange("origin_server_ts", `{"origin_server_ts": `+string(sinceRoomEvent.OriginServerTs)+`}`, `{"origin_server_ts": `+string(time.Now().Unix())+`}`, func(key, value string) bool { + var unmarshalledEvent events.Event + json.Unmarshal([]byte(value), unmarshalledEvent) + eventSlice = append(eventSlice, unmarshalledEvent) + return true + }) + return err + }) - eventsSlice := extractEventsFromNodes(sEvents) - - var returnEvents []events.Event - for _, event := range eventsSlice { - if isEventRelatedToUser(event, user) { - returnEvents = append(returnEvents, event) + if err != nil { + panic(err) } + + var returnEvents []events.Event + if eventSlice != nil { + for _, event := range eventSlice { + if isEventRelatedToUser(event, user) { + returnEvents = append(returnEvents, event) + } + } + } + + return returnEvents } - return returnEvents + return nil } func extractEventsFromNodes(nodes []*sortedset.SortedSetNode) []events.Event {