Use semaphore in ObjectPool #426

Open
wants to merge 4 commits into
base: master

harmandeeps
Contributor

@harmandeeps harmandeeps commented Jul 13, 2020

The intent of the change is to remove the synchronization block while getting/returning objects from pool and use a semaphore to enable parallel object creation.
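
A minimal sketch of the idea, assuming a much-simplified pool: a counting semaphore caps how many objects can be out at once, so borrowing and returning need no `synchronized` block and object creation can overlap across threads. `SemaphorePoolSketch`, `borrow`, and `giveBack` are hypothetical names for illustration, not the classes touched by this PR.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical, simplified pool used only to illustrate the approach.
final class SemaphorePoolSketch<T>
{
  private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
  private final Semaphore permits;
  private final Supplier<T> factory;

  SemaphorePoolSketch(int maxSize, Supplier<T> factory)
  {
    this.permits = new Semaphore(maxSize); // one permit per object that may be in use
    this.factory = factory;
  }

  // Returns null when no permit becomes available within the timeout.
  T borrow(long timeoutMillis)
  {
    try {
      if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
        return null;
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    T object = idle.poll(); // non-blocking; reuse an idle object if there is one
    if (object != null) {
      return object;
    }
    try {
      return factory.get(); // runs outside any lock, so creations can overlap
    }
    catch (RuntimeException e) {
      permits.release(); // creation failed: hand the permit back
      throw e;
    }
  }

  void giveBack(T object)
  {
    idle.offer(object);
    permits.release(); // lets one waiting borrower proceed, if any
  }
}
```

With `maxSize = 2`, a third concurrent `borrow` waits for up to the timeout and then returns null rather than creating a third object.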

Contributor Author

@harmandeeps harmandeeps left a comment

Thank you for the review @stagraqubole.

decreaseObject(object);
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
if (!objectQueue.offer(object)) {
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
Member

Is it OK to release the semaphore in case more objects were created than maxSize?

No, this needs a change

Contributor Author

Maybe I am missing something. Theoretically, this case should not happen, as we are controlling parallelism via a counter-based semaphore.
Suppose it does happen: why shouldn't we release the semaphore, since the returning thread is done with the object? Won't that give other waiting threads a chance?

Member

if we are controlling parallelism strictly via counter-based semaphore, then this is an error state. We should error out if this happens instead.
I don't think it makes sense to operate a semaphore if the number of objects can be larger than permits (it can lead to wrong calculations).

Contributor Author

Yes, true. Throwing an error in such a case.
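
A rough sketch of what "error out instead of releasing" could look like, assuming a bounded queue whose capacity equals the number of permits. `StrictReturnSketch` and its methods are hypothetical; only `objectQueue` and `takeSemaphore` echo names from the diff.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: treat "queue refused the object" as an error state.
final class StrictReturnSketch<T>
{
  private final BlockingQueue<T> objectQueue;
  private final Semaphore takeSemaphore;

  StrictReturnSketch(int maxSize, Supplier<T> factory)
  {
    this.objectQueue = new ArrayBlockingQueue<>(maxSize);
    this.takeSemaphore = new Semaphore(maxSize);
    for (int i = 0; i < maxSize; i++) {
      objectQueue.add(factory.get()); // pre-fill so the queue starts full
    }
  }

  // Non-blocking borrow: null when every object is already in use.
  T borrow()
  {
    if (!takeSemaphore.tryAcquire()) {
      return null;
    }
    return objectQueue.poll();
  }

  void giveBack(T object)
  {
    if (!objectQueue.offer(object)) {
      // More objects exist than permits: fail loudly and leave the semaphore untouched.
      throw new IllegalStateException("Pool overflow: queue size=" + objectQueue.size());
    }
    takeSemaphore.release();
  }

  int availablePermits()
  {
    return takeSemaphore.availablePermits();
  }
}
```

Because the exception is thrown before `release()`, the permit count never exceeds the configured maximum, which is the invariant the reviewer is asking for.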

try {
if (blocking) {
- freeObject = objectQueue.take();
+ takeSemaphore.acquire();
+ object = objectQueue.take();
Member

what's the reason for not calling tryGetObject here?

We should. Or we should remove the code path for blocking=true case as it is not applicable for us and it is only complicating maintenance.

Contributor Author

tryGetObject calls objectQueue.poll(), which is non-blocking. In blocking mode, even though the semaphore acquire is itself a blocking call, I called 'take' just to be on the safe side.

I think it is better to remove the blocking mode, as we no longer use it anywhere in the code.

private final String host;
private final int socketTimeout;
private final int connectTimeout;
private final Semaphore takeSemaphore;
private final AtomicInteger totalCount;
Member

Is it possible to remove totalCount and use takeSemaphore#permits instead?

Contributor Author

yeah, will change in next iteration.

Contributor Author

I checked; it is not possible. We initialize the pool with the min size without acquiring the semaphore.
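
One way to read this reply, sketched under that assumption: objects created at startup never take a permit, so a count derived from the semaphore says nothing about how many objects exist. `CountVsPermitsSketch` and `afterWarmUp` are hypothetical; `takeSemaphore` and `totalCount` mirror the field names in the diff.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: "maxSize - availablePermits" is not the created-object count.
final class CountVsPermitsSketch
{
  // Returns {objects created, count derived from permits} right after warm-up.
  static int[] afterWarmUp(int minSize, int maxSize)
  {
    Semaphore takeSemaphore = new Semaphore(maxSize);
    AtomicInteger totalCount = new AtomicInteger();
    for (int i = 0; i < minSize; i++) {
      totalCount.incrementAndGet(); // object created at startup, no permit taken
    }
    int derivedFromPermits = maxSize - takeSemaphore.availablePermits();
    return new int[] {totalCount.get(), derivedFromPermits};
  }
}
```

For `minSize = 3` and `maxSize = 10`, the counter reports 3 created objects while the permit-derived value is 0, since nothing has been borrowed yet.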

}

- public synchronized int getTotalCount()
+ public int getTotalCount()
Member

@rohangarg rohangarg Jul 24, 2020

should this be getActiveObjectCount (the var too)?

Contributor Author

will change this.

}
object.setLastAccessTs(System.currentTimeMillis());
Member

Why is this set while giving out the object? What if the object is in use for longer than the max-idle time?
Shouldn't this be set to some sentinel value while giving out the object, and to a timestamp when the object is released back to the pool?

We should fix this separately; I will open an issue. Right now it works because the Thrift call time is much less than the idle timeout.

Contributor Author

What do you mean by max-idle time? If you are referring to scavenging the underlying connection, then in this case the object won't be present in the queue to get scavenged.

All the objects are inherently the same (they don't have any state properties associated), so setting timestamp while getting or returning shouldn't matter much. Or am I missing something?

Member

Agree with Shubham as to why it works for now.

> setting timestamp while getting or returning shouldn't matter much.

OK, if this will not be part of the scavenger thread, then doesn't it make sense to set the last access time in the release method? Borrow is logically not the last access time, in my view.

Contributor Author

Yeah, got it. Makes sense. For example, if in a query burst all objects are taken for, say, 20 minutes and then released, the scavenger will think these objects are not being used and scavenge them.

We can take this up in a separate commit, as Shubham mentioned.
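
A small sketch of the suggestion discussed above, assuming a sentinel while the object is borrowed and a timestamp written on release. `IdleStampSketch`, `IN_USE`, and the method names are hypothetical, and thread safety is left out for brevity.

```java
// Hypothetical pooled-object wrapper; names and the sentinel are illustrative.
final class IdleStampSketch
{
  static final long IN_USE = -1L; // sentinel: object is currently borrowed

  private long lastAccessTs;

  IdleStampSketch(long createdAtMillis)
  {
    this.lastAccessTs = createdAtMillis;
  }

  void onBorrow()
  {
    lastAccessTs = IN_USE;
  }

  void onRelease(long nowMillis)
  {
    lastAccessTs = nowMillis; // idle clock starts when the object comes back
  }

  // What a scavenger might check before discarding an idle object.
  boolean isIdleTooLong(long nowMillis, long maxIdleMillis)
  {
    return lastAccessTs != IN_USE && nowMillis - lastAccessTs > maxIdleMillis;
  }
}
```

In the 20-minute burst scenario, an object stamped on release counts as idle only from the moment it returns, so it is not scavenged immediately after a long borrow.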

@rohangarg
Member

Also, can you please explain the exact motivation for using a semaphore in the object pool?
Were things like an atomic counter + mutex locking, or a blocking queue, not enough for controlling concurrency? (And if not, why?)

@harmandeeps
Contributor Author

> Also, can you please explain the exact motivation for using a semaphore in the object pool?
> Were things like an atomic counter + mutex locking, or a blocking queue, not enough for controlling concurrency? (And if not, why?)

changed the description. please check.

@@ -310,7 +308,7 @@ public static int getTransportPoolMaxWait(Configuration conf)

public static int getScavengeInterval(Configuration conf)
{
- return conf.getInt(KEY_POOL_MAX_WAIT_TIMEOUT, DEFAULT_SCAVENGE_INTERVAL);
+ return conf.getInt(KEY_POOL_SCAVENGER_INTERVAL, DEFAULT_SCAVENGE_INTERVAL);

Please separate out the commits. Have a separate one for changing the config name.

Contributor Author

created a separate commit.

{
Poolable<T> object;
try {
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);

The return value of tryAcquire tells us whether the permit was acquired; false means we timed out. If we time out, we need to return instead of going ahead without a permit, otherwise we can create more than max connections for some time. At return time we would destroy the extra ones and then double-decrement the semaphore.
Please add UTs around this to cover the different cases.

Contributor Author

Added logic to check the return value; if it is false (permit not acquired), we return null. Also added UTs around the semaphore logic. Please have a look.
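
To make the accounting problem concrete, here is a small sketch, assuming a pool of size one in which a second caller times out. `TryAcquireSketch` and `permitsAfterCycle` are hypothetical names; only the `java.util.concurrent.Semaphore` calls are real API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of permit accounting with and without checking tryAcquire's result.
final class TryAcquireSketch
{
  // Simulates one busy holder plus a second caller that times out; returns the final permit count.
  static int permitsAfterCycle(boolean checkResult)
  {
    Semaphore takeSemaphore = new Semaphore(1); // at most one object in use
    takeSemaphore.acquireUninterruptibly();     // first caller holds the only permit

    boolean acquired;
    try {
      acquired = takeSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS); // no permit free, so this times out
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      acquired = false;
    }

    if (acquired || !checkResult) {
      // Second caller proceeds (with or without a permit) and later returns its object.
      takeSemaphore.release();
    }
    takeSemaphore.release(); // first caller returns its object
    return takeSemaphore.availablePermits();
  }
}
```

Ignoring the result leaves two permits in a pool meant to allow one, so later borrowers could exceed the limit; checking it keeps the count at one.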

@@ -78,8 +76,11 @@ public void registerHost(String host)
}
log.debug(this.name + " : Borrowing object for partition: " + host);
for (int i = 0; i < 3; i++) { // try at most three times

Remove this retry; it is better to exit fast and fall back to direct reads.

Contributor Author

removed.
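
A sketch of the exit-fast shape, assuming the borrow call returns null when no pooled client is available. `FailFastSketch`, `borrow`, `pooledRead`, and `directRead` are hypothetical stand-ins for the real calls, and returning the client to the pool is omitted.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper; the three callbacks stand in for the real pool and read calls.
final class FailFastSketch
{
  static <C, R> R read(Supplier<C> borrow, Function<C, R> pooledRead, Supplier<R> directRead)
  {
    C client = borrow.get(); // single attempt, no retry loop
    if (client == null) {
      return directRead.get(); // pool exhausted or timed out: fall back immediately
    }
    return pooledRead.apply(client);
  }
}
```

The caller waits at most one borrow timeout before falling back, instead of up to three.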
