Skip to content

Commit

Permalink
The first commit for k8s-ovs project.
Browse files Browse the repository at this point in the history
Signed-off-by: tangle329 <[email protected]>
  • Loading branch information
tangle329 committed Jun 23, 2017
1 parent 97b1d25 commit d9b9f56
Show file tree
Hide file tree
Showing 8,584 changed files with 2,498,672 additions and 1 deletion.
The diff you're trying to view is too large. We only load the first 3000 changed files.
475 changes: 474 additions & 1 deletion README.md

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions cniclient/k8s-ovs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"

"k8s-ovs/cniserver"

"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/version"
)

type cniPlugin struct {
socketPath string
}

func NewCNIPlugin(socketPath string) *cniPlugin {
return &cniPlugin{socketPath: socketPath}
}

// Create and fill a CNIRequest with this plugin's environment and stdin which
// contain the CNI variables and configuration
func newCNIRequest(args *skel.CmdArgs) *cniserver.CNIRequest {
envMap := make(map[string]string)
for _, item := range os.Environ() {
idx := strings.Index(item, "=")
if idx > 0 {
envMap[strings.TrimSpace(item[:idx])] = item[idx+1:]
}
}

return &cniserver.CNIRequest{
Env: envMap,
Config: args.StdinData,
}
}

// Send a CNI request to the CNI server via JSON + HTTP over a root-owned unix socket,
// and return the result
func (p *cniPlugin) doCNI(url string, req *cniserver.CNIRequest) ([]byte, error) {
data, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal CNI request %v: %v", req, err)
}

client := &http.Client{
Transport: &http.Transport{
Dial: func(proto, addr string) (net.Conn, error) {
return net.Dial("unix", p.socketPath)
},
},
}

resp, err := client.Post(url, "application/json", bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to send CNI request: %v", err)
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read CNI result: %v", err)
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("CNI request failed with status %v: '%s'", resp.StatusCode, string(body))
}

return body, nil
}

// Send the ADD command environment and config to the CNI server, returning
// the IPAM result to the caller
func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (*types.Result, error) {
body, err := p.doCNI("http://dummy/", newCNIRequest(args))
if err != nil {
return nil, err
}

result := &types.Result{}
if err := json.Unmarshal(body, result); err != nil {
return nil, fmt.Errorf("failed to unmarshal response '%s': %v", string(body), err)
}

return result, nil
}

// Send the ADD command environment and config to the CNI server, printing
// the IPAM result to stdout when called as a CNI plugin
func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error {
result, err := p.CmdAdd(args)
if err != nil {
return err
}
return result.Print()
}

// Send the DEL command environment and config to the CNI server
func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error {
_, err := p.doCNI("http://dummy/", newCNIRequest(args))
return err
}

func main() {
p := NewCNIPlugin(cniserver.CNIServerSocketPath)
skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy)
}
215 changes: 215 additions & 0 deletions cniserver/cniserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package cniserver

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"strings"

"github.com/golang/glog"
"github.com/gorilla/mux"

utilruntime "k8s.io/kubernetes/pkg/util/runtime"
utilwait "k8s.io/kubernetes/pkg/util/wait"
)

// Default CNIServer unix domain socket path which the k8s-ovs SDN CNI
// plugin uses to talk to the CNIServer
const CNIServerSocketPath string = "/var/run/k8s-ovs/cni-server.sock"

// Explicit type for CNI commands the server handles
type CNICommand string

const CNI_ADD CNICommand = "ADD"
const CNI_UPDATE CNICommand = "UPDATE"
const CNI_DEL CNICommand = "DEL"

// Request sent to the CNIServer by the k8s-ovs SDN CNI plugin
type CNIRequest struct {
// CNI environment variables, like CNI_COMMAND and CNI_NETNS
Env map[string]string `json:"env,omitempty"`
// CNI configuration passed via stdin to the CNI plugin
Config []byte `json:"config,omitempty"`
}

// Request structure built from CNIRequest which is passed to the
// handler function given to the CNIServer at creation time
type PodRequest struct {
// The CNI command of the operation
Command CNICommand
// kubernetes namespace name
PodNamespace string
// kubernetes pod name
PodName string
// kubernetes container ID
ContainerId string
// kernel network namespace path
Netns string
// Channel for returning the operation result to the CNIServer
Result chan *PodResult
}

// Result of a PodRequest sent through the PodRequest's Result channel.
type PodResult struct {
// Response to be returned to the k8s-ovs SDN CNI plugin on success
Response []byte
// Error to be returned to the k8s-ovs SDN CNI plugin on failure
Err error
}

type cniRequestFunc func(request *PodRequest) ([]byte, error)

// CNI server object that listens for JSON-marshaled CNIRequest objects
// on a private root-only Unix domain socket.
type CNIServer struct {
http.Server
requestFunc cniRequestFunc
path string
}

// Create and return a new CNIServer object which will listen on the given
// socket path
func NewCNIServer(socketPath string) *CNIServer {
router := mux.NewRouter()

s := &CNIServer{
Server: http.Server{
Handler: router,
},
path: socketPath,
}
router.NotFoundHandler = http.HandlerFunc(http.NotFound)
router.HandleFunc("/", s.handleCNIRequest).Methods("POST")
return s
}

// Start the CNIServer's local HTTP server on a root-owned Unix domain socket.
// requestFunc will be called to handle pod setup/teardown operations on each
// request to the CNIServer's HTTP server, and should return a PodResult
// when the operation has completed.
func (s *CNIServer) Start(requestFunc cniRequestFunc) error {
if requestFunc == nil {
return fmt.Errorf("no pod request handler")
}
s.requestFunc = requestFunc

// Remove and re-create the socket directory with root-only permissions
dirName := path.Dir(s.path)
if err := os.RemoveAll(dirName); err != nil {
return fmt.Errorf("failed to removing old pod info socket: %v", err)
}
if err := os.MkdirAll(dirName, 0700); err != nil {
return fmt.Errorf("failed to create pod info socket directory: %v", err)
}

// On Linux the socket is created with the permissions of the directory
// it is in, so as long as the directory is root-only we can avoid
// racy umask manipulation.
l, err := net.Listen("unix", s.path)
if err != nil {
return fmt.Errorf("failed to listen on pod info socket: %v", err)
}
if err := os.Chmod(s.path, 0600); err != nil {
l.Close()
return fmt.Errorf("failed to set pod info socket mode: %v", err)
}

s.SetKeepAlivesEnabled(false)
go utilwait.Forever(func() {
if err := s.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0)
return nil
}

// Split the "CNI_ARGS" environment variable's value into a map. CNI_ARGS
// contains arbitrary key/value pairs separated by ';' and is for runtime or
// plugin specific uses. Kubernetes passes the pod namespace and name in
// CNI_ARGS.
func gatherCNIArgs(env map[string]string) (map[string]string, error) {
cniArgs, ok := env["CNI_ARGS"]
if !ok {
return nil, fmt.Errorf("missing CNI_ARGS: '%s'", env)
}

mapArgs := make(map[string]string)
for _, arg := range strings.Split(cniArgs, ";") {
parts := strings.Split(arg, "=")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid CNI_ARG '%s'", arg)
}
mapArgs[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
}
return mapArgs, nil
}

func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) {
var cr CNIRequest
b, _ := ioutil.ReadAll(r.Body)
if err := json.Unmarshal(b, &cr); err != nil {
return nil, fmt.Errorf("JSON unmarshal error: %v", err)
}

cmd, ok := cr.Env["CNI_COMMAND"]
if !ok {
return nil, fmt.Errorf("Unexpected or missing CNI_COMMAND")
}

req := &PodRequest{
Command: CNICommand(cmd),
Result: make(chan *PodResult),
}

req.ContainerId, ok = cr.Env["CNI_CONTAINERID"]
if !ok {
return nil, fmt.Errorf("missing CNI_CONTAINERID")
}
req.Netns, ok = cr.Env["CNI_NETNS"]
if !ok {
return nil, fmt.Errorf("missing CNI_NETNS")
}

cniArgs, err := gatherCNIArgs(cr.Env)
if err != nil {
return nil, err
}

req.PodNamespace, ok = cniArgs["K8S_POD_NAMESPACE"]
if err != nil {
return nil, fmt.Errorf("missing K8S_POD_NAMESPACE")
}

req.PodName, ok = cniArgs["K8S_POD_NAME"]
if err != nil {
return nil, fmt.Errorf("missing K8S_POD_NAME")
}

return req, nil
}

// Dispatch a pod request to the request handler and return the result to the
// CNI server client
func (s *CNIServer) handleCNIRequest(w http.ResponseWriter, r *http.Request) {
req, err := cniRequestToPodRequest(r)
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
return
}

glog.V(5).Infof("Waiting for %s result for pod %s/%s", req.Command, req.PodNamespace, req.PodName)
result, err := s.requestFunc(req)
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
} else {
// Empty response JSON means success with no body
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(result); err != nil {
glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
}
}
}
Loading

0 comments on commit d9b9f56

Please sign in to comment.