ALTER TABLE user_info ADD school varchar(15) AFTER `level`;
ALTER TABLE user_info CHANGE job profession varchar(10);
ALTER TABLE user_info CHANGE COLUMN achievement achievement int DEFAULT 0;
DML
Insert data into specified columns: INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);
Insert data into all columns: INSERT INTO table_name VALUES (val1, val2, ...);
Batch insert:
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...), (val1, val2, ...), (val1, val2, ...);
INSERT INTO table_name VALUES (val1, val2, ...), (val1, val2, ...), (val1, val2, ...);
Each page consists of a header, the record space, and a slot (pointer) array. Each slot in this array points to a record within the page.
A record is located by providing its page address and slot number; this combination is called the RID (record identifier).
Life cycle of a record (a code sketch follows this list):
Insert: find free space for the record and set a slot at the very end of the page to point to it.
Delete: remove the record, reclaim its space, and set its slot to null. When too many null (garbage) slots accumulate, the system drops the index structure, reorganizes the disk page by removing all null slots, and rebuilds the index from scratch.
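A minimal Python sketch of these page/slot/RID mechanics, with hypothetical names (SlottedPage, insert, delete, lookup are illustrative, not from any source):

```python
# Hypothetical sketch of a slotted page: a header (here just page_id),
# record space, and a slot array whose entries point to records in the page.

class SlottedPage:
    def __init__(self, page_id):
        self.page_id = page_id
        self.records = []   # record space (None marks reclaimed space)
        self.slots = []     # slot array: slot -> index into record space, or None

    def insert(self, record):
        """Store the record and append a slot at the end that points to it."""
        self.records.append(record)
        self.slots.append(len(self.records) - 1)
        slot_no = len(self.slots) - 1
        return (self.page_id, slot_no)          # the RID = (page address, slot number)

    def delete(self, rid):
        """Reclaim the record's space and null out its slot."""
        _, slot_no = rid
        self.records[self.slots[slot_no]] = None
        self.slots[slot_no] = None

    def lookup(self, rid):
        """Locate a record by its RID."""
        _, slot_no = rid
        offset = self.slots[slot_no]
        return None if offset is None else self.records[offset]

page = SlottedPage(page_id=7)
rid = page.insert({"name": "alice"})
assert page.lookup(rid) == {"name": "alice"}
page.delete(rid)
assert page.lookup(rid) is None
```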
Non-clustered Index: the data on the disk page is stored independently of the index's buckets or leaf nodes.
Clustered Index: the data on the disk page resides within the index's buckets or leaf nodes.
Hash Index: the items within a bucket are not ordered by the value of the index attribute.
B+Tree Index: the items within a leaf node are ordered by the value of the index attribute.
Primary Index: the data on the disk page is stored in the order of the index attribute.
Secondary Index: the data on the disk page is not stored in the order of the index attribute.
HRPS declusters a relation into fragments based on the following criteria:
Each fragment contains approximately FC tuples.
Each fragment contains a unique range of values of the partitioning attribute.
The variable FC is determined based on the processing capability of the system and the resource requirements of the queries that access the relation (rather than the number of processors in the configuration).
A major underlying assumption of this partitioning strategy is that the selection operators which access the database retrieve and process the selected tuples using either a range predicate or an equality predicate.
For each query Qi, the workload defines the CPU processing time (CPUi), the Disk Processing Time (Diski), and the Network Processing time (Neti) of that query. Observe that these times are determined based on the resource requirements of each individual query and the processing capability of the system. Each query retrieves and processes (TuplesPerQi) tuples from the database. Furthermore, we assume that the workload defines the frequency of occurrence of each query (FreqQi).
Rather than describing HRPS with respect to each query in the workload, we define an average query (Qavg) that is representative of all the queries in the workload. The CPU, disk, and network processing quanta for this query are:
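A plausible form of these averages, assuming the usual frequency-weighted definition over the queries $Q_i$ (an assumption, not copied from the paper), is:

$$
CPU_{avg} = \frac{\sum_i Freq_{Q_i} \cdot CPU_i}{\sum_i Freq_{Q_i}}, \qquad
Disk_{avg} = \frac{\sum_i Freq_{Q_i} \cdot Disk_i}{\sum_i Freq_{Q_i}}, \qquad
Net_{avg} = \frac{\sum_i Freq_{Q_i} \cdot Net_i}{\sum_i Freq_{Q_i}}
$$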
Assume that a single processor cannot overlap the use of two resources for an individual query. Thus, the execution time of Qavg on a single processor in a single user environment is:
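Because the three resources are used one after another (no overlap), this execution time is simply the sum of the three quanta:

$$
ExecTime(Q_{avg}, 1) = CPU_{avg} + Disk_{avg} + Net_{avg}
$$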
As more processors are used for query execution, the response time decreases. However, this also incurs additional overhead, represented by the variable CP, which refers to the cost of coordinating the query execution across multiple processors (e.g., messaging overhead). The response time of the query on M processors can be described by the following formula:
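One plausible form, assuming the work divides evenly across the M processors and the coordination overhead CP is paid once per participating processor (both are assumptions of this reconstruction), is:

$$
RespTime(Q_{avg}, M) = \frac{CPU_{avg} + Disk_{avg} + Net_{avg}}{M} + M \cdot CP
$$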
In a single-user environment, HRPS and range partitioning perform similarly because both localize the query to the processors that actually hold the relevant data. In a multi-user environment, however, HRPS is likely to perform better: since each query is spread over M (or M + 1) processors and adjacent fragments sit on different processors, concurrent queries are distributed across the system rather than piling up on the one or two processors that own a popular range, which improves overall throughput.
Rather than representing the number of processors over which a relation should be declustered, M here represents the number of processors that should participate in the execution of Qavg. Since Qavg processes TuplesPerQavg tuples, each fragment of the relation should contain FC = TuplesPerQavg / M tuples.
The process of fragmenting and distributing data in HRPS (a code sketch follows this list):
Sorting the relation: The relation is first sorted based on the partitioning attribute to ensure each fragment contains a distinct range of values.
Fragmentation: The relation is then split into fragments, each containing approximately FC tuples.
Round-robin distribution: These fragments are distributed to processors in a round-robin fashion, ensuring that adjacent fragments are assigned to different processors (unless the number of processors N is less than the required processors M).
Storing fragments: All the fragments for a relation on a given processor are stored in the same physical file.
Range table: The mapping of fragments to processors is maintained in a one-dimensional directory called the range table.
This method ensures that at least M processors and at most M + 1 processors participate in the execution of a query.
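A minimal Python sketch of the fragmentation and round-robin assignment described above. All names (hrps_decluster, range_table, etc.) are hypothetical, and the range table is simplified to one entry per fragment:

```python
# Hypothetical sketch of HRPS fragmentation: sort on the partitioning
# attribute, cut into fragments of ~FC tuples, and assign fragments to
# the N processors round-robin while recording the range table.

def hrps_decluster(tuples, key, fc, n_processors):
    sorted_tuples = sorted(tuples, key=key)              # step 1: sort the relation
    fragments = [sorted_tuples[i:i + fc]                 # step 2: ~FC tuples per fragment
                 for i in range(0, len(sorted_tuples), fc)]

    range_table = []                                     # steps 3 and 5: round-robin + range table
    for frag_no, frag in enumerate(fragments):
        processor = frag_no % n_processors               # adjacent fragments -> different processors
        range_table.append({
            "low": key(frag[0]),
            "high": key(frag[-1]),
            "processor": processor,
        })
    return fragments, range_table

# Example: 10 tuples, FC = 3, N = 4 processors.
rows = [{"id": i} for i in range(10)]
frags, rtable = hrps_decluster(rows, key=lambda t: t["id"], fc=3, n_processors=4)
for entry in rtable:
    print(entry)   # e.g. {'low': 0, 'high': 2, 'processor': 0}, ...
```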
M = N: the system's capability matches the query's requirements; HRPS schedules all processors, achieving maximum parallelism and optimal performance.
M < N: HRPS schedules only a subset of the processors for the query, reducing communication overhead, though some processors may sit idle.
M > N: HRPS assigns multiple fragments to each processor so that all processors are utilized, but every processor carries a heavier load and query execution may slow down.
HRPS in this paper supports only homogeneous nodes.
Questions
How does HRPS decide the ideal degree of parallelism for a query?
HRPS (Hybrid-Range Partitioning Strategy) decides the ideal degree of parallelism by analyzing the resource requirements of the query, such as CPU, disk I/O, and communication costs. It calculates the optimal number of processors (denoted as M) based on these factors. The strategy strikes a balance between minimizing query response time and avoiding excessive overhead from using too many processors.
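If one adopts a cost model like the reconstructed RespTime formula above, a hypothetical way to pick the degree of parallelism is to evaluate the model for every candidate M and take the minimum. The function name and parameter values below are illustrative only:

```python
# Hypothetical sketch: choose M by minimizing a modeled response time
# RT(M) = work / M + M * CP, assuming that simple model holds.

def optimal_parallelism(cpu_avg, disk_avg, net_avg, cp, max_processors):
    work = cpu_avg + disk_avg + net_avg
    best_m, best_rt = 1, float("inf")
    for m in range(1, max_processors + 1):
        rt = work / m + m * cp          # divided work plus per-processor coordination overhead
        if rt < best_rt:
            best_m, best_rt = m, rt
    return best_m

print(optimal_parallelism(cpu_avg=40, disk_avg=80, net_avg=20, cp=5, max_processors=32))
```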
Why is it not appropriate to direct a query that fetches one record using an index structure to all the nodes of a system based on the shared-nothing architecture?
Fetching one record should only involve the node that contains the relevant data, as querying all nodes wastes resources and increases response time.
How to extend HRPS to support heterogeneous nodes?
More powerful nodes would receive more fragments, while weaker nodes would handle fewer fragments.
The system could monitor node performance and dynamically adjust the degree of parallelism and fragment allocation based on current load and node availability.
Heavier tasks may be directed to more powerful nodes, while smaller or simpler queries could be executed on less powerful nodes.
Gamma is based on the concept of a shared-nothing architecture in which processors do not share disk drives or random access memory and can only communicate with one another by sending messages through an interconnection network. Mass storage in such an architecture is generally distributed among the processors by connecting one or more disk drives to each processor.
Reasons why the shared-nothing approach has become the architecture of choice.
There is nothing to prevent the architecture from scaling to 1000s of processors unlike shared-memory machines for which scaling beyond 30-40 processors may be impossible.
By associating a small number of disks with each processor and distributing the tuples of each relation across the disk drives, it is possible to achieve very high aggregate I/O bandwidths without using custom disk controllers
When Gamma’s system is figuring out the best way to run a query, it uses information about how the data is divided up (partitioned). This partitioning information helps the system decide how many processors (computers) need to be involved in running the query.
For hash partitioning: If a table (say, “X”) is divided based on a hash function applied to a certain column (like “y”), and the query asks for records where “X.y = some value,” the system can directly go to the specific processor that holds the data matching that value.
For range partitioning: If the table is divided based on ranges of values for a column, the system can limit the query to only the processors that have data within the relevant range. For example, if “X” is partitioned such that one processor handles values from 1 to 100, and another handles values from 101 to 200, then a query asking for “X.y between 50 and 150” will involve only the processors that have data in those ranges.
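A small sketch of this routing idea using a range table; the names and ranges below are hypothetical, matching the “1 to 100 / 101 to 200” example above:

```python
# Hypothetical sketch: given a range table, return only the processors
# whose value ranges overlap the query's range predicate.

range_table = [
    {"low": 1,   "high": 100, "processor": 0},
    {"low": 101, "high": 200, "processor": 1},
    {"low": 201, "high": 300, "processor": 2},
]

def processors_for_range(table, lo, hi):
    return sorted({e["processor"] for e in table
                   if e["low"] <= hi and e["high"] >= lo})

print(processors_for_range(range_table, 50, 150))   # [0, 1] -- only two nodes participate
```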
Different processes in the Gamma system work together. Here’s a simplified explanation of the main types of processes and their roles:
Catalog Manager: Acts like a “database encyclopedia,” storing all the information about data tables and structures. It ensures that data remains consistent when multiple users access the database.
Query Manager: Each user gets a query manager that handles query requests. It is responsible for parsing the query, optimizing it, and generating the execution plan.
Scheduler Processes: When a query is executed, the scheduler coordinates the execution steps. It activates the necessary operator processes (such as scan, selection, etc.) and ensures that all steps are performed in the correct order.
Operator Processes: These processes carry out specific database operations, like filtering data or joining tables. To reduce the startup delay during query execution, some operator processes are pre-initialized when the system starts.
Other Processes:
Deadlock Detection Process: Detects situations where two or more processes are stuck waiting for each other to release resources.
Recovery Process: Manages data recovery after a system failure.
How does the Gamma system execute database queries?
Query Parsing and Optimization: When a user submits a query, Gamma first parses it to understand what the query is asking for. Then, the system optimizes the query to find the most efficient way to execute it.
Query Compilation: After optimization, the query is compiled into an “operator tree” made up of different operations (such as scan, selection, join, etc.). This tree outlines the steps and the order in which the query will be executed.
Single-Site vs. Multi-Site Queries: If the query only involves data on a single node (e.g., querying a small table), the system executes it directly on that node. However, if the query involves data distributed across multiple nodes (e.g., joining large tables), the system uses a “scheduler process” to coordinate the execution.
Scheduler Coordination: The scheduler process is responsible for activating various operator processes across the nodes, such as instructing one node to scan data while another filters it. The scheduler also manages the flow of data between these operations, ensuring they happen in the correct order.
Returning the Results: Once all operations are completed, the query results are collected and returned to the user. For queries embedded in a program, the results are passed back to the program that initiated the query.
Different operations (like scanning data, filtering, joining tables, etc.) are carried out in a parallel manner. Here’s a simplified explanation:
Operator Processes: In Gamma, each operation in a query is handled by something called an “operator process.” For example, if the query needs to scan data from a table, filter some rows, and then join with another table, there would be separate operator processes for scanning, filtering, and joining.
Data Flow: The data flows from one operator process to the next. For instance, the scan operator reads data from the disk and sends it to the filter operator, which then passes the filtered results to the join operator. This creates a kind of “data pipeline.”
Split Table: Gamma uses a “split table” to decide where the data should go next. Think of it like a routing table that directs the flow of data. For example, if the data needs to be sent to multiple nodes for parallel processing, the split table helps determine which node each piece of data should go to.
End of Processing: Once an operator finishes processing all its data, it closes its output streams and sends a signal to the scheduler process (which coordinates the whole query) to let it know that this part of the work is done.
In simple terms, the operator and process structure in Gamma is like an assembly line where data moves from one step (operator) to the next, with each operator performing a specific task, and the split table guiding the data flow. This setup allows the system to process data in parallel across multiple nodes, making it much faster.
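A minimal sketch of the split-table idea: each outgoing tuple is routed to a destination based on a hash of the routing attribute. The function names and node identifiers are hypothetical:

```python
# Hypothetical sketch of a split table: route each output tuple to the
# process/node responsible for its hash bucket.

def build_split_table(destinations):
    """destinations: list of node/process identifiers."""
    def route(tuple_row, attr):
        bucket = hash(tuple_row[attr]) % len(destinations)
        return destinations[bucket]
    return route

route = build_split_table(destinations=["node0", "node1", "node2"])
print(route({"dno": 42, "name": "alice"}, attr="dno"))   # -> one of node0/node1/node2
```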
Operators
Selection Operator
Data Spread Across Multiple Disks: In Gamma, data tables are split up and stored across multiple disks (this is called “declustering”). Because of this, when you want to search (select) for specific data, the system can perform the search in parallel across multiple disks.
Parallel Selection Process:
If the search condition (predicate) matches the way the data is divided (partitioned), the system can narrow down the search to just the relevant nodes (computers with disks) that have the data. For example:
If the data is divided using a hash or range partitioning method based on a certain attribute (like “employee ID”), and the search is also based on that attribute (e.g., “employee ID = 123”), then the search can be directed only to the node that holds data matching that condition.
If the data is divided using a round-robin method (spreading data evenly across all disks) or if the search condition doesn’t match the partitioning attribute, then the system has to search on all nodes.
Performance Optimization:
To make the search faster, Gamma uses a “read-ahead” technique. This means that when it reads one page of data, it starts loading the next page at the same time, so that the processing of data can keep going without waiting for the next page to load.
Join Operator
Using Hash Partitioning: The join algorithms in Gamma are based on a concept called “buckets.” This means splitting the two tables to be joined into separate groups (buckets) that don’t overlap. The groups are created by applying a hash function to the join attribute (e.g., Employee ID), so that data with the same hash value ends up in the same bucket.
By partitioning the data into different buckets, each bucket contains unique data subsets, allowing parallel processing of these buckets, which speeds up the join operation. Additionally, all data with the same join attribute value is in the same bucket, making it easier to perform the join.
Gamma implements four different parallel join algorithms:
Sort-Merge Join: Joins data by sorting and merging.
Grace Join: A distributed hash-based join algorithm.
Simple Hash Join: A straightforward hash-based partitioning join.
Hybrid Hash Join: A combination of different join techniques.
Default to Hybrid Hash Join: Research showed that the Hybrid Hash Join almost always performs the best, so Gamma uses this algorithm by default.
Limitations: These hash-based join algorithms can only handle equi-joins (joins with equality conditions, like “Employee ID = Department ID”). They currently don’t support non-equi-joins (conditions like “Salary > Department Budget * 2”). To address this, Gamma is working on designing a new parallel non-equi-join algorithm.
Hybrid Hash-Join
In the first phase, the algorithm uses a hash function to partition the inner (smaller) relation, R, into N buckets. The tuples of the first bucket are used to build an in-memory hash table while the remaining N-1 buckets are stored in temporary files. A good hash function produces just enough buckets to ensure that each bucket of tuples will be small enough to fit entirely in main memory.
During the second phase, relation S is partitioned using the hash function from step 1. Again, the last N-1 buckets are stored in temporary files while the tuples in the first bucket are used to immediately probe the in-memory hash table built during the first phase.
During the third phase, the algorithm joins the remaining N-1 buckets from relation R with their respective buckets from relation S.
The join is thus broken up into a series of smaller joins; each of which hopefully can be computed without experiencing join overflow. The size of the smaller relation determines the number of buckets; this calculation is independent of the size of the larger relation.
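A single-site Python sketch of the three phases, ignoring memory management and disk spilling (buckets are kept in lists here purely for illustration):

```python
# Hypothetical single-site sketch of hybrid hash join on R (smaller) and S.
# Phase 1: partition R into N buckets; bucket 0 becomes an in-memory hash table.
# Phase 2: partition S; probe bucket 0 immediately, set aside buckets 1..N-1.
# Phase 3: join the remaining bucket pairs one at a time.

from collections import defaultdict

def hybrid_hash_join(r_rows, s_rows, r_key, s_key, n_buckets):
    bucket = lambda v: hash(v) % n_buckets

    # Phase 1: partition R; bucket 0 goes into the in-memory hash table.
    r_buckets = defaultdict(list)
    memory_table = defaultdict(list)
    for r in r_rows:
        b = bucket(r[r_key])
        (memory_table[r[r_key]] if b == 0 else r_buckets[b]).append(r)

    results = []

    # Phase 2: partition S; probe bucket 0 immediately, keep the rest.
    s_buckets = defaultdict(list)
    for s in s_rows:
        b = bucket(s[s_key])
        if b == 0:
            results += [{**r, **s} for r in memory_table.get(s[s_key], [])]
        else:
            s_buckets[b].append(s)

    # Phase 3: join the remaining N-1 bucket pairs.
    for b in range(1, n_buckets):
        table = defaultdict(list)
        for r in r_buckets[b]:
            table[r[r_key]].append(r)
        for s in s_buckets[b]:
            results += [{**r, **s} for r in table.get(s[s_key], [])]
    return results

dept = [{"dno": 1, "dname": "toys"}, {"dno": 2, "dname": "tools"}]
emp = [{"name": "a", "dno": 1}, {"name": "b", "dno": 2}]
print(hybrid_hash_join(dept, emp, "dno", "dno", n_buckets=3))
```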
Parallel version of Hybrid Hash-Join
Partitioning into Buckets: The data from the two tables being joined is first divided into N buckets (small groups). The number of buckets is chosen so that each bucket can fit in the combined memory of the processors that are handling the join.
Storage of Buckets: Out of the N buckets, N-1 buckets are stored temporarily on disk across different disk sites, while one bucket is kept in memory for immediate processing.
Parallel Processing: A joining split table is used to decide which processor should handle each bucket, helping to divide the work across multiple processors. This means that different processors can work on different parts of the join at the same time, speeding up the process.
Overlapping Phases for Efficiency:
When partitioning the first table (R) into buckets, Gamma simultaneously builds a hash table for the first bucket in memory at each processor.
When partitioning the second table (S), Gamma simultaneously performs the join for the first bucket from S with the first bucket from R. This way, partitioning and joining overlap, making the process more efficient.
Adjusting the Split Table for Parallel Joining: The joining split table is updated to make sure that the data from the first bucket of both tables is sent to the right processors that will perform the join. When the remaining N-1 buckets are processed, only the routing for joining is needed.
Aggregate Operator
Parallel Calculation of Partial Results: Each processor in the Gamma system calculates the aggregate result for its own portion of the data simultaneously. For example, if the goal is to calculate a sum, each processor will first compute the sum for the data it is responsible for.
Combining Partial Results: After calculating their partial results, the processors send these results to a central process. This central process is responsible for combining all the partial results to produce the final answer.
Two-Step Computation:
Step 1: Each processor calculates the aggregate value (e.g., sum, count) for its data partition, resulting in partial results.
Step 2: The processors then redistribute these partial results based on the “group by” attribute. This means that the partial results for each group are collected at a single processor, where the final aggregation for that group is completed.
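A sketch of this two-step computation for a grouped SUM; the function names, attributes, and hash-based redistribution below are illustrative assumptions:

```python
# Hypothetical two-step grouped aggregation (SUM), mirroring the two steps above.

from collections import defaultdict

def local_partial_sums(partition, group_attr, value_attr):
    """Step 1: each processor aggregates its own data partition."""
    partial = defaultdict(int)
    for row in partition:
        partial[row[group_attr]] += row[value_attr]
    return partial

def combine_by_group(partials, n_processors):
    """Step 2: redistribute partial results by group key and finish the aggregation."""
    per_processor = [defaultdict(int) for _ in range(n_processors)]
    for partial in partials:
        for group, value in partial.items():
            dest = hash(group) % n_processors        # each group lands on one processor
            per_processor[dest][group] += value
    return per_processor

partitions = [
    [{"dept": "toys", "sal": 10}, {"dept": "tools", "sal": 5}],
    [{"dept": "toys", "sal": 7}],
]
partials = [local_partial_sums(p, "dept", "sal") for p in partitions]
print(combine_by_group(partials, n_processors=2))
```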
Update Operator
For the most part, the update operators (replace, delete, and append) are implemented using standard techniques. The only exception occurs when a replace operator modifies the partitioning attribute of a tuple. In this case, rather than writing the modified tuple back into the local fragment of the relation, the modified tuple is passed through a split table to determine which site should contain the tuple.
Concurrency Control
Gamma uses a two-phase locking strategy to manage concurrency. This means that before accessing data, a process must first acquire locks (first phase), and then release the locks after completing its operations (second phase). This ensures that multiple operations do not modify the same data at the same time, preventing conflicts.
Gamma supports two levels of lock granularity: file-level and page-level (smaller scope). There are also five lock modes:
S (Shared) Lock: Allows multiple operations to read the data simultaneously.
X (Exclusive) Lock: Only one operation can modify the data, while others must wait.
IS, IX, and SIX Locks: Used to manage locking at larger scopes, such as entire files, allowing different combinations of read and write permissions.
Each node in Gamma has its own lock manager and deadlock detector to handle local data locking. The lock manager maintains a lock table and a transaction wait-for-graph, which tracks which operations are waiting for which locks.
The cost of setting a lock depends on whether there is a conflict:
No Conflict: Takes about 100 instructions.
With Conflict: Takes about 250 instructions because the system needs to check the wait-for-graph for deadlocks and suspend the requesting transaction using a semaphore mechanism.
Gamma uses a centralized deadlock detection algorithm to handle deadlocks across nodes:
Periodically (initially every second), the centralized deadlock detector requests each node’s local wait-for-graph.
If no deadlock is found, the detection interval is doubled (up to 60 seconds). If a deadlock is found, the interval is halved (down to 1 second).
The collected graphs are combined into a global wait-for-graph. If a cycle is detected in this global graph, it indicates a deadlock.
When a deadlock is detected, the system will abort the transaction holding the fewest locks to free up resources quickly and allow other operations to proceed.
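A minimal sketch of the centralized detector's core step: merge the per-node wait-for graphs and look for a cycle in the union. The graph representation and names are hypothetical:

```python
# Hypothetical sketch: merge local wait-for graphs into a global graph and
# detect a cycle (i.e., a deadlock) with a depth-first search.

def merge_graphs(local_graphs):
    global_graph = {}
    for g in local_graphs:                        # g: {txn: set of txns it waits for}
        for txn, waits_for in g.items():
            global_graph.setdefault(txn, set()).update(waits_for)
    return global_graph

def has_cycle(graph):
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {t: WHITE for t in graph}
    def visit(t):
        color[t] = GRAY
        for u in graph.get(t, ()):
            if color.get(u, WHITE) == GRAY:
                return True                        # back edge -> cycle -> deadlock
            if color.get(u, WHITE) == WHITE and visit(u):
                return True
        color[t] = BLACK
        return False
    return any(color[t] == WHITE and visit(t) for t in graph)

node1 = {"T1": {"T2"}}
node2 = {"T2": {"T1"}}           # T1 and T2 wait on each other across nodes
print(has_cycle(merge_graphs([node1, node2])))    # True -> abort the txn holding fewest locks
```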
Recovery and Log
Logging Changes:
When a record in the database is updated, Gamma creates a log record that notes the change. Each log record has a unique identifier called a Log Sequence Number (LSN), which includes a node number (determined when the system is set up) and a local sequence number (which keeps increasing). These log records are used for recovery if something goes wrong.
Log Management:
The system sends log records from query processors to Log Managers, which are separate processors that organize the logs into a single stream.
If there are multiple Log Managers (M of them), a query processor sends its logs to one of them based on a simple formula: processor number mod M. This way, each query processor always sends its logs to the same Log Manager, making it easy to find logs later for recovery.
Writing Logs to Disk:
Once a “page” of log records is filled, it is saved to disk.
The Log Manager keeps a Flushed Log Table, which tracks the last log record written to disk for each node. This helps know which logs are safely stored.
Writing Data to Disk (WAL Protocol):
Before writing any changed data (a dirty page) to disk, the system checks if the corresponding log records have already been saved.
If the logs are saved, the data can be safely written to disk. If not, the system must first ensure the logs are written to disk before proceeding.
To avoid waiting too long for log confirmations, the system always tries to keep a certain number of clean buffer pages (unused pages) available.
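A sketch of the WAL check described above: a dirty page may be flushed only if its latest log record is already on disk according to the Flushed Log Table. The data layout and names are hypothetical simplifications:

```python
# Hypothetical WAL check: compare the page's last LSN against the
# Flushed Log Table before writing the dirty page to disk.

flushed_log_table = {"node_3": 1041}   # highest LSN known to be on disk, per node

def can_flush(dirty_page):
    node, page_lsn = dirty_page["node"], dirty_page["page_lsn"]
    return page_lsn <= flushed_log_table.get(node, -1)

page = {"node": "node_3", "page_lsn": 1040}
if can_flush(page):
    print("flush page to disk")
else:
    print("force the log up to LSN", page["page_lsn"], "first")
```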
Commit and Abort Handling:
Commit: If a transaction completes successfully, the system sends a commit message to all the relevant Log Managers.
Abort: If a transaction fails, an abort message is sent to all processors involved, and each processor retrieves its log records to undo the changes using the ARIES algorithm, which rolls back changes in the reverse order they occurred.
Recovery Process:
The system uses the ARIES algorithms for undoing changes, checkpointing, and restarting after a crash.
Checkpointing helps the system know the most recent stable state, reducing the amount of work needed during recovery.
Dataflow scheduling technologies
Data-Driven Execution Instead of Operator Control: Gamma’s dataflow scheduling lets data automatically move between operators, forming a pipeline. Each operator acts like a step on an assembly line: when data reaches the operator, it processes the data and then passes the processed results to the next operator.
Reducing Coordination Overhead: Because of this dataflow design, the system does not need to frequently coordinate or synchronize the execution of each operator. This approach reduces the complexity and overhead of scheduling, especially when multiple operators are running in parallel, and avoids performance bottlenecks caused by waiting or synchronization.
Inherent Support for Parallelism: Dataflow scheduling is well-suited for parallel processing because data can flow between multiple operators at the same time. For example, a query can simultaneously perform scanning, joining, and aggregation across different processors. Each operator can independently process the data it receives without waiting for other operators to finish, allowing the system to efficiently utilize the computational power of multiple processors.
Adaptability to Dynamic Environments: During query execution, dataflow scheduling can be adjusted based on the actual system load and data characteristics. This flexibility allows the system to dynamically optimize the performance of query execution, especially for large and complex queries, by better adapting to changing query demands and system conditions.
Gamma’s unique dataflow scheduling techniques allow data to flow naturally between operators, reducing the need for direct control over operations. This significantly lowers coordination overhead in multi-processor environments, enhances the system’s parallel processing capabilities, and improves the efficiency of executing complex queries.
In Gamma’s dataflow scheduling techniques, parallelism is extensively used to improve query execution efficiency. Here’s where and how parallelism is applied:
Parallel Execution of Operators: Queries often involve multiple operators (e.g., scan, filter, join, aggregation). With dataflow scheduling, these operators can run in parallel:
Scan and Filter in Parallel: While one processor scans a data block, another processor can be filtering the data from previous blocks.
Parallel Joins: If a join operation involves large datasets distributed across different nodes, Gamma can perform the join operation on these different parts of the data simultaneously. The result of the join is computed in parallel across multiple processors.
Data Partitioning for Parallelism: The relations (data tables) are often partitioned across multiple processors in Gamma. This means that different processors can work on different partitions of the data at the same time. For example:
Partitioned Hash Joins: Data can be split into “buckets” based on a hash function, and different processors can handle the join for different buckets simultaneously.
Parallel Aggregation: When computing aggregate functions (e.g., sum or average), each processor calculates a partial result for its own partition of the data, and these partial results are later combined.
In summary, parallelism in Gamma is achieved through:
Distributing query operators across multiple processors.
Partitioning data so different processors work on different sections simultaneously.
Enabling multiple stages of query execution (e.g., scanning, filtering, joining) to happen concurrently.
Questions
What is a fragment or a shard in Gamma?
A fragment or shard refers to a portion of a database relation that is horizontally partitioned across multiple disk drives.
How does a Gamma operator know where to send its stream of records?
There is a structure called the split table that determines where each tuple should be sent, based on the tuple's attribute values.
With interleaved declustering, why not use a cluster size that includes all nodes in the system?
If an interleaved declustering cluster includes all nodes, the system becomes more vulnerable to failures: the failure of any two nodes could make some data inaccessible. A smaller cluster limits the risk of complete data unavailability and balances the load.
Hash-join is appropriate for processing equi-join predicates (Emp.dno = Dept.dno). How can Gamma process nonequi-join predicates (Emp.Sal > Dept.dno*1000) in a pipelined manner?
Range partitioning: Pre-partition the data based on ranges of values to reduce the search space.
Broadcast join: When the smaller relation is broadcasted to all nodes, and then each node evaluates the nonequi-join predicate in parallel.
Nested-loop join: Use a nested-loop join strategy where each tuple from one relation is compared against all tuples from the other relation.
What is the difference between Gamma, Google MapReduce, Microsoft Dryad and Apache Flink?
| Aspect | Gamma | MapReduce | Dryad | Flink |
| --- | --- | --- | --- | --- |
| Primary Use | Parallel database queries | Batch processing | Graph-based parallel computation | Stream and batch processing |
| Architecture | Shared-nothing, partitioned data | Cluster-based, distributed | DAG of tasks | Distributed, supports DAG |
| Data Model | Relational operations (SQL-like) | Key-value pairs | Data flow in DAG | Stream processing with state |
| Partitioning | Horizontal partitioning | Data split into chunks | Data partitioned across graph | Data partitioned into streams |
| Fault Tolerance | Limited | Checkpointing | Task-level recovery | State snapshots, exactly-once |
| Programming | Relational (SQL-style) | Functional (Map/Reduce) | Sequential tasks in DAG | Functional, stream APIs |
| Scalability | Hundreds of processors | Horizontally across many nodes | Scales with more nodes | Highly scalable, stream and batch |
| Use Case | Database query processing | Log processing, data aggregation | Scientific computing | Real-time analytics, event processing |
Will a version of Gamma using FLOW be more modular than its current design?
Yes. FLOW enables more fine-grained control over the data flow and process interactions, which could simplify the addition of new operators and functionalities. It would also make the system easier to maintain and extend, as each component could be developed and optimized independently.