This commit is contained in:
2024-02-25 08:30:34 +08:00
commit 4947f39e74
273 changed files with 45396 additions and 0 deletions

210
pkg/cluster/controller.go Normal file
View File

@ -0,0 +1,210 @@
package cluster
import (
"bytes"
"encoding/gob"
"fmt"
"net/url"
"sync"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/jinzhu/gorm"
)
var DefaultController Controller
// Controller controls communications between master and slave
type Controller interface {
// Handle heartbeat sent from master
HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error)
// Get Aria2 Instance by master node ID
GetAria2Instance(string) (common.Aria2, error)
// Send event change message to master node
SendNotification(string, string, mq.Message) error
// Submit async task into task pool
SubmitTask(string, interface{}, string, func(interface{})) error
// Get master node info
GetMasterInfo(string) (*MasterInfo, error)
// Get master Oauth based policy credential
GetPolicyOauthToken(string, uint) (string, error)
}
type slaveController struct {
masters map[string]MasterInfo
lock sync.RWMutex
}
// info of master node
type MasterInfo struct {
ID string
TTL int
URL *url.URL
// used to invoke aria2 rpc calls
Instance Node
Client request.Client
jobTracker map[string]bool
}
func InitController() {
DefaultController = &slaveController{
masters: make(map[string]MasterInfo),
}
gob.Register(rpc.StatusInfo{})
}
func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializer.NodePingResp, error) {
c.lock.Lock()
defer c.lock.Unlock()
req.Node.AfterFind()
// close old node if exist
origin, ok := c.masters[req.SiteID]
if (ok && req.IsUpdate) || !ok {
if ok {
origin.Instance.Kill()
}
masterUrl, err := url.Parse(req.SiteURL)
if err != nil {
return serializer.NodePingResp{}, err
}
c.masters[req.SiteID] = MasterInfo{
ID: req.SiteID,
URL: masterUrl,
TTL: req.CredentialTTL,
Client: request.NewClient(
request.WithEndpoint(masterUrl.String()),
request.WithSlaveMeta(fmt.Sprintf("%d", req.Node.ID)),
request.WithCredential(auth.HMACAuth{
SecretKey: []byte(req.Node.MasterKey),
}, int64(req.CredentialTTL)),
),
jobTracker: make(map[string]bool),
Instance: NewNodeFromDBModel(&model.Node{
Model: gorm.Model{ID: req.Node.ID},
MasterKey: req.Node.MasterKey,
Type: model.MasterNodeType,
Aria2Enabled: req.Node.Aria2Enabled,
Aria2OptionsSerialized: req.Node.Aria2OptionsSerialized,
}),
}
}
return serializer.NodePingResp{}, nil
}
func (c *slaveController) GetAria2Instance(id string) (common.Aria2, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
return node.Instance.GetAria2Instance(), nil
}
return nil, ErrMasterNotFound
}
func (c *slaveController) SendNotification(id, subject string, msg mq.Message) error {
c.lock.RLock()
if node, ok := c.masters[id]; ok {
c.lock.RUnlock()
body := bytes.Buffer{}
enc := gob.NewEncoder(&body)
if err := enc.Encode(&msg); err != nil {
return err
}
res, err := node.Client.Request(
"PUT",
fmt.Sprintf("/api/v3/slave/notification/%s", subject),
&body,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
c.lock.RUnlock()
return ErrMasterNotFound
}
// SubmitTask 提交异步任务
func (c *slaveController) SubmitTask(id string, job interface{}, hash string, submitter func(interface{})) error {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
if _, ok := node.jobTracker[hash]; ok {
// 任务已存在,直接返回
return nil
}
node.jobTracker[hash] = true
submitter(job)
return nil
}
return ErrMasterNotFound
}
// GetMasterInfo 获取主机节点信息
func (c *slaveController) GetMasterInfo(id string) (*MasterInfo, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
return &node, nil
}
return nil, ErrMasterNotFound
}
// GetPolicyOauthToken 获取主机存储策略 Oauth 凭证
func (c *slaveController) GetPolicyOauthToken(id string, policyID uint) (string, error) {
c.lock.RLock()
if node, ok := c.masters[id]; ok {
c.lock.RUnlock()
res, err := node.Client.Request(
"GET",
fmt.Sprintf("/api/v3/slave/credential/%d", policyID),
nil,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return "", err
}
if res.Code != 0 {
return "", serializer.NewErrorFromResponse(res)
}
return res.Data.(string), nil
}
c.lock.RUnlock()
return "", ErrMasterNotFound
}

12
pkg/cluster/errors.go Normal file
View File

@ -0,0 +1,12 @@
package cluster
import (
"errors"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
var (
ErrFeatureNotExist = errors.New("No nodes in nodepool match the feature specificed")
ErrIlegalPath = errors.New("path out of boundary of setting temp folder")
ErrMasterNotFound = serializer.NewError(serializer.CodeMasterNotFound, "Unknown master node id", nil)
)

272
pkg/cluster/master.go Normal file
View File

@ -0,0 +1,272 @@
package cluster
import (
"context"
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/gofrs/uuid"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
const (
deleteTempFileDuration = 60 * time.Second
statusRetryDuration = 10 * time.Second
)
type MasterNode struct {
Model *model.Node
aria2RPC rpcService
lock sync.RWMutex
}
// RPCService 通过RPC服务的Aria2任务管理器
type rpcService struct {
Caller rpc.Client
Initialized bool
retryDuration time.Duration
deletePaddingDuration time.Duration
parent *MasterNode
options *clientOptions
}
type clientOptions struct {
Options map[string]interface{} // 创建下载时额外添加的设置
}
// Init 初始化节点
func (node *MasterNode) Init(nodeModel *model.Node) {
node.lock.Lock()
node.Model = nodeModel
node.aria2RPC.parent = node
node.aria2RPC.retryDuration = statusRetryDuration
node.aria2RPC.deletePaddingDuration = deleteTempFileDuration
node.lock.Unlock()
node.lock.RLock()
if node.Model.Aria2Enabled {
node.lock.RUnlock()
node.aria2RPC.Init()
return
}
node.lock.RUnlock()
}
func (node *MasterNode) ID() uint {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model.ID
}
func (node *MasterNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
return &serializer.NodePingResp{}, nil
}
// IsFeatureEnabled 查询节点的某项功能是否启用
func (node *MasterNode) IsFeatureEnabled(feature string) bool {
node.lock.RLock()
defer node.lock.RUnlock()
switch feature {
case "aria2":
return node.Model.Aria2Enabled
default:
return false
}
}
func (node *MasterNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *MasterNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
// SubscribeStatusChange 订阅节点状态更改
func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) {
}
// IsActive 返回节点是否在线
func (node *MasterNode) IsActive() bool {
return true
}
// Kill 结束aria2请求
func (node *MasterNode) Kill() {
if node.aria2RPC.Caller != nil {
node.aria2RPC.Caller.Close()
}
}
// GetAria2Instance 获取主机Aria2实例
func (node *MasterNode) GetAria2Instance() common.Aria2 {
node.lock.RLock()
if !node.Model.Aria2Enabled {
node.lock.RUnlock()
return &common.DummyAria2{}
}
if !node.aria2RPC.Initialized {
node.lock.RUnlock()
node.aria2RPC.Init()
return &common.DummyAria2{}
}
defer node.lock.RUnlock()
return &node.aria2RPC
}
func (node *MasterNode) IsMater() bool {
return true
}
func (node *MasterNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
func (r *rpcService) Init() error {
r.parent.lock.Lock()
defer r.parent.lock.Unlock()
r.Initialized = false
// 客户端已存在,则关闭先前连接
if r.Caller != nil {
r.Caller.Close()
}
// 解析RPC服务地址
server, err := url.Parse(r.parent.Model.Aria2OptionsSerialized.Server)
if err != nil {
util.Log().Warning("Failed to parse Aria2 RPC server URL: %s", err)
return err
}
server.Path = "/jsonrpc"
// 加载自定义下载配置
var globalOptions map[string]interface{}
if r.parent.Model.Aria2OptionsSerialized.Options != "" {
err = json.Unmarshal([]byte(r.parent.Model.Aria2OptionsSerialized.Options), &globalOptions)
if err != nil {
util.Log().Warning("Failed to parse aria2 options: %s", err)
return err
}
}
r.options = &clientOptions{
Options: globalOptions,
}
timeout := r.parent.Model.Aria2OptionsSerialized.Timeout
caller, err := rpc.New(context.Background(), server.String(), r.parent.Model.Aria2OptionsSerialized.Token, time.Duration(timeout)*time.Second, mq.GlobalMQ)
r.Caller = caller
r.Initialized = err == nil
return err
}
func (r *rpcService) CreateTask(task *model.Download, groupOptions map[string]interface{}) (string, error) {
r.parent.lock.RLock()
// 生成存储路径
guid, _ := uuid.NewV4()
path := filepath.Join(
r.parent.Model.Aria2OptionsSerialized.TempPath,
"aria2",
guid.String(),
)
r.parent.lock.RUnlock()
// 创建下载任务
options := map[string]interface{}{
"dir": path,
}
for k, v := range r.options.Options {
options[k] = v
}
for k, v := range groupOptions {
options[k] = v
}
gid, err := r.Caller.AddURI(task.Source, options)
if err != nil || gid == "" {
return "", err
}
return gid, nil
}
func (r *rpcService) Status(task *model.Download) (rpc.StatusInfo, error) {
res, err := r.Caller.TellStatus(task.GID)
if err != nil {
// 失败后重试
util.Log().Debug("Failed to get download task status, please retry later: %s", err)
time.Sleep(r.retryDuration)
res, err = r.Caller.TellStatus(task.GID)
}
return res, err
}
func (r *rpcService) Cancel(task *model.Download) error {
// 取消下载任务
_, err := r.Caller.Remove(task.GID)
if err != nil {
util.Log().Warning("Failed to cancel task %q: %s", task.GID, err)
}
return err
}
func (r *rpcService) Select(task *model.Download, files []int) error {
var selected = make([]string, len(files))
for i := 0; i < len(files); i++ {
selected[i] = strconv.Itoa(files[i])
}
_, err := r.Caller.ChangeOption(task.GID, map[string]interface{}{"select-file": strings.Join(selected, ",")})
return err
}
func (r *rpcService) GetConfig() model.Aria2Option {
r.parent.lock.RLock()
defer r.parent.lock.RUnlock()
return r.parent.Model.Aria2OptionsSerialized
}
func (s *rpcService) DeleteTempFile(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
// 避免被aria2占用异步执行删除
go func(d time.Duration, src string) {
time.Sleep(d)
err := os.RemoveAll(src)
if err != nil {
util.Log().Warning("Failed to delete temp download folder: %q: %s", src, err)
}
}(s.deletePaddingDuration, task.Parent)
return nil
}

60
pkg/cluster/node.go Normal file
View File

@ -0,0 +1,60 @@
package cluster
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
type Node interface {
// Init a node from database model
Init(node *model.Node)
// Check if given feature is enabled
IsFeatureEnabled(feature string) bool
// Subscribe node status change to a callback function
SubscribeStatusChange(callback func(isActive bool, id uint))
// Ping the node
Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error)
// Returns if the node is active
IsActive() bool
// Get instances for aria2 calls
GetAria2Instance() common.Aria2
// Returns unique id of this node
ID() uint
// Kill node and recycle resources
Kill()
// Returns if current node is master node
IsMater() bool
// Get auth instance used to check RPC call from slave to master
MasterAuthInstance() auth.Auth
// Get auth instance used to check RPC call from master to slave
SlaveAuthInstance() auth.Auth
// Get node DB model
DBModel() *model.Node
}
// Create new node from DB model
func NewNodeFromDBModel(node *model.Node) Node {
switch node.Type {
case model.SlaveNodeType:
slave := &SlaveNode{}
slave.Init(node)
return slave
default:
master := &MasterNode{}
master.Init(node)
return master
}
}

213
pkg/cluster/pool.go Normal file
View File

@ -0,0 +1,213 @@
package cluster
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/samber/lo"
"sync"
)
var Default *NodePool
// 需要分类的节点组
var featureGroup = []string{"aria2"}
// Pool 节点池
type Pool interface {
// Returns active node selected by given feature and load balancer
BalanceNodeByFeature(feature string, lb balancer.Balancer, available []uint, prefer uint) (error, Node)
// Returns node by ID
GetNodeByID(id uint) Node
// Add given node into pool. If node existed, refresh node.
Add(node *model.Node)
// Delete and kill node from pool by given node id
Delete(id uint)
}
// NodePool 通用节点池
type NodePool struct {
active map[uint]Node
inactive map[uint]Node
featureMap map[string][]Node
lock sync.RWMutex
}
// Init 初始化从机节点池
func Init() {
Default = &NodePool{}
Default.Init()
if err := Default.initFromDB(); err != nil {
util.Log().Warning("Failed to initialize node pool: %s", err)
}
}
func (pool *NodePool) Init() {
pool.lock.Lock()
defer pool.lock.Unlock()
pool.featureMap = make(map[string][]Node)
pool.active = make(map[uint]Node)
pool.inactive = make(map[uint]Node)
}
func (pool *NodePool) buildIndexMap() {
pool.lock.Lock()
for _, feature := range featureGroup {
pool.featureMap[feature] = make([]Node, 0)
}
for _, v := range pool.active {
for _, feature := range featureGroup {
if v.IsFeatureEnabled(feature) {
pool.featureMap[feature] = append(pool.featureMap[feature], v)
}
}
}
pool.lock.Unlock()
}
func (pool *NodePool) GetNodeByID(id uint) Node {
pool.lock.RLock()
defer pool.lock.RUnlock()
if node, ok := pool.active[id]; ok {
return node
}
return pool.inactive[id]
}
func (pool *NodePool) nodeStatusChange(isActive bool, id uint) {
util.Log().Debug("Slave node [ID=%d] status changed to [Active=%t].", id, isActive)
var node Node
pool.lock.Lock()
if n, ok := pool.inactive[id]; ok {
node = n
delete(pool.inactive, id)
} else {
node = pool.active[id]
delete(pool.active, id)
}
if isActive {
pool.active[id] = node
} else {
pool.inactive[id] = node
}
pool.lock.Unlock()
pool.buildIndexMap()
}
func (pool *NodePool) initFromDB() error {
nodes, err := model.GetNodesByStatus(model.NodeActive)
if err != nil {
return err
}
pool.lock.Lock()
for i := 0; i < len(nodes); i++ {
pool.add(&nodes[i])
}
pool.lock.Unlock()
pool.buildIndexMap()
return nil
}
func (pool *NodePool) add(node *model.Node) {
newNode := NewNodeFromDBModel(node)
if newNode.IsActive() {
pool.active[node.ID] = newNode
} else {
pool.inactive[node.ID] = newNode
}
// 订阅节点状态变更
newNode.SubscribeStatusChange(func(isActive bool, id uint) {
pool.nodeStatusChange(isActive, id)
})
}
func (pool *NodePool) Add(node *model.Node) {
pool.lock.Lock()
defer pool.buildIndexMap()
defer pool.lock.Unlock()
var (
old Node
ok bool
)
if old, ok = pool.active[node.ID]; !ok {
old, ok = pool.inactive[node.ID]
}
if old != nil {
go old.Init(node)
return
}
pool.add(node)
}
func (pool *NodePool) Delete(id uint) {
pool.lock.Lock()
defer pool.buildIndexMap()
defer pool.lock.Unlock()
if node, ok := pool.active[id]; ok {
node.Kill()
delete(pool.active, id)
return
}
if node, ok := pool.inactive[id]; ok {
node.Kill()
delete(pool.inactive, id)
return
}
}
// BalanceNodeByFeature 根据 feature 和 LoadBalancer 取出节点
func (pool *NodePool) BalanceNodeByFeature(feature string, lb balancer.Balancer,
available []uint, prefer uint) (error, Node) {
pool.lock.RLock()
defer pool.lock.RUnlock()
if nodes, ok := pool.featureMap[feature]; ok {
// Find nodes that are allowed to be used in user group
availableNodes := nodes
if len(available) > 0 {
idHash := make(map[uint]struct{}, len(available))
for _, id := range available {
idHash[id] = struct{}{}
}
availableNodes = lo.Filter[Node](nodes, func(node Node, index int) bool {
_, exist := idHash[node.ID()]
return exist
})
}
// Return preferred node if exists
if preferredNode, found := lo.Find[Node](availableNodes, func(node Node) bool {
return node.ID() == prefer
}); found {
return nil, preferredNode
}
err, res := lb.NextPeer(availableNodes)
if err == nil {
return nil, res.(Node)
}
return err, nil
}
return ErrFeatureNotExist, nil
}

451
pkg/cluster/slave.go Normal file
View File

@ -0,0 +1,451 @@
package cluster
import (
"bytes"
"encoding/json"
"errors"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"io"
"net/url"
"strings"
"sync"
"time"
)
type SlaveNode struct {
Model *model.Node
Active bool
caller slaveCaller
callback func(bool, uint)
close chan bool
lock sync.RWMutex
}
type slaveCaller struct {
parent *SlaveNode
Client request.Client
}
// Init 初始化节点
func (node *SlaveNode) Init(nodeModel *model.Node) {
node.lock.Lock()
node.Model = nodeModel
// Init http request client
var endpoint *url.URL
if serverURL, err := url.Parse(node.Model.Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave/")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
node.caller.Client = request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}, int64(signTTL)),
request.WithEndpoint(endpoint.String()),
)
node.caller.parent = node
if node.close != nil {
node.lock.Unlock()
node.close <- true
go node.StartPingLoop()
} else {
node.Active = true
node.lock.Unlock()
go node.StartPingLoop()
}
}
// IsFeatureEnabled 查询节点的某项功能是否启用
func (node *SlaveNode) IsFeatureEnabled(feature string) bool {
node.lock.RLock()
defer node.lock.RUnlock()
switch feature {
case "aria2":
return node.Model.Aria2Enabled
default:
return false
}
}
// SubscribeStatusChange 订阅节点状态更改
func (node *SlaveNode) SubscribeStatusChange(callback func(bool, uint)) {
node.lock.Lock()
node.callback = callback
node.lock.Unlock()
}
// Ping 从机节点,返回从机负载
func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
node.lock.RLock()
defer node.lock.RUnlock()
reqBodyEncoded, err := json.Marshal(req)
if err != nil {
return nil, err
}
bodyReader := strings.NewReader(string(reqBodyEncoded))
resp, err := node.caller.Client.Request(
"POST",
"heartbeat",
bodyReader,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
// 处理列取结果
if resp.Code != 0 {
return nil, serializer.NewErrorFromResponse(resp)
}
var res serializer.NodePingResp
if resStr, ok := resp.Data.(string); ok {
err = json.Unmarshal([]byte(resStr), &res)
if err != nil {
return nil, err
}
}
return &res, nil
}
// IsActive 返回节点是否在线
func (node *SlaveNode) IsActive() bool {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Active
}
// Kill 结束节点内相关循环
func (node *SlaveNode) Kill() {
node.lock.RLock()
defer node.lock.RUnlock()
if node.close != nil {
close(node.close)
}
}
// GetAria2Instance 获取从机Aria2实例
func (node *SlaveNode) GetAria2Instance() common.Aria2 {
node.lock.RLock()
defer node.lock.RUnlock()
if !node.Model.Aria2Enabled {
return &common.DummyAria2{}
}
return &node.caller
}
func (node *SlaveNode) ID() uint {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model.ID
}
func (node *SlaveNode) StartPingLoop() {
node.lock.Lock()
node.close = make(chan bool)
node.lock.Unlock()
tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second
recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
pingTicker := time.Duration(0)
util.Log().Debug("Slave node %q heartbeat loop started.", node.Model.Name)
retry := 0
recoverMode := false
isFirstLoop := true
loop:
for {
select {
case <-time.After(pingTicker):
if pingTicker == 0 {
pingTicker = tickDuration
}
util.Log().Debug("Slave node %q send ping.", node.Model.Name)
res, err := node.Ping(node.getHeartbeatContent(isFirstLoop))
isFirstLoop = false
if err != nil {
util.Log().Debug("Error while ping slave node %q: %s", node.Model.Name, err)
retry++
if retry >= model.GetIntSetting("slave_node_retry", 3) {
util.Log().Debug("Retry threshold for pinging slave node %q exceeded, mark it as offline.", node.Model.Name)
node.changeStatus(false)
if !recoverMode {
// 启动恢复监控循环
util.Log().Debug("Slave node %q entered recovery mode.", node.Model.Name)
pingTicker = recoverDuration
recoverMode = true
}
}
} else {
if recoverMode {
util.Log().Debug("Slave node %q recovered.", node.Model.Name)
pingTicker = tickDuration
recoverMode = false
isFirstLoop = true
}
util.Log().Debug("Status of slave node %q: %s", node.Model.Name, res)
node.changeStatus(true)
retry = 0
}
case <-node.close:
util.Log().Debug("Slave node %q received shutdown signal.", node.Model.Name)
break loop
}
}
}
func (node *SlaveNode) IsMater() bool {
return false
}
func (node *SlaveNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *SlaveNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
func (node *SlaveNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
// getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave
func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq {
return &serializer.NodePingReq{
SiteURL: model.GetSiteURL().String(),
IsUpdate: isUpdate,
SiteID: model.GetSettingByName("siteID"),
Node: node.Model,
CredentialTTL: model.GetIntSetting("slave_api_timeout", 60),
}
}
func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock()
id := node.Model.ID
if isActive != node.Active {
node.lock.RUnlock()
node.lock.Lock()
node.Active = isActive
node.lock.Unlock()
node.callback(isActive, id)
} else {
node.lock.RUnlock()
}
}
func (s *slaveCaller) Init() error {
return nil
}
// SendAria2Call send remote aria2 call to slave node
func (s *slaveCaller) SendAria2Call(body *serializer.SlaveAria2Call, scope string) (*serializer.Response, error) {
reqReader, err := getAria2RequestBody(body)
if err != nil {
return nil, err
}
return s.Client.Request(
"POST",
"aria2/"+scope,
reqReader,
).CheckHTTPResponse(200).DecodeResponse()
}
func (s *slaveCaller) CreateTask(task *model.Download, options map[string]interface{}) (string, error) {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
GroupOptions: options,
}
res, err := s.SendAria2Call(req, "task")
if err != nil {
return "", err
}
if res.Code != 0 {
return "", serializer.NewErrorFromResponse(res)
}
return res.Data.(string), err
}
func (s *slaveCaller) Status(task *model.Download) (rpc.StatusInfo, error) {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "status")
if err != nil {
return rpc.StatusInfo{}, err
}
if res.Code != 0 {
return rpc.StatusInfo{}, serializer.NewErrorFromResponse(res)
}
var status rpc.StatusInfo
res.GobDecode(&status)
return status, err
}
func (s *slaveCaller) Cancel(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "cancel")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func (s *slaveCaller) Select(task *model.Download, files []int) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
Files: files,
}
res, err := s.SendAria2Call(req, "select")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func (s *slaveCaller) GetConfig() model.Aria2Option {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
return s.parent.Model.Aria2OptionsSerialized
}
func (s *slaveCaller) DeleteTempFile(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "delete")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func getAria2RequestBody(body *serializer.SlaveAria2Call) (io.Reader, error) {
reqBodyEncoded, err := json.Marshal(body)
if err != nil {
return nil, err
}
return strings.NewReader(string(reqBodyEncoded)), nil
}
// RemoteCallback 发送远程存储策略上传回调请求
func RemoteCallback(url string, body serializer.UploadCallback) error {
callbackBody, err := json.Marshal(struct {
Data serializer.UploadCallback `json:"data"`
}{
Data: body,
})
if err != nil {
return serializer.NewError(serializer.CodeCallbackError, "Failed to encode callback content", err)
}
resp := request.GeneralClient.Request(
"POST",
url,
bytes.NewReader(callbackBody),
request.WithTimeout(time.Duration(conf.SlaveConfig.CallbackTimeout)*time.Second),
request.WithCredential(auth.General, int64(conf.SlaveConfig.SignatureTTL)),
)
if resp.Err != nil {
return serializer.NewError(serializer.CodeCallbackError, "Slave cannot send callback request", resp.Err)
}
// 解析回调服务端响应
response, err := resp.DecodeResponse()
if err != nil {
msg := fmt.Sprintf("Slave cannot parse callback response from master (StatusCode=%d).", resp.Response.StatusCode)
return serializer.NewError(serializer.CodeCallbackError, msg, err)
}
if response.Code != 0 {
return serializer.NewError(response.Code, response.Msg, errors.New(response.Error))
}
return nil
}