{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Distributed Synchronous Value Iteration\n",
"\n",
"\n",
"The goal of this assignment is to implement both single-core and distributed versions of syncronous value iteration (VI). In particuar, VI will be applied to Markov Decision Processes (MDPs) in order to compute policies that optimize expected infinite horizon discounted cummulative reward. \n",
"\n",
"The relevant content about MDPs and VI are in the following course notes from CS533. \n",
"\n",
"https://oregonstate.instructure.com/courses/1719746/files/74716197/download?wrap=1\n",
"https://oregonstate.instructure.com/courses/1719746/files/74828408/download?wrap=1\n",
"\n",
"\n",
"### Synchronous Value Iteration Recap\n",
"\n",
"Below is a review of the synchronous value iteration algorithm. The algorithm is iterative and each iteration produces a newly updated value function $V_{new}$ based on the value function from the previous iteration $V_{curr}$. This is done by applying the Bellman backup operator to $V_{curr}$ at each state. That is, \n",
"\begin{equation}\n",
"V_{new}(s) = \max_{a\in A} R(s,a) + \beta \sum_{s'\in S} T(s,a,s') V_{curr}(s')\n",
"\end{equation}\n",
"where $\beta \in [0,1)$ is the discount factor, $R$ is the reward function, and $T$ is the transition function. \n",
"\n",
"The algorithm also maintains the greedy policy $\pi$ at each iteration, which is based on a one-step look ahead operator: \n",
"\begin{equation}\n",
"\pi_{curr}(s) = \arg\max_{a\in A} R(s,a) + \beta \sum_{s'\in S} T(s,a,s') V_{curr}(s')\n",
"\end{equation}\n",
"\n",
"After an update we define the Bellman error of that iteration as $\max_s |V_{new}(s)-V_{curr}(s)|$. In the notes, it is shown that this error allows us to bound the difference between the value function of $\pi_{curr}$ and the optimal value function $V^{}$. Thus, a typical stopping condition for VI is to iterate until the Bellman error is below a specified threshold python\n", "\n", "\"\"\"\n", "\n", " +---------------+\n", " | |\n", " | Main Process |------------------------------------\n", " | | |\n", " | | |\n", " +---------------+ |\n", " | |\n", " | |\n", " | |\n", " | |\n", " | |\n", " +---Re-init Worker-----+-------------------+-----Re-init Worker---+ Check\n", " | | | | Coverage\n", "+-----------+ +-----------+ +-----------+ +-----------+ Iteratively\n", "| | | | | | | | |\n", "| Worker | | Worker | | Worker | | Worker | |\n", "| (env) | | (env) | | (env) | | (env) | | \n", "| | | | | | | | |\n", "+-----------+ +-----------+ +-----------+ +-----------+ |\n", " ^ ^ ^ ^ |\n", " | | | | |\n", " +------ One-Value ----+---------+---------+----- One-Value -----+ |\n", " | |\n", " | |\n", " +----------------+ | \n", " | | | \n", " | Value Server |------------------------------------- \n", " | | \n", " +----------------+\n", "\n", "\"\"\"\n", "\n", "
\n",
"\n",
"A key part of this implementation is the Value Server, which is a Ray actor that workers interface with to update the value function at each iteration. In order to avoid \n",
"\n",
"You need to complete the following code by adding the Bellman backup operator to it. Once you implemented the function, run the following cell to test it and to store the value and policy matrices to file. Note that this implementation should replicate the results of the non-distributed version of synchronous value iteration. \n",
"\n",
"Importantly you should see that this version is significantly slower than the above non-distributed version. Think about why this might be the case. "
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"@ray.remote\n",
"class VI_server_v1(object):\n",
" def init(self, size):\n",
" self.v_current = [0] * size\n",
" self.pi = [0] * size\n",
" self.v_new = [0] * size\n",
" \n",
" def get_value_and_policy(self):\n",
" return self.v_current, self.pi\n",
" \n",
" def update(self, update_index, update_v, update_pi):\n",
" self.v_new[update_index] = update_v\n",
" self.pi[update_index] = update_pi\n",
" \n",
" def get_error_and_update(self):\n",
" max_error = 0\n",
" for i in range(len(self.v_current)):\n",
" error = abs(self.v_new[i] - self.v_current[i])\n",
" if error > max_error:\n",
" max_error = error\n",
" self.v_current[i] = self.v_new[i]\n",
" \n",
" return max_error\n",
" \n",
"@ray.remote\n",
"def VI_worker_v1(VI_server, data, worker_id, update_state):\n",
" env, workers_num, beta, epsilon = data\n",
" A = env.GetActionSpace()\n",
" S = env.GetStateSpace()\n",
" \n",
" # get shared variable \n",
" V, _ = ray.get(VI_server.get_value_and_policy.remote())\n",
" \n",
" # bellman backup\n",
" \n",
" #INSERT YOUR CODE HERE\n",
" max_v = float('-inf')\n",
" max_a = 0\n",
" for action in range(A):\n",
" sum_r = 0\n",
" for state_next, prob in env.GetSuccessors(update_state, action):\n",
" if state_next == update_state:\n",
" continue\n",
" # ∑s′∈ST(s,a,s′)V(s′)\n",
" sum_r += prob * v[state_next]\n",
"\n",
" # Vnew(s)=maxa∈AR(s,a)+β∑s′∈ST(s,a,s′)V(s′)\n",
" v_a = env.GetReward(update_state, action) + beta * sum_r \n",
"\n",
" if v_a > max_v:\n",
" max_a = action\n",
" max_v = v_a\n",
" \n",
" VI_server.update.remote(update_state, max_v, max_a)\n",
" \n",
" # return ith worker\n",
" return worker_id\n",
" \n",
"def sync_value_iteration_distributed_v1(env, beta = 0.999, epsilon = 0.01, workers_num = 4, stop_steps = 2000):\n",
" S = env.GetStateSpace()\n",
" VI_server = VI_server_v1.remote(S)\n",
" workers_list = []\n",
" data_id = ray.put((env, workers_num, beta, epsilon))\n",
" \n",
" start = 0\n",
" # start the all worker, store their id in a list\n",
" for i in range(workers_num):\n",
" w_id = VI_worker_v1.remote(VI_server, data_id, i, start)\n",
" workers_list.append(w_id)\n",
" start += 1\n",
" \n",
" error = float('inf')\n",
" while error > epsilon:\n",
" for update_state in range(start, S):\n",
" # Wait for one worker finishing, get its reuslt, and delete it from list\n",
" finished_worker_id = ray.wait(workers_list, num_returns = 1, timeout = None)[0][0]\n",
" finish_worker = ray.get(finished_worker_id)\n",
" workers_list.remove(finished_worker_id)\n",
"\n",
" # start a new worker, and add it to the list\n",
" w_id = VI_worker_v1.remote(VI_server, data_id, finish_worker, update_state)\n",
" workers_list.append(w_id)\n",
" start = 0\n",
" error = ray.get(VI_server.get_error_and_update.remote())\n",
"\n",
" v, pi = ray.get(VI_server.get_value_and_policy.remote())\n",
" return v, pi"
]
},
{
"cell_type": "code",
"execution_count": NULL,
"metadata": {},
"outputs": [],
"source": [
"beta = 0.999\n",
"env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n",
"print("Game Map:")\n",
"env.render()\n",
"\n",
"start_time = time.time()\n",
"v, pi = sync_value_iteration_distributed_v1(env, beta = beta, workers_num = 4)\n",
"v_np, pi_np = np.array(v), np.array(pi)\n",
"end_time = time.time()\n",
"run_time['Sync distributed v1'] = end_time - start_time\n",
"print("time:", run_time['Sync distributed v1'])\n",
"\n",
"print_results(v, pi, map_size, env, beta, 'dist_vi_v1')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed Synchronous Value Iteration -- Version 2\n",
"\n",
"One way to improve the above approach is to create a limited number of workers and have each worker perform backups on a batch of states. Effectively, this approach partitions the state space and uses a worker to handle each state subset of the partition. The following diagram demonstrates the architecture of such a system.\n",
"\n",
"\n",
"python\n", "\n", "\"\"\"\n", "\n", " +---------------+\n", " | |\n", " | Main Process |------------------------------------\n", " | | |\n", " | | |\n", " +---------------+ |\n", " | |\n", " | |\n", " | |\n", " | |\n", " | |\n", " +---Re-init Worker-----+-------------------+-----Re-init Worker---+ Check\n", " | | | | Coverage\n", "+-----------+ +-----------+ +-----------+ +-----------+ Iteratively\n", "| | | | | | | | |\n", "| Worker | | Worker | | Worker | | Worker | |\n", "| (env) | | (env) | | (env) | | (env) | | \n", "| | | | | | | | |\n", "+-----------+ +-----------+ +-----------+ +-----------+ |\n", " ^ ^ ^ ^ |\n", " | | | | |\n", " +---- Batch-Value ----+---------+---------+---- Batch-Value ----+ |\n", " | |\n", " | |\n", " +----------------+ | \n", " | | | \n", " | Value Server |------------------------------------- \n", " | | \n", " +----------------+\n", "\n", "\"\"\"\n", "\n", "
\n",
"In this section, you should implement the idea described above.\n",
"- Partition the states into batches. The number of batches should be equal to the number of the workers.\n",
"- Create workers to handle each batch and run them\n",
"- Terminate the workers once the error is less than the given epsilon\n",
"\n",
"Again, this implementation should exactly emulate the result of each iteration of non-distributed value iteration. "
]
},
{
"cell_type": "code",
"execution_count": NULL,
"metadata": {},
"outputs": [],
"source": [
"@ray.remote\n",
"class VI_server_v2(object):\n",
" #INSERT YOUR CODE HERE\n",
" def init(self,size):\n",
" self.v_current = [0] * size\n",
" self.pi = [0] * size\n",
" self.v_new = [0] * size\n",
" \n",
" def get_value_and_policy(self):\n",
" return self.v_current, self.pi\n",
" \n",
" def update(self, update_index, update_v, update_pi):\n",
" self.v_new[update_index] = update_v\n",
" self.pi[update_index] = update_pi\n",
" \n",
" def get_error_and_update(self):\n",
" max_error = 0\n",
" for i in range(len(self.v_current)):\n",
" error = abs(self.v_new[i] - self.v_current[i])\n",
" if error > max_error:\n",
" max_error = error\n",
" self.v_current[i] = self.v_new[i]\n",
" \n",
" return max_error\n",
" \n",
"@ray.remote\n",
"def VI_worker_v2(VI_server, data, start_state, end_state):\n",
" env, workers_num, beta, epsilon = data\n",
" A = env.GetActionSpace()\n",
" S = env.GetStateSpace()\n",
" \n",
" #INSERT YOUR CODE HERE\n",
" # get shared variable \n",
" V, _ = ray.get(VI_server.get_value_and_policy.remote())\n",
" \n",
" # bellman backup\n",
" \n",
" max_v = float('-inf')\n",
" max_a = 0\n",
" for action in range(A):\n",
" sum_r = 0\n",
" for state_next, prob in env.GetSuccessors(update_state, action):\n",
" if state_next == update_state:\n",
" continue\n",
" # ∑s′∈ST(s,a,s′)V(s′)\n",
" sum_r += prob * v[state_next]\n",
"\n",
" # Vnew(s)=maxa∈AR(s,a)+β∑s′∈ST(s,a,s′)V(s′)\n",
" v_a = env.GetReward(update_state, action) + beta * sum_r \n",
"\n",
" if v_a > max_v:\n",
" max_a = action\n",
" max_v = v_a\n",
" \n",
" VI_server.update.remote(update_state, max_v, max_a)\n",
" \n",
" # return ith worker\n",
" return worker_id\n",
" \n",
"def sync_value_iteration_distributed_v2(env, beta = 0.999, epsilon = 0.01, workers_num = 4, stop_steps = 2000):\n",
" S = env.GetStateSpace()\n",
" VI_server = VI_server_v2.remote(S)\n",
" workers_list = []\n",
" data_id = ray.put((env, workers_num, beta, epsilon))\n",
" #INSERT YOUR CODE HERE\n",
" start = 0\n",
" # start the all worker, store their id in a list\n",
" for i in range(workers_num):\n",
" w_id = VI_worker_v1.remote(VI_server, data_id, i, start)\n",
" workers_list.append(w_id)\n",
" start += 1\n",
"\n",
" error = float('inf')\n",
" while error > epsilon:\n",
" #INSERT YOUR CODE HERE\n",
" for update_state in range(start, S):\n",
" # Wait for one worker finishing, get its reuslt, and delete it from list\n",
" finished_worker_id = ray.wait(workers_list, num_returns = 1, timeout = None)[0][0]\n",
" finish_worker = ray.get(finished_worker_id)\n",
" workers_list.remove(finished_worker_id)\n",
"\n",
" # start a new worker, and add it to the list\n",
" w_id = VI_worker_v1.remote(VI_server, data_id, finish_worker, update_state)\n",
" workers_list.append(w_id)\n",
" start = 0\n",
" error = ray.get(VI_server.get_error_and_update.remote())\n",
"\n",
" v, pi = ray.get(VI_server.get_value_and_policy.remote()) \n",
" return v, pi"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the following code to see the running time of your code. This code stores the policy and state values to disk."
]
},
{
"cell_type": "code",
"execution_count": NULL,
"metadata": {},
"outputs": [],
"source": [
"beta = 0.999\n",
"env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n",
"print("Game Map:")\n",
"env.render()\n",
"\n",
"start_time = time.time()\n",
"v, pi = sync_value_iteration_distributed_v2(env, beta = beta, workers_num = 4)\n",
"v_np, pi_np = np.array(v), np.array(pi)\n",
"end_time = time.time()\n",
"run_time['Sync distributed v2'] = end_time - start_time\n",
"print("time:", run_time['Sync distributed v2'])\n",
"print_results(v, pi, map_size, env, beta, 'dist_vi_v2')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Comparison of different approachs\n",
"\n",
"Run the following cell to compare the running time of different approaches. "
]
},
{
"cell_type": "code",
"execution_count": NULL,
"metadata": {},
"outputs": [],
"source": [
"from copy import deepcopy\n",
"temp_dict = deepcopy(run_time)\n",
"print("All:")\n",
"for _ in range(len(temp_dict)):\n",
" min_v = float('inf')\n",
" for k, v in temp_dict.items():\n",
" if v is None:\n",
" continue\n",
" if v < min_v:\n",
" min_v = v\n",
" name = k\n",
" temp_dict[name] = float('inf')\n",
" print(name + ": " + str(min_v))\n",
" print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Report\n",
"Write a report that includes the following:\n",
"- A plot that shows the running time of the above 4 approaches agianst the map sizes f 8, 16 and 32. \n",
"- A plot that shows the running time of both distributed approaches against the number of the workers with 2, 4 and 8 workers.\n",
"- Breifly explain why the second distributed method is faster than the first one?\n",
"- Compere the best distributed method with the best non-distributed appraoch. Which one is better? Briefly explain why."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed Synchronous VI Competition\n",
"In this part, you should design and implement your own distributed synchronous VI method based on what you have learned in the previous parts. Your implementation has the following constraints:\n",
"- It must terminate and return a value function (and corresponding greedy policy) that satisfies the specified Bellman error threshold\n",
"- It must be iterative in the sense that it produces the same sequence of value functions as non-distributed synchronous value iteration\n",
"\n",
"For this part, you should create a stand alone python file named competition.py
. You can copy the needed functions from this notebook to your file. Your code should contain a main function called fast_value_iteration
with the following exact signature: \n",
"\n",
"def fast_value_iteration(env, beta = 0.999, epsilon = 0.01, workers_num = 4)
\n",
"\n",
"Here epsilon is the Bellman error threshold and worker_num is the maximum number of workers. This function should return policy and value vectors that satsify the Bellman error constraint. \n",
"\n",
"To test your code, you should use an exclusive compution node of DevCloud. You can use the qsub -I -lselect=1
command to connect to a computation node and run your code on it. We may test your programs on problems as large as 100x100 FrozenLake environments. \n",
"\n",
"Some possible ideas to consider\n",
"\n",
"- How should the number of workers be selected and how should states be partitioned across workers?\n",
"- Are there alternative protocols between the server and workers?\n",
"- Where are the communication bottlenecks in the system and how might they be improved? "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deliverables\n",
"\n",
"Submit a zip file to Canvas that contains:\n",
"- completed version of this notebook\n",
"- the .pkl files generated by print_results function for your runs on map of size 8x8\n",
"- a python file for distributed VI competition\n",
"- your PDF report file"
]
},
{
"cell_type": "code",
"execution_count": NULL,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (Intel, 2019 update 2)",
"language": "python",
"name": "c009-intel_distribution_of_python_3_2019u2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}