mirror of
https://github.com/signaller-matrix/signaller.git
synced 2024-11-22 02:12:20 +00:00
feat: Move event storage to BuntDB
This commit is contained in:
parent
e009c054cb
commit
b911c7e351
6
go.mod
6
go.mod
@ -5,5 +5,11 @@ go 1.12
|
|||||||
require (
|
require (
|
||||||
github.com/gorilla/mux v1.7.3
|
github.com/gorilla/mux v1.7.3
|
||||||
github.com/stretchr/testify v1.3.0
|
github.com/stretchr/testify v1.3.0
|
||||||
|
github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0 // indirect
|
||||||
|
github.com/tidwall/buntdb v1.1.0
|
||||||
|
github.com/tidwall/gjson v1.3.2 // indirect
|
||||||
|
github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb // indirect
|
||||||
|
github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e // indirect
|
||||||
|
github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563 // indirect
|
||||||
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30
|
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30
|
||||||
)
|
)
|
||||||
|
16
go.sum
16
go.sum
@ -7,5 +7,21 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
|||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
|
github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0 h1:QnyrPZZvPmR0AtJCxxfCtI1qN+fYpKTKJ/5opWmZ34k=
|
||||||
|
github.com/tidwall/btree v0.0.0-20170113224114-9876f1454cf0/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8=
|
||||||
|
github.com/tidwall/buntdb v1.1.0 h1:H6LzK59KiNjf1nHVPFrYj4Qnl8d8YLBsYamdL8N+Bao=
|
||||||
|
github.com/tidwall/buntdb v1.1.0/go.mod h1:Y39xhcDW10WlyYXeLgGftXVbjtM0QP+/kpz8xl9cbzE=
|
||||||
|
github.com/tidwall/gjson v1.3.2 h1:+7p3qQFaH3fOMXAJSrdZwGKcOO/lYdGS0HqGhPqDdTI=
|
||||||
|
github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
|
||||||
|
github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb h1:5NSYaAdrnblKByzd7XByQEJVT8+9v0W/tIY0Oo4OwrE=
|
||||||
|
github.com/tidwall/grect v0.0.0-20161006141115-ba9a043346eb/go.mod h1:lKYYLFIr9OIgdgrtgkZ9zgRxRdvPYsExnYBsEAd8W5M=
|
||||||
|
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
|
||||||
|
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
|
||||||
|
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
|
||||||
|
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||||
|
github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e h1:+NL1GDIUOKxVfbp2KoJQD9cTQ6dyP2co9q4yzmT9FZo=
|
||||||
|
github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e/go.mod h1:/h+UnNGt0IhNNJLkGikcdcJqm66zGD/uJGMRxK/9+Ao=
|
||||||
|
github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563 h1:Otn9S136ELckZ3KKDyCkxapfufrqDqwmGjcHfAyXRrE=
|
||||||
|
github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563/go.mod h1:mLqSmt7Dv/CNneF2wfcChfN1rvapyQr01LGKnKex0DQ=
|
||||||
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8=
|
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8=
|
||||||
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
|
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package memory
|
package memory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
@ -13,13 +14,14 @@ import (
|
|||||||
"github.com/signaller-matrix/signaller/internal/models/common"
|
"github.com/signaller-matrix/signaller/internal/models/common"
|
||||||
"github.com/signaller-matrix/signaller/internal/models/createroom"
|
"github.com/signaller-matrix/signaller/internal/models/createroom"
|
||||||
"github.com/signaller-matrix/signaller/internal/models/events"
|
"github.com/signaller-matrix/signaller/internal/models/events"
|
||||||
|
"github.com/tidwall/buntdb"
|
||||||
"github.com/wangjia184/sortedset"
|
"github.com/wangjia184/sortedset"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Backend struct {
|
type Backend struct {
|
||||||
data map[string]internal.User
|
data map[string]internal.User
|
||||||
rooms map[string]internal.Room
|
rooms map[string]internal.Room
|
||||||
events *sortedset.SortedSet
|
events *buntdb.DB
|
||||||
roomAliases map[string]internal.Room
|
roomAliases map[string]internal.Room
|
||||||
hostname string
|
hostname string
|
||||||
validateUsernameFunc func(string) error // TODO: create ability to redefine validation func
|
validateUsernameFunc func(string) error // TODO: create ability to redefine validation func
|
||||||
@ -31,12 +33,18 @@ type Token struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBackend(hostname string) *Backend {
|
func NewBackend(hostname string) *Backend {
|
||||||
|
eventDB, err := buntdb.Open(":memory:")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
eventDB.CreateIndex("origin_server_ts", "*", buntdb.IndexJSON("origin_server_ts"))
|
||||||
|
eventDB.CreateIndex("room_id", "*", buntdb.IndexJSON("room_id"))
|
||||||
return &Backend{
|
return &Backend{
|
||||||
hostname: hostname,
|
hostname: hostname,
|
||||||
validateUsernameFunc: defaultValidationUsernameFunc,
|
validateUsernameFunc: defaultValidationUsernameFunc,
|
||||||
rooms: make(map[string]internal.Room),
|
rooms: make(map[string]internal.Room),
|
||||||
roomAliases: make(map[string]internal.Room),
|
roomAliases: make(map[string]internal.Room),
|
||||||
events: sortedset.New(),
|
events: eventDB,
|
||||||
data: make(map[string]internal.User)}
|
data: make(map[string]internal.User)}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,36 +189,78 @@ func (backend *Backend) GetEventByID(id string) events.Event {
|
|||||||
backend.mutex.RLock()
|
backend.mutex.RLock()
|
||||||
defer backend.mutex.RUnlock()
|
defer backend.mutex.RUnlock()
|
||||||
|
|
||||||
return backend.events.GetByKey(id).Value.(events.Event)
|
var event events.Event
|
||||||
|
|
||||||
|
backend.events.View(func(tx *buntdb.Tx) error {
|
||||||
|
val, err := tx.Get(id, true)
|
||||||
|
if err == nil {
|
||||||
|
json.Unmarshal([]byte(val), event)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (backend *Backend) PutEvent(event events.Event) error {
|
func (backend *Backend) PutEvent(event events.Event) error {
|
||||||
backend.mutex.Lock()
|
backend.mutex.Lock()
|
||||||
defer backend.mutex.Unlock()
|
defer backend.mutex.Unlock()
|
||||||
|
|
||||||
backend.events.AddOrUpdate(event.ID(), sortedset.SCORE(time.Now().Unix()), event)
|
marshalledEvent, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = backend.events.Update(func(tx *buntdb.Tx) error {
|
||||||
|
_, _, err := tx.Set(event.ID(), string(marshalledEvent), nil)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (backend *Backend) GetEventsSince(user internal.User, sinceToken string, limit int) []events.Event {
|
func (backend *Backend) GetEventsSince(user internal.User, sinceToken string, limit int) []events.Event {
|
||||||
sinceEventNode := backend.events.GetByKey(sinceToken)
|
if sinceToken != "" {
|
||||||
sEvents := backend.events.GetByScoreRange(sinceEventNode.Score(), -1, &sortedset.GetByScoreRangeOptions{
|
var sinceEvent events.Event
|
||||||
Limit: limit,
|
var eventSlice []events.Event
|
||||||
|
err := backend.events.View(func(tx *buntdb.Tx) error {
|
||||||
|
// handler error
|
||||||
|
val, err := tx.Get(sinceToken, true)
|
||||||
|
if err == nil {
|
||||||
|
json.Unmarshal([]byte(val), sinceEvent)
|
||||||
|
}
|
||||||
|
sinceRoomEvent := sinceEvent.(events.RoomEvent)
|
||||||
|
tx.AscendRange("origin_server_ts", `{"origin_server_ts": `+string(sinceRoomEvent.OriginServerTs)+`}`, `{"origin_server_ts": `+string(time.Now().Unix())+`}`, func(key, value string) bool {
|
||||||
|
var unmarshalledEvent events.Event
|
||||||
|
json.Unmarshal([]byte(value), unmarshalledEvent)
|
||||||
|
eventSlice = append(eventSlice, unmarshalledEvent)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
eventsSlice := extractEventsFromNodes(sEvents)
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
var returnEvents []events.Event
|
var returnEvents []events.Event
|
||||||
for _, event := range eventsSlice {
|
if eventSlice != nil {
|
||||||
|
for _, event := range eventSlice {
|
||||||
if isEventRelatedToUser(event, user) {
|
if isEventRelatedToUser(event, user) {
|
||||||
returnEvents = append(returnEvents, event)
|
returnEvents = append(returnEvents, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return returnEvents
|
return returnEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func extractEventsFromNodes(nodes []*sortedset.SortedSetNode) []events.Event {
|
func extractEventsFromNodes(nodes []*sortedset.SortedSetNode) []events.Event {
|
||||||
var eventsSlice []events.Event
|
var eventsSlice []events.Event
|
||||||
for _, e := range nodes {
|
for _, e := range nodes {
|
||||||
|
Loading…
Reference in New Issue
Block a user