Skip to content

Commit

Permalink
Merge branch 'development' into 'master'
Browse files Browse the repository at this point in the history
Development

See merge request b650/Deep-Lynx!333
  • Loading branch information
DnOberon committed Aug 31, 2022
2 parents 03b3cd5 + 6aeb6fc commit 6d6be23
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 69 deletions.
4 changes: 4 additions & 0 deletions AdminWebApp/src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,10 @@ export class Client {
return this.get<string[]>(`/containers/${containerID}/users/${userID}/roles`);
}

removeAllUserRoles(containerID: string, userID: string): Promise<boolean> {
return this.delete(`/containers/${containerID}/users/${userID}/roles`);
}

retrieveTypeMapping(containerID: string, dataSourceID: string, typeMappingID: string): Promise<TypeMappingT> {
return this.get<TypeMappingT>(`/containers/${containerID}/import/datasources/${dataSourceID}/mappings/${typeMappingID}`);
}
Expand Down
1 change: 1 addition & 0 deletions AdminWebApp/src/auth/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export type UserT = {
id: string;
role: string;
identity_provider_id: string;
identity_provider: string;
display_name: string;
Expand Down
33 changes: 33 additions & 0 deletions AdminWebApp/src/views/ContainerUsers.vue
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,28 @@
<invite-user-to-container-dialog :containerID="containerID" @userInvited="flashSuccess"></invite-user-to-container-dialog>
</v-toolbar>
</template>
<template v-slot:[`item.role`]="{ item }">
<div v-if="$store.getters.activeContainer.created_by === item.id">Owner</div>
<div v-else>{{retrieveUserRole(item)}} {{item.role}}</div>
</template>

<template v-slot:[`item.actions`]="{ item }">
<v-icon
v-if="$store.getters.activeContainer.created_by !== item.id || item.id !== $auth.CurrentUser().id"
small
class="mr-2"
@click="editUser(item)"
>
mdi-pencil
</v-icon>
<v-icon
v-if="$store.getters.activeContainer.created_by !== item.id || item.id !== $auth.CurrentUser().id"
small
class="mr-2"
@click="deleteUser(item)"
>
mdi-account-multiple-minus
</v-icon>
</template>
</v-data-table>

Expand Down Expand Up @@ -108,6 +122,7 @@
return [
{ text: this.$t("containerUsers.name"), value: 'display_name' },
{ text: this.$t("containerUsers.email"), value: 'email'},
{ text: this.$t("containerUsers.role"), value: 'role'},
{ text: this.$t("containerUsers.actions"), value: 'actions', sortable: false }
]
}
Expand All @@ -134,6 +149,16 @@
}
}
retrieveUserRole(user: UserT) {
this.$client.retrieveUserRoles(this.containerID, user.id)
.then(roles => {
if(roles.length > 0) {
user.role = roles[0]
}
})
.catch(e => this.errorMessage = e)
}
retrieveUserRoles(user: UserT) {
if(this.toEdit) {
Expand All @@ -153,6 +178,14 @@
this.retrieveUserRoles(user)
}
deleteUser(user: UserT) {
this.$client.removeAllUserRoles(this.containerID, user.id!)
.then(() => {
this.refreshUsers();
})
.catch(e => this.errorMessage = e)
}
flashSuccess(){
this.inviteSuccess = this.$t('containerUsers.successfullyInvited') as string
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export default class EdgeQueueItemMapper extends Mapper {
});
}

public async SetNextAttemptAt(id: string, nextRunDate: Date, error?: string): Promise<Result<boolean>> {
public async SetNextAttemptAt(id: string, nextRunDate: Date | string, error?: string): Promise<Result<boolean>> {
return super.runStatement(this.setNextRunAtStatement(id, nextRunDate, error));
}

Expand All @@ -73,7 +73,7 @@ export default class EdgeQueueItemMapper extends Mapper {
return format(text, values);
}

private setNextRunAtStatement(id: string, nextAttemptDate: Date, error?: string): QueryConfig {
private setNextRunAtStatement(id: string, nextAttemptDate: Date | string, error?: string): QueryConfig {
if (error) {
return {
text: `UPDATE edge_queue_items SET next_attempt_at = $2, attempts = attempts + 1, error = $3 WHERE id = $1`,
Expand Down Expand Up @@ -102,6 +102,6 @@ export default class EdgeQueueItemMapper extends Mapper {
}

public needRetriedStreamingStatement(): string {
return `SElECT * FROM edge_queue_items WHERE next_attempt_at < NOW()`;
return `SElECT * FROM edge_queue_items WHERE next_attempt_at < NOW() AT TIME ZONE 'utc'`;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,15 @@ export default class UserRepository extends Repository implements RepositoryInte
return Promise.resolve(Result.Success(await Authorization.AssignRole(payload.user_id!, payload.role_name!, payload.container_id)));
}

async removeAllRoles(user: User, domain: string): Promise<Result<boolean>> {
async removeAllRoles(user: User, userID: string, domain: string): Promise<Result<boolean>> {
// generally the route authorization methods in http_server would handle
// authentication, but I've found that in a few places we need this additional
// check as the route might not have all the information needed to make a
// permissions check when removing roles
const authed = await Authorization.AuthUser(user, 'write', 'users');
const authed = await Authorization.AuthUser(user, 'write', 'users', domain);
if (!authed) return Promise.resolve(Result.Error(ErrorUnauthorized));

const deleted = await Authorization.DeleteAllRoles(user.id!, domain);
const deleted = await Authorization.DeleteAllRoles(userID, domain);

return Promise.resolve(Result.Success(deleted));
}
Expand Down Expand Up @@ -471,11 +471,12 @@ export default class UserRepository extends Repository implements RepositoryInte
);
}

const containerUsers = await this.listServiceUsersForContainer(containerID)
if(containerUsers.isError) return Promise.resolve(Result.Pass(containerUsers))
const containerUsers = await this.listServiceUsersForContainer(containerID);
if (containerUsers.isError) return Promise.resolve(Result.Pass(containerUsers));
else {
const found = containerUsers.value.find(user => user.id === userID)
if(!found) return Promise.resolve(Result.Failure('unable to set permissions for service user, service user does not belong to proposed container'))
const found = containerUsers.value.find((user) => user.id === userID);
if (!found)
return Promise.resolve(Result.Failure('unable to set permissions for service user, service user does not belong to proposed container'));
}

return await permissionSet.writePermissions(userID, containerID);
Expand Down
11 changes: 8 additions & 3 deletions src/data_processing/edge_inserter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ export async function InsertEdge(edgeQueueItem: EdgeQueueItem): Promise<Result<b

// if we failed, need to iterate the attempts and set the next attempt date, so we don't swamp the database - this
// is an exponential backoff
edgeQueueItem.next_attempt_at.setSeconds(
edgeQueueItem.next_attempt_at.getSeconds() + Math.pow(Config.edge_insertion_backoff_multiplier, edgeQueueItem.attempts++),
const currentTime = new Date().getTime();

edgeQueueItem.next_attempt_at = new Date(
currentTime + (edgeQueueItem.next_attempt_at.getSeconds() + Math.pow(Config.edge_insertion_backoff_multiplier, edgeQueueItem.attempts++)) * 1000,
);

const set = await queueMapper.SetNextAttemptAt(edgeQueueItem.id!, edgeQueueItem.next_attempt_at, inserted.error?.error);
const set = await queueMapper.SetNextAttemptAt(edgeQueueItem.id!, edgeQueueItem.next_attempt_at.toISOString(), inserted.error?.error);
if (set.isError) {
Logger.debug(`unable to set next retry time for edge queue item ${set.error?.error}`);
}

await Cache.del(`edge_insertion_${edgeQueueItem.id}`);
return Promise.resolve(Result.Failure(`unable to save edge ${inserted.error?.error}`));
}

Expand Down Expand Up @@ -75,9 +78,11 @@ export async function InsertEdge(edgeQueueItem: EdgeQueueItem): Promise<Result<b
await mapper.rollbackTransaction(transaction.value);
Logger.debug(`unable to delete edge queue item: ${deleted.error?.error}`);

await Cache.del(`edge_insertion_${edgeQueueItem.id}`);
return Promise.resolve(Result.Failure(`unable to delete edge queue item ${deleted.error?.error}`));
}

await mapper.completeTransaction(transaction.value);
await Cache.del(`edge_insertion_${edgeQueueItem.id}`);
return Promise.resolve(Result.Success(true));
}
3 changes: 2 additions & 1 deletion src/http_server/routes/access_management/oauth_routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const keyRepo = new KeyPairRepository();
const oauthRepo = new OAuthRepository();
import DOMPurify from 'isomorphic-dompurify';
import Result from '../../../common_classes/result';
import Logger from '../../../services/logger';

/*
OAuthRoutes contain all routes pertaining to oauth application management and
Expand Down Expand Up @@ -407,7 +408,7 @@ export default class OAuthRoutes {
return res.redirect(req.query.redirect_uri as string);
}

return res.redirect('/oauth');
res.redirect('/oauth');
}

private static login(req: Request, res: Response, next: NextFunction) {
Expand Down
16 changes: 16 additions & 0 deletions src/http_server/routes/access_management/user_routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export default class UserRoutes {

app.post('/containers/:containerID/users/roles', ...middleware, authInContainer('write', 'users'), this.assignRole);
app.get('/containers/:containerID/users/:userID/roles', ...middleware, authInContainer('read', 'users'), this.listUserRoles);
app.delete('/containers/:containerID/users/:userID/roles', ...middleware, authInContainer('read', 'users'), this.removeAllUserRoles);

app.post('/containers/:containerID/users/invite', ...middleware, authInContainer('write', 'users'), this.inviteUserToContainer);
app.get('/containers/:containerID/users/invite', ...middleware, authInContainer('read', 'users'), this.listInvitedUsers);
Expand Down Expand Up @@ -191,6 +192,21 @@ export default class UserRoutes {
}
}

private static removeAllUserRoles(req: Request, res: Response, next: NextFunction) {
if (req.routeUser && req.container && req.currentUser) {
userRepo
.removeAllRoles(req.currentUser, req.routeUser.id!, req.container.id!)
.then((result) => {
result.asResponse(res);
})
.catch((err) => Result.Error(err).asResponse(res))
.finally(() => next());
} else {
Result.Failure('user not found', 404).asResponse(res);
next();
}
}

private static listUserPermissions(req: Request, res: Response, next: NextFunction) {
if (req.currentUser) {
userRepo
Expand Down
1 change: 1 addition & 0 deletions src/http_server/views/login.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
</div>
</div>
<div class='w-100 px-4'>
<input type="submit" style="display: none" />
<button type='submit' class='btn btn-lg btn-custom mb-2 btn-block'>Login</button>
</div>
{{/unless}}
Expand Down
54 changes: 31 additions & 23 deletions src/jobs/edge_queue_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import QueryStream from 'pg-query-stream';
import Logger from '../services/logger';
import Cache from '../services/cache/cache';
import Config from '../services/config';
import {plainToClass, classToPlain} from 'class-transformer';
import {plainToClass, classToPlain, instanceToPlain, plainToInstance} from 'class-transformer';
import {parentPort} from 'worker_threads';
import EdgeQueueItemMapper from '../data_access_layer/mappers/data_warehouse/data/edge_queue_item_mapper';
import {EdgeQueueItem} from '../domain_objects/data_warehouse/data/edge';
Expand All @@ -26,29 +26,35 @@ void postgresAdapter
const emitter = () => {
void postgresAdapter.Pool.connect((err, client, done) => {
const stream = client.query(new QueryStream(mapper.needRetriedStreamingStatement()));
const promises: Promise<boolean>[] = [];
const putPromises: Promise<boolean>[] = [];

stream.on('data', (data) => {
const item = plainToClass(EdgeQueueItem, data as object);
const item = plainToInstance(EdgeQueueItem, data as object);

// check to see if the edge queue item is in the cache, indicating that there is a high probability that
// this message is already in the queue and either is being processed or waiting to be processed
Cache.get(`edge_insertion_${item.id}`)
.then((set) => {
if (!set) {
// if the item isn't the cache, we can go ahead and queue data
putPromises.push(queue.Put(Config.edge_insertion_queue, classToPlain(item)));
}
})
// if we error out we need to go ahead and queue this message anyway, just so we're not dropping
// data
.catch((e) => {
Logger.error(`error reading from cache for staging emitter ${e}`);
putPromises.push(queue.Put(Config.edge_insertion_queue, classToPlain(item)));
})
.finally(() => {
void Cache.set(`edge_insertion_${item.id}`, {}, Config.initial_import_cache_ttl);
});
promises.push(
new Promise((resolve) => {
Cache.get(`edge_insertion_${item.id}`)
.then((set) => {
if (!set) {
// if the item isn't the cache, we can go ahead and queue data
putPromises.push(queue.Put(Config.edge_insertion_queue, instanceToPlain(item)));
}
})
// if we error out we need to go ahead and queue this message anyway, just so we're not dropping
// data
.catch((e) => {
Logger.error(`error reading from cache for staging emitter ${e}`);
putPromises.push(queue.Put(Config.edge_insertion_queue, instanceToPlain(item)));
})
.finally(() => {
void Cache.set(`edge_insertion_${item.id}`, {}, Config.initial_import_cache_ttl);
resolve(true);
});
}),
);
});

stream.on('error', (e: Error) => {
Expand All @@ -62,11 +68,13 @@ void postgresAdapter
stream.on('end', () => {
done();

Promise.all(putPromises).finally(() => {
if (parentPort) parentPort.postMessage('done');
else {
process.exit(0);
}
Promise.all(promises).finally(() => {
Promise.all(putPromises).finally(() => {
if (parentPort) parentPort.postMessage('done');
else {
process.exit(0);
}
});
});
});

Expand Down
Loading

0 comments on commit 6d6be23

Please sign in to comment.