Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 415a7a4e4e | |||
| cb4f1b5989 | |||
| b37f935e87 | |||
| 213001b139 | |||
| 95cf42d1f6 | |||
| 95cfc3872b |
+131
-13
@@ -1,8 +1,9 @@
|
||||
import { db } from './db.js'
|
||||
import { db, type SyncQueueItem } from './db.js'
|
||||
import { getActiveMasterKey } from './auth.js'
|
||||
|
||||
const API_BASE = '/api/sync'
|
||||
const syncingLogbooks = new Set<string>()
|
||||
const pendingResync = new Set<string>()
|
||||
|
||||
let isSyncing = false
|
||||
const listeners = new Set<(syncing: boolean) => void>()
|
||||
@@ -27,13 +28,108 @@ function isNewer(timeA: string | Date, timeB: string | Date): boolean {
|
||||
return new Date(timeA).getTime() > new Date(timeB).getTime()
|
||||
}
|
||||
|
||||
function entityKey(item: SyncQueueItem): string {
|
||||
return `${item.type}:${item.payloadId}`
|
||||
}
|
||||
|
||||
function latestQueueItem(items: SyncQueueItem[]): SyncQueueItem {
|
||||
return items.reduce((a, b) => ((a.id ?? 0) > (b.id ?? 0) ? a : b))
|
||||
}
|
||||
|
||||
async function entityExistsLocally(item: SyncQueueItem): Promise<boolean> {
|
||||
switch (item.type) {
|
||||
case 'logbook':
|
||||
return !!(await db.logbooks.get(item.payloadId))
|
||||
case 'yacht':
|
||||
return !!(await db.yachts.get(item.logbookId))
|
||||
case 'deviation':
|
||||
return !!(await db.deviations.get(item.logbookId))
|
||||
case 'crew':
|
||||
return !!(await db.crews.get(item.payloadId))
|
||||
case 'entry':
|
||||
return !!(await db.entries.get(item.payloadId))
|
||||
case 'photo':
|
||||
return !!(await db.photos.get(item.payloadId))
|
||||
case 'gpsTrack':
|
||||
return !!(await db.gpsTracks.get(item.payloadId))
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Pick one queue entry per entity. If the record still exists locally, the latest
|
||||
// action wins (supports recreate-after-delete). If it was removed locally, a delete
|
||||
// wins over stale upserts with higher IDs; orphaned upserts are dropped entirely.
|
||||
async function resolveCoalescedItem(group: SyncQueueItem[]): Promise<SyncQueueItem | null> {
|
||||
const exists = await entityExistsLocally(group[0])
|
||||
if (exists) {
|
||||
return latestQueueItem(group)
|
||||
}
|
||||
|
||||
const deletes = group.filter((item) => item.action === 'delete')
|
||||
if (deletes.length > 0) {
|
||||
return latestQueueItem(deletes)
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
async function coalesceSyncQueue(logbookId: string): Promise<SyncQueueItem[]> {
|
||||
const pending = await db.syncQueue.where({ logbookId }).toArray()
|
||||
if (pending.length <= 1) return pending
|
||||
|
||||
const byEntity = new Map<string, SyncQueueItem[]>()
|
||||
for (const item of pending) {
|
||||
const key = entityKey(item)
|
||||
const group = byEntity.get(key)
|
||||
if (group) group.push(item)
|
||||
else byEntity.set(key, [item])
|
||||
}
|
||||
|
||||
const kept: SyncQueueItem[] = []
|
||||
const staleIds: number[] = []
|
||||
|
||||
for (const group of byEntity.values()) {
|
||||
const winner = await resolveCoalescedItem(group)
|
||||
|
||||
if (winner) {
|
||||
kept.push(winner)
|
||||
for (const item of group) {
|
||||
if (item.id !== undefined && item.id !== winner.id) {
|
||||
staleIds.push(item.id)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const item of group) {
|
||||
if (item.id !== undefined) {
|
||||
staleIds.push(item.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (staleIds.length > 0) {
|
||||
await db.syncQueue.bulkDelete(staleIds)
|
||||
}
|
||||
|
||||
return kept.sort((a, b) => (a.id ?? 0) - (b.id ?? 0))
|
||||
}
|
||||
|
||||
function scheduleResync(logbookId: string) {
|
||||
if (pendingResync.has(logbookId)) return
|
||||
pendingResync.add(logbookId)
|
||||
queueMicrotask(() => {
|
||||
pendingResync.delete(logbookId)
|
||||
syncLogbook(logbookId).catch((err) => console.warn('Deferred sync failed:', err))
|
||||
})
|
||||
}
|
||||
|
||||
// Push local sync queue items to the server
|
||||
async function pushChanges(logbookId: string): Promise<boolean> {
|
||||
const userId = localStorage.getItem('active_userid')
|
||||
if (!userId) return false
|
||||
|
||||
// Fetch all pending queue items for this logbook
|
||||
const pending = await db.syncQueue.where({ logbookId }).toArray()
|
||||
const pending = await coalesceSyncQueue(logbookId)
|
||||
if (pending.length === 0) return true
|
||||
|
||||
try {
|
||||
@@ -53,13 +149,14 @@ async function pushChanges(logbookId: string): Promise<boolean> {
|
||||
|
||||
const { results } = await response.json()
|
||||
|
||||
// Process results
|
||||
for (const res of results) {
|
||||
// Match results by index — payloadId alone is not unique in the queue
|
||||
for (let i = 0; i < results.length; i++) {
|
||||
const res = results[i]
|
||||
const queueItem = pending[i]
|
||||
if (!queueItem) continue
|
||||
|
||||
if (res.status === 'success' || res.status === 'conflict') {
|
||||
// Find matching queue item
|
||||
const queueItem = pending.find((item) => item.payloadId === res.payloadId)
|
||||
if (queueItem && queueItem.id !== undefined) {
|
||||
// Delete from sync queue
|
||||
if (queueItem.id !== undefined) {
|
||||
await db.syncQueue.delete(queueItem.id)
|
||||
}
|
||||
} else {
|
||||
@@ -73,6 +170,21 @@ async function pushChanges(logbookId: string): Promise<boolean> {
|
||||
}
|
||||
}
|
||||
|
||||
async function flushPushQueue(logbookId: string): Promise<boolean> {
|
||||
let ok = true
|
||||
for (let attempt = 0; attempt < 5; attempt++) {
|
||||
const before = await db.syncQueue.where({ logbookId }).count()
|
||||
if (before === 0) return ok
|
||||
|
||||
const pushed = await pushChanges(logbookId)
|
||||
ok = ok && pushed
|
||||
|
||||
const after = await db.syncQueue.where({ logbookId }).count()
|
||||
if (after === 0 || after === before) break
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// Pull updates from the server and apply last-write-wins
|
||||
async function pullChanges(logbookId: string): Promise<boolean> {
|
||||
const userId = localStorage.getItem('active_userid')
|
||||
@@ -266,14 +378,20 @@ export async function syncLogbook(logbookId: string): Promise<boolean> {
|
||||
const masterKey = getActiveMasterKey()
|
||||
if (!masterKey) return false
|
||||
|
||||
if (syncingLogbooks.has(logbookId)) return false
|
||||
if (syncingLogbooks.has(logbookId)) {
|
||||
scheduleResync(logbookId)
|
||||
return false
|
||||
}
|
||||
|
||||
syncingLogbooks.add(logbookId)
|
||||
setSyncing(true)
|
||||
|
||||
try {
|
||||
const pushed = await pushChanges(logbookId)
|
||||
const pushed = await flushPushQueue(logbookId)
|
||||
const pulled = await pullChanges(logbookId)
|
||||
return pushed && pulled;
|
||||
// Push again in case pull surfaced nothing but queue grew during pull
|
||||
const pushedAfterPull = await flushPushQueue(logbookId)
|
||||
return pushed && pulled && pushedAfterPull
|
||||
} finally {
|
||||
syncingLogbooks.delete(logbookId)
|
||||
setSyncing(syncingLogbooks.size > 0)
|
||||
@@ -304,7 +422,7 @@ export async function syncAllLogbooks(): Promise<void> {
|
||||
}
|
||||
|
||||
// Setup background sync intervals
|
||||
let syncIntervalId: any = null
|
||||
let syncIntervalId: ReturnType<typeof setInterval> | null = null
|
||||
|
||||
export function startBackgroundSync(intervalMs = 30000) {
|
||||
if (syncIntervalId) clearInterval(syncIntervalId)
|
||||
|
||||
+28
-19
@@ -103,15 +103,34 @@ router.post('/push', async (req: any, res) => {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse Payload parameters
|
||||
if (action === 'delete') {
|
||||
if (type === 'yacht') {
|
||||
await prisma.yachtPayload.deleteMany({ where: { logbookId } })
|
||||
} else if (type === 'deviation') {
|
||||
await prisma.deviationPayload.deleteMany({ where: { logbookId } })
|
||||
} else if (type === 'crew') {
|
||||
await prisma.crewPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else if (type === 'entry') {
|
||||
await prisma.entryPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else if (type === 'photo') {
|
||||
await prisma.photoPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else if (type === 'gpsTrack') {
|
||||
await prisma.gpsTrackPayload.deleteMany({ where: { logbookId, entryId: payloadId } })
|
||||
} else {
|
||||
results.push({ payloadId, status: 'error', error: `Unsupported delete type: ${type}` })
|
||||
continue
|
||||
}
|
||||
results.push({ payloadId, status: 'success' })
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse payload for create/update operations
|
||||
const parsed = JSON.parse(data)
|
||||
const encryptedData = parsed.encryptedData || parsed.ciphertext
|
||||
const { iv, tag } = parsed
|
||||
|
||||
if (type === 'yacht') {
|
||||
if (action === 'delete') {
|
||||
await prisma.yachtPayload.deleteMany({ where: { logbookId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.yachtPayload.findUnique({ where: { logbookId } })
|
||||
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
||||
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
||||
@@ -124,9 +143,7 @@ router.post('/push', async (req: any, res) => {
|
||||
})
|
||||
}
|
||||
} else if (type === 'deviation') {
|
||||
if (action === 'delete') {
|
||||
await prisma.deviationPayload.deleteMany({ where: { logbookId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.deviationPayload.findUnique({ where: { logbookId } })
|
||||
if (existing && new Date(existing.updatedAt) > itemUpdatedAt) {
|
||||
results.push({ payloadId, status: 'conflict', reason: 'Server version is newer' })
|
||||
@@ -139,9 +156,7 @@ router.post('/push', async (req: any, res) => {
|
||||
})
|
||||
}
|
||||
} else if (type === 'crew') {
|
||||
if (action === 'delete') {
|
||||
await prisma.crewPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.crewPayload.findUnique({
|
||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||
})
|
||||
@@ -156,9 +171,7 @@ router.post('/push', async (req: any, res) => {
|
||||
})
|
||||
}
|
||||
} else if (type === 'entry') {
|
||||
if (action === 'delete') {
|
||||
await prisma.entryPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.entryPayload.findUnique({
|
||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||
})
|
||||
@@ -173,9 +186,7 @@ router.post('/push', async (req: any, res) => {
|
||||
})
|
||||
}
|
||||
} else if (type === 'photo') {
|
||||
if (action === 'delete') {
|
||||
await prisma.photoPayload.deleteMany({ where: { logbookId, payloadId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.photoPayload.findUnique({
|
||||
where: { logbookId_payloadId: { logbookId, payloadId } }
|
||||
})
|
||||
@@ -191,9 +202,7 @@ router.post('/push', async (req: any, res) => {
|
||||
})
|
||||
}
|
||||
} else if (type === 'gpsTrack') {
|
||||
if (action === 'delete') {
|
||||
await prisma.gpsTrackPayload.deleteMany({ where: { logbookId, entryId: payloadId } })
|
||||
} else {
|
||||
{
|
||||
const existing = await prisma.gpsTrackPayload.findUnique({
|
||||
where: { entryId: payloadId }
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user