Skip to content

Commit

Permalink
rename bq api user flows
Browse files Browse the repository at this point in the history
  • Loading branch information
yashgorana committed Sep 26, 2024
1 parent 2ca51cf commit 1418756
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
34 changes: 23 additions & 11 deletions tests/scenariosv2/flows/user_bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# relative
from ..sim.core import SimulatorContext

__all__ = ["bq_test_query", "bq_submit_query", "bq_submit_query_results"]
__all__ = ["bq_test_query", "bq_submit_query", "bq_check_query_results"]


def query_sql():
Expand All @@ -26,39 +26,51 @@ def query_sql():


def bq_test_query(ctx: SimulatorContext, client: sy.DatasiteClient):
ctx.logger.info(
f"User: {client.logged_in_user} - Calling client.api.bigquery.test_query (mock)"
)
user = client.logged_in_user
ctx.logger.info(f"User: {user} - Calling client.api.bigquery.test_query (mock)")
res = client.api.bigquery.test_query(sql_query=query_sql())
assert len(res) == 10000
ctx.logger.info(f"User: {client.logged_in_user} - Received {len(res)} rows")
ctx.logger.info(f"User: {user} - Received {len(res)} rows")
return res


def bq_submit_query(ctx: SimulatorContext, client: sy.DatasiteClient):
user = client.logged_in_user
# Randomly define a func_name a function to call
func_name = "invalid_func" if random.random() < 0.5 else "test_query"

ctx.logger.info(
f"User: {client.logged_in_user} - Calling client.api.services.bigquery.submit_query func_name={func_name}"
f"User: {user} - Calling client.api.services.bigquery.submit_query func_name={func_name}"
)
res = client.api.bigquery.submit_query(
func_name=func_name,
query=query_sql(),
)
ctx.logger.info(f"User: {client.logged_in_user} - Received {res}")
assert isinstance(res, sy.SyftSuccess), res
ctx.logger.info(f"User: {user} - Received {res}")
return res


def bq_submit_query_results(ctx: SimulatorContext, client: sy.DatasiteClient):
def bq_check_query_results(ctx: SimulatorContext, client: sy.DatasiteClient):
user = client.logged_in_user

for request in client.requests:
if request.get_status() == RequestStatus.APPROVED:
status = request.get_status()

if status == RequestStatus.APPROVED:
job = request.code(blocking=False)
result = job.wait()
assert len(result) == 10000
if request.get_status() == RequestStatus.REJECTED:
ctx.logger.info(
f"User: {client.logged_in_user} - Request rejected {request.code.service_func_name}"
f"User: {user} - {request.code.service_func_name} - Request approved"
)
elif status == RequestStatus.REJECTED:
ctx.logger.info(
f"User: {user} - {request.code.service_func_name} - Request rejected"
)
else:
ctx.logger.info(
f"User: {user} - {request.code.service_func_name} - Request pending"
)

return True
18 changes: 9 additions & 9 deletions tests/scenariosv2/l0_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

# relative
from .flows.user_bigquery_api import bq_submit_query
from .flows.user_bigquery_api import bq_submit_query_results
from .flows.user_bigquery_api import bq_check_query_results
from .flows.user_bigquery_api import bq_test_query
from .sim.core import BaseEvent
from .sim.core import Simulator
Expand Down Expand Up @@ -66,16 +66,16 @@ class Event(BaseEvent):
wait_for=Event.ADMIN_LOW_SIDE_ENDPOINTS_AVAILABLE,
trigger=Event.USER_CAN_QUERY_TEST_ENDPOINT,
)
async def user_query_test_endpoint(ctx: SimulatorContext, client: sy.DatasiteClient):
async def user_bq_test_query(ctx: SimulatorContext, client: sy.DatasiteClient):
"""Run query on test endpoint"""
await asyncio.to_thread(bq_test_query, ctx, client)


@sim_activity(
wait_for=Event.USER_CAN_QUERY_TEST_ENDPOINT,
wait_for=Event.ADMIN_LOW_SIDE_ENDPOINTS_AVAILABLE,
trigger=Event.USER_CAN_SUBMIT_QUERY,
)
async def user_bq_submit(ctx: SimulatorContext, client: sy.DatasiteClient):
async def user_bq_submit_query(ctx: SimulatorContext, client: sy.DatasiteClient):
"""Submit query to be run on private data"""
await asyncio.to_thread(bq_submit_query, ctx, client)

Expand All @@ -84,8 +84,8 @@ async def user_bq_submit(ctx: SimulatorContext, client: sy.DatasiteClient):
wait_for=Event.ADMIN_LOW_ALL_RESULTS_AVAILABLE,
trigger=Event.USER_CHECKED_RESULTS,
)
async def user_checks_results(ctx: SimulatorContext, client: sy.DatasiteClient):
await asyncio.to_thread(bq_submit_query_results, ctx, client)
async def user_bq_results(ctx: SimulatorContext, client: sy.DatasiteClient):
await asyncio.to_thread(bq_check_query_results, ctx, client)


@sim_activity(wait_for=Event.GUEST_USERS_CREATED, trigger=Event.USER_FLOW_COMPLETED)
Expand All @@ -98,9 +98,9 @@ async def user_flow(ctx: SimulatorContext, server_url_low: str, user: dict):
ctx.logger.info(f"User: {client.logged_in_user} - logged in")

# this must be executed sequentially.
await user_query_test_endpoint(ctx, client)
await user_bq_submit(ctx, client)
await user_checks_results(ctx, client)
await user_bq_test_query(ctx, client)
await user_bq_submit_query(ctx, client)
await user_bq_results(ctx, client)


# ------------------------------------------------------------------------------------------------
Expand Down
10 changes: 5 additions & 5 deletions tests/scenariosv2/l2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ async def admin_flow(
],
trigger=Event.USER_CAN_QUERY_TEST_ENDPOINT,
)
async def user_query_test_endpoint(ctx: SimulatorContext, client: sy.DatasiteClient):
async def user_bq_test_query(ctx: SimulatorContext, client: sy.DatasiteClient):
"""Run query on test endpoint"""
await asyncio.to_thread(bq_test_query, ctx, client)


@sim_activity(
wait_for=[
Event.USER_CAN_QUERY_TEST_ENDPOINT,
Event.ADMIN_ALL_ENDPOINTS_CREATED,
Event.ADMIN_HIGHSIDE_WORKER_POOL_CREATED,
],
trigger=Event.USER_CAN_SUBMIT_QUERY,
)
async def user_bq_submit(ctx: SimulatorContext, client: sy.DatasiteClient):
async def user_bq_submit_query(ctx: SimulatorContext, client: sy.DatasiteClient):
"""Submit query to be run on private data"""
await asyncio.to_thread(bq_submit_query, ctx, client)

Expand All @@ -101,8 +101,8 @@ async def user_flow(ctx: SimulatorContext, server_url: str, user: dict):
)
ctx.logger.info(f"User: {client.logged_in_user} - logged in")

await user_query_test_endpoint(ctx, client)
await user_bq_submit(ctx, client)
await user_bq_test_query(ctx, client)
await user_bq_submit_query(ctx, client)


# ---------------------------------- test ----------------------------------
Expand Down

0 comments on commit 1418756

Please sign in to comment.