fix: Sync-Warteschlange im Online-Modus zuverlässig leeren
Lösch-Sync schlug serverseitig an JSON.parse('') fehl; clientseitig werden Duplikate zusammengeführt, parallele Läufe nachgeholt und die Queue bis zum Leeren durchgeschoben.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+86
-13
@@ -1,8 +1,9 @@
|
|||||||
import { db } from './db.js'
|
import { db, type SyncQueueItem } from './db.js'
|
||||||
import { getActiveMasterKey } from './auth.js'
|
import { getActiveMasterKey } from './auth.js'
|
||||||
|
|
||||||
const API_BASE = '/api/sync'
|
const API_BASE = '/api/sync'
|
||||||
const syncingLogbooks = new Set<string>()
|
const syncingLogbooks = new Set<string>()
|
||||||
|
const pendingResync = new Set<string>()
|
||||||
|
|
||||||
let isSyncing = false
|
let isSyncing = false
|
||||||
const listeners = new Set<(syncing: boolean) => void>()
|
const listeners = new Set<(syncing: boolean) => void>()
|
||||||
@@ -27,13 +28,63 @@ function isNewer(timeA: string | Date, timeB: string | Date): boolean {
|
|||||||
return new Date(timeA).getTime() > new Date(timeB).getTime()
|
return new Date(timeA).getTime() > new Date(timeB).getTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function entityKey(item: SyncQueueItem): string {
|
||||||
|
return `${item.type}:${item.payloadId}`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep only the latest queue entry per entity; delete wins over create/update.
|
||||||
|
async function coalesceSyncQueue(logbookId: string): Promise<SyncQueueItem[]> {
|
||||||
|
const pending = await db.syncQueue.where({ logbookId }).toArray()
|
||||||
|
if (pending.length <= 1) return pending
|
||||||
|
|
||||||
|
const byEntity = new Map<string, SyncQueueItem[]>()
|
||||||
|
for (const item of pending) {
|
||||||
|
const key = entityKey(item)
|
||||||
|
const group = byEntity.get(key)
|
||||||
|
if (group) group.push(item)
|
||||||
|
else byEntity.set(key, [item])
|
||||||
|
}
|
||||||
|
|
||||||
|
const kept: SyncQueueItem[] = []
|
||||||
|
const staleIds: number[] = []
|
||||||
|
|
||||||
|
for (const group of byEntity.values()) {
|
||||||
|
const deletes = group.filter((item) => item.action === 'delete')
|
||||||
|
const latest =
|
||||||
|
deletes.length > 0
|
||||||
|
? deletes.reduce((a, b) => ((a.id ?? 0) > (b.id ?? 0) ? a : b))
|
||||||
|
: group.reduce((a, b) => ((a.id ?? 0) > (b.id ?? 0) ? a : b))
|
||||||
|
|
||||||
|
kept.push(latest)
|
||||||
|
for (const item of group) {
|
||||||
|
if (item.id !== undefined && item.id !== latest.id) {
|
||||||
|
staleIds.push(item.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (staleIds.length > 0) {
|
||||||
|
await db.syncQueue.bulkDelete(staleIds)
|
||||||
|
}
|
||||||
|
|
||||||
|
return kept.sort((a, b) => (a.id ?? 0) - (b.id ?? 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
function scheduleResync(logbookId: string) {
|
||||||
|
if (pendingResync.has(logbookId)) return
|
||||||
|
pendingResync.add(logbookId)
|
||||||
|
queueMicrotask(() => {
|
||||||
|
pendingResync.delete(logbookId)
|
||||||
|
syncLogbook(logbookId).catch((err) => console.warn('Deferred sync failed:', err))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Push local sync queue items to the server
|
// Push local sync queue items to the server
|
||||||
async function pushChanges(logbookId: string): Promise<boolean> {
|
async function pushChanges(logbookId: string): Promise<boolean> {
|
||||||
const userId = localStorage.getItem('active_userid')
|
const userId = localStorage.getItem('active_userid')
|
||||||
if (!userId) return false
|
if (!userId) return false
|
||||||
|
|
||||||
// Fetch all pending queue items for this logbook
|
const pending = await coalesceSyncQueue(logbookId)
|
||||||
const pending = await db.syncQueue.where({ logbookId }).toArray()
|
|
||||||
if (pending.length === 0) return true
|
if (pending.length === 0) return true
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -53,13 +104,14 @@ async function pushChanges(logbookId: string): Promise<boolean> {
|
|||||||
|
|
||||||
const { results } = await response.json()
|
const { results } = await response.json()
|
||||||
|
|
||||||
// Process results
|
// Match results by index — payloadId alone is not unique in the queue
|
||||||
for (const res of results) {
|
for (let i = 0; i < results.length; i++) {
|
||||||
|
const res = results[i]
|
||||||
|
const queueItem = pending[i]
|
||||||
|
if (!queueItem) continue
|
||||||
|
|
||||||
if (res.status === 'success' || res.status === 'conflict') {
|
if (res.status === 'success' || res.status === 'conflict') {
|
||||||
// Find matching queue item
|
if (queueItem.id !== undefined) {
|
||||||
const queueItem = pending.find((item) => item.payloadId === res.payloadId)
|
|
||||||
if (queueItem && queueItem.id !== undefined) {
|
|
||||||
// Delete from sync queue
|
|
||||||
await db.syncQueue.delete(queueItem.id)
|
await db.syncQueue.delete(queueItem.id)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -73,6 +125,21 @@ async function pushChanges(logbookId: string): Promise<boolean> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function flushPushQueue(logbookId: string): Promise<boolean> {
|
||||||
|
let ok = true
|
||||||
|
for (let attempt = 0; attempt < 5; attempt++) {
|
||||||
|
const before = await db.syncQueue.where({ logbookId }).count()
|
||||||
|
if (before === 0) return ok
|
||||||
|
|
||||||
|
const pushed = await pushChanges(logbookId)
|
||||||
|
ok = ok && pushed
|
||||||
|
|
||||||
|
const after = await db.syncQueue.where({ logbookId }).count()
|
||||||
|
if (after === 0 || after === before) break
|
||||||
|
}
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
// Pull updates from the server and apply last-write-wins
|
// Pull updates from the server and apply last-write-wins
|
||||||
async function pullChanges(logbookId: string): Promise<boolean> {
|
async function pullChanges(logbookId: string): Promise<boolean> {
|
||||||
const userId = localStorage.getItem('active_userid')
|
const userId = localStorage.getItem('active_userid')
|
||||||
@@ -266,14 +333,20 @@ export async function syncLogbook(logbookId: string): Promise<boolean> {
|
|||||||
const masterKey = getActiveMasterKey()
|
const masterKey = getActiveMasterKey()
|
||||||
if (!masterKey) return false
|
if (!masterKey) return false
|
||||||
|
|
||||||
if (syncingLogbooks.has(logbookId)) return false
|
if (syncingLogbooks.has(logbookId)) {
|
||||||
|
scheduleResync(logbookId)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
syncingLogbooks.add(logbookId)
|
syncingLogbooks.add(logbookId)
|
||||||
setSyncing(true)
|
setSyncing(true)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const pushed = await pushChanges(logbookId)
|
const pushed = await flushPushQueue(logbookId)
|
||||||
const pulled = await pullChanges(logbookId)
|
const pulled = await pullChanges(logbookId)
|
||||||
return pushed && pulled;
|
// Push again in case pull surfaced nothing but queue grew during pull
|
||||||
|
const pushedAfterPull = await flushPushQueue(logbookId)
|
||||||
|
return pushed && pulled && pushedAfterPull
|
||||||
} finally {
|
} finally {
|
||||||
syncingLogbooks.delete(logbookId)
|
syncingLogbooks.delete(logbookId)
|
||||||
setSyncing(syncingLogbooks.size > 0)
|
setSyncing(syncingLogbooks.size > 0)
|
||||||
@@ -304,7 +377,7 @@ export async function syncAllLogbooks(): Promise<void> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Setup background sync intervals
|
// Setup background sync intervals
|
||||||
let syncIntervalId: any = null
|
let syncIntervalId: ReturnType<typeof setInterval> | null = null
|
||||||
|
|
||||||
export function startBackgroundSync(intervalMs = 30000) {
|
export function startBackgroundSync(intervalMs = 30000) {
|
||||||
if (syncIntervalId) clearInterval(syncIntervalId)
|
if (syncIntervalId) clearInterval(syncIntervalId)
|
||||||
|
|||||||
+28
-19
@@ -103,15 +103,34 @@ router.post('/push', async (req: any, res) => {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse Payload parameters
|
if (action === 'delete') {
|
||||||
|
if (type === 'yacht') {
|
||||||
|
await prisma.yachtPayload.deleteMany({ where: { logbookId } })
|
||||||
|
} else if (type === 'deviation') {
|
||||||
|
await prisma.deviationPayload.deleteMany({ where: { logbookId } })
|
||||||
|
} else if (type === 'crew') {
|
||||||
|
await prisma.crewPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||||
|
} else if (type === 'entry') {
|
||||||
|
await prisma.entryPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||||
|
} else if (type === 'photo') {
|
||||||
|
await prisma.photoPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||||
|
} else if (type === 'gpsTrack') {
|
||||||
|
await prisma.gpsTrackPayload.deleteMany({ where: { logbookId, entryId: payloadId } })
|
||||||
|
} else {
|
||||||
|
results.push({ payloadId, status: 'error', error: `Unsupported delete type: ${type}` })
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
results.push({ payloadId, status: 'success' })
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse payload for create/update operations
|
||||||
const parsed = JSON.parse(data)
|
const parsed = JSON.parse(data)
|
||||||
const encryptedData = parsed.encryptedData || parsed.ciphertext
|
const encryptedData = parsed.encryptedData || parsed.ciphertext
|
||||||
const { iv, tag } = parsed
|
const { iv, tag } = parsed
|
||||||
|
|
||||||
if (type === 'yacht') {
|
if (type === 'yacht') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.yachtPayload.deleteMany({ where: { logbookId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.yachtPayload.findUnique({ where: { logbookId } })
|
const existing = await prisma.yachtPayload.findUnique({ where: { logbookId } })
|
||||||
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
||||||
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
||||||
@@ -124,9 +143,7 @@ router.post('/push', async (req: any, res) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else if (type === 'deviation') {
|
} else if (type === 'deviation') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.deviationPayload.deleteMany({ where: { logbookId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.deviationPayload.findUnique({ where: { logbookId } })
|
const existing = await prisma.deviationPayload.findUnique({ where: { logbookId } })
|
||||||
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
||||||
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
||||||
@@ -139,9 +156,7 @@ router.post('/push', async (req: any, res) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else if (type === 'crew') {
|
} else if (type === 'crew') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.crewPayload.deleteMany({ where: { logbookId, payloadId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.crewPayload.findUnique({
|
const existing = await prisma.crewPayload.findUnique({
|
||||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||||
})
|
})
|
||||||
@@ -156,9 +171,7 @@ router.post('/push', async (req: any, res) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else if (type === 'entry') {
|
} else if (type === 'entry') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.entryPayload.deleteMany({ where: { logbookId, payloadId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.entryPayload.findUnique({
|
const existing = await prisma.entryPayload.findUnique({
|
||||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||||
})
|
})
|
||||||
@@ -173,9 +186,7 @@ router.post('/push', async (req: any, res) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else if (type === 'photo') {
|
} else if (type === 'photo') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.photoPayload.deleteMany({ where: { logbookId, payloadId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.photoPayload.findUnique({
|
const existing = await prisma.photoPayload.findUnique({
|
||||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||||
})
|
})
|
||||||
@@ -191,9 +202,7 @@ router.post('/push', async (req: any, res) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else if (type === 'gpsTrack') {
|
} else if (type === 'gpsTrack') {
|
||||||
if (action === 'delete') {
|
{
|
||||||
await prisma.gpsTrackPayload.deleteMany({ where: { logbookId, entryId: payloadId } })
|
|
||||||
} else {
|
|
||||||
const existing = await prisma.gpsTrackPayload.findUnique({
|
const existing = await prisma.gpsTrackPayload.findUnique({
|
||||||
where: { entryId: payloadId }
|
where: { entryId: payloadId }
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user