commit 5d8e25a658b7e7b398f0821d7ae5273c823b4e26
parent a6ecfd0e89af930d19d6d6230587d150eae87f8a
Author: Eric Davis <edavis@insanum.com>
Date: Wed, 2 Jul 2014 19:02:38 -0700
refactoring... save_worker
Diffstat:
M | notes_db.py | | | 258 | ++++++++++++++++++++++++++++++------------------------------------------------- |
1 file changed, 96 insertions(+), 162 deletions(-)
diff --git a/notes_db.py b/notes_db.py
@@ -18,9 +18,6 @@
import time
import utils
-ACTION_SAVE = 0
-ACTION_SYNC_PARTIAL_TO_SERVER = 1
-
class SyncError(RuntimeError):
pass
@@ -69,14 +66,6 @@ def __init__(self, config):
# they're in sync with the disc.
n['savedate'] = now
- # save and sync queue
- self.q_save = Queue()
- self.q_save_res = Queue()
-
- thread_save = Thread(target=self.worker_save)
- thread_save.setDaemon(True)
- thread_save.start()
-
# initialise the simplenote instance we're going to use
# this does not yet need network access
self.simplenote = Simplenote(self.config.get_config('sn_username'),
@@ -87,11 +76,6 @@ def __init__(self, config):
# in progress. This variable is only used by the background thread.
self.threaded_syncing_keys = {}
- # reading a variable or setting this variable is atomic
- # so sync thread will write to it, main thread will only
- # check it sometimes.
- self.waiting_for_simplenote = False
-
self.q_sync = Queue()
self.q_sync_res = Queue()
@@ -350,9 +334,6 @@ def get_note_status(self, key):
return o
- def get_save_queue_len(self):
- return self.q_save.qsize()
-
def get_sync_queue_len(self):
return self.q_sync.qsize()
@@ -427,37 +408,25 @@ def sync_note_unthreaded(self, k):
else:
return None
- def save_threaded(self):
- for k,n in self.notes.items():
- savedate = float(n.get('savedate'))
- if float(n.get('modifydate')) > savedate or \
- float(n.get('syncdate')) > savedate:
- cn = copy.deepcopy(n)
- # put it on my queue as a save
- o = utils.KeyValueObject(action=ACTION_SAVE, key=k, note=cn)
- self.q_save.put(o)
-
- # in this same call, we process stuff that might have been put on the result queue
- nsaved = 0
- something_in_queue = True
- while something_in_queue:
- try:
- o = self.q_save_res.get_nowait()
-
- except Empty:
- something_in_queue = False
-
- else:
- # o (.action, .key, .note) is something that was written to disk
- # we only record the savedate.
- self.notes[o.key]['savedate'] = o.note['savedate']
- self.notify_observers('change:note-status',
- utils.KeyValueObject(what='savedate',
- key=o.key,
- msg="Note saved."))
- nsaved += 1
-
- return nsaved
+ # worker thread...
+ def save_worker(self):
+ logging.debug('Save worker: started')
+ while True:
+ time.sleep(5)
+ #logging.debug('Save worker: checking for work')
+ for k,n in self.notes.items():
+ savedate = float(n.get('savedate'))
+ if float(n.get('modifydate')) > savedate or \
+ float(n.get('syncdate')) > savedate:
+ try:
+ # this will write the new savedate into the note
+ self.helper_save_note(k, n)
+ logging.debug('Saved note: %s', k)
+ except WriteError, e:
+ msg = 'ERROR: Failed to write file to the filesystem!'
+ logging.error(msg)
+ print msg
+ os._exit(1)
def sync_to_server_threaded(self, wait_for_idle=True):
"""Only sync notes that have been changed / created locally since previous sync.
@@ -497,70 +466,67 @@ def sync_to_server_threaded(self, wait_for_idle=True):
# we store the timestamp when this copy was made as the syncdate
cn['syncdate'] = time.time()
# put it on my queue as a sync
- o = utils.KeyValueObject(action=ACTION_SYNC_PARTIAL_TO_SERVER, key=k, note=cn)
+ o = utils.KeyValueObject(key=k, note=cn)
self.q_sync.put(o)
# in this same call, we read out the result queue
nsynced = 0
nerrored = 0
- something_in_queue = True
- while something_in_queue:
+ while True:
try:
o = self.q_sync_res.get_nowait()
-
except Empty:
- something_in_queue = False
+ break
- else:
- okey = o.key
+ okey = o.key
+
+ if o.error:
+ nerrored += 1
+ del self.threaded_syncing_keys[okey]
+ continue
- if o.error:
- nerrored += 1
+ # o (.key, .note) is something that was synced
+
+ # we only apply the changes if the syncdate is newer than
+ # what we already have, since the main thread could be
+ # running a full sync whilst the worker thread is putting
+ # results in the queue.
+ if float(o.note['syncdate']) > float(self.notes[okey]['syncdate']):
+
+ if float(o.note['syncdate']) > float(self.notes[okey]['modifydate']):
+ # note was synced AFTER the last modification to our local version
+ # do an in-place update of the existing note
+ # this could be with or without new content.
+ old_note = copy.deepcopy(self.notes[okey])
+ self.notes[okey].update(o.note)
+ self.notify_observers('synced:note',
+ utils.KeyValueObject(lkey=okey,
+ old_note=old_note,
+ msg='Note synced.'))
else:
- # o (.action, .key, .note) is something that was synced
-
- # we only apply the changes if the syncdate is newer than
- # what we already have, since the main thread could be
- # running a full sync whilst the worker thread is putting
- # results in the queue.
- if float(o.note['syncdate']) > float(self.notes[okey]['syncdate']):
-
- if float(o.note['syncdate']) > float(self.notes[okey]['modifydate']):
- # note was synced AFTER the last modification to our local version
- # do an in-place update of the existing note
- # this could be with or without new content.
- old_note = copy.deepcopy(self.notes[okey])
- self.notes[okey].update(o.note)
- # notify anyone (probably nvPY) that this note has been changed
- self.notify_observers('synced:note',
- utils.KeyValueObject(lkey=okey,
- old_note=old_note,
- msg='Note synced.'))
-
- else:
- # the user has changed stuff since the version that got synced
- # just record syncnum and version that we got from simplenote
- # if we don't do this, merging problems start happening.
- # VERY importantly: also store the key. It
- # could be that we've just created the
- # note, but that the user continued
- # typing. We need to store the new server
- # key, else we'll keep on sending new
- # notes.
- tkeys = ['syncnum', 'version', 'syncdate', 'key']
- for tk in tkeys:
- self.notes[okey][tk] = o.note[tk]
-
- nsynced += 1
- self.notify_observers('change:note-status',
- utils.KeyValueObject(what='syncdate',
- key=okey,
- msg='Note synced.'))
-
- # after having handled the note that just came back,
- # we can take it from this blocker dict
- del self.threaded_syncing_keys[okey]
+ # the user has changed stuff since the version that got synced
+ # just record syncnum and version that we got from simplenote
+ # if we don't do this, merging problems start happening.
+ # VERY importantly: also store the key. It
+ # could be that we've just created the
+ # note, but that the user continued
+ # typing. We need to store the new server
+ # key, else we'll keep on sending new
+ # notes.
+ tkeys = ['syncnum', 'version', 'syncdate', 'key']
+ for tk in tkeys:
+ self.notes[okey][tk] = o.note[tk]
+
+ nsynced += 1
+ self.notify_observers('change:note-status',
+ utils.KeyValueObject(what='syncdate',
+ key=okey,
+ msg='Note synced.'))
+
+ # after having handled the note that just came back,
+ # we can take it from this blocker dict
+ del self.threaded_syncing_keys[okey]
return (nsynced, nerrored)
@@ -588,7 +554,7 @@ def sync_full(self):
del self.notes[lk]
# in either case (new or existing note), save note at assigned key
k = uret[0].get('key')
- # we merge the note we got back (content coud be empty!)
+ # we merge the note we got back (content could be empty!)
n.update(uret[0])
# and put it at the new key slot
self.notes[k] = n
@@ -718,86 +684,54 @@ def set_note_pinned(self, key, pinned):
if pinned != old_pinned:
if 'systemtags' not in n:
n['systemtags'] = []
-
systemtags = n['systemtags']
-
if pinned:
- # which by definition means that it was NOT pinned
systemtags.append('pinned')
-
else:
systemtags.remove('pinned')
-
n['modifydate'] = time.time()
self.notify_observers('change:note-status',
utils.KeyValueObject(what='modifydate',
key=key,
msg='Note pinned.' if pinned else 'Note unpinned.'))
- def worker_save(self):
- while True:
- o = self.q_save.get()
-
- if o.action == ACTION_SAVE:
- # this will write the savedate into o.note
- # with filename o.key.json
- try:
- self.helper_save_note(o.key, o.note)
-
- except WriteError, e:
- logging.error('FATAL ERROR in access to file system')
- print "FATAL ERROR: Check the nvpy.log"
- os._exit(1)
-
- else:
- # put the whole thing back into the result q
- # now we don't have to copy, because this thread
- # is never going to use o again.
- # somebody has to read out the queue...
- self.q_save_res.put(o)
-
def worker_sync(self):
while True:
o = self.q_sync.get()
- if o.action == ACTION_SYNC_PARTIAL_TO_SERVER:
- self.waiting_for_simplenote = True
- if 'key' in o.note:
- logging.debug('Updating note %s (local key %s) to server.' % (o.note['key'], o.key))
-
- else:
- logging.debug('Sending new note (local key %s) to server.' % (o.key,))
-
- uret = self.simplenote.update_note(o.note)
- self.waiting_for_simplenote = False
+ if 'key' in o.note:
+ logging.debug('Updating note %s (local key %s) to server.' % (o.note['key'], o.key))
+ else:
+ logging.debug('Sending new note (local key %s) to server.' % (o.key,))
- if uret[1] == 0:
- # success!
- n = uret[0]
+ uret = self.simplenote.update_note(o.note)
- if not n.get('content', None):
- # if note has not been changed, we don't get content back
- # delete our own copy too.
- del o.note['content']
+ if uret[1] != 0:
+ logging.error(uret[0])
+ # put it on the result queue
+ o.error = 1
+ self.q_sync_res.put(o)
+ continue
- logging.debug('Server replies with updated note ' + n['key'])
+ # success!
+ n = uret[0]
- # syncdate was set when the note was copied into our queue
- # we rely on that to determine when a returned note should
- # overwrite a note in the main list.
+ if not n.get('content', None):
+ # if note has not been changed, we don't get content back
+ # delete our own copy too.
+ del o.note['content']
- # store the actual note back into o
- # in-place update of our existing note copy
- o.note.update(n)
+ logging.debug('Server replies with updated note ' + n['key'])
- # success!
- o.error = 0
+ # syncdate was set when the note was copied into our queue
+ # we rely on that to determine when a returned note should
+ # overwrite a note in the main list.
- # and put it on the result queue
- self.q_sync_res.put(o)
+ # store the actual note back into o
+ # in-place update of our existing note copy
+ o.note.update(n)
- else:
- logging.error(uret[0])
- o.error = 1
- self.q_sync_res.put(o)
+ # put it on the result queue
+ o.error = 0
+ self.q_sync_res.put(o)