-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ekaf is not honoring broker's partition change #15
Comments
HTH |
-module(kafka_client).
-include_lib("ekaf/include/ekaf_definitions.hrl").
%% analytics topic in Kafka
-define(ANALYTICS_TOPIC, <<"analytics">>).
%% insights topic in Kafka
-define(INSIGHTS_TOPIC, <<"insights">>).
%% API
-export([start_node/1,
stop_node/0,
publish_metric/1,
publish_metric/2,
kafka_callback/5]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Standard methods for managing kafka clients backed by service discovery.
%%% When downstream nodes die, the clients will get callback from ekaf. ekaf
%%% maintains the connection pool (workers) and using gen_fsm it changes the connection
%%% state of workers
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Configure and start the ekaf application, pointed at the given
%% bootstrap broker {Host, Port}. Idempotent: if ekaf is already running,
%% the existing configuration is left untouched.
-spec start_node({string(), integer()}) -> ok.
start_node({Host, Port}) ->
    case lists:keymember(ekaf, 1, application:which_applications()) of
        true ->
            %% ekaf already running; nothing to (re)configure.
            ok;
        false ->
            %% Route server-state and worker(connection)-state events to
            %% kafka_callback/5 in this module.
            Callback = {kafka_client, kafka_callback},
            lists:foreach(
                fun(Hook) -> application:set_env(ekaf, Hook, Callback) end,
                [?EKAF_CALLBACK_DOWNTIME_REPLAYED,
                 ?EKAF_CALLBACK_DOWNTIME_SAVED,
                 ?EKAF_CALLBACK_FLUSH,
                 ?EKAF_CALLBACK_FLUSHED_REPLIED,
                 ?EKAF_CALLBACK_TIME_TO_CONNECT,
                 ?EKAF_CALLBACK_WORKER_DOWN,
                 ?EKAF_CALLBACK_WORKER_UP]),
            %% Pool sizing / flush-interval defaults are applied only when
            %% the keys are not already set in the environment.
            set_default(ekaf_per_partition_workers, 20),
            set_default(ekaf_per_partition_workers_max, 100),
            application:set_env(ekaf, ekaf_bootstrap_broker, {Host, Port}),
            set_default(ekaf_buffer_ttl, 1000),
            {ok, _} = application:ensure_all_started(ekaf),
            ok
    end.

%% Re-write Key with its current value, falling back to Default when the
%% key is unset (i.e. "set default without clobbering an existing value").
set_default(Key, Default) ->
    application:set_env(ekaf, Key, application:get_env(ekaf, Key, Default)).
%% @doc Stop the ekaf application. The result of application:stop/1 is
%% deliberately discarded, so calling this when ekaf was never started
%% (it then returns {error, {not_started, ekaf}}) is harmless.
stop_node() ->
    _ = application:stop(ekaf),
    ok.
%% @doc Publish a metric to the default (analytics) topic.
-spec publish_metric(list(binary()) | binary()) -> ok.
publish_metric(MetricName) ->
    publish_metric(analytics, MetricName).

%% @doc Publish a metric to a named topic. The atoms `analytics' and
%% `insights' select the corresponding predefined topics; any binary
%% argument is used as the topic name verbatim. Publishing is
%% fire-and-forget via ekaf:produce_async/2.
-spec publish_metric(atom() | binary(), list(binary()) | binary()) -> ok.
publish_metric(analytics, MetricName) ->
    ekaf:produce_async(?ANALYTICS_TOPIC, MetricName);
publish_metric(insights, MetricMessage) ->
    ekaf:produce_async(?INSIGHTS_TOPIC, MetricMessage);
publish_metric(TopicName, MetricMessage) when is_binary(TopicName) ->
    ekaf:produce_async(TopicName, MetricMessage).
%%%%% Callback handlers for kafka-broker (node) and worker (connection) state %%%
%%% Recycled from ekaf demo code
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc ekaf event callback registered in start_node/1. ekaf invokes it
%% with either a #ekaf_server{} state (per-topic server events) or a
%% #ekaf_fsm{} state (per-partition worker/connection events); the two
%% clauses below dispatch on that record. Event is one of the
%% ?EKAF_CALLBACK_* binaries, so it can be embedded directly in the
%% stat name. Recycled from ekaf demo code.
kafka_callback(Event, _From, _StateName,
    #ekaf_server{ topic = Topic },
    Extra) ->
    %% Stat key: "<topic>.<event>"
    Stat = <<Topic/binary,".", Event/binary>>,
    case Event of
        ?EKAF_CALLBACK_DOWNTIME_SAVED ->
            %io:format("~n ~s => 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_DOWNTIME_REPLAYED ->
            %io:format("~n ~s => 1 during ~p",[Stat, StateName]),
            ok;
        ?EKAF_CALLBACK_TIME_DOWN ->
            case Extra of
                {ok, Micros}->
                    %% Downtime reported in whole milliseconds, rounded up.
                    io:format("~n ~s => ~p",[Stat, ekaf_utils:ceiling(Micros/1000)]);
                _ ->
                    ok
            end;
        ?EKAF_CALLBACK_WORKER_DOWN ->
            %% A server-level worker-down event means the bootstrap broker
            %% itself is unreachable.
            FinalStat = <<Topic/binary,".mainbroker_unreachable">>,
            io:format("~n ~s => 1",[FinalStat]),
            ok;
        _ ->
            ?INFO_MSG("ekaf_server callback got ~p ~p",[Event, Extra])
    end;
kafka_callback(Event, _From, _StateName,
    #ekaf_fsm{topic = Topic, broker = _Broker, partition = PartitionId, last_known_size = _BufferLength, cor_id = CorId, leader = Leader},
    Extra)->
    %% Stat key: "<topic>.<event>.broker<leader>.<partition>"
    Stat = <<Topic/binary,".", Event/binary, ".broker", (ekaf_utils:itob(Leader))/binary, ".", (ekaf_utils:itob(PartitionId))/binary>>,
    case Event of
        ?EKAF_CALLBACK_FLUSH ->
            ok;
        ?EKAF_CALLBACK_FLUSHED_REPLIED ->
            case Extra of
                {ok, {{replied, _, _}, #produce_response{ cor_id = ReplyCorId }} }->
                    %% How far the broker's acknowledged correlation id lags
                    %% behind our latest request; clamp negatives to zero.
                    Diff = case (CorId - ReplyCorId ) of Neg when Neg < 0 -> 0; SomeDiff -> SomeDiff end,
                    FinalStat = <<Stat/binary,".diff">>,
                    io:format("~n~s ~w",[FinalStat, Diff]);
                _ ->
                    %% BUGFIX: the original format string had three ~p control
                    %% sequences ("got ~p some:~p ~nextra:~p") but only two
                    %% arguments, so io_lib:format raised badarg whenever an
                    %% unexpected reply reached this branch.
                    ?INFO_MSG("ekaf_fsm callback got ~p ~nextra:~p",[Event, Extra])
            end;
        ?EKAF_CALLBACK_WORKER_UP ->
            io:format("~n ~s 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_WORKER_DOWN ->
            io:format("~n ~s 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_TIME_TO_CONNECT ->
            case Extra of
                {ok, Micros}->
                    %% Connect latency reported in whole ms, rounded up.
                    io:format("~n ~s => ~p",[Stat, ekaf_utils:ceiling(Micros/1000)]);
                _ ->
                    ok
            end;
        _ ->
            ?INFO_MSG("ekaf_fsm callback got ~p ~p",[Event, Extra])
    end.
|
@bosky101 Regarding 2.: There are no worker-down messages, but there are worker-up messages. I'm assuming the workers stayed up because the nodes didn't actually go down; ekaf_server would then start new workers because the metadata had changed, but it never stopped the old ones. Could that be the case? As far as I can tell, the broker's response to produce calls is never checked, so errors like "not leader for partition" (error code 6) are silently ignored. |
It appears from our production environment that a broker's partition changes are not being honored by ekaf, and we are seeing the following behavior.
From kafka protocol:
It seems that case (2) — the invalid-partition error code — is not handled in ekaf.
The text was updated successfully, but these errors were encountered: