Co-Pilot / Assistive
Updated a month ago

dynamodb

itsmostafa
1.0k
itsmostafa/aws-agent-skills/skills/dynamodb
78
Agent score

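💡 Summary

A comprehensive reference and guide for working with AWS DynamoDB, covering core concepts, CLI/boto3 operations, best practices, and troubleshooting.

🎯 Target audience

  • AWS developers building serverless applications
  • DevOps engineers managing NoSQL databases
  • Solutions architects designing scalable data layers
  • Data engineers implementing data storage patterns

🤖 AI roast: This skill is essentially a well-organized cheat sheet, proof that sometimes the most useful tool is knowing where to look.

Security analysis: medium risk

This skill requires AWS credentials with DynamoDB permissions; mishandled credentials create a risk of privilege escalation or data leakage. Mitigation: use IAM roles with least-privilege policies, and never hardcode credentials in scripts.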


name: dynamodb
description: AWS DynamoDB NoSQL database for scalable data storage. Use when designing table schemas, writing queries, configuring indexes, managing capacity, implementing single-table design, or troubleshooting performance issues.
last_updated: "2026-01-07"
doc_source: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/

AWS DynamoDB

Amazon DynamoDB is a fully managed NoSQL database service providing fast, predictable performance at any scale. It supports key-value and document data structures.

Core Concepts

Keys

| Key Type | Description |
|----------|-------------|
| Partition Key (PK) | Required. Determines data distribution |
| Sort Key (SK) | Optional. Enables range queries within partition |
| Composite Key | PK + SK combination |

Secondary Indexes

| Index Type | Description |
|------------|-------------|
| GSI (Global Secondary Index) | Different PK/SK, separate throughput, eventually consistent |
| LSI (Local Secondary Index) | Same PK, different SK, shares table throughput, strongly consistent option |

Capacity Modes

| Mode | Use Case |
|------|----------|
| On-Demand | Unpredictable traffic, pay-per-request |
| Provisioned | Predictable traffic, lower cost, can use auto-scaling |
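Sizing provisioned capacity comes down to simple arithmetic. The sketch below assumes the standard unit definitions (one read capacity unit covers one strongly consistent read per second of an item up to 4 KB, eventually consistent reads cost half, and one write capacity unit covers one write per second of an item up to 1 KB). The function names are illustrative and not part of any AWS SDK.

```python
import math

def read_capacity_units(item_size_bytes, reads_per_second, strongly_consistent=True):
    """Estimate RCUs for a steady read rate on items of a given size."""
    units_per_read = max(1, math.ceil(item_size_bytes / 4096))  # 4 KB blocks
    total = units_per_read * reads_per_second
    if not strongly_consistent:
        total = total / 2  # eventually consistent reads cost half
    return math.ceil(total)

def write_capacity_units(item_size_bytes, writes_per_second):
    """Estimate WCUs for a steady write rate on items of a given size."""
    return max(1, math.ceil(item_size_bytes / 1024)) * writes_per_second  # 1 KB blocks

print(read_capacity_units(6000, 100))         # 200: a 6000-byte item spans two 4 KB blocks
print(read_capacity_units(6000, 100, False))  # 100
print(write_capacity_units(1500, 50))         # 100: a 1500-byte item spans two 1 KB blocks
```

Numbers like these are only a starting point for a provisioned table; on-demand mode avoids the estimate at a higher per-request price.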

Common Patterns

Create a Table

AWS CLI:

    aws dynamodb create-table \
      --table-name Users \
      --attribute-definitions \
        AttributeName=PK,AttributeType=S \
        AttributeName=SK,AttributeType=S \
      --key-schema \
        AttributeName=PK,KeyType=HASH \
        AttributeName=SK,KeyType=RANGE \
      --billing-mode PAY_PER_REQUEST

boto3:

    import boto3

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.create_table(
        TableName='Users',
        KeySchema=[
            {'AttributeName': 'PK', 'KeyType': 'HASH'},
            {'AttributeName': 'SK', 'KeyType': 'RANGE'}
        ],
        AttributeDefinitions=[
            {'AttributeName': 'PK', 'AttributeType': 'S'},
            {'AttributeName': 'SK', 'AttributeType': 'S'}
        ],
        BillingMode='PAY_PER_REQUEST'
    )

    # Block until the table is ready to use
    table.wait_until_exists()

Basic CRUD Operations

    import boto3
    from boto3.dynamodb.conditions import Key, Attr

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Users')

    # Put item
    table.put_item(
        Item={
            'PK': 'USER#123',
            'SK': 'PROFILE',
            'name': 'John Doe',
            'email': 'john@example.com',
            'created_at': '2024-01-15T10:30:00Z'
        }
    )

    # Get item
    response = table.get_item(
        Key={'PK': 'USER#123', 'SK': 'PROFILE'}
    )
    item = response.get('Item')

    # Update item
    table.update_item(
        Key={'PK': 'USER#123', 'SK': 'PROFILE'},
        UpdateExpression='SET #name = :name, updated_at = :updated',
        ExpressionAttributeNames={'#name': 'name'},
        ExpressionAttributeValues={
            ':name': 'John Smith',
            ':updated': '2024-01-16T10:30:00Z'
        }
    )

    # Delete item
    table.delete_item(
        Key={'PK': 'USER#123', 'SK': 'PROFILE'}
    )

Query Operations

    # Assumes the setup from the CRUD example: dynamodb, table, Key, and Attr

    # Query by partition key
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123')
    )

    # Query with sort key condition
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#')
    )

    # Query with filter
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123'),
        FilterExpression=Attr('status').eq('active')
    )

    # Query with projection
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123'),
        ProjectionExpression='PK, SK, #name, email',
        ExpressionAttributeNames={'#name': 'name'}
    )

    # Paginated query. The client attached to a resource serializes Python
    # values itself, so pass 'USER#123' rather than {'S': 'USER#123'}.
    paginator = dynamodb.meta.client.get_paginator('query')
    for page in paginator.paginate(
        TableName='Users',
        KeyConditionExpression='PK = :pk',
        ExpressionAttributeValues={':pk': 'USER#123'}
    ):
        for item in page['Items']:
            print(item)
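A single `table.query` call returns at most one page of results, so the calls above can silently miss items in a large partition. One way to handle this at the resource level is to follow `LastEvaluatedKey` until it disappears. This is a minimal sketch; `query_all` is a hypothetical helper name, and `table` is assumed to be a boto3 `Table` resource (or anything with a compatible `query` method).

```python
def query_all(table, **query_kwargs):
    """Yield every item matching a query, following LastEvaluatedKey across pages."""
    kwargs = dict(query_kwargs)  # copy so the caller's arguments are not mutated
    while True:
        page = table.query(**kwargs)
        yield from page.get('Items', [])
        last_key = page.get('LastEvaluatedKey')
        if last_key is None:
            return  # no more pages
        kwargs['ExclusiveStartKey'] = last_key
```

With the `Users` table it could be called as `for item in query_all(table, KeyConditionExpression=Key('PK').eq('USER#123')): ...`.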

Batch Operations

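    # Batch write: batch_writer buffers items, sends them in requests of up to
    # 25 items, and resends unprocessed items automatically
    with table.batch_writer() as batch:
        for i in range(100):
            batch.put_item(Item={
                'PK': f'USER#{i}',
                'SK': 'PROFILE',
                'name': f'User {i}'
            })

    # Batch get (up to 100 items per request)
    dynamodb = boto3.resource('dynamodb')
    response = dynamodb.batch_get_item(
        RequestItems={
            'Users': {
                'Keys': [
                    {'PK': 'USER#1', 'SK': 'PROFILE'},
                    {'PK': 'USER#2', 'SK': 'PROFILE'}
                ]
            }
        }
    )
    items = response['Responses']['Users']
    # Keys the service could not serve are returned in response['UnprocessedKeys']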
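Unlike `batch_writer`, `batch_get_item` does not chunk requests or retry on its own. The sketch below shows one way to stay under the 100-key limit and re-request `UnprocessedKeys`. `chunked` and `batch_get_all` are hypothetical helper names, and `dynamodb` is assumed to be a boto3 DynamoDB resource; a production version would likely also pause between rounds.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def batch_get_all(dynamodb, table_name, keys, max_rounds=5):
    """Fetch items for `keys`, 100 per request, re-requesting UnprocessedKeys."""
    found = []
    for chunk in chunked(keys, 100):
        request = {table_name: {'Keys': chunk}}
        for _ in range(max_rounds):
            response = dynamodb.batch_get_item(RequestItems=request)
            found.extend(response.get('Responses', {}).get(table_name, []))
            request = response.get('UnprocessedKeys') or {}
            if not request:
                break  # everything in this chunk was served
        else:
            raise RuntimeError('unprocessed keys remain after retries')
    return found
```

Note that batch reads return items in no guaranteed order, so callers that need ordering should sort the result themselves.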

Create GSI

    aws dynamodb update-table \
      --table-name Users \
      --attribute-definitions AttributeName=email,AttributeType=S \
      --global-secondary-index-updates '[
        {
          "Create": {
            "IndexName": "email-index",
            "KeySchema": [{"AttributeName": "email", "KeyType": "HASH"}],
            "Projection": {"ProjectionType": "ALL"}
          }
        }
      ]'
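Once the index is active, it is queried by passing `IndexName` to a normal query. A minimal sketch, assuming the `email-index` GSI created above and a boto3 `Table` resource passed in as `table`; `find_by_email` is a hypothetical helper name.

```python
def find_by_email(table, email):
    """Query the email-index GSI for items whose email attribute matches."""
    response = table.query(
        IndexName='email-index',
        KeyConditionExpression='email = :email',
        ExpressionAttributeValues={':email': email},
    )
    return response.get('Items', [])
```

GSI reads are eventually consistent, so an item written a moment ago may not appear yet, and a GSI key is not required to be unique, which is why the helper returns a list.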

Conditional Writes

    from botocore.exceptions import ClientError

    # Only put if item doesn't exist
    try:
        table.put_item(
            Item={'PK': 'USER#123', 'SK': 'PROFILE', 'name': 'John'},
            ConditionExpression='attribute_not_exists(PK)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            print("Item already exists")

    # Optimistic locking with version
    table.update_item(
        Key={'PK': 'USER#123', 'SK': 'PROFILE'},
        UpdateExpression='SET #name = :name, version = version + :inc',
        ConditionExpression='version = :current_version',
        ExpressionAttributeNames={'#name': 'name'},
        ExpressionAttributeValues={
            ':name': 'New Name',
            ':inc': 1,
            ':current_version': 5
        }
    )
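The optimistic-locking update above hardcodes the expected version. In practice the version is read first and the write is retried when another writer gets in between. The following is a sketch of that loop under stated assumptions: `rename_with_retry` and `is_conditional_failure` are hypothetical helpers, `table` is a boto3 `Table` resource, and the error check is duck-typed on the `response` attribute that botocore's `ClientError` carries.

```python
def is_conditional_failure(error):
    """True if `error` looks like a ClientError for a failed condition check."""
    response = getattr(error, 'response', None) or {}
    return response.get('Error', {}).get('Code') == 'ConditionalCheckFailedException'

def rename_with_retry(table, key, new_name, max_attempts=3):
    """Read the current version, update only if it is unchanged, retry on conflict."""
    for _ in range(max_attempts):
        item = table.get_item(Key=key, ConsistentRead=True).get('Item')
        if item is None:
            raise KeyError(f'no item for key {key!r}')
        try:
            table.update_item(
                Key=key,
                UpdateExpression='SET #name = :name, version = version + :inc',
                ConditionExpression='version = :current_version',
                ExpressionAttributeNames={'#name': 'name'},
                ExpressionAttributeValues={
                    ':name': new_name,
                    ':inc': 1,
                    ':current_version': item['version'],
                },
            )
            return item['version'] + 1  # the version this write produced
        except Exception as error:
            if not is_conditional_failure(error):
                raise  # anything other than a version conflict is not retried
    raise RuntimeError('too many concurrent updates')
```

A call such as `rename_with_retry(table, {'PK': 'USER#123', 'SK': 'PROFILE'}, 'New Name')` re-reads the item after each conflict, so the condition always compares against the latest version seen.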

CLI Reference

Table Operations

| Command | Description |
|---------|-------------|
| aws dynamodb create-table | Create table |
| aws dynamodb describe-table | Get table info |
| aws dynamodb update-table | Modify table/indexes |
| aws dynamodb delete-table | Delete table |
| aws dynamodb list-tables | List all tables |

Item Operations

| Command | Description |
|---------|-------------|
| aws dynamodb put-item | Create/replace item |
| aws dynamodb get-item | Read single item |
| aws dynamodb update-item | Update item attributes |
| aws dynamodb delete-item | Delete item |
| aws dynamodb query | Query by key |
| aws dynamodb scan | Full table scan |

Batch Operations

| Command | Description |
|---------|-------------|
| aws dynamodb batch-write-item | Batch write (25 max) |
| aws dynamodb batch-get-item | Batch read (100 max) |
| aws dynamodb transact-write-items | Transaction write |
| aws dynamodb transact-get-items | Transaction read |

Best Practices

Data Modeling

  • Design for access patterns — know your queries before designing
  • Use composite keys — PK for grouping, SK for sorting/filtering
  • Prefer query over scan — scans are expensive
  • Use sparse indexes — only items with index attributes are indexed
  • Consider single-table design for related entities
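The composite-key and single-table points can be made concrete with a pair of item builders. This is an illustrative sketch: the `USER#` and `ORDER#` prefixes follow the examples in this guide, while the function names and the zero-padded order IDs are assumptions. The list comprehension is only a local stand-in for what `Key('SK').begins_with('ORDER#')` selects on the server.

```python
def user_profile_item(user_id, name, email):
    """Profile item: one per user, with a fixed sort key."""
    return {'PK': f'USER#{user_id}', 'SK': 'PROFILE', 'name': name, 'email': email}

def order_item(user_id, order_id, status):
    """Order item stored in the same partition as the user it belongs to."""
    return {'PK': f'USER#{user_id}', 'SK': f'ORDER#{order_id}', 'status': status}

items = [
    user_profile_item('123', 'John Doe', 'john@example.com'),
    order_item('123', '0001', 'active'),
    order_item('123', '0002', 'shipped'),
]

# Local stand-in for a query on PK = 'USER#123' with SK begins_with 'ORDER#'
orders = [i for i in items if i['PK'] == 'USER#123' and i['SK'].startswith('ORDER#')]
print([o['SK'] for o in orders])  # ['ORDER#0001', 'ORDER#0002']
```

Because sort keys of type S are ordered as strings, IDs that should sort numerically or chronologically need a fixed width or an ISO 8601 timestamp.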

Performance

  • Distribute partition keys evenly — avoid hot partitions
  • Use batch operations to reduce API calls
  • Enable DAX for read-heavy workloads
  • Use projections to reduce data transfer

Cost Optimization

  • Use on-demand for variable workloads
  • Use provisioned + auto-scaling for predictable workloads
  • Set TTL for expiring data
  • Archive to S3 for cold data

Troubleshooting

Throttling

Symptom: ProvisionedThroughputExceededException

Causes:

  • Hot partition (uneven key distribution)
  • Burst traffic exceeding capacity
  • GSI throttling affecting base table

Solutions:

    # Let the SDK retry throttled requests. 'adaptive' mode adds client-side
    # rate limiting on top of exponential backoff.
    import boto3
    from botocore.config import Config

    config = Config(
        retries={
            'max_attempts': 10,
            'mode': 'adaptive'
        }
    )
    dynamodb = boto3.resource('dynamodb', config=config)
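When a retry is needed around a larger unit of work than one SDK call, the same idea can be applied by hand. The sketch below uses the "full jitter" variant of exponential backoff; `backoff_delays`, `is_throttle`, and `call_with_backoff` are hypothetical helpers, the base and cap values are arbitrary, and the error check is duck-typed on the `response` attribute that botocore's `ClientError` carries.

```python
import random
import time

THROTTLE_CODES = {'ProvisionedThroughputExceededException', 'ThrottlingException'}

def backoff_delays(max_attempts, base=0.05, cap=5.0, rng=random):
    """Yield full-jitter delays: uniform in [0, min(cap, base * 2**attempt)]."""
    for attempt in range(max_attempts):
        yield rng.uniform(0, min(cap, base * 2 ** attempt))

def is_throttle(error):
    """True if `error` looks like a ClientError carrying a throttling code."""
    response = getattr(error, 'response', None) or {}
    return response.get('Error', {}).get('Code') in THROTTLE_CODES

def call_with_backoff(operation, is_throttle=is_throttle, max_attempts=5, sleep=time.sleep):
    """Call `operation()` and retry with jittered backoff while it is throttled."""
    for delay in backoff_delays(max_attempts):
        try:
            return operation()
        except Exception as error:
            if not is_throttle(error):
                raise  # only throttling errors are retried
            sleep(delay)
    raise RuntimeError('still throttled after retries')
```

For example, `call_with_backoff(lambda: table.put_item(Item=item))` would retry a throttled write up to five times, with the randomized delays spreading retries from many clients apart.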

Hot Partitions

Debug:

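    # Check consumed read capacity for the table over the last hour.
    # This metric is reported per table, not per partition; CloudWatch
    # Contributor Insights for DynamoDB shows the most-accessed keys.
    # Note: `date -d` is GNU date syntax.
    aws cloudwatch get-metric-statistics \
      --namespace AWS/DynamoDB \
      --metric-name ConsumedReadCapacityUnits \
      --dimensions Name=TableName,Value=Users \
      --start-time $(date -d '1 hour ago' -u +%Y-%m-%dT%H:%M:%SZ) \
      --end-time $(date -u +%Y-%m-%dT%H:%M:%SZ) \
      --period 60 \
      --statistics Sum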

Solutions:

  • Add randomness to partition keys
  • Use write sharding
  • Distribute access across partitions
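Write sharding spreads one logical key over several physical partition keys by appending a shard suffix. A minimal sketch, assuming a fixed shard count and a hash of some per-item ID to pick the shard; the helper names and the `EVENTS#` key are hypothetical.

```python
import hashlib

def sharded_pk(base_key, item_id, shard_count=10):
    """Append a deterministic shard suffix derived from a hash of `item_id`."""
    digest = hashlib.sha256(str(item_id).encode('utf-8')).digest()
    shard = int.from_bytes(digest[:4], 'big') % shard_count
    return f'{base_key}#{shard}'

def all_shard_pks(base_key, shard_count=10):
    """List every shard key; readers must query each one and merge the results."""
    return [f'{base_key}#{shard}' for shard in range(shard_count)]

pk = sharded_pk('EVENTS#2024-01-15', 'order-42')
print(pk in all_shard_pks('EVENTS#2024-01-15'))  # True
```

The trade-off is on the read side: fetching everything for `EVENTS#2024-01-15` now takes one query per shard, so the shard count is best kept as small as the write load allows.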

Query Returns No Items

Debug checklist:

  1. Verify key values exactly match (case-sensitive)
  2. Check key types (S, N, B)
  3. Confirm table/index name
  4. Review filter expressions (they apply AFTER read)
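Item 4 is a frequent source of confusion when `Limit` is also set: the limit counts items read, and the filter runs afterwards, so a page can come back empty even though matching items exist further on. The following is a toy model in plain Python, not DynamoDB code, intended only to illustrate that ordering.

```python
def simulate_query_page(items, limit, predicate, start=0):
    """Toy model of one Query page: read up to `limit` items, then filter them."""
    evaluated = items[start:start + limit]
    returned = [item for item in evaluated if predicate(item)]
    # A real response signals more data with LastEvaluatedKey; here it is an index
    next_start = start + limit if start + limit < len(items) else None
    return returned, next_start

orders = [{'SK': f'ORDER#{i}', 'status': 'active' if i >= 3 else 'closed'} for i in range(5)]

def is_active(order):
    return order['status'] == 'active'

page, next_start = simulate_query_page(orders, limit=3, predicate=is_active)
print(page, next_start)  # [] 3
```

The first page is empty because the three items read were all closed; the two active orders only appear when the query continues from the returned position. An empty `Items` list is therefore not proof of no matches unless `LastEvaluatedKey` is also absent.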

Scan Performance

Issue: Scans are slow and expensive

Solutions:

  • Use parallel scan for large tables
  • Create GSI for the access pattern
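  • Use filter expressions to reduce returned data (filtered-out items still consume read capacity)

    # Parallel scan: each worker scans one segment and follows pagination
    import concurrent.futures
    import boto3

    def scan_segment(segment, total_segments):
        # boto3 resources are not thread-safe, so each worker creates its own
        table = boto3.session.Session().resource('dynamodb').Table('Users')
        items = []
        kwargs = {'Segment': segment, 'TotalSegments': total_segments}
        while True:
            page = table.scan(**kwargs)
            items.extend(page['Items'])
            if 'LastEvaluatedKey' not in page:
                return items
            kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(
            lambda s: scan_segment(s, 4),
            range(4)
        ))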

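Five-dimension analysis
Clarity: 9/10
Novelty: 3/10
Practicality: 10/10
Completeness: 9/10
Maintainability: 8/10
Pros and cons

Pros

  • Highly practical, with ready-to-use code snippets
  • Covers both operational commands and architectural best practices
  • Includes valuable troubleshooting guidance for common problems

Cons

  • Lacks interactive or agent-specific features (e.g., schema generation, query building)
  • Mostly a static knowledge base rather than an active tool
  • Offers no novel functionality beyond consolidating existing documentation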


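Disclaimer: This content is sourced from an open-source GitHub project and is provided for display and rating analysis only.

Copyright belongs to the original author, itsmostafa.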