Pipeline Explained
Easegress offers a rich set of filters tailored for diverse use cases, including validation, request building, and more. At its core, the Pipeline
serves as an integrated framework within Easegress, designed specifically to coordinate and sequence filters for processing requests.
- It supports calling multiple backend services for one input request.
- It supports conditional forward direction jumping.
- The HTTPServer receives incoming traffic and routes it to a dedicated Pipeline in Easegress according to the HTTP header, path matching, etc., as the sketch below shows.
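The following is a minimal sketch of such an HTTPServer spec; the server name, port, and path prefix are illustrative assumptions, and the referenced pipeline-demo is the Pipeline defined in the next section:
$ echo '
name: server-demo
kind: HTTPServer
port: 10080
keepAlive: true
https: false
rules:
  - paths:
    - pathPrefix: /pipeline
      backend: pipeline-demo
' | egctl create -f -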
Details
Sequential Execution
- The basic model of Pipeline execution is a sequence: filters are executed one by one in the order described by the flow field in the Pipeline's spec.
Request
│
┌──────────┼──────────┐
│ Pipeline │ │
│ │ │
│ ┌───────┴───────┐ │
│ │requestAdaptor │ │
│ └───────┬───────┘ │
│ │ │
│ │ │
│ ┌───────▼───────┐ │
│ │ Proxy │ │
│ └───────┬───────┘ │
│ │ │
└──────────┼──────────┘
│
▼
Response
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
filters:
- name: requestAdaptor
kind: RequestAdaptor
header:
set:
X-Adapt-Key: goodplan
- name: proxy
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
' | egctl create -f -
- The pipeline-demo above will execute the requestAdaptor filter first, then proxy. So the proxy filter can forward the request with the header X-Adapt-Key, which was set by requestAdaptor, as the curl example below demonstrates.
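Continuing with the server-demo sketch above (the port 10080 and path prefix /pipeline are assumptions from that sketch), the behavior can be exercised with a plain request; the backend at 127.0.0.1:9095 then receives the extra header:
$ curl http://127.0.0.1:10080/pipeline -v
# the request forwarded to http://127.0.0.1:9095 now carries
#   X-Adapt-Key: goodplan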
JumpIf
- An Easegress filter returns a string message as its execution result. If it's empty, the filter handled the request without failure. Otherwise, Easegress uses these non-empty result strings as the basis for the JumpIf mechanism.
- The Pipeline supports the JumpIf mechanism [2]. We use it to skip the execution of some filters when something goes wrong.
Request
│
│
┌──────────────┼─────────────────┐
│ Pipeline │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Validator ├────┐ │
│ └────────┬─────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────┐ │ │
│ │ RequestAdaptor │ │ │
│ └────────┬─────────┘ │ │
│ │ Invalid│
│ ▼ │ │
│ ┌──────────────────┐ │ │
│ │ Proxy │ │ │
│ └────────┬─────────┘ │ │
│ │ │ │
│ ◄──────────────┘ │
└──────────────┼─────────────────┘
│
▼
Response
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
jumpIf: { invalid: END }
- filter: requestAdaptor
- filter: proxy
filters:
- name: validator
kind: Validator
headers:
Content-Type:
values:
- application/json
- name: requestAdaptor
kind: RequestAdaptor
header:
set:
X-Adapt-Key: goodplan
- name: proxy
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
' | egctl apply -f -
- As we can see above, pipeline-demo will jump to the end of the pipeline execution when validator's execution result is invalid.
Built-in Filter END
END is a built-in filter, that is, one we can use without defining it. From the above example, we have already seen that END can be used as the target of jumpIf, and we can also use it directly in the flow.
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
jumpIf: { invalid: buildFailureResponse }
- filter: requestAdaptor
- filter: proxy
- filter: END
- filter: buildFailureResponse
filters:
- name: validator
kind: Validator
headers:
Content-Type:
values:
- application/json
- name: requestAdaptor
kind: RequestAdaptor
header:
set:
X-Adapt-Key: goodplan
- name: proxy
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
- name: buildFailureResponse
kind: ResponseBuilder
template: |
statusCode: 400
body: the request is invalid.
' | egctl apply -f -
- By using the END filter, we can now build a custom failure response.
Alias
- We assign a name to a filter when we define it, but a filter can be used more than once in the flow; in this case, we can assign an alias to each appearance.
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: validator
jumpIf:
invalid: mockRequestForProxy2
- filter: mockRequest
- filter: proxy1
- filter: END
- filter: mockRequest
alias: mockRequestForProxy2
- filter: proxy2
filters:
- name: validator
kind: Validator
headers:
Content-Type:
values:
- application/json
- name: mockRequest
kind: RequestBuilder
template: |
method: GET
url: /hello
- name: proxy1
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
- name: proxy2
kind: Proxy
pools:
- servers:
- url: http://127.0.0.2:9095
' | egctl apply -f -
Namespace
- The pipeline can handle multiple requests/responses. Requests/responses are grouped by namespaces, and each namespace can have at most one request and one response.
- We can set the namespace attribute of a filter to let it know which request/response it should use. RequestBuilder and ResponseBuilder can access all requests and responses; they output the result request/response to the namespace they are assigned to.
- The default namespace is DEFAULT.
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
namespace: demo1
- filter: copyRequest
namespace: demo2
- filter: copyRequest
namespace: demo3
- filter: proxy-demo1
namespace: demo1
- filter: proxy-demo2
namespace: demo2
- filter: proxy-demo3
namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
kind: RequestBuilder
sourceNamespace: DEFAULT
- name: proxy-demo1
kind: Proxy
pools:
- servers:
- url: https://demo1
- name: proxy-demo2
kind: Proxy
pools:
- servers:
- url: https://demo2
- name: proxy-demo3
kind: Proxy
pools:
- servers:
- url: https://demo3
- name: buildResponse
kind: ResponseBuilder
template: |
    statusCode: 200
    body: [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
Usage
GlobalFilter
GlobalFilter is a special pipeline that can be executed before and/or after all pipelines in a server. For example:
name: globalFilter-example
kind: GlobalFilter
beforePipeline:
flow:
- filter: validator
filters:
- name: validator
kind: Validator
...
afterPipeline:
...
---
name: server-example
kind: HTTPServer
globalFilter: globalFilter-example
...
In this case, all requests in HTTPServer server-example go through GlobalFilter globalFilter-example before and after the execution of any other pipeline.
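For a fuller picture, below is a hedged sketch of a GlobalFilter that validates every request before the pipelines and stamps a response header after them; the names and header value are illustrative, and the Validator and ResponseAdaptor specs reuse the kinds shown elsewhere in this document:
name: globalFilter-demo
kind: GlobalFilter
beforePipeline:
  flow:
  - filter: validator
  filters:
  - name: validator
    kind: Validator
    headers:
      Content-Type:
        values:
        - application/json
afterPipeline:
  flow:
  - filter: responseAdaptor
  filters:
  - name: responseAdaptor
    kind: ResponseAdaptor
    header:
      set:
        Server: Easegress
An HTTPServer then points to it through its globalFilter field, exactly as server-example does above.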
Load Balancer
A reverse proxy is a common middleware: it accepts requests from clients and forwards them to backend servers. The reverse proxy is a core role played by Easegress.
The Proxy filter is the filter that fires requests to backend servers.
It contains server pools under load balancing, whose policy supports roundRobin, random, weightedRandom, ipHash, and headerHash. For more information, please check load balance. (A weightedRandom variation is sketched after the example below.)
name: pipeline-reverse-proxy
kind: Pipeline
flow:
- filter: proxy
filters:
- name: proxy
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
- url: http://127.0.0.1:9096
- url: http://127.0.0.1:9097
loadBalance:
policy: roundRobin
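The policy can be swapped per pool. The sketch below uses weightedRandom, where each server carries a weight field as described in the load balance documentation; the weight values here are illustrative assumptions:
name: pipeline-weighted-proxy
kind: Pipeline
flow:
- filter: proxy
filters:
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
      weight: 60
    - url: http://127.0.0.1:9096
      weight: 30
    - url: http://127.0.0.1:9097
      weight: 10
    loadBalance:
      policy: weightedRandom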
Traffic Adaptor: Change Something of Two-Way Traffic
Sometimes backend applications can't adapt to quick changes in traffic requirements. Easegress can act as an adaptor between new traffic and old applications. There are 2 phases of adaptation in a reverse proxy: request adaptation and response adaptation. RequestAdaptor supports the adaptation of method, path, header, and body. ResponseAdaptor supports the adaptation of header and body. As you can see, the flow in the spec plays a critical role.
name: pipeline-reverse-proxy
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
- filter: responseAdaptor
filters:
- name: requestAdaptor
kind: RequestAdaptor
host: easegress.megaease.com
method: POST
path:
trimPrefix: /apis/v2
header:
set:
X-Api-Version: v2
- name: responseAdaptor
kind: ResponseAdaptor
header:
set:
Server: Easegress v1.0.0
add:
X-Easegress-Pipeline: pipeline-reverse-proxy
- name: proxy
kind: Proxy
pools:
- servers:
- url: http://127.0.0.1:9095
- url: http://127.0.0.1:9096
- url: http://127.0.0.1:9097
loadBalance:
policy: roundRobin
API Aggregation
API aggregation is a pattern that aggregates multiple individual requests into a single request. This pattern is useful when a client must make multiple calls to different backend systems to complete an operation. Easegress provides the RequestBuilder & ResponseBuilder filters for this powerful feature.
Example 1: Simple aggregation
- Suppose we have three backend services called demo1, demo2, and demo3. We want to call these three services and combine their responses into one. demo1 returns {"mega":"ease"} as the HTTP response body, demo2 returns {"hello":"world"}, and demo3 returns {"hello":"new world"}.
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
namespace: demo1
- filter: copyRequest
namespace: demo2
- filter: copyRequest
namespace: demo3
- filter: proxy-demo1
namespace: demo1
- filter: proxy-demo2
namespace: demo2
- filter: proxy-demo3
namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
kind: RequestBuilder
sourceNamespace: DEFAULT
- name: proxy-demo1
kind: Proxy
pools:
- servers:
- url: https://demo1
- name: proxy-demo2
kind: Proxy
pools:
- servers:
- url: https://demo2
- name: proxy-demo3
kind: Proxy
pools:
- servers:
- url: https://demo3
- name: buildResponse
kind: ResponseBuilder
template: |
statusCode: 200
body: |
[{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
- Create an HTTPServer to forward the traffic to this pipeline.
echo '
kind: HTTPServer
name: server-demo
port: 10080
keepAlive: true
https: false
rules:
- paths:
- pathPrefix: /api
backend: pipeline-api ' | egctl create -f -
- Send a request to the pipeline
$ curl -X GET http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"},{"mega":"ease"}]
Example 2: Merge response body
- In Example 1, demo2 and demo3's responses share the same JSON key. We want to merge their response bodies by JSON key; if there are duplicated keys, we will use the last value.
- Update the pipeline spec
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
namespace: demo1
- filter: copyRequest
namespace: demo2
- filter: copyRequest
namespace: demo3
- filter: proxy-demo1
namespace: demo1
- filter: proxy-demo2
namespace: demo2
- filter: proxy-demo3
namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
kind: RequestBuilder
sourceNamespace: DEFAULT
- name: proxy-demo1
kind: Proxy
pools:
- servers:
- url: https://demo1
- name: proxy-demo2
kind: Proxy
pools:
- servers:
- url: https://demo2
- name: proxy-demo3
kind: Proxy
pools:
- servers:
- url: https://demo3
- name: buildResponse
kind: ResponseBuilder
template: |
statusCode: 200
body: |
{{mergeObject .responses.demo1.JSONBody .responses.demo2.JSONBody .responses.demo3.JSONBody | toRawJson}}
' | egctl apply -f -
- Send a request to the pipeline
$ curl -X GET http://127.0.0.1:10080/api -v
{"hello":"new world","mega":"ease"}
Example 3: Handle Failures
In Example 1, if one of the backend services encounters a failure, the pipeline will return a wrong result. We can improve the pipeline to handle this kind of failure.
- Update the pipeline spec
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
namespace: demo1
- filter: copyRequest
namespace: demo2
- filter: copyRequest
namespace: demo3
- filter: proxy-demo1
namespace: demo1
jumpIf: { serverError: proxy-demo2 }
- filter: proxy-demo2
namespace: demo2
- filter: proxy-demo3
namespace: demo3
- filter: buildResponse
filters:
- name: copyRequest
kind: RequestBuilder
sourceNamespace: DEFAULT
- name: proxy-demo1
kind: Proxy
pools:
- servers:
- url: https://demo1
- name: proxy-demo2
kind: Proxy
pools:
- servers:
- url: https://demo2
- name: proxy-demo3
kind: Proxy
pools:
- servers:
- url: https://demo3
- name: buildResponse
kind: ResponseBuilder
template: |
statusCode: {{$code := 503}}{{range $resp := .responses}}{{if eq $resp.StatusCode 200}}{{$code = 200}}{{break}}{{end}}{{end}}{{$code}}
body: |
{{$first := true -}}
[{{range $resp := .responses -}}
{{if eq $resp.StatusCode 200 -}}
{{if not $first}},{{end}}{{$resp.Body}}{{$first = false -}}
{{- end}}
{{- end}}]
' | egctl apply -f -
- Stop service demo1 and send a request to the pipeline
$ curl -X GET http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"}]