Connection error when two browser peers are connected to one python user #1133

Open
el07694 opened this issue Jul 14, 2024 · 0 comments

The error:

======== Running on http://192.168.1.10:8080 ========
(Press CTRL+C to quit)
Exception in callback Transaction.__retry()
handle: <TimerHandle when=12134.937 Transaction.__retry()>
Traceback (most recent call last):
  File "C:\Python\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Python\lib\site-packages\aioice\stun.py", line 309, in __retry
    self.__future.set_exception(TransactionTimeout())
asyncio.exceptions.InvalidStateError: invalid state
Task exception was never retrieved
future: <Task finished name='Task-27689' coro=<RTCSctpTransport._data_channel_flush() done, defined at C:\Python\lib\site-packages\aiortc\rtcsctptransport.py:1642> exception=ConnectionError('Cannot send data, not connected')>
Traceback (most recent call last):
  File "C:\Python\lib\site-packages\aiortc\rtcsctptransport.py", line 1673, in _data_channel_flush
    await self._send(
  File "C:\Python\lib\site-packages\aiortc\rtcsctptransport.py", line 1330, in _send
    await self._transmit()
  File "C:\Python\lib\site-packages\aiortc\rtcsctptransport.py", line 1553, in _transmit
    await self._send_chunk(chunk)
  File "C:\Python\lib\site-packages\aiortc\rtcsctptransport.py", line 1337, in _send_chunk
    await self.__transport._send_data(
  File "C:\Python\lib\site-packages\aiortc\rtcdtlstransport.py", line 649, in _send_data
    await self._write_ssl()
  File "C:\Python\lib\site-packages\aiortc\rtcdtlstransport.py", line 691, in _write_ssl
    await self.transport._send(data)
  File "C:\Python\lib\site-packages\aioice\ice.py", line 604, in send
    await self.sendto(data, 1)
  File "C:\Python\lib\site-packages\aioice\ice.py", line 619, in sendto
    raise ConnectionError("Cannot send data, not connected")
ConnectionError: Cannot send data, not connected
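
For context on the first traceback: asyncio raises InvalidStateError when set_result() or set_exception() is called on a future that is already done. The sketch below reproduces only that generic asyncio behavior with the standard library. It is not aioice's code, and both the function name and the done() guard are illustrative assumptions rather than a fix taken from the library.

```python
import asyncio


async def late_timeout_demo() -> list:
    """Complete a future, then let a hypothetical retry timer fire late."""
    outcomes = []
    future = asyncio.get_running_loop().create_future()
    future.set_result("response received")  # the future is now done

    try:
        # Setting an exception on a finished future is rejected by asyncio.
        future.set_exception(TimeoutError("retry timer fired late"))
    except asyncio.InvalidStateError:
        outcomes.append("InvalidStateError")

    # Hypothetical guard: only report the timeout if nothing completed first.
    if not future.done():
        future.set_exception(TimeoutError("retry timer fired late"))
    else:
        outcomes.append("skipped")
    return outcomes


outcomes = asyncio.run(late_timeout_demo())
print(outcomes)
```

The first attempt raises, while the guarded attempt is skipped because the future has already completed.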

When I connect the first browser peer, everything seems to work fine.
But when I connect a second browser peer:

  1. Video and audio latency becomes very high.
  2. After a few seconds, the error above occurs and the connections are closed.

My code is not identical to any of the project's examples, but it is based on the server example.

As for the first error, it may be caused by my network: I am on vacation in my village and use my mobile phone as a hotspot, while the Python server user is in another town. The second error also happened without the hotspot.
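
The "Task exception was never retrieved" line in the log is the message asyncio emits when a task ends with an exception that nothing awaits or reads. The failing task in the traceback is created inside aiortc, so the sketch below only illustrates that mechanism with the standard library; flush_stand_in and record_failure are hypothetical names. The same pattern could apply to tasks started with asyncio.ensure_future in application code.

```python
import asyncio

failures = []


def record_failure(task: asyncio.Task) -> None:
    """Done callback: read the task's exception so it counts as retrieved."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(repr(exc))


async def flush_stand_in() -> None:
    # Hypothetical stand-in for a background send on a closed connection.
    raise ConnectionError("Cannot send data, not connected")


async def main() -> None:
    task = asyncio.ensure_future(flush_stand_in())
    task.add_done_callback(record_failure)
    # asyncio.wait() does not re-raise the task's exception.
    await asyncio.wait([task])


asyncio.run(main())
print(failures)
```

Because the callback calls task.exception(), the failure is recorded by the application instead of surfacing only as an unretrieved-exception log entry.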

Can you please advise me on this error?

Note that I can share more code if you ask.

Edit:

Here is the Python offer method:

    async def offer(self, request):
        try:
            params = await request.json()

            peer_connection = {
                "name": params["name"],
                "surname": params["surname"],
                "pc": None,
                "is_closed": False,
                "dc": None,
                "uid": uuid.uuid4(),
                "audio_track": None,
                "audio_track_for_local_use": None,
                "audio_blackhole": None,
                "video_track": None,
                "video_track_for_local_use": None,
                "video_blackhole": None,
                "offer_in_progress": True,
                "call_answered": False,
                "manage_call_end_thread": None,
                "stop_in_progress": False,
                "call_number": None
            }

            # All three call slots are taken: reply with an empty answer.
            if len(self.pcs) >= 3:
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))

            reserved_call_numbers = list(self.pcs.keys())

            if 1 not in reserved_call_numbers:
                call_number = 1
            elif 2 not in reserved_call_numbers:
                call_number = 2
            else:
                call_number = 3

            peer_connection["call_number"] = call_number
            self.pcs[call_number] = peer_connection

            if call_number == 1:
                self.to_emitter.send({"type": "call_1_offering", "name": peer_connection["name"], "surname": peer_connection["surname"]})
                #data_from_mother = self.call_queues[0]
            elif call_number == 2:
                self.to_emitter.send({"type": "call_2_offering", "name": peer_connection["name"], "surname": peer_connection["surname"]})
                #data_from_mother = self.call_queues[1]
            else:
                self.to_emitter.send({"type": "call_3_offering", "name": peer_connection["name"], "surname": peer_connection["surname"]})
                #data_from_mother = self.call_queues[2]

            # Wait up to 30 seconds for an answer/reject message on ip_calls_queue.
            timer = 0
            while (timer < 30 and self.ip_calls_queue.qsize() == 0
                   and not self.pcs[call_number]["call_answered"]):
                if request.transport is None or request.transport.is_closing():
                    # The HTTP request was dropped while waiting.
                    self.to_emitter.send({"type":"transport-error","call-number":call_number})
                    if request.transport is not None:
                        request.transport.close()
                    del self.pcs[call_number]
                    return None
                timer += 0.1
                await asyncio.sleep(0.1)
            self.pcs[call_number]["call_answered"] = True
            if self.ip_calls_queue.qsize() == 0:
                # reject offer
                while not self.ip_calls_queue.empty():
                    self.ip_calls_queue.get()
                if call_number == 1:
                    self.to_emitter.send({"type": "call-1-status", "status": "closed-by-server"})
                elif call_number == 2:
                    self.to_emitter.send({"type": "call-2-status", "status": "closed-by-server"})
                else:
                    self.to_emitter.send({"type": "call-3-status", "status": "closed-by-server"})
                await self.stop_peer_connection(peer_connection["uid"])
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
            else:
                data = self.ip_calls_queue.get()
                if data["type"] in ("call-1", "call-2", "call-3") and data["call"] == "reject":
                    # reject call
                    while not self.ip_calls_queue.empty():
                        self.ip_calls_queue.get()
                    await self.stop_peer_connection(peer_connection["uid"])
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
                elif data["type"] in ("call-1", "call-2", "call-3") and data["call"] == "answer":
                    while not self.ip_calls_queue.empty():
                        self.ip_calls_queue.get()

                    #configuration = RTCConfiguration([RTCIceServer("stun:stun.l.google.com:19302")])
                    #self.pcs[call_number]["pc"] = RTCPeerConnection(configuration)
                    self.pcs[call_number]["pc"] = RTCPeerConnection()

                    @self.pcs[call_number]["pc"].on("connectionstatechange")
                    async def on_connectionstatechange():
                        try:
                            if self.pcs[call_number]["pc"].connectionState == "failed":
                                await self.stop_peer_connection(peer_connection["uid"])
                        except Exception:
                            pass

                    @self.pcs[call_number]["pc"].on("datachannel")
                    async def on_datachannel(channel):
                        self.pcs[call_number]["dc"] = channel
                        try:
                            channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
                        except Exception:
                            pass

                        # Tell the new peer about every other registered peer.
                        for pc_i_call_number in self.pcs:
                            try:
                                if pc_i_call_number != call_number:
                                    pc_i = self.pcs[pc_i_call_number]
                                    # json.dumps escapes quotes in user-supplied names.
                                    channel.send(json.dumps({"type": "new-client", "uid": str(pc_i["uid"]),
                                                             "name": str(pc_i["name"]),
                                                             "surname": str(pc_i["surname"])}))
                            except Exception:
                                pass

                        @channel.on("message")
                        async def on_message(message):
                            message = json.loads(message)
                            if message["type"] == "disconnected":
                                await self.stop_peer_connection(peer_connection["uid"])

                    # audio from server to client
                    # NOTE: the same audio track instance is added to every peer connection.
                    if self.server_audio_stream_offer is None:
                        self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue)
                    self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)

                    # video from server to client
                    # NOTE: the same video track instance is added to every peer connection.
                    if self.server_video_stream_offer is None:
                        self.server_video_stream_offer = self.create_local_tracks()
                    self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)

                    # attach server video to a QLabel
                    if self.server_video_track is None:
                        self.server_video_track = WebCamera(self.server_video_stream_offer, self.to_emitter)
                    if self.server_video_blackholde is None:
                        self.server_video_blackholde = MediaBlackhole()
                        self.server_video_blackholde.addTrack(self.server_video_track)
                        await self.server_video_blackholde.start()

                    @self.pcs[call_number]["pc"].on("track")
                    async def on_track(track):
                        if track.kind == "audio":
                            self.pcs[call_number]["audio_track"] = track
                            # audio from client (server use)
                            self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number)
                            self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
                            self.pcs[call_number]["audio_blackhole"].addTrack(
                                self.pcs[call_number]["audio_track_for_local_use"])
                            await self.pcs[call_number]["audio_blackhole"].start()
                        else:
                            self.pcs[call_number]["video_track"] = track
                            # video from client (server use)
                            self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
                                                                                                 call_number, self)
                            self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
                            self.pcs[call_number]["video_blackhole"].addTrack(
                                self.pcs[call_number]["video_track_for_local_use"])
                            await self.pcs[call_number]["video_blackhole"].start()

                    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

                    # handle offer
                    await self.pcs[call_number]["pc"].setRemoteDescription(offer)

                    # send answer
                    answer = await self.pcs[call_number]["pc"].createAnswer()
                    await self.pcs[call_number]["pc"].setLocalDescription(answer)

                    # loop = asyncio.get_event_loop()
                    task = asyncio.ensure_future(self.manage_call_end(peer_connection["uid"]))
                    self.pcs[call_number]["manage_call_end_thread"] = task

                    # asyncio.ensure_future(self.statistics(peer_connection["pc"]))
                    return web.Response(content_type="application/json", text=json.dumps(
                        {"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
                         "type": self.pcs[call_number]["pc"].localDescription.type}))
                else:
                    # reject call
                    while not self.ip_calls_queue.empty():
                        self.ip_calls_queue.get()
                    if call_number == 1:
                        self.to_emitter.send({"type": "call-1-status", "status": "closed-by-server"})
                    elif call_number == 2:
                        self.to_emitter.send({"type": "call-2-status", "status": "closed-by-server"})
                    else:
                        self.to_emitter.send({"type": "call-3-status", "status": "closed-by-server"})
                    await self.stop_peer_connection(peer_connection["uid"])
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
        except Exception:
            error = traceback.format_exc()
            print(error)
            self.to_emitter.send({"type":"error","error_message":error})
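
The call-number selection near the top of offer() picks the lowest free slot among 1, 2 and 3. A compact sketch of that logic follows, assuming self.pcs is a plain dict keyed by call number. The names next_free_call_number, offering_event and MAX_CALLS are hypothetical and not part of the code above, and the sample names are made up.

```python
from typing import Mapping, Optional

MAX_CALLS = 3  # assumed limit, matching the three call slots in offer()


def next_free_call_number(active_calls: Mapping[int, object],
                          max_calls: int = MAX_CALLS) -> Optional[int]:
    """Return the lowest unused call number, or None when all slots are taken."""
    for number in range(1, max_calls + 1):
        if number not in active_calls:
            return number
    return None


def offering_event(call_number: int, name: str, surname: str) -> dict:
    """Build the per-slot notification, e.g. type "call_2_offering"."""
    return {"type": f"call_{call_number}_offering", "name": name, "surname": surname}


# Slots 1 and 3 are busy, so the next caller gets slot 2.
pcs = {1: {"name": "Ada"}, 3: {"name": "Alan"}}
free_slot = next_free_call_number(pcs)
event = offering_event(free_slot, "Grace", "Hopper")
print(free_slot, event["type"])
```

As long as there is no await between looking up the free slot and storing the entry in self.pcs, two offers handled on the same event loop should not receive the same call number.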
