-
Notifications
You must be signed in to change notification settings - Fork 11.9k
[ISSUE #9254] Add CombineConsumeQueue to support CQ migration #9256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #9254] Add CombineConsumeQueue to support CQ migration #9256
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #9256 +/- ##
=============================================
+ Coverage 47.96% 48.05% +0.08%
- Complexity 11908 11982 +74
=============================================
Files 1307 1308 +1
Lines 92021 92166 +145
Branches 11775 11789 +14
=============================================
+ Hits 44139 44288 +149
+ Misses 42401 42375 -26
- Partials 5481 5503 +22 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ad190f2
to
61ee2f4
Compare
dc014b5
to
a8996fa
Compare
da78405
to
59ee17a
Compare
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
Show resolved
Hide resolved
@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() { | |||
} | |||
} | |||
} | |||
|
|||
public void initializeWithOffset(long offset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used for mount --> main in container mode? If so, add some java docs here.
|
||
// update the max and min offset | ||
if (offset > 0) { | ||
this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about use two methods for updating CQ offset for min and max offsets.
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
f56cea9
to
5165c2f
Compare
final String storePath, | ||
final int mappedFileSize, | ||
final MessageStore messageStore, | ||
final ConsumeQueueStore consumeQueueStore) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This format looks a bit strange
store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
|
||
currentReadStore = getInnerStoreByString(messageStoreConfig.getCombineCQPreferCQType()); | ||
if (currentReadStore == null) { | ||
log.error("CombineConsumeQueue choosePreferCQ fail, prefer={}", messageStoreConfig.getCombineCQPreferCQType()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
日志的风格,前后不太一致
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM now.
Which Issue(s) This PR Fixes
Fixes #9254
Brief Description
How Did You Test This Change?