Skip to main content
Cloudstic supports multiple data sources through a unified Source interface. You can implement custom sources to back up data from any system that can provide a file tree.

Source Interface

All sources must implement the Source interface defined in pkg/store/interface.go:
// Source abstracts any system that can provide a file tree for backup.
type Source interface {
	// Walk enumerates every file and folder; parents MUST be emitted
	// before their children so the HAMT tree is built correctly.
	Walk(ctx context.Context, callback func(core.FileMeta) error) error
	// GetFileStream opens the content of one file by its source-specific
	// ID; the caller is responsible for closing the returned reader.
	GetFileStream(fileID string) (io.ReadCloser, error)
	// Info returns metadata identifying this source instance.
	Info() core.SourceInfo
	// Size reports total bytes and file count for progress reporting.
	Size(ctx context.Context) (*SourceSize, error)
}

Method Descriptions

Walk

Walk(ctx context.Context, callback func(core.FileMeta) error) error
Enumerates every file and folder in the source. Parent folders MUST be emitted before their children to ensure the HAMT tree structure is built correctly. Parameters:
  • ctx - Context for cancellation
  • callback - Function called for each file/folder, receives FileMeta
Returns:
  • Error if enumeration fails or callback returns an error

GetFileStream

GetFileStream(fileID string) (io.ReadCloser, error)
Returns a readable stream for a file identified by its source-specific fileID. The backup engine calls this for each file that needs to be uploaded. Parameters:
  • fileID - Source-specific unique identifier (from FileMeta.FileID)
Returns:
  • io.ReadCloser containing the file data
  • Error if the file cannot be opened
The caller is responsible for closing the returned reader.

Info

Info() core.SourceInfo
Returns metadata about the source. This is stored in the snapshot and used to:
  • Find the previous snapshot from the same source (for incremental comparison)
  • Group snapshots in retention policies (forget --group-by source,account,path)
Returns:
  • SourceInfo struct with Type, Account, and Path fields

Size

Size(ctx context.Context) (*SourceSize, error)
Returns the total size of the source. This is used for progress reporting during backup. Returns:
  • SourceSize with Bytes and Files counts
  • Error if size cannot be determined (implementation can return approximate values)

Supporting Types

SourceInfo

// SourceInfo uniquely identifies a source instance. Snapshots with
// matching Type, Account, and Path form one backup chain.
type SourceInfo struct {
	Type    string // e.g. "gdrive", "local", "sftp", "onedrive", "gdrive-changes"
	Account string // Google email, hostname, user@host, etc.
	Path    string // drive path, filesystem path, etc.
}
Uniquely identifies a source instance. Snapshots from the same source (matching Type, Account, and Path) are considered part of the same backup chain.

FileMeta

// FileMeta describes one file or folder entry emitted during Walk.
type FileMeta struct {
	Version     int                    // Always 1
	FileID      string                 // Source-specific unique ID (HAMT key); must be stable across backups
	Name        string                 // Display name
	Type        FileType               // "file" or "folder"
	Parents     []string               // Parent references (source-specific IDs during Walk); may be empty for roots
	ContentHash string                 // SHA-256 of file content (if available from source); empty if unknown
	Size        int64                  // File size in bytes
	Mtime       int64                  // Last modified time (Unix timestamp)
	Owner       string                 // Owner identifier
	Extra       map[string]interface{} // Source-specific metadata
}
Important fields:
  • FileID - Must be stable and unique within the source. Used as the HAMT key.
  • Type - Must be "file" or "folder"
  • Parents - List of parent FileIDs. Can be empty for root items.
  • ContentHash - If your source provides checksums (like Google Drive), include them here to avoid re-downloading unchanged files. Leave empty if not available.

SourceSize

// SourceSize holds aggregate totals used for backup progress reporting.
type SourceSize struct {
	Bytes int64 `json:"bytes"` // total content size in bytes
	Files int64 `json:"files"` // number of regular files
}

Example: Simple HTTP Source

Here’s a complete example implementing a source that backs up files from an HTTP server:
package store

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/cloudstic/cli/internal/core"
)

// HTTPSource backs up files from an HTTP API that returns a file listing.
type HTTPSource struct {
	baseURL string       // server root, e.g. "https://example.com"
	client  *http.Client // shared client configured with a request timeout
}

// NewHTTPSource creates an HTTPSource for the given base URL, using a
// client whose requests time out after 30 seconds.
func NewHTTPSource(baseURL string) *HTTPSource {
	client := &http.Client{Timeout: 30 * time.Second}
	return &HTTPSource{baseURL: baseURL, client: client}
}

// Walk fetches the file listing from /api/files and emits each entry.
// Folders are emitted before files so parents always precede their
// children, as the Source contract requires. The request and both emit
// loops honor ctx cancellation.
func (s *HTTPSource) Walk(ctx context.Context, callback func(core.FileMeta) error) error {
	// Bind the request to ctx so cancelling the backup aborts the fetch.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.baseURL+"/api/files", nil)
	if err != nil {
		return fmt.Errorf("build file list request: %w", err)
	}
	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("fetch file list: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned %d", resp.StatusCode)
	}

	var files []struct {
		ID       string `json:"id"`
		Name     string `json:"name"`
		Size     int64  `json:"size"`
		Modified int64  `json:"modified"` // Unix timestamp
		IsDir    bool   `json:"is_dir"`
		Parent   string `json:"parent"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&files); err != nil {
		return fmt.Errorf("parse file list: %w", err)
	}

	// Emit folders first, then files (ensures parents before children)
	for _, f := range files {
		if !f.IsDir {
			continue
		}
		// Stop promptly if the backup was cancelled mid-enumeration.
		if err := ctx.Err(); err != nil {
			return err
		}
		meta := core.FileMeta{
			Version: 1,
			FileID:  f.ID,
			Name:    f.Name,
			Type:    core.FileTypeFolder,
			Mtime:   f.Modified,
		}
		if f.Parent != "" {
			meta.Parents = []string{f.Parent}
		}
		if err := callback(meta); err != nil {
			return err
		}
	}

	// Emit files
	for _, f := range files {
		if f.IsDir {
			continue
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		meta := core.FileMeta{
			Version: 1,
			FileID:  f.ID,
			Name:    f.Name,
			Type:    core.FileTypeFile,
			Size:    f.Size,
			Mtime:   f.Modified,
		}
		if f.Parent != "" {
			meta.Parents = []string{f.Parent}
		}
		if err := callback(meta); err != nil {
			return err
		}
	}

	return nil
}

// GetFileStream downloads the file content from /api/files/{id}.
// The caller is responsible for closing the returned reader.
func (s *HTTPSource) GetFileStream(fileID string) (io.ReadCloser, error) {
	// Escape the ID: source-specific IDs are opaque and may contain
	// '/', '?', or '#', which would otherwise corrupt the request path.
	resp, err := s.client.Get(s.baseURL + "/api/files/" + url.PathEscape(fileID))
	if err != nil {
		return nil, fmt.Errorf("fetch file %s: %w", fileID, err)
	}

	if resp.StatusCode != http.StatusOK {
		// Close here because the body is not handed to the caller.
		resp.Body.Close()
		return nil, fmt.Errorf("server returned %d for file %s", resp.StatusCode, fileID)
	}

	return resp.Body, nil
}

// Info identifies this source instance. The base URL doubles as the
// account, since an HTTP source has no separate user identity.
func (s *HTTPSource) Info() core.SourceInfo {
	info := core.SourceInfo{Type: "http"}
	info.Account = s.baseURL
	info.Path = "/"
	return info
}

// Size reports the total byte and file counts by walking the source
// and summing the size of every regular file (folders are skipped).
func (s *HTTPSource) Size(ctx context.Context) (*SourceSize, error) {
	size := &SourceSize{}

	walkErr := s.Walk(ctx, func(meta core.FileMeta) error {
		if meta.Type != core.FileTypeFile {
			return nil
		}
		size.Files++
		size.Bytes += meta.Size
		return nil
	})
	if walkErr != nil {
		return nil, walkErr
	}

	return size, nil
}

Using the Custom Source

source := store.NewHTTPSource("https://example.com")
result, err := client.Backup(ctx, source)

Incremental Source Interface

For sources that support delta-based backups (like Google Drive Changes API or OneDrive Delta API), implement the IncrementalSource interface:
// IncrementalSource extends Source with delta-based change enumeration
// (e.g. Google Drive Changes API, OneDrive Delta API).
type IncrementalSource interface {
	Source
	// GetStartPageToken returns an opaque token for the current head of
	// the change stream, captured before the first full Walk.
	GetStartPageToken() (string, error)
	// WalkChanges emits only the entries changed since token and
	// returns the new token to persist for the next run.
	WalkChanges(ctx context.Context, token string, callback func(FileChange) error) (newToken string, err error)
}

IncrementalSource Methods

GetStartPageToken

GetStartPageToken() (string, error)
Returns an opaque token representing the current head of the change stream. The engine calls this before the first full Walk to capture the baseline state. Returns:
  • Token string to persist in the snapshot
  • Error if token cannot be retrieved

WalkChanges

WalkChanges(ctx context.Context, token string, callback func(FileChange) error) (newToken string, err error)
Emits only the entries that changed since token. Returns the new token to persist for the next run. Parameters:
  • ctx - Context for cancellation
  • token - Token from the previous snapshot (or from GetStartPageToken)
  • callback - Function called for each change
Returns:
  • newToken - New token to store in the snapshot
  • err - Error if change enumeration fails

FileChange

// FileChange describes one delta entry emitted by WalkChanges.
// For deletes only Meta.FileID is required; other fields may be empty.
type FileChange struct {
	Type ChangeType // "upsert" or "delete"
	Meta core.FileMeta
}

// ChangeType classifies a FileChange.
type ChangeType string

const (
	ChangeUpsert ChangeType = "upsert" // File or folder created/modified
	ChangeDelete ChangeType = "delete" // File or folder removed
)
For ChangeDelete, only Meta.FileID is required. All other fields can be empty.

Example: Incremental HTTP Source

Extending the previous HTTP source to support incremental backups:
// HTTPChangeSource adds incremental support to HTTPSource.
// Embedding *HTTPSource inherits Walk, GetFileStream, Info, and Size.
type HTTPChangeSource struct {
	*HTTPSource
}

// NewHTTPChangeSource wraps a fresh HTTPSource with incremental support.
func NewHTTPChangeSource(baseURL string) *HTTPChangeSource {
	base := NewHTTPSource(baseURL)
	return &HTTPChangeSource{HTTPSource: base}
}

// GetStartPageToken fetches the current change token from the server.
func (s *HTTPChangeSource) GetStartPageToken() (string, error) {
	resp, err := s.client.Get(s.baseURL + "/api/changes/token")
	if err != nil {
		return "", fmt.Errorf("fetch change token: %w", err)
	}
	defer resp.Body.Close()

	// Without this check, decoding an error page would silently yield
	// an empty token and the engine would persist a broken baseline.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("server returned %d for change token", resp.StatusCode)
	}

	var result struct {
		Token string `json:"token"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("parse change token: %w", err)
	}

	return result.Token, nil
}

// WalkChanges fetches changes since the given token and returns the new
// token to persist. Folder upserts are emitted before file upserts so
// parents precede children; deletes carry only the FileID. The request
// and both emit loops honor ctx cancellation.
func (s *HTTPChangeSource) WalkChanges(
	ctx context.Context,
	token string,
	callback func(FileChange) error,
) (string, error) {
	// Escape the token: it is opaque and may contain characters (&, +,
	// =) that would otherwise corrupt the query string. Also bind the
	// request to ctx so cancelling the backup aborts the fetch.
	changesURL := s.baseURL + "/api/changes?since=" + url.QueryEscape(token)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, changesURL, nil)
	if err != nil {
		return "", fmt.Errorf("build changes request: %w", err)
	}
	resp, err := s.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("fetch changes: %w", err)
	}
	defer resp.Body.Close()

	// Fail fast on error responses instead of decoding an error page.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("server returned %d for changes", resp.StatusCode)
	}

	var result struct {
		Changes []struct {
			Type     string `json:"type"` // "upsert" or "delete"
			ID       string `json:"id"`
			Name     string `json:"name"`
			Size     int64  `json:"size"`
			Modified int64  `json:"modified"`
			IsDir    bool   `json:"is_dir"`
			Parent   string `json:"parent"`
		} `json:"changes"`
		NewToken string `json:"new_token"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("parse changes: %w", err)
	}

	// Emit folder changes first (upserts only, deletes can be in any order)
	for _, c := range result.Changes {
		if c.Type != "upsert" || !c.IsDir {
			continue
		}
		if err := ctx.Err(); err != nil {
			return "", err
		}
		change := FileChange{
			Type: ChangeUpsert,
			Meta: core.FileMeta{
				Version: 1,
				FileID:  c.ID,
				Name:    c.Name,
				Type:    core.FileTypeFolder,
				Mtime:   c.Modified,
			},
		}
		if c.Parent != "" {
			change.Meta.Parents = []string{c.Parent}
		}
		if err := callback(change); err != nil {
			return "", err
		}
	}

	// Emit file changes and deletions
	for _, c := range result.Changes {
		if c.Type == "upsert" && c.IsDir {
			continue // Already emitted above
		}
		if err := ctx.Err(); err != nil {
			return "", err
		}

		var change FileChange
		if c.Type == "delete" {
			// Deletes need only the FileID; all other fields stay empty.
			change = FileChange{
				Type: ChangeDelete,
				Meta: core.FileMeta{
					FileID: c.ID,
				},
			}
		} else {
			change = FileChange{
				Type: ChangeUpsert,
				Meta: core.FileMeta{
					Version: 1,
					FileID:  c.ID,
					Name:    c.Name,
					Type:    core.FileTypeFile,
					Size:    c.Size,
					Mtime:   c.Modified,
				},
			}
			if c.Parent != "" {
				change.Meta.Parents = []string{c.Parent}
			}
		}

		if err := callback(change); err != nil {
			return "", err
		}
	}

	return result.NewToken, nil
}

How the Engine Uses Incremental Sources

  1. First backup - Engine calls GetStartPageToken(), then Walk() for full scan, stores token in snapshot
  2. Subsequent backups - Engine calls WalkChanges(token) with the stored token, processes only the delta
  3. Token updated - New token from WalkChanges is stored in the snapshot for the next run
If WalkChanges fails (e.g., token expired), the engine automatically falls back to a full Walk.

Implementation Guidelines

Parent-Child Ordering

The most critical requirement is that parents must be emitted before children. This ensures the HAMT tree structure is built correctly.
// BAD: Children before parents
callback(core.FileMeta{FileID: "file.txt", Parents: []string{"folder1"}})
callback(core.FileMeta{FileID: "folder1", Type: core.FileTypeFolder})

// GOOD: Parents before children
callback(core.FileMeta{FileID: "folder1", Type: core.FileTypeFolder})
callback(core.FileMeta{FileID: "file.txt", Parents: []string{"folder1"}})
For sources with complex hierarchies, use topological sorting to ensure correct ordering.

FileID Stability

FileID must be stable across backups. If a file’s FileID changes, the engine treats it as a deletion + addition rather than a modification.
// GOOD: Use stable IDs from the source API
FileID: googleDriveFileID // Stable across renames

// BAD: Use paths that change when files move
FileID: "/home/user/documents/report.pdf" // Changes if moved

ContentHash Optimization

If your source provides checksums (like Google Drive’s SHA-256 or OneDrive’s QuickXorHash), include them in ContentHash:
meta.ContentHash = file.SHA256Checksum // From source API
This allows the engine to skip downloading unchanged files during incremental backups.

Error Handling

Return descriptive errors that help users diagnose issues:
if resp.StatusCode == 401 {
	return fmt.Errorf("authentication failed: check your API credentials")
}
if resp.StatusCode == 429 {
	return fmt.Errorf("rate limit exceeded: try again later")
}
return fmt.Errorf("fetch file list: HTTP %d: %s", resp.StatusCode, resp.Status)

Context Cancellation

Respect the context.Context passed to Walk, WalkChanges, and Size:
// Walk emits every entry, checking for cancellation before each one.
func (s *MySource) Walk(ctx context.Context, callback func(core.FileMeta) error) error {
	for _, file := range files {
		// ctx.Err() is non-nil exactly when ctx.Done() is closed, so
		// this guard is equivalent to a non-blocking select on Done.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := callback(convertFileMeta(file)); err != nil {
			return err
		}
	}
	return nil
}

Testing Your Source

Create a simple test to verify your source implementation:
// TestHTTPSource exercises Info, Walk, and GetFileStream end to end.
// NOTE(review): this hits a live endpoint; point it at a test server.
func TestHTTPSource(t *testing.T) {
	src := NewHTTPSource("https://example.com")

	// Info must report the registered source type.
	if got := src.Info().Type; got != "http" {
		t.Errorf("expected type 'http', got %q", got)
	}

	// Walk should enumerate at least one entry.
	var entries []core.FileMeta
	walkErr := src.Walk(context.Background(), func(meta core.FileMeta) error {
		entries = append(entries, meta)
		return nil
	})
	if walkErr != nil {
		t.Fatal(walkErr)
	}
	if len(entries) == 0 {
		t.Fatal("expected files, got none")
	}

	// Every regular file should open as a stream.
	for _, entry := range entries {
		if entry.Type != core.FileTypeFile {
			continue
		}
		rc, err := src.GetFileStream(entry.FileID)
		if err != nil {
			t.Errorf("GetFileStream(%s): %v", entry.FileID, err)
			continue
		}
		rc.Close()
	}
}

Registering Your Source

To use your custom source with the CLI, add it to cmd/cloudstic/main.go in the initSource function:
// initSource constructs a store.Source for the CLI-selected source type,
// parsing any source-specific flags from the command line.
func initSource(sourceType string, flags *flag.FlagSet) (store.Source, error) {
	switch sourceType {
	case "local":
		// ... existing code ...
	case "http":
		// Register the -url flag, then parse the remaining CLI args.
		// NOTE(review): Parse's error return is ignored here — presumably
		// the FlagSet uses flag.ExitOnError; confirm against main.go.
		var baseURL string
		flags.StringVar(&baseURL, "url", "", "HTTP server base URL")
		flags.Parse(os.Args[3:])
		if baseURL == "" {
			return nil, fmt.Errorf("-url is required")
		}
		return store.NewHTTPSource(baseURL), nil
	default:
		return nil, fmt.Errorf("unknown source type: %s", sourceType)
	}
}
Then use it:
cloudstic backup -source http -url https://example.com