feat(mito): Support background task priority #4804

Open
wants to merge 14 commits into
base: main

v0y4g3r
Contributor

@v0y4g3r v0y4g3r commented Oct 8, 2024

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

This PR adds priorities to background tasks like flush/compaction/file purge.

In the current local scheduler implementation, the scheduler spawns coroutines to poll background tasks from a FIFO queue, waiting for each job to finish before polling the next one. During heavy insert workloads, flush jobs can be blocked by compaction jobs unless we assign priorities to background tasks.
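To make the blocking concrete, here is a back-of-the-envelope sketch (not code from this PR). `QueuedJob` and `first_high_wait` are hypothetical names, the durations are made up, and it assumes a single worker with every job already queued:

```rust
/// Hypothetical job record used only for this illustration.
#[derive(Clone, Copy)]
pub struct QueuedJob {
    pub name: &'static str,
    pub high: bool,
    pub secs: u64,
}

/// Seconds the first high-priority job waits before it starts on a single worker.
/// With `prioritize == false` the queue is drained in plain FIFO order.
pub fn first_high_wait(queue: &[QueuedJob], prioritize: bool) -> Option<u64> {
    let mut order = queue.to_vec();
    if prioritize {
        // Stable sort: high-priority jobs first, FIFO order kept within each level.
        order.sort_by_key(|job| !job.high);
    }
    let mut waited = 0;
    for job in &order {
        if job.high {
            return Some(waited);
        }
        waited += job.secs;
    }
    None
}

/// Returns (FIFO wait, prioritized wait) for a flush queued behind two compactions.
pub fn demo_wait() -> (Option<u64>, Option<u64>) {
    let queue = [
        QueuedJob { name: "compaction-1", high: false, secs: 30 },
        QueuedJob { name: "compaction-2", high: false, secs: 45 },
        QueuedJob { name: "flush", high: true, secs: 2 },
    ];
    (first_high_wait(&queue, false), first_high_wait(&queue, true))
}
```

In this toy setup the flush waits for both compactions under FIFO and starts immediately once high-priority jobs are served first.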

This PR introduces an asynchronous priority channel with two levels: High and Low. It includes an optional deadline for low-priority jobs. The channel first attempts to poll jobs from the high-priority queue, switching to the low-priority queue only if the high-priority queue is empty. If a job at the head of the low-priority queue reaches its deadline, it will be polled immediately. Although jobs in the low-priority queue can have deadlines, all enqueued jobs still follow FIFO order.
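The polling rule can be sketched with a synchronous two-queue structure. This is a simplified illustration, not the `priority` module from this PR: `PriorityQueue`, `push_high`, `push_low`, and `pop` are hypothetical names, and the current time is passed in explicitly so the behavior is easy to follow:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical two-level queue (assumption: synchronous, single consumer).
pub struct PriorityQueue<T> {
    high: VecDeque<T>,
    /// Low-priority items with an optional absolute deadline.
    low: VecDeque<(T, Option<Instant>)>,
}

impl<T> PriorityQueue<T> {
    pub fn new() -> Self {
        Self { high: VecDeque::new(), low: VecDeque::new() }
    }

    pub fn push_high(&mut self, item: T) {
        self.high.push_back(item);
    }

    pub fn push_low(&mut self, item: T, deadline: Option<Instant>) {
        self.low.push_back((item, deadline));
    }

    /// Pops the next item as of `now`.
    pub fn pop(&mut self, now: Instant) -> Option<T> {
        // Only the head of the low queue is inspected, so low-priority items stay FIFO.
        let head_expired = matches!(self.low.front(), Some((_, Some(d))) if *d <= now);
        if head_expired {
            return self.low.pop_front().map(|(item, _)| item);
        }
        // Otherwise prefer the high queue and fall back to the low queue when it is empty.
        if let Some(item) = self.high.pop_front() {
            return Some(item);
        }
        self.low.pop_front().map(|(item, _)| item)
    }
}

/// Two flushes compete with a compaction (60 s deadline) and a purge (no deadline).
pub fn demo_order() -> Vec<&'static str> {
    let now = Instant::now();
    let mut queue = PriorityQueue::new();
    queue.push_low("compaction", Some(now + Duration::from_secs(60)));
    queue.push_low("purge", None);
    queue.push_high("flush-1");
    queue.push_high("flush-2");

    let mut order = Vec::new();
    // Before the deadline, the high-priority queue is served first.
    order.push(queue.pop(now).unwrap());
    // 61 s later the compaction at the head of the low queue is overdue.
    let later = now + Duration::from_secs(61);
    while let Some(item) = queue.pop(later) {
        order.push(item);
    }
    order
}
```

In `demo_order`, the overdue compaction jumps ahead of `flush-2`, while the purge, which has no deadline, still waits until the high-priority queue is empty.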

Currently, flush jobs are given high priority, while compaction and file purge jobs are assigned low priority. The deadline for compaction tasks is set to 60 seconds.
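As a rough sketch of that assignment: the `Priority` shape below follows the `Priority::High` / `Priority::Low(None)` usages quoted in the reviews further down, while `JobKind` and `priority_for` are hypothetical helpers that exist only for this illustration:

```rust
use std::time::Duration;

/// Two-level priority; the optional duration is the low-priority deadline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    High,
    Low(Option<Duration>),
}

/// Hypothetical job categories, named after the background tasks in the description.
#[derive(Debug, Clone, Copy)]
pub enum JobKind {
    Flush,
    Compaction,
    FilePurge,
}

/// Maps each background task to the priority described above.
pub fn priority_for(kind: JobKind) -> Priority {
    match kind {
        JobKind::Flush => Priority::High,
        // Compaction gets a 60-second deadline.
        JobKind::Compaction => Priority::Low(Some(Duration::from_secs(60))),
        // File purge is low priority with no deadline.
        JobKind::FilePurge => Priority::Low(None),
    }
}
```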

Checklist

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.

Summary by CodeRabbit

  • New Features

    • Introduced a new Job struct for enhanced job scheduling, allowing for clearer organization of tasks.
    • Added a Priority enum to manage job priorities effectively.
    • Implemented new asynchronous message passing channels with priority handling.
    • Added a public static reference for tracking elapsed time of scheduler tasks.
  • Bug Fixes

    • Updated error handling with a new SendToChannel variant for better context on message sending issues.
  • Documentation

    • Improved comments and formatting for better readability across various modules.
  • Tests

    • Adjusted test cases to align with the new job scheduling structure.

 - Introduced `Job` struct to encapsulate job type and task.
 - Updated `CompactionScheduler`, `RegionFlushTask`, and `FilePurger` to use `Job` struct.
 - Added `SCHEDULER_TASK_ELAPSED` metric to track task execution time.
…EDULER_TASK_ELAPSED metric in LocalScheduler
 • Added rate_limit dependency to Cargo.lock and Cargo.toml.
 • Introduced SendToChannel error variant in error.rs.
 • Implemented priority-based scheduling in schedule.rs with a new priority.rs module.
 • Added tests for high and low priority message handling, including deadline-based scheduling.
 - Introduced `try_send_high` and `try_send_low` methods in `Sender` for priority-based job sending.
 - Implemented `Clone` trait for `Receiver` to support cloning of receivers.
 - Updated `LocalScheduler` to use `priority::Sender` and `priority::unbounded` for job scheduling.
 Add Priority handling to Job scheduling and update channels in priority module

 - Implement Priority enum with High and Low variants for job scheduling
 - Update Job struct to include priority field and modify constructors
 - Modify LocalScheduler to handle job priority when scheduling
 - Adjust Sender and Receiver in priority module to support priority with deadlines
 - Add new tests for multi-threaded receiving with priority handling
 - Update compaction and flush tasks to use new Priority in Job creation
@v0y4g3r v0y4g3r requested review from evenyag, waynexia and a team as code owners October 8, 2024 23:00
@github-actions github-actions bot added the docs-not-required This change does not impact docs. label Oct 8, 2024
codecov bot commented Oct 8, 2024

Codecov Report

Attention: Patch coverage is 93.70277% with 25 lines in your changes missing coverage. Please review.

Project coverage is 84.18%. Comparing base (5f0a83b) to head (b68a73a).
Report is 4 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4804      +/-   ##
==========================================
- Coverage   84.48%   84.18%   -0.31%     
==========================================
  Files        1120     1126       +6     
  Lines      204412   205208     +796     
==========================================
+ Hits       172707   172763      +56     
- Misses      31705    32445     +740     

Contributor Author

v0y4g3r commented Oct 8, 2024

@coderabbitai full review

Contributor

coderabbitai bot commented Oct 8, 2024

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

coderabbitai bot commented Oct 8, 2024

Walkthrough

The changes primarily focus on enhancing the job scheduling system within the codebase. Key modifications include the introduction of a Job struct that encapsulates job details such as name and priority, replacing direct method calls for scheduling tasks. The Error enum has been expanded with a new variant for handling channel errors. Additionally, new functionalities for asynchronous message passing with varying priorities have been implemented. The overall structure of several modules has been updated to accommodate these enhancements, improving the organization and clarity of the scheduling process.

Changes

| File Path | Change Summary |
|---|---|
| src/mito2/src/compaction.rs | Updated CompactionScheduler to use Job struct in schedule_compaction_request. Updated import statement to include Job and Priority. |
| src/mito2/src/engine/compaction_test.rs | Added Job import and modified test_readonly_during_compaction to use Job::new_test. |
| src/mito2/src/error.rs | Added new SendToChannel variant to Error enum for handling message-sending errors. Updated ErrorExt trait to include this new variant. |
| src/mito2/src/flush.rs | Modified FlushScheduler to include Priority in Job creation within into_flush_job. Added import for Priority. |
| src/mito2/src/metrics.rs | Introduced a new public static reference SCHEDULER_TASK_ELAPSED for tracking elapsed time of scheduler tasks. |
| src/mito2/src/schedule.rs | Added new module priority with #[allow(dead_code)]. |
| src/mito2/src/schedule/priority.rs | Introduced bounded and unbounded channel implementations for async message passing with Sender and Receiver structs. Added methods for high and low priority messages. |
| src/mito2/src/schedule/scheduler.rs | Added Priority enum and Job struct. Updated LocalScheduler to use priority::Sender<Job>. Modified schedule method to handle job priorities. |
| src/mito2/src/sst/file_purger.rs | Updated LocalFilePurger to use Job struct for scheduling file deletions. Changed FileType from Puffin to Parquet in cache removal logic. |

Poem

In the land of code where rabbits play,
Jobs now hop in a clearer way.
With priorities set, they leap and bound,
Asynchronous tasks in harmony found.
A sprinkle of errors, a dash of delight,
Scheduling's magic shines ever so bright! 🐇✨


Contributor

coderabbitai bot commented Oct 8, 2024

✅ Actions performed

Full review triggered.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Outside diff range and nitpick comments (7)
src/mito2/src/engine/compaction_test.rs (1)

32-32: Summary: Minimal changes to support new job scheduling system.

The modifications in this file are part of the larger refactoring to support task prioritization, as outlined in the PR objectives. The changes are minimal and focused, updating only the import statement and the scheduling mechanism in the test_readonly_during_compaction function.

While these changes align well with the PR goals and don't alter the test's logic, it's important to consider the following:

  1. Ensure that all tests in this file still pass after these changes.
  2. Verify that the timing and behavior of the tests, especially test_readonly_during_compaction, remain consistent with the previous implementation.
  3. Consider adding new tests specifically for the priority scheduling system, if not already done in other files.
  4. Update the documentation to reflect the new usage of Job in scheduling tasks, particularly noting any differences between production and test environments.

These changes seem to be part of a broader architectural shift towards a more flexible job scheduling system. As you continue this refactoring:

  1. Ensure consistent usage of the new Job struct across all relevant parts of the codebase.
  2. Consider adding integration tests that specifically verify the priority scheduling behavior in various scenarios.
  3. Update any relevant documentation or comments to explain the new job scheduling system and its impact on the engine's behavior.

Also applies to: 319-321

src/mito2/src/metrics.rs (1)

53-60: LGTM! Consider adding a brief comment for clarity.

The new SCHEDULER_TASK_ELAPSED metric is well-implemented and consistent with the existing metrics. It will provide valuable insights into the performance of different scheduler tasks.

Consider adding a brief comment above the metric declaration to explain its purpose and the significance of the TYPE_LABEL, similar to comments for other metrics in this file. For example:

/// Histogram of elapsed time for different types of scheduler tasks.
pub static ref SCHEDULER_TASK_ELAPSED: HistogramVec = register_histogram_vec!(
    // ... (rest of the code remains the same)
src/mito2/src/schedule/scheduler.rs (3)

40-47: Consider renaming r#type field to avoid using a raw identifier

Using r#type as a field name requires the use of a raw identifier because type is a reserved keyword in Rust. To improve readability and adhere to Rust naming conventions, consider renaming this field to job_type or task_type.

Apply this diff to rename the field:

 pub struct Job {
-    r#type: &'static str,
+    job_type: &'static str,
     priority: Priority,
     task: Pin<Box<dyn Future<Output = ()> + Send>>,
 }

Remember to update all usages of r#type in the codebase accordingly.


162-171: Enhance error handling with more informative messages

Currently, errors from try_send_high and try_send_low are mapped to InvalidSenderSnafu without additional context. Consider including details about the job's priority or type in the error to aid in debugging when a job fails to be scheduled.

For example, you could modify the error handling as follows:

+// Copy the type name first: `job` is moved into the channel by `try_send_*`.
+let job_type = job.job_type;
 match job.priority {
     Priority::High => sender
         .try_send_high(job)
-        .map_err(|_| InvalidSenderSnafu {}.build()),
+        .map_err(|_| InvalidSenderSnafu {
+            description: format!("Failed to schedule high-priority job: {}", job_type),
+        }
+        .build()),
     Priority::Low(deadline) => sender
         .try_send_low(job, deadline)
-        .map_err(|_| InvalidSenderSnafu {}.build()),
+        .map_err(|_| InvalidSenderSnafu {
+            description: format!("Failed to schedule low-priority job: {}", job_type),
+        }
+        .build()),
 }

Make sure to update the InvalidSenderSnafu error variant to accept a description field.


Line range hint 227-300: Add test cases to verify priority scheduling logic

The current tests do not cover scenarios where both high and low priority jobs are scheduled together. To ensure that the scheduler correctly prioritizes high-priority jobs and respects deadlines for low-priority jobs, consider adding test cases that:

  • Schedule a mix of high and low priority jobs.
  • Verify that high-priority jobs execute before low-priority ones.
  • Test the behavior of low-priority jobs with deadlines.

For example, you could add a test like this:

#[tokio::test]
async fn test_priority_scheduling() {
    let scheduler = LocalScheduler::new(1);

    let execution_order = Arc::new(Mutex::new(Vec::new()));

    // Low priority job without deadline
    let order_clone = execution_order.clone();
    scheduler.schedule(Job::new(
        "low_no_deadline",
        Priority::Low(None),
        Box::pin(async move {
            order_clone.lock().await.push("low_no_deadline");
        }),
    )).unwrap();

    // High priority job
    let order_clone = execution_order.clone();
    scheduler.schedule(Job::new(
        "high",
        Priority::High,
        Box::pin(async move {
            order_clone.lock().await.push("high");
        }),
    )).unwrap();

    // Low priority job with deadline in the past
    let order_clone = execution_order.clone();
    scheduler.schedule(Job::new(
        "low_with_past_deadline",
        Priority::Low(Some(Duration::from_secs(0))),
        Box::pin(async move {
            order_clone.lock().await.push("low_with_past_deadline");
        }),
    )).unwrap();

    scheduler.stop(true).await.unwrap();

    let order = execution_order.lock().await.clone();
    assert_eq!(
        order,
        vec!["high", "low_with_past_deadline", "low_no_deadline"],
        "Jobs did not execute in expected order"
    );
}

This test schedules three jobs and checks that they are executed in the correct order according to their priorities and deadlines.

src/mito2/src/schedule/priority.rs (1)

114-114: Correct grammatical error in documentation comment

In the documentation comment for the into_stream method, there's a minor grammatical error. The word "a" should be replaced with "an" before a vowel sound.

-/// Converts the receiver into a async stream.
+/// Converts the receiver into an async stream.
src/mito2/src/flush.rs (1)

263-269: Consider adding unit tests for priority scheduling

To verify that flush jobs are correctly prioritized over other tasks, consider adding unit tests that simulate scheduling of different job types with various priorities.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 5f0a83b and 0b664a7.

📒 Files selected for processing (9)
  • src/mito2/src/compaction.rs (2 hunks)
  • src/mito2/src/engine/compaction_test.rs (2 hunks)
  • src/mito2/src/error.rs (2 hunks)
  • src/mito2/src/flush.rs (2 hunks)
  • src/mito2/src/metrics.rs (1 hunks)
  • src/mito2/src/schedule.rs (1 hunks)
  • src/mito2/src/schedule/priority.rs (1 hunks)
  • src/mito2/src/schedule/scheduler.rs (8 hunks)
  • src/mito2/src/sst/file_purger.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/mito2/src/schedule.rs
🧰 Additional context used
🔇 Additional comments (14)
src/mito2/src/engine/compaction_test.rs (2)

32-32: New import added for Job struct.

The addition of use crate::schedule::scheduler::Job; suggests that the Job struct is now being used in this test file. This change is consistent with the modification in the test_readonly_during_compaction function.


319-321: Approved: Updated scheduling to use Job struct.

The modification to use Job::new_test aligns with the new priority-based scheduling system mentioned in the PR objectives. This change doesn't alter the test logic but updates how the task is scheduled.

To ensure the new job scheduling system doesn't impact the test's behavior, please verify:

  1. The test still passes and behaves as expected.
  2. The timing and execution order of tasks remain consistent with the previous implementation.

You may want to run this test multiple times to check for any potential race conditions or timing issues introduced by the new scheduling system.

src/mito2/src/metrics.rs (1)

Line range hint 1-60: The new metric aligns well with the PR objectives.

The addition of the SCHEDULER_TASK_ELAPSED metric is a valuable enhancement to the metrics system. It will provide insights into the performance of scheduler tasks, which is crucial for the new priority system for background tasks introduced in this PR. This metric will help monitor and optimize the handling of high-priority tasks (like flush jobs) and low-priority tasks (like compaction and file purge jobs).

src/mito2/src/error.rs (2)

874-879: LGTM: New error variant added correctly.

The new SendToChannel variant is well-implemented and consistent with the existing error handling pattern in this enum. It properly captures the error message and location information.


1012-1012: LGTM: ErrorExt trait implementation updated correctly.

The SendToChannel variant is properly mapped to StatusCode::Internal, which is consistent with similar error types in this implementation. The placement in the match expression follows the existing pattern.
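As a rough illustration of how a failed channel send might be surfaced through such a variant, here is a sketch built on the standard library's bounded channel. It is not the PR's code: `SchedulerError` is a hypothetical stand-in for the crate's `Error` enum (without the location information), and this `map_send_error` only borrows its name from a later commit message in this PR:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical stand-in for the crate's error enum.
#[derive(Debug, PartialEq)]
pub enum SchedulerError {
    SendToChannel { msg: String },
}

/// Converts a failed non-blocking send into the error variant, dropping the unsent payload.
pub fn map_send_error<T>(err: TrySendError<T>) -> SchedulerError {
    let msg = match err {
        TrySendError::Full(_) => "channel is full",
        TrySendError::Disconnected(_) => "channel is disconnected",
    };
    SchedulerError::SendToChannel { msg: msg.to_string() }
}

/// Triggers both failure modes and returns the mapped errors as (full, disconnected).
pub fn demo_send_errors() -> (SchedulerError, SchedulerError) {
    // Capacity 1: the second send finds the buffer full.
    let (tx, _rx) = sync_channel::<u32>(1);
    tx.try_send(1).unwrap();
    let full = tx.try_send(2).map_err(map_send_error).unwrap_err();

    // A channel whose receiver is already gone reports a disconnect.
    let (tx_closed, rx_closed) = sync_channel::<u32>(1);
    drop(rx_closed);
    let closed = tx_closed.try_send(3).map_err(map_send_error).unwrap_err();

    (full, closed)
}
```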

src/mito2/src/sst/file_purger.rs (5)

23-23: Importing scheduler components for priority-based task scheduling

The addition of Job, Priority, and SchedulerRef imports is appropriate to support the new priority-based scheduling mechanism for background tasks.


82-85: Scheduling file purge operations as low-priority jobs

Wrapping the file purge logic within a Job scheduled with Priority::Low(None) correctly aligns with the PR objective to process file purge tasks as low-priority background jobs.


86-94: Robust error handling during SST file deletion

The code properly handles errors when deleting the SST file by logging an error message on failure and an informational message on success, ensuring visibility into the operation's outcome.


96-98: Ensuring cache manager and write cache availability

The check for the presence of a cache manager and write cache before attempting cache removal means the cleanup is skipped when either is absent, so the purge job never assumes a cache is configured.


105-106: Verify the use of FileType::Puffin for inverted index cache removal

Ensure that FileType::Puffin is the correct file type for inverted index entries in the cache. If the file type for inverted indexes has changed elsewhere in the codebase, this usage may need to be updated.

To confirm the correct file type, please run the following script:

✅ Verification successful

FileType::Puffin correctly used for inverted index cache removal

The usage of FileType::Puffin in src/mito2/src/sst/file_purger.rs for inverted index cache removal is correct.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that `FileType::Puffin` is used for inverted index cache entries.

# Search for cache removals of inverted index files and check the file type.
rg --type rust 'write_cache' -A 5 | rg 'inverted_index' -A 5 | rg 'FileType::Puffin'

Length of output: 163

src/mito2/src/schedule/scheduler.rs (1)

32-38: Definition of Priority enum is appropriate

The Priority enum is well-defined, allowing for high-priority jobs and low-priority jobs with optional deadlines. This enhances the scheduler's capability to handle tasks based on their urgency.

src/mito2/src/compaction.rs (1)

66-66: Updated imports to include Job and Priority

The import statement now includes Job and Priority, which are necessary for the new job scheduling functionality.

src/mito2/src/flush.rs (2)

44-44: Importing Priority enum is appropriate

The addition of the Priority import allows setting job priorities in the scheduler.


263-269: Flush jobs are now scheduled with high priority

Setting the priority of flush jobs to Priority::High ensures they are prioritized appropriately in the job queue.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (4)
src/mito2/src/metrics.rs (1)

53-60: LGTM! Consider adding a brief comment for clarity.

The new SCHEDULER_TASK_ELAPSED metric is well-integrated and consistent with existing metrics. It follows the established naming conventions and uses appropriate types and labels.

Consider adding a brief comment above the metric to explain its purpose, similar to other metrics in this file. For example:

/// Elapsed time of tasks in scheduler, categorized by task type.
pub static ref SCHEDULER_TASK_ELAPSED: HistogramVec = register_histogram_vec!(
    // ... (rest of the code remains the same)
src/mito2/src/sst/file_purger.rs (1)

82-84: Consider specifying a deadline for the purge job's priority

Since low-priority jobs can have an optional deadline to ensure they are eventually executed, consider adding a deadline to Priority::Low for the file purge tasks if timely execution is desired.

src/mito2/src/schedule/priority.rs (2)

40-44: Add documentation comments to Sender<T> struct

To improve code readability and maintainability, consider adding Rust doc comments to the Sender<T> struct, explaining its purpose and usage.


96-99: Add documentation comments to Receiver<T> struct

For better documentation, add doc comments to the Receiver<T> struct to explain its role and how it should be used.

🧰 Additional context used
🔇 Additional comments (12)
src/mito2/src/schedule.rs (1)

15-16: New priority module added with #[allow(dead_code)] attribute

The addition of the priority module aligns with the PR objective of introducing a priority system for background tasks. However, the #[allow(dead_code)] attribute suggests that some code within this module might currently be unused.

Consider the following points:

  1. Ensure that the priority module is properly integrated with the existing scheduling system as development progresses.
  2. The #[allow(dead_code)] attribute is useful during development but should be removed once the module is fully implemented and integrated.
  3. As the PR objectives mention high and low priority tasks, verify that this module implements the described functionality for flush, compaction, and file purge jobs.

To verify the contents and integration of the new priority module, please run the following script:

This script will help us understand the implementation details of the priority module and how it's being used in the codebase.

✅ Verification successful

priority module is actively used and integrated

The priority module is implemented and utilized within scheduler.rs, indicating that it serves its intended purpose. The #[allow(dead_code)] attribute may no longer be necessary and can be removed to ensure all code is actively maintained and utilized.

  • Action Items:
    • Consider removing the #[allow(dead_code)] attribute from the priority module.
    • Ensure that all functionalities within the priority module are actively used and tested.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the contents of the priority module and its integration

# Check the contents of the priority module
echo "Contents of the priority module:"
cat src/mito2/src/schedule/priority.rs

# Check for references to the priority module in other files
echo "References to the priority module in other files:"
rg "use (crate|super)::schedule::priority" src/mito2/src/

Length of output: 13781

src/mito2/src/engine/compaction_test.rs (2)

32-32: LGTM: Import statement added correctly.

The addition of the Job import from crate::schedule::scheduler is appropriate and aligns with its usage in the test_readonly_during_compaction function.


319-321: LGTM: Job scheduling updated, but clarification needed.

The change to use Job::new_test is consistent with the new import and likely part of a larger refactoring. However, I have a few questions:

  1. Can you provide more information about the Job::new_test method? Is it specifically designed for testing scenarios?
  2. Does this change affect how the job is prioritized or executed compared to the previous implementation?
  3. Are there any potential impacts on the test's behavior or assertions that we should be aware of?

To ensure this change doesn't introduce any unintended side effects, could you run the following verification script?

This will help us understand if this change is part of a broader refactoring across test files and if there are any inconsistencies in how job scheduling is handled in tests.

✅ Verification successful

Verified: Job scheduling change is localized.

The usage of Job::new_test is confined to compaction_test.rs, indicating it's a specific change for this test case. There are no other instances of Job::new_test or modifications to schedule across other test files, suggesting no broader impact on job scheduling or test behavior.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of Job::new_test in test files

# Test: Search for other occurrences of Job::new_test in test files
rg --type rust -g '*_test.rs' 'Job::new_test'

# Test: Check if there are any other changes related to job scheduling in test files
rg --type rust -g '*_test.rs' '\bschedule\b'

Length of output: 357

src/mito2/src/error.rs (2)

874-879: LGTM: New SendToChannel variant added correctly.

The new SendToChannel variant has been added to the Error enum with appropriate fields and attributes. It follows the existing pattern and provides a clear error message for channel send failures.


1012-1012: LGTM: ErrorExt trait implementation updated correctly.

The SendToChannel variant has been properly added to the status_code method in the ErrorExt trait implementation. Associating it with StatusCode::Internal is appropriate for this type of error.

src/mito2/src/flush.rs (3)

44-44: New import added for Priority enum

The addition of Priority to the import statement is consistent with the changes made in the into_flush_job method. This import allows the use of the Priority enum in the file.


263-269: Approved: High priority assigned to flush jobs

The addition of Priority::High to the Job::new call in the into_flush_job method is a positive change. This modification aligns with the PR objective of introducing a priority system for background tasks. By assigning high priority to flush jobs, the system ensures that these critical operations are processed before lower-priority tasks like compaction, potentially improving overall system performance and responsiveness.


Line range hint 1-1006: Summary: Successful implementation of priority for flush jobs

The changes made to this file successfully implement the priority system for flush jobs as part of the larger goal of supporting background task priority. The modifications are minimal, focused, and consistent with the PR objectives. The high priority assigned to flush jobs should help improve system performance by ensuring these critical tasks are processed before lower-priority operations.

No further changes or improvements are necessary based on the current implementation.

src/mito2/src/sst/file_purger.rs (3)

23-23: Imports updated correctly

The new import statement correctly includes Job, Priority, and SchedulerRef, which are necessary for the job scheduling enhancements.


82-119: Encapsulation of purge logic into a scheduled job enhances maintainability

By wrapping the file purge operations within a Job and scheduling it, the code improves the organization, readability, and maintainability of asynchronous tasks.


86-94: Appropriate error handling and logging for SST deletion

The code correctly handles errors during the SST file deletion by logging them, ensuring that any issues are visible for debugging and monitoring purposes.

src/mito2/src/compaction.rs (1)

66-66: LGTM!

The import statement correctly includes Job and Priority from the scheduler module.

…ods to use map_send_error function

 • Implement map_send_error function to standardize error mapping in Sender
 • Replace inline error mapping with map_send_error calls in Sender methods
 • Change Receiver's into_stream to return impl Stream instead of a boxed trait object
 • Add missing assert_eq! in tests to verify expected behavior
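The idea of a shared `map_send_error` helper can be sketched with the standard library's bounded channel. The `SendError` enum, its variant names, and the `try_enqueue` wrapper are hypothetical; the PR's own helper presumably maps to the crate's error type instead, and its channel is asynchronous.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical error type standing in for the crate's own error enum.
#[derive(Debug, PartialEq, Eq)]
enum SendError {
    ChannelFull,
    ChannelClosed,
}

/// Single place that converts channel send failures into `SendError`,
/// so every sender method maps errors the same way.
fn map_send_error<T>(err: TrySendError<T>) -> SendError {
    match err {
        TrySendError::Full(_) => SendError::ChannelFull,
        TrySendError::Disconnected(_) => SendError::ChannelClosed,
    }
}

/// Example sender method that reuses the helper instead of an inline closure.
fn try_enqueue(tx: &SyncSender<u32>, item: u32) -> Result<(), SendError> {
    tx.try_send(item).map_err(map_send_error)
}
```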
…rn an unboxed stream and update usage to box the stream where necessary.
loop {
    select! {
        biased;
        Some(_) = maybe_timeout(&mut timer) => {
Contributor

Maybe we can simplify the channel to always execute a pending low-priority task at a fixed interval or after executing N high-priority tasks.

We can then remove the deadline argument from the priority.

Contributor Author

Maybe we can simplify the channel to always execute a pending low-priority task at a fixed interval or after executing N high-priority tasks.

We can then remove the deadline argument from the priority.

The initial implementation was like this, but the task waiting time will be indefinite.

Contributor

but the task waiting time will be indefinite.

If we send low-pri jobs in this order: None, deadline 5m, then the job with None can still block the job with deadline 5m longer than 5 minutes.

Contributor Author

@v0y4g3r v0y4g3r Oct 10, 2024

but the task waiting time will be indefinite.

If we send low-pri jobs in this order: None, deadline 5m, then the job with None can still block the job with deadline 5m longer than 5 minutes.

Yes, the enqueued jobs in both the high- and low-priority channels are still guaranteed to be FIFO; the deadline only affects how low-priority jobs compete with high-priority jobs.
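The polling order discussed in this thread can be illustrated with a small synchronous model. This is a sketch under assumptions, not the PR's asynchronous channel: `PriorityQueue`, `push_high`, `push_low`, and `pop` are hypothetical names, and the current time is passed in explicitly so the behavior is deterministic.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Two FIFO queues; low-priority items may carry an optional deadline.
struct PriorityQueue<T> {
    high: VecDeque<T>,
    low: VecDeque<(T, Option<Instant>)>,
}

impl<T> PriorityQueue<T> {
    fn new() -> Self {
        PriorityQueue { high: VecDeque::new(), low: VecDeque::new() }
    }

    fn push_high(&mut self, item: T) {
        self.high.push_back(item);
    }

    fn push_low(&mut self, item: T, deadline: Option<Instant>) {
        self.low.push_back((item, deadline));
    }

    /// Polling order:
    /// 1. the head of the low queue, if its deadline has passed;
    /// 2. otherwise the head of the high queue;
    /// 3. otherwise the head of the low queue.
    /// Only the head of the low queue is inspected, so FIFO order is kept.
    fn pop(&mut self, now: Instant) -> Option<T> {
        let head_expired = matches!(
            self.low.front(),
            Some((_, Some(deadline))) if *deadline <= now
        );
        if head_expired {
            return self.low.pop_front().map(|(item, _)| item);
        }
        if let Some(item) = self.high.pop_front() {
            return Some(item);
        }
        self.low.pop_front().map(|(item, _)| item)
    }
}
```

Because only the head of the low-priority queue is checked, a job enqueued without a deadline can delay a later job past its own deadline, which is the case raised above with the `None`, then 5-minute ordering.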

…refactor priority channel send methods

 • Implement SCHEDULER_PENDING_JOBS metric to track pending jobs in the scheduler.
 • Remove bounded channel creation in priority.rs and add send_high and send_low async methods for testing.
 • Modify scheduler.rs to increment and decrement SCHEDULER_PENDING_JOBS metric upon job submission and completion.
 • Replace direct calls to send_high and send_low with try_send_high and try_send_low in tests.
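Tracking pending jobs on submission and completion, as the commit above describes, could look roughly like the following. The `Scheduler` struct here is a synchronous stand-in and an `AtomicI64` replaces the real `SCHEDULER_PENDING_JOBS` gauge; both are assumptions made to keep the sketch self-contained.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicI64, Ordering};

/// Simplified scheduler that counts jobs submitted but not yet finished.
struct Scheduler {
    /// Stand-in for a gauge metric such as `SCHEDULER_PENDING_JOBS`.
    pending_jobs: AtomicI64,
    queue: VecDeque<Box<dyn FnOnce()>>,
}

impl Scheduler {
    fn new() -> Self {
        Scheduler { pending_jobs: AtomicI64::new(0), queue: VecDeque::new() }
    }

    /// Increments the gauge when a job is accepted.
    fn submit(&mut self, job: Box<dyn FnOnce()>) {
        self.pending_jobs.fetch_add(1, Ordering::Relaxed);
        self.queue.push_back(job);
    }

    /// Runs the next job, then decrements the gauge once it has completed.
    /// Returns `false` when the queue is empty.
    fn run_next(&mut self) -> bool {
        match self.queue.pop_front() {
            Some(job) => {
                job();
                self.pending_jobs.fetch_sub(1, Ordering::Relaxed);
                true
            }
            None => false,
        }
    }

    fn pending(&self) -> i64 {
        self.pending_jobs.load(Ordering::Relaxed)
    }
}
```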
 Improve handling of expired low-priority items and add test for low-priority overloading
…r and remove InvalidSender error

 • Remove InvalidSender error variant from Error enum.
 • Refactor Sender methods to use a single try_send method with a Priority parameter instead of separate try_send_high and try_send_low methods.
 • Update tests to use the new try_send method with Priority.
 • Add Clone and Copy traits to Priority enum.
 Introduce COMPACTION_TASK_DEADLINE constant for compaction task duration
… compactions to High

 • Implement manual compaction request handling with high priority scheduling.
 • Extend CompactionScheduler and related methods to accept a manual boolean flag to determine compaction priority.
 • Update unit tests to pass the manual flag as false for automated compaction scenarios.
 • Add Debug trait to Priority enum for improved logging.
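A sketch of how the `manual` flag might select a priority. `COMPACTION_TASK_DEADLINE` and the 60-second value come from this PR; the `Priority::Low(Option<Duration>)` shape and the `compaction_priority` function name are assumptions for illustration.

```rust
use std::time::Duration;

/// Deadline for automatic compaction tasks (60 seconds per the PR description).
const COMPACTION_TASK_DEADLINE: Duration = Duration::from_secs(60);

/// Hypothetical priority enum; the real definition may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    High,
    Low(Option<Duration>),
}

/// Manual compactions are scheduled with high priority; automatic ones
/// stay low priority with a deadline so they are not starved indefinitely.
fn compaction_priority(manual: bool) -> Priority {
    if manual {
        Priority::High
    } else {
        Priority::Low(Some(COMPACTION_TASK_DEADLINE))
    }
}
```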
Contributor

@evenyag evenyag left a comment

This is a workaround before we refactor the scheduler.

Labels
docs-not-required This change does not impact docs.
2 participants