Skip to content

Commit

Permalink
Fix bugs and add some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhaixian1984 committed Sep 11, 2024
1 parent 65c9fef commit 8ac2685
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
8 changes: 4 additions & 4 deletions src/db/file-record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function createFileOrderOperator(db: Database): DbOrderOperator {
indexer: string,
): Promise<FileRecord[]> => {
const records = await db.all(
'select id, cid, expire_at, size, amount, replicas, indexer, status, last_updated, create_at from file_record where cid in (?) and indexer = ?',
'select id, cid, expire_at, size, amount, replicas, indexer, status, last_updated, create_at, retry_count from file_record where cid in (?) and indexer = ?',
[cids, indexer],
);
return records;
Expand Down Expand Up @@ -185,13 +185,13 @@ export function createFileOrderOperator(db: Database): DbOrderOperator {
if (indexer === null) {
return db.get(
`select id, cid, expire_at, size, amount, replicas,
indexer, status, last_updated, create_at
indexer, status, last_updated, create_at, retry_count
from file_record where status = "new" and ${sizeCond} order by amount desc, id asc limit 1`,
);
}
return db.get(
`select id, cid, expire_at, size, amount, replicas,
indexer, status, last_updated, create_at
indexer, status, last_updated, create_at, retry_count
from file_record where indexer = ? and status = "new" and ${sizeCond} order by amount desc, id asc limit 1`,
[indexer],
);
Expand All @@ -210,7 +210,7 @@ export function createFileOrderOperator(db: Database): DbOrderOperator {
addFiles,
getFileInfo: async (cid, indexer) => {
const record = await db.get(
'select id, cid, expired_at, size, amount, replicas, indexer, status, last_updated, create_at from file_record where cid = ? and indexer = ? limit 1',
'select id, cid, expired_at, size, amount, replicas, indexer, status, last_updated, create_at, retry_count from file_record where cid = ? and indexer = ? limit 1',
[cid, indexer],
);
return record || null;
Expand Down
11 changes: 9 additions & 2 deletions src/db/pin-record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function createPinRecordOperator(db: Database): PinRecordOperator {
const getSealingRecords = async (): DbResult<PinRecord[]> => {
// get all sealing one time
return db.all(
'select id, cid, size, status, pin_at, last_updated, pin_by, sealed_size, last_check_time from pin_record where status = "sealing"',
'select id, cid, size, status, pin_at, last_updated, pin_by, sealed_size, last_check_time, file_record_id from pin_record where status = "sealing"',
);
};
const addPinRecord = async (
Expand All @@ -31,6 +31,13 @@ export function createPinRecordOperator(db: Database): PinRecordOperator {
pinBy: PullingStrategy,
fileRecordId: number
): DbWriteResult => {
// remove old pin record with the same cid + fileRecordId and status is failed
await db.run(
'delete from pin_record where cid = ? and file_record_id = ? and status = "failed"',
[cid, fileRecordId]
);

// insert new record
await db.run(
'insert into pin_record ' +
'(`cid`, `size`, `status`, `pin_at`, `last_updated`, `pin_by`, `file_record_id`) ' +
Expand All @@ -40,7 +47,7 @@ export function createPinRecordOperator(db: Database): PinRecordOperator {
};
const getPinRecordsByCid = async (cid: string): DbResult<PinRecord[]> => {
const result = await db.all(
'select id, cid, size, status, pin_at, last_updated, pin_by, sealed_size, last_check_time from pin_record where cid = ? ',
'select id, cid, size, status, pin_at, last_updated, pin_by, sealed_size, last_check_time, file_record_id from pin_record where cid = ? ',
[cid],
);
return result;
Expand Down
13 changes: 9 additions & 4 deletions src/tasks/file-retry-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ const MinFileRetryInterval = Dayjs.duration({
minutes: 30,
}).asSeconds();

async function handleRetry(context: AppContext) {
async function handleRetry(
context: AppContext,
logger: Logger,
) {
const { database } = context;

const now = getTimestamp();
const maxCreateTime = now - MaxFilePendingTime;
const maxRetryTime = now - MinFileRetryInterval;

await database.run(
const failedResult = await database.run(
`update file_record set status = "failed"
where status in (${toQuotedList(PendingStatus)})
and last_updated < ?`, // Should use last_updated instead of create_at, since replay old order will only update last_updated
[maxCreateTime],
);

await database.run(
const retryableResult = await database.run(
`update file_record set status = "new"
where status in (${toQuotedList(RetryableStatus)})
and last_updated < ?`,
Expand All @@ -41,14 +44,16 @@ async function handleRetry(context: AppContext) {
}).asSeconds();
const maxPinFailedRetryTime = now - sealFailedRetryInterval;

await database.run(
const sealFailedRetryResult = await database.run(
`update file_record
set status = "new",
retry_count = COALESCE(retry_count, 0) + 1
where status = 'sealFailedRetry'
and last_updated < ?`,
[maxPinFailedRetryTime],
);

logger.info(`Handle Retry: Mark failed - ${failedResult.changes}; Normal Retry: ${retryableResult.changes}; Seal Failed Retry: ${sealFailedRetryResult.changes}`);
}

export async function createFileRetryTask(
Expand Down
6 changes: 4 additions & 2 deletions src/tasks/seal-status-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { IsStopped, makeIntervalTask } from './task-utils';
import { createFileOrderOperator } from '../db/file-record';

const MinSealStartTime = 30; // 30 seconds for a sealing job to start
const SealUpdateTimeout = 5 * 60; // 5 minutes for a sealing job timeout
const SealStartTimeout = 5 * 60; // 5 minutes for a sealing job start timeout
const SealUpdateTimeout = 10 * 60; // 10 minutes for a sealing job update timeout

/**
* task to update the sealing status in the pin records table
Expand Down Expand Up @@ -86,7 +87,7 @@ async function checkAndUpdateStatus(
// cid not in seal info map, either means sealing is done or sealing is not started
const done = await isSealDone(record.cid, sworkerApi, logger);
if (!done) {
if (sealUpdateInterval > SealUpdateTimeout) {
if (sealUpdateInterval > SealStartTimeout) {
logger.info('sealing blocked for file "%s", cancel sealing', record.cid);
await markRecordAsFailed(record, pinRecordOps, fileOrderOps, context, logger, false);
}
Expand Down Expand Up @@ -122,6 +123,7 @@ async function markRecordAsFailed(
const retry_count = _.isNil(fileRecord.retry_count) ? 0 : fileRecord.retry_count;
if (retry_count < context.config.scheduler.sealFailedRetryCount) {
fileStatus = 'sealFailedRetry';
logger.info(`file "${record.cid}" retry count is ${retry_count}, mark to keep retrying`);
}
}
}
Expand Down

0 comments on commit 8ac2685

Please sign in to comment.