Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: small refactoring to remove the need for a batchconn.WriteTo #4651

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions router/control/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,14 @@ func confExternalInterfaces(dp Dataplane, cfg *Config) error {

_, owned := cfg.BR.IFs[ifID]
if !owned {
// XXX The current implementation effectively uses IP/UDP tunnels to create
// the SCION network as an overlay, with forwarding to local hosts being a special case.
// When setting up external interfaces that belong to other routers in the AS, they
// are basically IP/UDP tunnels between the two border routers, and as such is
// configured in the data plane.
// The current implementation effectively uses IP/UDP tunnels to create the SCION
// network as an overlay, with forwarding to local hosts being a special case. When
// setting up external interfaces that belong to other routers in the AS, they are
// basically IP/UDP tunnels between the two border routers. Those are described as a
// link from the internal address of the local router to the internal address of the
// sibling router; and not to the router in the remote AS.
linkInfo.Local.Addr = cfg.BR.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr // i.e. via sibling router.
// For internal BFD always use the default configuration.
linkInfo.BFD = BFD{}
}
Expand Down
70 changes: 52 additions & 18 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type bfdSession interface {
// BatchConn is a connection that supports batch reads and writes.
type BatchConn interface {
ReadBatch(underlayconn.Messages) (int, error)
WriteTo([]byte, netip.AddrPort) (int, error)
WriteBatch(msgs underlayconn.Messages, flags int) (int, error)
Close() error
}
Expand Down Expand Up @@ -177,7 +176,6 @@ type DataPlane struct {
linkTypes map[uint16]topology.LinkType
neighborIAs map[uint16]addr.IA
peerInterfaces map[uint16]uint16
internal BatchConn
internalIP netip.Addr
internalNextHops map[uint16]netip.AddrPort
svc *services
Expand All @@ -193,6 +191,10 @@ type DataPlane struct {

ExperimentalSCMPAuthentication bool

// The forwarding queues. Each is consumed by a forwarder process and fed by
// one bfd sender and the packet processors.
fwQs map[uint16]chan *packet

// The pool that stores all the packet buffers as described in the design document. See
// https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst
// To avoid garbage collection, most the meta-data that is produced during the processing of a
Expand Down Expand Up @@ -228,6 +230,7 @@ var (
errBFDSessionDown = errors.New("bfd session down")
expiredHop = errors.New("expired hop")
ingressInterfaceInvalid = errors.New("ingress interface invalid")
egressInterfaceInvalid = errors.New("egress interface invalid")
macVerificationFailed = errors.New("MAC verification failed")
badPacketSize = errors.New("bad packet size")

Expand Down Expand Up @@ -334,14 +337,12 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
if conn == nil {
return emptyValue
}
if d.internal != nil {
return alreadySet
}
if d.interfaces == nil {
d.interfaces = make(map[uint16]BatchConn)
} else if d.interfaces[0] != nil {
return alreadySet
}
d.interfaces[0] = conn
d.internal = conn
d.internalIP = ip
return nil
}
Expand All @@ -361,7 +362,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
return emptyValue
}
err := d.addExternalInterfaceBFD(ifID, conn, src, dst, cfg)
err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
if err != nil {
return serrors.Wrap("adding external BFD", err, "if_id", ifID)
}
Expand Down Expand Up @@ -439,7 +440,7 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error {
}

// AddExternalInterfaceBFD adds the inter AS connection BFD session.
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
src, dst control.LinkEnd, cfg control.BFD) error {

if *cfg.Disable {
Expand All @@ -459,7 +460,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
PacketsReceived: d.Metrics.BFDPacketsReceived.With(labels),
}
}
s, err := newBFDSend(conn, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
if err != nil {
return err
}
Expand All @@ -477,7 +478,7 @@ func (d *DataPlane) getInterfaceState(ifID uint16) control.InterfaceState {
return control.InterfaceUp
}

func (d *DataPlane) addBFDController(ifID uint16, s *bfdSend, cfg control.BFD,
func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD,
metrics bfd.Metrics) error {

if d.bfdSessions == nil {
Expand Down Expand Up @@ -599,7 +600,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont
}
}

s, err := newBFDSend(d.internal, d.localIA, d.localIA, src, dst, 0, d.macFactory())
s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory())
if err != nil {
return err
}
Expand All @@ -621,7 +622,6 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.setRunning()
d.initMetrics()

processorQueueSize := max(
Expand All @@ -630,7 +630,9 @@ func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {

d.initPacketPool(cfg, processorQueueSize)
procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize)
d.fwQs = fwQs // Shared with BFD senders

d.setRunning()
for ifID, conn := range d.interfaces {
go func(ifID uint16, conn BatchConn) {
defer log.HandlePanic()
Expand Down Expand Up @@ -759,7 +761,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,

// Give a new buffer to the msgs elements that have been used in the previous loop.
for i := 0; i < cfg.BatchSize-numReusable; i++ {
p := <-d.packetPool
p := d.getPacketFromPool()
p.reset()
packets[i] = p
msgs[i].Buffers[0] = p.rawPacket
Expand Down Expand Up @@ -805,6 +807,10 @@ func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, e
return s % uint32(numProcRoutines), nil
}

func (d *DataPlane) getPacketFromPool() *packet {
return <-d.packetPool
}

func (d *DataPlane) returnPacketToPool(pkt *packet) {
d.packetPool <- pkt
}
Expand Down Expand Up @@ -1821,6 +1827,7 @@ func (p *scionPacketProcessor) handleEgressRouterAlert() disposition {
return pForward
}
if _, ok := p.d.external[p.pkt.egress]; !ok {
// the egress router is not this one.
return pForward
}
*alert = false
Expand Down Expand Up @@ -1990,7 +1997,6 @@ func (p *scionPacketProcessor) process() disposition {
if disp := p.validateEgressUp(); disp != pForward {
return disp
}

if _, ok := p.d.external[egressID]; ok {
// Not ASTransit in
if disp := p.processEgress(); disp != pForward {
Expand Down Expand Up @@ -2316,7 +2322,8 @@ func updateSCIONLayer(rawPkt []byte, s slayers.SCION, buffer gopacket.SerializeB
}

type bfdSend struct {
conn BatchConn
dataPlane *DataPlane
ifID uint16
srcAddr, dstAddr netip.AddrPort
scn *slayers.SCION
ohp *onehop.Path
Expand All @@ -2326,7 +2333,7 @@ type bfdSend struct {
}

// newBFDSend creates and initializes a BFD Sender
func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
func newBFDSend(d *DataPlane, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
ifID uint16, mac hash.Hash) (*bfdSend, error) {

scn := &slayers.SCION{
Expand Down Expand Up @@ -2373,8 +2380,13 @@ func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.Add
scn.Path = ohp
}

// bfdSend includes a reference to the dataplane. In general this must not be used until the
// dataplane is running. This is ensured by the fact that bfdSend objects are owned by bfd
// sessions, which are started by dataplane.Run() itself.

return &bfdSend{
conn: conn,
dataPlane: d,
ifID: ifID,
srcAddr: srcAddr,
dstAddr: dstAddr,
scn: scn,
Expand Down Expand Up @@ -2405,7 +2417,29 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
if err != nil {
return err
}
_, err = b.conn.WriteTo(b.buffer.Bytes(), b.dstAddr)

fwChan, ok := b.dataPlane.fwQs[b.ifID]
if !ok {
// WTF? May be we should just treat that as a panic-able offense
return egressInterfaceInvalid
}

p := b.dataPlane.getPacketFromPool()
p.reset()

// FIXME: would rather build the pkt directly into the rawPacket's buffer.
sz := copy(p.rawPacket, b.buffer.Bytes())
p.rawPacket = p.rawPacket[:sz]
if b.ifID == 0 {
// Using the internal interface: must specify the destination address
updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr)
}
// No need to specify pkt.egress. It isn't used downstream from here.
select {
case fwChan <- p:
default:
b.dataPlane.returnPacketToPool(p) // Do we care enough to have a metric?
}
return err
}

Expand Down
4 changes: 2 additions & 2 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReceiver(t *testing.T) {
dp.setRunning()
dp.initMetrics()
go func() {
dp.runReceiver(0, dp.internal, runConfig, procCh)
dp.runReceiver(0, dp.interfaces[0], runConfig, procCh)
}()
ptrMap := make(map[uintptr]struct{})
for i := 0; i < 21; i++ {
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestForwarder(t *testing.T) {
initialPoolSize := len(dp.packetPool)
dp.setRunning()
dp.initMetrics()
go dp.runForwarder(0, dp.internal, runConfig, fwCh[0])
go dp.runForwarder(0, dp.interfaces[0], runConfig, fwCh[0])

dstAddr := &net.UDPAddr{IP: net.IP{10, 0, 200, 200}}
for i := 0; i < 255; i++ {
Expand Down
33 changes: 16 additions & 17 deletions router/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
l := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Addr: netip.MustParseAddrPort("10.0.0.100:0"),
Expand Down Expand Up @@ -373,10 +373,9 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mInternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)
if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
v := b.(*layers.BFD).YourDiscriminator
Expand All @@ -391,7 +390,7 @@ func TestDataPlaneRun(t *testing.T) {

return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := netip.MustParseAddrPort("10.0.200.100:0")
_ = ret.SetKey([]byte("randomkeyformacs"))
Expand All @@ -410,9 +409,9 @@ func TestDataPlaneRun(t *testing.T) {
localAddr := netip.MustParseAddrPort("10.0.200.100:0")
remoteAddr := netip.MustParseAddrPort("10.0.200.200:0")
mInternal := mock_router.NewMockBatchConn(ctrl)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand Down Expand Up @@ -464,9 +463,9 @@ func TestDataPlaneRun(t *testing.T) {

mExternal := mock_router.NewMockBatchConn(ctrl)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand All @@ -491,7 +490,7 @@ func TestDataPlaneRun(t *testing.T) {
done <- struct{}{}
return 1, nil
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down Expand Up @@ -552,9 +551,9 @@ func TestDataPlaneRun(t *testing.T) {
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
Expand All @@ -569,7 +568,7 @@ func TestDataPlaneRun(t *testing.T) {
}
return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down
3 changes: 2 additions & 1 deletion router/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewDP(
key []byte) *DataPlane {

dp := &DataPlane{
interfaces: map[uint16]BatchConn{0: internal},
localIA: local,
external: external,
linkTypes: linkTypes,
Expand All @@ -84,10 +85,10 @@ func NewDP(
dispatchedPortStart: uint16(dispatchedPortStart),
dispatchedPortEnd: uint16(dispatchedPortEnd),
svc: &services{m: svc},
internal: internal,
internalIP: netip.MustParseAddr("198.51.100.1"),
Metrics: metrics,
}

if err := dp.SetKey(key); err != nil {
panic(err)
}
Expand Down
16 changes: 0 additions & 16 deletions router/mock_router/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading