1 //! [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13
14 use crate::error::{check, error_from_sqlite_code};
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19
20 // https://sqlite.org/session.html
21
22 type Filter = Option<Box<dyn Fn(&str) -> bool>>;
23
24 /// An instance of this object is a session that can be
25 /// used to record changes to a database.
26 pub struct Session<'conn> {
27 phantom: PhantomData<&'conn Connection>,
28 s: *mut ffi::sqlite3_session,
29 filter: Filter,
30 }
31
32 impl Session<'_> {
33 /// Create a new session object
34 #[inline]
new(db: &Connection) -> Result<Session<'_>>35 pub fn new(db: &Connection) -> Result<Session<'_>> {
36 Session::new_with_name(db, DatabaseName::Main)
37 }
38
39 /// Create a new session object
40 #[inline]
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>41 pub fn new_with_name<'conn>(
42 db: &'conn Connection,
43 name: DatabaseName<'_>,
44 ) -> Result<Session<'conn>> {
45 let name = name.as_cstring()?;
46
47 let db = db.db.borrow_mut().db;
48
49 let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
50 check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?;
51
52 Ok(Session {
53 phantom: PhantomData,
54 s,
55 filter: None,
56 })
57 }
58
59 /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,60 pub fn table_filter<F>(&mut self, filter: Option<F>)
61 where
62 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
63 {
64 unsafe extern "C" fn call_boxed_closure<F>(
65 p_arg: *mut c_void,
66 tbl_str: *const c_char,
67 ) -> c_int
68 where
69 F: Fn(&str) -> bool + RefUnwindSafe,
70 {
71 use std::str;
72
73 let boxed_filter: *mut F = p_arg as *mut F;
74 let tbl_name = {
75 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
76 str::from_utf8(c_slice)
77 };
78 c_int::from(
79 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
80 .unwrap_or_default(),
81 )
82 }
83
84 match filter {
85 Some(filter) => {
86 let boxed_filter = Box::new(filter);
87 unsafe {
88 ffi::sqlite3session_table_filter(
89 self.s,
90 Some(call_boxed_closure::<F>),
91 &*boxed_filter as *const F as *mut _,
92 );
93 }
94 self.filter = Some(boxed_filter);
95 }
96 _ => {
97 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
98 self.filter = None;
99 }
100 };
101 }
102
103 /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>104 pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
105 let table = if let Some(table) = table {
106 Some(str_to_cstring(table)?)
107 } else {
108 None
109 };
110 let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
111 check(unsafe { ffi::sqlite3session_attach(self.s, table) })
112 }
113
114 /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>115 pub fn changeset(&mut self) -> Result<Changeset> {
116 let mut n = 0;
117 let mut cs: *mut c_void = ptr::null_mut();
118 check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?;
119 Ok(Changeset { cs, n })
120 }
121
122 /// Write the set of changes represented by this session to `output`.
123 #[inline]
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>124 pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
125 let output_ref = &output;
126 check(unsafe {
127 ffi::sqlite3session_changeset_strm(
128 self.s,
129 Some(x_output),
130 output_ref as *const &mut dyn Write as *mut c_void,
131 )
132 })
133 }
134
135 /// Generate a Patchset
136 #[inline]
patchset(&mut self) -> Result<Changeset>137 pub fn patchset(&mut self) -> Result<Changeset> {
138 let mut n = 0;
139 let mut ps: *mut c_void = ptr::null_mut();
140 check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?;
141 // TODO Validate: same struct
142 Ok(Changeset { cs: ps, n })
143 }
144
145 /// Write the set of patches represented by this session to `output`.
146 #[inline]
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>147 pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
148 let output_ref = &output;
149 check(unsafe {
150 ffi::sqlite3session_patchset_strm(
151 self.s,
152 Some(x_output),
153 output_ref as *const &mut dyn Write as *mut c_void,
154 )
155 })
156 }
157
158 /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>159 pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
160 let from = from.as_cstring()?;
161 let table = str_to_cstring(table)?;
162 let table = table.as_ptr();
163 unsafe {
164 let mut errmsg = ptr::null_mut();
165 let r =
166 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
167 if r != ffi::SQLITE_OK {
168 let errmsg: *mut c_char = errmsg;
169 let message = errmsg_to_string(&*errmsg);
170 ffi::sqlite3_free(errmsg as *mut c_void);
171 return Err(error_from_sqlite_code(r, Some(message)));
172 }
173 }
174 Ok(())
175 }
176
177 /// Test if a changeset has recorded any changes
178 #[inline]
is_empty(&self) -> bool179 pub fn is_empty(&self) -> bool {
180 unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
181 }
182
183 /// Query the current state of the session
184 #[inline]
is_enabled(&self) -> bool185 pub fn is_enabled(&self) -> bool {
186 unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
187 }
188
189 /// Enable or disable the recording of changes
190 #[inline]
set_enabled(&mut self, enabled: bool)191 pub fn set_enabled(&mut self, enabled: bool) {
192 unsafe {
193 ffi::sqlite3session_enable(self.s, c_int::from(enabled));
194 }
195 }
196
197 /// Query the current state of the indirect flag
198 #[inline]
is_indirect(&self) -> bool199 pub fn is_indirect(&self) -> bool {
200 unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
201 }
202
203 /// Set or clear the indirect change flag
204 #[inline]
set_indirect(&mut self, indirect: bool)205 pub fn set_indirect(&mut self, indirect: bool) {
206 unsafe {
207 ffi::sqlite3session_indirect(self.s, c_int::from(indirect));
208 }
209 }
210 }
211
212 impl Drop for Session<'_> {
213 #[inline]
drop(&mut self)214 fn drop(&mut self) {
215 if self.filter.is_some() {
216 self.table_filter(None::<fn(&str) -> bool>);
217 }
218 unsafe { ffi::sqlite3session_delete(self.s) };
219 }
220 }
221
222 /// Invert a changeset
223 #[inline]
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>224 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
225 let input_ref = &input;
226 let output_ref = &output;
227 check(unsafe {
228 ffi::sqlite3changeset_invert_strm(
229 Some(x_input),
230 input_ref as *const &mut dyn Read as *mut c_void,
231 Some(x_output),
232 output_ref as *const &mut dyn Write as *mut c_void,
233 )
234 })
235 }
236
237 /// Combine two changesets
238 #[inline]
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>239 pub fn concat_strm(
240 input_a: &mut dyn Read,
241 input_b: &mut dyn Read,
242 output: &mut dyn Write,
243 ) -> Result<()> {
244 let input_a_ref = &input_a;
245 let input_b_ref = &input_b;
246 let output_ref = &output;
247 check(unsafe {
248 ffi::sqlite3changeset_concat_strm(
249 Some(x_input),
250 input_a_ref as *const &mut dyn Read as *mut c_void,
251 Some(x_input),
252 input_b_ref as *const &mut dyn Read as *mut c_void,
253 Some(x_output),
254 output_ref as *const &mut dyn Write as *mut c_void,
255 )
256 })
257 }
258
259 /// Changeset or Patchset
260 pub struct Changeset {
261 cs: *mut c_void,
262 n: c_int,
263 }
264
265 impl Changeset {
266 /// Invert a changeset
267 #[inline]
invert(&self) -> Result<Changeset>268 pub fn invert(&self) -> Result<Changeset> {
269 let mut n = 0;
270 let mut cs = ptr::null_mut();
271 check(unsafe {
272 ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
273 })?;
274 Ok(Changeset { cs, n })
275 }
276
277 /// Create an iterator to traverse a changeset
278 #[inline]
iter(&self) -> Result<ChangesetIter<'_>>279 pub fn iter(&self) -> Result<ChangesetIter<'_>> {
280 let mut it = ptr::null_mut();
281 check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?;
282 Ok(ChangesetIter {
283 phantom: PhantomData,
284 it,
285 item: None,
286 })
287 }
288
289 /// Concatenate two changeset objects
290 #[inline]
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>291 pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
292 let mut n = 0;
293 let mut cs = ptr::null_mut();
294 check(unsafe {
295 ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
296 })?;
297 Ok(Changeset { cs, n })
298 }
299 }
300
301 impl Drop for Changeset {
302 #[inline]
drop(&mut self)303 fn drop(&mut self) {
304 unsafe {
305 ffi::sqlite3_free(self.cs);
306 }
307 }
308 }
309
310 /// Cursor for iterating over the elements of a changeset
311 /// or patchset.
312 pub struct ChangesetIter<'changeset> {
313 phantom: PhantomData<&'changeset Changeset>,
314 it: *mut ffi::sqlite3_changeset_iter,
315 item: Option<ChangesetItem>,
316 }
317
318 impl ChangesetIter<'_> {
319 /// Create an iterator on `input`
320 #[inline]
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>321 pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
322 let mut it = ptr::null_mut();
323 check(unsafe {
324 ffi::sqlite3changeset_start_strm(
325 &mut it as *mut *mut _,
326 Some(x_input),
327 input as *const &mut dyn Read as *mut c_void,
328 )
329 })?;
330 Ok(ChangesetIter {
331 phantom: PhantomData,
332 it,
333 item: None,
334 })
335 }
336 }
337
338 impl FallibleStreamingIterator for ChangesetIter<'_> {
339 type Error = crate::error::Error;
340 type Item = ChangesetItem;
341
342 #[inline]
advance(&mut self) -> Result<()>343 fn advance(&mut self) -> Result<()> {
344 let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
345 match rc {
346 ffi::SQLITE_ROW => {
347 self.item = Some(ChangesetItem { it: self.it });
348 Ok(())
349 }
350 ffi::SQLITE_DONE => {
351 self.item = None;
352 Ok(())
353 }
354 code => Err(error_from_sqlite_code(code, None)),
355 }
356 }
357
358 #[inline]
get(&self) -> Option<&ChangesetItem>359 fn get(&self) -> Option<&ChangesetItem> {
360 self.item.as_ref()
361 }
362 }
363
364 /// Operation
365 pub struct Operation<'item> {
366 table_name: &'item str,
367 number_of_columns: i32,
368 code: Action,
369 indirect: bool,
370 }
371
372 impl Operation<'_> {
373 /// Returns the table name.
374 #[inline]
table_name(&self) -> &str375 pub fn table_name(&self) -> &str {
376 self.table_name
377 }
378
379 /// Returns the number of columns in table
380 #[inline]
number_of_columns(&self) -> i32381 pub fn number_of_columns(&self) -> i32 {
382 self.number_of_columns
383 }
384
385 /// Returns the action code.
386 #[inline]
code(&self) -> Action387 pub fn code(&self) -> Action {
388 self.code
389 }
390
391 /// Returns `true` for an 'indirect' change.
392 #[inline]
indirect(&self) -> bool393 pub fn indirect(&self) -> bool {
394 self.indirect
395 }
396 }
397
398 impl Drop for ChangesetIter<'_> {
399 #[inline]
drop(&mut self)400 fn drop(&mut self) {
401 unsafe {
402 ffi::sqlite3changeset_finalize(self.it);
403 }
404 }
405 }
406
407 /// An item passed to a conflict-handler by
408 /// [`Connection::apply`](crate::Connection::apply), or an item generated by
409 /// [`ChangesetIter::next`](ChangesetIter::next).
410 // TODO enum ? Delete, Insert, Update, ...
411 pub struct ChangesetItem {
412 it: *mut ffi::sqlite3_changeset_iter,
413 }
414
415 impl ChangesetItem {
416 /// Obtain conflicting row values
417 ///
418 /// May only be called with an `SQLITE_CHANGESET_DATA` or
419 /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
420 #[inline]
conflict(&self, col: usize) -> Result<ValueRef<'_>>421 pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
422 unsafe {
423 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
424 check(ffi::sqlite3changeset_conflict(
425 self.it,
426 col as i32,
427 &mut p_value,
428 ))?;
429 Ok(ValueRef::from_value(p_value))
430 }
431 }
432
433 /// Determine the number of foreign key constraint violations
434 ///
435 /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
436 /// handler callback.
437 #[inline]
fk_conflicts(&self) -> Result<i32>438 pub fn fk_conflicts(&self) -> Result<i32> {
439 unsafe {
440 let mut p_out = 0;
441 check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?;
442 Ok(p_out)
443 }
444 }
445
446 /// Obtain new.* Values
447 ///
448 /// May only be called if the type of change is either `SQLITE_UPDATE` or
449 /// `SQLITE_INSERT`.
450 #[inline]
new_value(&self, col: usize) -> Result<ValueRef<'_>>451 pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
452 unsafe {
453 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
454 check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?;
455 Ok(ValueRef::from_value(p_value))
456 }
457 }
458
459 /// Obtain old.* Values
460 ///
461 /// May only be called if the type of change is either `SQLITE_DELETE` or
462 /// `SQLITE_UPDATE`.
463 #[inline]
old_value(&self, col: usize) -> Result<ValueRef<'_>>464 pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
465 unsafe {
466 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
467 check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?;
468 Ok(ValueRef::from_value(p_value))
469 }
470 }
471
472 /// Obtain the current operation
473 #[inline]
op(&self) -> Result<Operation<'_>>474 pub fn op(&self) -> Result<Operation<'_>> {
475 let mut number_of_columns = 0;
476 let mut code = 0;
477 let mut indirect = 0;
478 let tab = unsafe {
479 let mut pz_tab: *const c_char = ptr::null();
480 check(ffi::sqlite3changeset_op(
481 self.it,
482 &mut pz_tab,
483 &mut number_of_columns,
484 &mut code,
485 &mut indirect,
486 ))?;
487 CStr::from_ptr(pz_tab)
488 };
489 let table_name = tab.to_str()?;
490 Ok(Operation {
491 table_name,
492 number_of_columns,
493 code: Action::from(code),
494 indirect: indirect != 0,
495 })
496 }
497
498 /// Obtain the primary key definition of a table
499 #[inline]
pk(&self) -> Result<&[u8]>500 pub fn pk(&self) -> Result<&[u8]> {
501 let mut number_of_columns = 0;
502 unsafe {
503 let mut pks: *mut c_uchar = ptr::null_mut();
504 check(ffi::sqlite3changeset_pk(
505 self.it,
506 &mut pks,
507 &mut number_of_columns,
508 ))?;
509 Ok(from_raw_parts(pks, number_of_columns as usize))
510 }
511 }
512 }
513
514 /// Used to combine two or more changesets or
515 /// patchsets
516 pub struct Changegroup {
517 cg: *mut ffi::sqlite3_changegroup,
518 }
519
520 impl Changegroup {
521 /// Create a new change group.
522 #[inline]
new() -> Result<Self>523 pub fn new() -> Result<Self> {
524 let mut cg = ptr::null_mut();
525 check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?;
526 Ok(Changegroup { cg })
527 }
528
529 /// Add a changeset
530 #[inline]
add(&mut self, cs: &Changeset) -> Result<()>531 pub fn add(&mut self, cs: &Changeset) -> Result<()> {
532 check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) })
533 }
534
535 /// Add a changeset read from `input` to this change group.
536 #[inline]
add_stream(&mut self, input: &mut dyn Read) -> Result<()>537 pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
538 let input_ref = &input;
539 check(unsafe {
540 ffi::sqlite3changegroup_add_strm(
541 self.cg,
542 Some(x_input),
543 input_ref as *const &mut dyn Read as *mut c_void,
544 )
545 })
546 }
547
548 /// Obtain a composite Changeset
549 #[inline]
output(&mut self) -> Result<Changeset>550 pub fn output(&mut self) -> Result<Changeset> {
551 let mut n = 0;
552 let mut output: *mut c_void = ptr::null_mut();
553 check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?;
554 Ok(Changeset { cs: output, n })
555 }
556
557 /// Write the combined set of changes to `output`.
558 #[inline]
output_strm(&mut self, output: &mut dyn Write) -> Result<()>559 pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
560 let output_ref = &output;
561 check(unsafe {
562 ffi::sqlite3changegroup_output_strm(
563 self.cg,
564 Some(x_output),
565 output_ref as *const &mut dyn Write as *mut c_void,
566 )
567 })
568 }
569 }
570
571 impl Drop for Changegroup {
572 #[inline]
drop(&mut self)573 fn drop(&mut self) {
574 unsafe {
575 ffi::sqlite3changegroup_delete(self.cg);
576 }
577 }
578 }
579
580 impl Connection {
581 /// Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,582 pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
583 where
584 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
585 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
586 {
587 let db = self.db.borrow_mut().db;
588
589 let filtered = filter.is_some();
590 let tuple = &mut (filter, conflict);
591 check(unsafe {
592 if filtered {
593 ffi::sqlite3changeset_apply(
594 db,
595 cs.n,
596 cs.cs,
597 Some(call_filter::<F, C>),
598 Some(call_conflict::<F, C>),
599 tuple as *mut (Option<F>, C) as *mut c_void,
600 )
601 } else {
602 ffi::sqlite3changeset_apply(
603 db,
604 cs.n,
605 cs.cs,
606 None,
607 Some(call_conflict::<F, C>),
608 tuple as *mut (Option<F>, C) as *mut c_void,
609 )
610 }
611 })
612 }
613
614 /// Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,615 pub fn apply_strm<F, C>(
616 &self,
617 input: &mut dyn Read,
618 filter: Option<F>,
619 conflict: C,
620 ) -> Result<()>
621 where
622 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
623 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
624 {
625 let input_ref = &input;
626 let db = self.db.borrow_mut().db;
627
628 let filtered = filter.is_some();
629 let tuple = &mut (filter, conflict);
630 check(unsafe {
631 if filtered {
632 ffi::sqlite3changeset_apply_strm(
633 db,
634 Some(x_input),
635 input_ref as *const &mut dyn Read as *mut c_void,
636 Some(call_filter::<F, C>),
637 Some(call_conflict::<F, C>),
638 tuple as *mut (Option<F>, C) as *mut c_void,
639 )
640 } else {
641 ffi::sqlite3changeset_apply_strm(
642 db,
643 Some(x_input),
644 input_ref as *const &mut dyn Read as *mut c_void,
645 None,
646 Some(call_conflict::<F, C>),
647 tuple as *mut (Option<F>, C) as *mut c_void,
648 )
649 }
650 })
651 }
652 }
653
654 /// Constants passed to the conflict handler
655 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
656 #[allow(missing_docs)]
657 #[repr(i32)]
658 #[derive(Debug, PartialEq, Eq)]
659 #[non_exhaustive]
660 #[allow(clippy::upper_case_acronyms)]
661 pub enum ConflictType {
662 UNKNOWN = -1,
663 SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
664 SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
665 SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
666 SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
667 SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
668 }
669 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType670 fn from(code: i32) -> ConflictType {
671 match code {
672 ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
673 ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
674 ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
675 ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
676 ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
677 _ => ConflictType::UNKNOWN,
678 }
679 }
680 }
681
682 /// Constants returned by the conflict handler
683 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
684 #[allow(missing_docs)]
685 #[repr(i32)]
686 #[derive(Debug, PartialEq, Eq)]
687 #[non_exhaustive]
688 #[allow(clippy::upper_case_acronyms)]
689 pub enum ConflictAction {
690 SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
691 SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
692 SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
693 }
694
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,695 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
696 where
697 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
698 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
699 {
700 use std::str;
701
702 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
703 let tbl_name = {
704 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
705 str::from_utf8(c_slice)
706 };
707 match *tuple {
708 (Some(ref filter), _) => c_int::from(
709 catch_unwind(|| filter(tbl_name.expect("illegal table name"))).unwrap_or_default(),
710 ),
711 _ => unimplemented!(),
712 }
713 }
714
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,715 unsafe extern "C" fn call_conflict<F, C>(
716 p_ctx: *mut c_void,
717 e_conflict: c_int,
718 p: *mut ffi::sqlite3_changeset_iter,
719 ) -> c_int
720 where
721 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
722 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
723 {
724 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
725 let conflict_type = ConflictType::from(e_conflict);
726 let item = ChangesetItem { it: p };
727 if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
728 action as c_int
729 } else {
730 ffi::SQLITE_CHANGESET_ABORT
731 }
732 }
733
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int734 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
735 if p_in.is_null() {
736 return ffi::SQLITE_MISUSE;
737 }
738 let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
739 let input = p_in as *mut &mut dyn Read;
740 match (*input).read(bytes) {
741 Ok(n) => {
742 *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
743 // produce bytes.
744 ffi::SQLITE_OK
745 }
746 Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
747 }
748 }
749
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int750 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
751 if p_out.is_null() {
752 return ffi::SQLITE_MISUSE;
753 }
754 // The sessions module never invokes an xOutput callback with the third
755 // parameter set to a value less than or equal to zero.
756 let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
757 let output = p_out as *mut &mut dyn Write;
758 match (*output).write_all(bytes) {
759 Ok(_) => ffi::SQLITE_OK,
760 Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
761 }
762 }
763
764 #[cfg(test)]
765 mod test {
766 use fallible_streaming_iterator::FallibleStreamingIterator;
767 use std::io::Read;
768 use std::sync::atomic::{AtomicBool, Ordering};
769
770 use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
771 use crate::hooks::Action;
772 use crate::{Connection, Result};
773
one_changeset() -> Result<Changeset>774 fn one_changeset() -> Result<Changeset> {
775 let db = Connection::open_in_memory()?;
776 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
777
778 let mut session = Session::new(&db)?;
779 assert!(session.is_empty());
780
781 session.attach(None)?;
782 db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;
783
784 session.changeset()
785 }
786
one_changeset_strm() -> Result<Vec<u8>>787 fn one_changeset_strm() -> Result<Vec<u8>> {
788 let db = Connection::open_in_memory()?;
789 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
790
791 let mut session = Session::new(&db)?;
792 assert!(session.is_empty());
793
794 session.attach(None)?;
795 db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;
796
797 let mut output = Vec::new();
798 session.changeset_strm(&mut output)?;
799 Ok(output)
800 }
801
802 #[test]
test_changeset() -> Result<()>803 fn test_changeset() -> Result<()> {
804 let changeset = one_changeset()?;
805 let mut iter = changeset.iter()?;
806 let item = iter.next()?;
807 assert!(item.is_some());
808
809 let item = item.unwrap();
810 let op = item.op()?;
811 assert_eq!("foo", op.table_name());
812 assert_eq!(1, op.number_of_columns());
813 assert_eq!(Action::SQLITE_INSERT, op.code());
814 assert!(!op.indirect());
815
816 let pk = item.pk()?;
817 assert_eq!(&[1], pk);
818
819 let new_value = item.new_value(0)?;
820 assert_eq!(Ok("bar"), new_value.as_str());
821 Ok(())
822 }
823
824 #[test]
test_changeset_strm() -> Result<()>825 fn test_changeset_strm() -> Result<()> {
826 let output = one_changeset_strm()?;
827 assert!(!output.is_empty());
828 assert_eq!(14, output.len());
829
830 let input: &mut dyn Read = &mut output.as_slice();
831 let mut iter = ChangesetIter::start_strm(&input)?;
832 let item = iter.next()?;
833 assert!(item.is_some());
834 Ok(())
835 }
836
837 #[test]
test_changeset_apply() -> Result<()>838 fn test_changeset_apply() -> Result<()> {
839 let changeset = one_changeset()?;
840
841 let db = Connection::open_in_memory()?;
842 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
843
844 static CALLED: AtomicBool = AtomicBool::new(false);
845 db.apply(
846 &changeset,
847 None::<fn(&str) -> bool>,
848 |_conflict_type, _item| {
849 CALLED.store(true, Ordering::Relaxed);
850 ConflictAction::SQLITE_CHANGESET_OMIT
851 },
852 )?;
853
854 assert!(!CALLED.load(Ordering::Relaxed));
855 let check = db.query_row("SELECT 1 FROM foo WHERE t = ?1", ["bar"], |row| {
856 row.get::<_, i32>(0)
857 })?;
858 assert_eq!(1, check);
859
860 // conflict expected when same changeset applied again on the same db
861 db.apply(
862 &changeset,
863 None::<fn(&str) -> bool>,
864 |conflict_type, item| {
865 CALLED.store(true, Ordering::Relaxed);
866 assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
867 let conflict = item.conflict(0).unwrap();
868 assert_eq!(Ok("bar"), conflict.as_str());
869 ConflictAction::SQLITE_CHANGESET_OMIT
870 },
871 )?;
872 assert!(CALLED.load(Ordering::Relaxed));
873 Ok(())
874 }
875
876 #[test]
test_changeset_apply_strm() -> Result<()>877 fn test_changeset_apply_strm() -> Result<()> {
878 let output = one_changeset_strm()?;
879
880 let db = Connection::open_in_memory()?;
881 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
882
883 let mut input = output.as_slice();
884 db.apply_strm(
885 &mut input,
886 None::<fn(&str) -> bool>,
887 |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
888 )?;
889
890 let check = db.query_row("SELECT 1 FROM foo WHERE t = ?1", ["bar"], |row| {
891 row.get::<_, i32>(0)
892 })?;
893 assert_eq!(1, check);
894 Ok(())
895 }
896
897 #[test]
test_session_empty() -> Result<()>898 fn test_session_empty() -> Result<()> {
899 let db = Connection::open_in_memory()?;
900 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
901
902 let mut session = Session::new(&db)?;
903 assert!(session.is_empty());
904
905 session.attach(None)?;
906 db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;
907
908 assert!(!session.is_empty());
909 Ok(())
910 }
911
912 #[test]
test_session_set_enabled() -> Result<()>913 fn test_session_set_enabled() -> Result<()> {
914 let db = Connection::open_in_memory()?;
915
916 let mut session = Session::new(&db)?;
917 assert!(session.is_enabled());
918 session.set_enabled(false);
919 assert!(!session.is_enabled());
920 Ok(())
921 }
922
923 #[test]
test_session_set_indirect() -> Result<()>924 fn test_session_set_indirect() -> Result<()> {
925 let db = Connection::open_in_memory()?;
926
927 let mut session = Session::new(&db)?;
928 assert!(!session.is_indirect());
929 session.set_indirect(true);
930 assert!(session.is_indirect());
931 Ok(())
932 }
933 }
934