physical memory issues on EMR #18

Open
refaelos opened this issue Nov 21, 2016 · 48 comments

Comments

@refaelos

@carlomedas maybe you can help me here ...

When I run my EMR MapReduce job and want the output compressed with 4mz, I get multiple errors like:
... is running beyond physical memory limits. Current usage: 4.6 GB of 4 GB physical memory used; 6.3 GB of 20 GB virtual memory used. Killing container. ...

I tried increasing the memory limit and I still get these errors (just later, during the reducer processing).

Do you have an idea why I started getting these errors the moment I started compressing with 4mz? (When I compressed with LZO or BZip2 I wasn't getting them.)

Thanks!

@carlomedas
Collaborator

Hello.

I'm not aware of any memory leak in the current 4mc.
Where are you using it, as input or as output?
Are you doing just maps, or also reduces where it's the final output format?

To be honest I never tried it as the output of a reduce, as we've been using it directly in Spark as the final output format, which is far faster than standard MapReduce, and it's not leaking even there.

Carlo

@refaelos
Author

Thanks!

It happens when I'm running on larger amounts of data (still not that large... just a few tens of GBs).

I'm using 4mz (not 4mc) as the compression of the final output. I basically stream files in on one end and compress them out to another place on the other end.

If you have any suggestions it'll be great!

@refaelos
Author

refaelos commented Dec 19, 2016

@carlomedas still having this issue. Can you suggest a solution?

It happens even when running on smaller amounts of data (a few hundred MBs).
Also, I tried using BZip2 and it worked; it only fails when I switch to 4mz.

Note: we use this codec as the output format: com.hadoop.compression.fourmc.FourMzMediumCodec

@carlomedas
Collaborator

Does it fail only with 4mz, or also with 4mc?
We have been using both with quite huge files (several GB each) on small and medium-sized clusters.
The only difference is that I've been using them on a Hadoop/Spark cluster on AWS; I never tried them on EMR.
Unfortunately I do not have time now to compare manual Hadoop/Spark with EMR, but it's in our plans for the future.

@refaelos
Author

@carlomedas thanks!

We only tried 4mz because we needed a splittable compression that is better than Snappy.

Can you suggest a different compression to use until that's ready? Maybe something that comes natively with Hadoop?
From what you say I understand that for now 4mz is not usable for us.

@refaelos
Author

@carlomedas I also tested FourMcMediumCodec and it's still failing :(

@carlomedas
Collaborator

I guess it's not working properly with EMR. What version of Hadoop do you have there, and can you describe in more detail how your job is composed?
If it's Spark, please describe the Spark DAG.
Maybe we can guess what's wrong.

@refaelos
Author

refaelos commented Dec 19, 2016

@carlomedas

Hadoop distribution: Amazon 2.7.3 (probably based on Hadoop 2.7.3)

This is the step configuration:

-libjars /home/hadoop/CustomOutputFormats.jar,/home/hadoop/hadoop-4mc-2.0.0.jar -D dfs.replication=1 -D stream.map.output.field.separator='\t' -D mapreduce.reduce.shuffle.input.buffer.percent=0.5 -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 -D mapreduce.map.java.opts=-Xmx3277m -D mapreduce.reduce.java.opts=-Xmx3277m -D mapreduce.map.output.compress=true -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.fourmc.FourMzMediumCodec -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapreduce.partition.keycomparator.options=-r -outputformat com.soomla.hadoop.MultipleTextOutputFormatByKey -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

It's not Spark... it's only Hadoop streaming on EMR.

It must be something with the compression b/c the moment I change back to the BZip2 codec it works fine. Of course I don't want to use BZip2 (VERY slow) and I can't find any other good compression codec that supports streaming.

  • Maybe you can point me to a few places in the code where I can add some logging that might shed some light on this?
  • Maybe there's a way to not do the compression in memory (I suppose not)?
  • I can see that my job creates 3 reducers - 2 succeed and 1 fails with the physical memory problem. Maybe there's something I can look into there?

Thank you very much for your help. I really want it to work b/c it'll be a pain for us to change compression again.

@carlomedas
Collaborator

As far as I understood, you are using it to store the output of the reduce, and each reducer is writing to a 4mc/4mz file.
Each file only uses 4+4 MB of direct memory buffers to handle buffering and compression, no more than that.
To make it even more performant and to better control the direct memory buffers, in previous releases I introduced direct memory buffer pooling, to make sure that even if you create a huge number of files, those special buffers are not exhausted.

Is it failing with a standard OOM or with a direct-memory-buffer-related OOM?
Please try again by adding this setting to Java VM:
-XX:MaxDirectMemorySize=256M

@refaelos
Author

Thanks for the info!

Do you mean setting it for mapper or reducer?
I see 2 options:

mapreduce.map.java.opts=...
mapreduce.reduce.java.opts=...

@carlomedas
Collaborator

Please try both; they won't do any harm. But if you are compressing with 4mz only in the final reduce stage, it can stay only on the reducer.

@refaelos
Author

refaelos commented Dec 20, 2016

I tried setting it on both like this:

 -D mapreduce.reducer.java.opts='-Xmx3277m -XX:MaxDirectMemorySize=256m'

But I'm getting an error: Unrecognized option: -XX:MaxDirectMemorySize=256m'

I'm trying to figure out how it should be done in hadoop streaming.
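For reference, the standard Hadoop 2 property names are mapreduce.map.java.opts and mapreduce.reduce.java.opts (not mapreduce.reducer.java.opts). A sketch of how the combined options are usually passed, assuming the streaming job is launched from a shell:

-D 'mapreduce.map.java.opts=-Xmx3277m -XX:MaxDirectMemorySize=256m' -D 'mapreduce.reduce.java.opts=-Xmx3277m -XX:MaxDirectMemorySize=256m'

The quotes keep each key=value pair, with its embedded space, as a single argument. If the step is submitted through the EMR API instead of a shell, the quotes are passed through literally (which would explain the trailing quote in the Unrecognized option error above); in that case each -D and its full key=value string should be separate, unquoted entries in the step's argument list.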

@refaelos
Author

I tried this -D mapred.child.java.opts=-XX:MaxDirectMemorySize=256m and still getting the physical memory error.

I don't understand why it doesn't start using virtual memory once physical memory is "filled up".

@carlomedas
Collaborator

can you please paste the full exception?

@refaelos
Author

refaelos commented Dec 20, 2016

Container [pid=19884,containerID=container_1482229402435_0001_01_000035] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.4 GB of 20 GB virtual memory used. Killing container. Dump of the process-tree for container_1482229402435_0001_01_000035 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 19892 19884 19884 19884 (java) 1551 216 6754443264 1052796 /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35 |- 19955 19892 19884 19884 (split_raw_reduc) 460 70 19734528 3565 /mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/./split_raw_reducer |- 19960 19955 19884 19884 (split_raw_reduc) 0 0 12095488 1824 /mnt/yarn/usercache/hadoop/filecache/13/split_raw_reducer |- 19884 19882 19884 19884 (bash) 0 0 115806208 673 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35 1>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stdout 2>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stderr Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143

@refaelos
Author

Ok so I managed to run with -XX:MaxDirectMemorySize=256m by setting only this argument like this:
-D mapreduce.reducer.java.opts=-XX:MaxDirectMemorySize=256m
(It doesn't have the -Xmx3277m like before.)

This fails the reducer immediately with:
[screenshot: a direct memory OOM error from PipeMapRed]

@carlomedas
Collaborator

To be honest I don't know how to help you on this, as I'm not familiar with Hadoop streaming's PipeMapRed and I don't know how EMR is tuned on AWS.
It seems like PipeMapRed is using direct memory buffers as well and needs much more than 256MB.

The error you reported in the previous comment, on the other hand, is from YARN, which automatically kills the container once it exceeds the maximum allowed memory. Did you already try increasing it (the 4 GB physical memory setting), just to see if it helps?
Please note anyway that the 4mc codecs use just the above-mentioned 4 MB + 4 MB of direct memory buffers plus a little overhead, but that's it.
It runs with much smaller tunings, but again, on a different Hadoop (+Spark) 2.6.0 setup and not on EMR.
Apart from this test, I'll try and find the time to test it myself on EMR.

@refaelos
Author

@carlomedas I did try to increase it as much as possible (Hadoop allows up to ~5 GB on a c4.xlarge instance) but it still fails.

Something interesting is that when I run it with mapreduce.reduce.java.opts=-Xmx3277m it fails with the YARN error but without it I'm getting the 'direct memory' error. It's not even related to the MaxDirectMemorySize setting.

Maybe the only solution really is to start using larger instances that have more memory... but that also costs more.

@refaelos
Author

What do you mean by 4MB+4MB of direct memory?
Is it per chunk or something?

@carlomedas
Collaborator

It buffers chunks up to 4 MB and then compresses and writes.
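To make that buffering scheme concrete, here is a minimal sketch (not the actual 4mc/4mz code; java.util.zip.Deflater stands in for the native ZSTD/LZ4 compressor) of a block-compressing output stream. The point is that every open stream pins its block-sized buffers until it is closed:

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;

// Minimal illustration of block buffering: fill a fixed 4 MB buffer,
// compress it as one block, write it out, repeat. Each open stream
// holds its buffers for its whole lifetime.
public class BlockBufferingOutputStream extends OutputStream {
    private static final int BLOCK_SIZE = 4 * 1024 * 1024;              // 4 MB uncompressed block

    private final OutputStream out;
    private final byte[] raw = new byte[BLOCK_SIZE];                    // uncompressed data buffer
    private final byte[] compressed = new byte[BLOCK_SIZE + 64 * 1024]; // room for worst-case expansion
    private int filled = 0;

    public BlockBufferingOutputStream(OutputStream out) {
        this.out = out;
    }

    @Override
    public void write(int b) throws IOException {
        if (filled == BLOCK_SIZE) {
            flushBlock();                                                // buffer full: emit one compressed block
        }
        raw[filled++] = (byte) b;
    }

    private void flushBlock() throws IOException {
        if (filled == 0) {
            return;
        }
        Deflater deflater = new Deflater();
        deflater.setInput(raw, 0, filled);
        deflater.finish();
        int len = 0;
        while (!deflater.finished()) {                                   // drain the compressor into the block buffer
            len += deflater.deflate(compressed, len, compressed.length - len);
        }
        deflater.end();
        // A real container format would also write the block length and a checksum here.
        out.write(compressed, 0, len);
        filled = 0;
    }

    @Override
    public void close() throws IOException {
        flushBlock();                                                    // final, possibly short, block
        out.close();
    }
}

With one such stream per output file, thousands of files open at the same time multiply that fixed per-stream cost.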

@refaelos
Author

So it's really weird that it fails... either there's a memory leak or something in my config is wrong. Why would it succeed with BZ2 and fail with 4mc?

@refaelos
Author

@carlomedas we have a theory... does it keep 4MB per open file that the reducer needs to compress?
I mean, our output consists of a huge number of small files (1000s of ~1MB files). If it keeps the 4MB allocated until all the reducers finish, this might overflow the memory.

@carlomedas
Collaborator

Yes, exactly, and I agree with you: that's the reason.
4mc/4mz was designed to be very powerful with huge files, both as input and as output.
Maybe I'm not understanding your use case, because it's pretty unusual to have such a huge number of reducers.

@refaelos
Author

@carlomedas No, I don't have a lot of reducers... just a lot of files are created by the 7 reducers that I have. I mean, the output consists of 1000s of files.

@refaelos
Author

@carlomedas I think we've figured out the problem and it looks like it is related to the output consisting of A LOT of files.

You said that you keep 4MB of memory that you compress and write out. The thing is that when we want to output ~4000 files that are each less than 4MB, it keeps 4MB * 4000 in memory, which is about 16GB, and that crashes the job.

About the ~4000 files: We use a custom output format that separates the data according to different params in the input. Sometimes that causes the output to consist of a lot of very small files, which is fine with us b/c we need it split into certain folders.

Do you have an idea how this can be solved? Maybe we can keep 1MB instead of 4MB? Will it cause a performance issue?
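For reference, a rough version of that arithmetic using the 4 MB + 4 MB per-open-file figure quoted above: ~4000 output files * 8 MB ≈ 32 GB of direct buffers in total (counting only the 4 MB raw buffer gives the ~16 GB estimate). Spread across the 7 reducers mentioned earlier, that is still roughly 4.5 GB per reducer, which lines up with the containers being killed at the 4 GB limit.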

@carlomedas
Collaborator

I see now. It was not designed to work with such use cases, so I don't think it's easily fixable for this unusual big-data use case.
As a matter of fact I removed the possibility to configure it in https://github.com/carlomedas/4mc/blob/master/java/hadoop-4mc/src/main/java/com/hadoop/compression/fourmc/FourMzCodec.java
because it was causing problems, beyond the fact that it was strongly decreasing the achieved compression ratio, so it's fixed at 4 MB right now.

@refaelos
Author

@carlomedas :(

Do you have a suggested compression method that is splittable and can work for this use case?

@carlomedas
Collaborator

A contributor, @advancedxy, added compression/decompression for standard zstd (i.e. without the 4mc container).
You can find them here: https://github.com/carlomedas/4mc/tree/master/java/hadoop-4mc/src/main/java/com/hadoop/compression/fourmc/zstd

As far as I remember, that one uses just 128KB and not 4MB.

@refaelos
Author

Thanks.

What's the difference with/without the 4mc container? Is it not splittable then?

@advancedxy
Contributor

ZstCodec is not splittable and uses ~128KB each for the raw input buffer and the compressed buffer.

@refaelos When you are generating so many small files, there is no point in keeping them splittable. You need to combine them rather than split them.

@advancedxy
Contributor

As for the 4MB compression buffer, it should be possible to make it configurable with some additional work. But I don't have the spare time now and I don't think your case justifies it.

However, the 4MB buffer makes it impractical to use for mapreduce.map.output.compress, as the reduce task may have a lot of copier threads (80, for example) fetching map output. That's 80 * (4+4) MB = 640 MB of additional memory overhead. That's where ZstCodec comes in handy.
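A sketch of how the two codecs could be combined in the streaming command quoted earlier: zstd for the intermediate map output (where the copier threads multiply the buffer cost) and 4mz only for the final files. The fully qualified ZstCodec class name here is an assumption based on the package linked above:

-D mapreduce.map.output.compress=true -D mapreduce.map.output.compress.codec=com.hadoop.compression.fourmc.zstd.ZstCodec -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.fourmc.FourMzMediumCodec

This would not fix the many-small-output-files problem discussed above, but it keeps the 4 MB buffers out of the shuffle path.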

@carlomedas
Collaborator

Yes, as a matter of fact I did remove the configurability of the buffer because of an issue we had on the native side, which unfortunately I did not have time to follow up on and fix.
Thanks for the note on the reducer, I never thought about that...!

@refaelos
Author

@advancedxy thanks!

The reason we keep so many files is that we separate files per day and category. We need this so that we have an S3 layout that we can invoke some processes on top of. The processes we invoke run on a combination of that day+category.

The thing is that not all the files are small... some might get to 1GB or more (compressed). I think we want those splittable... don't you?

@advancedxy
Contributor

After reviewing the source code of MultipleTextOutputFormat, one possible solution is to increase the number of reducers, so that each reducer processes a smaller number of files and runs faster.

@refaelos
Author

@advancedxy yeah, but still... there will be fewer mappers, so it'll be slow.

@advancedxy
Contributor

advancedxy commented Dec 23, 2016 via email

@refaelos
Author

@advancedxy I mean that if there's a big file and it's not splittable then the entire file will be handled by one mapper. I hope I'm not wrong on this one ...
That's the whole idea of splitting ... isn't it?

@advancedxy
Contributor

advancedxy commented Dec 23, 2016 via email

@pradeepg26

@refaelos @advancedxy @carlomedas I may be a little late to the party, but this can be solved another way, by setting the YARN memory overhead higher.

The container itself has 4GB of memory, and the JVM has 3277MB of heap. That only leaves 819MB of overhead. In the CDH distro, the default overhead is 20% (0.8 * 4096 = 3277). So you're eating up most of the overhead with your direct buffers and there isn't enough left for the other off-heap usages. I suspect that if you leave the heap as is and set mapreduce.reduce.memory.mb=5120, your job will succeed.
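Applied to the streaming command quoted earlier, that suggestion would look roughly like this (keep the heap flag as it is and grow only the container):

-D mapreduce.reduce.memory.mb=5120 -D mapreduce.reduce.java.opts=-Xmx3277m

leaving about 1.8 GB of the container for off-heap usage (direct buffers, the native compressor, and the streaming child processes) instead of 819 MB.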

@refaelos
Author

@advancedxy getting back to this one. So you're saying to increase the number of reducers without increasing the number of nodes or the memory configuration?

On MR2, increasing the number of reducers means decreasing the memory limits via the mapreduce.reduce.memory.mb config. Is that what you mean? (I'm pretty sure doing that will still make the process fail, as there's even less memory then.)

@DataWanderer

Any solution here? I am seeing the same issue on my end.

@refaelos
Author

@DataWanderer I wish there was. The problem is that when your reducer is creating too many output files, 4MC needs to use a lot of memory to handle them. So it'll probably fail.

@advancedxy
Contributor

This is an old issue...

@advancedxy getting back to this one. So you're saying increase number of reducers without increasing number of nodes or memory configurations?

Yes, increase the number of reducers: the more reducers, the lower the memory requirement for each reducer task. And there should be a capacity setting.

Let's say your job has 100 reducers and your resources support running them all concurrently: then increase the number of reducers to 200, but set your job's reducer capacity to 100, so that only 100 reducers run concurrently.
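In a streaming job the reducer count itself is just another -D setting, for example:

-D mapreduce.job.reduces=200

How many of those reducers run at the same time is then bounded by the scheduler and the memory available to YARN on the existing nodes, not by a separate streaming flag, so raising the count does not by itself require more machines.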

@refaelos
Author

@advancedxy thanks. Does increasing the number of reducers mean adding more machines to the cluster? If yes, this makes the entire process more expensive to run.

@advancedxy
Contributor

Increasing the number of reducers means adding more machines to the cluster?

No. The number of reducers is a setting of your MR job. It's a logical concept.

@refaelos
Author

@advancedxy ok got it. I'll take a look.

@trixpan
Contributor

trixpan commented Nov 27, 2017

@carlomedas I see you haven't made a release since the merge of @advancedxy's PR.

Do you plan to do so?

@carlomedas
Collaborator

Yes, it's in my plan, but I'm travelling and am not able, from here, to recompile the native libraries on all platforms, which is needed for a release.
If you can help compile on some platform (e.g. Win64 is the worst at the moment, as I'm missing the original VM I used), it'd help :)
