1 //! [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3 
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11 
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13 
14 use crate::error::{check, error_from_sqlite_code};
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19 
20 // https://sqlite.org/session.html
21 
// Type-erased table filter closure, stored so that it outlives its
// registration with SQLite (see `Session::table_filter`).
type Filter = Option<Box<dyn Fn(&str) -> bool>>;

/// An instance of this object is a session that can be
/// used to record changes to a database.
pub struct Session<'conn> {
    // Ties the session to the lifetime of the connection it was created from.
    phantom: PhantomData<&'conn Connection>,
    // Raw handle produced by `sqlite3session_create`; deleted on drop.
    s: *mut ffi::sqlite3_session,
    // Owns the closure currently registered as table filter, if any.
    filter: Filter,
}
31 
32 impl Session<'_> {
33     /// Create a new session object
34     #[inline]
new(db: &Connection) -> Result<Session<'_>>35     pub fn new(db: &Connection) -> Result<Session<'_>> {
36         Session::new_with_name(db, DatabaseName::Main)
37     }
38 
39     /// Create a new session object
40     #[inline]
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>41     pub fn new_with_name<'conn>(
42         db: &'conn Connection,
43         name: DatabaseName<'_>,
44     ) -> Result<Session<'conn>> {
45         let name = name.as_cstring()?;
46 
47         let db = db.db.borrow_mut().db;
48 
49         let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
50         check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?;
51 
52         Ok(Session {
53             phantom: PhantomData,
54             s,
55             filter: None,
56         })
57     }
58 
59     /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,60     pub fn table_filter<F>(&mut self, filter: Option<F>)
61     where
62         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
63     {
64         unsafe extern "C" fn call_boxed_closure<F>(
65             p_arg: *mut c_void,
66             tbl_str: *const c_char,
67         ) -> c_int
68         where
69             F: Fn(&str) -> bool + RefUnwindSafe,
70         {
71             use std::str;
72 
73             let boxed_filter: *mut F = p_arg as *mut F;
74             let tbl_name = {
75                 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
76                 str::from_utf8(c_slice)
77             };
78             c_int::from(
79                 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
80                     .unwrap_or_default(),
81             )
82         }
83 
84         match filter {
85             Some(filter) => {
86                 let boxed_filter = Box::new(filter);
87                 unsafe {
88                     ffi::sqlite3session_table_filter(
89                         self.s,
90                         Some(call_boxed_closure::<F>),
91                         &*boxed_filter as *const F as *mut _,
92                     );
93                 }
94                 self.filter = Some(boxed_filter);
95             }
96             _ => {
97                 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
98                 self.filter = None;
99             }
100         };
101     }
102 
103     /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>104     pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
105         let table = if let Some(table) = table {
106             Some(str_to_cstring(table)?)
107         } else {
108             None
109         };
110         let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
111         check(unsafe { ffi::sqlite3session_attach(self.s, table) })
112     }
113 
114     /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>115     pub fn changeset(&mut self) -> Result<Changeset> {
116         let mut n = 0;
117         let mut cs: *mut c_void = ptr::null_mut();
118         check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?;
119         Ok(Changeset { cs, n })
120     }
121 
122     /// Write the set of changes represented by this session to `output`.
123     #[inline]
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>124     pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
125         let output_ref = &output;
126         check(unsafe {
127             ffi::sqlite3session_changeset_strm(
128                 self.s,
129                 Some(x_output),
130                 output_ref as *const &mut dyn Write as *mut c_void,
131             )
132         })
133     }
134 
135     /// Generate a Patchset
136     #[inline]
patchset(&mut self) -> Result<Changeset>137     pub fn patchset(&mut self) -> Result<Changeset> {
138         let mut n = 0;
139         let mut ps: *mut c_void = ptr::null_mut();
140         check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?;
141         // TODO Validate: same struct
142         Ok(Changeset { cs: ps, n })
143     }
144 
145     /// Write the set of patches represented by this session to `output`.
146     #[inline]
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>147     pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
148         let output_ref = &output;
149         check(unsafe {
150             ffi::sqlite3session_patchset_strm(
151                 self.s,
152                 Some(x_output),
153                 output_ref as *const &mut dyn Write as *mut c_void,
154             )
155         })
156     }
157 
158     /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>159     pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
160         let from = from.as_cstring()?;
161         let table = str_to_cstring(table)?;
162         let table = table.as_ptr();
163         unsafe {
164             let mut errmsg = ptr::null_mut();
165             let r =
166                 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
167             if r != ffi::SQLITE_OK {
168                 let errmsg: *mut c_char = errmsg;
169                 let message = errmsg_to_string(&*errmsg);
170                 ffi::sqlite3_free(errmsg as *mut c_void);
171                 return Err(error_from_sqlite_code(r, Some(message)));
172             }
173         }
174         Ok(())
175     }
176 
177     /// Test if a changeset has recorded any changes
178     #[inline]
is_empty(&self) -> bool179     pub fn is_empty(&self) -> bool {
180         unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
181     }
182 
183     /// Query the current state of the session
184     #[inline]
is_enabled(&self) -> bool185     pub fn is_enabled(&self) -> bool {
186         unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
187     }
188 
189     /// Enable or disable the recording of changes
190     #[inline]
set_enabled(&mut self, enabled: bool)191     pub fn set_enabled(&mut self, enabled: bool) {
192         unsafe {
193             ffi::sqlite3session_enable(self.s, c_int::from(enabled));
194         }
195     }
196 
197     /// Query the current state of the indirect flag
198     #[inline]
is_indirect(&self) -> bool199     pub fn is_indirect(&self) -> bool {
200         unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
201     }
202 
203     /// Set or clear the indirect change flag
204     #[inline]
set_indirect(&mut self, indirect: bool)205     pub fn set_indirect(&mut self, indirect: bool) {
206         unsafe {
207             ffi::sqlite3session_indirect(self.s, c_int::from(indirect));
208         }
209     }
210 }
211 
212 impl Drop for Session<'_> {
213     #[inline]
drop(&mut self)214     fn drop(&mut self) {
215         if self.filter.is_some() {
216             self.table_filter(None::<fn(&str) -> bool>);
217         }
218         unsafe { ffi::sqlite3session_delete(self.s) };
219     }
220 }
221 
222 /// Invert a changeset
223 #[inline]
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>224 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
225     let input_ref = &input;
226     let output_ref = &output;
227     check(unsafe {
228         ffi::sqlite3changeset_invert_strm(
229             Some(x_input),
230             input_ref as *const &mut dyn Read as *mut c_void,
231             Some(x_output),
232             output_ref as *const &mut dyn Write as *mut c_void,
233         )
234     })
235 }
236 
237 /// Combine two changesets
238 #[inline]
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>239 pub fn concat_strm(
240     input_a: &mut dyn Read,
241     input_b: &mut dyn Read,
242     output: &mut dyn Write,
243 ) -> Result<()> {
244     let input_a_ref = &input_a;
245     let input_b_ref = &input_b;
246     let output_ref = &output;
247     check(unsafe {
248         ffi::sqlite3changeset_concat_strm(
249             Some(x_input),
250             input_a_ref as *const &mut dyn Read as *mut c_void,
251             Some(x_input),
252             input_b_ref as *const &mut dyn Read as *mut c_void,
253             Some(x_output),
254             output_ref as *const &mut dyn Write as *mut c_void,
255         )
256     })
257 }
258 
/// Changeset or Patchset
pub struct Changeset {
    // Buffer allocated by SQLite; released with `sqlite3_free` on drop.
    cs: *mut c_void,
    // Length value reported by SQLite together with `cs`.
    n: c_int,
}
264 
265 impl Changeset {
266     /// Invert a changeset
267     #[inline]
invert(&self) -> Result<Changeset>268     pub fn invert(&self) -> Result<Changeset> {
269         let mut n = 0;
270         let mut cs = ptr::null_mut();
271         check(unsafe {
272             ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
273         })?;
274         Ok(Changeset { cs, n })
275     }
276 
277     /// Create an iterator to traverse a changeset
278     #[inline]
iter(&self) -> Result<ChangesetIter<'_>>279     pub fn iter(&self) -> Result<ChangesetIter<'_>> {
280         let mut it = ptr::null_mut();
281         check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?;
282         Ok(ChangesetIter {
283             phantom: PhantomData,
284             it,
285             item: None,
286         })
287     }
288 
289     /// Concatenate two changeset objects
290     #[inline]
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>291     pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
292         let mut n = 0;
293         let mut cs = ptr::null_mut();
294         check(unsafe {
295             ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
296         })?;
297         Ok(Changeset { cs, n })
298     }
299 }
300 
301 impl Drop for Changeset {
302     #[inline]
drop(&mut self)303     fn drop(&mut self) {
304         unsafe {
305             ffi::sqlite3_free(self.cs);
306         }
307     }
308 }
309 
/// Cursor for iterating over the elements of a changeset
/// or patchset.
pub struct ChangesetIter<'changeset> {
    // Ties the cursor to the lifetime of the data it iterates over.
    phantom: PhantomData<&'changeset Changeset>,
    // Raw iterator handle; finalized on drop.
    it: *mut ffi::sqlite3_changeset_iter,
    // Item for the current position, `None` before the first advance and
    // once the end has been reached.
    item: Option<ChangesetItem>,
}
317 
318 impl ChangesetIter<'_> {
319     /// Create an iterator on `input`
320     #[inline]
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>321     pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
322         let mut it = ptr::null_mut();
323         check(unsafe {
324             ffi::sqlite3changeset_start_strm(
325                 &mut it as *mut *mut _,
326                 Some(x_input),
327                 input as *const &mut dyn Read as *mut c_void,
328             )
329         })?;
330         Ok(ChangesetIter {
331             phantom: PhantomData,
332             it,
333             item: None,
334         })
335     }
336 }
337 
338 impl FallibleStreamingIterator for ChangesetIter<'_> {
339     type Error = crate::error::Error;
340     type Item = ChangesetItem;
341 
342     #[inline]
advance(&mut self) -> Result<()>343     fn advance(&mut self) -> Result<()> {
344         let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
345         match rc {
346             ffi::SQLITE_ROW => {
347                 self.item = Some(ChangesetItem { it: self.it });
348                 Ok(())
349             }
350             ffi::SQLITE_DONE => {
351                 self.item = None;
352                 Ok(())
353             }
354             code => Err(error_from_sqlite_code(code, None)),
355         }
356     }
357 
358     #[inline]
get(&self) -> Option<&ChangesetItem>359     fn get(&self) -> Option<&ChangesetItem> {
360         self.item.as_ref()
361     }
362 }
363 
/// Operation
///
/// Snapshot of the values reported by `sqlite3changeset_op` for the current
/// change of a [`ChangesetItem`].
pub struct Operation<'item> {
    // Name of the table the change applies to, borrowed from the item.
    table_name: &'item str,
    // Column count reported for that table.
    number_of_columns: i32,
    // Action code of the change, converted via `Action::from`.
    code: Action,
    // `true` when SQLite reported a non-zero indirect flag.
    indirect: bool,
}
371 
// Read-only accessors for the fields captured by `ChangesetItem::op`.
impl Operation<'_> {
    /// Returns the table name.
    #[inline]
    pub fn table_name(&self) -> &str {
        self.table_name
    }

    /// Returns the number of columns in table
    #[inline]
    pub fn number_of_columns(&self) -> i32 {
        self.number_of_columns
    }

    /// Returns the action code.
    #[inline]
    pub fn code(&self) -> Action {
        self.code
    }

    /// Returns `true` for an 'indirect' change.
    #[inline]
    pub fn indirect(&self) -> bool {
        self.indirect
    }
}
397 
398 impl Drop for ChangesetIter<'_> {
399     #[inline]
drop(&mut self)400     fn drop(&mut self) {
401         unsafe {
402             ffi::sqlite3changeset_finalize(self.it);
403         }
404     }
405 }
406 
/// An item passed to a conflict-handler by
/// [`Connection::apply`](crate::Connection::apply), or an item generated by
/// [`ChangesetIter::next`](ChangesetIter::next).
// TODO enum ? Delete, Insert, Update, ...
pub struct ChangesetItem {
    // Raw iterator handle the item reads from; the item does not finalize it.
    it: *mut ffi::sqlite3_changeset_iter,
}
414 
415 impl ChangesetItem {
416     /// Obtain conflicting row values
417     ///
418     /// May only be called with an `SQLITE_CHANGESET_DATA` or
419     /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
420     #[inline]
conflict(&self, col: usize) -> Result<ValueRef<'_>>421     pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
422         unsafe {
423             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
424             check(ffi::sqlite3changeset_conflict(
425                 self.it,
426                 col as i32,
427                 &mut p_value,
428             ))?;
429             Ok(ValueRef::from_value(p_value))
430         }
431     }
432 
433     /// Determine the number of foreign key constraint violations
434     ///
435     /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
436     /// handler callback.
437     #[inline]
fk_conflicts(&self) -> Result<i32>438     pub fn fk_conflicts(&self) -> Result<i32> {
439         unsafe {
440             let mut p_out = 0;
441             check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?;
442             Ok(p_out)
443         }
444     }
445 
446     /// Obtain new.* Values
447     ///
448     /// May only be called if the type of change is either `SQLITE_UPDATE` or
449     /// `SQLITE_INSERT`.
450     #[inline]
new_value(&self, col: usize) -> Result<ValueRef<'_>>451     pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
452         unsafe {
453             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
454             check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?;
455             Ok(ValueRef::from_value(p_value))
456         }
457     }
458 
459     /// Obtain old.* Values
460     ///
461     /// May only be called if the type of change is either `SQLITE_DELETE` or
462     /// `SQLITE_UPDATE`.
463     #[inline]
old_value(&self, col: usize) -> Result<ValueRef<'_>>464     pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
465         unsafe {
466             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
467             check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?;
468             Ok(ValueRef::from_value(p_value))
469         }
470     }
471 
472     /// Obtain the current operation
473     #[inline]
op(&self) -> Result<Operation<'_>>474     pub fn op(&self) -> Result<Operation<'_>> {
475         let mut number_of_columns = 0;
476         let mut code = 0;
477         let mut indirect = 0;
478         let tab = unsafe {
479             let mut pz_tab: *const c_char = ptr::null();
480             check(ffi::sqlite3changeset_op(
481                 self.it,
482                 &mut pz_tab,
483                 &mut number_of_columns,
484                 &mut code,
485                 &mut indirect,
486             ))?;
487             CStr::from_ptr(pz_tab)
488         };
489         let table_name = tab.to_str()?;
490         Ok(Operation {
491             table_name,
492             number_of_columns,
493             code: Action::from(code),
494             indirect: indirect != 0,
495         })
496     }
497 
498     /// Obtain the primary key definition of a table
499     #[inline]
pk(&self) -> Result<&[u8]>500     pub fn pk(&self) -> Result<&[u8]> {
501         let mut number_of_columns = 0;
502         unsafe {
503             let mut pks: *mut c_uchar = ptr::null_mut();
504             check(ffi::sqlite3changeset_pk(
505                 self.it,
506                 &mut pks,
507                 &mut number_of_columns,
508             ))?;
509             Ok(from_raw_parts(pks, number_of_columns as usize))
510         }
511     }
512 }
513 
/// Used to combine two or more changesets or
/// patchsets
pub struct Changegroup {
    // Raw handle produced by `sqlite3changegroup_new`; deleted on drop.
    cg: *mut ffi::sqlite3_changegroup,
}
519 
520 impl Changegroup {
521     /// Create a new change group.
522     #[inline]
new() -> Result<Self>523     pub fn new() -> Result<Self> {
524         let mut cg = ptr::null_mut();
525         check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?;
526         Ok(Changegroup { cg })
527     }
528 
529     /// Add a changeset
530     #[inline]
add(&mut self, cs: &Changeset) -> Result<()>531     pub fn add(&mut self, cs: &Changeset) -> Result<()> {
532         check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) })
533     }
534 
535     /// Add a changeset read from `input` to this change group.
536     #[inline]
add_stream(&mut self, input: &mut dyn Read) -> Result<()>537     pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
538         let input_ref = &input;
539         check(unsafe {
540             ffi::sqlite3changegroup_add_strm(
541                 self.cg,
542                 Some(x_input),
543                 input_ref as *const &mut dyn Read as *mut c_void,
544             )
545         })
546     }
547 
548     /// Obtain a composite Changeset
549     #[inline]
output(&mut self) -> Result<Changeset>550     pub fn output(&mut self) -> Result<Changeset> {
551         let mut n = 0;
552         let mut output: *mut c_void = ptr::null_mut();
553         check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?;
554         Ok(Changeset { cs: output, n })
555     }
556 
557     /// Write the combined set of changes to `output`.
558     #[inline]
output_strm(&mut self, output: &mut dyn Write) -> Result<()>559     pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
560         let output_ref = &output;
561         check(unsafe {
562             ffi::sqlite3changegroup_output_strm(
563                 self.cg,
564                 Some(x_output),
565                 output_ref as *const &mut dyn Write as *mut c_void,
566             )
567         })
568     }
569 }
570 
571 impl Drop for Changegroup {
572     #[inline]
drop(&mut self)573     fn drop(&mut self) {
574         unsafe {
575             ffi::sqlite3changegroup_delete(self.cg);
576         }
577     }
578 }
579 
580 impl Connection {
581     /// Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,582     pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
583     where
584         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
585         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
586     {
587         let db = self.db.borrow_mut().db;
588 
589         let filtered = filter.is_some();
590         let tuple = &mut (filter, conflict);
591         check(unsafe {
592             if filtered {
593                 ffi::sqlite3changeset_apply(
594                     db,
595                     cs.n,
596                     cs.cs,
597                     Some(call_filter::<F, C>),
598                     Some(call_conflict::<F, C>),
599                     tuple as *mut (Option<F>, C) as *mut c_void,
600                 )
601             } else {
602                 ffi::sqlite3changeset_apply(
603                     db,
604                     cs.n,
605                     cs.cs,
606                     None,
607                     Some(call_conflict::<F, C>),
608                     tuple as *mut (Option<F>, C) as *mut c_void,
609                 )
610             }
611         })
612     }
613 
614     /// Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,615     pub fn apply_strm<F, C>(
616         &self,
617         input: &mut dyn Read,
618         filter: Option<F>,
619         conflict: C,
620     ) -> Result<()>
621     where
622         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
623         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
624     {
625         let input_ref = &input;
626         let db = self.db.borrow_mut().db;
627 
628         let filtered = filter.is_some();
629         let tuple = &mut (filter, conflict);
630         check(unsafe {
631             if filtered {
632                 ffi::sqlite3changeset_apply_strm(
633                     db,
634                     Some(x_input),
635                     input_ref as *const &mut dyn Read as *mut c_void,
636                     Some(call_filter::<F, C>),
637                     Some(call_conflict::<F, C>),
638                     tuple as *mut (Option<F>, C) as *mut c_void,
639                 )
640             } else {
641                 ffi::sqlite3changeset_apply_strm(
642                     db,
643                     Some(x_input),
644                     input_ref as *const &mut dyn Read as *mut c_void,
645                     None,
646                     Some(call_conflict::<F, C>),
647                     tuple as *mut (Option<F>, C) as *mut c_void,
648                 )
649             }
650         })
651     }
652 }
653 
/// Constants passed to the conflict handler
/// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
#[allow(missing_docs)]
#[repr(i32)]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
#[allow(clippy::upper_case_acronyms)]
pub enum ConflictType {
    // Fallback used by `From<i32>` for any code that matches none of the
    // constants below.
    UNKNOWN = -1,
    // The remaining variants mirror the `ffi` constants of the same name.
    SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
    SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
    SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
    SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
    SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
}
669 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType670     fn from(code: i32) -> ConflictType {
671         match code {
672             ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
673             ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
674             ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
675             ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
676             ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
677             _ => ConflictType::UNKNOWN,
678         }
679     }
680 }
681 
/// Constants returned by the conflict handler
/// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
#[allow(missing_docs)]
#[repr(i32)]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
#[allow(clippy::upper_case_acronyms)]
pub enum ConflictAction {
    // Each variant carries the value of the `ffi` constant of the same name,
    // so it can be cast to the integer handed back to SQLite.
    SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
    SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
    SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
}
694 
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,695 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
696 where
697     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
698     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
699 {
700     use std::str;
701 
702     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
703     let tbl_name = {
704         let c_slice = CStr::from_ptr(tbl_str).to_bytes();
705         str::from_utf8(c_slice)
706     };
707     match *tuple {
708         (Some(ref filter), _) => c_int::from(
709             catch_unwind(|| filter(tbl_name.expect("illegal table name"))).unwrap_or_default(),
710         ),
711         _ => unimplemented!(),
712     }
713 }
714 
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,715 unsafe extern "C" fn call_conflict<F, C>(
716     p_ctx: *mut c_void,
717     e_conflict: c_int,
718     p: *mut ffi::sqlite3_changeset_iter,
719 ) -> c_int
720 where
721     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
722     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
723 {
724     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
725     let conflict_type = ConflictType::from(e_conflict);
726     let item = ChangesetItem { it: p };
727     if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
728         action as c_int
729     } else {
730         ffi::SQLITE_CHANGESET_ABORT
731     }
732 }
733 
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int734 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
735     if p_in.is_null() {
736         return ffi::SQLITE_MISUSE;
737     }
738     let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
739     let input = p_in as *mut &mut dyn Read;
740     match (*input).read(bytes) {
741         Ok(n) => {
742             *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
743                              // produce bytes.
744             ffi::SQLITE_OK
745         }
746         Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
747     }
748 }
749 
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int750 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
751     if p_out.is_null() {
752         return ffi::SQLITE_MISUSE;
753     }
754     // The sessions module never invokes an xOutput callback with the third
755     // parameter set to a value less than or equal to zero.
756     let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
757     let output = p_out as *mut &mut dyn Write;
758     match (*output).write_all(bytes) {
759         Ok(_) => ffi::SQLITE_OK,
760         Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
761     }
762 }
763 
// Tests exercising sessions, changesets and their streaming variants against
// in-memory databases.
#[cfg(test)]
mod test {
    use fallible_streaming_iterator::FallibleStreamingIterator;
    use std::io::Read;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
    use crate::hooks::Action;
    use crate::{Connection, Result};

    // Builds a changeset recording a single INSERT of 'bar' into table `foo`.
    fn one_changeset() -> Result<Changeset> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;

        let mut session = Session::new(&db)?;
        assert!(session.is_empty());

        session.attach(None)?;
        db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;

        session.changeset()
    }

    // Same scenario as `one_changeset`, but the changeset is streamed into a
    // byte vector.
    fn one_changeset_strm() -> Result<Vec<u8>> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;

        let mut session = Session::new(&db)?;
        assert!(session.is_empty());

        session.attach(None)?;
        db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;

        let mut output = Vec::new();
        session.changeset_strm(&mut output)?;
        Ok(output)
    }

    // Iterates the in-memory changeset and checks the operation, primary key
    // definition and new value of its single item.
    #[test]
    fn test_changeset() -> Result<()> {
        let changeset = one_changeset()?;
        let mut iter = changeset.iter()?;
        let item = iter.next()?;
        assert!(item.is_some());

        let item = item.unwrap();
        let op = item.op()?;
        assert_eq!("foo", op.table_name());
        assert_eq!(1, op.number_of_columns());
        assert_eq!(Action::SQLITE_INSERT, op.code());
        assert!(!op.indirect());

        let pk = item.pk()?;
        assert_eq!(&[1], pk);

        let new_value = item.new_value(0)?;
        assert_eq!(Ok("bar"), new_value.as_str());
        Ok(())
    }

    // Checks the size of the streamed changeset and that it can be iterated
    // through `ChangesetIter::start_strm`.
    #[test]
    fn test_changeset_strm() -> Result<()> {
        let output = one_changeset_strm()?;
        assert!(!output.is_empty());
        assert_eq!(14, output.len());

        let input: &mut dyn Read = &mut output.as_slice();
        let mut iter = ChangesetIter::start_strm(&input)?;
        let item = iter.next()?;
        assert!(item.is_some());
        Ok(())
    }

    // Applies the changeset to a fresh database (no conflict expected), then
    // applies it again and expects the conflict handler to be invoked.
    #[test]
    fn test_changeset_apply() -> Result<()> {
        let changeset = one_changeset()?;

        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;

        // Records whether a conflict handler ran.
        static CALLED: AtomicBool = AtomicBool::new(false);
        db.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| {
                CALLED.store(true, Ordering::Relaxed);
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )?;

        assert!(!CALLED.load(Ordering::Relaxed));
        let check = db.query_row("SELECT 1 FROM foo WHERE t = ?1", ["bar"], |row| {
            row.get::<_, i32>(0)
        })?;
        assert_eq!(1, check);

        // conflict expected when same changeset applied again on the same db
        db.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |conflict_type, item| {
                CALLED.store(true, Ordering::Relaxed);
                assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
                let conflict = item.conflict(0).unwrap();
                assert_eq!(Ok("bar"), conflict.as_str());
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )?;
        assert!(CALLED.load(Ordering::Relaxed));
        Ok(())
    }

    // Applies the streamed changeset to a fresh database and checks that the
    // row arrived.
    #[test]
    fn test_changeset_apply_strm() -> Result<()> {
        let output = one_changeset_strm()?;

        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;

        let mut input = output.as_slice();
        db.apply_strm(
            &mut input,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
        )?;

        let check = db.query_row("SELECT 1 FROM foo WHERE t = ?1", ["bar"], |row| {
            row.get::<_, i32>(0)
        })?;
        assert_eq!(1, check);
        Ok(())
    }

    // A session is empty until a change is made to an attached table.
    #[test]
    fn test_session_empty() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;

        let mut session = Session::new(&db)?;
        assert!(session.is_empty());

        session.attach(None)?;
        db.execute("INSERT INTO foo (t) VALUES (?1);", ["bar"])?;

        assert!(!session.is_empty());
        Ok(())
    }

    // A new session starts enabled and can be disabled.
    #[test]
    fn test_session_set_enabled() -> Result<()> {
        let db = Connection::open_in_memory()?;

        let mut session = Session::new(&db)?;
        assert!(session.is_enabled());
        session.set_enabled(false);
        assert!(!session.is_enabled());
        Ok(())
    }

    // The indirect flag starts cleared and can be set.
    #[test]
    fn test_session_set_indirect() -> Result<()> {
        let db = Connection::open_in_memory()?;

        let mut session = Session::new(&db)?;
        assert!(!session.is_indirect());
        session.set_indirect(true);
        assert!(session.is_indirect());
        Ok(())
    }
}
934