Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTP, InfoSys, and the Wolfram API #12

Open
wants to merge 7 commits into
base: 09-channels
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ TEST_DATABASE_URL=postgres://phoenix-app:s3cR3+@db:5431/phoenix_in_docker_test
POSTGRES_USER=phoenix-app
POSTGRES_PASSWORD=s3cR3+
POSTGRES_PORT=5431
WOLFRAM_APP_ID=your-app-id
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
# Rumbl

create `annotation` model
Try the OTP counter

```bash
$ dc exec app \
mix phoenix.gen.model Annotation annotations \
body:text \
at:integer \
user_id:references:users \
video_id:references:videos
$ dc run app \
iex -S mix
iex> alias Rumbl.Counter
iex> {:ok, c} = Counter.start_link(0)
iex> Counter.inc(c)
iex> Counter.inc(c)
iex> Counter.val(c)
iex> Counter.dec(c)
iex> Counter.val(c)
```

Query Wolfram|Alpha API

```bash
$ dc run app \
iex -S mix
iex> Rumbl.InfoSys.compute("what is elixir?")
iex> flush()
```
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- APP_PORT=${APP_PORT}
- DATABASE_URL=${DATABASE_URL}
- TEST_DATABASE_URL=${TEST_DATABASE_URL}
- WOLFRAM_APP_ID=${WOLFRAM_APP_ID}
depends_on:
- db
db:
Expand Down
14 changes: 3 additions & 11 deletions lib/rumbl.ex
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
defmodule Rumbl do
use Application

# See http://elixir-lang.org/docs/stable/elixir/Application.html
# for more information on OTP Applications
def start(_type, _args) do
import Supervisor.Spec

# Define workers and child supervisors to be supervised
children = [
# Start the Ecto repository
supervisor(Rumbl.Repo, []),
# Start the endpoint when the application starts
supervisor(Rumbl.Endpoint, [])
# Start your own worker by calling: Rumbl.Worker.start_link(arg1, arg2, arg3)
# worker(Rumbl.Worker, [arg1, arg2, arg3]),
supervisor(Rumbl.InfoSys.Supervisor, []),
supervisor(Rumbl.Endpoint, []),
]

# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Rumbl.Supervisor]
opts = [strategy: :one_for_all, name: Rumbl.Supervisor]
Supervisor.start_link(children, opts)
end

Expand Down
40 changes: 40 additions & 0 deletions lib/rumbl/counter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Rumbl.Counter do
use GenServer

def inc(pid), do: GenServer.cast(pid, :inc)

def dec(pid), do: GenServer.cast(pid, :dec)

def val(pid) do
GenServer.call(pid, :val)
end

def start_link(initial_val) do
GenServer.start_link(__MODULE__, initial_val)
end

def init(initial_val) do
Process.send_after(self(), :tick, 1000)
{:ok, initial_val}
end

def handle_info(:tick, val) when val <= 0, do: raise "boom!"

def handle_info(:tick, val) do
IO.puts("tick #{val}")
Process.send_after(self(), :tick, 1000)
{:noreply, val - 1}
end

def handle_cast(:inc, val) do
{:noreply, val + 1}
end

def handle_cast(:dec, val) do
{:noreply, val - 1}
end

def handle_call(:val, _from, val) do
{:reply, val, val}
end
end
71 changes: 71 additions & 0 deletions lib/rumbl/info_sys.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Rumbl.InfoSys do
@backends [Rumbl.InfoSys.Wolfram]

defmodule Result do
defstruct score: 0, text: nil, url: nil, backend: nil
end

def start_link(backend, query, query_ref, owner, limit) do
backend.start_link(query, query_ref, owner, limit)
end

def compute(query, opts \\ []) do
limit = opts[:limit] || 10
backends = opts[:backends] || @backends

backends
|> Enum.map(&spawn_query(&1, query, limit))
|> await_results(opts)
|> Enum.sort(&(&1.score >= &2.score))
|> Enum.take(limit)
end

defp spawn_query(backend, query, limit) do
query_ref = make_ref()
opts = [backend, query, query_ref, self(), limit]
{:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
monitor_ref = Process.monitor(pid)
{pid, monitor_ref, query_ref}
end

defp await_results(children, opts) do
timeout = opts[:timeout] || 5000
timer = Process.send_after(self(), :timedout, timeout)
results = await_result(children, [], :infinity)
cleanup(timer)
results
end

defp await_result([head|tail], acc, timeout) do
{pid, monitor_ref, query_ref} = head

receive do
{:results, ^query_ref, results} ->
Process.demonitor(monitor_ref, [:flush])
await_result(tail, results ++ acc, timeout)
{:DOWN, ^monitor_ref, :process, ^pid, _reason} ->
await_result(tail, acc, timeout)
:timedout ->
kill(pid, monitor_ref)
await_result(tail, acc, 0)
end
end

defp await_result([], acc, _) do
acc
end

defp kill(pid, ref) do
Process.demonitor(ref, [:flush])
Process.exit(pid, :kill)
end

defp cleanup(timer) do
:erlang.cancel_timer(timer)
receive do
:timedout -> :ok
after
0 -> :ok
end
end
end
15 changes: 15 additions & 0 deletions lib/rumbl/info_sys/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Rumbl.InfoSys.Supervisor do
use Supervisor

def start_link() do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_opts) do
children = [
worker(Rumbl.InfoSys, [], restart: :temporary)
]

supervise(children, strategy: :simple_one_for_one)
end
end
36 changes: 36 additions & 0 deletions lib/rumbl/info_sys/wolfram.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Rumbl.InfoSys.Wolfram do
import SweetXml
alias Rumbl.InfoSys.Result

def start_link(query, query_ref, owner, limit) do
Task.start_link(__MODULE__, :fetch, [query, query_ref, owner, limit])
end

def fetch(query_str, query_ref, owner, _limit) do
query_str
|> fetch_xml()
|> xpath(~x"/queryresult/pod[contains(@title, 'Result') or contains(@title, 'Definitions')]/subpod/plaintext/text()")
|> send_results(query_ref, owner)
end

defp send_results(nil, query_ref, owner) do
send(owner, {:results, query_ref, []})
end

defp send_results(answer, query_ref, owner) do
results = [%Result{backend: "wolfram", score: 95, text: to_string(answer)}]
send(owner, {:results, query_ref, results})
end

defp fetch_xml(query_str) do
{:ok, {_, _, body}} = :httpc.request(
String.to_char_list("http://api.wolframalpha.com/v2/query" <>
"?appid=#{appid()}" <>
"&input=#{URI.encode(query_str)}" <>
"&format=plaintext")
)
body
end

defp appid, do: System.get_env("WOLFRAM_APP_ID")
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ defmodule Rumbl.Mixfile do
{:phoenix_live_reload, "~> 1.0", only: :dev},
{:gettext, "~> 0.11.0"},
{:cowboy, "~> 1.0"},
{:comeonin, "~> 2.0"}
{:comeonin, "~> 2.0"},
{:sweet_xml, "~> 0.5.0"}
]
end

Expand Down
3 changes: 2 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
"poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], [], "hexpm", "519bc209e4433961284174c497c8524c001e285b79bdf80212b47a1f898084cc"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.11.2", "139755c1359d3c5c6d6e8b1ea72556d39e2746f61c6ddfb442813c91f53487e8", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.0-rc", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "3c24cb3cca8e0a73d8f56b7b1db161a94a460891f451247581cdff1e425316ef"},
"ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm", "6e56493a862433fccc3aca3025c946d6720d8eedf6e3e6fb911952a7071c357f"}}
"ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm", "6e56493a862433fccc3aca3025c946d6720d8eedf6e3e6fb911952a7071c357f"},
"sweet_xml": {:hex, :sweet_xml, "0.5.1", "b7568bba29224cec444a25827fcfa11c8c22886fda8389d0c32cbae3c5bc259f", [:mix], [], "hexpm", "3266dedc5e2e6c6b1c5b8a088504a58980632727803de22a5a276da847ea6947"}}
11 changes: 11 additions & 0 deletions notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,14 @@ update dependencies
$ dc exec app \
mix deps.update --all
```

create `annotation` model

```bash
$ dc exec app \
mix phoenix.gen.model Annotation annotations \
body:text \
at:integer \
user_id:references:users \
video_id:references:videos
```
41 changes: 40 additions & 1 deletion priv/repo/seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,47 @@

alias Rumbl.Repo
alias Rumbl.Category
alias Rumbl.User
alias Rumbl.Video

# create categories
categories = ~w(Action Drama Romance Comedy Sci-fi Technology)
for category <- categories do
Repo.get_by(Category, name: category) || Repo.insert!(%Category{name: category})
Repo.get_by(Category, name: category) ||
Repo.insert!(%Category{name: category})
end

# create app user
Repo.get_by(User, username: "rumbl-app") ||
Repo.insert!(%User{name: "Rumbl Application", username: "rumbl-app"})

# create wolfram user
Repo.get_by(User, username: "wolfram") ||
Repo.insert!(%User{name: "Wolfram|Alpha API", username: "wolfram"})

# create videos
videos = [
%{
url: "https://www.youtube.com/watch?v=xrIjfIjssLE",
title: "Erlang: The Movie",
description: "Demo of the Erlang programming language from Ericsson."
},
%{
url: "https://www.youtube.com/watch?v=lxYFOM3UJzo",
title: "Elixir: The Documentary",
description: "Explore the origins of Elixir."
},
%{
url: "https://www.youtube.com/watch?v=u21S_vq5CTw",
title: "Phoenix Takes Flight (ElixirConf EU 2015)",
description: "Chris McCord's keynote on Phoenix."
}
]

for video_attrs <- videos do
changeset = Repo.get_by!(User, username: "rumbl-app")
|> Ecto.build_assoc(:videos)
|> Video.changeset(video_attrs)

Repo.get_by(Video, url: video_attrs.url) || Repo.insert!(changeset)
end
31 changes: 24 additions & 7 deletions web/channels/video_channel.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Rumbl.VideoChannel do
require Logger
use Rumbl.Web, :channel
alias Rumbl.AnnotationView

Expand Down Expand Up @@ -31,17 +32,33 @@ defmodule Rumbl.VideoChannel do

case Repo.insert(changeset) do
{:ok, annotation} ->
broadcast!(socket, "new_annotation", %{
id: annotation.id,
user: Rumbl.UserView.render("user.json", %{user: user}),
body: annotation.body,
at: annotation.at
})
# broadcast!(socket, "new_annotation", AnnotationView.render("annotation.json", annotation))
broadcast_annotation(socket, annotation)
Task.start_link(fn -> compute_additional_info(annotation, socket) end)
{:reply, :ok, socket}
{:error, changeset} ->
{:reply, {:error, %{errors: changeset}}, socket}
end
end

defp broadcast_annotation(socket, annotation) do
annotation = Repo.preload(annotation, :user)
annotation_json = Phoenix.View.render(AnnotationView, "annotation.json", %{annotation: annotation})
broadcast!(socket, "new_annotation", annotation_json)
end

defp compute_additional_info(annotation, socket) do
for result <- Rumbl.InfoSys.compute(annotation.body, limit: 1, timeout: 10_000) do
attrs = %{url: result.url, body: result.text, at: annotation.at}
info_changeset = Repo.get_by(Rumbl.User, username: result.backend)
|> build_assoc(:annotations, video_id: annotation.video_id)
|> Rumbl.Annotation.changeset(attrs)

case Repo.insert(info_changeset) do
{:ok, info_annotation} -> broadcast_annotation(socket, info_annotation)
{:error, _changeset} ->
Logger.error("Could not insert annotation: #{inspect(info_changeset)}")
:ignore
end
end
end
end
2 changes: 1 addition & 1 deletion web/controllers/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Rumbl.Auth do
user = repo.get_by(Rumbl.User, username: username)

cond do
user && checkpw(password, user.password_hash) ->
user && user.password_hash && checkpw(password, user.password_hash) ->
{:ok, login(conn, user)}
user ->
{:error, :unauthorized, conn}
Expand Down