Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple network stream support #986

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions src/frontend/mosh-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,18 @@
#include "select.h"
#include "timestamp.h"
#include "fatal_assert.h"
#include "multiplexer.h"

#ifndef _PATH_BSHELL
#define _PATH_BSHELL "/bin/sh"
#endif

#include "networktransport-impl.h"

typedef Network::Transport< Terminal::Complete, Network::UserStream > ServerConnection;
typedef Network::Transport< Network::MultiplexerStream, Network::MultiplexerStream > ServerConnection;

static void serve( int host_fd,
Network::MultiplexerStream &local,
Terminal::Complete &terminal,
ServerConnection &network,
long network_timeout,
Expand Down Expand Up @@ -422,11 +424,14 @@ static int run_server( const char *desired_ip, const char *desired_port,

/* open parser and terminal */
Terminal::Complete terminal( window_size.ws_col, window_size.ws_row );
Network::MultiplexerStream local({&terminal});

/* open network */
Network::UserStream blank;
Network::MultiplexerStream remote({&blank});

typedef shared::shared_ptr<ServerConnection> NetworkPointer;
NetworkPointer network( new ServerConnection( terminal, blank, desired_ip, desired_port ) );
NetworkPointer network( new ServerConnection( local, remote, desired_ip, desired_port ) );

network->set_verbose( verbose );
Select::set_verbose( verbose );
Expand Down Expand Up @@ -624,7 +629,7 @@ static int run_server( const char *desired_ip, const char *desired_port,
#endif

try {
serve( master, terminal, *network, network_timeout, network_signaled_timeout );
serve( master, local, terminal, *network, network_timeout, network_signaled_timeout );
} catch ( const Network::NetworkException &e ) {
fprintf( stderr, "Network exception: %s\n",
e.what() );
Expand All @@ -648,7 +653,7 @@ static int run_server( const char *desired_ip, const char *desired_port,
return 0;
}

static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network, long network_timeout, long network_signaled_timeout )
static void serve( int host_fd, Network::MultiplexerStream &local, Terminal::Complete &terminal, ServerConnection &network, long network_timeout, long network_signaled_timeout )
{
/* scale timeouts */
const uint64_t network_timeout_ms = static_cast<uint64_t>( network_timeout ) * 1000;
Expand Down Expand Up @@ -731,14 +736,14 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &
if ( sel.read( network_fd ) ) {
/* packet received from the network */
network.recv();

/* is new user input available for the terminal? */
if ( network.get_remote_state_num() != last_remote_num ) {
last_remote_num = network.get_remote_state_num();


Network::UserStream us;
us.apply_string( network.get_remote_diff() );
us.apply_string( Network::MultiplexerStream::diffForStream(0, network.get_remote_diff()) );
/* apply userstream to terminal */
for ( size_t i = 0; i < us.size(); i++ ) {
const Parser::Action &action = us.get_action( i );
Expand Down Expand Up @@ -775,7 +780,9 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &

/* update client with new state of terminal */
if ( !network.shutdown_in_progress() ) {
network.set_current_state( terminal );
terminal.reset_input();
local.set(0, &terminal);
network.set_current_state( local );
}
#if defined(HAVE_SYSLOG) || defined(HAVE_UTEMPTER)
#ifdef HAVE_UTEMPTER
Expand Down Expand Up @@ -833,12 +840,12 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &
}
}
}

if ( (!network.shutdown_in_progress()) && sel.read( host_fd ) ) {
/* input from the host needs to be fed to the terminal */
const int buf_size = 16384;
char buf[ buf_size ];

/* fill buffer if possible */
ssize_t bytes_read = read( host_fd, buf, buf_size );

Expand All @@ -848,9 +855,11 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &
network.start_shutdown();
} else {
terminal_to_host += terminal.act( string( buf, bytes_read ) );

/* update client with new state of terminal */
network.set_current_state( terminal );
terminal.reset_input();
local.set(0, &terminal);
network.set_current_state( local );
}
}

Expand All @@ -863,7 +872,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &
if ( network_timeout_ms &&
network_timeout_ms <= time_since_remote_state ) {
idle_shutdown = true;
fprintf( stderr, "Network idle for %llu seconds.\n",
fprintf( stderr, "Network idle for %llu seconds.\n",
static_cast<unsigned long long>( time_since_remote_state / 1000 ) );
}
if ( sel.signal( SIGUSR1 )
Expand All @@ -881,7 +890,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &
break;
}
}

/* quit if our shutdown has been acknowledged */
if ( network.shutdown_in_progress() && network.shutdown_acknowledged() ) {
break;
Expand All @@ -899,7 +908,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &

#ifdef HAVE_UTEMPTER
/* update utmp if has been more than 30 seconds since heard from client */
if ( connected_utmp
if ( connected_utmp
&& time_since_remote_state > 30000 ) {
utempter_remove_record( host_fd );

Expand All @@ -913,7 +922,9 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &

if ( terminal.set_echo_ack( now ) && !network.shutdown_in_progress() ) {
/* update client with new echo ack */
network.set_current_state( terminal );
terminal.reset_input();
local.set(0, &terminal);
network.set_current_state( local );
}

if ( !network.get_remote_state_num()
Expand Down
45 changes: 27 additions & 18 deletions src/frontend/stmclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void STMClient::shutdown( void )

/* Restore terminal and terminal-driver state */
swrite( STDOUT_FILENO, display.close().c_str() );

if ( tcsetattr( STDIN_FILENO, TCSANOW, &saved_termios ) < 0 ) {
perror( "tcsetattr" );
exit( 1 );
Expand Down Expand Up @@ -237,7 +237,7 @@ void STMClient::main_init( void )
if ( ioctl( STDIN_FILENO, TIOCGWINSZ, &window_size ) < 0 ) {
perror( "ioctl TIOCGWINSZ" );
return;
}
}

/* local state */
local_framebuffer = Terminal::Framebuffer( window_size.ws_col, window_size.ws_row );
Expand All @@ -248,14 +248,23 @@ void STMClient::main_init( void )
swrite( STDOUT_FILENO, init.data(), init.size() );

/* open network */
Network::UserStream blank;
Terminal::Complete local_terminal( window_size.ws_col, window_size.ws_row );
network = NetworkPointer( new NetworkType( blank, local_terminal, key.c_str(), ip.c_str(), port.c_str() ) );
vector<Network::Stream*> localStreams = {
new Network::UserStream(),
};

vector<Network::Stream*> remoteStreams = {
new Terminal::Complete( window_size.ws_col, window_size.ws_row ),
};

Network::MultiplexerStream local(localStreams);
Network::MultiplexerStream remote(remoteStreams);

network = NetworkPointer( new NetworkType( local, remote, key.c_str(), ip.c_str(), port.c_str() ) );

network->set_send_delay( 1 ); /* minimal delay on outgoing keystrokes */

/* tell server the size of the terminal */
network->get_current_state().push_back( Parser::Resize( window_size.ws_col, window_size.ws_row ) );
network->get_current_state().stream<Network::UserStream>(0)->push_back( Parser::Resize( window_size.ws_col, window_size.ws_row ) );

/* be noisy as necessary */
network->set_verbose( verbose );
Expand All @@ -269,7 +278,7 @@ void STMClient::output_new_frame( void )
}

/* fetch target state */
new_state = network->get_latest_remote_state().state.get_fb();
new_state = network->get_latest_remote_state().state.stream<Terminal::Complete>(0)->get_fb();

/* apply local overlays */
overlays.apply( new_state );
Expand All @@ -288,14 +297,14 @@ void STMClient::output_new_frame( void )
void STMClient::process_network_input( void )
{
network->recv();

/* Now give hints to the overlays */
overlays.get_notification_engine().server_heard( network->get_latest_remote_state().timestamp );
overlays.get_notification_engine().server_acked( network->get_sent_state_acked_timestamp() );

overlays.get_prediction_engine().set_local_frame_acked( network->get_sent_state_acked() );
overlays.get_prediction_engine().set_send_interval( network->send_interval() );
overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.get_echo_ack() );
overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.stream<Terminal::Complete>(0)->get_echo_ack() );
}

bool STMClient::process_user_input( int fd )
Expand Down Expand Up @@ -360,11 +369,11 @@ bool STMClient::process_user_input( int fd )
} else if ( (the_byte == escape_pass_key) || (the_byte == escape_pass_key2) ) {
/* Emulation sequence to type escape_key is escape_key +
escape_pass_key (that is escape key without Ctrl) */
net.get_current_state().push_back( Parser::UserByte( escape_key ) );
net.get_current_state().stream<Network::UserStream>(0)->push_back( Parser::UserByte( escape_key ) );
} else {
/* Escape key followed by anything other than . and ^ gets sent literally */
net.get_current_state().push_back( Parser::UserByte( escape_key ) );
net.get_current_state().push_back( Parser::UserByte( the_byte ) );
net.get_current_state().stream<Network::UserStream>(0)->push_back( Parser::UserByte( escape_key ) );
net.get_current_state().stream<Network::UserStream>(0)->push_back( Parser::UserByte( the_byte ) );
}

quit_sequence_started = false;
Expand All @@ -389,7 +398,7 @@ bool STMClient::process_user_input( int fd )
repaint_requested = true;
}

net.get_current_state().push_back( Parser::UserByte( the_byte ) );
net.get_current_state().stream<Network::UserStream>(0)->push_back( Parser::UserByte( the_byte ) );
}

return true;
Expand All @@ -402,16 +411,16 @@ bool STMClient::process_resize( void )
perror( "ioctl TIOCGWINSZ" );
return false;
}

/* tell remote emulator */
Parser::Resize res( window_size.ws_col, window_size.ws_row );

if ( !network->shutdown_in_progress() ) {
network->get_current_state().push_back( res );
network->get_current_state().stream<Network::UserStream>(0)->push_back( res );
}

/* note remote emulator will probably reply with its own Resize to adjust our state */

/* tell prediction engine */
overlays.get_prediction_engine().reset();

Expand Down Expand Up @@ -478,7 +487,7 @@ bool STMClient::main( void )
if ( network_ready_to_read ) {
process_network_input();
}

if ( sel.read( STDIN_FILENO ) && !process_user_input( STDIN_FILENO ) ) { /* input from the user needs to be fed to the network */
if ( !network->has_remote_addr() ) {
break;
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/stmclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "user.h"
#include "shared.h"
#include "terminaloverlay.h"
#include "multiplexer.h"

class STMClient {
private:
Expand All @@ -61,7 +62,7 @@ class STMClient {

Terminal::Framebuffer local_framebuffer, new_state;
Overlay::OverlayManager overlays;
typedef Network::Transport< Network::UserStream, Terminal::Complete > NetworkType;
typedef Network::Transport< Network::MultiplexerStream, Network::MultiplexerStream > NetworkType;
typedef shared::shared_ptr< NetworkType > NetworkPointer;
NetworkPointer network;
Terminal::Display display;
Expand Down Expand Up @@ -122,7 +123,7 @@ class STMClient {
}
if ( predict_overwrite && !strcmp( predict_overwrite, "yes" ) ) {
overlays.get_prediction_engine().set_predict_overwrite( true );
}
}
}

void init( void );
Expand Down
1 change: 0 additions & 1 deletion src/network/transportsender.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ namespace Network {
{
assert( !shutdown_in_progress );
current_state = x;
current_state.reset_input();
}
void set_verbose( unsigned int s_verbose ) { verbose = s_verbose; }

Expand Down
2 changes: 1 addition & 1 deletion src/protobufs/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
source = userinput.proto hostinput.proto transportinstruction.proto
source = userinput.proto hostinput.proto transportinstruction.proto stream.proto

AM_CPPFLAGS = $(protobuf_CFLAGS)
AM_CXXFLAGS = $(WARNING_CXXFLAGS) $(HARDEN_CFLAGS) $(MISC_CXXFLAGS) -Wno-error
Expand Down
9 changes: 9 additions & 0 deletions src/protobufs/stream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto2";

option optimize_for = LITE_RUNTIME;

package StreamBuffers;

message StreamMessage {
repeated string diffs = 1;
}
2 changes: 1 addition & 1 deletion src/statesync/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ AM_CXXFLAGS = $(WARNING_CXXFLAGS) $(PICKY_CXXFLAGS) $(HARDEN_CFLAGS) $(MISC_CXXF

noinst_LIBRARIES = libmoshstatesync.a

libmoshstatesync_a_SOURCES = completeterminal.cc completeterminal.h user.cc user.h
libmoshstatesync_a_SOURCES = completeterminal.cc completeterminal.h user.cc user.h multiplexer.cc multiplexer.h stream.h
10 changes: 6 additions & 4 deletions src/statesync/completeterminal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ string Complete::act( const string &str )
for ( unsigned int i = 0; i < str.size(); i++ ) {
/* parse octet into up to three actions */
parser.input( str[ i ], actions );

/* apply actions to terminal and delete them */
for ( Actions::iterator it = actions.begin();
it != actions.end();
Expand All @@ -69,8 +69,9 @@ string Complete::act( const Action &act )
}

/* interface for Network::Transport */
string Complete::diff_from( const Complete &existing ) const
string Complete::diff_from( const Network::Stream &existingStream ) const
{
const Complete &existing = dynamic_cast<const Complete&>(existingStream);
HostBuffers::HostMessage output;

if ( existing.get_echo_ack() != get_echo_ack() ) {
Expand All @@ -92,7 +93,7 @@ string Complete::diff_from( const Complete &existing ) const
new_inst->MutableExtension( hostbytes )->set_hoststring( update );
}
}

return output.SerializeAsString();
}

Expand Down Expand Up @@ -121,8 +122,9 @@ void Complete::apply_string( const string & diff )
}
}

bool Complete::operator==( Complete const &x ) const
bool Complete::operator==( Network::Stream const &xStream ) const
{
const Complete &x = dynamic_cast<const Complete&>(xStream);
// assert( parser == x.parser ); /* parser state is irrelevant for us */
return (terminal == x.terminal) && (echo_ack == x.echo_ack);
}
Expand Down
Loading