feat: add a brod process monitor #124
base: main
Conversation
lib/kafee/process_manager.ex (outdated)
    Process.sleep(@restart_delay)
    {:noreply, state, {:continue, :start_child}}
Huh, TIL: I didn't realize you could return `{:continue, ...}` from a `handle_continue` function 🤯
lib/kafee/process_manager.ex (outdated)
      {:noreply, state, {:continue, :start_child}}
    end

    defp start_child(%{child_spec: child_spec, supervisor: supervisor} = state) do
Since we can just do `{:continue, :start_child}`, we can probably get rid of this function and make the `handle_continue` function more direct.
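For reference, a minimal sketch of what the more direct version might look like (illustrative only; it assumes a Supervisor.start_child/2 based start and the supervisor / child_spec / child_pid / monitor_ref fields shown in the diff, and is not the actual PR code):

    @impl GenServer
    def init(opts) do
      state = %{
        supervisor: Keyword.fetch!(opts, :supervisor),
        child_spec: Keyword.fetch!(opts, :child_spec),
        child_pid: nil,
        monitor_ref: nil
      }

      # Defer the real work until after init/1 returns.
      {:ok, state, {:continue, :start_child}}
    end

    @impl GenServer
    def handle_continue(:start_child, %{supervisor: supervisor, child_spec: child_spec} = state) do
      case Supervisor.start_child(supervisor, child_spec) do
        {:ok, pid} ->
          ref = Process.monitor(pid)
          {:noreply, %{state | child_pid: pid, monitor_ref: ref}}

        {:error, _reason} ->
          # Wait, then loop straight back into this same handle_continue clause.
          Process.sleep(@restart_delay)
          {:noreply, state, {:continue, :start_child}}
      end
    end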
So, just to be clear on trade-offs: this sets the brod client to a temporary process, so there will be times it does not exist. That means any sync work (like the sync producer) will error if it tries to send a message while the client is down; you'll want to use the async producer if you want better handling of errors. Second, there is a very small chance that this process manager process crashes or for some reason doesn't restart the client. I think it's pretty rare (thanks BEAM!) but worth mentioning. Lastly, it might be useful to purge some of the brod client logs and rewrite them into something more helpful. Otherwise, my much longer-term thought is writing our own Kafka client off the kpro library that brod uses 🤷 Code looks good to me. Limited testing seems to fix the issue.
For the first point... that's what I'd expect from the sync producer. We're typically outboxing all kafka messages, or throwing them into a transaction that would ultimately get retried. The second one is more concerning, but I don't currently have an idea for that.
I like this PR as it's more concise and seems to go more with the grain of OTP than #123.
There are some comments / questions surrounding the work section of the new process.
One thing to add to the above comments, regarding the possibility of ProcessManager not being able to start the child: I think you can have a threshold, or a max retry attempt number that, when reached, would just shut down ProcessManager. Its parent supervisor will restart ProcessManager (based on the supervisor's restart strategy), and ProcessManager will then reset, retry, and crash again when the max is reached. I think this is a good two-phase crash triage loop that could work, imho. A rough sketch of the idea follows.
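Something along these lines, for illustration (the attempts field, @max_attempts, and the try_start_child/1 helper are hypothetical and not part of the actual Kafee.ProcessManager):

    @max_attempts 5

    @impl GenServer
    def handle_continue(:start_child, %{attempts: attempts} = state) when attempts >= @max_attempts do
      # Give up: stopping with a non-normal reason hands control back to the parent
      # supervisor, whose own restart strategy decides whether to restart or escalate.
      {:stop, :max_restart_attempts_reached, state}
    end

    def handle_continue(:start_child, state) do
      case try_start_child(state) do
        {:ok, new_state} ->
          {:noreply, %{new_state | attempts: 0}}

        {:error, _reason} ->
          # Count the failure and retry after the delay.
          Process.sleep(@restart_delay)
          {:noreply, %{state | attempts: state.attempts + 1}, {:continue, :start_child}}
      end
    end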
lib/kafee/process_manager.ex (outdated)
    Process.sleep(@restart_delay)
    {:noreply, state, {:continue, :start_child}}
How about moving it into its own function?
Also, to my knowledge, a GenServer's way of doing things shouldn't involve `Process.sleep/1` most of the time if it can help it. How about `Process.send_after/3` -> `handle_info/2` -> running the abstracted-out logic inside `handle_continue/2`? That way you can also use `handle_continue/2` from `init/1`.
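Sketched roughly (hypothetical, not the PR's actual code; it reuses the @restart_delay / @log_prefix attributes and the :DOWN handler shape from this PR):

    @impl GenServer
    def handle_info({:DOWN, ref, :process, pid, reason}, %{monitor_ref: ref, child_pid: pid} = state) do
      Logger.info("#{@log_prefix} Child process down. Restarting in #{@restart_delay}ms...", reason: reason)

      # Schedule the retry instead of sleeping, so the process stays responsive.
      Process.send_after(self(), :restart_child, @restart_delay)
      {:noreply, state}
    end

    def handle_info(:restart_child, state) do
      # Funnel back into the same start logic used by init/1.
      {:noreply, state, {:continue, :start_child}}
    end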
An alternative way of shortening the timeout logic, with protection against repeated calls to start a child, is the GenServer timeout feature, instead of `Process.send_after/3` or `Process.sleep/1`. You'd write less boilerplate handler code.
For example:
    defmodule SimpleTimeoutServer do
      use GenServer

      def start_link do
        GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
      end

      @impl true
      def init(:ok) do
        # The third element here (or the last one after :continue) is the timeout in milliseconds.
        # If no messages arrive in the process inbox within that window, a :timeout message is
        # delivered to the process; if a message does come in first, the pending timeout is canceled.
        {:ok, 0, 5000}
      end

      @impl true
      def handle_info(:timeout, state) do
        IO.puts("Timeout occurred. Current state: #{state}")
        # Returning a timeout value again schedules the next :timeout message.
        {:noreply, state, 5000}
      end
    end
Example from wms-service is in the StreamReleaseAgent.
lib/kafee/process_manager.ex (outdated)
    @impl GenServer
    def handle_info({:DOWN, ref, :process, pid, reason}, %{monitor_ref: ref, child_pid: pid} = state) do
      Logger.info("#{@log_prefix} Child process down. Restarting in #{@restart_delay}ms...",
        reason: reason
Adding the `@log_prefix` here can allow for easier filtering by DataDog I think?
That was the idea, but I've pulled it off. I'll just make a view that handles this.
test/kafee/process_manager_test.exs (outdated)
      start: {Agent, :start_link, [fn -> %{} end]}
    }

    ProcessManager.start_link(opts)
Suggested change: replace

    ProcessManager.start_link(opts)

with

    assert {:error, {{:badkey, :supervisor}, _}} = ProcessManager.start_link(opts)
I think that'd be enough, and you don't need to check with `assert_receive`.
Checking the return value of this function showed:
{:error,
{{:badkey, :supervisor},
[
{:erlang, :map_get,
[
:supervisor,
%{
id: :test_child,
start: {Agent, :start_link,
[#Function<3.102245195/0 in Kafee.ProcessManagerTest."test start_link/1 fails to start without required supervisor option"/1>]}
}
], [error_info: %{module: :erl_erts_errors}]},
{Kafee.ProcessManager, :init, 1,
[file: ~c"lib/kafee/process_manager.ex", line: 28]},
{:gen_server, :init_it, 2, [file: ~c"gen_server.erl", line: 980]},
{:gen_server, :init_it, 6, [file: ~c"gen_server.erl", line: 935]},
{:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}
]}}
I tested the consumer side by disconnecting the brod clients (stopping the kafka docker image) and then restarting it. As expected, the main branch version of kafee would just crash the application, but this branch recovered after retrying.
While I like all async adapters using the ProcessManager approach, I have a comment about keeping Producer.SyncAdapter as-is so that we don't introduce any async-like behavior to it.
I can make the change if you agree with the path forward, but it would probably mean switching over to using AsyncAdapter, or siloing the blast radius with supervisors wherever BrodAdapter is used.
      restart: :permanent,
      shutdown: 500
    }

    {Kafee.ProcessManager,
TL;DR: how about we keep the publishing sync adapter as-is? Below is a way to still create robustness by stacking supervisors.
Based on what @btkostner said about the time window where the brod clients won't exist, it seems that if we use `Kafee.ProcessManager` here, the end result would be that SyncAdapter would need to behave like AsyncAdapter, with a temporary queue to capture the messages that need to be published. Otherwise, we really would lose the message to the ether, and we'd need to at least log it to DataDog in order to track which messages were dropped while the brod client was being brought back online.
What if we keep this part as-is? That way, we still hold true to the word that this is a synchronous adapter, and if the brod client crashes, it means the entire application would have to restart.
Now, about that last part (the entire application having to restart): in Elixir in Action, 3rd ed., chapter 8, Sasa Juric goes over how to "silo" the application with multiple supervisors. The diagram below is from the book. `Todo.System` is the main supervisor that handles the "domain" of Todo; it's not the application-level supervisor, rather it'd be a child of the main application supervisor.

Now, if OMS services are structured similarly to WMS's, then each probably has one main application supervisor handling almost all of the processes. However, the book goes over creating boundaries between processes based on "timing strategies" / business context.
So if we were to keep `SyncAdapter` as-is, without the ProcessManager, what we could do is have a topology design such as:
graph TD
A[Application Supervisor]
B[KafkaPublisherSupervisor]
C[KafkaConsumerSupervisor]
D[Kafee.Producer.BrodAdapter]
E[Kafee.Consumer.BrodAdapter]
A --> B
A --> C
B --> D
C --> E
classDef supervisor fill:#006400,stroke:#333,stroke-width:2px;
classDef child fill:#228B22,stroke:#333,stroke-width:2px;
class A,B,C supervisor;
class D,E child;
That way, even if a brod client fails, it would only trigger its respective supervisor to restart. We can set a custom max restart count for their children that is greater than the default of 3, so that they keep retrying until they give up, at which point the supervisors themselves will crash.
When they crash, the application's top-level supervisor will then restart the child supervisors, and those restarts start tallying against its own max restart count.
So if the max restart attempts on the supervisors is 3, and we give the children of those supervisors 5 max restart attempts, the brod clients get 15 restart opportunities. We can choose a suitably robust number.
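A rough sketch of one of those sibling supervisors (module names are placeholders, the child spec is a stand-in for however the brod-backed adapter is actually started, and this is not code from this PR):

    defmodule MyApp.KafkaPublisherSupervisor do
      use Supervisor

      def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

      @impl Supervisor
      def init(_opts) do
        children = [
          # Placeholder child spec for whatever brod-backed producer process runs here.
          {Kafee.Producer.BrodAdapter, []}
        ]

        # A restart budget larger than the default (3 restarts in 5 seconds), so a
        # flapping Kafka connection is retried here before the crash escalates to
        # the application supervisor.
        Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 30)
      end
    end

The application supervisor would then start MyApp.KafkaPublisherSupervisor and MyApp.KafkaConsumerSupervisor as ordinary children, matching the topology in the diagram.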
That's basically what I was doing in my original approach 😅. I'm on board with it. The point is ultimately that if we're going to move away from elsa, we need to make sure we're "as robust" as it is. We can't tolerate the lack of a kafka connection tanking the entire application, so whatever we have to do to get there is 👍.
From an implementation perspective, we use the transactional outbox pattern for all publishing, so I'm not particularly concerned. As a library, though, we'd ideally want that solved without suggesting that it's handled solely in userland by using an outbox 😰.
Hopefully this makes sense :)
Now that I think about your comment, SyncAdapter probably still would need an outbox pattern, because messages can be attempted to be published while the brod client restarts. The distinction between Sync and Async hinges on whether we wait for the brod client to finish publishing before the next message is published, so the outbox requirement would probably need to hold for both approaches.
For AsyncAdapter, we already have the queue playing that outbox role. For SyncAdapter we don't; for now we can note that the responsibility of maintaining the outbox lies in userland when using SyncAdapter, as that won't be part of this PR. (A rough illustration of what that userland outbox tends to look like is below.)
We probably would need to introduce a queue to SyncAdapter in another PR, and maybe note that it is optional. SyncAdapter would probably have more messages pile up in the queue than AsyncAdapter, so it has a higher chance of hitting a memory ceiling when using the queue; the outbox option could default to an internal queue, or be offloaded to userland and managed outside of kafee.
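For context, the userland side of that usually looks something like this sketch (purely illustrative; MyApp.Repo, OutboxMessage, order_changeset, and the drain worker are hypothetical application code, not part of kafee or this PR):

    # The business record and the pending Kafka message commit in one DB transaction;
    # a separate worker later reads the outbox table, publishes through the sync
    # adapter, and only deletes rows after a successful publish.
    Ecto.Multi.new()
    |> Ecto.Multi.insert(:order, order_changeset)
    |> Ecto.Multi.insert(:outbox_message, fn %{order: order} ->
      OutboxMessage.changeset(%OutboxMessage{}, %{
        topic: "orders",
        key: to_string(order.id),
        payload: %{order_id: order.id}
      })
    end)
    |> MyApp.Repo.transaction()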
Checklist
Problem
The brod client crashes everything when it's not able to connect to Kafka in some instances -- ultimately meaning that an app will crash-loop instead of gracefully handling Kafka connection issues.
Details
You can test this by checking out this branch, `path`ing the dependency, and starting/stopping a broker container in a given service. Without this branch, things will 💣. With this, it'll keep trying to recover.

Scenarios that have been tested: