Skip to content

Commit

Permalink
ft: TCP backend
Browse files Browse the repository at this point in the history
  • Loading branch information
hauleth committed May 20, 2024
1 parent 3e7bbc0 commit 04b3d60
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 1 deletion.
38 changes: 38 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor do
use TypedStruct

alias Logflare.Backends.Adaptor.TCPAdaptor.Pool
alias Logflare.Backends.Adaptor.TCPAdaptor.Syslog

@behaviour Logflare.Backends.Adaptor

typedstruct enforce: true do
field(:tls, boolean())
field(:host, String.t())
field(:port, non_neg_integer())
end

@impl true
def start_link({_source, backend}) do
Pool.start_link(backend.config)
end

@impl true
def cast_config(_params) do
end

@impl true
def validate_config(changeset) do
changeset
end

@impl true
def ingest(pool, log_events, _opts) do
content = Enum.map(log_events, &Syslog.format(&1, []))

Pool.send(pool, content)
end

@impl true
def execute_query(_id, _query), do: {:error, :not_implemented}
end
51 changes: 51 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/pool.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Pool do
@behaviour NimblePool

def start_link(config) do
NimblePool.start_link(worker: {__MODULE__, config})
end

def send(pool, message) do
NimblePool.checkout!(pool, :send, fn _from, socket ->
result = :gen_tcp.send(socket, message)

{result, result}
end)
end

@impl NimblePool
def init_worker(%{host: host, port: port} = state) do
this = self()

async = fn ->
{:ok, socket} =
:gen_tcp.connect(to_charlist(host), port,
mode: :binary,
nodelay: true
)

:gen_tcp.controlling_process(socket, this)

socket
end

{:async, async, state}
end

@impl NimblePool
def handle_checkout(_command, _from, socket, state) do
{:ok, socket, socket, state}
end

@impl NimblePool
# Ignore any data sent over the socket
def handle_info({:tcp, socket, _}, socket),
do: {:ok, socket}

def handle_info({:tcp_closed, socket}, socket),
do: {:remove, "connection closed"}

def handle_info(_other, socket) do
{:ok, socket}
end
end
78 changes: 78 additions & 0 deletions lib/logflare/backends/adaptor/tcp_adaptor/syslog.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptor.Syslog do
@moduledoc """
Implementation of [RFC5424 The Syslog Protocol][]
This implementation uses *Octet Counting* framing from [RFC6587][]
[RFC5424]: https://www.rfc-editor.org/rfc/rfc5424
[RFC6587]: https://www.rfc-editor.org/rfc/rfc6587#section-3.4.1
"""

alias Logflare.LogEvent

# TODO: Change it to real value
@pen 2137

def format(%LogEvent{} = le, options) do
body = [
header(le, options),
" ",
structured_data(le, options),
" ",
Jason.encode!(le.body),
"\n"
]

len = IO.iodata_length(body)

[Integer.to_string(len), " ", body]
end

defp header(%LogEvent{} = le, options) do
level = to_level(le.body["level"] || le.body["metadata"]["level"])
facility = options[:facility] || 16

[
# Level and facility
"<#{facility * 8 + level}>1 ",
NaiveDateTime.to_iso8601(le.ingested_at),
# XXX: Unknown hostname?
" -",
" ",
le.source.name,
# Unknown procname
" -",
" ",
Ecto.UUID.cast!(le.id)
]
end

defp structured_data(%LogEvent{} = le, _options) do
[
"[source@#{@pen} name=#{inspect(le.source.name)}]"
]
end

@levels Map.new(
Enum.with_index(~w[emergency alert critical error warning notice informational debug])
)
@shorhands %{
"emer" => @levels["emergency"],
"crit" => @levels["critical"],
"err" => @levels["error"],
"warn" => @levels["warning"],
"info" => @levels["informational"]
}

@default @levels["notice"]

defp to_level(level) when level in 0..7, do: level

defp to_level(str) when is_binary(str) do
str = String.downcase(str)
# Unquote there to force compile time evaluation
@levels[str] || @shorhands[str] || @default
end

defp to_level(_), do: @default
end
2 changes: 1 addition & 1 deletion lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Logflare.Backends.Backend do
alias Logflare.User
alias Logflare.Rule

@adaptor_types [:bigquery, :webhook, :postgres, :datadog, :elastic]
@adaptor_types [:bigquery, :webhook, :postgres, :datadog, :elastic, :tcp]

typed_schema "backends" do
field(:name, :string)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ defmodule Logflare.Mixfile do
{:swoosh, "~> 0.23"},
{:ex_twilio, "~> 0.8.1"},
{:tesla, "~> 1.6"},
{:nimble_pool, "~> 1.1"},

# Concurrency and pipelines
{:broadway, "~> 1.0.6"},
Expand Down
80 changes: 80 additions & 0 deletions test/logflare/backends/adaptor/tcp_adaptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule Logflare.Backends.Adaptor.TCPAdaptorTest do
use Logflare.DataCase

@subject Logflare.Backends.Adaptor.TCPAdaptor

doctest @subject

setup do
user = insert(:user)
source = insert(:source, user_id: user.id)

{port, socket} = listen()

backend =
insert(:backend,
type: :tcp,
sources: [source],
config: %{host: "localhost", port: port, tls: false}
)

{:ok, source: source, backend: backend, port: port, socket: socket}
end

test "ingest/3", %{source: source, backend: backend} do
le = build(:log_event, source: source)

{:ok, pid} = @subject.start_link({source, backend})

_ = @subject.ingest(pid, [le], [])

assert_receive {:tcp, _msg}, 5000
end

# Simple TCP server
defp listen do
this = self()
spawn_link(fn ->
{:ok, sock} =
:gen_tcp.listen(0,
mode: :binary,
active: :once
)

{:ok, port} = :inet.port(sock)

send(this, {port, sock})

acceptor(sock, this)
end)

receive do
{port, sock} -> {port, sock}
end
end

defp acceptor(socket, parent) do
{:ok, lsock} = :gen_tcp.accept(socket)
ref = make_ref()

pid = spawn_link(fn ->
receive do
^ref -> server(lsock, parent)
end
end)

:gen_tcp.controlling_process(lsock, pid)
send(pid, ref)

acceptor(socket, parent)
end

defp server(sock, pid) do
receive do
{:tcp_close, ^sock} -> :ok
{:tcp, ^sock, msg} ->
send(pid, {:tcp, msg})
server(sock, pid)
end
end
end

0 comments on commit 04b3d60

Please sign in to comment.