Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dynamic uplink interfaces #636

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ install-multinet:
@kubectl apply -f test/yaml/multinet/projectcalico.org_networks.yaml
@kubectl apply -f test/yaml/multinet/whereabouts-daemonset-install.yaml
@echo "--Installing multus daemonset..."
@kubectl apply -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/master/deployments/multus-daemonset-thick.yml
@kubectl apply -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/99c4481e08a4a8f0a3d0013446f03e4206033cae/deployments/multus-daemonset-thick.yml
@echo "--Installing whereabouts daemonset..."
@kubectl apply -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_ippools.yaml
@kubectl apply -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_overlappingrangeipreservations.yaml
Expand All @@ -233,7 +233,7 @@ delete-multinet:
@kubectl delete -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_ippools.yaml
@kubectl delete -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_overlappingrangeipreservations.yaml
@echo "--Deleting multus daemonset..."
@kubectl delete -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/master/deployments/multus-daemonset-thick.yml
@kubectl delete -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/99c4481e08a4a8f0a3d0013446f03e4206033cae/deployments/multus-daemonset-thick.yml
@echo "--Deleting network CRD..."
@kubectl delete -f test/yaml/multinet/projectcalico.org_networks.yaml
@kubectl delete -f test/yaml/multinet/whereabouts-daemonset-install.yaml
Expand Down
6 changes: 5 additions & 1 deletion calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func main() {
if err != nil {
log.Fatalf("Cannot create VPP client: %v", err)
}
usr2SignalChannel := make(chan os.Signal, 10)
signal.Notify(usr2SignalChannel, syscall.SIGUSR2)
// Once we have the api connection, we know vpp & vpp-manager are running and the
// state is accurately reported. Wait for vpp-manager to finish the config.
common.VppManagerInfo, err = common.WaitForVppManager()
Expand All @@ -98,6 +100,8 @@ func main() {
}
common.ThePubSub = common.NewPubSub(log.WithFields(logrus.Fields{"component": "pubsub"}))

uplinkManagerWatcher := watchers.NewUplinkManagerWatcher(usr2SignalChannel, log.WithFields(logrus.Fields{"subcomponent": "uplink-manager-watcher"}))
Go(uplinkManagerWatcher.WatchUplinks)
/**
* Create the API clients we need
*/
Expand Down Expand Up @@ -136,7 +140,7 @@ func main() {
* Start watching nodes & fetch our BGP spec
*/
routeWatcher := watchers.NewRouteWatcher(log.WithFields(logrus.Fields{"subcomponent": "host-route-watcher"}))
linkWatcher := watchers.NewLinkWatcher(common.VppManagerInfo.UplinkStatuses, log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"}))
linkWatcher := watchers.NewLinkWatcher(log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"}))
bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}))
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"}))
Expand Down
5 changes: 2 additions & 3 deletions calico-vpp-agent/common/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ const (
NetDeleted CalicoVppEventType = "NetDeleted"
NetsSynced CalicoVppEventType = "NetsSynced"

IpamPoolUpdate CalicoVppEventType = "IpamPoolUpdate"
IpamPoolRemove CalicoVppEventType = "IpamPoolRemove"

WireguardPublicKeyChanged CalicoVppEventType = "WireguardPublicKeyChanged"

UplinksUpdated CalicoVppEventType = "UplinksUpdated"
)

var (
Expand Down
21 changes: 10 additions & 11 deletions calico-vpp-agent/watchers/uplink_link_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ import (
"github.com/vishvananda/netlink"
"gopkg.in/tomb.v2"

"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/config"
)

type LinkWatcher struct {
UplinkStatuses map[string]config.UplinkStatus
close chan struct{}
netlinkFailed chan struct{}
closeLock sync.Mutex
log *log.Entry
close chan struct{}
netlinkFailed chan struct{}
closeLock sync.Mutex
log *log.Entry
}

func NewLinkWatcher(uplinkStatus map[string]config.UplinkStatus, log *log.Entry) *LinkWatcher {
func NewLinkWatcher(log *log.Entry) *LinkWatcher {
return &LinkWatcher{
UplinkStatuses: uplinkStatus,
log: log,
log: log,
}
}

Expand Down Expand Up @@ -78,7 +77,7 @@ func (r *LinkWatcher) WatchLinks(t *tomb.Tomb) error {
r.safeClose()
goto restart
}
for _, v := range r.UplinkStatuses {
for _, v := range common.VppManagerInfo.UplinkStatuses {
link, err = netlink.LinkByIndex(v.LinkIndex)
if err != nil || link.Attrs().Name != v.Name {
r.log.Errorf("error getting link to watch: %v %v", link, err)
Expand Down Expand Up @@ -113,7 +112,7 @@ func (r *LinkWatcher) WatchLinks(t *tomb.Tomb) error {
}
found := false
v := config.UplinkStatus{}
for _, v := range r.UplinkStatuses {
for _, v = range common.VppManagerInfo.UplinkStatuses {
if update.Attrs().Index == v.LinkIndex {
found = true
break
Expand All @@ -122,7 +121,7 @@ func (r *LinkWatcher) WatchLinks(t *tomb.Tomb) error {
if found {
if update.Attrs().Name == v.Name {
if update.Attrs().MTU != v.Mtu {
if err = netlink.LinkSetMTU(link, v.Mtu); err != nil {
if err = netlink.LinkSetMTU(update.Link, v.Mtu); err != nil {
r.log.Warnf("Error resetting link mtu: %v", err)
r.safeClose()
goto restart
Expand Down
64 changes: 64 additions & 0 deletions calico-vpp-agent/watchers/uplink_manager_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (C) 2023 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchers

import (
"encoding/json"
"os"

"github.com/pkg/errors"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/config"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)

type UplinkManagerWatcher struct {
log *log.Entry
usr2SignalChannel chan os.Signal
}

func NewUplinkManagerWatcher(usr2SignalChannel chan os.Signal, log *log.Entry) *UplinkManagerWatcher {
return &UplinkManagerWatcher{
usr2SignalChannel: usr2SignalChannel,
log: log,
}
}

func (w *UplinkManagerWatcher) WatchUplinks(t *tomb.Tomb) error {
for t.Alive() {
<-w.usr2SignalChannel
/* vpp-manager pokes us with USR2 if status changes (dynamically added interface) */
w.log.Info("Vpp manager state changed")
dat, err := os.ReadFile(config.VppManagerInfoFile)
if err != nil {
w.log.Error(err)
} else {
err2 := json.Unmarshal(dat, common.VppManagerInfo)
if err2 != nil {
w.log.Error(errors.Errorf("cannot unmarshal vpp manager info file %s", err2))
} else if common.VppManagerInfo.Status == config.Ready {
w.log.Info("local vppmanager state updated")
common.SendEvent(common.CalicoVppEvent{
Type: common.UplinksUpdated,
})
} else {
w.log.Error(errors.Errorf("vpp manager file status not ready after dynamically added interface"))
}
}
}
return nil
}
17 changes: 12 additions & 5 deletions calico-vpp-agent/watchers/uplink_route_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewRouteWatcher(log *log.Entry) *RouteWatcher {
common.IpamConfChanged,
common.NetAddedOrUpdated,
common.NetDeleted,
common.UplinksUpdated,
)
return routeWatcher
}
Expand Down Expand Up @@ -174,11 +175,7 @@ func (r *RouteWatcher) getNetworkRoute(network string, physicalNet string) (rout
return routes, nil
}

func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error {
r.netlinkFailed = make(chan struct{}, 1)
r.addrUpdate = make(chan struct{}, 10)

go r.watchAddresses(t)
func (r *RouteWatcher) AddRoutesForServices() {
for _, serviceCIDR := range *config.ServiceCIDRs {
// Add a route for the service prefix through VPP. This is required even if kube-proxy is
// running on the host to ensure correct source address selection if the host has multiple interfaces
Expand Down Expand Up @@ -208,6 +205,14 @@ func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error {
}
}
}
}

func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error {
r.netlinkFailed = make(chan struct{}, 1)
r.addrUpdate = make(chan struct{}, 10)

go r.watchAddresses(t)
r.AddRoutesForServices()
for {
r.closeLock.Lock()
netlinkUpdates := make(chan netlink.RouteUpdate, 10)
Expand Down Expand Up @@ -239,6 +244,8 @@ func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error {
return nil
case event := <-r.eventChan:
switch event.Type {
case common.UplinksUpdated:
r.AddRoutesForServices()
case common.NetDeleted:
netDef := event.Old.(*NetworkDefinition)
key := netDef.Range
Expand Down
23 changes: 14 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func RunHook(hookScript *string, hookName string, params *VppManagerParams, log
if *hookScript == "" {
return
}
template, err := TemplateScriptReplace(*hookScript, params, nil)
template, err := TemplateScriptReplace(*hookScript, params, false)
if err != nil {
log.Warnf("Running hook %s errored with %s", hookName, err)
return
Expand Down Expand Up @@ -233,6 +233,11 @@ type UplinkInterfaceSpec struct {
SwIfIndex uint32 `json:"-"`
}

type AttachedUplinkInterfaceSpec struct {
*UplinkInterfaceSpec
LinuxConf *LinuxInterfaceState
}

func (u *UplinkInterfaceSpec) GetIsMain() bool {
if u.IsMain == nil {
return false
Expand Down Expand Up @@ -539,7 +544,7 @@ const (
)

type VppManagerParams struct {
UplinksSpecs []UplinkInterfaceSpec
AttachedUplinksSpecs []*AttachedUplinkInterfaceSpec
/* Capabilities */
LoadedDrivers map[string]bool
KernelVersion *KernelVersion
Expand Down Expand Up @@ -663,22 +668,22 @@ func getCpusetCpu() (string, error) {
return regexp.MustCompile("[,-]").Split(cpusetCpu, 2)[0], nil
}

func TemplateScriptReplace(input string, params *VppManagerParams, conf []*LinuxInterfaceState) (template string, err error) {
func TemplateScriptReplace(input string, params *VppManagerParams, withConf bool) (template string, err error) {
template = input
if conf != nil {
if len(params.AttachedUplinksSpecs) != 0 && withConf {
/* We might template scripts before reading interface conf */
template = strings.ReplaceAll(template, "__PCI_DEVICE_ID__", conf[0].PciId)
for i, ifcConf := range conf {
template = strings.ReplaceAll(template, "__PCI_DEVICE_ID_"+strconv.Itoa(i)+"__", ifcConf.PciId)
template = strings.ReplaceAll(template, "__PCI_DEVICE_ID__", params.AttachedUplinksSpecs[0].LinuxConf.PciId)
for i, attachedInterface := range params.AttachedUplinksSpecs {
template = strings.ReplaceAll(template, "__PCI_DEVICE_ID_"+strconv.Itoa(i)+"__", attachedInterface.LinuxConf.PciId)
}
}
vppcpu, err := getCpusetCpu()
if err != nil {
return "", err
}
template = strings.ReplaceAll(template, "__CPUSET_CPUS_FIRST__", vppcpu)
template = strings.ReplaceAll(template, "__VPP_DATAPLANE_IF__", params.UplinksSpecs[0].InterfaceName)
for i, ifc := range params.UplinksSpecs {
template = strings.ReplaceAll(template, "__VPP_DATAPLANE_IF__", params.AttachedUplinksSpecs[0].InterfaceName)
for i, ifc := range params.AttachedUplinksSpecs {
template = strings.ReplaceAll(template, "__VPP_DATAPLANE_IF_"+fmt.Sprintf("%d", i)+"__", ifc.InterfaceName)
}
for key, value := range params.NodeAnnotations {
Expand Down
2 changes: 1 addition & 1 deletion docs/kind.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ nodes:
# multinet CRDs, multus plugin and ipam
kubectl create -f https://raw.githubusercontent.com/projectcalico/vpp-dataplane/master/test/yaml/multinet/projectcalico.org_networks.yaml
kubectl create -f https://raw.githubusercontent.com/projectcalico/vpp-dataplane/master/test/yaml/multinet/whereabouts-daemonset-install.yaml
kubectl create -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/master/deployments/multus-daemonset-thick.yml
kubectl create -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/99c4481e08a4a8f0a3d0013446f03e4206033cae/deployments/multus-daemonset-thick.yml
kubectl create -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_ippools.yaml
kubectl create -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_overlappingrangeipreservations.yaml

Expand Down
2 changes: 1 addition & 1 deletion docs/multinet.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ kubectl apply -f test/yaml/multinet/whereabouts-daemonset-install.yaml
#### Installing multus deamonset

````yaml
kubectl apply -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/master/deployments/multus-daemonset-thick.yml
kubectl apply -f https://github.com/k8snetworkplumbingwg/multus-cni/raw/99c4481e08a4a8f0a3d0013446f03e4206033cae/deployments/multus-daemonset-thick.yml
kubectl apply -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_ippools.yaml
kubectl apply -f https://github.com/k8snetworkplumbingwg/whereabouts/raw/master/doc/crds/whereabouts.cni.cncf.io_overlappingrangeipreservations.yaml
````
Expand Down
27 changes: 17 additions & 10 deletions vpp-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ const (
maxCoreFiles = 2
)

type dynamicInterface struct {
params *config.VppManagerParams
driver uplink.UplinkDriver
}

var (
runningCond *sync.Cond
vppProcess *os.Process
vppDeadChan chan bool
signals chan os.Signal

dynamicInterfaceAdd chan dynamicInterface
/* Was VPP terminated by us ? */
internalKill bool
/* Increasing index for timeout */
Expand Down Expand Up @@ -148,6 +155,7 @@ func main() {
log = logrus.New()

vppDeadChan = make(chan bool, 1)
dynamicInterfaceAdd = make(chan dynamicInterface, 1)
VPPgotSigCHLD = make(map[int]bool)
VPPgotTimeout = make(map[int]bool)

Expand Down Expand Up @@ -175,29 +183,29 @@ func main() {
log.Errorf("Error raising memlock limit, VPP may fail to start: %v", err)
}

confs, err := startup.GetInterfaceConfig(params)
err = startup.GetInterfaceConfig(params)
if err != nil {
log.Fatalf("Error getting initial interface configuration: %s", err)
}

runningCond = sync.NewCond(&sync.Mutex{})
go handleSignals()

startup.PrintVppManagerConfig(params, confs)
startup.PrintVppManagerConfig(params)

runner := NewVPPRunner(params, confs)
runner := NewVPPRunner()

makeNewVPPIndex()

if len(params.UplinksSpecs) == 1 && params.UplinksSpecs[0].VppDriver == "" {
for _, driver := range uplink.SupportedUplinkDrivers(params, confs[0], &params.UplinksSpecs[0]) {
if len(params.AttachedUplinksSpecs) == 1 && params.AttachedUplinksSpecs[0].VppDriver == "" {
for _, driver := range uplink.SupportedUplinkDrivers(params, 0) {
err := utils.CleanupCoreFiles(config.GetCalicoVppInitialConfig().CorePattern, maxCoreFiles)
if err != nil {
log.Errorf("CleanupCoreFiles errored %s", err)
}

internalKill = false
err = runner.Run([]uplink.UplinkDriver{driver})
err = runner.Run([]uplink.UplinkDriver{driver}, params)
if err != nil {
config.RunHook(config.HookScriptVppErrored, "VPP_ERRORED", params, log)
log.Errorf("VPP(%s) run failed with %s", driver.GetName(), err)
Expand All @@ -216,12 +224,11 @@ func main() {
}

var drivers []uplink.UplinkDriver
for idx := 0; idx < len(params.UplinksSpecs); idx++ {
drivers = append(drivers, uplink.NewUplinkDriver(params.UplinksSpecs[idx].VppDriver,
params, confs[idx], &params.UplinksSpecs[idx]))
for idx := 0; idx < len(params.AttachedUplinksSpecs); idx++ {
drivers = append(drivers, uplink.NewUplinkDriver(params, idx))
}

err = runner.Run(drivers)
err = runner.Run(drivers, params)
if err != nil {
config.RunHook(config.HookScriptVppErrored, "VPP_ERRORED", params, log)
log.Errorf("VPP run failed with %v", err)
Expand Down
Loading