Skip to content

Commit

Permalink
Fix race problem, fix delete, change logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Mar 7, 2022
1 parent 195c177 commit 7049ba0
Show file tree
Hide file tree
Showing 9 changed files with 5,604 additions and 5,590 deletions.
11,036 changes: 5,525 additions & 5,511 deletions hub-js/package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion hub-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"@grpc/grpc-js": "^1.3.1",
"@types/lodash": "^4.14.179",
"apollo-server-express": "^3.6.2",
"async-mutex": "^0.3.2",
"express": "^4.17.0",
"graphql": "^15.4.0",
"graphql-tools": "^8.1.0",
Expand All @@ -46,9 +47,9 @@
"@types/express-serve-static-core": "^4.17.19",
"@types/node": "^16.4.13",
"@types/ws": "^7.4.7",
"eslint": "^8.10.0",
"@typescript-eslint/eslint-plugin": "^5.13.0",
"@typescript-eslint/parser": "^5.13.0",
"eslint": "^8.10.0",
"husky": "^4.0.0",
"lint-staged": "^10.5.4",
"npm-force-resolutions": "^0.0.10",
Expand Down
7 changes: 4 additions & 3 deletions hub-js/src/local/mutation/delete-type-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ export async function deleteTypeInstance(
MATCH (typeRef)
WHERE NOT (typeRef)--()
DELETE (typeRef)
RETURN 'remove typeRef'
RETURN count([]) as _tmp0
}
WITH *
CALL {
MATCH (backendRef)
WHERE NOT (backendRef)--()
DELETE (backendRef)
RETURN 'remove backendRef'
RETURN count([]) as _tmp1
}
WITH *
CALL {
OPTIONAL MATCH (attrRef)
WHERE attrRef IS NOT NULL AND NOT (attrRef)--()
DELETE (attrRef)
RETURN 'remove attr'
RETURN count([]) as _tmp2
}
RETURN out`,
Expand All @@ -97,6 +97,7 @@ export async function deleteTypeInstance(
const deleteExternally = new Map<string, any>();
result.records.forEach((record) => {
const out = record.get("out");

if (out.backend.abstract) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion hub-js/src/local/mutation/update-type-instances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ function generateConflictError(customErr: CustomCypherErrorOutput) {
function extractUpdateMutationResult(result: QueryResult) {
const data = result.records.map((record) => record.get("typeInstance"));
// handle Integer fields
// @ts-ignore
return _.cloneDeepWith(data, (field) => {
if (neo4j.isInt(field)) {
// See: https://neo4j.com/docs/api/javascript-driver/current/class/src/v1/integer.js~Integer.html
return field.inSafeRange() ? field.toNumber() : field.toString();
}
return
});
}

Expand Down
73 changes: 33 additions & 40 deletions hub-js/src/local/query/spec-value-field.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { GetInput } from "../storage/service";
import { Context } from "../mutation/context";
import { Operation } from "../storage/update-args-context";
import _ from "lodash";
import { ServiceError } from "@grpc/grpc-js";
import { Status } from "nice-grpc";
import { Mutex } from "async-mutex";

const mutex = new Mutex();

// Represents contract defined on `TypeInstanceResourceVersionSpec.Value` field cypher query.
interface InputObject {
Expand All @@ -25,21 +26,26 @@ export async function getTypeInstanceResourceVersionSpecValueField(
_: any,
context: Context
) {
logger.debug("Executing custom field resolver for 'value' field", obj);
if (obj.abstract) {
logger.debug("Return data stored in built-in storage");
return obj.builtinValue;
}
// This is a field resolver, it can be called multiple times within the same `query/mutation`.
// We also perform, external calls to change the state if needed, due to that fact, we use
// mutex to ensure that we won't call backend multiple times as backends may be not thread safe.
return await mutex.runExclusive(async () => {
logger.debug("Executing custom field resolver for 'value' field", obj);
if (obj.abstract) {
logger.debug("Return data stored in built-in storage");
return obj.builtinValue;
}

switch (context.updateArgs.GetOperation()) {
case Operation.UpdateTypeInstancesMutation:
return await resolveMutationReturnValue(context, obj.fetchInput);
default: {
logger.debug("Return data stored in external storage");
const resp = await context.delegatedStorage.Get(obj.fetchInput);
return resp[obj.fetchInput.typeInstance.id];
switch (context.updateArgs.GetOperation()) {
case Operation.UpdateTypeInstancesMutation:
return await resolveMutationReturnValue(context, obj.fetchInput);
default: {
logger.debug("Return data stored in external storage");
const resp = await context.delegatedStorage.Get(obj.fetchInput);
return resp[obj.fetchInput.typeInstance.id];
}
}
}
});
}

async function resolveMutationReturnValue(
Expand All @@ -59,15 +65,15 @@ async function resolveMutationReturnValue(
// - and/or `resourceVersions` which holds also previous already stored revisions
if (revToResolve <= lastKnownRev) {
logger.debug(
`Fetch data from external storage for already known '${revToResolve}' revision`,
"Fetch data from external storage for already known revision",
fetchInput
);
const resp = await context.delegatedStorage.Get(fetchInput);
return resp[tiId];
}

// If the revision is higher that the last known revision version, it means that we need to store that into deleted
// storage
// If the revision is higher that the last known revision version, it means that we need to store that into delegated
// storage.

// 1. Based on our contract, if user didn't provide value, we need to fetch the old one and put it
// to the new revision.
Expand All @@ -93,28 +99,15 @@ async function resolveMutationReturnValue(
},
};

try {
logger.debug("Storing new value into external storage", update);
await context.delegatedStorage.Update(update);
logger.debug("Storing new value into external storage", update);
await context.delegatedStorage.Update(update);

// 3. Update last known revision, so if `value` resolver is called next time we won't update it once again
// run into `ALREADY_EXISTS` error.
context.updateArgs.SetLastKnownRev(
update.typeInstance.id,
update.typeInstance.newResourceVersion
);

return newValue;
} catch (e) {
const err = e as ServiceError;
if (err.code == Status.ALREADY_EXISTS) {
context.updateArgs.SetLastKnownRev(
update.typeInstance.id,
update.typeInstance.newResourceVersion
);
// 3. Update last known revision, so if `value` resolver is called next time we won't update it once again
// run into `ALREADY_EXISTS` error.
context.updateArgs.SetLastKnownRev(
update.typeInstance.id,
update.typeInstance.newResourceVersion
);

return newValue;
}
throw e;
}
return newValue;
}
55 changes: 30 additions & 25 deletions hub-js/src/local/storage/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ export default class DelegatedStorageService {
let mapping: UpdatedContexts = {};

for (const input of inputs) {
logger.debug(
`Storing TypeInstance ${input.typeInstance.id} in external backend ${input.backend.id}`
);
logger.debug("Storing TypeInstance in external backend", {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand All @@ -112,9 +113,7 @@ export default class DelegatedStorageService {

const req: OnCreateRequest = {
typeInstanceId: input.typeInstance.id,
value: new TextEncoder().encode(
JSON.stringify(input.typeInstance.value)
),
value: this.encode(input.typeInstance.value),
context: this.encode(input.backend.context),
};
const res = await cli.onCreate(req);
Expand Down Expand Up @@ -144,9 +143,10 @@ export default class DelegatedStorageService {
*/
async Update(...inputs: UpdateInput[]) {
for (const input of inputs) {
logger.debug(
`Updating TypeInstance ${input.typeInstance.id} in external backend ${input.backend.id}`
);
logger.debug("Updating TypeInstance in external backend", {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand All @@ -156,9 +156,7 @@ export default class DelegatedStorageService {
const req: OnUpdateRequest = {
typeInstanceId: input.typeInstance.id,
newResourceVersion: input.typeInstance.newResourceVersion,
newValue: new TextEncoder().encode(
JSON.stringify(input.typeInstance.newValue)
),
newValue: this.encode(input.typeInstance.newValue),
context: this.encode(input.backend.context),
};

Expand All @@ -178,9 +176,10 @@ export default class DelegatedStorageService {
let result: UpdatedContexts = {};

for (const input of inputs) {
logger.debug(
`Fetching TypeInstance ${input.typeInstance.id} from external backend ${input.backend.id}`
);
logger.debug("Fetching TypeInstance from external backend", {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand Down Expand Up @@ -218,9 +217,10 @@ export default class DelegatedStorageService {
*/
async Delete(...inputs: DeleteInput[]) {
for (const input of inputs) {
logger.debug(
`Deleting TypeInstance ${input.typeInstance.id} from external backend ${input.backend.id}`
);
logger.debug("Deleting TypeInstance from external backend", {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand All @@ -243,9 +243,10 @@ export default class DelegatedStorageService {
*/
async Lock(...inputs: LockInput[]) {
for (const input of inputs) {
logger.debug(
`Locking TypeInstance ${input.typeInstance.id} in external backend ${input.backend.id}`
);
logger.debug("Locking TypeInstance in external backend", {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand All @@ -269,9 +270,10 @@ export default class DelegatedStorageService {
*/
async Unlock(...inputs: UnlockInput[]) {
for (const input of inputs) {
logger.debug(
`Unlocking TypeInstance ${input.typeInstance.id} in external backend ${input.backend.id}`
);
logger.debug(`Unlocking TypeInstance in external backend`, {
typeInstanceId: input.typeInstance.id,
backendId: input.backend.id,
});
const cli = await this.getClient(input.backend.id);
if (!cli) {
// TODO: remove after using a real backend in e2e tests.
Expand Down Expand Up @@ -339,7 +341,10 @@ export default class DelegatedStorageService {
return undefined;
}

logger.debug(`Initialize gRPC client for Backend ${id} with URL ${url}`);
logger.debug("Initialize gRPC client", {
backend: id,
url,
});
const channel = createChannel(url);
const client: StorageClient = createClient(
StorageBackendDefinition,
Expand Down
2 changes: 1 addition & 1 deletion internal/secret-storage-backend/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package secretstoragebackend

import tellercore "github.com/spectralops/teller/pkg/core"

func (h *Handler) GetProviderFromContext(contextBytes []byte) (tellercore.Provider, error) {
func (h *Handler) GetProviderFromContext(contextBytes []byte) (tellercore.Provider, []byte, error) {
return h.getProviderFromContext(contextBytes)
}
2 changes: 1 addition & 1 deletion internal/secret-storage-backend/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func TestHandler_GetProviderFromContext(t *testing.T) {
handler := secret_storage_backend.NewHandler(logger.Noop(), testCase.InputProviders)

// when
provider, err := handler.GetProviderFromContext(testCase.InputContextBytes)
provider, _, err := handler.GetProviderFromContext(testCase.InputContextBytes)

// then
if testCase.ExpectedErrorMessage != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/hub/client/local/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestThatShowcaseExternalStorage(t *testing.T) {
defer cleanup()

// SCENARIO - CREATE
_, err := cli.CreateTypeInstances(ctx, &gqllocalapi.CreateTypeInstancesInput{
family, err := cli.CreateTypeInstances(ctx, &gqllocalapi.CreateTypeInstancesInput{
TypeInstances: []*gqllocalapi.CreateTypeInstanceInput{
{
// This TypeInstance:
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestThatShowcaseExternalStorage(t *testing.T) {
defer removeAllMembers(t, cli, familyDetails)

fmt.Print("\n\n======== After create result ============\n\n")
resourcePrinter := cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(nil, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
resourcePrinter := cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(family, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
require.NoError(t, resourcePrinter.Print(familyDetails))

// SCENARIO - UPDATE
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestThatShowcaseExternalStorage(t *testing.T) {
require.NoError(t, err)

fmt.Print("\n\n======== After update result ============\n\n")
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(nil, getDataDirectlyFromStorage(t, srvAddr, updatedFamily))))
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(family, getDataDirectlyFromStorage(t, srvAddr, updatedFamily))))
require.NoError(t, resourcePrinter.Print(updatedFamily))

// SCENARIO - LOCK
Expand All @@ -201,7 +201,7 @@ func TestThatShowcaseExternalStorage(t *testing.T) {
require.NoError(t, err)

fmt.Print("\n\n======== After locking result ============\n\n")
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(nil, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(family, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
require.NoError(t, resourcePrinter.Print(familyDetails))

// SCENARIO - UNLOCK
Expand All @@ -216,11 +216,13 @@ func TestThatShowcaseExternalStorage(t *testing.T) {
require.NoError(t, err)

fmt.Print("\n\n======== After unlocking result ============\n\n")
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(nil, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
resourcePrinter = cliprinter.NewForResource(os.Stdout, cliprinter.WithTable(typeInstanceDetailsMapper(family, getDataDirectlyFromStorage(t, srvAddr, familyDetails))))
require.NoError(t, resourcePrinter.Print(familyDetails))

}

// ======= HELPERS =======

func registerExternalDotenvStorage(ctx context.Context, t *testing.T, cli *Client, srvAddr string) (gqllocalapi.CreateTypeInstanceOutput, func()) {
t.Helper()

Expand Down Expand Up @@ -266,8 +268,6 @@ func fixExternalDotenvStorage(addr string) *gqllocalapi.CreateTypeInstancesInput
}
}

// ======= HELPERS =======

type externalData struct {
Value string
LockedBy *string
Expand Down

0 comments on commit 7049ba0

Please sign in to comment.