Skip to content

Commit

Permalink
lazily initialize lite client (#524)
Browse files Browse the repository at this point in the history
* lazily initialize lite client

* remove console log

* Create dull-lobsters-hope.md
  • Loading branch information
conico974 authored Oct 2, 2024
1 parent 8bd6523 commit 6b894df
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 128 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-lobsters-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

lazily initialize lite client
60 changes: 29 additions & 31 deletions packages/open-next/src/cache/incremental/s3-lite.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { AwsClient } from "aws4fetch";
import path from "path";
import { IgnorableError, RecoverableError } from "utils/error";
Expand All @@ -7,24 +8,33 @@ import { parseNumberFromEnv } from "../../adapters/util";
import { Extension } from "../next-types";
import { IncrementalCache } from "./types";

const {
CACHE_BUCKET_REGION,
CACHE_BUCKET_KEY_PREFIX,
NEXT_BUILD_ID,
CACHE_BUCKET_NAME,
} = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
/**
 * Lazily creates (and thereafter reuses) the signing client for S3 requests.
 *
 * The client is constructed on first call rather than at module load so that
 * credentials and `CACHE_BUCKET_REGION` are read from the environment at
 * request time (the point of the "lazily initialize lite client" change).
 *
 * @returns the shared `AwsClient` instance.
 */
const getAwsClient = () => {
  // Single initialization guard instead of if/else with a duplicated
  // `return awsClient` in each branch.
  if (!awsClient) {
    awsClient = new AwsClient({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
      sessionToken: process.env.AWS_SESSION_TOKEN,
      region: process.env.CACHE_BUCKET_REGION,
      retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
    });
  }
  return awsClient;
};

const awsFetch = customFetchClient(awsClient);
/**
 * Signed fetch against the cache bucket: builds the S3 object URL for `key`
 * and forwards `options` through the aws4fetch signing client, which is
 * created lazily on first use.
 */
const awsFetch = async (key: string, options: RequestInit) => {
  const { CACHE_BUCKET_REGION, CACHE_BUCKET_NAME } = process.env;
  const endpoint = `https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${key}`;
  return customFetchClient(getAwsClient())(endpoint, options);
};

function buildS3Key(key: string, extension: Extension) {
const { CACHE_BUCKET_KEY_PREFIX, NEXT_BUILD_ID } = process.env;
return path.posix.join(
CACHE_BUCKET_KEY_PREFIX ?? "",
extension === "fetch" ? "__fetch" : "",
Expand All @@ -36,10 +46,7 @@ function buildS3Key(key: string, extension: Extension) {
const incrementalCache: IncrementalCache = {
async get(key, isFetch) {
const result = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
isFetch ? "fetch" : "cache",
)}`,
buildS3Key(key, isFetch ? "fetch" : "cache"),
{
method: "GET",
},
Expand All @@ -61,10 +68,7 @@ const incrementalCache: IncrementalCache = {
},
async set(key, value, isFetch): Promise<void> {
const response = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
isFetch ? "fetch" : "cache",
)}`,
buildS3Key(key, isFetch ? "fetch" : "cache"),
{
method: "PUT",
body: JSON.stringify(value),
Expand All @@ -75,15 +79,9 @@ const incrementalCache: IncrementalCache = {
}
},
async delete(key): Promise<void> {
const response = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
"cache",
)}`,
{
method: "DELETE",
},
);
const response = await awsFetch(buildS3Key(key, "cache"), {
method: "DELETE",
});
if (response.status !== 204) {
throw new RecoverableError(`Failed to delete cache: ${response.status}`);
}
Expand Down
150 changes: 76 additions & 74 deletions packages/open-next/src/cache/tag/dynamodb-lite.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { AwsClient } from "aws4fetch";
import path from "path";
import { RecoverableError } from "utils/error";
Expand All @@ -11,18 +12,46 @@ import {
} from "./constants";
import { TagCache } from "./types";

const { CACHE_BUCKET_REGION, CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
const awsFetch = customFetchClient(awsClient);
/**
 * Lazily creates (and thereafter reuses) the signing client for DynamoDB
 * requests.
 *
 * Construction is deferred to first call so that credentials and
 * `CACHE_BUCKET_REGION` are read from the environment at request time
 * instead of at module load.
 *
 * @returns the shared `AwsClient` instance.
 */
const getAwsClient = () => {
  // Single initialization guard instead of if/else with a duplicated
  // `return awsClient` in each branch.
  if (!awsClient) {
    awsClient = new AwsClient({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
      sessionToken: process.env.AWS_SESSION_TOKEN,
      region: process.env.CACHE_BUCKET_REGION,
      retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
    });
  }
  return awsClient;
};
/**
 * POSTs a raw DynamoDB JSON payload through the lazily-created signing
 * client.
 *
 * @param body - pre-serialized request body (JSON string).
 * @param type - selects the `X-Amz-Target` operation: `"query"` (default)
 *   maps to `Query`, `"batchWrite"` to `BatchWriteItem`.
 */
const awsFetch = (
  body: RequestInit["body"],
  type: "query" | "batchWrite" = "query",
) => {
  const { CACHE_BUCKET_REGION } = process.env;
  const operation = type === "query" ? "Query" : "BatchWriteItem";
  const request: RequestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/x-amz-json-1.0",
      "X-Amz-Target": `DynamoDB_20120810.${operation}`,
    },
    body,
  };
  return customFetchClient(getAwsClient())(
    `https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
    request,
  );
};

function buildDynamoKey(key: string) {
const { NEXT_BUILD_ID } = process.env;
// FIXME: We should probably use something else than path.join here
// this could transform some fetch cache key into a valid path
return path.posix.join(NEXT_BUILD_ID ?? "", key);
Expand All @@ -40,26 +69,19 @@ const tagCache: TagCache = {
async getByPath(path) {
try {
if (globalThis.disableDynamoDBCache) return [];
const { CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression: "#key = :key",
ExpressionAttributeNames: {
"#key": "path",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression: "#key = :key",
ExpressionAttributeNames: {
"#key": "path",
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(path) },
},
}),
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(path) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(
Expand All @@ -80,25 +102,18 @@ const tagCache: TagCache = {
async getByTag(tag) {
try {
if (globalThis.disableDynamoDBCache) return [];
const { CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
KeyConditionExpression: "#tag = :tag",
ExpressionAttributeNames: {
"#tag": "tag",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
KeyConditionExpression: "#tag = :tag",
ExpressionAttributeNames: {
"#tag": "tag",
},
ExpressionAttributeValues: {
":tag": { S: buildDynamoKey(tag) },
},
}),
},
ExpressionAttributeValues: {
":tag": { S: buildDynamoKey(tag) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(`Failed to get by tag: ${result.status}`);
Expand All @@ -119,29 +134,22 @@ const tagCache: TagCache = {
async getLastModified(key, lastModified) {
try {
if (globalThis.disableDynamoDBCache) return lastModified ?? Date.now();
const { CACHE_DYNAMO_TABLE } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression:
"#key = :key AND #revalidatedAt > :lastModified",
ExpressionAttributeNames: {
"#key": "path",
"#revalidatedAt": "revalidatedAt",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression:
"#key = :key AND #revalidatedAt > :lastModified",
ExpressionAttributeNames: {
"#key": "path",
"#revalidatedAt": "revalidatedAt",
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(key) },
":lastModified": { N: String(lastModified ?? 0) },
},
}),
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(key) },
":lastModified": { N: String(lastModified ?? 0) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(
Expand All @@ -159,6 +167,7 @@ const tagCache: TagCache = {
},
async writeTags(tags) {
try {
const { CACHE_DYNAMO_TABLE } = process.env;
if (globalThis.disableDynamoDBCache) return;
const dataChunks = chunk(tags, MAX_DYNAMO_BATCH_WRITE_ITEM_COUNT).map(
(Items) => ({
Expand All @@ -181,15 +190,8 @@ const tagCache: TagCache = {
await Promise.all(
paramsChunk.map(async (params) => {
const response = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.BatchWriteItem",
},
body: JSON.stringify(params),
},
JSON.stringify(params),
"batchWrite",
);
if (response.status !== 200) {
throw new RecoverableError(
Expand Down
60 changes: 37 additions & 23 deletions packages/open-next/src/queue/sqs-lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,48 @@ import { customFetchClient } from "utils/fetch";
import { error } from "../adapters/logger";
import { Queue } from "./types";

// Expected environment variables
const { REVALIDATION_QUEUE_REGION, REVALIDATION_QUEUE_URL } = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: REVALIDATION_QUEUE_REGION,
});
const awsFetch = customFetchClient(awsClient);
/**
 * Lazily creates (and thereafter reuses) the signing client for SQS
 * requests.
 *
 * Construction is deferred to first call so that credentials and
 * `REVALIDATION_QUEUE_REGION` are read from the environment at request time
 * instead of at module load.
 *
 * @returns the shared `AwsClient` instance.
 */
const getAwsClient = () => {
  // Single initialization guard instead of if/else with a duplicated
  // `return awsClient` in each branch.
  // NOTE(review): unlike the S3/DynamoDB lite clients, no `retries` option
  // is passed here — presumably intentional (the env var is S3-specific);
  // confirm if retry parity is wanted.
  if (!awsClient) {
    awsClient = new AwsClient({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
      sessionToken: process.env.AWS_SESSION_TOKEN,
      region: process.env.REVALIDATION_QUEUE_REGION,
    });
  }
  return awsClient;
};

/**
 * POSTs an SQS `SendMessage` JSON payload through the lazily-created
 * signing client. Falls back to `us-east-1` when no queue region is set.
 *
 * @param body - pre-serialized request body (JSON string).
 */
const awsFetch = (body: RequestInit["body"]) => {
  const { REVALIDATION_QUEUE_REGION } = process.env;
  const endpoint = `https://sqs.${REVALIDATION_QUEUE_REGION ?? "us-east-1"}.amazonaws.com`;
  const request: RequestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/x-amz-json-1.0",
      "X-Amz-Target": "AmazonSQS.SendMessage",
    },
    body,
  };
  return customFetchClient(getAwsClient())(endpoint, request);
};
const queue: Queue = {
send: async ({ MessageBody, MessageDeduplicationId, MessageGroupId }) => {
try {
const { REVALIDATION_QUEUE_URL } = process.env;
const result = await awsFetch(
`https://sqs.${REVALIDATION_QUEUE_REGION ?? "us-east-1"}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "AmazonSQS.SendMessage",
},
body: JSON.stringify({
QueueUrl: REVALIDATION_QUEUE_URL,
MessageBody: JSON.stringify(MessageBody),
MessageDeduplicationId,
MessageGroupId,
}),
},
JSON.stringify({
QueueUrl: REVALIDATION_QUEUE_URL,
MessageBody: JSON.stringify(MessageBody),
MessageDeduplicationId,
MessageGroupId,
}),
);
if (result.status !== 200) {
throw new RecoverableError(`Failed to send message: ${result.status}`);
Expand Down

0 comments on commit 6b894df

Please sign in to comment.