Skip to content

Commit

Permalink
Merge pull request #680 from shocknet/increment-initial
Browse files Browse the repository at this point in the history
increment initial balance
  • Loading branch information
shocknet-justin authored May 3, 2024
2 parents 0f38d5b + 1cdc494 commit cb6df0d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 16 deletions.
34 changes: 34 additions & 0 deletions src/services/helpers/functionQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { PubLogger, getLogger } from "../helpers/logger.js"

type Item<T> = { res: (v: T) => void, rej: (message: string) => void }
export default class FunctionQueue<T> {
log: PubLogger
queue: Item<T>[] = []
running: boolean = false
f: () => Promise<T>
constructor(name: string, f: () => Promise<T>) {
this.log = getLogger({ appName: name })
this.f = f
}

Run = (item: Item<T>) => {
this.queue.push(item)
if (!this.running) {
this.execF()
}
}

execF = async () => {
this.running = true
try {
const res = await this.f()
this.queue.forEach(q => q.res(res))
} catch (err) {
this.queue.forEach(q => q.rej((err as any).message))
}
this.queue = []
this.running = false
}
}


8 changes: 4 additions & 4 deletions src/services/lnd/lnd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { LightningClient } from '../../../proto/lnd/lightning.client.js'
import { InvoicesClient } from '../../../proto/lnd/invoices.client.js'
import { RouterClient } from '../../../proto/lnd/router.client.js'
import { ChainNotifierClient } from '../../../proto/lnd/chainnotifier.client.js'
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js'
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse, ForwardingHistoryResponse } from '../../../proto/lnd/lightning.js'
import { OpenChannelReq } from './openChannelReq.js';
import { AddInvoiceReq } from './addInvoiceReq.js';
import { PayInvoiceReq } from './payInvoiceReq.js';
Expand Down Expand Up @@ -358,9 +358,9 @@ export default class {
return { confirmedBalance: Number(confirmedBalance), unconfirmedBalance: Number(unconfirmedBalance), totalBalance: Number(totalBalance), channelsBalance }
}

async GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]> {
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: 0n, endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
return response.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: Number(e.timestampNs), offset: response.lastOffsetIndex }))
async GetForwardingHistory(indexOffset: number, startTime = 0): Promise<ForwardingHistoryResponse> {
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: BigInt(startTime), endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
return response
}

async GetAllPaidInvoices(max: number) {
Expand Down
48 changes: 37 additions & 11 deletions src/services/main/watchdog.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EnvCanBeInteger } from "../helpers/envParser.js";
import FunctionQueue from "../helpers/functionQueue.js";
import { getLogger } from "../helpers/logger.js";
import LND from "../lnd/lnd.js";
import { ChannelBalance } from "../lnd/settings.js";
Expand All @@ -12,20 +13,24 @@ export const LoadWatchdogSettingsFromEnv = (test = false): WatchdogSettings => {
}
}
export class Watchdog {

queue: FunctionQueue<void>
initialLndBalance: number;
initialUsersBalance: number;
startedAtUnix: number;
latestIndexOffset: number;
accumulatedHtlcFees: number;
lnd: LND;
settings: WatchdogSettings;
storage: Storage;
latestCheckStart = 0
log = getLogger({ appName: "watchdog" })
enabled = false
ready = false
interval: NodeJS.Timer;
constructor(settings: WatchdogSettings, lnd: LND, storage: Storage) {
this.lnd = lnd;
this.settings = settings;
this.storage = storage;
this.queue = new FunctionQueue("watchdog::queue", () => this.StartCheck())
}

Stop() {
Expand All @@ -35,17 +40,31 @@ export class Watchdog {
}

Start = async () => {
this.startedAtUnix = Math.floor(Date.now() / 1000)
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
this.initialLndBalance = await this.getTotalLndBalance(totalUsersBalance)
this.initialUsersBalance = totalUsersBalance
this.enabled = true
const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix)
this.latestIndexOffset = fwEvents.lastOffsetIndex
this.accumulatedHtlcFees = 0

this.interval = setInterval(() => {
if (this.latestCheckStart + (1000 * 60) < Date.now()) {
this.log("No balance check was made in the last minute, checking now")
this.PaymentRequested()
}
}, 1000 * 60)

this.ready = true
}

updateAccumulatedHtlcFees = async () => {
const fwEvents = await this.lnd.GetForwardingHistory(this.latestIndexOffset, this.startedAtUnix)
this.latestIndexOffset = fwEvents.lastOffsetIndex
fwEvents.forwardingEvents.forEach((event) => {
this.accumulatedHtlcFees += Number(event.fee)
})

}


Expand All @@ -54,7 +73,8 @@ export class Watchdog {
const walletBalance = await this.lnd.GetWalletBalance()
this.log(Number(walletBalance.confirmedBalance), "sats in chain wallet")
const channelsBalance = await this.lnd.GetChannelBalance()
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal })
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal, f: this.accumulatedHtlcFees })

const localChannelsBalance = Number(channelsBalance.localBalance?.sat || 0)
const unsettledLocalBalance = Number(channelsBalance.unsettledLocalBalance?.sat || 0)
return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance
Expand Down Expand Up @@ -111,16 +131,12 @@ export class Watchdog {
return false
}

PaymentRequested = async () => {
this.log("Payment requested, checking balance")
if (!this.enabled) {
this.log("WARNING! Watchdog not enabled, skipping balance check")
return
}
StartCheck = async () => {
this.latestCheckStart = Date.now()
await this.updateAccumulatedHtlcFees()
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
const totalLndBalance = await this.getTotalLndBalance(totalUsersBalance)
const deltaLnd = totalLndBalance - this.initialLndBalance
const deltaLnd = totalLndBalance - (this.initialLndBalance + this.accumulatedHtlcFees)
const deltaUsers = totalUsersBalance - this.initialUsersBalance
const deny = this.checkBalanceUpdate(deltaLnd, deltaUsers)
if (deny) {
Expand All @@ -131,6 +147,16 @@ export class Watchdog {
this.lnd.UnlockOutgoingOperations()
}

PaymentRequested = async () => {
this.log("Payment requested, checking balance")
if (!this.ready) {
throw new Error("Watchdog not ready")
}
return new Promise<void>((res, rej) => {
this.queue.Run({ res, rej })
})
}

checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => {
if (deltaLnd < 0) {
if (deltaUsers < 0) {
Expand Down
3 changes: 2 additions & 1 deletion src/services/metrics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export default class Handler {

async FetchLatestForwardingEvents() {
const latestIndex = await this.storage.metricsStorage.GetLatestForwardingIndexOffset()
const forwards = await this.lnd.GetForwardingHistory(latestIndex)
const res = await this.lnd.GetForwardingHistory(latestIndex)
const forwards = res.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: e.timestampNs.toString(), offset: res.lastOffsetIndex }))
await Promise.all(forwards.map(async f => {
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdIn, { forward_fee_as_input: f.fee, latest_index_offset: f.offset })
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdOut, { forward_fee_as_output: f.fee, latest_index_offset: f.offset })
Expand Down

0 comments on commit cb6df0d

Please sign in to comment.