Batch Operations

Batch operations allow efficient bulk reads, writes, and deletes with automatic chunking and retry handling.

Overview

DynamoDB has limits on batch operations:

Operation        Limit
BatchGetItem     100 items per request
BatchWriteItem   25 items per request

Dynorm automatically handles chunking and retries for unprocessed items.

Batch Get

Retrieve multiple entities by ID:

ids := []string{"01HQ3K...", "01HQ4L...", "01HQ5M..."}

users, err := UserRepo.BatchGet(ids)
if err != nil {
    return err
}

for _, user := range users {
    fmt.Printf("User: %s\n", user.Email)
}

Automatic Chunking

If you pass more than 100 IDs, Dynorm splits them into chunks of at most 100, issues one BatchGetItem request per chunk, and merges the results.
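
A minimal sketch of the splitting step, assuming a plain slice of IDs. chunkIDs is an illustrative helper, not part of Dynorm's API:

// chunkIDs splits ids into groups no larger than size.
func chunkIDs(ids []string, size int) [][]string {
    var chunks [][]string
    for len(ids) > size {
        chunks = append(chunks, ids[:size])
        ids = ids[size:]
    }
    if len(ids) > 0 {
        chunks = append(chunks, ids)
    }
    return chunks
}

// chunkIDs(ids, 100) yields one slice per BatchGetItem request.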

Batch Save

Save multiple entities efficiently:

users := []*User{
    {Email: "user1@example.com", FirstName: "User", LastName: "One"},
    {Email: "user2@example.com", FirstName: "User", LastName: "Two"},
    {Email: "user3@example.com", FirstName: "User", LastName: "Three"},
}

err := UserRepo.BatchSave(users)
if err != nil {
    return err
}

// All users now have IDs and timestamps set
for _, user := range users {
    fmt.Printf("Saved: %s\n", user.ID)
}

Batch Delete

Delete multiple entities by ID:

ids := []string{"01HQ3K...", "01HQ4L...", "01HQ5M..."}

err := UserRepo.BatchDelete(ids)
if err != nil {
    return err
}

Delete by Entity

users, _ := UserRepo.
    Where("Status", "=", "banned").
    GetAll()

err := UserRepo.BatchDeleteEntities(users.Items())

Collection Integration

Collections provide batch operations:

users, _ := UserRepo.
    Where("Status", "=", "pending").
    GetAll()

// Batch save all
err := users.SaveAll()

// Batch delete all
err = users.DeleteAll()

// Update field on all and save
err = users.UpdateField("Status", "active")

// Update multiple fields and save
err = users.UpdateAll(map[string]any{
    "Status":    "processed",
    "UpdatedAt": time.Now(),
})

Chunking

Process large collections in chunks:

users, _ := UserRepo.All()

// Split into chunks of 25 (DynamoDB write limit)
chunks := users.Chunk(25)

for i, chunk := range chunks {
    fmt.Printf("Processing batch %d of %d\n", i+1, len(chunks))

    chunk.Each(func(u *User) {
        u.ProcessedAt = time.Now()
    })

    if err := chunk.SaveAll(); err != nil {
        return fmt.Errorf("batch %d failed: %w", i+1, err)
    }
}

Retry Handling

Dynorm automatically retries unprocessed items:

// Up to 3 retries by default
err := UserRepo.BatchSave(largeUserList)

The retry behavior:

  1. Execute batch operation
  2. Check for unprocessed items
  3. Retry unprocessed items (up to MaxRetries)
  4. Return after all items processed or max retries reached
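
For reference, here is a hedged sketch of what such a loop can look like against the AWS SDK for Go v2 directly. It illustrates the pattern; it is not Dynorm's actual implementation, and writeWithRetry is a made-up name:

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// writeWithRetry re-submits only the items DynamoDB reports as unprocessed.
func writeWithRetry(ctx context.Context, client *dynamodb.Client,
    requests map[string][]types.WriteRequest) error {

    const maxRetries = 3
    for attempt := 0; ; attempt++ {
        out, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: requests,
        })
        if err != nil {
            return err
        }
        if len(out.UnprocessedItems) == 0 {
            return nil // every item was written
        }
        if attempt >= maxRetries {
            return fmt.Errorf("items still unprocessed after %d retries", maxRetries)
        }
        requests = out.UnprocessedItems // retry only the rejected items
    }
}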

Constants

const (
    MaxBatchGetItems   = 100 // Max items per BatchGetItem
    MaxBatchWriteItems = 25  // Max items per BatchWriteItem
    MaxRetries         = 3   // Max retries for unprocessed items
)

Error Handling

err := UserRepo.BatchSave(users)
if err != nil {
    // Could be:
    // - ValidationException (invalid items)
    // - ProvisionedThroughputExceededException (throttled)
    // - InternalServerError (DynamoDB issue)
    return err
}
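
If you need to branch on the failure mode, one option, assuming Dynorm wraps the underlying SDK v2 errors so errors.As can unwrap them, is to match on the typed errors from the DynamoDB SDK:

// assumes: import "errors" and
// "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
var throttled *types.ProvisionedThroughputExceededException
if errors.As(err, &throttled) {
    // throttled: back off and retry instead of failing outright
}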

Partial Failures

After max retries, some items may remain unprocessed. Check your data or implement additional retry logic.
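
For example, a hedged sketch of caller-side retry with exponential backoff. saveWithBackoff is an illustrative helper; whether a given failure is worth retrying depends on which errors your Dynorm version surfaces:

func saveWithBackoff(users []*User, attempts int) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = UserRepo.BatchSave(users); err == nil {
            return nil
        }
        time.Sleep(time.Duration(1<<i) * 100 * time.Millisecond) // 100ms, 200ms, 400ms, ...
    }
    return err
}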

Examples

Import Users from CSV

func ImportUsers(csvFile string) error {
    users, err := parseCSV(csvFile)
    if err != nil {
        return err
    }

    // Chunk manually so progress can be reported per batch;
    // BatchSave would also chunk automatically
    for i := 0; i < len(users); i += 25 {
        end := i + 25
        if end > len(users) {
            end = len(users)
        }

        batch := users[i:end]
        if err := UserRepo.BatchSave(batch); err != nil {
            return fmt.Errorf("batch starting at %d failed: %w", i, err)
        }

        fmt.Printf("Imported %d/%d users\n", end, len(users))
    }

    return nil
}

Bulk Status Update

func DeactivateUsers(userIDs []string) error {
    // First, get all users
    users, err := UserRepo.BatchGet(userIDs)
    if err != nil {
        return err
    }

    // Update status
    for _, user := range users {
        user.Status = "inactive"
        user.DeactivatedAt = time.Now()
    }

    // Batch save
    return UserRepo.BatchSave(users)
}

Clean Up Old Records

func CleanupOldRecords(olderThan time.Time) error {
    records, err := RecordRepo.
        Where("CreatedAt", "<", olderThan).
        GetAll()
    if err != nil {
        return err
    }

    if records.IsEmpty() {
        return nil
    }

    return records.DeleteAll()
}

Parallel Processing

func ProcessAllUsers() error {
    users, err := UserRepo.All()
    if err != nil {
        return err
    }

    chunks := users.Chunk(100)
    errChan := make(chan error, len(chunks))

    // Process chunks in parallel
    for _, chunk := range chunks {
        go func(c *dynorm.Collection[User]) {
            c.Each(func(u *User) {
                u.ProcessedAt = time.Now()
            })
            errChan <- c.SaveAll()
        }(chunk)
    }

    // Wait for all to complete
    for range chunks {
        if err := <-errChan; err != nil {
            return err
        }
    }

    return nil
}
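
The goroutine-per-chunk approach above is unbounded. For very large tables you may prefer to cap concurrency; here is a sketch using errgroup (golang.org/x/sync/errgroup), where the limit of 4 is an illustrative choice rather than a Dynorm recommendation:

func ProcessAllUsersBounded() error {
    users, err := UserRepo.All()
    if err != nil {
        return err
    }

    var g errgroup.Group
    g.SetLimit(4) // at most 4 chunks in flight at once

    for _, chunk := range users.Chunk(100) {
        c := chunk // capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            c.Each(func(u *User) {
                u.ProcessedAt = time.Now()
            })
            return c.SaveAll()
        })
    }
    return g.Wait() // first non-nil error, if any
}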

Best Practices

Do

  • Use batch operations for bulk processing
  • Process in chunks for very large datasets
  • Handle partial failures gracefully
  • Use collection chunking methods

Don't

  • Save entities one at a time in loops (see the contrast sketch below)
  • Ignore unprocessed items
  • Use huge batch sizes (stick to limits)
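
To make the first point concrete, a hedged contrast; Save here stands in for whatever single-item write method your repository exposes:

// Anti-pattern: one network round trip per entity.
for _, u := range users {
    if err := UserRepo.Save(u); err != nil {
        return err
    }
}

// Preferred: one call, chunked and retried by Dynorm.
if err := UserRepo.BatchSave(users); err != nil {
    return err
}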

Batch Flow

sequenceDiagram
    participant App
    participant Dynorm
    participant DynamoDB

    App->>Dynorm: BatchSave(100 users)
    Dynorm->>Dynorm: Split into 4 chunks (25 each)

    loop For each chunk
        Dynorm->>DynamoDB: BatchWriteItem
        DynamoDB-->>Dynorm: Response + UnprocessedItems

        alt Has Unprocessed
            loop Retry (max 3)
                Dynorm->>DynamoDB: BatchWriteItem (unprocessed)
                DynamoDB-->>Dynorm: Response
            end
        end
    end

    Dynorm-->>App: Success/Error
