Skip to content

Commit

Permalink
Fix: Fallback to RPC in more cases when reconcile stream errors (#2396)
Browse files Browse the repository at this point in the history
## Why is this change needed?

In cases like this, we're not falling back to the RPC 

```
{"jobName":"reconcile","jobId":"9273216","reason":"Unable to get all casts for FID 652934: server timeout","errorName":"Error","errorMessage":"Unable to get all casts for FID 652934: server timeout","errorStack":"Error: Unable to get all casts for FID 652934: server timeout\n    at
MessageReconciliation.getAllCastMessagesByFidInBatchesOf (file:///app/node_modules/@farcaster/shuttle/dist/index.mjs:1060:15)\n    at runNextTicks (node:internal/process/task_queues:60:5)\n    at process.processTimers (node:internal/timers:511:9)\n    at async MessageReconciliation.allHubMessagesOfTypeForFid (file:///app/node_modules/@farcaster/shuttle/dist/index.mjs:980:22)\n    at async MessageReconciliation.reconcileMessagesOfTypeForFid (file:///app/node_modules/@farcaster/shuttle/dist/index.mjs:918:22)\n    at async MessageReconciliation.reconcileMessagesForFid (file:///app/node_modules/@farcaster/shuttle/dist/index.mjs:913:7)\n    at App.reconcileFids (/app/src/shuttle.ts:132:13)\n    at Worker.Worker.autorun (/app/src/worker.ts:21:9)\n    at Worker.processJob (/app/node_modules/bullmq/src/classes/worker.ts:776:22)\n    at Worker.retryIfFailed (/app/node_modules/bullmq/src/classes/worker.ts:982:16)"}
```

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on improving error handling in the `reconcile` process
by enhancing the fallback mechanism to RPC in more scenarios when stream
errors occur, and temporarily skipping a test for unresponsive server
requests.

### Detailed summary
- Updated the `shuttle.integration.test.ts` file to skip the test for
unresponsive server requests.
- Enhanced the `messageReconciliation.ts` file to log a warning when a
stream fetch times out and fall back to RPC.
- Added a cancellation mechanism to prevent hanging connections.

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
sanjayprabhu authored Nov 7, 2024
1 parent 2cc08d9 commit 07aaf85
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/angry-deers-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

fix: Make fallback to rpc in more cases when reconcile stream errors
3 changes: 2 additions & 1 deletion packages/shuttle/src/shuttle.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ describe("shuttle", () => {
expect(messagesInDb.length).toBe(2);
});

test("reconciler lets unresponsive server requests terminate in error", async () => {
// TODO: Skip for now, and figure out how to test that the fallback is called correctly
xtest("reconciler lets unresponsive server requests terminate in error", async () => {
const startTimestamp = getFarcasterTime()._unsafeUnwrap();

const linkAddMessage = await Factories.LinkAddMessage.create(
Expand Down
21 changes: 11 additions & 10 deletions packages/shuttle/src/shuttle/messageReconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,22 +183,23 @@ export class MessageReconciliation {
) {
const id = randomUUID();
const result = new Promise<HubResult<MessagesResponse>>((resolve) => {
// Do not allow hanging unresponsive connections to linger:
const cancel = setTimeout(
() => resolve(err(new HubError("unavailable", "server timeout"))),
this.connectionTimeout,
);

if (!this.stream) {
fallback()
.then((result) => resolve(result))
.finally(() => clearTimeout(cancel));
fallback().then((result) => resolve(result));
return;
}
const process = async (response: StreamFetchResponse) => {
// Do not allow hanging unresponsive connections to linger:
const cancel = setTimeout(() => {
this.log.warn("Stream fetch timed out, falling back to RPC");
this.stream?.cancel();
this.stream = undefined;
fallback().then((result) => resolve(result));
}, this.connectionTimeout);

if (!this.stream) {
clearTimeout(cancel);
resolve(err(new HubError("unavailable", "unexpected stream termination")));
this.log.warn("Stream unavailable, falling back to RPC");
fallback().then((result) => resolve(result));
return;
}
this.stream.off("data", process);
Expand Down

0 comments on commit 07aaf85

Please sign in to comment.