-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use semaphore in ObjectPool #426
base: master
Are you sure you want to change the base?
Conversation
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review @stagraqubole.
568bfda
to
bf78376
Compare
rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java
Show resolved
Hide resolved
decreaseObject(object); | ||
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size())); | ||
if (!objectQueue.offer(object)) { | ||
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it ok to release the semaphore incase more objects were created than maxSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this needs a change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I am missing something. Theoretically, this case should happen as we are controlling parallelism via counter-based semaphore.
suppose, this happens, then why shouldn't we release the semaphore as the returning thread is done with the object? won't it give a chance to other waiting threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are controlling parallelism strictly via counter-based semaphore, then this is an error state. We should error out if this happens instead.
I don't think it makes sense to operate a semaphore if the number of objects can be larger than permits (it can lead to wrong calculations).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, true. throwing error in such a case.
try { | ||
if (blocking) { | ||
freeObject = objectQueue.take(); | ||
takeSemaphore.acquire(); | ||
object = objectQueue.take(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason for not calling tryGetObject
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should. Or we should remove the code path for blocking=true
case as it is not applicable for us and it is only complicating maintenance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tryGetObject call object.Queue.poll() which is non blocking. But, in case of blocking mode, though there is semaphore acquire blocking call, I just a called 'take' to be on safe side.
I guess better removing this blocking mode as we are not using it anymore in the code.
private final String host; | ||
private final int socketTimeout; | ||
private final int connectTimeout; | ||
private final Semaphore takeSemaphore; | ||
private final AtomicInteger totalCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to remove totalCount
and rather use takeSemaphore#permits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, will change in next iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked it is not possible. We initialize the pool with min size without acquiring semaphore.
} | ||
|
||
public synchronized int getTotalCount() | ||
public int getTotalCount() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be getActiveObjectCount
(the var too)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will change this.
} | ||
object.setLastAccessTs(System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this set while giving the object? what if this object is being used for more than the max-idle time?
shouldn't this be set to some sentinel value while giving the object, and to a timestamp while release of the object of the pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should fix this separately, I will open a issue. Right now it works because thrift call time
is much less than idle timeout
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by max-idle time? If you are referring scavenging the under connection, then in this case , object won't be present in the queue to get scavenged.
All the objects are inherently same (don't have any state properties associated), setting timestamp while getting or returning shouldn't matter much. Or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with Shubham as to why it works for now.
setting timestamp while getting or returning shouldn't matter much.
Ok, if this will not be a part of scavenger thread, then doesn't make sense to set the last access time in release
method? borrow
is logically not the last access time according to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, got it. makes sense. for example, if in a query burst all objects are taken for 20 minutes (suppose) and the released. Scanvenger will think these objects are not getting used and scavenge.
We can take this in a separate commit as subhuam mentioned.
Also, can you please the exact motivation for using semaphore in the object pool? |
6f3d482
to
129a304
Compare
changed the description. please check. |
129a304
to
a6cf9aa
Compare
@@ -310,7 +308,7 @@ public static int getTransportPoolMaxWait(Configuration conf) | |||
|
|||
public static int getScavengeInterval(Configuration conf) | |||
{ | |||
return conf.getInt(KEY_POOL_MAX_WAIT_TIMEOUT, DEFAULT_SCAVENGE_INTERVAL); | |||
return conf.getInt(KEY_POOL_SCAVENGER_INTERVAL, DEFAULT_SCAVENGE_INTERVAL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please separate out commits. Have one separate for changing config name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
created a separate commit.
{ | ||
Poolable<T> object; | ||
try { | ||
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return value denotes timeout. If we timeout then we need to return instead of going ahead without permit, that can cause creating more than max connections for some time. At the time of returning we would destroy extra ones and then double decrement semaphore.
Please add UTs around this to capture different cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added logic to check the return type, if it is false (lock not acquired), return null. Also, added UT around the semaphore logic. Please have a look.
@@ -78,8 +76,11 @@ public void registerHost(String host) | |||
} | |||
log.debug(this.name + " : Borrowing object for partition: " + host); | |||
for (int i = 0; i < 3; i++) { // try at most three times |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this retry, better exit fast and fallback to direct reads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
e59dc21
to
1721dc0
Compare
The intent of the change is to remove the synchronization block while getting/returning objects from pool and use a semaphore to enable parallel object creation.