Batch operations do not work properly when using a connection pool #73

Open
jacinpoz opened this issue Mar 26, 2017 · 2 comments

@jacinpoz

jacinpoz commented Mar 26, 2017

I ran into this when implementing batch inserts in my application while using a connection pool. Without the pool setting, the same code works perfectly fine.

It is fairly easy to reproduce using this test:

    @Test
    public void testWithPool() throws InterruptedException {
        Database db = Database.builder()
                .url("jdbc:h2:mem:test1;DB_CLOSE_DELAY=-1")
                .pool(1,32)
                .build()
                .asynchronous();

        db.update("CREATE TABLE TRADE(\n" +
                "        STATUS VARCHAR(64)  ,\n" +
                "PRICE NUMERIC(20,5)  ,\n" +
                "CLIENT_ID VARCHAR(64)  NOT NULL,\n" +
                "QUANTITY INTEGER  ,\n" +
                "TIMESTAMP BIGINT  ,\n" +
                "RECORD_ID VARBINARY(8)  ,\n" +
                "ID VARCHAR(64)  NOT NULL,\n" +
                "        PRIMARY KEY(ID),\n" +
                "CONSTRAINT TRADE_BY_CLIENT_ID UNIQUE (CLIENT_ID, ID) ,\n" +
                "        CONSTRAINT chk_TRADE_STATUS CHECK (STATUS IN ('PENDING', 'COMPLETE')))").count().toBlocking().single();

        int numPeopleBefore = db.select("select count(*) from TRADE") //
                .getAs(Integer.class) //
                .toBlocking().single();
        final List<Observable<Map<String, Object>>> params = new ArrayList<>();
        for(int i = 0; i < 5; i++){
            final Map<String, Object> paramMap = new HashMap<>();
            paramMap.put("ID", "Trade" + i);
            paramMap.put("QUANTITY", 5000 + i);
            paramMap.put("PRICE", new BigDecimal(32.44 + i));
            paramMap.put("STATUS", "PENDING");
            paramMap.put("CLIENT_ID", "Client1");
            paramMap.put("TIMESTAMP", System.currentTimeMillis());
            paramMap.put("RECORD_ID", ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array());
            params.add(Observable.just(paramMap));
        }

        Observable<Integer> count = db.update("insert into TRADE(ID,QUANTITY,PRICE,STATUS,CLIENT_ID,TIMESTAMP,RECORD_ID) values(:ID,:QUANTITY,:PRICE,:STATUS,:CLIENT_ID,:TIMESTAMP,:RECORD_ID)")
                .dependsOn(db.beginTransaction())
                // set batch size
                .batchSize(3)
                // supply the parameter maps built above
                .parameters(Observable.merge(params))
                // go
                .count()
                // end transaction
                .count();
        assertTrue(db.commit(count).toBlocking().single());
        int numPeople = db.select("select count(*) from TRADE") //
                .getAs(Integer.class) //
                .toBlocking().single();
        assertEquals(numPeopleBefore + 5, numPeople);
    }

This is the output using Hikari 2.6.0 and H2 1.4.192.
H2FailedTest.txt

It still fails with Hikari 2.3.13 (the version the rxjava-jdbc project itself uses). It seems the pool is shut down after the batch insert because of a failure when trying to close the connection.

This is the output if I replace H2 with HSQLDB 2.3.2 by changing the URL to "jdbc:hsqldb:mem:aname;user=user;":

HSQLDBFailedTest.txt

Is there something I am not taking into account?

@jacinpoz
Author

I think this is the problematic bit, inside Database.java:

    /**
     * Resets the current thread local {@link ConnectionProvider} to default.
     */
    void endTransactionObserve() {
        log.debug("endTransactionObserve");
        ConnectionProvider c = currentConnectionProvider.get();
        if (c instanceof ConnectionProviderBatch) {
            c.close();
        }
        currentConnectionProvider.set(cp);
        isTransactionOpen.set(false);
        rsCache.set(null);
    }
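
To make the suspicion concrete, here is a minimal, self-contained sketch of how a close() call like the one in endTransactionObserve above could end up shutting down a shared pool. Everything in it is a hypothetical stand-in: `Provider`, `PooledProvider`, `BatchProvider` and `endTransaction` are invented for illustration and are not the rxjava-jdbc classes, and the assumption that the batch wrapper forwards close() to the pooled provider it wraps has not been confirmed against the library source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolCloseSketch {

    /** Hypothetical stand-in for a connection provider (not the rxjava-jdbc interface). */
    interface Provider {
        String get();
        void close();
    }

    /** Hypothetical pooled provider: closing it shuts the whole pool down. */
    static final class PooledProvider implements Provider {
        private final AtomicBoolean shutDown = new AtomicBoolean(false);

        @Override
        public String get() {
            if (shutDown.get()) {
                throw new IllegalStateException("pool has been shut down");
            }
            return "connection";
        }

        @Override
        public void close() {
            shutDown.set(true);
        }
    }

    /** Hypothetical batch wrapper; ASSUMPTION: it forwards close() to the provider it wraps. */
    static final class BatchProvider implements Provider {
        private final Provider delegate;

        BatchProvider(Provider delegate) {
            this.delegate = delegate;
        }

        @Override
        public String get() {
            return delegate.get();
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

    /** Same shape as endTransactionObserve: close the current provider if it is the batch wrapper. */
    static void endTransaction(Provider current) {
        if (current instanceof BatchProvider) {
            current.close();
        }
    }

    /** Returns true if the pool can still hand out connections after the transaction ends. */
    static boolean poolUsableAfterBatch() {
        PooledProvider pool = new PooledProvider();
        endTransaction(new BatchProvider(pool));
        try {
            pool.get();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pool usable after batch: " + poolUsableAfterBatch());
    }
}
```

If the real wrapper behaves like the hypothetical `BatchProvider` here, the select that follows the commit in the test would be the first call to hit the closed pool. Without pooling, each call would open a fresh connection, which would be consistent with the test passing in that configuration.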

@davidmoten
Owner

Thanks for the unit test! I'll have a look soon.
