Need some help for using Port Forward feature #1411
Replies: 10 comments
-
The SDK should work the same as kubectl. From your ssh log, it seems the connection is stuck. Did you test other algorithms?
-
Thank you for the reply.
-
OK, allow me some time to repro this and see if there is a bug in the SDK.
-
Thank you so much!
-
Can you send your port forwarding code? Did you see the example here: https://github.com/kubernetes-client/csharp/blob/master/examples/portforward/PortForward.cs#L41
Are you making sure to spawn multiple threads so that reading and writing are handled simultaneously in both directions? You also need to make sure there is no contention between those threads.
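To make the point about handling both directions at once concrete, here is a minimal sketch that relays two streams with one copy loop per direction. It uses only standard .NET types. `StreamRelay` and `RelayAsync` are hypothetical names for illustration and are not part of the Kubernetes client. In the port-forward case the two streams would presumably be the local socket's stream and the stream obtained from the demuxer, and whether cancellation interrupts a pending read depends on the stream type and runtime.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class StreamRelay
{
    // Hypothetical helper: copies a -> b and b -> a concurrently.
    // Each direction has its own loop (inside CopyToAsync) and its own
    // buffer, so a blocked read in one direction cannot stall the other.
    public static async Task RelayAsync(Stream a, Stream b, CancellationToken ct = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Start both directions before awaiting either of them.
        Task aToB = a.CopyToAsync(b, 81920, cts.Token);
        Task bToA = b.CopyToAsync(a, 81920, cts.Token);

        // When one side finishes (end of stream or error), ask the other to stop.
        await Task.WhenAny(aToB, bToA);
        cts.Cancel();

        try
        {
            await Task.WhenAll(aToB, bToA);
        }
        catch (OperationCanceledException)
        {
            // Expected for the direction that was cancelled above.
        }
        catch (IOException)
        {
            // The peer closed or reset the connection; treated here as the end of the relay.
        }
    }
}
```

A caller would typically accept a local connection, open the remote stream, and then `await StreamRelay.RelayAsync(localStream, remoteStream)` once per connection, disposing both streams afterwards.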
-
I have been working on a new port forwarder that you can try. Once I get it stabilized, I will PR it over here.

using System;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Threading.Tasks;
using k8s;

namespace KubeUI.Core.Client;

public class PortForwarder
{
    IKubernetes client;
    TcpListener listener;

    public string PodName { get; }
    public string Namespace { get; }
    public int ContainerPort { get; }
    public int LocalPort { get; }
    public string Status { get; private set; }

    public PortForwarder(IKubernetes client, string podName, string @namespace, int containerPort, int localPort)
    {
        this.client = client;
        PodName = podName;
        Namespace = @namespace;
        ContainerPort = containerPort;
        LocalPort = localPort;
        listener = new TcpListener(IPAddress.Loopback, localPort);
    }

    public void Start()
    {
        if (!IsPortAvailable(LocalPort))
        {
            Status = "Local port is busy";
            return;
        }

        listener.Start();
        listener.BeginAcceptSocket(new AsyncCallback(ClientConnected), null);
        Status = "Active";
    }

    public void Stop()
    {
        listener.Stop();
        Status = "Inactive";
    }

    private void ClientConnected(IAsyncResult result)
    {
        var socket = listener.EndAcceptSocket(result);

        // Handle this connection in the background and keep accepting new ones.
        Task.Run(async () => await HandleConnection(socket));
        listener.BeginAcceptSocket(new AsyncCallback(ClientConnected), null);
    }

    private async Task HandleConnection(Socket socket)
    {
        // Each local connection gets its own WebSocket and demuxer.
        using var webSocket = await client.WebSocketNamespacedPodPortForwardAsync(PodName, Namespace, new int[] { ContainerPort }, "v4.channel.k8s.io");
        using var demux = new StreamDemuxer(webSocket, StreamType.PortForward);
        demux.Start();
        using var stream = demux.GetStream((byte?)0, (byte?)0);

        // Local socket -> pod
        var read = Task.Run(() =>
        {
            var buffer = new byte[4096];
            while (SocketConnected(socket))
            {
                var bytesReceived = socket.Receive(buffer);
                if (bytesReceived == 0)
                    break; // the local peer closed the connection
                stream.Write(buffer, 0, bytesReceived);
            }
        });

        // Pod -> local socket
        var write = Task.Run(() =>
        {
            var buffer = new byte[4096];
            while (SocketConnected(socket))
            {
                var bytesReceived = stream.Read(buffer, 0, 4096);
                if (bytesReceived == 0)
                    break; // the pod side of the stream ended
                socket.Send(buffer, bytesReceived, SocketFlags.None);
            }
        });

        await read;
        await write;
        socket.Close();
    }

    private static bool IsPortAvailable(int port)
    {
        var properties = IPGlobalProperties.GetIPGlobalProperties();
        var activeTcpListeners = properties.GetActiveTcpListeners();
        return !activeTcpListeners.Any(x => x.Port == port);
    }

    private static bool SocketConnected(Socket s)
    {
        // A socket that polls as readable but has no data available has been closed by the peer.
        var part1 = s.Poll(1000, SelectMode.SelectRead);
        var part2 = s.Available == 0;
        return !(part1 && part2);
    }
}
-
Here is the port forwarding code I used. I had to clean up the parts that were not relevant.

using k8s;
using Serilog;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace VWAC.VirtualVehicles.Support
{
    public class ForwardedPort : IDisposable
    {
        private const int ReceiveBufferSize = 0xFFFF;
        private const int SendBufferSize = 0xFFFF;
        private readonly List<TcpConnection> _tcpConnections = new();
        private readonly CancellationTokenSource _instanceCts = new CancellationTokenSource();
        private TcpListener _tcpListener;

        public ForwardedPort(int localPort)
        {
            LocalPort = localPort;
        }

        public Guid Id { get; }
        public Guid ServiceId { get; }
        public Guid VehicleId { get; }
        public string TargetNodeName { get; }
        public string TargetProtocol { get; }
        public int? TargetPort { get; }
        public int LocalPort { get; }

        public void Initialize()
        {
            _tcpListener = new TcpListener(IPAddress.Loopback, LocalPort);
            _tcpListener.Start();
            _ = Task.Factory.StartNew(RunTcpListener);
        }

        private async Task RunTcpListener()
        {
            while (!_instanceCts.IsCancellationRequested)
            {
                TcpClient tcpClient = null;
                TcpConnection tcpConnection = null;
                try
                {
                    tcpClient = await _tcpListener.AcceptTcpClientAsync();
                    tcpClient.NoDelay = true;
                    tcpClient.ReceiveBufferSize = ReceiveBufferSize;
                    tcpClient.SendBufferSize = SendBufferSize;
                    tcpConnection = new TcpConnection { TcpClient = tcpClient, PodSocketStream = await GetPodStream(), Stream = tcpClient.GetStream() };
                    tcpConnection.TunnelReceiveTask = Task.Factory.StartNew(() => RunTcpReceiveTunnel(tcpConnection));
                    tcpConnection.TunnelSendTask = Task.Factory.StartNew(() => RunTcpSendTunnel(tcpConnection));
                    lock (_tcpConnections)
                        _tcpConnections.Add(tcpConnection);
                }
                catch (Exception ex)
                {
                    // TODO: Setup better handling + logging
                    if (tcpClient != null)
                    {
                        tcpClient.Dispose();
                    }
                    if (tcpConnection != null)
                        tcpConnection.Dispose();
                }
            }
        }

        private async Task RunTcpSendTunnel(TcpConnection tcpConnection)
        {
            using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_instanceCts.Token, tcpConnection.ConnectionCancellationToken);
            try
            {
                await RunTcpTunnel(new byte[SendBufferSize], tcpConnection.Stream, tcpConnection.PodSocketStream, cancellationTokenSource.Token);
            }
            catch
            {
                tcpConnection.Cancel();
                // TODO: Setup better handling + logging and closing of the connection
            }
            finally
            {
                tcpConnection.Dispose(); // Only need to dispose once, so the send tunnel does it
            }
            lock (_tcpConnections)
                _tcpConnections.Remove(tcpConnection);
        }

        private async Task RunTcpReceiveTunnel(TcpConnection tcpConnection)
        {
            using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_instanceCts.Token, tcpConnection.ConnectionCancellationToken);
            try
            {
                await RunTcpTunnel(new byte[ReceiveBufferSize], tcpConnection.PodSocketStream, tcpConnection.Stream, cancellationTokenSource.Token);
            }
            catch
            {
                tcpConnection.Cancel();
                // TODO: Setup better handling + logging and closing of the connection
            }
            finally
            {
                //await tcpConnection.DisposeAsync(); // Only need to dispose once, so the send tunnel does it
            }
            lock (_tcpConnections)
                _tcpConnections.Remove(tcpConnection);
        }

        private static async Task RunTcpTunnel(byte[] buffer, Stream readStream, Stream writeStream, CancellationToken cancellationToken = default)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var readAmount = await readStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
                if (readAmount <= 0)
                {
                    break;
                }
                await writeStream.WriteAsync(buffer, 0, readAmount, cancellationToken);
            }
        }

        private async Task<Stream> GetPodStream()
        {
            var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
            var client = new Kubernetes(config);
            var webSocket = await client.WebSocketNamespacedPodPortForwardAsync("vv-4321f07e-124f-4b3f-b8e3-5402074c2c19-0", "virtual-vehicle", new int[] { 22 }, "v4.channel.k8s.io");
            var demux = new StreamDemuxer(webSocket, StreamType.PortForward);
            demux.Start();
            var stream = demux.GetStream((byte?)0, (byte?)0);
            return stream;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            // Determine if waiting for tcp listener task to complete is necessary
            _tcpListener.Stop();
            _instanceCts.Cancel();
            TcpConnection[] connections;
            lock (_tcpConnections)
                connections = _tcpConnections.ToArray();
            foreach (var tcpConnection in connections)
                tcpConnection.Dispose();
            _instanceCts.Dispose();
        }

        private class TcpConnection : IDisposable
        {
            private readonly CancellationTokenSource _connectionCts = new();

            public CancellationToken ConnectionCancellationToken => _connectionCts.Token;
            public TcpClient TcpClient { get; set; }
            public Task TunnelReceiveTask { get; set; }
            public Task TunnelSendTask { get; set; }
            public Stream PodSocketStream { get; set; }
            public NetworkStream Stream { get; set; }

            public void Cancel()
            {
                try
                {
                    _connectionCts.Cancel();
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "Cancel Error");
                }
            }

            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }

            protected virtual void Dispose(bool disposing)
            {
                // TODO: Determine if waiting for tunnel receive/send task completion is necessary
                try
                {
                    _connectionCts.Dispose();
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "connectionCts Dispose Error");
                }
                TcpClient.Close();
                TcpClient.Dispose();
                Stream.Dispose();
            }
        }
    }
}
-
@IvanJosipovic Thank you. I will give it a try.
-
@IvanJosipovic Good news! The code you provided works! I am wondering which changes made it work?
-
Thanks all. Could you please send a PR with the working example code?
-
Hi all, here is an issue I ran into while using WebSocketNamespacedPodPortForwardAsync() in the client.
My use case is that I want to create a tunnel between a local port and the ssh port (22) on the pod. I tried the kubectl port-forward command, and it works just fine. When I use the port-forward function from the Kubernetes client, things do not work properly. Here is the ssh debug log from when I try to ssh into the pod:
//
OpenSSH_9.1p1, OpenSSL 1.1.1s 1 Nov 2022
debug1: Reading configuration data /c/Users/LeeTony(USAC-ER)/.ssh/config
debug1: Reading configuration data /etc/ssh/ssh_config
debug1: Connecting to 127.0.0.1 [127.0.0.1] port 22001.
debug1: Connection established.
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_rsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_rsa-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa_sk type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa_sk-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519 type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519_sk type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519_sk-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_xmss type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_xmss-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_dsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_dsa-cert type -1
debug1: Local version string SSH-2.0-OpenSSH_9.1
debug1: Remote protocol version 2.0, remote software version OpenSSH_8.9p1 Ubuntu-3ubuntu0.4
debug1: compat_banner: match: OpenSSH_8.9p1 Ubuntu-3ubuntu0.4 pat OpenSSH compat 0x04000000
debug1: Authenticating to 127.0.0.1:22001 as 'tonylee'
debug1: load_hostkeys: fopen /c/Users/LeeTony(USAC-ER)/.ssh/known_hosts2: No such file or directory
debug1: load_hostkeys: fopen /etc/ssh/ssh_known_hosts: No such file or directory
debug1: load_hostkeys: fopen /etc/ssh/ssh_known_hosts2: No such file or directory
debug1: SSH2_MSG_KEXINIT sent
debug1: SSH2_MSG_KEXINIT received
debug1: kex: algorithm: [email protected]
debug1: kex: host key algorithm: ssh-ed25519
debug1: kex: server->client cipher: [email protected] MAC: compression: none
debug1: kex: client->server cipher: [email protected] MAC: compression: none
debug1: expecting SSH2_MSG_KEX_ECDH_REPLY
/*/
It seems like the connection does get established, but it then gets stuck waiting for the "SSH2_MSG_KEX_ECDH_REPLY" response. I tried googling the issue and tried what people suggested, but it still didn't work.
I am wondering if anyone has had a similar issue, or knows what kind of issue this is and could point me in the right direction.
Thank you all.