Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement apiToken failover mechanism #1256

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

cr7258
Copy link
Collaborator

@cr7258 cr7258 commented Aug 27, 2024

Ⅰ. Describe what this PR did

配置示例:

provider:
  type: qwen
  apiTokens:
    - "api-token-1"
    - "api-token-2"
    - "api-token-3"
  modelMapping:
    'gpt-3': "qwen-turbo"
    'gpt-4-turbo': "qwen-max"
    '*': "qwen-turbo"
  failover:
    enabled: true
    failureThreshold: 3
    successThreshold: 1
    healthCheckInterval: 5000
    healthCheckTimeout: 5000
    healthCheckModel: gpt-3

目前仅根据 HTTP 请求的响应状态码是否是 200 来判断 apiToken 是否可用,应该暂时用不到其他复杂的判断条件。

Ⅱ. Does this pull request fix one issue?

fixes #1227

Ⅲ. Why don't you add test cases (unit test/integration test)?

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

Question

目前还有两个问题:

    1. 由于 Envoy 会启动多个 Wasm VM,当前的故障切换和健康检测是每个 Wasm VM 分别去做的(也就是说 VM1 可能已经把某个 apiToken 移除了,但是 VM2 可能还会继续用这个 apiToken 进行请求),是否需要通过 proxywasm.SetSharedData 在多个 Wasm VM 间进行同步?如果同步的话会带来另一个问题,如果 apiToken 不可用时,多个 Wasm VM 会同时发起多个健康检测请求。
    1. 我需要发送请求到 envoy 本地监听的服务和端口来对 apiToken 做健康检测,目前我的做法是手动创建一个 cluster,指向 envoy 本地 Listen 的地址和端口,这样好像不太灵活,而且需要用户额外设置 cluster。有没有更好的方式?
healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
		ServiceName: "local_cluster",
		Port:        10000,
	})
    - name: outbound|10000||local_cluster.static
      connect_timeout: 0.25s
      type: STATIC
      load_assignment:
        cluster_name: outbound|10000||local_cluster.static
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: 127.0.0.1
                      port_value: 10000

@codecov-commenter
Copy link

codecov-commenter commented Aug 27, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 44.32%. Comparing base (ef31e09) to head (ee49848).
Report is 89 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1256      +/-   ##
==========================================
+ Coverage   35.91%   44.32%   +8.41%     
==========================================
  Files          69       75       +6     
  Lines       11576     9823    -1753     
==========================================
+ Hits         4157     4354     +197     
+ Misses       7104     5142    -1962     
- Partials      315      327      +12     

see 90 files with indirect coverage changes

@johnlanni
Copy link
Collaborator

@cr7258 可以用SetSharedData同步一下,要注意用cas机制避免冲突,同时也可以基于SetSharedData机制进行选主,让一个worker做健康检查恢复,不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
ServiceName: "local_cluster",
Port: 10000,
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该要配置吗吧?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,现在是需要配置一个 cluster, 指向 127.0.0.1。不知道有没有更好的方式处理?可以让用户不需要额外配置这个 cluster。

- name: outbound|10000||local_cluster.static
  connect_timeout: 0.25s
  type: STATIC
  load_assignment:
    cluster_name: outbound|10000||local_cluster.static
    endpoints:
      - lb_endpoints:
          - endpoint:
              address:
                socket_address:
                  address: 127.0.0.1
                  port_value: 10000

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用 RouteCluster 就好了,就是当前路由到的服务,然后去请求这个服务来校验健康,直接用 Authorization 头就行了,不用设置这个 ApiToken-Health-Check 头

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

健康检测任务是在 parseConfig 阶段设置的,我试了下应该是拿不到当前请求的 cluster 的?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是不是在有一次失败之后再触发会好一些?失败的时候把RouteCluster和失败的apikey组合在一起加入到检查队列里。

@cr7258
Copy link
Collaborator Author

cr7258 commented Aug 31, 2024

@johnlanni 我修改了代码,使用 SetSharedData 在多个 VM 之间同步 apiToken 的信息,并且也使用 SetSharedData 进行选主了。

不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

这个地方提到的注意点,我需要做那些处理?

healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
ServiceName: "local_cluster",
Port: 10000,
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用 RouteCluster 就好了,就是当前路由到的服务,然后去请求这个服务来校验健康,直接用 Authorization 头就行了,不用设置这个 ApiToken-Health-Check 头

}

func generateVMID() string {
return fmt.Sprintf("%016x", time.Now().Nanosecond())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vm id 可以通过 getProperty 直接拿到,key 是 "plugin_vm_id"

参考:https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes.html#wasm-attributes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我试了下不知道为啥拿不到 plugin_vm_id。。。 其他变量是可以拿到的。。。🫠

vmIDByte, err := proxywasm.GetProperty([]string{"plugin_vm_id"})
fmt.Println("vmID: ", string(vmIDByte))
vmPlugin, _ := proxywasm.GetProperty([]string{"plugin_name"})
fmt.Println("plugin name: ", string(vmPlugin))
image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

哦 是我搞错了 这个vm_id是配置里的,不是标识一个唯一的vm,现在配置的是空字符串

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那是不是生成一个 uuid 好一些?

if c.failover != nil && c.failover.enabled {
wrapper.RegisteTickFunc(c.failover.healthCheckTimeout, func() {
// Only the Wasm VM that successfully acquires the lease will perform health check
if tryAcquireOrRenewLease(vmID, log) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要加个 else 逻辑,没有选到主的,需要定时(健康检查的间隔)从 shared data 中获取全局token,来更新当前自己本地 thread local 的全局token。
这样避免每次请求来都去请求 shared data,因为envoy底层实现这个get/set shared data操作都要加锁,有额外开销

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有选到主的 Wasm VM 是不会去做健康检测的,只有选到主的 VM 才会去获取全局的 unavailableTokens 进行健康检测。所以这里好像不需要加上 else 的逻辑?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

但是不止有健康检查的时候要去请求shared data,当次请求失败,需要增加fail count的时候也要去访问

})

vmID := generateVMID()
err := c.initApiTokens()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为 shared data 中的内容是跟随插件 vm 的生命周期的,只有插件关闭/版本升级等情况内容才会被清理。所以这里初始化的时候,要重置所有 shared data 中相关的数据,availiabe和unavailiable的都要重置。

log.Errorf("Failed to get unavailable apiToken: %v", err)
return
}
c.addApiToken(ctxUnavailableApiTokens, apiToken, unavailableTokens, unavailableCas, log)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle unavailable api token 会在每个worker线程里都掉用,所以这里 add 的时候不能简单覆盖,要考虑冲突的情况,应该先 get 出来,再在基础上加上对应的 token,再去 set,这个过程中用 cas 来识别冲突,如果冲突进行重试。可以最多重试例如10次。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里我是有先 get 出来,再 append 进行添加的。

unavailableTokens, unavailableCas, err := getApiTokens(ctxUnavailableApiTokens)
if err != nil {
log.Errorf("Failed to get unavailable apiToken: %v", err)
return
}
c.addApiToken(ctxUnavailableApiTokens, apiToken, unavailableTokens, unavailableCas, log)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我补充一下 cas 重试的逻辑。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加 cas 重试逻辑。

@johnlanni
Copy link
Collaborator

@johnlanni 我修改了代码,使用 SetSharedData 在多个 VM 之间同步 apiToken 的信息,并且也使用 SetSharedData 进行选主了。

不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

这个地方提到的注意点,我需要做那些处理?

大的问题没有,上面提到一些跟机制相关的细节处理,辛苦再调整下

@CH3CHO
Copy link
Collaborator

CH3CHO commented Sep 4, 2024

README.md 应该也要更新一下

return
}

failureCount[apiToken]++
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个计数是连续失败次数还是总失败次数呢?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是该 apiToken 总的失败次数,如果达到失败阈值会被加入 unavailableTokens 后续做健康检测,同时从 apiTokens 中移除后续请求不再使用该 token,并且失败计数重置为 0。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是否统计连续失败更合理呢?比如阈值是3的时候,前天请求10次失败1次,昨天请求20次失败一次,今天又10次失败一次,这样拉出是否不太合理诶?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AI apitoken failover 机制设计
4 participants