Offset doesn't get updated between runs and runs crash with OOM errors #107

Open
bpardee opened this issue Jan 31, 2021 · 4 comments

bpardee commented Jan 31, 2021

We are seeing an issue in production (AWS container) where the offsets are not being updated between runs. It seems that we start up and slowly run out of memory (after about 15 minutes with 2 GB, 60 minutes with 8 GB) until the app crashes and a new instance restarts with the same begin_offset. I cannot seem to reproduce the problem in development.

Something that is possibly related: a rebalance occurs at some multiple of 15 seconds:

... 226 lines omitted ...
{"environment":"release","level":"notice","message":"Group member (portfolio-monitor-consumer-group,coor=#PID<0.2720.0>,cb=#PID<0.2715.0>,generation=104232):\nre-joining group, reason::rebalance_in_progress","payload":{},"timestamp":"2021-01-31T14:33:52.857406Z"}
{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_revoked=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_tradelines","payload":{},"timestamp":"2021-01-31T14:33:52.857583Z"}
... 3 lines omitted ...
{"environment":"release","level":"notice","message":"Group member (portfolio-monitor-consumer-group,coor=#PID<0.2701.0>,cb=#PID<0.2416.0>,generation=104232):\nre-joining group, reason::rebalance_in_progress","payload":{},"timestamp":"2021-01-31T14:33:52.860172Z"}
... 13 lines omitted ...
{"environment":"release","level":"notice","message":"Group member (portfolio-monitor-consumer-group,coor=#PID<0.2720.0>,cb=#PID<0.2715.0>,generation=104233):\nassignments received:\n  release_tradelines:\n    partition=0 begin_offset=1036850","payload":{},"timestamp":"2021-01-31T14:33:52.865159Z"}
{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_received=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_tradelines generation_id=104233","payload":{},"timestamp":"2021-01-31T14:33:52.865255Z"}
... 56 lines omitted ...
{"environment":"release","level":"notice","message":"Group member (portfolio-monitor-consumer-group,coor=#PID<0.2720.0>,cb=#PID<0.2715.0>,generation=104230):\nre-joining group, reason::rebalance_in_progress","payload":{},"timestamp":"2021-01-31T14:33:22.851670Z"}
... 4 lines omitted ...
{"environment":"release","level":"notice","message":"Group member (portfolio-monitor-consumer-group,coor=#PID<0.2701.0>,cb=#PID<0.2416.0>,generation=104230):\nre-joining group, reason::rebalance_in_progress","payload":{},"timestamp":"2021-01-31T14:33:22.854326Z"}
{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_revoked=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_inquiries","payload":{},"timestamp":"2021-01-31T14:33:22.854427Z"}
... 12 lines omitted ...
{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_received=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_inquiries generation_id=104231","payload":{},"timestamp":"2021-01-31T14:33:22.858850Z"}
{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_received=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_tradelines generation_id=104231","payload":{},"timestamp":"2021-01-31T14:33:22.858933Z"}

Not much is going on in my handle_messages function:

  def handle_messages(messages) do
    Logger.info("kafka.handle_messages begin")

    for message <- messages do
      Logger.info("Incoming kafka message", message)
      # ... handling message here; not much going on except a select that
      # returns in a couple of ms and doesn't return any rows
    end

    Logger.info("kafka.handle_messages end")
    :ok
  end

Which results in:

{
    "message": "kafka.handle_messages begin",
    "timestamp": "2021-01-31T08:57:51.715323Z"
}

The following message occurs around 300 times per second, with the offset incrementing by 1 each time, until the end message appears roughly every 45 seconds, so each batch works out to on the order of 300 × 45 ≈ 13,500 messages. The messages are backed up from 10 days ago (per the processed_at timestamp):

{
    "message": "Incoming kafka message",
    "payload": {
        "headers": [],
        "key": "",
        "offset": 1036850,
        "partition": 0,
        "topic": "release_tradelines",
        "ts": 1611241962094,
        "ts_type": "create",
        "value": "{\"processed_at\":\"2021-01-21T15:12:42.000Z\",\"tracking_number\":\"d73sd596e5\"}"
    },
    "timestamp": "2021-01-31T08:57:51.715395Z"
}

{
    "message": "kafka.handle_messages end",
    "payload": {},
    "timestamp": "2021-01-31T08:58:31.437327Z"
}
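
As an aside, one thing that might help here is instrumenting the handler so we can see whether memory growth tracks batch handling. A rough sketch (assuming each message map has the offset key shown in the payloads above; :erlang.memory(:total) returns total BEAM memory in bytes):

  def handle_messages(messages) do
    offsets = Enum.map(messages, & &1.offset)

    # Log batch size, the offset range covered, and total BEAM memory (in MB)
    # before handling the batch.
    Logger.info(
      "kafka.handle_messages begin " <>
        "count=#{length(messages)} " <>
        "first_offset=#{List.first(offsets)} last_offset=#{List.last(offsets)} " <>
        "vm_total_mb=#{div(:erlang.memory(:total), 1_000_000)}"
    )

    for message <- messages do
      Logger.info("Incoming kafka message", message)
      # ... existing per-message handling ...
    end

    Logger.info(
      "kafka.handle_messages end vm_total_mb=#{div(:erlang.memory(:total), 1_000_000)}"
    )

    :ok
  end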

begin_offset is always the same at startup, even when a new container starts up. The first message below (the brod_sup start) only occurs on startup, while the next two (the assignments received) occur both on startup and after every rebalance:

{
    "message": "    :supervisor: {:local, :brod_sup}\n    :started: [\n  pid: #PID<0.2408.0>,\n  id: :\"portfolio-monitor-consumer-group\",\n  mfargs: {:brod_client, :start_link,\n   [\n     [\n       {'b-3.nonprod-msk-tls.u32mzj.c7.kafka.us-east-1.amazonaws.com', 9094},\n       {'b-1.nonprod-msk-tls.u32mzj.c7.kafka.us-east-1.amazonaws.com', 9094},\n       {'b-2.nonprod-msk-tls.u32mzj.c7.kafka.us-east-1.amazonaws.com', 9094}\n     ],\n     :\"portfolio-monitor-consumer-group\",\n     [\n       auto_start_producers: false,\n       allow_topic_auto_creation: false,\n       begin_offset: -1,\n       ssl: true\n     ]\n   ]},\n  restart_type: {:permanent, 10},\n  shutdown: 5000,\n  child_type: :worker\n]",
    "timestamp": "2021-01-31T08:57:39.985012Z"
}
{
    "message": "Group member (portfolio-monitor-consumer-group,coor=#PID<0.2721.0>,cb=#PID<0.2717.0>,generation=102903):\nassignments received:\n  release_tradelines:\n    partition=0 begin_offset=1036850",
    "timestamp": "2021-01-31T08:57:41.418348Z"
}
{
    "message": "Group member (portfolio-monitor-consumer-group,coor=#PID<0.2715.0>,cb=#PID<0.2416.0>,generation=102903):\nassignments received:\n  release_inquiries:\n    partition=0 begin_offset=474016",
    "timestamp": "2021-01-31T08:57:41.418398Z"
}

Here is a 15-second cycle of Kaffe-specific messages:

2021-01-31T03:57:51.420-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#allocate_subscribers=#PID<0.2717.0> generation_id=102903","payload":{},"timestamp":"2021-01-31T08:57:51.419582Z"}

2021-01-31T03:57:51.420-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#allocate_subscribers=#PID<0.2416.0> generation_id=102903","payload":{},"timestamp":"2021-01-31T08:57:51.419687Z"}

2021-01-31T03:57:51.420-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#starting=Elixir.Kaffe.Worker name=worker_release_tradelines_0","payload":{},"timestamp":"2021-01-31T08:57:51.419970Z"}

2021-01-31T03:57:51.420-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#starting=Elixir.Kaffe.Worker name=worker_release_inquiries_0","payload":{},"timestamp":"2021-01-31T08:57:51.420175Z"}

2021-01-31T03:57:55.002-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_revoked=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_inquiries","payload":{},"timestamp":"2021-01-31T08:57:55.002526Z"}

2021-01-31T03:57:55.002-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#stopping=#PID<0.2416.0>","payload":{},"timestamp":"2021-01-31T08:57:55.002572Z"}

2021-01-31T03:57:55.003-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_revoked=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_tradelines","payload":{},"timestamp":"2021-01-31T08:57:55.003436Z"}

2021-01-31T03:57:55.003-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#stopping=#PID<0.2717.0>","payload":{},"timestamp":"2021-01-31T08:57:55.003471Z"}

2021-01-31T03:57:55.009-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_received=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_tradelines generation_id=102904","payload":{},"timestamp":"2021-01-31T08:57:55.009133Z"}

2021-01-31T03:57:55.009-05:00	{"application":"kaffe","environment":"release","level":"info","message":"event#assignments_received=Elixir.Kaffe.GroupMember.portfolio-monitor-consumer-group.release_inquiries generation_id=102904","payload":{},"timestamp":"2021-01-31T08:57:55.009311Z"}

Running kaffe 1.18.0 and brod 3.14.0. I tried to upgrade to the latest and ran into #106. Here is the config (brokers is a list of 3 brokers and topics is a list of 2 topics):

  config :kaffe,
    consumer: [
      endpoints: brokers,
      topics: topics,
      ssl: config_env() == :prod,
      # the consumer group for tracking offsets in Kafka
      consumer_group: "portfolio-monitor-consumer-group",
      # the module that will process messages
      message_handler: EventHandler.KafkaConsumer,
      offset_reset_policy: :reset_to_latest,
      worker_allocation_strategy: :worker_per_topic_partition
    ]

Sorry if this is TMI, but I didn't want to leave out anything that might be relevant. Any help resolving this issue would be greatly appreciated!

objectuser (Contributor) commented

There were some issues with continual rebalancing in a "recent" version of Kafka ... but I don't recall which version. The rebalancing could definitely cause messages to be continually reprocessed so that the consumers never commit and make progress.

I think there's an old issue here that might contain that Kafka version ...

bpardee (Author) commented Feb 3, 2021

I saw that was a problem in 2.3.0. We are using Amazon MSK, which uses Kafka version 2.3.1.

bpardee (Author) commented Feb 3, 2021

I dropped max_bytes from 1 MB down to 10 KB. We now commit several times per second instead of once every 45 seconds. The rebalance is still occurring every 15 seconds or so, but the begin_offset is increasing each time, whereas it was holding steady previously. Memory is also holding steady.
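
For reference, that setting goes in the consumer config posted above, roughly like this (a sketch; 10_000 approximates the 10K value we tried, and the rest of the config is unchanged):

  config :kaffe,
    consumer: [
      endpoints: brokers,
      topics: topics,
      ssl: config_env() == :prod,
      consumer_group: "portfolio-monitor-consumer-group",
      message_handler: EventHandler.KafkaConsumer,
      offset_reset_policy: :reset_to_latest,
      worker_allocation_strategy: :worker_per_topic_partition,
      # fetch smaller batches so each handle_messages call finishes (and the
      # offset gets committed) quickly; previously ~1 MB
      max_bytes: 10_000
    ]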

objectuser (Contributor) commented

I think the rebalancing is the issue to solve first. That should not be happening with such a high and consistent frequency. I wonder if you'd experience the same thing with a different cluster (like setting one up in local Docker or something).
