Cloudstic supports multiple data sources through a unified Source interface. You can implement custom sources to back up data from any system that can provide a file tree.
Source Interface
All sources must implement the Source interface defined in pkg/store/interface.go:
type Source interface {
    Walk(ctx context.Context, callback func(core.FileMeta) error) error
    GetFileStream(fileID string) (io.ReadCloser, error)
    Info() core.SourceInfo
    Size(ctx context.Context) (*SourceSize, error)
}
Method Descriptions
Walk
Walk(ctx context.Context, callback func(core.FileMeta) error) error
Enumerates every file and folder in the source. Parent folders MUST be emitted before their children to ensure the HAMT tree structure is built correctly.
Parameters:
- ctx - Context for cancellation
- callback - Function called for each file/folder; receives a FileMeta
Returns:
- Error if enumeration fails or the callback returns an error
GetFileStream
GetFileStream(fileID string) (io.ReadCloser, error)
Returns a readable stream for a file identified by its source-specific fileID. The backup engine calls this for each file that needs to be uploaded.
Parameters:
- fileID - Source-specific unique identifier (from FileMeta.FileID)
Returns:
- io.ReadCloser containing the file data
- Error if the file cannot be opened
The caller is responsible for closing the returned reader.
Info
Info() core.SourceInfo
Returns metadata about the source. This is stored in the snapshot and used to:
- Find the previous snapshot from the same source (for incremental comparison)
- Group snapshots in retention policies (forget --group-by source,account,path)
Returns:
- SourceInfo struct with Type, Account, and Path fields
Size
Size(ctx context.Context) (*SourceSize, error)
Returns the total size of the source. This is used for progress reporting during backup.
Returns:
- SourceSize with Bytes and Files counts
- Error if the size cannot be determined (implementations may return approximate values)
Supporting Types
SourceInfo
type SourceInfo struct {
    Type    string // e.g. "gdrive", "local", "sftp", "onedrive", "gdrive-changes"
    Account string // Google email, hostname, user@host, etc.
    Path    string // drive path, filesystem path, etc.
}
Uniquely identifies a source instance. Snapshots from the same source (matching Type, Account, and Path) are considered part of the same backup chain.
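The matching rule can be shown with a small predicate. sameChain is a hypothetical helper written for illustration (the engine's real comparison code is not shown here); it encodes the rule above: two snapshots belong to the same chain only when all three fields are equal.

```go
package main

import "fmt"

// SourceInfo mirrors the struct shown above.
type SourceInfo struct {
	Type    string
	Account string
	Path    string
}

// sameChain reports whether two snapshots come from the same source
// instance, i.e. whether they are part of the same backup chain.
func sameChain(a, b SourceInfo) bool {
	return a.Type == b.Type && a.Account == b.Account && a.Path == b.Path
}

func main() {
	a := SourceInfo{Type: "gdrive", Account: "me@example.com", Path: "/"}
	b := SourceInfo{Type: "gdrive", Account: "me@example.com", Path: "/photos"}
	fmt.Println(sameChain(a, a)) // identical fields: same chain
	fmt.Println(sameChain(a, b)) // Path differs: a separate chain
}
```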
FileMeta
type FileMeta struct {
    Version     int                    // Always 1
    FileID      string                 // Source-specific unique ID (HAMT key)
    Name        string                 // Display name
    Type        FileType               // "file" or "folder"
    Parents     []string               // Parent references (source-specific IDs during Walk)
    ContentHash string                 // SHA-256 of file content (if available from source)
    Size        int64                  // File size in bytes
    Mtime       int64                  // Last modified time (Unix timestamp)
    Owner       string                 // Owner identifier
    Extra       map[string]interface{} // Source-specific metadata
}
Important fields:
- FileID - Must be stable and unique within the source; used as the HAMT key.
- Type - Must be "file" or "folder".
- Parents - List of parent FileIDs; can be empty for root items.
- ContentHash - If your source provides checksums (as Google Drive does), include them here so unchanged files are not re-downloaded. Leave empty if not available.
SourceSize
type SourceSize struct {
    Bytes int64 `json:"bytes"`
    Files int64 `json:"files"`
}
Example: Simple HTTP Source
Here’s a complete example implementing a source that backs up files from an HTTP server:
package store

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"

    "github.com/cloudstic/cli/internal/core"
)

// HTTPSource backs up files from an HTTP API that returns a file listing.
type HTTPSource struct {
    baseURL string
    client  *http.Client
}

func NewHTTPSource(baseURL string) *HTTPSource {
    return &HTTPSource{
        baseURL: baseURL,
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
    }
}

// Walk fetches the file listing from /api/files and emits each entry.
func (s *HTTPSource) Walk(ctx context.Context, callback func(core.FileMeta) error) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.baseURL+"/api/files", nil)
    if err != nil {
        return fmt.Errorf("build request: %w", err)
    }
    resp, err := s.client.Do(req)
    if err != nil {
        return fmt.Errorf("fetch file list: %w", err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("server returned %d", resp.StatusCode)
    }

    var files []struct {
        ID       string `json:"id"`
        Name     string `json:"name"`
        Size     int64  `json:"size"`
        Modified int64  `json:"modified"` // Unix timestamp
        IsDir    bool   `json:"is_dir"`
        Parent   string `json:"parent"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&files); err != nil {
        return fmt.Errorf("parse file list: %w", err)
    }

    // Emit folders first, then files. This guarantees parents-before-children
    // only when folders are not nested; see "Parent-Child Ordering" below for
    // deeper hierarchies.
    for _, f := range files {
        if !f.IsDir {
            continue
        }
        meta := core.FileMeta{
            Version: 1,
            FileID:  f.ID,
            Name:    f.Name,
            Type:    core.FileTypeFolder,
            Mtime:   f.Modified,
        }
        if f.Parent != "" {
            meta.Parents = []string{f.Parent}
        }
        if err := callback(meta); err != nil {
            return err
        }
    }

    // Emit files
    for _, f := range files {
        if f.IsDir {
            continue
        }
        meta := core.FileMeta{
            Version: 1,
            FileID:  f.ID,
            Name:    f.Name,
            Type:    core.FileTypeFile,
            Size:    f.Size,
            Mtime:   f.Modified,
        }
        if f.Parent != "" {
            meta.Parents = []string{f.Parent}
        }
        if err := callback(meta); err != nil {
            return err
        }
    }
    return nil
}

// GetFileStream downloads the file content from /api/files/{id}.
func (s *HTTPSource) GetFileStream(fileID string) (io.ReadCloser, error) {
    resp, err := s.client.Get(s.baseURL + "/api/files/" + fileID)
    if err != nil {
        return nil, fmt.Errorf("fetch file %s: %w", fileID, err)
    }
    if resp.StatusCode != http.StatusOK {
        resp.Body.Close()
        return nil, fmt.Errorf("server returned %d for file %s", resp.StatusCode, fileID)
    }
    return resp.Body, nil
}

// Info returns source metadata.
func (s *HTTPSource) Info() core.SourceInfo {
    return core.SourceInfo{
        Type:    "http",
        Account: s.baseURL,
        Path:    "/",
    }
}

// Size returns the total size by summing all files.
func (s *HTTPSource) Size(ctx context.Context) (*SourceSize, error) {
    var totalBytes, totalFiles int64
    err := s.Walk(ctx, func(meta core.FileMeta) error {
        if meta.Type == core.FileTypeFile {
            totalFiles++
            totalBytes += meta.Size
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    return &SourceSize{
        Bytes: totalBytes,
        Files: totalFiles,
    }, nil
}
Using the Custom Source
source := store.NewHTTPSource("https://example.com")
result, err := client.Backup(ctx, source)
Incremental Source Interface
For sources that support delta-based backups (like Google Drive Changes API or OneDrive Delta API), implement the IncrementalSource interface:
type IncrementalSource interface {
    Source
    GetStartPageToken() (string, error)
    WalkChanges(ctx context.Context, token string, callback func(FileChange) error) (newToken string, err error)
}
IncrementalSource Methods
GetStartPageToken
GetStartPageToken() (string, error)
Returns an opaque token representing the current head of the change stream. The engine calls this before the first full Walk to capture the baseline state.
Returns:
- Token string to persist in the snapshot
- Error if token cannot be retrieved
WalkChanges
WalkChanges(ctx context.Context, token string, callback func(FileChange) error) (newToken string, err error)
Emits only the entries that changed since token. Returns the new token to persist for the next run.
Parameters:
- ctx - Context for cancellation
- token - Token from the previous snapshot (or from GetStartPageToken)
- callback - Function called for each change
Returns:
- newToken - New token to store in the snapshot
- err - Error if change enumeration fails
FileChange
type FileChange struct {
    Type ChangeType // "upsert" or "delete"
    Meta core.FileMeta
}

type ChangeType string

const (
    ChangeUpsert ChangeType = "upsert" // File or folder created/modified
    ChangeDelete ChangeType = "delete" // File or folder removed
)
For ChangeDelete, only Meta.FileID is required. All other fields can be empty.
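A minimal deletion record therefore looks like the sketch below. The local type definitions mirror the ones above so the example is self-contained, and deleteChange is an illustrative helper, not part of the Cloudstic API:

```go
package main

import "fmt"

// Local mirrors of the types shown above, for illustration only.
type ChangeType string

const (
	ChangeUpsert ChangeType = "upsert"
	ChangeDelete ChangeType = "delete"
)

type FileMeta struct {
	FileID string
	Name   string
}

type FileChange struct {
	Type ChangeType
	Meta FileMeta
}

// deleteChange builds the minimal valid deletion record: only FileID
// is set, matching the contract described above.
func deleteChange(fileID string) FileChange {
	return FileChange{Type: ChangeDelete, Meta: FileMeta{FileID: fileID}}
}

func main() {
	c := deleteChange("abc123")
	fmt.Println(c.Type, c.Meta.FileID)
}
```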
Example: Incremental HTTP Source
Extending the previous HTTP source to support incremental backups:
// HTTPChangeSource adds incremental support to HTTPSource.
type HTTPChangeSource struct {
    *HTTPSource
}

func NewHTTPChangeSource(baseURL string) *HTTPChangeSource {
    return &HTTPChangeSource{
        HTTPSource: NewHTTPSource(baseURL),
    }
}

// GetStartPageToken fetches the current change token from the server.
func (s *HTTPChangeSource) GetStartPageToken() (string, error) {
    resp, err := s.client.Get(s.baseURL + "/api/changes/token")
    if err != nil {
        return "", fmt.Errorf("fetch change token: %w", err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("server returned %d", resp.StatusCode)
    }
    var result struct {
        Token string `json:"token"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return "", fmt.Errorf("parse change token: %w", err)
    }
    return result.Token, nil
}

// WalkChanges fetches changes since the given token.
func (s *HTTPChangeSource) WalkChanges(
    ctx context.Context,
    token string,
    callback func(FileChange) error,
) (string, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.baseURL+"/api/changes?since="+token, nil)
    if err != nil {
        return "", fmt.Errorf("build request: %w", err)
    }
    resp, err := s.client.Do(req)
    if err != nil {
        return "", fmt.Errorf("fetch changes: %w", err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("server returned %d", resp.StatusCode)
    }
    var result struct {
        Changes []struct {
            Type     string `json:"type"` // "upsert" or "delete"
            ID       string `json:"id"`
            Name     string `json:"name"`
            Size     int64  `json:"size"`
            Modified int64  `json:"modified"`
            IsDir    bool   `json:"is_dir"`
            Parent   string `json:"parent"`
        } `json:"changes"`
        NewToken string `json:"new_token"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return "", fmt.Errorf("parse changes: %w", err)
    }

    // Emit folder upserts first (deletes can be emitted in any order).
    for _, c := range result.Changes {
        if c.Type != "upsert" || !c.IsDir {
            continue
        }
        change := FileChange{
            Type: ChangeUpsert,
            Meta: core.FileMeta{
                Version: 1,
                FileID:  c.ID,
                Name:    c.Name,
                Type:    core.FileTypeFolder,
                Mtime:   c.Modified,
            },
        }
        if c.Parent != "" {
            change.Meta.Parents = []string{c.Parent}
        }
        if err := callback(change); err != nil {
            return "", err
        }
    }

    // Emit file changes and deletions
    for _, c := range result.Changes {
        if c.Type == "upsert" && c.IsDir {
            continue // Already emitted above
        }
        var change FileChange
        if c.Type == "delete" {
            change = FileChange{
                Type: ChangeDelete,
                Meta: core.FileMeta{
                    FileID: c.ID,
                },
            }
        } else {
            change = FileChange{
                Type: ChangeUpsert,
                Meta: core.FileMeta{
                    Version: 1,
                    FileID:  c.ID,
                    Name:    c.Name,
                    Type:    core.FileTypeFile,
                    Size:    c.Size,
                    Mtime:   c.Modified,
                },
            }
            if c.Parent != "" {
                change.Meta.Parents = []string{c.Parent}
            }
        }
        if err := callback(change); err != nil {
            return "", err
        }
    }
    return result.NewToken, nil
}
How the Engine Uses Incremental Sources
- First backup - the engine calls GetStartPageToken(), then Walk() for a full scan, and stores the token in the snapshot
- Subsequent backups - the engine calls WalkChanges(token) with the stored token and processes only the delta
- Token updated - the new token returned by WalkChanges is stored in the snapshot for the next run
If WalkChanges fails (e.g., token expired), the engine automatically falls back to a full Walk.
Implementation Guidelines
Parent-Child Ordering
The most critical requirement is that parents must be emitted before children. This ensures the HAMT tree structure is built correctly.
// BAD: Children before parents
callback(core.FileMeta{FileID: "file.txt", Parents: []string{"folder1"}})
callback(core.FileMeta{FileID: "folder1", Type: core.FileTypeFolder})
// GOOD: Parents before children
callback(core.FileMeta{FileID: "folder1", Type: core.FileTypeFolder})
callback(core.FileMeta{FileID: "file.txt", Parents: []string{"folder1"}})
For sources with complex hierarchies, use topological sorting to ensure correct ordering.
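One way to enforce that ordering is a breadth-first traversal from the roots, which yields a valid topological order for any folder tree. The sketch below uses a simplified Entry type in place of core.FileMeta, with a single parent link; emitTopological is an illustrative helper, not part of the Cloudstic API:

```go
package main

import "fmt"

// Entry is a simplified stand-in for core.FileMeta with one parent link.
type Entry struct {
	ID     string
	Parent string // "" for roots
}

// emitTopological emits entries so that every parent is emitted before
// its children, regardless of the input order.
func emitTopological(entries []Entry, emit func(Entry)) {
	// Group entries by parent ID.
	children := map[string][]Entry{}
	for _, e := range entries {
		children[e.Parent] = append(children[e.Parent], e)
	}
	// Breadth-first from the roots guarantees parent-before-child order.
	queue := children[""]
	for len(queue) > 0 {
		e := queue[0]
		queue = queue[1:]
		emit(e)
		queue = append(queue, children[e.ID]...)
	}
}

func main() {
	entries := []Entry{
		{ID: "file.txt", Parent: "folder1"}, // child listed first on purpose
		{ID: "folder1", Parent: ""},
	}
	emitTopological(entries, func(e Entry) { fmt.Println(e.ID) })
}
```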
FileID Stability
FileID must be stable across backups. If a file’s FileID changes, the engine treats it as a deletion + addition rather than a modification.
// GOOD: Use stable IDs from the source API
FileID: googleDriveFileID // Stable across renames
// BAD: Use paths that change when files move
FileID: "/home/user/documents/report.pdf" // Changes if moved
ContentHash Optimization
If your source provides checksums (like Google Drive’s SHA-256 or OneDrive’s QuickXorHash), include them in ContentHash:
meta.ContentHash = file.SHA256Checksum // From source API
This allows the engine to skip downloading unchanged files during incremental backups.
Error Handling
Return descriptive errors that help users diagnose issues:
if resp.StatusCode == 401 {
    return fmt.Errorf("authentication failed: check your API credentials")
}
if resp.StatusCode == 429 {
    return fmt.Errorf("rate limit exceeded: try again later")
}
return fmt.Errorf("fetch file list: HTTP %d: %s", resp.StatusCode, resp.Status)
Context Cancellation
Respect the context.Context passed to Walk, WalkChanges, and Size:
func (s *MySource) Walk(ctx context.Context, callback func(core.FileMeta) error) error {
    for _, file := range files {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        if err := callback(convertFileMeta(file)); err != nil {
            return err
        }
    }
    return nil
}
Testing Your Source
Create a simple test to verify your source implementation:
func TestHTTPSource(t *testing.T) {
    source := NewHTTPSource("https://example.com")

    // Test Info
    info := source.Info()
    if info.Type != "http" {
        t.Errorf("expected type 'http', got %q", info.Type)
    }

    // Test Walk
    ctx := context.Background()
    var files []core.FileMeta
    err := source.Walk(ctx, func(meta core.FileMeta) error {
        files = append(files, meta)
        return nil
    })
    if err != nil {
        t.Fatal(err)
    }
    if len(files) == 0 {
        t.Fatal("expected files, got none")
    }

    // Test GetFileStream
    for _, f := range files {
        if f.Type != core.FileTypeFile {
            continue
        }
        stream, err := source.GetFileStream(f.FileID)
        if err != nil {
            t.Errorf("GetFileStream(%s): %v", f.FileID, err)
            continue
        }
        stream.Close()
    }
}
Registering Your Source
To use your custom source with the CLI, add it to cmd/cloudstic/main.go in the initSource function:
func initSource(sourceType string, flags *flag.FlagSet) (store.Source, error) {
    switch sourceType {
    case "local":
        // ... existing code ...
    case "http":
        var baseURL string
        flags.StringVar(&baseURL, "url", "", "HTTP server base URL")
        flags.Parse(os.Args[3:])
        if baseURL == "" {
            return nil, fmt.Errorf("-url is required")
        }
        return store.NewHTTPSource(baseURL), nil
    default:
        return nil, fmt.Errorf("unknown source type: %s", sourceType)
    }
}
Then use it:
cloudstic backup -source http -url https://example.com