-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster.go
500 lines (447 loc) · 21.1 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
// Copyright 2022 API7.ai, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cloud
import (
"context"
"encoding/json"
"path"
"time"
"github.com/pkg/errors"
)
// ClusterStage enumerates the lifecycle phases a cluster moves through.
type ClusterStage int

// String renders the stage as its lowercase, human-readable label. Any value
// outside the declared stages yields "unknown".
func (cs ClusterStage) String() string {
	// Labels indexed by stage value; slot 0 stays empty because the declared
	// stages start at 1.
	labels := [...]string{
		ClusterPending:      "pending",
		ClusterCreating:     "creating",
		ClusterNormal:       "normal",
		ClusterCreateFailed: "create failed",
		ClusterDeleting:     "deleting",
		ClusterDeleted:      "deleted",
	}
	if cs < ClusterPending || cs > ClusterDeleted {
		return "unknown"
	}
	return labels[cs]
}

const (
	// ClusterPending means a cluster is not created yet.
	ClusterPending ClusterStage = iota + 1
	// ClusterCreating means a cluster is being created.
	ClusterCreating
	// ClusterNormal means a cluster was created and is now operating normally.
	ClusterNormal
	// ClusterCreateFailed means a cluster was not created successfully.
	ClusterCreateFailed
	// ClusterDeleting means a cluster is being deleted.
	ClusterDeleting
	// ClusterDeleted means a cluster was deleted.
	ClusterDeleted
)
// Values accepted in APIProxySettings.URLHandlingOptions.
const (
// DeleteURITailSlash means the trailing slash of the request URI is removed
// before matching.
DeleteURITailSlash = "Delete Tail Slash"
)
// Values accepted in ServerHeaderCustomization.Mode.
const (
// RewriteServerHeader means rewrite the Server header in the response.
RewriteServerHeader = "Rewrite"
// HideVersionToken means hide the APISIX version info in the Server header.
HideVersionToken = "Hide Version Token"
)
// Values accepted in ClientIPReplaceFrom.Position.
const (
// RealIPPositionHeader indicates the real IP is in an HTTP header.
RealIPPositionHeader = "header"
// RealIPPositionQuery indicates the real IP is in a query string.
RealIPPositionQuery = "query"
// RealIPPositionCookie indicates the real IP is in a cookie.
RealIPPositionCookie = "cookie"
)
// GatewayInstanceStatus is the status of a gateway instance.
type GatewayInstanceStatus string
// Known GatewayInstanceStatus values.
const (
// GatewayInstanceHealthy indicates the instance is healthy. Healthy means
// the heartbeat probes sent from the instance are received periodically and,
// at the same time, the configuration delivery (currently ETCD
// connections) is also normal.
GatewayInstanceHealthy = GatewayInstanceStatus("Healthy")
// GatewayInstanceOnlyHeartbeats indicates the instance sends heartbeat probes
// periodically but the configuration cannot be delivered to the instance.
GatewayInstanceOnlyHeartbeats = GatewayInstanceStatus("Only Heartbeats")
// GatewayInstanceLostConnection indicates the instance has missed heartbeats
// for a short time (between InstanceLostConnectionThresholdDuration and
// InstanceOfflineThresholdDuration).
GatewayInstanceLostConnection = GatewayInstanceStatus("Lost Connection")
// GatewayInstanceOffline indicates the instance has missed heartbeats for a
// long time (beyond InstanceLiveThresholdDuration).
// NOTE(review): the threshold names in these two comments are not defined in
// this file and differ from each other (Offline vs. Live) — confirm which
// name is current.
GatewayInstanceOffline = GatewayInstanceStatus("Offline")
)
// Cluster contains the cluster specification and management fields.
type Cluster struct {
// ClusterSpec is embedded, so its fields are promoted onto Cluster and,
// with encoding/json, appear at the same level as the fields below.
ClusterSpec
// ID is the unique identifier of this cluster.
// NOTE(review): `inline` is not an option defined by encoding/json, so it
// is presumably ignored there — confirm whether another consumer reads it.
ID ID `json:"id,inline"`
// Name is the cluster name.
Name string `json:"name"`
// CreatedAt is the creation time.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is the last modified time.
UpdatedAt time.Time `json:"updated_at"`
}
// ClusterSpec is the specification of a cluster.
type ClusterSpec struct {
// OrganizationID refers to an Organization object, which
// indicates the organization this cluster belongs to.
OrganizationID ID `json:"org_id"`
// RegionID refers to a Region object, which indicates the
// region that the Cloud Plane resides in.
RegionID ID `json:"region_id"`
// Status indicates the cluster status, candidate values are:
// * ClusterPending: the cluster is not created yet.
// * ClusterCreating: the cluster is being created.
// * ClusterNormal: the cluster is built, and can be used normally.
// * ClusterCreateFailed: the cluster was not created successfully.
// * ClusterDeleting: the cluster is being deleted.
// * ClusterDeleted: the cluster was deleted.
Status ClusterStage `json:"status"`
// Domain is the domain assigned by API7 Cloud and has correct
// records so that gateway instances can access API7 Cloud by it.
Domain string `json:"domain"`
// ConfigPayload is the customized gateway config for a specific cluster.
// The `json:"-"` tag excludes it from JSON encoding and decoding.
ConfigPayload string `json:"-"`
// Settings is the settings for the cluster.
Settings ClusterSettings `json:"settings"`
// Plugins holds the cluster-level plugin settings. Note that the JSON key
// is "policies".
Plugins Plugins `json:"policies,omitempty"`
// ConfigVersion is the version for the cluster.
ConfigVersion int `json:"config_version"`
}
// ClusterSettings groups the configurable settings of a cluster.
type ClusterSettings struct {
// ClientSettings is the client settings config that is used in APISIX.
ClientSettings ClientSettings `json:"client_settings"`
// ObservabilitySettings is the observability settings config that is used in APISIX.
ObservabilitySettings ObservabilitySettings `json:"observability_settings"`
// APIProxySettings is the API proxy settings config that is used in APISIX.
APIProxySettings APIProxySettings `json:"api_proxy_settings"`
}
// APIProxySettings is the API proxy settings config.
type APIProxySettings struct {
// EnableRequestBuffering indicates whether to enable request buffering.
EnableRequestBuffering bool `json:"enable_request_buffering"`
// ServerHeaderCustomization is the Server header customization settings.
// It is a pointer with `omitempty`, so a nil value is left out of the JSON.
ServerHeaderCustomization *ServerHeaderCustomization `json:"server_header_customization,omitempty"`
// URLHandlingOptions is the list of URL handling options used by the gateway.
// Optional values are:
// * DeleteURITailSlash
URLHandlingOptions []string `json:"url_handling_options"`
}
// ServerHeaderCustomization is the Server header customization settings.
type ServerHeaderCustomization struct {
// Mode is the mode of the customization.
// Optional values can be:
// * RewriteServerHeader, rewrite the Server header, value is specified by `NewServerHeader`.
// * HideVersionToken, still use APISIX as the Server header, but hide the version token.
Mode string `json:"mode,omitempty"`
// NewServerHeader is the new Server header value.
NewServerHeader string `json:"new_server_header,omitempty"`
}
// ClientSettings is the client settings config.
type ClientSettings struct {
// ClientRealIP is the client real IP config that is used in APISIX.
ClientRealIP ClientRealIPConfig `json:"client_real_ip"`
// MaximumRequestBodySize is the maximum request body size that is used in
// APISIX; 0 means no limit.
MaximumRequestBodySize uint64 `json:"maximum_request_body_size"`
}
// ClientRealIPConfig is the client real IP config.
type ClientRealIPConfig struct {
// ReplaceFrom describes where the replacement client IP is taken from.
ReplaceFrom ClientIPReplaceFrom `json:"replace_from"`
// TrustedAddresses is the list of trusted addresses for the client IP.
TrustedAddresses []string `json:"trusted_addresses,omitempty"`
// RecursiveSearch indicates whether the client IP is searched recursively.
RecursiveSearch bool `json:"recursive_search"`
// Enabled indicates whether real IP is enabled.
Enabled bool `json:"enabled"`
}
// ClientIPReplaceFrom describes the source that the client IP is taken from.
type ClientIPReplaceFrom struct {
// Position is the position that the client IP should be taken from.
// Optional values are:
// * RealIPPositionHeader, indicates the real IP is in an HTTP header, and the header name is specified by the `Name` field.
// * RealIPPositionQuery, indicates the real IP is in the query string, and the query name is specified by the `Name` field.
// * RealIPPositionCookie, indicates the real IP is in the Cookie, and the field name is specified by the `Name` field.
Position string `json:"position,omitempty"`
// Name is the name of the variable that the client IP should be taken from.
Name string `json:"name,omitempty"`
}
// ObservabilitySettings is the observability settings config.
type ObservabilitySettings struct {
// Metrics is the metrics-related configuration.
// NOTE(review): with encoding/json, `omitempty` has no effect on a
// non-pointer struct value, so this field is always emitted.
Metrics MetricsConfig `json:"metrics,omitempty"`
// ShowUpstreamStatusInResponseHeader indicates whether to show all upstream status
// in the `X-APISIX-Upstream-Status` header.
// When this field is disabled, the header is shown only when the status code is `5xx`.
ShowUpstreamStatusInResponseHeader bool `json:"show_upstream_status_in_response_header"`
// AccessLogRotate is the access log rotate settings config.
AccessLogRotate AccessLogRotateSettings `json:"access_log_rotate"`
}
// MetricsConfig contains configurations related to metrics.
type MetricsConfig struct {
// Enabled indicates whether gateway instances' metrics should be collected to API7 Cloud.
Enabled bool `json:"enabled"`
}
// AccessLogRotateSettings is the access log rotate settings config.
type AccessLogRotateSettings struct {
// Enabled indicates whether access log rotation is enabled.
Enabled bool `json:"enabled"`
// Interval is the time in seconds specifying how often to rotate the logs.
// A zero value is omitted from the JSON output.
Interval uint64 `json:"interval,omitempty"`
// MaximumKeptLogEntries is the maximum number of log entries to keep.
// A zero value is omitted from the JSON output.
MaximumKeptLogEntries uint64 `json:"maximum_kept_log_entries,omitempty"`
// EnableCompression indicates whether to compress the log files.
EnableCompression bool `json:"enable_compression"`
}
// TLSBundle contains a pair of certificate, private key,
// and the issuing certificate.
type TLSBundle struct {
// Certificate is the certificate of the pair.
// NOTE(review): the encoding of these three strings (presumably PEM) is
// not visible in this file — confirm.
Certificate string `json:"certificate"`
// PrivateKey is the private key of the pair.
PrivateKey string `json:"private_key"`
// CACertificate is the issuing (CA) certificate.
CACertificate string `json:"ca_certificate"`
}
// GatewayInstancePayload contains basic information for a gateway instance.
type GatewayInstancePayload struct {
// ID is the unique identity for the gateway instance.
ID string `json:"id"`
// Hostname is the name of the VM or container that the gateway
// instance resides in.
Hostname string `json:"hostname"`
// IP is the IP address of the VM or container that the gateway
// instance resides in.
IP string `json:"ip"`
// Domain is the domain assigned by API7 Cloud for the owner
// (organization) of the gateway instance.
Domain string `json:"domain"`
// APICalls is the number of HTTP requests counted in the reporting period.
APICalls uint64 `json:"api_calls"`
// Version is the version of the gateway.
Version string `json:"version"`
// EtcdReachable indicates whether the instance can reach etcd.
EtcdReachable bool `json:"etcd_reachable"`
// ConfigVersion is the version of the config currently in use on the gateway.
ConfigVersion uint64 `json:"config_version"`
}
// GatewayInstance shows the gateway instance (Apache APISIX) status.
type GatewayInstance struct {
// GatewayInstancePayload is embedded, so its fields are promoted onto
// GatewayInstance.
// NOTE(review): `inline` is not an option defined by encoding/json; the
// embedded fields are flattened by the embedding itself — confirm whether
// another consumer relies on the tag.
GatewayInstancePayload `json:",inline"`
// LastSeenTime is the last time that Cloud saw this instance.
// An instance should be marked as offline once more than 30s have elapsed
// since the last seen time.
LastSeenTime time.Time `json:"last_seen_time"`
// RegisterTime is the first time that Cloud saw this instance.
RegisterTime time.Time `json:"register_time"`
// Status is the instance status.
Status GatewayInstanceStatus `json:"status"`
}
// ClusterInterface is the interface for manipulating clusters.
type ClusterInterface interface {
// GetCluster gets an existing API7 Cloud Cluster.
// The given `clusterID` parameter should specify the Cluster that you want to get.
// Users need to specify the Organization.ID in the `opts`.
GetCluster(ctx context.Context, clusterID ID, opts *ResourceGetOptions) (*Cluster, error)
// UpdateClusterSettings updates the ClusterSettings for the specified Cluster.
// The given `clusterID` parameter should specify the Cluster that you want to update.
// The given `settings` parameter should specify the new settings you want to apply.
// Users need to specify the Organization.ID in the `opts`.
UpdateClusterSettings(ctx context.Context, clusterID ID, settings *ClusterSettings, opts *ResourceUpdateOptions) error
// UpdateClusterPlugins updates the plugins bound on the specified Cluster.
// The given `clusterID` parameter should specify the Cluster that you want to update.
// The given `plugins` parameter should specify the new plugins you want to bind.
// Users need to specify the Organization.ID in the `opts`.
UpdateClusterPlugins(ctx context.Context, clusterID ID, plugins Plugins, opts *ResourceUpdateOptions) error
// ListClusters returns an iterator for listing clusters in the specified Organization with the
// given list conditions.
// Users need to specify the Organization, Paging, and Filter conditions (if necessary)
// in the `opts`.
ListClusters(ctx context.Context, opts *ResourceListOptions) (ClusterListIterator, error)
// GenerateGatewaySideCertificate generates the TLS bundle for gateway instances to communicate with
// the specified cluster on API7 Cloud.
// The `clusterID` parameter specifies the cluster ID.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
GenerateGatewaySideCertificate(ctx context.Context, clusterID ID, opts *ResourceCreateOptions) (*TLSBundle, error)
// GetGatewayInstanceStartupConfigTemplate returns the startup configuration template (Apache APISIX config.yaml)
// for starting a gateway instance.
// The configType specifies the configuration type for this call. Optional values can be:
// * apisix: indicates the original APISIX config.yaml
// * helm: indicates the APISIX helm chart values.yaml
GetGatewayInstanceStartupConfigTemplate(ctx context.Context, clusterID ID, configType string, opts *ResourceGetOptions) (string, error)
// ListAllGatewayInstances returns all the gateway instances (ever) connected to the given cluster.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
ListAllGatewayInstances(ctx context.Context, clusterID ID, opts *ResourceListOptions) ([]GatewayInstance, error)
// ListAllAPILabels lists all labels for API.
// The `clusterID` parameter specifies the cluster ID.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
// The returned label slice will be `nil` if there are no labels for API.
ListAllAPILabels(ctx context.Context, clusterID ID, opts *ResourceListOptions) ([]string, error)
// ListAllApplicationLabels lists all labels for Application.
// The `clusterID` parameter specifies the cluster ID.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
// The returned label slice will be `nil` if there are no labels for Application.
ListAllApplicationLabels(ctx context.Context, clusterID ID, opts *ResourceListOptions) ([]string, error)
// ListAllCertificateLabels lists all labels for Certificate.
// The `clusterID` parameter specifies the cluster ID.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
// The returned label slice will be `nil` if there are no labels for Certificate.
ListAllCertificateLabels(ctx context.Context, clusterID ID, opts *ResourceListOptions) ([]string, error)
// ListAllConsumerLabels lists all labels for Consumer.
// The `clusterID` parameter specifies the cluster ID.
// Note that currently users don't need to pass the `opts` parameter; passing `nil` is OK.
// The returned label slice will be `nil` if there are no labels for Consumer.
ListAllConsumerLabels(ctx context.Context, clusterID ID, opts *ResourceListOptions) ([]string, error)
// DebugClusterSettings returns the corresponding translated APISIX global rules for this Cluster.
// The target cluster is taken from the `opts`, so users need to specify the Cluster there.
DebugClusterSettings(ctx context.Context, opts *ResourceGetOptions) (string, error)
}
// ClusterListIterator is an iterator for listing clusters.
type ClusterListIterator interface {
// Next returns the next cluster according to the filter conditions.
// The implementation in this file returns a nil cluster together with a
// nil error when the underlying iterator yields no data.
Next() (*Cluster, error)
}
// clusterImpl implements ClusterInterface on top of an httpClient.
type clusterImpl struct {
// client is used to send every request issued by the methods of clusterImpl.
client httpClient
}
// clusterListIterator implements ClusterListIterator by decoding the raw
// items produced by a listIterator into Cluster values.
type clusterListIterator struct {
// iter is the underlying iterator that yields the raw JSON items.
iter listIterator
}
// Next pulls the next raw item from the underlying iterator and decodes it
// into a Cluster. It returns (nil, nil) when the underlying iterator yields
// nil data, and (nil, err) when fetching or JSON decoding fails.
func (iter *clusterListIterator) Next() (*Cluster, error) {
	payload, err := iter.iter.Next()
	switch {
	case err != nil:
		return nil, err
	case payload == nil:
		// Nothing was produced by the underlying iterator.
		return nil, nil
	}

	item := new(Cluster)
	if decodeErr := json.Unmarshal(payload, item); decodeErr != nil {
		return nil, decodeErr
	}
	return item, nil
}
// newCluster wraps the given HTTP client in a clusterImpl and returns it as
// a ClusterInterface.
func newCluster(cli httpClient) ClusterInterface {
	impl := new(clusterImpl)
	impl.client = cli
	return impl
}
// GetCluster fetches the cluster identified by clusterID with a GET request
// to "<prefix>/clusters/<clusterID>" and decodes the JSON payload into a
// Cluster. The opts argument is not read by this implementation.
// On failure it returns nil and the error reported by the client.
func (impl *clusterImpl) GetCluster(ctx context.Context, clusterID ID, opts *ResourceGetOptions) (*Cluster, error) {
	result := new(Cluster)
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String())
	err := impl.client.sendGetRequest(ctx, endpoint, "", jsonPayloadDecodeFactory(result), appendHeader(mapClusterId(clusterID)))
	if err != nil {
		return nil, err
	}
	return result, nil
}
// UpdateClusterSettings sends the given settings in a PATCH request to
// "<prefix>/clusters/<clusterID>/config" and returns the error reported by
// the client, if any. The opts argument is not read by this implementation.
func (impl *clusterImpl) UpdateClusterSettings(ctx context.Context, clusterID ID, settings *ClusterSettings, opts *ResourceUpdateOptions) error {
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "config")
	return impl.client.sendPatchRequest(ctx, endpoint, "", settings, nil, appendHeader(mapClusterId(clusterID)))
}
// UpdateClusterPlugins sends the given plugins in a PATCH request to
// "<prefix>/clusters/<clusterID>/plugins" and returns the error reported by
// the client, if any. The opts argument is not read by this implementation.
func (impl *clusterImpl) UpdateClusterPlugins(ctx context.Context, clusterID ID, plugins Plugins, opts *ResourceUpdateOptions) error {
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "plugins")
	return impl.client.sendPatchRequest(ctx, endpoint, "", plugins, nil, appendHeader(mapClusterId(clusterID)))
}
// ListClusters builds an iterator over the clusters of the organization
// named in opts. The iterator targets "<prefix>/orgs/<orgID>/clusters" and
// carries the pagination and filter taken from opts. This function only
// assembles the iterator; it does not call any send method on the client
// itself.
//
// opts must not be nil because the organization ID is read from it; a nil
// opts yields an error instead of a nil-pointer panic.
func (impl *clusterImpl) ListClusters(ctx context.Context, opts *ResourceListOptions) (ClusterListIterator, error) {
	if opts == nil {
		return nil, errors.New("list options are required to list clusters")
	}
	iter := listIterator{
		ctx:      ctx,
		resource: "cluster",
		client:   impl.client,
		path:     path.Join(_apiPathPrefix, "orgs", opts.Organization.ID.String(), "clusters"),
		paging:   mergePagination(opts.Pagination),
		filter:   opts.Filter,
		headers:  appendHeader(mapClusterIdFromOpts(opts)),
	}
	return &clusterListIterator{iter: iter}, nil
}
// GenerateGatewaySideCertificate requests
// "<prefix>/clusters/<clusterID>/gateway_certificate" with a GET request and
// decodes the JSON payload into a TLSBundle. The options argument is ignored.
// On failure it returns nil and the error reported by the client.
func (impl *clusterImpl) GenerateGatewaySideCertificate(ctx context.Context, clusterID ID, _ *ResourceCreateOptions) (*TLSBundle, error) {
	certs := new(TLSBundle)
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "gateway_certificate")
	if err := impl.client.sendGetRequest(ctx, endpoint, "", jsonPayloadDecodeFactory(certs), appendHeader(mapClusterId(clusterID))); err != nil {
		return nil, err
	}
	return certs, nil
}
// GetGatewayInstanceStartupConfigTemplate requests
// "<prefix>/clusters/<clusterID>/startup_config_tpl/<configType>" with a GET
// request and returns the "configuration" field of the JSON payload.
// The options argument is ignored. On failure it returns an empty string and
// the error reported by the client.
func (impl *clusterImpl) GetGatewayInstanceStartupConfigTemplate(ctx context.Context, clusterID ID, configType string, _ *ResourceGetOptions) (string, error) {
	// startupConfig mirrors the only part of the response that is used here.
	type startupConfig struct {
		Configuration string `json:"configuration"`
	}

	var decoded startupConfig
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "startup_config_tpl", configType)
	if err := impl.client.sendGetRequest(ctx, endpoint, "", jsonPayloadDecodeFactory(&decoded), appendHeader(mapClusterId(clusterID))); err != nil {
		return "", err
	}
	return decoded.Configuration, nil
}
// ListAllGatewayInstances requests "<prefix>/clusters/<clusterID>/instances"
// with a GET request, then decodes every raw entry of the list response into
// a GatewayInstance. The options argument is ignored.
// The returned slice is nil when the list is empty. A decoding failure aborts
// the call with an error that names the index of the offending entry.
func (impl *clusterImpl) ListAllGatewayInstances(ctx context.Context, clusterID ID, _ *ResourceListOptions) ([]GatewayInstance, error) {
	var resp listResponse
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "instances")
	if err := impl.client.sendGetRequest(ctx, endpoint, "", jsonPayloadDecodeFactory(&resp), appendHeader(mapClusterId(clusterID))); err != nil {
		return nil, err
	}

	var decoded []GatewayInstance
	for idx := range resp.List {
		var current GatewayInstance
		if err := json.Unmarshal(resp.List[idx], &current); err != nil {
			return nil, errors.Wrapf(err, "unmarshal gateway instance #%d", idx)
		}
		decoded = append(decoded, current)
	}
	return decoded, nil
}
// ListAllAPILabels returns the labels of the "api" resource kind in the given
// cluster by delegating to listAllLabels. The options argument is ignored.
func (impl *clusterImpl) ListAllAPILabels(ctx context.Context, clusterID ID, _ *ResourceListOptions) ([]string, error) {
	const resource = "api"
	return impl.listAllLabels(ctx, clusterID, resource)
}
// ListAllApplicationLabels returns the labels of the "application" resource
// kind in the given cluster by delegating to listAllLabels. The options
// argument is ignored.
func (impl *clusterImpl) ListAllApplicationLabels(ctx context.Context, clusterID ID, _ *ResourceListOptions) ([]string, error) {
	const resource = "application"
	return impl.listAllLabels(ctx, clusterID, resource)
}
// ListAllConsumerLabels returns the labels of the "consumer" resource kind in
// the given cluster by delegating to listAllLabels. The options argument is
// ignored.
func (impl *clusterImpl) ListAllConsumerLabels(ctx context.Context, clusterID ID, _ *ResourceListOptions) ([]string, error) {
	const resource = "consumer"
	return impl.listAllLabels(ctx, clusterID, resource)
}
// ListAllCertificateLabels returns the labels of the "certificate" resource
// kind in the given cluster by delegating to listAllLabels. The options
// argument is ignored.
func (impl *clusterImpl) ListAllCertificateLabels(ctx context.Context, clusterID ID, _ *ResourceListOptions) ([]string, error) {
	const resource = "certificate"
	return impl.listAllLabels(ctx, clusterID, resource)
}
// listAllLabels requests "<prefix>/clusters/<clusterID>/labels/<resource>"
// with a GET request and decodes the JSON payload into a string slice.
// On failure it returns nil and the error reported by the client.
func (impl *clusterImpl) listAllLabels(ctx context.Context, clusterID ID, resource string) ([]string, error) {
	var found []string
	endpoint := path.Join(_apiPathPrefix, "clusters", clusterID.String(), "labels", resource)
	if err := impl.client.sendGetRequest(ctx, endpoint, "", jsonPayloadDecodeFactory(&found), appendHeader(mapClusterId(clusterID))); err != nil {
		return nil, err
	}
	return found, nil
}
// DebugClusterSettings requests
// "<prefix>/debug/config/clusters/<clusterID>/cluster_settings/<clusterID>"
// with a GET request, where the cluster ID is read from opts.Cluster, and
// returns the raw JSON payload after passing it through formatJSONData.
//
// opts must not be nil because the cluster ID is read from it; a nil opts
// yields an error instead of a nil-pointer panic. On a request failure it
// returns an empty string and the error reported by the client.
func (impl *clusterImpl) DebugClusterSettings(ctx context.Context, opts *ResourceGetOptions) (string, error) {
	if opts == nil {
		return "", errors.New("get options are required to debug cluster settings")
	}

	var rawData json.RawMessage
	clusterID := opts.Cluster.ID.String()
	// The cluster ID appears twice in the path: once as the cluster scope and
	// once as the identifier of the cluster_settings resource.
	uri := path.Join(_apiPathPrefix, "debug", "config", "clusters", clusterID, "cluster_settings", clusterID)
	err := impl.client.sendGetRequest(ctx, uri, "", jsonPayloadDecodeFactory(&rawData), appendHeader(mapClusterIdFromOpts(opts)))
	if err != nil {
		return "", err
	}
	return formatJSONData(rawData)
}