| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- // Copyright 2018 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package integration
-
- import (
- "fmt"
- "net/http"
- "testing"
-
- auth_model "code.gitea.io/gitea/models/auth"
- "code.gitea.io/gitea/models/unittest"
- user_model "code.gitea.io/gitea/models/user"
- "code.gitea.io/gitea/modules/log"
- api "code.gitea.io/gitea/modules/structs"
- "code.gitea.io/gitea/tests"
-
- "github.com/stretchr/testify/assert"
- )
-
- // TestAPICreateAndDeleteToken tests that token that was just created can be deleted
- func TestAPICreateAndDeleteToken(t *testing.T) {
- defer tests.PrepareTestEnv(t)()
- user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
-
- newAccessToken := createAPIAccessTokenWithoutCleanUp(t, "test-key-1", user, []auth_model.AccessTokenScope{auth_model.AccessTokenScopeAll})
- deleteAPIAccessToken(t, newAccessToken, user)
-
- newAccessToken = createAPIAccessTokenWithoutCleanUp(t, "test-key-2", user, []auth_model.AccessTokenScope{auth_model.AccessTokenScopeAll})
- deleteAPIAccessToken(t, newAccessToken, user)
- }
-
- // TestAPIDeleteMissingToken ensures that error is thrown when token not found
- func TestAPIDeleteMissingToken(t *testing.T) {
- defer tests.PrepareTestEnv(t)()
- user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
-
- req := NewRequestf(t, "DELETE", "/api/v1/users/user1/tokens/%d", unittest.NonexistentID).
- AddBasicAuth(user.Name)
- MakeRequest(t, req, http.StatusNotFound)
- }
-
- // TestAPIGetTokensPermission ensures that only the admin can get tokens from other users
- func TestAPIGetTokensPermission(t *testing.T) {
- defer tests.PrepareTestEnv(t)()
-
- // admin can get tokens for other users
- user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
- req := NewRequest(t, "GET", "/api/v1/users/user2/tokens").
- AddBasicAuth(user.Name)
- MakeRequest(t, req, http.StatusOK)
-
- // non-admin can get tokens for himself
- user = unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
- req = NewRequest(t, "GET", "/api/v1/users/user2/tokens").
- AddBasicAuth(user.Name)
- MakeRequest(t, req, http.StatusOK)
-
- // non-admin can't get tokens for other users
- user = unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4})
- req = NewRequest(t, "GET", "/api/v1/users/user2/tokens").
- AddBasicAuth(user.Name)
- MakeRequest(t, req, http.StatusForbidden)
- }
-
- // TestAPIDeleteTokensPermission ensures that only the admin can delete tokens from other users
- func TestAPIDeleteTokensPermission(t *testing.T) {
- defer tests.PrepareTestEnv(t)()
-
- admin := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
- user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
- user4 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4})
-
- // admin can delete tokens for other users
- createAPIAccessTokenWithoutCleanUp(t, "test-key-1", user2, []auth_model.AccessTokenScope{auth_model.AccessTokenScopeAll})
- req := NewRequest(t, "DELETE", "/api/v1/users/"+user2.LoginName+"/tokens/test-key-1").
- AddBasicAuth(admin.Name)
- MakeRequest(t, req, http.StatusNoContent)
-
- // non-admin can delete tokens for himself
- createAPIAccessTokenWithoutCleanUp(t, "test-key-2", user2, []auth_model.AccessTokenScope{auth_model.AccessTokenScopeAll})
- req = NewRequest(t, "DELETE", "/api/v1/users/"+user2.LoginName+"/tokens/test-key-2").
- AddBasicAuth(user2.Name)
- MakeRequest(t, req, http.StatusNoContent)
-
- // non-admin can't delete tokens for other users
- createAPIAccessTokenWithoutCleanUp(t, "test-key-3", user2, []auth_model.AccessTokenScope{auth_model.AccessTokenScopeAll})
- req = NewRequest(t, "DELETE", "/api/v1/users/"+user2.LoginName+"/tokens/test-key-3").
- AddBasicAuth(user4.Name)
- MakeRequest(t, req, http.StatusForbidden)
- }
-
- type permission struct {
- category auth_model.AccessTokenScopeCategory
- level auth_model.AccessTokenScopeLevel
- }
-
- type requiredScopeTestCase struct {
- url string
- method string
- requiredPermissions []permission
- }
-
- func (c *requiredScopeTestCase) Name() string {
- return fmt.Sprintf("%v %v", c.method, c.url)
- }
-
- // TestAPIDeniesPermissionBasedOnTokenScope tests that API routes forbid access
- // when the correct token scope is not included.
- func TestAPIDeniesPermissionBasedOnTokenScope(t *testing.T) {
- defer tests.PrepareTestEnv(t)()
-
- // We'll assert that each endpoint, when fetched with a token with all
- // scopes *except* the ones specified, a forbidden status code is returned.
- //
- // This is to protect against endpoints having their access check copied
- // from other endpoints and not updated.
- //
- // Test cases are in alphabetical order by URL.
- testCases := []requiredScopeTestCase{
- {
- "/api/v1/admin/emails",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/admin/users",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/admin/users",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/admin/users/user2",
- "PATCH",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/admin/users/user2/orgs",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/admin/users/user2/orgs",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/admin/orgs",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryAdmin,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/notifications",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryNotification,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/notifications",
- "PUT",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryNotification,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/org/org1/repos",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryOrganization,
- auth_model.Write,
- },
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/packages/user1/type/name/1",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryPackage,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/packages/user1/type/name/1",
- "DELETE",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryPackage,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1",
- "PATCH",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1",
- "DELETE",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/branches",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/archive/foo",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/issues",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryIssue,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/media/foo",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/raw/foo",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/teams",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/teams/team1",
- "PUT",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/repos/user1/repo1/transfer",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Write,
- },
- },
- },
- // Private repo
- {
- "/api/v1/repos/user2/repo2",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- // Private repo
- {
- "/api/v1/repos/user2/repo2",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryRepository,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/user",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/user/emails",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/user/emails",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/user/emails",
- "DELETE",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/user/applications/oauth2",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- {
- "/api/v1/user/applications/oauth2",
- "POST",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Write,
- },
- },
- },
- {
- "/api/v1/users/search",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- // Private user
- {
- "/api/v1/users/user31",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- // Private user
- {
- "/api/v1/users/user31/gpg_keys",
- "GET",
- []permission{
- {
- auth_model.AccessTokenScopeCategoryUser,
- auth_model.Read,
- },
- },
- },
- }
-
- // User needs to be admin so that we can verify that tokens without admin
- // scopes correctly deny access.
- user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
- assert.True(t, user.IsAdmin, "User needs to be admin")
-
- for _, testCase := range testCases {
- runTestCase(t, &testCase, user)
- }
- }
-
- // runTestCase Helper function to run a single test case.
- func runTestCase(t *testing.T, testCase *requiredScopeTestCase, user *user_model.User) {
- t.Run(testCase.Name(), func(t *testing.T) {
- defer tests.PrintCurrentTest(t)()
-
- // Create a token with all scopes NOT required by the endpoint.
- var unauthorizedScopes []auth_model.AccessTokenScope
- for _, category := range auth_model.AllAccessTokenScopeCategories {
- // For permissions, Write > Read > NoAccess. So we need to
- // find the minimum required, and only grant permission up to but
- // not including the minimum required.
- minRequiredLevel := auth_model.Write
- categoryIsRequired := false
- for _, requiredPermission := range testCase.requiredPermissions {
- if requiredPermission.category != category {
- continue
- }
- categoryIsRequired = true
- if requiredPermission.level < minRequiredLevel {
- minRequiredLevel = requiredPermission.level
- }
- }
- unauthorizedLevel := auth_model.Write
- if categoryIsRequired {
- if minRequiredLevel == auth_model.Read {
- unauthorizedLevel = auth_model.NoAccess
- } else if minRequiredLevel == auth_model.Write {
- unauthorizedLevel = auth_model.Read
- } else {
- assert.FailNow(t, "Invalid test case", "Unknown access token scope level: %v", minRequiredLevel)
- }
- }
-
- if unauthorizedLevel == auth_model.NoAccess {
- continue
- }
- cateogoryUnauthorizedScopes := auth_model.GetRequiredScopes(
- unauthorizedLevel,
- category)
- unauthorizedScopes = append(unauthorizedScopes, cateogoryUnauthorizedScopes...)
- }
-
- accessToken := createAPIAccessTokenWithoutCleanUp(t, "test-token", user, unauthorizedScopes)
- defer deleteAPIAccessToken(t, accessToken, user)
-
- // Request the endpoint. Verify that permission is denied.
- req := NewRequest(t, testCase.method, testCase.url).
- AddTokenAuth(accessToken.Token)
- MakeRequest(t, req, http.StatusForbidden)
- })
- }
-
- // createAPIAccessTokenWithoutCleanUp Create an API access token and assert that
- // creation succeeded. The caller is responsible for deleting the token.
- func createAPIAccessTokenWithoutCleanUp(t *testing.T, tokenName string, user *user_model.User, scopes []auth_model.AccessTokenScope) api.AccessToken {
- payload := map[string]any{
- "name": tokenName,
- "scopes": scopes,
- }
-
- log.Debug("Requesting creation of token with scopes: %v", scopes)
- req := NewRequestWithJSON(t, "POST", "/api/v1/users/"+user.LoginName+"/tokens", payload).
- AddBasicAuth(user.Name)
- resp := MakeRequest(t, req, http.StatusCreated)
-
- var newAccessToken api.AccessToken
- DecodeJSON(t, resp, &newAccessToken)
- unittest.AssertExistsAndLoadBean(t, &auth_model.AccessToken{
- ID: newAccessToken.ID,
- Name: newAccessToken.Name,
- Token: newAccessToken.Token,
- UID: user.ID,
- })
-
- return newAccessToken
- }
-
- // deleteAPIAccessToken deletes an API access token and assert that deletion succeeded.
- func deleteAPIAccessToken(t *testing.T, accessToken api.AccessToken, user *user_model.User) {
- req := NewRequestf(t, "DELETE", "/api/v1/users/"+user.LoginName+"/tokens/%d", accessToken.ID).
- AddBasicAuth(user.Name)
- MakeRequest(t, req, http.StatusNoContent)
-
- unittest.AssertNotExistsBean(t, &auth_model.AccessToken{ID: accessToken.ID})
- }
|