Skip to content

Commit f1c65eb

Browse files
committed
Sub-thread checkpoint opti
1 parent 77796d0 commit f1c65eb

File tree

5 files changed

+224
-116
lines changed

5 files changed

+224
-116
lines changed

apple/WCDB/abstract/handle.cpp

+21-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ const std::string Handle::backupSuffix("-backup");
3535

3636
static void GlobalLog(void *userInfo, int code, const char *message)
3737
{
38-
Error::ReportSQLiteGlobal(code, message?message:"", nullptr);
38+
Error::ReportSQLiteGlobal(code, message ? message : "", nullptr);
3939
}
4040

4141
const auto UNUSED_UNIQUE_ID = []() {
@@ -260,6 +260,26 @@ bool Handle::setCipherKey(const void *data, int size)
260260
#endif //SQLITE_HAS_CODEC
261261
}
262262

263+
void Handle::registerCommitedHook(const CommitedHook &onCommited, void *info)
264+
{
265+
m_commitedHookInfo.onCommited = onCommited;
266+
m_commitedHookInfo.info = info;
267+
m_commitedHookInfo.handle = this;
268+
if (m_commitedHookInfo.onCommited) {
269+
sqlite3_wal_hook(
270+
(sqlite3 *) m_handle,
271+
[](void *p, sqlite3 *, const char *, int pages) -> int {
272+
CommitedHookInfo *commitedHookInfo = (CommitedHookInfo *) p;
273+
commitedHookInfo->onCommited(commitedHookInfo->handle, pages,
274+
commitedHookInfo->info);
275+
return SQLITE_OK;
276+
},
277+
&m_commitedHookInfo);
278+
} else {
279+
sqlite3_wal_hook((sqlite3 *) m_handle, nullptr, nullptr);
280+
}
281+
}
282+
263283
std::string Handle::getBackupPath() const
264284
{
265285
return path + backupSuffix;

apple/WCDB/abstract/handle.hpp

+11
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ typedef std::function<void(
3737
Tag, const std::map<std::string, unsigned int> &, const int64_t &)>
3838
Trace;
3939

40+
typedef std::function<void(Handle *, int, void *)> CommitedHook;
41+
4042
class Handle {
4143
public:
4244
Handle(const std::string &path);
@@ -75,6 +77,8 @@ class Handle {
7577

7678
const Error &getError() const;
7779

80+
void registerCommitedHook(const CommitedHook &onCommited, void *info);
81+
7882
static const std::string backupSuffix;
7983

8084
protected:
@@ -88,6 +92,13 @@ class Handle {
8892
void addTrace(const std::string &sql, const int64_t &cost);
8993
bool shouldAggregation() const;
9094

95+
typedef struct {
96+
CommitedHook onCommited;
97+
void *info;
98+
Handle *handle;
99+
} CommitedHookInfo;
100+
CommitedHookInfo m_commitedHookInfo;
101+
91102
Trace m_trace;
92103
std::map<std::string, unsigned int> m_footprint;
93104
int64_t m_cost;

apple/WCDB/core/database.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Database : public CoreBase {
5454
static const std::string defaultConfigName;
5555
static const std::string defaultCipherConfigName;
5656
static const std::string defaultTraceConfigName;
57+
static const std::string defaultCheckpointConfigName;
5758
static const Configs defaultConfigs;
5859
void setConfig(const std::string &name,
5960
const Config &config,

apple/WCDB/core/database_config.cpp

+190-114
Original file line numberDiff line numberDiff line change
@@ -22,130 +22,206 @@
2222
#include <WCDB/handle_statement.hpp>
2323
#include <WCDB/macro.hpp>
2424
#include <WCDB/utility.hpp>
25+
#include <queue>
2526
#include <sqlcipher/sqlite3.h>
27+
#include <thread>
2628
#include <vector>
2729

2830
namespace WCDB {
2931

3032
const std::string Database::defaultConfigName = "default";
3133
const std::string Database::defaultCipherConfigName = "cipher";
3234
const std::string Database::defaultTraceConfigName = "trace";
35+
const std::string Database::defaultCheckpointConfigName = "checkpoint";
3336
std::shared_ptr<Trace> Database::s_globalTrace = nullptr;
3437

35-
const Configs Database::defaultConfigs({
36-
{
37-
Database::defaultTraceConfigName,
38-
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
39-
std::shared_ptr<Trace> trace = s_globalTrace;
40-
if (trace) {
41-
handle->setTrace(*trace.get());
42-
}
43-
return true;
44-
},
45-
0,
46-
},
47-
{
48-
Database::defaultCipherConfigName,
49-
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
50-
//place holder
51-
return true;
52-
},
53-
1,
54-
},
55-
{
56-
Database::defaultConfigName,
57-
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
58-
59-
//Locking Mode
60-
{
61-
static const StatementPragma s_getLockingMode =
62-
StatementPragma().pragma(Pragma::LockingMode);
63-
static const StatementPragma s_setLockingModeNormal =
64-
StatementPragma().pragma(Pragma::LockingMode, "NORMAL");
65-
66-
//Get Locking Mode
67-
std::shared_ptr<StatementHandle> statementHandle =
68-
handle->prepare(s_getLockingMode);
69-
if (!statementHandle) {
70-
error = handle->getError();
71-
return false;
72-
}
73-
statementHandle->step();
74-
if (!statementHandle->isOK()) {
75-
error = statementHandle->getError();
76-
return false;
77-
}
78-
std::string lockingMode =
79-
statementHandle->getValue<WCDB::ColumnType::Text>(0);
80-
statementHandle->finalize();
81-
82-
//Set Locking Mode
83-
if (strcasecmp(lockingMode.c_str(), "NORMAL") != 0 &&
84-
!handle->exec(s_setLockingModeNormal)) {
85-
error = handle->getError();
86-
return false;
87-
}
88-
}
89-
90-
//Journal Mode
91-
{
92-
static const StatementPragma s_getJournalMode =
93-
StatementPragma().pragma(Pragma::JournalMode);
94-
static const StatementPragma s_setJournalModeWAL =
95-
StatementPragma().pragma(Pragma::JournalMode, "WAL");
96-
97-
//Get Journal Mode
98-
std::shared_ptr<StatementHandle> statementHandle =
99-
handle->prepare(s_getJournalMode);
100-
if (!statementHandle) {
101-
error = handle->getError();
102-
return false;
103-
}
104-
statementHandle->step();
105-
if (!statementHandle->isOK()) {
106-
error = statementHandle->getError();
107-
return false;
108-
}
109-
std::string journalMode =
110-
statementHandle->getValue<WCDB::ColumnType::Text>(0);
111-
statementHandle->finalize();
112-
113-
//Set Journal Mode
114-
if (strcasecmp(journalMode.c_str(), "WAL") != 0 &&
115-
!handle->exec(s_setJournalModeWAL)) {
116-
error = handle->getError();
117-
return false;
118-
}
119-
}
120-
121-
//Synchronous
122-
{
123-
static const StatementPragma s_setSynchronousFull =
124-
StatementPragma().pragma(Pragma::Synchronous, "FULL");
125-
126-
if (!handle->exec(s_setSynchronousFull)) {
127-
error = handle->getError();
128-
return false;
129-
}
130-
}
131-
132-
//Fullfsync
133-
{
134-
static const StatementPragma s_setFullFsync =
135-
StatementPragma().pragma(Pragma::Fullfsync, "ON");
136-
137-
if (!handle->exec(s_setFullFsync)) {
138-
error = handle->getError();
139-
return false;
140-
}
141-
}
142-
143-
error.reset();
144-
return true;
145-
},
146-
2,
147-
},
148-
});
38+
const Configs Database::defaultConfigs(
39+
{{
40+
Database::defaultTraceConfigName,
41+
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
42+
std::shared_ptr<Trace> trace = s_globalTrace;
43+
if (trace) {
44+
handle->setTrace(*trace.get());
45+
}
46+
return true;
47+
},
48+
0,
49+
},
50+
{
51+
Database::defaultCipherConfigName,
52+
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
53+
//place holder
54+
return true;
55+
},
56+
1,
57+
},
58+
{
59+
Database::defaultConfigName,
60+
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
61+
62+
//Locking Mode
63+
{
64+
static const StatementPragma s_getLockingMode =
65+
StatementPragma().pragma(Pragma::LockingMode);
66+
static const StatementPragma s_setLockingModeNormal =
67+
StatementPragma().pragma(Pragma::LockingMode, "NORMAL");
68+
69+
//Get Locking Mode
70+
std::shared_ptr<StatementHandle> statementHandle =
71+
handle->prepare(s_getLockingMode);
72+
if (!statementHandle) {
73+
error = handle->getError();
74+
return false;
75+
}
76+
statementHandle->step();
77+
if (!statementHandle->isOK()) {
78+
error = statementHandle->getError();
79+
return false;
80+
}
81+
std::string lockingMode =
82+
statementHandle->getValue<WCDB::ColumnType::Text>(0);
83+
statementHandle->finalize();
84+
85+
//Set Locking Mode
86+
if (strcasecmp(lockingMode.c_str(), "NORMAL") != 0 &&
87+
!handle->exec(s_setLockingModeNormal)) {
88+
error = handle->getError();
89+
return false;
90+
}
91+
}
92+
93+
//Journal Mode
94+
{
95+
static const StatementPragma s_getJournalMode =
96+
StatementPragma().pragma(Pragma::JournalMode);
97+
static const StatementPragma s_setJournalModeWAL =
98+
StatementPragma().pragma(Pragma::JournalMode, "WAL");
99+
100+
//Get Journal Mode
101+
std::shared_ptr<StatementHandle> statementHandle =
102+
handle->prepare(s_getJournalMode);
103+
if (!statementHandle) {
104+
error = handle->getError();
105+
return false;
106+
}
107+
statementHandle->step();
108+
if (!statementHandle->isOK()) {
109+
error = statementHandle->getError();
110+
return false;
111+
}
112+
std::string journalMode =
113+
statementHandle->getValue<WCDB::ColumnType::Text>(0);
114+
statementHandle->finalize();
115+
116+
//Set Journal Mode
117+
if (strcasecmp(journalMode.c_str(), "WAL") != 0 &&
118+
!handle->exec(s_setJournalModeWAL)) {
119+
error = handle->getError();
120+
return false;
121+
}
122+
}
123+
124+
//Synchronous
125+
{
126+
static const StatementPragma s_setSynchronousFull =
127+
StatementPragma().pragma(Pragma::Synchronous, "FULL");
128+
129+
if (!handle->exec(s_setSynchronousFull)) {
130+
error = handle->getError();
131+
return false;
132+
}
133+
}
134+
135+
//Fullfsync
136+
{
137+
static const StatementPragma s_setFullFsync =
138+
StatementPragma().pragma(Pragma::Fullfsync, "ON");
139+
140+
if (!handle->exec(s_setFullFsync)) {
141+
error = handle->getError();
142+
return false;
143+
}
144+
}
145+
146+
error.reset();
147+
return true;
148+
},
149+
2,
150+
},
151+
{
152+
Database::defaultCheckpointConfigName,
153+
[](std::shared_ptr<Handle> &handle, Error &error) -> bool {
154+
155+
static std::unordered_map<std::string, int> s_checkpointStep;
156+
static std::mutex s_checkpointStepMutex;
157+
s_checkpointStepMutex.lock();
158+
auto iter = s_checkpointStep.find(handle->path);
159+
if (iter == s_checkpointStep.end()) {
160+
iter = s_checkpointStep.insert({handle->path, 1000}).first;
161+
}
162+
int *checkpointStepPointer = &iter->second;
163+
s_checkpointStepMutex.unlock();
164+
165+
handle->registerCommitedHook(
166+
[](Handle *handle, int pages, void *p) {
167+
//Since sqlite can't write concurrently, checkpointStepPointer does not need a mutex.
168+
int *checkpointStepPointer = (int *) p;
169+
if (*checkpointStepPointer != 0) {
170+
if (pages > *checkpointStepPointer) {
171+
*checkpointStepPointer = 0;
172+
if (pthread_main_np() != 0) {
173+
//dispatch checkpoint op to sub-thread
174+
static std::queue<std::string> s_toCheckpoint;
175+
static std::condition_variable s_cond;
176+
static std::mutex s_mutex;
177+
static std::thread s_checkpointThread([]() {
178+
pthread_setname_np(
179+
("WCDB-" +
180+
Database::defaultCheckpointConfigName)
181+
.c_str());
182+
while (true) {
183+
std::string path;
184+
{
185+
std::unique_lock<std::mutex>
186+
lockGuard(s_mutex);
187+
if (s_toCheckpoint.empty()) {
188+
s_cond.wait(lockGuard);
189+
continue;
190+
}
191+
path = s_toCheckpoint.front();
192+
s_toCheckpoint.pop();
193+
}
194+
Database database(path);
195+
WCDB::Error innerError;
196+
database.exec(
197+
StatementPragma().pragma(
198+
Pragma::WalCheckpoint),
199+
innerError);
200+
}
201+
});
202+
static std::once_flag s_flag;
203+
std::call_once(s_flag, []() {
204+
s_checkpointThread.detach();
205+
});
206+
207+
std::unique_lock<std::mutex> lockGuard(
208+
s_mutex);
209+
s_toCheckpoint.push(handle->path);
210+
s_cond.notify_one();
211+
} else {
212+
handle->exec(StatementPragma().pragma(
213+
Pragma::WalCheckpoint));
214+
}
215+
}
216+
} else {
217+
*checkpointStepPointer = pages + 1000;
218+
}
219+
},
220+
checkpointStepPointer);
221+
return true;
222+
},
223+
3,
224+
}});
149225

150226
void Database::setConfig(const std::string &name,
151227
const Config &config,

0 commit comments

Comments
 (0)