Skip to content

Commit

Permalink
Add app id and flow id to common ids and logs
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wahl <[email protected]>
  • Loading branch information
mfelsche authored and Licenser committed Feb 9, 2023
1 parent 1e51261 commit 6439ac0
Show file tree
Hide file tree
Showing 97 changed files with 1,278 additions and 1,053 deletions.
2 changes: 1 addition & 1 deletion src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ pub fn resolve(config: &config::Codec) -> Result<Box<dyn Codec>> {
"binflux" => Ok(Box::<binflux::BInflux>::default()),
"csv" => Ok(Box::new(csv::Csv {})),
"dogstatsd" => Ok(Box::<dogstatsd::DogStatsD>::default()),
"influx" => Ok(Box::new(influx::Influx {})),
"json" => json::from_config(config.config.as_ref()),
"msgpack" => Ok(Box::new(msgpack::MsgPack {})),
"influx" => Ok(Box::new(influx::Influx {})),
"null" => Ok(Box::new(null::Null {})),
"statsd" => Ok(Box::<statsd::StatsD>::default()),
"string" => Ok(Box::new(string::String {})),
Expand Down
8 changes: 4 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ mod tests {
use serde::Deserialize;

use super::*;
use crate::{errors::Result, system::flow};
use crate::{errors::Result, ids::FlowInstanceId};

#[test]
fn test_reconnect_serde() -> Result<()> {
Expand Down Expand Up @@ -351,7 +351,7 @@ mod tests {
#[test]
fn test_config_builtin_preproc_with_config() -> Result<()> {
let c = Connector::from_config(
&Alias::new("flow", "my_otel_client"),
&Alias::new(FlowInstanceId::new("app", "flow"), "my_otel_client"),
ConnectorType::from("otel_client".to_string()),
&literal!({
"preprocessors": [ {"name": "snot", "config": { "separator": "\n" }}],
Expand Down Expand Up @@ -382,10 +382,10 @@ mod tests {
"reconnect": {},
"metrics_interval_s": "wrong_type"
});
let id = Alias::new(flow::Alias::new("flow"), "my_id");
let id = Alias::new(FlowInstanceId::new("app", "flow"), "my_id");
let res = Connector::from_config(&id, "fancy_schmancy".into(), &config);
assert!(res.is_err());
assert_eq!(String::from("Invalid Definition for connector \"flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
assert_eq!(String::from("Invalid Definition for connector \"app/flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
}

#[test]
Expand Down
59 changes: 37 additions & 22 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,38 @@ mod google;
#[cfg(test)]
pub(crate) mod tests;

use self::metrics::{SinkReporter, SourceReporter};
use self::sink::{SinkAddr, SinkContext, SinkMsg};
use self::source::{SourceAddr, SourceContext, SourceMsg};
use self::utils::quiescence::QuiescenceBeacon;
use self::{prelude::Attempt, utils::reconnect};
use self::{
sink::{SinkAddr, SinkContext, SinkMsg},
utils::{metrics::SourceReporter, reconnect::ConnectionLostNotifier},
};
use self::{
source::{SourceAddr, SourceContext, SourceMsg},
utils::{metrics::SinkReporter, reconnect::ReconnectRuntime},
};
pub(crate) use crate::config::Connector as ConnectorConfig;
use crate::{
channel::{bounded, Sender},
errors::{connector_send_err, Error, Kind as ErrorKind, Result},
ids::FlowInstanceId,
instance::State,
log_error, pipeline, qsize,
system::{flow, KillSwitch, Runtime},
system::{KillSwitch, Runtime},
};
use beef::Cow;
use futures::Future;
use halfbrown::HashMap;
use simd_json::{Builder, Mutable, ValueAccess};
use std::{fmt::Display, time::Duration};
use tokio::task::{self, JoinHandle};
use tremor_common::{
ids::{ConnectorId, ConnectorIdGen, SourceId},
ports::{Port, ERR, IN, OUT},
uids::{ConnectorUId, ConnectorUIdGen, SourceUId},
};
use tremor_pipeline::METRICS_CHANNEL;
use tremor_script::ast::DeployEndpoint;
use tremor_value::Value;
use utils::reconnect::{Attempt, ConnectionLostNotifier, ReconnectRuntime};
use value_trait::{Builder, Mutable, ValueAccess};

/// quiescence stuff
pub(crate) use utils::{metrics, reconnect};

/// Accept timeout
pub(crate) const ACCEPT_TIMEOUT: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -314,6 +317,7 @@ pub(crate) trait Context: Display + Clone {
/// connector context
#[derive(Clone)]
pub(crate) struct ConnectorContext {
node_id: openraft::NodeId,
/// alias of the connector instance
pub(crate) alias: Alias,
/// type of the connector
Expand All @@ -326,7 +330,7 @@ pub(crate) struct ConnectorContext {

impl Display for ConnectorContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[Connector::{}]", &self.alias)
write!(f, "[Node::{}][Connector::{}]", self.node_id, &self.alias)
}
}

Expand Down Expand Up @@ -421,26 +425,35 @@ pub(crate) type Known =
/// # Errors
/// if the connector can not be built or the config is invalid
pub(crate) async fn spawn(
node_id: openraft::NodeId,
alias: &Alias,
connector_id_gen: &mut ConnectorIdGen,
connector_id_gen: &mut ConnectorUIdGen,
builder: &dyn ConnectorBuilder,
config: ConnectorConfig,
kill_switch: &KillSwitch,
) -> Result<Addr> {
// instantiate connector
let connector = builder.build(alias, &config, kill_switch).await?;
let r = connector_task(alias.clone(), connector, config, connector_id_gen.next_id()).await?;
let r = connector_task(
node_id,
alias.clone(),
connector,
config,
connector_id_gen.next_id(),
)
.await?;

Ok(r)
}

#[allow(clippy::too_many_lines)]
// instantiates the connector and starts listening for control plane messages
async fn connector_task(
node_id: openraft::NodeId,
alias: Alias,
mut connector: Box<dyn Connector>,
config: ConnectorConfig,
uid: ConnectorId,
uid: ConnectorUId,
) -> Result<Addr> {
let qsize = qsize();
// channel for connector-level control plane communication
Expand All @@ -465,12 +478,13 @@ async fn connector_task(
.into());
}
let source_builder = source::builder(
SourceId::from(uid),
SourceUId::from(uid),
&config,
codec_requirement,
source_metrics_reporter,
)?;
let source_ctx = SourceContext {
node_id,
alias: alias.clone(),
uid: uid.into(),
connector_type: config.connector_type.clone(),
Expand All @@ -485,6 +499,7 @@ async fn connector_task(
);
let sink_builder = sink::builder(&config, codec_requirement, &alias, sink_metrics_reporter)?;
let sink_ctx = SinkContext::new(
node_id,
uid.into(),
alias.clone(),
config.connector_type.clone(),
Expand All @@ -504,11 +519,11 @@ async fn connector_task(
sink: sink_addr,
};

let mut reconnect: ReconnectRuntime =
ReconnectRuntime::new(&connector_addr, notifier.clone(), &config.reconnect);
let mut reconnect = ReconnectRuntime::new(&connector_addr, notifier.clone(), &config.reconnect);
let notifier = reconnect.notifier();

let ctx = ConnectorContext {
node_id,
alias: alias.clone(),
connector_type: config.connector_type.clone(),
quiescence_beacon: quiescence_beacon.clone(),
Expand Down Expand Up @@ -1128,13 +1143,13 @@ where
/// unique instance alias/id of a connector within a deployment
#[derive(Debug, PartialEq, PartialOrd, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct Alias {
flow_alias: flow::Alias,
flow_alias: FlowInstanceId,
connector_alias: String,
}

impl Alias {
/// construct a new `ConnectorId` from the id of the containing flow and the connector instance id
pub fn new(flow_alias: impl Into<flow::Alias>, connector_alias: impl Into<String>) -> Self {
pub fn new(flow_alias: impl Into<FlowInstanceId>, connector_alias: impl Into<String>) -> Self {
Self {
flow_alias: flow_alias.into(),
connector_alias: connector_alias.into(),
Expand All @@ -1143,7 +1158,7 @@ impl Alias {

/// get a reference to the flow alias
#[must_use]
pub fn flow_alias(&self) -> &flow::Alias {
pub fn flow_alias(&self) -> &FlowInstanceId {
&self.flow_alias
}

Expand Down Expand Up @@ -1296,7 +1311,7 @@ where

#[cfg(test)]
pub(crate) mod unit_tests {
use crate::system::flow;
use crate::ids::FlowInstanceId;

use super::*;

Expand All @@ -1312,7 +1327,7 @@ pub(crate) mod unit_tests {
pub(crate) fn new(tx: Sender<Msg>) -> Self {
Self {
t: ConnectorType::from("snot"),
alias: Alias::new(flow::Alias::new("fake"), "fake"),
alias: Alias::new(FlowInstanceId::new("app", "fake"), "fake"),
notifier: reconnect::ConnectionLostNotifier::new(tx),
beacon: QuiescenceBeacon::default(),
}
Expand Down
8 changes: 4 additions & 4 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ mod tests {
use elasticsearch::http::request::Body;

use super::*;
use crate::config::Connector as ConnectorConfig;
use crate::{config::Connector as ConnectorConfig, ids::FlowInstanceId};

#[tokio::test(flavor = "multi_thread")]
async fn connector_builder_empty_nodes() -> Result<()> {
Expand All @@ -969,14 +969,14 @@ mod tests {
"nodes": []
}
});
let alias = Alias::new("flow", "my_elastic");
let alias = Alias::new(FlowInstanceId::new("app", "flow"), "my_elastic");
let builder = super::Builder::default();
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
let kill_switch = KillSwitch::dummy();
assert_eq!(
String::from(
"Invalid Definition for connector \"flow::my_elastic\": empty nodes provided"
"Invalid Definition for connector \"app/flow::my_elastic\": empty nodes provided"
),
builder
.build(&alias, &connector_config, &kill_switch)
Expand All @@ -998,7 +998,7 @@ mod tests {
]
}
});
let alias = Alias::new("snot", "my_elastic");
let alias = Alias::new(FlowInstanceId::new("app", "snot"), "my_elastic");
let builder = super::Builder::default();
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
Expand Down
20 changes: 13 additions & 7 deletions src/connectors/impls/gbq/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ impl ConnectorBuilder for Builder {
#[cfg(test)]
mod tests {
use tokio::sync::broadcast;
use tremor_common::ids::SinkId;
use tremor_common::uids::SinkUId;

use super::*;
use crate::connectors::reconnect::ConnectionLostNotifier;
use crate::connectors::sink::builder;
use crate::connectors::{metrics::SinkReporter, utils::quiescence::QuiescenceBeacon};
use crate::connectors::utils::quiescence::QuiescenceBeacon;
use crate::connectors::{reconnect::ConnectionLostNotifier, utils::metrics::SinkReporter};
use crate::ids::FlowInstanceId;

#[tokio::test(flavor = "multi_thread")]
pub async fn can_spawn_sink() -> Result<()> {
Expand All @@ -105,17 +106,22 @@ mod tests {
let sink_address = connector
.create_sink(
SinkContext::new(
SinkId::default(),
Alias::new("a", "b"),
openraft::NodeId::default(),
SinkUId::default(),
Alias::new(FlowInstanceId::new("app", "a"), "b"),
ConnectorType::default(),
QuiescenceBeacon::default(),
ConnectionLostNotifier::new(crate::channel::bounded(128).0),
),
builder(
&ConnectorConfig::default(),
CodecReq::Structured,
&Alias::new("a", "b"),
SinkReporter::new(Alias::new("a", "b"), broadcast::channel(1).0, None),
&Alias::new(FlowInstanceId::new("app", "a"), "b"),
SinkReporter::new(
Alias::new(FlowInstanceId::new("app", "a"), "b"),
broadcast::channel(1).0,
None,
),
)?,
)
.await?;
Expand Down
Loading

0 comments on commit 6439ac0

Please sign in to comment.