-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
210 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
18 changes: 18 additions & 0 deletions
18
Olive.Entities.Data.Replication/Publish/ExportDataAttribute.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
using System; | ||
|
||
namespace Olive.Entities.Replication | ||
{ | ||
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] | ||
public class ExportDataAttribute : Attribute | ||
{ | ||
public Type Type { get; } | ||
|
||
public ExportDataAttribute(Type type) | ||
{ | ||
Type = type ?? throw new ArgumentNullException(nameof(type)); | ||
|
||
if (!type.IsA<ReplicatedData>()) | ||
throw new ArgumentException(type.FullName + " is not a subclass of " + typeof(ReplicatedData).FullName); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
using System.Reflection; | ||
|
||
namespace Olive.Entities.Replication | ||
{ | ||
public class ExportedField | ||
{ | ||
string title; | ||
public PropertyInfo Property { get; } | ||
|
||
public bool IsAssociation => Property.PropertyType.IsA<IEntity>(); | ||
|
||
public bool IsInverseAssociation => Property.PropertyType.IsA<IDatabaseQuery>() && Property.Defines<CalculatedAttribute>(); | ||
|
||
public ExportedField(PropertyInfo property) | ||
{ | ||
Property = property; | ||
title = property.GetCustomAttribute<System.ComponentModel.DisplayNameAttribute>()?.DisplayName; | ||
if (title.IsEmpty()) | ||
title = property.Name.ToLiteralFromPascalCase(); | ||
} | ||
|
||
public string GetTitle() => title; | ||
|
||
public ExportedField Title(string exportTitle) | ||
{ | ||
title = exportTitle; | ||
return this; | ||
} | ||
|
||
public object GetValue(IEntity entity) | ||
{ | ||
var result = Property.GetValue(entity); | ||
if (result is IEntity ent) return ent.GetId(); | ||
else return result; | ||
} | ||
} | ||
} |
66 changes: 66 additions & 0 deletions
66
Olive.Entities.Data.Replication/Publish/ReplicatedData-T.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
using System; | ||
using System.Linq; | ||
using System.Linq.Expressions; | ||
using System.Threading.Tasks; | ||
|
||
namespace Olive.Entities.Replication | ||
{ | ||
public abstract class ReplicatedData<TDomain> : ReplicatedData | ||
where TDomain : IEntity | ||
{ | ||
Type domainType; | ||
|
||
public override Type DomainType => domainType ?? (domainType = GetType().BaseType.GenericTypeArguments.Single()); | ||
|
||
protected virtual string QueueUrlConfigKey => string.Empty; | ||
|
||
IEventBusQueue Queue | ||
{ | ||
get | ||
{ | ||
if (QueueUrlConfigKey.HasValue()) | ||
return EventBus.Queue(Config.GetOrThrow(QueueUrlConfigKey)); | ||
else | ||
return EventBus.Queue(QueueUrl); | ||
} | ||
} | ||
|
||
internal override void Start() | ||
{ | ||
GlobalEntityEvents.InstanceSaved.Handle(async x => | ||
{ | ||
if (!x.Entity.GetType().IsA(DomainType)) return; | ||
await Queue.Publish(ToMessage(x.Entity)); | ||
}); | ||
} | ||
|
||
public ExportedField Export<T>(Expression<Func<TDomain, T>> field) | ||
{ | ||
var result = new ExportedField(field.GetProperty()); | ||
Fields.Add(result); | ||
return result; | ||
} | ||
|
||
public void ExportAll() | ||
{ | ||
var properties = DomainType.GetProperties() | ||
.Where(x => x.CanRead && x.CanWrite) | ||
.Where(x => x.DeclaringType.Assembly == DomainType.Assembly) | ||
.ToArray(); | ||
|
||
foreach (var p in properties) | ||
{ | ||
if (p.PropertyType == typeof(Guid?) && p.Name.EndsWith("Id") && properties.Any(x => x.Name == | ||
p.Name.TrimEnd(2))) continue; | ||
|
||
Fields.Add(new ExportedField(p)); | ||
} | ||
} | ||
|
||
internal override async Task UploadAll() | ||
{ | ||
foreach (var item in await Context.Current.Database().GetList<TDomain>()) | ||
await Queue.Publish(ToMessage(item)); // TODO: Should this be done in parallel batches? | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
using Newtonsoft.Json; | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
|
||
namespace Olive.Entities.Replication | ||
{ | ||
public abstract class ReplicatedData | ||
{ | ||
internal string QueueUrl { get; set; } | ||
|
||
public List<ExportedField> Fields = new List<ExportedField>(); | ||
|
||
public abstract Type DomainType { get; } | ||
|
||
public ReplicateDataMessage ToMessage(IEntity entity) | ||
{ | ||
var properties = new Dictionary<string, object>(); | ||
|
||
properties["ID"] = entity.GetId(); | ||
|
||
foreach (var f in Fields.Except(x => x.IsInverseAssociation)) | ||
properties[f.Property.Name] = f.GetValue(entity); | ||
|
||
var serialized = JsonConvert.SerializeObject(properties); | ||
|
||
return new ReplicateDataMessage | ||
{ | ||
TypeFullName = GetType().FullName, | ||
Entity = serialized, | ||
CreationUtc = DateTime.UtcNow | ||
}; | ||
} | ||
|
||
internal abstract void Start(); | ||
|
||
internal abstract Task UploadAll(); | ||
|
||
protected internal abstract void Define(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Reflection; | ||
using System.Threading.Tasks; | ||
|
||
namespace Olive.Entities.Replication | ||
{ | ||
public abstract partial class SourceEndpoint | ||
{ | ||
Dictionary<string, ReplicatedData> Agents = new Dictionary<string, ReplicatedData>(); | ||
|
||
string UrlPattern => Config.GetOrThrow("DataReplication:" + GetType().FullName + ":Url"); | ||
|
||
/// <summary> | ||
/// Starts publishing an end point for the specified data types. | ||
/// It handles all save events on such objects, and publishes them on the event bus. | ||
/// </summary> | ||
public void Publish() | ||
{ | ||
var types = GetType().GetCustomAttributes<ExportDataAttribute>().Select(x => x.Type).Distinct(); | ||
|
||
if (types.None()) | ||
throw new Exception("No data is exported on " + GetType().FullName); | ||
|
||
foreach (var type in types) | ||
{ | ||
var agent = type.CreateInstance<ReplicatedData>(); | ||
agent.Define(); | ||
|
||
agent.QueueUrl = UrlPattern; | ||
Agents.Add(type.FullName, agent); | ||
agent.Start(); | ||
} | ||
|
||
HandleRefreshRequests(); | ||
} | ||
|
||
void HandleRefreshRequests() | ||
{ | ||
EventBus.Queue(UrlPattern.TrimEnd(".fifo") + "-REFRESH.fifo").Subscribe<RefreshMessage>(message => | ||
{ | ||
Agents[message.TypeName].UploadAll(); | ||
return Task.CompletedTask; | ||
}); | ||
} | ||
} | ||
} |