// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sync

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/matrix-org/dendrite/clientapi/auth/authtypes"

	"github.com/matrix-org/dendrite/syncapi/types"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/matrix-org/util"
)

var (
	randomMessageEvent  gomatrixserverlib.HeaderedEvent
	aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
	bobLeaveEvent       gomatrixserverlib.HeaderedEvent
	syncPositionVeryOld types.PaginationToken
	syncPositionBefore  types.PaginationToken
	syncPositionAfter   types.PaginationToken
	syncPositionNewEDU  types.PaginationToken
	syncPositionAfter2  types.PaginationToken
)

var (
	roomID = "!test:localhost"
	alice  = "@alice:localhost"
	bob    = "@bob:localhost"
)

func init() {
	baseSyncPos := types.PaginationToken{
		PDUPosition:       0,
		EDUTypingPosition: 0,
	}

	syncPositionVeryOld = baseSyncPos
	syncPositionVeryOld.PDUPosition = 5

	syncPositionBefore = baseSyncPos
	syncPositionBefore.PDUPosition = 11

	syncPositionAfter = baseSyncPos
	syncPositionAfter.PDUPosition = 12

	syncPositionNewEDU = syncPositionAfter
	syncPositionNewEDU.EDUTypingPosition = 1

	syncPositionAfter2 = baseSyncPos
	syncPositionAfter2.PDUPosition = 13

	var err error
	err = json.Unmarshal([]byte(`{
		"_room_version": "1",
		"type": "m.room.message",
		"content": {
			"body": "Hello World",
			"msgtype": "m.text"
		},
		"sender": "@noone:localhost",
		"room_id": "`+roomID+`",
		"origin": "localhost",
		"origin_server_ts": 12345,
		"event_id": "$randomMessageEvent:localhost"
	}`), &randomMessageEvent)
	if err != nil {
		panic(err)
	}
	err = json.Unmarshal([]byte(`{
		"_room_version": "1",
		"type": "m.room.member",
		"state_key": "`+bob+`",
		"content": {
			"membership": "invite"
		},
		"sender": "`+alice+`",
		"room_id": "`+roomID+`",
		"origin": "localhost",
		"origin_server_ts": 12345,
		"event_id": "$aliceInviteBobEvent:localhost"
	}`), &aliceInviteBobEvent)
	if err != nil {
		panic(err)
	}
	err = json.Unmarshal([]byte(`{
		"_room_version": "1",
		"type": "m.room.member",
		"state_key": "`+bob+`",
		"content": {
			"membership": "leave"
		},
		"sender": "`+bob+`",
		"room_id": "`+roomID+`",
		"origin": "localhost",
		"origin_server_ts": 12345,
		"event_id": "$bobLeaveEvent:localhost"
	}`), &bobLeaveEvent)
	if err != nil {
		panic(err)
	}
}

// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
	n := NewNotifier(syncPositionBefore)
	pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionVeryOld))
	if err != nil {
		t.Fatalf("TestImmediateNotification error: %s", err)
	}
	if pos != syncPositionBefore {
		t.Fatalf("TestImmediateNotification want %v, got %v", syncPositionBefore, pos)
	}
}

// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
	n := NewNotifier(syncPositionBefore)
	n.setUsersJoinedToRooms(map[string][]string{
		roomID: {alice, bob},
	})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
		if err != nil {
			t.Errorf("TestNewEventAndJoinedToRoom error: %w", err)
		}
		if pos != syncPositionAfter {
			t.Errorf("TestNewEventAndJoinedToRoom want %v, got %v", syncPositionAfter, pos)
		}
		wg.Done()
	}()

	stream := lockedFetchUserStream(n, bob)
	waitForBlocking(stream, 1)

	n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)

	wg.Wait()
}

// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
	n := NewNotifier(syncPositionBefore)
	n.setUsersJoinedToRooms(map[string][]string{
		roomID: {alice, bob},
	})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
		if err != nil {
			t.Errorf("TestNewInviteEventForUser error: %w", err)
		}
		if pos != syncPositionAfter {
			t.Errorf("TestNewInviteEventForUser want %v, got %v", syncPositionAfter, pos)
		}
		wg.Done()
	}()

	stream := lockedFetchUserStream(n, bob)
	waitForBlocking(stream, 1)

	n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)

	wg.Wait()
}

// Test an EDU-only update wakes up the request.
func TestEDUWakeup(t *testing.T) {
	n := NewNotifier(syncPositionAfter)
	n.setUsersJoinedToRooms(map[string][]string{
		roomID: {alice, bob},
	})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
		if err != nil {
			t.Errorf("TestNewInviteEventForUser error: %w", err)
		}
		if pos != syncPositionNewEDU {
			t.Errorf("TestNewInviteEventForUser want %v, got %v", syncPositionNewEDU, pos)
		}
		wg.Done()
	}()

	stream := lockedFetchUserStream(n, bob)
	waitForBlocking(stream, 1)

	n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)

	wg.Wait()
}

// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
	n := NewNotifier(syncPositionBefore)
	n.setUsersJoinedToRooms(map[string][]string{
		roomID: {alice, bob},
	})

	var wg sync.WaitGroup
	wg.Add(3)
	poll := func() {
		pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
		if err != nil {
			t.Errorf("TestMultipleRequestWakeup error: %w", err)
		}
		if pos != syncPositionAfter {
			t.Errorf("TestMultipleRequestWakeup want %v, got %v", syncPositionAfter, pos)
		}
		wg.Done()
	}
	go poll()
	go poll()
	go poll()

	stream := lockedFetchUserStream(n, bob)
	waitForBlocking(stream, 3)

	n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)

	wg.Wait()

	numWaiting := stream.NumWaiting()
	if numWaiting != 0 {
		t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
	}
}

// Test that you stop getting woken up when you leave a room.
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
	// listen as bob. Make bob leave room. Make alice send event to room.
	// Make sure alice gets woken up only and not bob as well.
	n := NewNotifier(syncPositionBefore)
	n.setUsersJoinedToRooms(map[string][]string{
		roomID: {alice, bob},
	})

	var leaveWG sync.WaitGroup

	// Make bob leave the room
	leaveWG.Add(1)
	go func() {
		pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
		if err != nil {
			t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
		}
		if pos != syncPositionAfter {
			t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %v, got %v", syncPositionAfter, pos)
		}
		leaveWG.Done()
	}()
	bobStream := lockedFetchUserStream(n, bob)
	waitForBlocking(bobStream, 1)
	n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
	leaveWG.Wait()

	// send an event into the room. Make sure alice gets it. Bob should not.
	var aliceWG sync.WaitGroup
	aliceStream := lockedFetchUserStream(n, alice)
	aliceWG.Add(1)
	go func() {
		pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
		if err != nil {
			t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
		}
		if pos != syncPositionAfter2 {
			t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %v, got %v", syncPositionAfter2, pos)
		}
		aliceWG.Done()
	}()

	go func() {
		// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
		_, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
		if err == nil {
			t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
		}
	}()

	waitForBlocking(aliceStream, 1)
	waitForBlocking(bobStream, 1)

	n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
	aliceWG.Wait()

	// it's possible that at this point alice has been informed and bob is about to be informed, so wait
	// for a fraction of a second to account for this race
	time.Sleep(1 * time.Millisecond)
}

func waitForEvents(n *Notifier, req syncRequest) (types.PaginationToken, error) {
	listener := n.GetListener(req)
	defer listener.Close()

	select {
	case <-time.After(5 * time.Second):
		return types.PaginationToken{}, fmt.Errorf(
			"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
		)
	case <-listener.GetNotifyChannel(*req.since):
		p := listener.GetSyncPosition()
		return p, nil
	}
}

// Wait until something is Wait()ing on the user stream.
func waitForBlocking(s *UserStream, numBlocking uint) {
	for numBlocking != s.NumWaiting() {
		// This is horrible but I don't want to add a signalling mechanism JUST for testing.
		time.Sleep(1 * time.Microsecond)
	}
}

// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
// A new stream is made if it doesn't exist already.
func lockedFetchUserStream(n *Notifier, userID string) *UserStream {
	n.streamLock.Lock()
	defer n.streamLock.Unlock()

	return n.fetchUserStream(userID, true)
}

func newTestSyncRequest(userID string, since types.PaginationToken) syncRequest {
	return syncRequest{
		device:        authtypes.Device{UserID: userID},
		timeout:       1 * time.Minute,
		since:         &since,
		wantFullState: false,
		limit:         defaultTimelineLimit,
		log:           util.GetLogger(context.TODO()),
		ctx:           context.TODO(),
	}
}