Skip to content

Commit

Permalink
wip: A new "push" API for the payload.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev committed Jul 18, 2024
1 parent 452da85 commit 6ace4fe
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 20 deletions.
37 changes: 36 additions & 1 deletion src/main/java/org/ice4j/ice/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.ice4j.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
import org.jetbrains.annotations.*;
import org.jitsi.utils.logging2.*;

/**
Expand All @@ -39,7 +41,7 @@
* @author Boris Grozev
*/
public class Component
implements PropertyChangeListener
implements PropertyChangeListener, BufferHandler
{
/**
* The component ID to use with RTP streams.
Expand Down Expand Up @@ -153,6 +155,8 @@ public class Component
= Collections.newSetFromMap(
new ConcurrentHashMap<CandidatePair, Boolean>());

private BufferHandler bufferCallback = null;

/**
* Creates a new <tt>Component</tt> with the specified <tt>componentID</tt>
* as a child of the specified <tt>IceMediaStream</tt>.
Expand Down Expand Up @@ -1188,4 +1192,35 @@ public Logger getLogger()
{
return logger;
}

/**
* TODO
*/
@Override
public void handleBuffer(@NotNull Buffer buffer)
{
BufferHandler bufferCallback = this.bufferCallback;
if (bufferCallback == null)
{
logger.warn(
"The push API is used while no buffer callback has been set, dropping a packet (use-push-api="
+ AgentConfig.config.getUsePushApi() + ").");
BufferPool.returnBuffer.invoke(buffer);
return;
}

try
{
bufferCallback.handleBuffer(buffer);
}
catch (Exception e)
{
logger.warn("Buffer handling failed", e);
}
}

public void setBufferCallback(BufferHandler bufferCallback)
{
this.bufferCallback = bufferCallback;
}
}
142 changes: 131 additions & 11 deletions src/main/java/org/ice4j/ice/harvest/AbstractUdpListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import org.ice4j.*;
import org.ice4j.attribute.*;
import org.ice4j.ice.*;
import org.ice4j.message.*;
import org.ice4j.socket.*;
import org.ice4j.util.*;
import org.jetbrains.annotations.*;
import org.jitsi.utils.queue.*;

import java.io.*;
Expand Down Expand Up @@ -68,6 +71,9 @@ public abstract class AbstractUdpListener
*/
private static final int POOL_SIZE = 256;

public static int BYTES_TO_LEAVE_AT_START_OF_PACKET = 0;
public static int BYTES_TO_LEAVE_AT_END_OF_PACKET = 0;

/**
* Returns the list of {@link TransportAddress}es, one for each allowed IP
* address found on each allowed network interface, with the given port.
Expand Down Expand Up @@ -244,14 +250,17 @@ protected AbstractUdpListener(TransportAddress localAddress)
}
logger.info(logMessage);

thread = new Thread()
thread = new Thread(() ->
{
@Override
public void run()
if (AgentConfig.config.getUsePushApi())
{
AbstractUdpListener.this.runInHarvesterThreadPush();
}
else
{
AbstractUdpListener.this.runInHarvesterThread();
}
};
});

thread.setName(AbstractUdpListener.class.getName() + " thread for " + this.localAddress);
thread.setDaemon(true);
Expand Down Expand Up @@ -335,8 +344,15 @@ private void runInHarvesterThread()
continue;
}

maybeAcceptNewSession(buf, remoteAddress, ufrag);
// Maybe add to #sockets here in the base class?
MySocket newSocket = maybeAcceptNewSession(buf, remoteAddress, ufrag);
if (newSocket == null)
{
pool.offer(buf);
}
else
{
newSocket.addBuffer(buf);
}
}
}
while (true);
Expand All @@ -349,6 +365,97 @@ private void runInHarvesterThread()
socket.close();
}

private void runInHarvesterThreadPush()
{
DatagramPacket pkt = new DatagramPacket(new byte[1500], 0, 1500);
MySocket destinationSocket;
InetSocketAddress remoteAddress;
Clock clock = Clock.systemUTC();
Instant receivedTime;

do
{
if (close)
{
break;
}

pkt.setData(pkt.getData(), 0, pkt.getData().length);

try
{
socket.receive(pkt);
receivedTime = clock.instant();
}
catch (IOException ioe)
{
if (!close)
{
logger.severe("Failed to receive from socket: " + ioe);
}
break;
}


remoteAddress = (InetSocketAddress) pkt.getSocketAddress();
destinationSocket = sockets.get(remoteAddress);
if (destinationSocket == null)
{
// Packet from an unknown source. Is it a STUN Binding Request?
String ufrag = getUfrag(pkt.getData(), pkt.getOffset(), pkt.getLength());
if (ufrag == null)
{
// Not a STUN Binding Request or doesn't have a valid USERNAME attribute. Drop it.
continue;
}

Buffer buffer = bufferFromPacket(pkt, receivedTime);
MySocket newSocket = maybeAcceptNewSession(buffer, remoteAddress, ufrag);
if (newSocket == null)
{
BufferPool.returnBuffer.invoke(buffer);
}
else
{
newSocket.addBuffer(buffer);
}
}
else
{
Buffer buf = bufferFromPacket(pkt, receivedTime);
if (StunDatagramPacketFilter.isStunPacket(pkt))
{
// STUN packets are made available to the DatagramSocket-based API used by ice4j internally.
destinationSocket.addBuffer(buf);
}
else
{
// Payload goes through the push API.
destinationSocket.bufferHandler.handleBuffer(buf);
}
}
}
while (true);

// now clean up and exit
for (MySocket candidateSocket : new ArrayList<>(sockets.values()))
{
candidateSocket.close();
}
socket.close();
}

private Buffer bufferFromPacket(DatagramPacket p, Instant receivedTime)
{
int off = BYTES_TO_LEAVE_AT_START_OF_PACKET;
Buffer buffer = BufferPool.getBuffer.invoke(off + p.getLength() + BYTES_TO_LEAVE_AT_END_OF_PACKET);

System.arraycopy(p.getData(), p.getOffset(), buffer.getBuffer(), off, p.getLength());
buffer.setReceivedTime(receivedTime);

return buffer;
}

/**
* Handles the reception of a STUN Binding Request with a valid USERNAME
* attribute, from a "new" remote address (one which is not in
Expand All @@ -367,7 +474,7 @@ private void runInHarvesterThread()
* @param ufrag the local ICE username fragment of the received STUN Binding
* Request.
*/
protected abstract void maybeAcceptNewSession(
protected abstract MySocket maybeAcceptNewSession(
Buffer buf,
InetSocketAddress remoteAddress,
String ufrag);
Expand Down Expand Up @@ -397,12 +504,13 @@ private Buffer getFreeBuffer()
* @param remoteAddress the remote address with which to associate the new
* socket instance.
* @param ufrag The username fragment associated with the socket.
* @param bufferHandler The handler to call when the push API is used.
* @return the created socket instance.
*/
protected MySocket addSocket(InetSocketAddress remoteAddress, String ufrag)
protected MySocket addSocket(InetSocketAddress remoteAddress, String ufrag, @NotNull BufferHandler bufferHandler)
throws SocketException
{
MySocket newSocket = new MySocket(remoteAddress, ufrag);
MySocket newSocket = new MySocket(remoteAddress, ufrag, bufferHandler);
sockets.put(remoteAddress, newSocket);
return newSocket;
}
Expand Down Expand Up @@ -447,20 +555,24 @@ protected class MySocket

private final String ufrag;

@NotNull
private final BufferHandler bufferHandler;

/**
* Initializes a new <tt>MySocket</tt> instance with the given
* remote address.
* @param remoteAddress the remote address to be associated with the
* new instance.
* @throws SocketException
*/
MySocket(InetSocketAddress remoteAddress, String ufrag)
MySocket(InetSocketAddress remoteAddress, String ufrag, @NotNull BufferHandler bufferHandler)
throws SocketException
{
// unbound
super((SocketAddress)null);

this.ufrag = ufrag;
this.bufferHandler = bufferHandler;
this.remoteAddress = remoteAddress;
if (logger.isLoggable(Level.FINEST))
{
Expand Down Expand Up @@ -653,7 +765,15 @@ public void receive(DatagramPacket p)
p.setLength(buf.getLength());
p.setSocketAddress(remoteAddress);

pool.offer(buf);
// TODO doc
if (AgentConfig.config.getUsePushApi())
{
BufferPool.returnBuffer.invoke(buf);
}
else
{
pool.offer(buf);
}
}

/**
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/org/ice4j/ice/harvest/SinglePortUdpHarvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,40 +128,43 @@ public HarvestStatistics getHarvestStatistics()
* local ufrag of {@code ufrag}, and if one is found it accepts the new
* socket and adds it to the candidate.
*/
protected void maybeAcceptNewSession(Buffer buf,
protected MySocket maybeAcceptNewSession(Buffer buf,
InetSocketAddress remoteAddress,
String ufrag)
{
MyCandidate candidate = candidates.get(ufrag);
if (candidate == null)
{
// A STUN Binding Request with an unknown USERNAME. Drop it.
return;
return null;
}

// This is a STUN Binding Request destined for this
// specific Candidate/Component/Agent.
try
{
// 1. Create a socket for this remote address
// 2. Set-up de-multiplexing for future datagrams
// with this address to this socket.
MySocket newSocket = addSocket(remoteAddress, ufrag);
// 2. Set-up de-multiplexing for future datagrams with this address to this socket.
MySocket newSocket = addSocket(
remoteAddress,
ufrag,
candidate.getParentComponent());

// 3. Let the candidate and its STUN stack no about the
// new socket.
candidate.addSocket(newSocket, remoteAddress);

// 4. Add the original datagram to the new socket.
newSocket.addBuffer(buf);
return newSocket;
}
catch (SocketException se)
{
logger.info("Could not create a socket: " + se);
return null;
}
catch (IOException ioe)
{
logger.info("Failed to handle new socket: " + ioe);
return null;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/org/ice4j/ice/AgentConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class AgentConfig {
"ice4j.use-component-socket".from(configSource)
}

val usePushApi: Boolean by config {
"org.ice4j.use-push-api".from(configSource)
}

companion object {
@JvmField
val config = AgentConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ice4j.util

class Buffer(val buffer: ByteArray, var offset: Int, var length: Int)
import java.time.Instant

/**
* TODO
*/
class BufferPool {
companion object {
@JvmField
var getBuffer: (Int) -> Buffer = { size -> Buffer(ByteArray(size), 0, size) }

@JvmField
var returnBuffer: (Buffer) -> Unit = { }
}
}

class Buffer @JvmOverloads constructor(
val buffer: ByteArray,
var offset: Int,
var length: Int,
var receivedTime: Instant? = null
)

interface BufferHandler {
fun handleBuffer(buffer: Buffer)
}
2 changes: 2 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ ice4j {
// the socket instance from the desired [CandidatePair] must be used.
use-component-socket = true

use-push-api = false

// Whether remote IP addresses should be redacted in log messages
redact-remote-addresses = false

Expand Down

0 comments on commit 6ace4fe

Please sign in to comment.