Camel 概觀

December 8, 2020

Camel 對於 Java 來說是一個整合函式庫,期提供 API 使你可以整合不同的來源數據,Camel 可以說是不同數據之間的膠水,從一個應用程式獲取數據,並透過管道傳輸到另一個應用程式,該過程可以轉換、變更。

當有資源需要從 A 到 B 或許 Camel 是一個好的整合工具,其場景可能是

Camel 有以下核心元件

下圖是 Camel 概念圖

Camel 提供 runtime engine,他叫 Camel Context,負責運行 Camel 整合。

Camel 集成概念

Route

Camel 中,這些集成管道稱為 Routes。一個 Route 由一連串步驟組成。第一步要嘛接收獲取數據,然後再透過後續步驟傳遞到 Route 的末尾。

from("file:documents/invoices") // 接收數據的路由
  .to("file:documents/done"); // 最後結束的路由

定義 Route 可用 Java DSL 或 XML DSL。

Endpoint

在寫 Route 時,經常需要與其它系統進行交互。EndpointCamel 與另一個系統交換資料的接口,Camel 可從 Endpoint 接收或是發送訊息。

以上述範例來看,Camel 接收來自 file Endpoint 的訊息,其讀取硬碟上的檔案。在這過程中也許會因為需求而已某些方式進行處理轉換。而 Camel 路由中的 Endpoint 由組件(Component) 組成。

Component

要讓 Endpoint 能夠作用,需使用組件(Component)。它是一個套件,允許你和外部系統交互,像是 mail、Dropbox 等。

藉由組件我們可以不必浪費時間在寫程式碼,只須找適合的組件來應用。而組件是可重用的,並且也可自行創建。

ComponentPurposeEndpoint URI
Fileread or write a filefile:
Directjoin your Camel routes togetherdirect:
HTTPmake an HTTP requesthttp:

如果以 producer 和 consumer 角度來看,前者表示寫(硬碟寫檔、發送訊息)後者表示讀(硬碟讀檔、接收 API 請求)。

Processor

Processor 是 Camel 路由中另一個步驟。允許針對訊息運行一些自定義代碼。可以實現一些業務邏輯、調用另一個系統或 Java 的 API。

Processor 是一個實現 Camel 中的 Processor 接口的一個 Java 類別。

Example

下面定義了三個路由的範例。

package com.example.cch.cameldemo.route;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
// 將 src-folder 目錄中的所有檔案移動到 des-folder 目錄
@Component
public class FileRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // TODO Auto-generated method stub
        from("file:"+ "src/test/src-folder/" + "?noop=true")
        .log("Received...")
        .to("file:"+ "src/test/des-folder/");
    }
}

Gradel 需要加以下的套件

// https://mvnrepository.com/artifact/org.apache.camel.springboot/camel-core-starter
implementation group: 'org.apache.camel.springboot', name: 'camel-core-starter', version: '3.16.0'
	// https://mvnrepository.com/artifact/org.apache.camel.springboot/camel-file-starter
implementation group: 'org.apache.camel.springboot', name: 'camel-file-starter', version: '3.16.0'

Enterprise Integration Patterns

Enterprise Integration Patterns(EIPs) 對於 Camel 來說是一個重要的模式。是 Camel 的基本組成部分,一個 Pipes and Filters pattern 在 Camel 中表示為 Route。

Camel 完整實現模式的列表資源 Camel EIPs

End of a route

Route 結束時,Camel 會根據消息的屬性(Message Exchange Pattern, MEP) 做兩件事之一。

上述的範例就是一個 Do nothing 的例子。

Messages in Camel

Camel 使用訊息模式(message model)處理數據。Camel 將 Route 中的數據視為訊息,有可能是以下

在 Camel 中有一個 message 的 API 物件來表示一條訊息,會有 Body、Header 和屬性(可用來保存訊息關聯的值),最後沿著 Route 往下傳遞。

Introducing the Exchange

Exchange 只是當前在 Route 內發生的訊息或交互內容。

Exchange 是在 Route 開始建立時建立的,也就是 from,會隨著訊息經過的 Route 而更新。而 Exchange 物件會保存當前的請求和響應訊息。

Message contain ?

對於訊息的 Body 可以是 Java 物件、JSON 或是純文字等,對於 Camel 來說這個優勢非常的強大,我們可以省下轉換的程式碼。

ComponentWhat it is
Message在執行 Camel Route 期間建立的出入站訊息
Headers鍵值對,可定義訊息(Message)的屬性或是要儲存的自定義值
Body放置訊息內容的位置
ExchangeMessage 的容器,保存當前出入的訊息,和有關當前請求的其它元數據

Components and endpoints

Camel 中的 endpointRoute 的基石。它們是接收、發送訊息的一部分,由 component 實現,它們可以與外部系統進行整合,像是

Example

以下是複雜一點的範例。

Example 1

以下是一個透過 timer.period 傳入秒數,並定期運行該 Route。greeting.wordtimer.period 都是環境變數

@Component
public class DemoRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // TODO Auto-generated method stub
        from("timer:hello?period={{timer.period}}")
                .setBody(simple("{{greeting.word}}, Hello from timer!"))
                .to("log:out");
    }

}

結果的 log

2022-05-02 12:14:44.945  INFO 13332 --- [- timer://hello] out                                      : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]
2022-05-02 12:14:45.934  INFO 13332 --- [- timer://hello] out                                      : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]
2022-05-02 12:14:46.951  INFO 13332 --- [- timer://hello] out                                      : Exchange[ExchangePattern: InOnly, BodyType: String, Body: Itachi, Hello from timer!]

Example 2

@Component
public class MethodRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // TODO Auto-generated method stub
        from("timer:hello?period={{timer.period}}&delay=2000").routeId("hello")
                .transform().method("myBeanServiceImpl", "saySomething")
                .log(">>> ${body}")
                .filter(simple("${body} contains 'foo'"))
                .to("log:foo")
                .end()
                .to("stream:out"); // System.in、System.out、System.err
    }

}

Summary

  1. CamelTimer 組件(component) 可用於定期觸發路由(Route)
  2. transform 我們想要更改訊息的內容,透過 method 呼叫 bean() 方法。Camel 會在 Spring boot 註冊表中找到名為 myBeanServiceImplBean,而 method 第二個參數則是呼叫的方法
  3. filter 告訴 Camel 根據表達式過濾訊息
  4. 最後 to 則是以表準輸出進行訊息打印

範例鏈結

Ref