Batch Operations¶
Batch operations allow efficient bulk reads, writes, and deletes with automatic chunking and retry handling.
Overview¶
DynamoDB has limits on batch operations:
| Operation | Limit |
|---|---|
| BatchGetItem | 100 items per request |
| BatchWriteItem | 25 items per request |
Dynorm automatically handles chunking and retries for unprocessed items.
Batch Get¶
Retrieve multiple entities by ID:
```go
ids := []string{"01HQ3K...", "01HQ4L...", "01HQ5M..."}

users, err := UserRepo.BatchGet(ids)
if err != nil {
    return err
}

for _, user := range users {
    fmt.Printf("User: %s\n", user.Email)
}
```
Automatic Chunking
If you pass more than 100 IDs, Dynorm automatically splits them into chunks.
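For illustration, here is a minimal sketch of that splitting logic. The `chunkIDs` helper is hypothetical, not part of Dynorm's API; the library does this for you internally:

```go
// chunkIDs splits a slice of IDs into chunks of at most size elements,
// mirroring what Dynorm does before issuing BatchGetItem requests.
func chunkIDs(ids []string, size int) [][]string {
    chunks := make([][]string, 0, (len(ids)+size-1)/size)
    for len(ids) > size {
        chunks = append(chunks, ids[:size])
        ids = ids[size:]
    }
    if len(ids) > 0 {
        chunks = append(chunks, ids)
    }
    return chunks
}
```

Passing 250 IDs, for example, yields three chunks (100, 100, and 50) and therefore three BatchGetItem requests.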
Batch Save¶
Save multiple entities efficiently:
```go
users := []*User{
    {Email: "user1@example.com", FirstName: "User", LastName: "One"},
    {Email: "user2@example.com", FirstName: "User", LastName: "Two"},
    {Email: "user3@example.com", FirstName: "User", LastName: "Three"},
}

err := UserRepo.BatchSave(users)
if err != nil {
    return err
}

// All users now have IDs and timestamps set
for _, user := range users {
    fmt.Printf("Saved: %s\n", user.ID)
}
```
Batch Delete¶
Delete multiple entities by ID:
```go
ids := []string{"01HQ3K...", "01HQ4L...", "01HQ5M..."}

err := UserRepo.BatchDelete(ids)
if err != nil {
    return err
}
```
Delete by Entity¶
```go
users, _ := UserRepo.
    Where("Status", "=", "banned").
    GetAll()

err := UserRepo.BatchDeleteEntities(users.Items())
```
Collection Integration¶
Collections provide batch operations:
```go
users, _ := UserRepo.
    Where("Status", "=", "pending").
    GetAll()

// Batch save all
err := users.SaveAll()

// Batch delete all
err = users.DeleteAll()

// Update field on all and save
err = users.UpdateField("Status", "active")

// Update multiple fields and save
err = users.UpdateAll(map[string]any{
    "Status":    "processed",
    "UpdatedAt": time.Now(),
})
```
Chunking¶
Process large collections in chunks:
```go
users, _ := UserRepo.All()

// Split into chunks of 25 (DynamoDB write limit)
chunks := users.Chunk(25)

for i, chunk := range chunks {
    fmt.Printf("Processing batch %d of %d\n", i+1, len(chunks))

    chunk.Each(func(u *User) {
        u.ProcessedAt = time.Now()
    })

    if err := chunk.SaveAll(); err != nil {
        return fmt.Errorf("batch %d failed: %w", i+1, err)
    }
}
```
Retry Handling¶
Dynorm automatically retries unprocessed items. The retry behavior (sketched below):

- Execute the batch operation
- Check the response for unprocessed items
- Retry the unprocessed items (up to `MaxRetries` times)
- Return once all items are processed or the retry limit is reached
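For the curious, here is what that loop looks like against the raw AWS SDK (aws-sdk-go-v2). The `writeWithRetries` helper is purely illustrative; Dynorm's internals may differ, for example by adding backoff between attempts:

```go
import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// writeWithRetries executes a BatchWriteItem and re-submits whatever
// DynamoDB reports as unprocessed, up to maxRetries extra attempts.
// It assumes requests already respects the 25-item limit.
func writeWithRetries(ctx context.Context, client *dynamodb.Client,
    requests map[string][]types.WriteRequest, maxRetries int) error {

    pending := requests
    for attempt := 0; attempt <= maxRetries; attempt++ {
        out, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: pending,
        })
        if err != nil {
            return err
        }
        if len(out.UnprocessedItems) == 0 {
            return nil // every item was accepted
        }
        pending = out.UnprocessedItems // retry only the rejected items
    }
    return fmt.Errorf("items still unprocessed after %d retries", maxRetries)
}
```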
Constants¶
```go
const (
    MaxBatchGetItems   = 100 // Max items per BatchGetItem
    MaxBatchWriteItems = 25  // Max items per BatchWriteItem
    MaxRetries         = 3   // Max retries for unprocessed items
)
```
Error Handling¶
```go
err := UserRepo.BatchSave(users)
if err != nil {
    // Could be:
    // - ValidationException (invalid items)
    // - ProvisionedThroughputExceededException (throttled)
    // - InternalServerError (DynamoDB issue)
    return err
}
```
Partial Failures
After max retries, some items may remain unprocessed. Check your data or implement additional retry logic.
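One way to layer that additional retry logic on top of Dynorm is an exponential-backoff wrapper. `saveWithBackoff` below is a hypothetical sketch, not a Dynorm API:

```go
import (
    "math/rand"
    "time"
)

// saveWithBackoff retries a whole BatchSave with exponential backoff and
// jitter, on top of Dynorm's own per-item retries.
func saveWithBackoff(users []*User, attempts int) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = UserRepo.BatchSave(users); err == nil {
            return nil
        }
        // 100ms, 200ms, 400ms, ... plus up to 50ms of jitter
        delay := time.Duration(100<<i)*time.Millisecond +
            time.Duration(rand.Intn(50))*time.Millisecond
        time.Sleep(delay)
    }
    return err
}
```

Note that this re-submits the entire batch on each attempt; if you need to retry only the failed items, you would need visibility into which items were left unprocessed.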
Examples¶
Import Users from CSV¶
```go
func ImportUsers(csvFile string) error {
    users, err := parseCSV(csvFile)
    if err != nil {
        return err
    }

    // Process in batches
    for i := 0; i < len(users); i += 25 {
        end := i + 25
        if end > len(users) {
            end = len(users)
        }

        batch := users[i:end]
        if err := UserRepo.BatchSave(batch); err != nil {
            return fmt.Errorf("batch starting at %d failed: %w", i, err)
        }

        fmt.Printf("Imported %d/%d users\n", end, len(users))
    }

    return nil
}
```
Bulk Status Update¶
```go
func DeactivateUsers(userIDs []string) error {
    // First, get all users
    users, err := UserRepo.BatchGet(userIDs)
    if err != nil {
        return err
    }

    // Update status
    for _, user := range users {
        user.Status = "inactive"
        user.DeactivatedAt = time.Now()
    }

    // Batch save
    return UserRepo.BatchSave(users)
}
```
Clean Up Old Records¶
```go
func CleanupOldRecords(olderThan time.Time) error {
    records, err := RecordRepo.
        Where("CreatedAt", "<", olderThan).
        GetAll()
    if err != nil {
        return err
    }

    if records.IsEmpty() {
        return nil
    }

    return records.DeleteAll()
}
```
Parallel Processing¶
```go
func ProcessAllUsers() error {
    users, err := UserRepo.All()
    if err != nil {
        return err
    }

    chunks := users.Chunk(100)
    errChan := make(chan error, len(chunks))

    // Process chunks in parallel
    for _, chunk := range chunks {
        go func(c *dynorm.Collection[User]) {
            c.Each(func(u *User) {
                u.ProcessedAt = time.Now()
            })
            errChan <- c.SaveAll()
        }(chunk)
    }

    // Wait for all to complete
    for range chunks {
        if err := <-errChan; err != nil {
            return err
        }
    }

    return nil
}
```
Best Practices¶
Do
- Use batch operations for bulk processing
- Process in chunks for very large datasets
- Handle partial failures gracefully
- Use collection chunking methods
Don't
- Save entities one at a time in loops (see the comparison below)
- Ignore unprocessed items
- Use huge batch sizes (stick to limits)
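To make the first "don't" concrete, compare the two approaches (this assumes Dynorm exposes a single-entity `Save`; the point is the number of round trips):

```go
// Don't: one network round trip per entity
for _, u := range users {
    if err := UserRepo.Save(u); err != nil {
        return err
    }
}

// Do: a handful of chunked BatchWriteItem calls under the hood
if err := UserRepo.BatchSave(users); err != nil {
    return err
}
```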
Batch Flow¶
```mermaid
sequenceDiagram
    participant App
    participant Dynorm
    participant DynamoDB

    App->>Dynorm: BatchSave(100 users)
    Dynorm->>Dynorm: Split into 4 chunks (25 each)

    loop For each chunk
        Dynorm->>DynamoDB: BatchWriteItem
        DynamoDB-->>Dynorm: Response + UnprocessedItems

        alt Has Unprocessed
            loop Retry (max 3)
                Dynorm->>DynamoDB: BatchWriteItem (unprocessed)
                DynamoDB-->>Dynorm: Response
            end
        end
    end

    Dynorm-->>App: Success/Error
```

Next Steps¶
- Transactions - ACID operations
- Collection - Collection operations
- Query Builder - Query for batch processing