Skip to content

Commit

Permalink
aggregate all Pubsub and improve hook and event functions
Browse files Browse the repository at this point in the history
  • Loading branch information
shahryarjb committed May 24, 2024
1 parent 75d406c commit abde5dc
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 108 deletions.
19 changes: 19 additions & 0 deletions lib/mishka_installer.ex
Original file line number Diff line number Diff line change
@@ -1,2 +1,21 @@
defmodule MishkaInstaller do
def broadcast(channel, status, data, broadcast \\ true) do
if broadcast do
MishkaInstaller.PubSub
|> Phoenix.PubSub.broadcast("mishka:plugin:#{channel}", %{
status: status,
data: data
})
else
:ok
end
end

def subscribe(channel) do
Phoenix.PubSub.subscribe(MishkaInstaller.PubSub, "mishka:plugin:#{channel}")
end

def unsubscribe(channel) do
Phoenix.PubSub.unsubscribe(MishkaInstaller.PubSub, "mishka:plugin:#{channel}")
end
end
79 changes: 60 additions & 19 deletions lib/plugins_management/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,52 +103,69 @@ defmodule MishkaInstaller.PluginsManagement.Event do
deps_list <- hold_statuses(plg.depends),
{:ok, db_plg} <-
create({:ok, Map.merge(plg, Hook.depends_status(deps_list, plg.status))}),
:ok <- Hook.broadcast(:register, db_plg, broadcast) do
:ok <- MishkaInstaller.broadcast("event", :register, db_plg, broadcast) do
{:ok, :register, db_plg}
end
end

def start(name, event, broadcast) do
with {:ok, data} <- Hook.get_plugin(name: name),
with {:ok, data} <- get(name: name),
:ok <- Hook.plugin_status(data.status),
:ok <- hold_statuses?(data.depends),
plugins when is_list(plugins) <- read(event: event),
mod_plugins <- Enum.reject(plugins, &(&1.status in [:stopped, :held])),
sorted_plugins <- Enum.sort_by(mod_plugins, &{&1.priority, &1.name}),
:ok <- ModuleStateEvent.create(sorted_plugins),
{:ok, db_plg} <- update(:status, :started, data.id),
:ok <- Hook.broadcast(:start, data, broadcast) do
:ok <- MishkaInstaller.broadcast("event", :start, data, broadcast) do
{:ok, :start, db_plg}
end
end

def start(event) do
event
# TODO: start all plugins of an event
end

# TODO: create new module state
# TODO: we need some options like `event: event`, `module: module_name`, extension: extension
# |__ Check we need these options or not in this new version?
def start() do
# TODO: start all plugins of all event
end

def restart(name, broadcast) do
with {:ok, :start, data} <- name.start(false),
{:ok, db_plg} <- update(:status, :restarted, data.id),
:ok <- Hook.broadcast(:restart, db_plg, broadcast) do
:ok <- MishkaInstaller.broadcast("event", :restart, db_plg, broadcast) do
{:ok, :restart, db_plg}
end
end

def restart(event) do
event
# TODO: restart all plugins of an event
end

def restart() do
# TODO: restart all plugins of all event
end

def stop(name, event, broadcast) do
with {:ok, data} <- Hook.get_plugin(name: name),
with {:ok, data} <- get(name: name),
{:ok, db_plg} <- update(:status, :stopped, data.id),
plugins when is_list(plugins) <- read(event: event),
mod_plugins <- Enum.reject(plugins, &(&1.status in [:stopped, :held])),
sorted_plugins <- Enum.sort_by(mod_plugins, &{&1.priority, &1.name}),
:ok <- ModuleStateEvent.create(sorted_plugins),
:ok <- Hook.broadcast(:stop, db_plg, broadcast) do
:ok <- MishkaInstaller.broadcast("event", :stop, db_plg, broadcast) do
{:ok, :stop, db_plg}
end
end

def stop(event) do
event
# TODO: stop all plugins of an event
end

def stop() do
# TODO: stop all plugins of all event
end

Expand All @@ -158,20 +175,49 @@ defmodule MishkaInstaller.PluginsManagement.Event do
mod_plugins <- Enum.reject(plugins, &(&1.status in [:stopped, :held])),
sorted_plugins <- Enum.sort_by(mod_plugins, &{&1.priority, &1.name}),
:ok <- ModuleStateEvent.create(sorted_plugins),
:ok <- Hook.broadcast(:stop, db_plg, broadcast),
:ok <- Phoenix.PubSub.unsubscribe(MishkaInstaller.PubSub, "plugin:hook"),
:ok <- MishkaInstaller.broadcast("event", :stop, db_plg, broadcast),
:ok <- MishkaInstaller.unsubscribe("event"),
:ok <- GenServer.stop(name, :normal) do
{:ok, :unregister, db_plg}
end
end

def delete(name, _broadcast) do
def unregister(event) do
event
# TODO: unregister all plugins of an event
end

def unregister() do
# TODO: unregister all plugins of all event
end

def purge(name, _broadcast) do
with {:ok, :unregister, _db_plg} <- name.unregister(false) do
# TODO: After creating `installation_management` we can delete its project
# TODO: After creating `installation_management` we can purge its project
end
end

def get() do
def purge(event) do
event
# TODO: purge all plugins of an event
end

def purge() do
# TODO: purge all plugins of all event
end

def get(key) do
case read(key) do
{:error, error} ->
{:error, error}

nil ->
message = "This plugin is not available in the database."
{:error, [%{message: message, field: :global, action: :read}]}

data ->
{:ok, data}
end
end

def backup() do
Expand All @@ -183,11 +229,6 @@ defmodule MishkaInstaller.PluginsManagement.Event do
# TODO: restore from json file
end

def purge() do
# TODO: purge all plugins of an event
# TODO: purge all plugins of all event
end

def ensure?() do
end

Expand Down
44 changes: 13 additions & 31 deletions lib/plugins_management/hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
Event.unregister(config(:__plugin__), config(:__event__), broadcast)
end

def delete(broadcast \\ true) do
Event.delete(config(:__plugin__), broadcast)
def purge(broadcast \\ true) do
Event.purge(config(:__plugin__), broadcast)
end

def get() do
GenServer.call(__MODULE__, :get)
end

# TODO: we need genserver
Expand All @@ -51,7 +55,7 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
start: 1,
restart: 1,
stop: 1,
delete: 1,
purge: 1,
unregister: 1

def __after_compile__(_env, _bytecode) do
Expand All @@ -69,10 +73,15 @@ defmodule MishkaInstaller.PluginsManagement.Hook do

@impl true
def init(state) do
Phoenix.PubSub.subscribe(MishkaInstaller.PubSub, "plugin:hook")
MishkaInstaller.subscribe("event")
{:ok, state}
end

@impl true
def handle_call(:get, _from, state) do
{:reply, state, state}
end

@impl true
def handle_info(_reason, state) do
{:noreply, state}
Expand Down Expand Up @@ -105,18 +114,6 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
end
end

@doc false
def broadcast(status, data, broadcast, channel \\ "hook") do
if broadcast do
Phoenix.PubSub.broadcast(MishkaInstaller.PubSub, "plugin:#{channel}", %{
status: status,
data: data
})
else
:ok
end
end

@doc false
def depends_status([], status), do: %{status: status}
def depends_status(_deps, _status), do: %{status: :held}
Expand All @@ -128,19 +125,4 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
end

def plugin_status(_status), do: :ok

@doc false
def get_plugin(key) do
case Event.read(key) do
{:error, error} ->
{:error, error}

nil ->
message = "This plugin is not available in the database."
{:error, [%{message: message, field: :global, action: :read}]}

data ->
{:ok, data}
end
end
end
29 changes: 14 additions & 15 deletions lib/processing_pipelines/queue/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
@impl true
def handle_continue(:start_jobs, state) do
job_data = Queue.read(state)
job_broadcast(Map.get(job_data || %{}, :worker), "start", job_data)
broadcast_data = %{worker: Keyword.get(state, :worker), meta: job_data}
MishkaInstaller.broadcast("queue:job", :start, broadcast_data)

if !QueueAssistant.is_empty?(Map.get(job_data || %{}, :queued)) and
job_data.status == :started do
Expand Down Expand Up @@ -314,7 +315,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
"Identifier: #{inspect(worker)} ::: GenServerError: Unexpected message ::: Source: #{inspect(reason)}"
)

job_broadcast(worker, "error", reason)
broadcast_data = %{worker: worker, meta: reason}
MishkaInstaller.broadcast("queue:job", :error, broadcast_data)
{:noreply, apply(worker, :on_failure, [state, reason, worker])}
end

Expand All @@ -340,7 +342,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
def remove_job(worker) do
worker_pid = List.first(find_worker_pid(worker))
if !is_nil(worker_pid) and is_tuple(worker_pid), do: terminate(elem(worker_pid, 0))
job_broadcast(worker, "remove", nil)
broadcast_data = %{worker: worker, meta: nil}
MishkaInstaller.broadcast("queue:job", :remove, broadcast_data)
Queue.delete(worker: worker)
end

Expand Down Expand Up @@ -370,7 +373,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
pid
)

job_broadcast(worker, "terminate", result)
broadcast_data = %{worker: worker, meta: result}
MishkaInstaller.broadcast("queue:job", :terminate, broadcast_data)
result

nil ->
Expand All @@ -389,7 +393,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
{pid, _type} ->
kill_running_queues(worker)
Queue.change_status(:stopped, worker: worker)
job_broadcast(worker, "stop_terminate", nil)
broadcast_data = %{worker: worker, meta: nil}
MishkaInstaller.broadcast("queue:job", :stop_terminate, broadcast_data)
terminate(pid)
end
end
Expand Down Expand Up @@ -439,7 +444,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do

defp run_running_queues([], worker, action) do
Queue.change_status(:no_queue, worker: worker)
job_broadcast(worker, "no_queue", action)
broadcast_data = %{worker: worker, meta: action}
MishkaInstaller.broadcast("queue:job", :no_queue, broadcast_data)

message =
"Unfortunately, There is no running queue to start as a task inside #{worker} worker."
Expand All @@ -448,7 +454,8 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
end

defp run_running_queues(running, worker, _action) do
job_broadcast(worker, "perform_queues", running)
broadcast_data = %{worker: worker, meta: running}
MishkaInstaller.broadcast("queue:job", :perform_queues, broadcast_data)

Enum.map(
running,
Expand Down Expand Up @@ -541,12 +548,4 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
task -> Process.cancel_timer(task.__timeout__)
end
end

defp job_broadcast(worker, status, data) do
Phoenix.PubSub.broadcast(MishkaInstaller.PubSub, "queue:job", %{
worker: worker,
status: status,
data: data
})
end
end
Loading

0 comments on commit abde5dc

Please sign in to comment.