Skip to content

Commit

Permalink
snappy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Marin committed Jan 27, 2016
1 parent cde146d commit 7096f80
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{cover_print_enabled, true}.
{clean_files, ["ebin/*.beam"]}.
{deps,[
{gproc, ".*", {git, "https://github.com/uwiger/gproc.git"}}
{gproc, ".*", {git, "https://github.com/uwiger/gproc.git"}},
{esnappy, ".*", {git, "https://github.com/thekvs/esnappy.git"}}
]}.
{sub_dirs, ["src"]}.
10 changes: 6 additions & 4 deletions src/ekaf_protocol_produce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ encode_partition(Partition) when is_integer(Partition)->
encode_partition(#partition{ id = Id, message_sets = MessageSets })->
{ok, Codec} = ekaf_lib:get_compression_codec(),
MessageSetsEncoded = case Codec of
gzip->
gzip ->
MsgVal = zlib:gzip(encode_message_sets(MessageSets)),
SecondMsgSet = #message_set{size = 1, messages = [#message{attributes = 1, key = undefined, value = MsgVal}]},
encode_message_set(SecondMsgSet);
snappy->
%not implementing yet because I don't need it
encode_message_sets(MessageSets);
snappy ->
{ok, Ctx} = esnappy:create_ctx(),
MsgVal = esnappy:compress(Ctx, encode_message_sets(MessageSets)),
SecondMsgSet = #message_set{size = 1, messages = [#message{attributes = 2, key = undefined, value = MsgVal}]},
encode_message_sets(SecondMsgSet);
_ ->
encode_message_sets(MessageSets)
end,
Expand Down

0 comments on commit 7096f80

Please sign in to comment.