Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging #449

Merged
merged 3 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ open class RdCoroutineScope : CoroutineScope {
if (!currentHost.compareAndSet(null, host))
throw IllegalStateException("Could not override RdCoroutineHost")

logger.debug { "RdCoroutineHost has been overridden" }
logger.trace { "RdCoroutineHost has been overridden" }
host.coroutineContext.job.invokeOnCompletion {
currentHost.getAndSet(null)
logger.debug { "RdCoroutineHost has been reset" }
logger.trace { "RdCoroutineHost has been reset" }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ interface IMarshaller<T : Any> : ISerializer<T> {
}

val IMarshaller<*>.fqn: String get() {
return if (this is LazyCompanionMarshaller) this.fgn
return if (this is LazyCompanionMarshaller) this.fqn
else _type.qualifiedName ?: _type.jvmName
}

Expand All @@ -108,7 +108,7 @@ class LazyCompanionMarshaller<T : Any>(
val fqn: String
) : IMarshaller<T> {
private val lazy = lazy(LazyThreadSafetyMode.PUBLICATION) {
Class.forName(fgn, true, classLoader).getDeclaredField("Companion").get(null) as IMarshaller<T>
Class.forName(fqn, true, classLoader).getDeclaredField("Companion").get(null) as IMarshaller<T>
}

override val _type: KClass<*>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ open class RdMap<K : Any, V : Any> private constructor(
val lifetime = dispatchHelper.lifetime
val definition = tryPreBindValue(lifetime, key, value, true)

logReceived.trace { "onWireReceived:: ${logmsg(op, version, key, value)}" }

dispatchHelper.dispatch {
if (msgVersioned || !master || !isPendingForAck(key)) {
logReceived.trace { logmsg(op, version, key, value) }
logReceived.trace { "dispatched:: ${logmsg(op, version, key, value)}" }

if (value != null) {
val definitions = tryGetBindDefinitions(lifetime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ abstract class RdPropertyBase<T>(val valueSerializer: ISerializer<T>) : RdReacti

val definition = tryPreBindValue(dispatchHelper.lifetime, v, true)

logReceived.trace { "onWireReceived:: ${getMessage(version, v)}" }

dispatchHelper.dispatch {
val rejected = master && version < masterVersion
logReceived.trace { "property `$location` ($rdid):: oldver = $masterVersion, newver = $version, value = ${v.printToString()}${rejected.condstr { " >> REJECTED" }}" }
logReceived.trace { "dispatched:: ${getMessage(version, v)}${rejected.condstr { " >> REJECTED" }}" }

if (rejected) {
definition?.terminate()
Expand All @@ -124,6 +126,9 @@ abstract class RdPropertyBase<T>(val valueSerializer: ISerializer<T>) : RdReacti
}
}

private fun getMessage(version: Int, v: T) =
"property `$location` ($rdid):: oldver = $masterVersion, newver = $version, value = ${v.printToString()}"

override fun advise(lifetime: Lifetime, handler: (T) -> Unit) {
if (isBound) assertThreading()
property.advise(lifetime, handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ open class RdSet<T : Any> constructor(val valueSerializer: ISerializer<T>, priva
val kind = buffer.readEnum<AddRemove>()
val v = valueSerializer.read(ctx, buffer)

logReceived.trace { "onWireReceived:: $this :: $kind :: ${v.printToString()}" }
dispatchHelper.dispatch {
logReceived.trace { "dispatched:: $this :: $kind :: ${v.printToString()}" }
when (kind) {
AddRemove.Add -> set.add(v)
AddRemove.Remove -> set.remove(v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ class RdSignal<T>(val valueSerializer: ISerializer<T> = Polymorphic<T>()) : RdRe

override fun onWireReceived(proto: IProtocol, buffer: AbstractBuffer, ctx: SerializationCtx, dispatchHelper: IRdWireableDispatchHelper) {
val value = valueSerializer.read(ctx, buffer)
logReceived.trace {"signal `$location` ($rdid):: value = ${value.printToString()}"}
logReceived.trace {"onWireReceived:: signal `$location` ($rdid):: value = ${value.printToString()}"}
dispatchHelper.dispatch(scheduler) {
logReceived.trace {"dispatched:: signal `$location` ($rdid):: value = ${value.printToString()}"}
signal.fire(value)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,10 @@ class RdCall<TReq, TRes>(internal val requestSzr: ISerializer<TReq> = Polymorphi
private fun onWireReceived(proto: IProtocol, ctx: SerializationCtx, buffer: AbstractBuffer, wiredRdTask: EndpointWiredRdTask<TReq, TRes>, dispatchHelper: IRdWireableDispatchHelper) {
val externalCancellation = wiredRdTask.lifetime
val value = requestSzr.read(ctx, buffer)
logReceived.trace { "endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }
logReceived.trace { "onWireReceived:: endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }
dispatchHelper.dispatch(handlerScheduler) {
logReceived.trace { "dispatch:: endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }

val rdTask = try {
val handlerLocal = handler
if (handlerLocal == null) {
Expand Down
15 changes: 11 additions & 4 deletions rd-net/RdFramework/Impl/RdMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ protected override void Unbind()
myBindDefinitions = null;
}

private static string getMessage(bool msgVersioned, long version, bool isPut, V? value)
{
return "{this} :: {kind} :: key = {key.PrintToString()}" +
(msgVersioned ? " :: version = " + version : "") +
(isPut ? $" :: value = {value.PrintToString()}" : "");
}

public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader stream, IRdWireableDispatchHelper dispatchHelper)
{
var header = stream.ReadInt();
Expand Down Expand Up @@ -237,14 +244,14 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
var isPut = kind is AddUpdateRemove.Add or AddUpdateRemove.Update;
var value = isPut ? ReadValueDelegate(ctx, stream) : default;
var definition = TryPreBindValue(lifetime, key, value, true);

ReceiveTrace?.Log($"OnWireReceived:: {getMessage(msgVersioned, version, isPut, value)}");

dispatchHelper.Dispatch(() =>
{
if (msgVersioned || !IsMaster || !IsPendingForAck(key))
{
ReceiveTrace?.Log($"{this} :: {kind} :: key = {key.PrintToString()}" +
(msgVersioned ? " :: version = " + version : "") +
(isPut ? $" :: value = {value.PrintToString()}" : ""));
ReceiveTrace?.Log($"Dispatched:: {getMessage(msgVersioned, version, isPut, value)}");

if (isPut)
{
Expand All @@ -271,7 +278,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
}
else
{
ReceiveTrace?.Log(">> CHANGE IGNORED");
ReceiveTrace?.Log($">> CHANGE IGNORED {getMessage(msgVersioned, version, isPut, value)}");
}

if (msgVersioned)
Expand Down
9 changes: 8 additions & 1 deletion rd-net/RdFramework/Impl/RdProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,13 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
var lifetime = dispatchHelper.Lifetime;
var definition = TryPreBindValue(lifetime, value, true);

ReceiveTrace?.Log($"OnWireReceived:: {GetMessage(version, value)}");

dispatchHelper.Dispatch(() =>
{
var rejected = IsMaster && version < myMasterVersion;

ReceiveTrace?.Log($"{this} :: oldver = {myMasterVersion}, newver = {version}, value = {value.PrintToString()}{(rejected ? " REJECTED" : "")}");
ReceiveTrace?.Log($"Dispatch:: {GetMessage(version, value)}{(rejected ? " REJECTED" : "")}");

if (rejected)
{
Expand All @@ -203,6 +205,11 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
});
}

private string GetMessage(int version, T value)
{
return $"{this} :: oldver = {myMasterVersion}, newver = {version}, value = {value.PrintToString()}";
}

private LifetimeDefinition? TryPreBindValue(Lifetime lifetime, T value, bool bindAlso)
{
if (OptimizeNested || !value.IsBindable())
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Impl/RdSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
{
var kind = (AddRemove) stream.ReadInt();
var value = ReadValueDelegate(ctx, stream);
ReceiveTrace?.Log($"{this} :: {kind} :: {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {this} :: {kind} :: {value.PrintToString()}");

dispatchHelper.Dispatch(() =>
{
ReceiveTrace?.Log($"Dispatched:: {this} :: {kind} :: {value.PrintToString()}");
switch (kind)
{
case AddRemove.Add:
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Impl/RdSignal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ protected override void PreInit(Lifetime lifetime, IProtocol proto)
public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper)
{
var value = ReadValueDelegate(ctx, reader);
ReceiveTrace?.Log($"{this} :: value = {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {this} :: value = {value.PrintToString()}");
dispatchHelper.Dispatch(Scheduler, () =>
{
ReceiveTrace?.Log($"Dispatched:: {this} :: value = {value.PrintToString()}");
mySignal.Fire(value);
});
}
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Tasks/RdCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ private void OnWireReceived(SerializationCtx ctx, UnsafeReader reader, WiredRdTa
{
var externalCancellation = wiredTask.Lifetime;
var value = ReadRequestDelegate(ctx, reader);
ReceiveTrace?.Log($"{wiredTask} :: received request: {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {wiredTask} :: received request: {value.PrintToString()}");

dispatchHelper.Dispatch(myHandlerScheduler, () =>
{
ReceiveTrace?.Log($"Dispatched:: {wiredTask} :: received request: {value.PrintToString()}");
var rdTask = RunHandler(value, externalCancellation, wiredTask);

rdTask.Result.Advise(Lifetime.Eternal, result =>
Expand Down
Loading