Error on socket shutdown: [Errno 107] Transport endpoint is not connected #2

Open
pochenok opened this issue Apr 1, 2018 · 2 comments

pochenok commented Apr 1, 2018

Environment: GCP, 24 CPU, 32 RAM.
The semaphore limit is just 4.
The query itself is fast enough.

The script starts off running well, but after some time I start getting this error:
Error on socket shutdown: [Errno 107] Transport endpoint is not connected

Before that I was getting a different error: OSError: [Errno 24] Too many open files
I worked around it by raising the limit: ulimit -n 100000
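
As a side note, `ulimit -n` only affects the shell it is run in and the processes started from it, so it can help to check which limit the Python process actually sees. A minimal sketch using the standard-library `resource` module (Unix only); the helper name `open_file_limits` is made up for illustration:

```python
import resource


def open_file_limits():
    """Return the (soft, hard) limits on open file descriptors for this process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft, hard


if __name__ == "__main__":
    soft, hard = open_file_limits()
    # The soft limit is the one that triggers "[Errno 24] Too many open files".
    print(f"soft={soft} hard={hard}")
```

Raising the limit only postpones the error if the script keeps opening more connections than intended, so it is worth treating it as a symptom rather than a fix.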

async def get_result(comb):
    sem = asyncio.Semaphore(4)
    async with sem:
        client = Client('localhost', database='sna_gandalf')
        num = await client.execute(
            'select count(distinct id_follower)*20 from followers_women sample 0.05 where arrayExists(x -> x = id_blogger, ' + str(comb) + ') = 1  select sum(followers) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1 select sum(money) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1')
        combinations_dict[str(comb)] = str(num[0][0]) + ', ' + str(num[1][0]) + ', ' + str(num[2][0])


combinations_dict = {}
for i in range(1, 8):
    print('range ', i, ' done')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(
        [get_result(list(comb)) for comb in itertools.combinations(list(followers_df.id_blogger.unique()), i)]))

Google does not help me =(

Mixser commented Feb 8, 2019

Hi @pochenok
I think you are not using the semaphore correctly: you create it inside get_result, so each coroutine gets its own semaphore instance and nothing is actually limited. As a result, you try to open a lot of connections to the ClickHouse server at once. To do it right, you must share one semaphore instance between all your coroutines.

async def get_result(semaphore, combination):
    async with semaphore:
        ...

semaphore = asyncio.Semaphore(4)
combinations_dict = {}

for i in range(1, 8):
    print('range ', i, ' done')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(
        [get_result(semaphore, list(comb)) for comb in itertools.combinations(list(followers_df.id_blogger.unique()), i)]))
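
To make the difference visible without a ClickHouse server, here is a minimal, self-contained sketch that measures peak concurrency with a shared semaphore versus one semaphore per coroutine. Everything in it is an assumption for illustration: `worker`, `peak_concurrency`, `run_demo`, and the `state` dict are hypothetical names, and `asyncio.sleep` stands in for the real `await client.execute(...)` call.

```python
import asyncio


async def worker(semaphore, state):
    # `state` is a hypothetical dict used only to record concurrency.
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # stand-in for `await client.execute(...)`
        state["active"] -= 1


async def peak_concurrency(shared, n_tasks=20, limit=4):
    """Run n_tasks workers and return the highest number active at once."""
    state = {"active": 0, "peak": 0}
    shared_sem = asyncio.Semaphore(limit)
    coros = [
        # shared=False reproduces the original bug: a new semaphore per coroutine.
        worker(shared_sem if shared else asyncio.Semaphore(limit), state)
        for _ in range(n_tasks)
    ]
    await asyncio.gather(*coros)
    return state["peak"]


def run_demo():
    shared_peak = asyncio.run(peak_concurrency(shared=True))
    unshared_peak = asyncio.run(peak_concurrency(shared=False))
    return shared_peak, unshared_peak


if __name__ == "__main__":
    shared_peak, unshared_peak = run_demo()
    print("peak with shared semaphore:", shared_peak)
    print("peak with per-coroutine semaphores:", unshared_peak)
```

With the shared semaphore the peak stays at the limit of 4, while with per-coroutine semaphores all 20 workers run at once, which is what leads to the flood of connections described above.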

xzkostyan (Member) commented

Hi @pochenok, is this issue still relevant?
