Implement getting event timeline by chat in EventStorageManager

This commit is contained in:
ChronosX88 2021-03-25 14:25:51 +03:00
parent a685e3d0b5
commit a565a0e330
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
5 changed files with 105 additions and 34 deletions

View File

@ -3,6 +3,8 @@ using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using ChatSubsystem.Storage.Interfaces; using ChatSubsystem.Storage.Interfaces;
using ChatSubsystem.Storage.Models; using ChatSubsystem.Storage.Models;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver; using MongoDB.Driver;
using Zirconium.Core.Models; using Zirconium.Core.Models;
@ -12,9 +14,6 @@ namespace ChatSubsystem.Storage
{ {
private const string EventsCollectionName = "events"; private const string EventsCollectionName = "events";
// get event by id
// get events for user after some point in timeline (via sorting by event id)
private ChatStorageManager _chatStorageManager; private ChatStorageManager _chatStorageManager;
private IMongoCollection<Event> events; private IMongoCollection<Event> events;
@ -24,56 +23,105 @@ namespace ChatSubsystem.Storage
events = database.GetCollection<Event>(EventsCollectionName); events = database.GetCollection<Event>(EventsCollectionName);
} }
public async Task<IList<Event>> GetEventsForUser(EntityID user, EntityID since, int limit) public async Task<(IList<Event>, IList<Chat>)> GetEventsForUser(EntityID user, EntityID since, int limit)
{ {
var chats = await _chatStorageManager.GetChatsForUser(user); var chats = await _chatStorageManager.GetChatsForUser(user);
return new List<Event>(); var evs = new List<Event>();
foreach (var c in chats)
{
var res = await GetEventsForChat(c, since, limit);
evs.AddRange(res);
}
evs = evs.OrderBy(x => x.Timestamp).ToList();
return (evs, chats);
} }
private async Task<IList<Event>> GetEventsForChat(EntityID since, int limit) private async Task<IList<Event>> GetEventsForChat(Chat chat, EntityID since, int limit)
{ {
return new List<Event>(); var graphLookupStage = new BsonDocument("$graphLookup",
new BsonDocument
{
{ "from", EventsCollectionName },
{ "startWith", "{ EventID: "+since+" }" },
{ "connectFromField", "EventID"},
{ "connectToField", "PrevEvents" },
{ "as", "Children" },
{ "maxDepth", limit },
{ "restrictSearchWithMatch", "{ ChatId: "+chat.Id+" }" }
});
var result = await events.Aggregate()
.AppendStage<BsonDocument>(graphLookupStage).ToListAsync();
var deserializedResult = new List<Event>();
result.ForEach(x =>
{
deserializedResult.Add(BsonSerializer.Deserialize<EventWithChildren>(x));
});
return deserializedResult;
} }
public async Task SaveEvent(Event e) public async Task SaveEvent(Event e)
{ {
var connectFromField = (FieldDefinition<Event, EntityID>)"_id"; var graphLookupStage = new BsonDocument("$graphLookup",
var connectToField = (FieldDefinition<Event, EntityID>)"PrevID"; new BsonDocument
var startWith = (AggregateExpressionDefinition<Event, string>)"$_id"; {
var @as = (FieldDefinition<EventWithChildren, IEnumerable<Event>>)"Children"; { "from", EventsCollectionName },
{ "startWith", "$_id" },
{ "connectFromField", "_id"},
{ "connectToField", "PrevID" },
{ "as", "Children" },
{ "maxDepth", 0 }
});
// link to previous global events var result = await events.Aggregate()
var res = await events.Aggregate() .AppendStage<BsonDocument>(graphLookupStage)
.GraphLookup(events, connectFromField, connectToField, startWith, @as) .Match("{ Children: { $size: 0 } }")
.Match("{ Children: { $size: 0 } }").ToListAsync(); .ToListAsync();
if (res.Count == 0)
if (result.Count == 0)
{ {
await events.InsertOneAsync(e); await events.InsertOneAsync(e);
return; return;
} }
res.ForEach(x => result.ForEach(x =>
{ {
e.PrevID.Append(x.Id); var y = BsonSerializer.Deserialize<EventWithChildren>(x);
e.PrevID.Append(y.Id);
}); });
// link to previous events in chat graphLookupStage = new BsonDocument("$graphLookup",
connectToField = (FieldDefinition<Event, EntityID>)"PrevEvents"; new BsonDocument
var opts = new AggregateGraphLookupOptions<Event, Event, EventWithChildren>() {
{ { "from", EventsCollectionName },
RestrictSearchWithMatch = "{ ChatId: "+e.ChatId+" }" { "startWith", "$EventID" },
}; { "connectFromField", "EventID"},
res = await events.Aggregate() { "connectToField", "PrevEvents" },
.GraphLookup(events, connectFromField, connectToField, startWith, @as, opts) { "as", "Children" },
.Match("{ Children: { $size: 0 } }").ToListAsync(); { "maxDepth", 0 },
if (res.Count == 0) { "restrictSearchWithMatch", "{ ChatId: "+e.ChatId+" }" }
});
result = await events.Aggregate()
.AppendStage<BsonDocument>(graphLookupStage)
.Match("{ Children: { $size: 0 } }")
.ToListAsync();
if (result.Count == 0)
{ {
await events.InsertOneAsync(e); await events.InsertOneAsync(e);
return; return;
} }
res.ForEach(x => result.ForEach(x =>
{ {
e.PrevEvents.Append(x.EventID); var y = BsonSerializer.Deserialize<EventWithChildren>(x);
e.PrevEvents.Append(y.EventID);
}); });
await events.InsertOneAsync(e); await events.InsertOneAsync(e);

View File

@ -7,7 +7,7 @@ namespace ChatSubsystem.Storage.Interfaces
{ {
public interface IEventStorageManager public interface IEventStorageManager
{ {
Task<IList<Event>> GetEventsForUser(EntityID user, EntityID since, int limit); Task<(IList<Event>, IList<Chat>)> GetEventsForUser(EntityID user, EntityID since, int limit);
Task<Event> GetEventById(EntityID id); Task<Event> GetEventById(EntityID id);
Task SaveEvent(Event e); Task SaveEvent(Event e);
} }

View File

@ -0,0 +1,12 @@
using Zirconium.Core.Models;
namespace ChatSubsystem.Storage.Models
{
public class SyncRequest
{
public EntityID Since { get; set; }
public int Limit { get; set; }
public EntityID ChatID { get; set; }
public string SyncDirection { get; set; }
}
}

View File

@ -0,0 +1,12 @@
using System.Collections.Generic;
using Zirconium.Core.Models;
namespace ChatSubsystem.Storage.Models
{
public class SyncResponse
{
public EntityID NextBatch { get; set; }
public IList<Chat> MentionedChats { get; set; }
public IList<Event> Events { get; set; }
}
}

View File

@ -3,7 +3,6 @@ using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver; using MongoDB.Driver;
using Zirconium.Core; using Zirconium.Core;
using Zirconium.Core.Plugins.Interfaces; using Zirconium.Core.Plugins.Interfaces;
using System.Linq;
using Newtonsoft.Json; using Newtonsoft.Json;
using JWT.Algorithms; using JWT.Algorithms;
using JWT.Builder; using JWT.Builder;