Camel
對於 Java 來說是一個整合函式庫,期提供 API 使你可以整合不同的來源數據,Camel
可以說是不同數據之間的膠水,從一個應用程式獲取數據,並透過管道傳輸到另一個應用程式,該過程可以轉換、變更。
當有資源需要從 A 到 B 或許 Camel
是一個好的整合工具,其場景可能是
- 發送 Mail
- 地端資料送到雲端硬碟
- 從 AWS SQS 獲取訊息,再調用我們自己 API
- XML 轉 CSV
- 從資料庫獲取資料
- 等等
Camel
有以下核心元件
- Camel Core
- 包含 Camel APIs、Camel runtime engine、Camel 應用程式運行時管理的功能
- Component JARs
- 每個組件擴展了 Camel 與第三方系統或 API 交互的功能
- Platform support JARs
- 提供 Camel 在不同平台上運行的功能
下圖是 Camel
概念圖
Camel
提供 runtime engine,他叫 Camel Context,負責運行 Camel
整合。
Camel 集成概念
Route
在 Camel
中,這些集成管道稱為 Routes。一個 Route
由一連串步驟組成。第一步要嘛接收或獲取數據,然後再透過後續步驟傳遞到 Route
的末尾。
from("file:documents/invoices") // 接收數據的路由
.to("file:documents/done"); // 最後結束的路由
定義 Route 可用 Java DSL 或 XML DSL。
Endpoint
在寫 Route
時,經常需要與其它系統進行交互。Endpoint
是 Camel
與另一個系統交換資料的接口,Camel 可從 Endpoint
接收或是發送訊息。
以上述範例來看,Camel 接收來自 file
Endpoint
的訊息,其讀取硬碟上的檔案。在這過程中也許會因為需求而已某些方式進行處理轉換。而 Camel 路由中的 Endpoint
由組件(Component) 組成。
Component
要讓 Endpoint
能夠作用,需使用組件(Component)。它是一個套件,允許你和外部系統交互,像是 mail、Dropbox 等。
藉由組件我們可以不必浪費時間在寫程式碼,只須找適合的組件來應用。而組件是可重用的,並且也可自行創建。
Component | Purpose | Endpoint URI |
---|---|---|
File | read or write a file | file: |
Direct | join your Camel routes together | direct: |
HTTP | make an HTTP request | http: |
如果以 producer 和 consumer 角度來看,前者表示寫(硬碟寫檔、發送訊息)後者表示讀(硬碟讀檔、接收 API 請求)。
Processor
Processor
是 Camel 路由中另一個步驟。允許針對訊息運行一些自定義代碼。可以實現一些業務邏輯、調用另一個系統或 Java 的 API。
Processor
是一個實現 Camel 中的 Processor
接口的一個 Java 類別。
Example
下面定義了三個路由的範例。
package com.example.cch.cameldemo.route;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
// 將 src-folder 目錄中的所有檔案移動到 des-folder 目錄
@Component
public class FileRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
// TODO Auto-generated method stub
from("file:"+ "src/test/src-folder/" + "?noop=true")
.log("Received...")
.to("file:"+ "src/test/des-folder/");
}
}
Gradel 需要加以下的套件
// https://mvnrepository.com/artifact/org.apache.camel.springboot/camel-core-starter
implementation group: 'org.apache.camel.springboot', name: 'camel-core-starter', version: '3.16.0'
// https://mvnrepository.com/artifact/org.apache.camel.springboot/camel-file-starter
implementation group: 'org.apache.camel.springboot', name: 'camel-file-starter', version: '3.16.0'
Enterprise Integration Patterns
Enterprise Integration Patterns
(EIPs) 對於 Camel 來說是一個重要的模式。是 Camel 的基本組成部分,一個 Pipes and Filters pattern 在 Camel 中表示為 Route。
Camel 完整實現模式的列表資源 Camel EIPs
End of a route
在 Route
結束時,Camel
會根據消息的屬性(Message Exchange Pattern, MEP) 做兩件事之一。
- Return the final output message back to the consumer
- Camel 將來自最終端點的輸出訊息返回給調用者,並完成路由
- Do nothing
- Exchange pattern 設置為
InOnly
,就是消費者(consumer )從不期望從調用者那裡收到訊息。
- Exchange pattern 設置為
上述的範例就是一個 Do nothing
的例子。
Messages in Camel
Camel 使用訊息模式(message model)處理數據。Camel 將 Route 中的數據視為訊息,有可能是以下
- Web 服務請求
- 硬碟中的檔案
- 從 JMS 佇列接收訊息
在 Camel 中有一個 message 的 API 物件來表示一條訊息,會有 Body、Header 和屬性(可用來保存訊息關聯的值),最後沿著 Route 往下傳遞。
Introducing the Exchange
Exchange
只是當前在 Route 內發生的訊息或交互內容。
Exchange
是在 Route 開始建立時建立的,也就是 from
,會隨著訊息經過的 Route 而更新。而 Exchange
物件會保存當前的請求和響應訊息。
Message contain ?
對於訊息的 Body 可以是 Java 物件、JSON 或是純文字等,對於 Camel 來說這個優勢非常的強大,我們可以省下轉換的程式碼。
Component | What it is |
---|---|
Message | 在執行 Camel Route 期間建立的出入站訊息 |
Headers | 鍵值對,可定義訊息(Message)的屬性或是要儲存的自定義值 |
Body | 放置訊息內容的位置 |
Exchange | Message 的容器,保存當前出入的訊息,和有關當前請求的其它元數據 |
Components and endpoints
Camel
中的 endpoint
是 Route
的基石。它們是接收、發送訊息的一部分,由 component
實現,它們可以與外部系統進行整合,像是
- file
- messaging
- web service
- cloud app
Example
以下是複雜一點的範例。
Example 1
以下是一個透過 timer.period
傳入秒數,並定期運行該 Route。greeting.word
和 timer.period
都是環境變數
@Component
public class DemoRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
// TODO Auto-generated method stub
from("timer:hello?period={{timer.period}}")
.setBody(simple("{{greeting.word}}, Hello from timer!"))
.to("log:out");
}
}
結果的 log
2022-05-02 12:14:44.945 INFO 13332 --- [- timer://hello] out : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]
2022-05-02 12:14:45.934 INFO 13332 --- [- timer://hello] out : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]
2022-05-02 12:14:46.951 INFO 13332 --- [- timer://hello] out : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]
Example 2
@Component
public class MethodRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
// TODO Auto-generated method stub
from("timer:hello?period={{timer.period}}&delay=2000").routeId("hello")
.transform().method("myBeanServiceImpl", "saySomething")
.log(">>> ${body}")
.filter(simple("${body} contains 'foo'"))
.to("log:foo")
.end()
.to("stream:out"); // System.in、System.out、System.err
}
}
Summary
Camel
的Timer
組件(component) 可用於定期觸發路由(Route)transform
我們想要更改訊息的內容,透過method
呼叫bean()
方法。Camel 會在 Spring boot 註冊表中找到名為myBeanServiceImpl
的Bean
,而method
第二個參數則是呼叫的方法filter
告訴Camel
根據表達式過濾訊息- 最後
to
則是以表準輸出進行訊息打印
範例鏈結