Skip to content

Commit

Permalink
introduce: IRenderObservables
Browse files Browse the repository at this point in the history
  • Loading branch information
riemannulus authored and s2quake committed Aug 29, 2024
1 parent 652ba8f commit cc05f77
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
using Libplanet.Crypto;
using Libplanet.Node.Options;
using Libplanet.Node.Services;
using Libplanet.Node.Services.Renderer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;

namespace Libplanet.Node.Tests.Services;

Expand Down Expand Up @@ -33,6 +35,7 @@ public void Create_Test()
storeOptions: storeOptions,
policyService: policyService,
actionLoaderProviders: [],
rendererService: Mock.Of<IRenderObservables>(),
logger: logger);
var blockChain = blockChainService.BlockChain;

Expand Down
1 change: 1 addition & 0 deletions sdk/node/Libplanet.Node/Libplanet.Node.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="NJsonSchema" Version="11.0.2" />
<PackageReference Include="NJsonSchema.CodeGeneration.CSharp" Version="11.0.2" />
<PackageReference Include="R3" Version="1.2.6" />
</ItemGroup>

<ItemGroup>
Expand Down
58 changes: 12 additions & 46 deletions sdk/node/Libplanet.Node/Services/BlockChainService.cs
Original file line number Diff line number Diff line change
@@ -1,91 +1,57 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Numerics;
using System.Security.Cryptography;
using Bencodex.Types;
using Libplanet.Action;
using Libplanet.Action.Loader;
using Libplanet.Action.Sys;
using Libplanet.Blockchain;
using Libplanet.Blockchain.Policies;
using Libplanet.Blockchain.Renderers;
using Libplanet.Common;
using Libplanet.Crypto;
using Libplanet.Node.Options;
using Libplanet.Node.Services.Renderer;
using Libplanet.RocksDBStore;
using Libplanet.Store;
using Libplanet.Store.Trie;
using Libplanet.Types.Blocks;
using Libplanet.Types.Consensus;
using Libplanet.Types.Tx;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Libplanet.Node.Services;

internal sealed class BlockChainService : IBlockChainService, IActionRenderer
internal sealed class BlockChainService : IBlockChainService
{
private readonly SynchronizationContext _synchronizationContext;
private readonly ILogger<BlockChainService> _logger;
private readonly BlockChain _blockChain;
private readonly ConcurrentDictionary<TxId, ManualResetEvent> _eventByTxId = [];
private readonly ConcurrentDictionary<IValue, Exception> _exceptionByAction = [];

public BlockChainService(
IOptions<GenesisOptions> genesisOptions,
IOptions<StoreOptions> storeOptions,
PolicyService policyService,
IEnumerable<IActionLoaderProvider> actionLoaderProviders,
IRenderObservables rendererService,
ILogger<BlockChainService> logger)
{
_synchronizationContext = SynchronizationContext.Current ?? new();
_logger = logger;
_blockChain = CreateBlockChain(
genesisOptions: genesisOptions.Value,
storeOptions: storeOptions.Value,
stagePolicy: policyService.StagePolicy,
renderers: [this],
renderers:
[
rendererService.RenderActionObservable,
rendererService.RenderActionErrorObservable,
rendererService.RenderBlockObservable,
rendererService.RenderBlockEndObservable,
],
actionLoaders: [.. actionLoaderProviders.Select(item => item.GetActionLoader())]);
}

public event EventHandler<BlockEventArgs>? BlockAppended;

public BlockChain BlockChain => _blockChain;

void IRenderer.RenderBlock(Block oldTip, Block newTip)
{
}

void IActionRenderer.RenderAction(
IValue action, ICommittedActionContext context, HashDigest<SHA256> nextState)
{
}

void IActionRenderer.RenderActionError(
IValue action, ICommittedActionContext context, Exception exception)
{
_exceptionByAction.AddOrUpdate(action, exception, (_, _) => exception);
}

void IActionRenderer.RenderBlockEnd(Block oldTip, Block newTip)
{
_synchronizationContext.Post(Action, state: null);

void Action(object? state)
{
foreach (var transaction in newTip.Transactions)
{
if (_eventByTxId.TryGetValue(transaction.Id, out var manualResetEvent))
{
manualResetEvent.Set();
}
}

_logger.LogInformation("#{Height}: Block appended.", newTip.Index);
BlockAppended?.Invoke(this, new(newTip));
}
}

private static BlockChain CreateBlockChain(
GenesisOptions genesisOptions,
StoreOptions storeOptions,
Expand All @@ -101,8 +67,8 @@ private static BlockChain CreateBlockChain(
stateStore,
actionLoader);
var validators = genesisOptions.Validators.Select(PublicKey.FromHex)
.Select(item => new Validator(item, new BigInteger(1000)))
.ToArray();
.Select(item => new Validator(item, new BigInteger(1000)))
.ToArray();
var validatorSet = new ValidatorSet(validators: [.. validators]);
var nonce = 0L;
IAction[] actions =
Expand Down
2 changes: 0 additions & 2 deletions sdk/node/Libplanet.Node/Services/IBlockChainService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ namespace Libplanet.Node.Services;

public interface IBlockChainService
{
event EventHandler<BlockEventArgs>? BlockAppended;

BlockChain BlockChain { get; }
}
12 changes: 12 additions & 0 deletions sdk/node/Libplanet.Node/Services/Renderer/IRenderObservables.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Libplanet.Node.Services.Renderer;

public interface IRenderObservables
{
public RenderActionObservable RenderActionObservable { get; }

public RenderActionErrorObservable RenderActionErrorObservable { get; }

public RenderBlockObservable RenderBlockObservable { get; }

public RenderBlockEndObservable RenderBlockEndObservable { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Security.Cryptography;
using Bencodex.Types;
using Libplanet.Action;
using Libplanet.Blockchain.Renderers;
using Libplanet.Common;
using Libplanet.Types.Blocks;
using R3;
using Output = (
Bencodex.Types.IValue,
Libplanet.Action.ICommittedActionContext,
System.Exception);

namespace Libplanet.Node.Services.Renderer;

public class RenderActionErrorObservable : IObservable<Output>, IActionRenderer, IDisposable
{
private readonly List<IObserver<Output>> _observers = [];
private readonly BooleanDisposable _disposable = new BooleanDisposable();

public IDisposable Subscribe(IObserver<Output> observer)
{
_observers.Add(observer);
return _disposable;
}

public void RenderBlock(Block oldTip, Block newTip)
{
}

public void RenderAction(
IValue action,
ICommittedActionContext context,
HashDigest<SHA256> nextState)
{
}

public void RenderActionError(IValue action, ICommittedActionContext context, Exception exception)
{
if (_disposable.IsDisposed)
{
return;
}

foreach (IObserver<Output> observer in _observers)
{
observer.OnNext((action, context, exception));
}
}

public void RenderBlockEnd(Block oldTip, Block newTip)
{
}

public void Dispose()
{
_disposable.Dispose();

foreach (IObserver<Output> observer in _observers)
{
observer.OnCompleted();
}

GC.SuppressFinalize(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Security.Cryptography;
using Bencodex.Types;
using Libplanet.Action;
using Libplanet.Blockchain.Renderers;
using Libplanet.Common;
using Libplanet.Types.Blocks;
using R3;
using Output = (
Bencodex.Types.IValue,
Libplanet.Action.ICommittedActionContext,
Libplanet.Common.HashDigest<System.Security.Cryptography.SHA256>);

namespace Libplanet.Node.Services.Renderer;

public class RenderActionObservable : IObservable<Output>, IActionRenderer, IDisposable
{
private readonly List<IObserver<Output>> _observers = [];
private readonly BooleanDisposable _disposable = new BooleanDisposable();

public IDisposable Subscribe(IObserver<Output> observer)
{
_observers.Add(observer);
return _disposable;
}

public void RenderBlock(Block oldTip, Block newTip)
{
}

public void RenderAction(
IValue action,
ICommittedActionContext context,
HashDigest<SHA256> nextState)
{
if (_disposable.IsDisposed)
{
return;
}

foreach (IObserver<Output> observer in _observers)
{
observer.OnNext((action, context, nextState));
}
}

public void RenderActionError(IValue action, ICommittedActionContext context, Exception exception)
{
}

public void RenderBlockEnd(Block oldTip, Block newTip)
{
}

public void Dispose()
{
_disposable.Dispose();
foreach (IObserver<Output> observer in _observers)
{
observer.OnCompleted();
}

GC.SuppressFinalize(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Security.Cryptography;
using Bencodex.Types;
using Libplanet.Action;
using Libplanet.Blockchain.Renderers;
using Libplanet.Common;
using Libplanet.Types.Blocks;
using R3;

namespace Libplanet.Node.Services.Renderer;

public class RenderBlockEndObservable : IObservable<(Block, Block)>, IActionRenderer, IDisposable
{
private readonly List<IObserver<(Block, Block)>> _observers
= new List<IObserver<(Block, Block)>>();

private readonly BooleanDisposable _disposable = new BooleanDisposable();

public IDisposable Subscribe(IObserver<(Block, Block)> observer)
{
_observers.Add(observer);
return _disposable;
}

public void RenderBlock(Block oldTip, Block newTip)
{
}

public void RenderAction(
IValue action,
ICommittedActionContext context,
HashDigest<SHA256> nextState)
{
}

public void RenderActionError(
IValue action,
ICommittedActionContext context,
Exception exception)
{
}

public void RenderBlockEnd(Block oldTip, Block newTip)
{
if (_disposable.IsDisposed)
{
return;
}

foreach (IObserver<(Block, Block)> observer in _observers)
{
observer.OnNext((oldTip, newTip));
}
}

public void Dispose()
{
_disposable.Dispose();
foreach (IObserver<(Block, Block)> observer in _observers)
{
observer.OnCompleted();
}

GC.SuppressFinalize(this);
}
}
Loading

0 comments on commit cc05f77

Please sign in to comment.