Skip to content

Commit

Permalink
rollback feature uploads and http changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris committed Nov 29, 2016
1 parent c2f4de7 commit 59ee643
Show file tree
Hide file tree
Showing 17 changed files with 50 additions and 276 deletions.
105 changes: 15 additions & 90 deletions kitsune/audio_features_upload_task.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "FreeRTOS.h"
#include "audio_features_upload_task.h"
#include "audio_features_upload_task_helpers.h"
#include <queue.h>

#include <stdbool.h>
#include "protobuf/simple_matrix.pb.h"
Expand All @@ -28,8 +27,6 @@
#define CIRCULAR_BUFFER_SIZE_BYTES (8192)
#define MAX_NUM_TICKS_TO_RESET (1 << 30)

#define QUEUE_LENGTH (1)
#define DELAY_TIME (10000)

/**************************
* STATIC VARIABLES
Expand All @@ -39,11 +36,7 @@ static volatile int _is_waiting_for_uploading = 0;
static char _id_buf[128];
static RateLimiter_t _ratelimiterdata = {MAX_UPLOADS_PER_PERIOD,TICKS_PER_UPLOAD,0,0,0};
static SimpleMatrix _mat;
static volatile bool _disabled_uploads = true;

//delayed send queue
static xQueueHandle _delayed_send_queue = NULL;
static xQueueHandle _wakeup_queue = NULL;

//meant to be called from the same thread that triggers the upload
void audio_features_upload_task_buffer_bytes(void * data, uint32_t len) {
Expand All @@ -60,34 +53,20 @@ void audio_features_upload_task_buffer_bytes(void * data, uint32_t len) {

}

void audio_features_upload_set_upload_status(bool enabled) {
_disabled_uploads = !enabled;
}

static void cleanup(hlo_stream_t * stream) {
//close stream, no matter if anything was transfered
if (stream) {
static void net_response(const NetworkResponse_t * response, char * reply_buf, int reply_sz,void * context) {
hlo_stream_t * stream = (hlo_stream_t *)context;

LOGI("audio_features_upload -- closing audio features stream\r\n");
LOGI("audio_features_upload -- closing audio features stream\r\n");

hlo_stream_close(stream);
}
//close stream, no matter if anything was transfered
hlo_stream_close(stream);

//set stream ready for being re-opened
_circstream = NULL;

//set that we are ready to upload again
_is_waiting_for_uploading = 0;

LOGI("audio_features_upload -- reset state\r\n");

}

static void net_response(const NetworkResponse_t * response, char * reply_buf, int reply_sz,void * context) {
hlo_stream_t * stream = (hlo_stream_t *)context;

cleanup(stream);

}

static SimpleMatrixDataType map_type(FeaturesPayloadType_t feats_type) {
Expand Down Expand Up @@ -123,60 +102,20 @@ static void setup_protbuf(SimpleMatrix * mat,hlo_stream_t * bytestream, const ch
mat->payload.funcs.encode = encode_repeated_streaming_bytes_and_mark_done;
mat->payload.arg = bytestream;

mat->device_id.funcs.encode = encode_device_id_string;

}
#define DBG_UPLOADER(...)

void audio_features_upload_task(void * not_used) {
int ctr;

_delayed_send_queue = xQueueCreate(QUEUE_LENGTH,sizeof(NetworkTaskServerSendMessage_t));
_wakeup_queue = xQueueCreate(2,sizeof(int));

NetworkTaskServerSendMessage_t message;

for (; ;) {

//only proceed if we get the last wakeup and no more wakups come during the delay time.
if( xQueueReceive( _wakeup_queue, &ctr, portMAX_DELAY ) ) {
DBG_UPLOADER("\n\t\t\t\t\t got something in queue! %d %d\n", uxQueueMessagesWaiting( _wakeup_queue ), ctr );
vTaskDelay( DELAY_TIME );
if( uxQueueMessagesWaiting( _wakeup_queue ) ) {
DBG_UPLOADER("\n\t\t\t\t\t messages in queue! %d\n", uxQueueMessagesWaiting( _wakeup_queue ) );
continue;
}
}
DBG_UPLOADER("\n\t\t\t\t\t uploadingsssss %d %d\n", uxQueueMessagesWaiting( _wakeup_queue ), ctr );

//if no delay, and there is a message
if ( xQueueReceive( _delayed_send_queue, &message, 0 ) ) {
LOGI("audio_features_upload -- sending\r\n");

//relay
if (NetworkTask_AddMessageToQueue(&message) == pdFALSE) {
LOGE("audio_features_upload -- UNABLE TO ADD TO NETWORK QUEUE\r\n");

cleanup((hlo_stream_t *) message.context);
} else {
//toggle state if adding message to queue was successful
_is_waiting_for_uploading = 1;
}
}
}
}


//triggers upload, called from same thread as "audio_features_upload_task_buffer_bytes"
void audio_features_upload_trigger_async_upload(const char * net_id,const char * keyword,const uint32_t num_cols,FeaturesPayloadType_t feats_type) {
static int ctr = 0;
NetworkTaskServerSendMessage_t netmessage;

if (_disabled_uploads) {
LOGI("audio_features_upload -- uploads are disabled, ignoring upload request\r\n");
if (is_rate_limited(&_ratelimiterdata,xTaskGetTickCount())) {
return;
}

if (_is_waiting_for_uploading) {
return;
}

memset(&netmessage,0,sizeof(netmessage));
memset(&_mat,0,sizeof(_mat));
Expand All @@ -191,30 +130,16 @@ void audio_features_upload_trigger_async_upload(const char * net_id,const char *
netmessage.response_callback = net_response;
netmessage.context = _circstream;

//if it fails it's because the queue is overflowing, and that's ok
++ctr;
xQueueSend(_wakeup_queue,&ctr,0);
DBG_UPLOADER("\n\t\t\t\t\t queuing %d %d\n", uxQueueMessagesWaiting( _wakeup_queue ), ctr );

if( uxQueueMessagesWaiting( _wakeup_queue ) > 1 ) {
LOGE("audio_features_upload -- too many waiting\r\n");
if (NetworkTask_AddMessageToQueue(&netmessage) == pdFALSE) {
LOGE("audio_features_upload -- UNABLE TO ADD TO NETWORK QUEUE\r\n");
return;
}

if (_is_waiting_for_uploading) {
LOGI("audio_features_upload -- upload already in the pipe, ignoring upload request\r\n");
return;
}
if (xQueueSend(_delayed_send_queue,&netmessage,0) == pdFALSE) {
LOGE("audio_features_upload -- UNABLE TO ADD TO QUEUE\r\n");
return;
}
if (is_rate_limited(&_ratelimiterdata,xTaskGetTickCount())) {
LOGI("audio_features_upload -- rate limited, ignoring upload request\r\n");
return;
}
LOGI("audio_features_upload -- added to network queue\r\n");

//toggle state if adding message to queue was successful
_is_waiting_for_uploading = 1;

LOGI("audio_features_upload -- added to delay queue\r\n");

}

Expand Down
3 changes: 0 additions & 3 deletions kitsune/audio_features_upload_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define _AUDIO_FEATURES_UPLOAD_TASK_H_

#include "hlo_stream.h"
#include <stdbool.h>

typedef enum {
feats_sint8,
Expand All @@ -13,8 +12,6 @@ void audio_features_upload_task_buffer_bytes(void * data, uint32_t len);

void audio_features_upload_trigger_async_upload(const char * net_id,const char * keyword,const uint32_t num_cols,FeaturesPayloadType_t feats_type);

void audio_features_upload_set_upload_status(bool enabled);

void audio_features_upload_task(void * ctx);

#endif //_AUDIO_FEATURES_UPLOAD_TASK_H_
3 changes: 2 additions & 1 deletion kitsune/audiotask.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ static void _sense_state_task(hlo_future_t * result, void * ctx){
sense_state.has_voice_control_enabled = true;
sense_state.voice_control_enabled = !disable_voice;

LOGI("AudioState %s, %s\r\n", _playing ?"Playing":"Stopped", sense_state.audio_state.file_path);
LOGI("AudioState %s, %s\t\t\t\t\t\t %d \r\n",
_playing ?"Playing":"Stopped", sense_state.audio_state.file_path, sense_state.volume);
state_sent = NetworkTask_SendProtobuf(true, DATA_SERVER,
SENSE_STATE_ENDPOINT, SenseState_fields, &sense_state, 0,
NULL, NULL, NULL, true);
Expand Down
4 changes: 0 additions & 4 deletions kitsune/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
#include "octogram.h"

#include "audiohelper.h"
#include "audio_features_upload_task.h"

#define ONLY_MID 0

Expand Down Expand Up @@ -1787,9 +1786,6 @@ void launch_tasks() {

xTaskCreate(AudioControlTask, "AudioControl", 7*1024 / 4, NULL, 2, NULL);

//only network message is used in the stack of this task
xTaskCreate(audio_features_upload_task,"audioFeatsUpload",512/4,NULL,2,NULL);

}

int Cmd_boot(int argc, char *argv[]) {
Expand Down
3 changes: 1 addition & 2 deletions kitsune/endpoints.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ char * get_speech_server(void);
#define DEV_SPEECH_SERVER "dev-speech.hello.is"

#define SPEECH_ENDPOINT "/v2/upload/audio"
#define SPEECH_KEEPALIVE_ENDPOINT "/v2/ping"

#define MESSEJI_ENDPOINT "/receive"

Expand All @@ -41,7 +40,7 @@ char * get_speech_server(void);
#define DATA_RECEIVE_ENDPOINT "/in/sense/batch"
#define MORPHEUS_REGISTER_ENDPOINT "/register/morpheus"
#define PILL_REGISTER_ENDPOINT "/register/pill"
#define AUDIO_KEYWORD_FEATURES_ENDPOINT "/audio/keyword_features"
#define AUDIO_KEYWORD_FEATURES_ENDPOINT "/v1/audio/keyword_features"
#define RAW_AUDIO_ENDPOINT "/audio/raw"
#define PILL_DATA_RECEIVE_ENDPOINT "/in/pill"
#define CHECK_KEY_ENDPOINT "/check"
Expand Down
40 changes: 4 additions & 36 deletions kitsune/hlo_audio_tools.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,10 @@ typedef struct{
uint16_t reserved;
uint32_t timeout;
uint8_t is_speaking;
uint32_t keyword_begin_time;
}nn_keyword_ctx_t;

static void _voice_begin_keyword(void * ctx, Keyword_t keyword, int16_t value){
LOGI("KEYWORD BEGIN\n");
nn_keyword_ctx_t * p = (nn_keyword_ctx_t*)ctx;
}

bool cancel_alarm();
Expand All @@ -252,7 +250,6 @@ static void _voice_finish_keyword(void * ctx, Keyword_t keyword, int16_t value){
tinytensor_features_force_voice_activity_detection();
p->is_speaking = true;
p->speech_pb.has_word = true;
p->keyword_begin_time = xTaskGetTickCount();

switch (keyword ) {
case okay_sense:
Expand Down Expand Up @@ -306,14 +303,8 @@ static void _speech_detect_callback(void * context, SpeechTransition_t transitio
extern volatile int sys_volume;
int32_t set_volume(int v, unsigned int dly);
#define AUDIO_NET_RATE (AUDIO_SAMPLE_RATE/1024)
#define BASE_KEEPALIVE_INTERVAL (3 * 60 * 1000)
#define KEEPALIVE_INTERVAL_RANGE (60 * 1000)
uint32_t _next_keepalive_interval(uint32_t base, uint32_t range){
if (range == 0){
return base;
}
return base + (rand() % KEEPALIVE_INTERVAL_RANGE);
}


int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void * ctx, hlo_stream_signal signal){
#define NSAMPLES 512
int ret = 0;
Expand Down Expand Up @@ -349,7 +340,6 @@ int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void *
hlo_stream_t * send_str = hmac_payload_str;

uint32_t begin = xTaskGetTickCount();
uint32_t keepalive_interval = _next_keepalive_interval(BASE_KEEPALIVE_INTERVAL, KEEPALIVE_INTERVAL_RANGE);
uint32_t speech_detected_time;

while( (ret = hlo_stream_transfer_all(FROM_STREAM, input, (uint8_t*)samples, NUM_SAMPLES_TO_RUN_FFT*2, 4)) > 0 ){
Expand Down Expand Up @@ -384,9 +374,6 @@ int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void *
ret = hlo_stream_transfer_all(INTO_STREAM, send_str, (uint8_t*)compressed, sizeof(compressed), 4);
if( ret < 0 ) {
break;
}else if(nn_ctx.keyword_begin_time){
analytics_event("{speech_connect_latency:%d}", xTaskGetTickCount() - nn_ctx.keyword_begin_time);
nn_ctx.keyword_begin_time = 0;
}
}
if (!nn_ctx.is_speaking) {
Expand All @@ -399,7 +386,6 @@ int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void *
//workaround to refresh connection once time server responds
static TickType_t _last_refresh_check = 0;
if( xTaskGetTickCount() - _last_refresh_check > 1000 ) {
//check ip changed
static bool _had_ip = false;
bool have_ip = wifi_status_get(HAS_IP);
if( have_ip && !_had_ip ) {
Expand All @@ -410,7 +396,6 @@ int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void *
}
_had_ip = have_ip;

//check time
static bool _had_time = false;
bool have_time = has_good_time();
if( have_time && !_had_time ) {
Expand All @@ -421,29 +406,12 @@ int hlo_filter_voice_command(hlo_stream_t * input, hlo_stream_t * output, void *
}
_had_time = have_time;
_last_refresh_check = xTaskGetTickCount();
#if 0
//check server reachable
if(xTaskGetTickCount() - begin > keepalive_interval){
begin = xTaskGetTickCount();
keepalive_interval = _next_keepalive_interval(BASE_KEEPALIVE_INTERVAL, KEEPALIVE_INTERVAL_RANGE);
int code = hlo_http_keep_alive(output, get_speech_server(), SPEECH_KEEPALIVE_ENDPOINT);
//int code = 0;
if( code != 200){
LOGW("Unable to reach Voice server. Retry...");
ret = HLO_STREAM_EAGAIN;
break;
}else{
LOGI("Voice server alive. Checking again in %d seconds!\r\n", keepalive_interval / 1000);
}
}
#endif
}//end connection health check
}
}

BREAK_ON_SIG(signal);

if(!nn_ctx.speech_pb.has_word &&
xTaskGetTickCount() - begin > keepalive_interval ) {
xTaskGetTickCount() - begin > 4*60*1000 ) {
ret = HLO_STREAM_EAGAIN;
break;
}
Expand Down
Loading

0 comments on commit 59ee643

Please sign in to comment.