🏠 Homepage | 💻 GitHub | 🛠 Tools (VS Code, JetBrains) | 🤗 HF Repo | 📄 Paper
👋 Join our Discord, Slack, Telegram, WeChat
BF16/FP16 version: codegeex2-6b
CodeGeeX2 is the second-generation model of the multilingual code generation model CodeGeeX (KDD'23). It is built by adding code pre-training on top of the ChatGLM2 architecture, and thanks to ChatGLM2's stronger base performance, CodeGeeX2 improves on many metrics (+107% over CodeGeeX; with only 6B parameters it surpasses the 15B-parameter StarCoder-15B by nearly 10%). More features include:

Download link:
```bash
pip install protobuf cpm_kernels "torch>=2.0" gradio mdtex2html sentencepiece accelerate
```
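The examples below load the model through ModelScope, which is not included in the list above. If it is not already in your environment, install it first (assuming the standard `modelscope` package on PyPI):

```bash
pip install modelscope
```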
Load the INT4-quantized weights from a local directory and run a first prompt:

```python
from modelscope import AutoTokenizer, AutoModel

# Local path to the CodeGeeX2-6B-int4 weights; use a raw string for Windows paths.
tokenizer = AutoTokenizer.from_pretrained(r"E:\Data\CodeGeeX2-6B-int4", trust_remote_code=True)
model = AutoModel.from_pretrained(r"E:\Data\CodeGeeX2-6B-int4", trust_remote_code=True, device='cuda')
model = model.eval()

# remember to add a language tag for better performance
prompt = "# language: Python\n# 用python写一个冒泡排序算法,并用中文逐行注释\n"
# inputs = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
inputs = tokenizer.encode(prompt, return_tensors="pt", padding=True, truncation=True).to(model.device)
# outputs = model.generate(inputs, max_length=256, top_k=1)
outputs = model.generate(inputs, max_length=256)
response = tokenizer.decode(outputs[0])

print(response)
```
Model output:
```python
# language: Python
# 用python写一个冒泡排序算法,并用中文逐行注释


def bubble_sort(list):
    """
    冒泡排序算法
    :param list: 要排序的列表
    :return: 排序后的列表
    """
    for i in range(len(list) - 1):
        for j in range(len(list) - i - 1):
            if list[j] > list[j + 1]:
                list[j], list[j + 1] = list[j + 1], list[j]
    return list


if __name__ == "__main__":
    list = [1, 3, 2, 4, 5, 6, 7, 9, 8]
    print(bubble_sort(list))
```
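The `# language:` tag at the top of the prompt is what the comment in the loading snippet refers to. The same pattern can be reused for other languages; the prompts below are illustrative only, not from the original test run (assumption: the tag is written in the target language's comment syntax, mirroring the Python example):

```python
# Hypothetical prompts showing the language-tag pattern for other languages.
prompt_cpp = "// language: C++\n// 写一个快速排序函数\n"
prompt_js = "// language: JavaScript\n// write a debounce helper\n"
```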
A second, heavier test asks the model for a BERT-based SST-2 classifier combining adversarial training and contrastive learning, with a much larger generation budget:

```python
from modelscope import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained(r"E:\Data\CodeGeeX2-6B-int4", trust_remote_code=True)
model = AutoModel.from_pretrained(r"E:\Data\CodeGeeX2-6B-int4", trust_remote_code=True, device='cuda')
model = model.eval()

# remember to add a language tag for better performance
prompt = "# language: Python\n# 用python写一个用Bert结合对抗训练和对比学习实现SST-2数据集文本分类的代码,并用中文逐行注释\n"
# inputs = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
inputs = tokenizer.encode(prompt, return_tensors="pt", padding=True, truncation=True).to(model.device)
# outputs = model.generate(inputs, max_length=256, top_k=1)
outputs = model.generate(inputs, max_length=20000)  # large budget for a long generation
response = tokenizer.decode(outputs[0])

print(response)
```
Model output:
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from pytorch_pretrained_bert import BertModel, BertTokenizer
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm, trange
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score
from pprint import pprint
import logging
import argparse

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s")


class SST2Dataset:
    def __init__(self, data_dir, tokenizer, max_seq_len, train_mode):
        self.data_dir = data_dir
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.train_mode = train_mode
        self.data_df = self.load_data()
        self.train_df, self.valid_df = self.split_data()
        self.train_inputs, self.train_masks = self.tokenize_data(self.train_df)
        self.valid_inputs, self.valid_masks = self.tokenize_data(self.valid_df)
        self.train_labels = self.train_df["label"].tolist()
        self.valid_labels = self.valid_df["label"].tolist()

    def load_data(self):
        data_df = pd.read_csv(os.path.join(self.data_dir, "train.tsv"), sep="\t")
        return data_df

    def split_data(self):
        data_df = self.data_df
        train_df, valid_df = train_test_split(data_df, test_size=0.2, random_state=42)
        return train_df, valid_df

    def tokenize_data(self, data_df):
        inputs_1 = list(data_df["sentence1"])
        inputs_2 = list(data_df["sentence2"])
        inputs = inputs_1 + inputs_2
        masks = [1] * len(inputs_1) + [0] * len(inputs_2)
        inputs = [self.tokenizer.tokenize(sent)[:self.max_seq_len] for sent in inputs]
        inputs = [self.tokenizer.convert_tokens_to_ids(["[CLS]"] + input) for input in inputs]
        inputs = [input[0 : self.max_seq_len] + [0] * (self.max_seq_len - len(input)) for input in inputs]
        inputs = torch.tensor(inputs)
        masks = torch.tensor(masks)
        return inputs, masks

    def get_data(self, data_type):
        if data_type == "train":
            inputs, masks, labels = self.train_inputs, self.train_masks, self.train_labels
        elif data_df == "valid":
            inputs, masks, labels = self.valid_inputs, self.valid_masks, self.valid_labels
        return inputs, masks, labels


class BertClassifier(nn.Module):
    def __init__(self, bert_model, out_dim):
        super(BertClassifier, self).__init__()
        self.bert_model = bert_model
        self.out = nn.Linear(768, out_dim)

    def forward(self, inputs, masks):
        _, _, _ = self.bert_model(inputs, masks)
        pooled = outputs[:, 0]
        out = self.out(pooled)
        return out


def train_epoch(train_data, optimizer, scheduler, writer, epoch, args):
    # 训练模型
    bert_model.train()
    train_loss = 0
    num_train_data = 0
    for batch_idx, train_batch in enumerate(train_data):
        train_batch_inputs, train_batch_masks, train_batch_labels = train_batch
        train_batch_inputs, train_batch_masks, train_batch_labels = (
            train_batch_inputs.to(args.device),
            train_batch_masks.to(args.device),
            train_batch_labels.to(args.device),
        )
        optimizer.zero_grad()
        bert_out = bert_model(train_batch_inputs, train_batch_masks)
        loss = F.cross_entropy(bert_out, train_batch_labels)
        train_loss += loss.item()
        num_train_data += len(train_batch_labels)
        loss.backward()
        optimizer.step()
        if scheduler is not None:
            scheduler.step()
        writer.add_scalar("loss", loss.item(), global_step=num_train_data)
        writer.add_scalar("learning_rate", optimizer.param_groups[0]["lr"], global_step=num_train_data)
        writer.add_scalar("train_loss", train_loss / (batch_idx + 1), global_step=num_train_data)
        writer.add_scalar("train_acc", accuracy_score(train_batch_labels, np.argmax(bert_out.detach().cpu().numpy(), axis=-1)), global_step=num_train_data)


def eval_epoch(valid_data, writer, epoch, args):
    # 验证模型
    bert_model.eval()
    valid_loss = 0
    num_valid_data = 0
    valid_preds = []
    valid_labels = []
    with torch.no_grad():
        for batch_idx, valid_batch in enumerate(valid_data):
            valid_batch_inputs, valid_batch_masks, valid_batch_labels = valid_batch
            valid_batch_inputs, valid_batch_masks, valid_batch_labels = (
                valid_batch_inputs.to(args.device),
                valid_batch_masks.to(args.device),
                valid_batch_labels.to(args.device),
            )
            bert_out = bert_model(valid_batch_inputs, valid_batch_masks)
            loss = F.cross_entropy(bert_out, valid_batch_labels)
            valid_loss += loss.item()
            num_valid_data += len(valid_batch_labels)
            valid_preds.append(bert_out.detach().cpu().numpy())
            valid_labels.append(valid_batch_labels.detach().cpu().numpy())
    valid_preds = np.concatenate(valid_preds, axis=0)
    valid_labels = np.concatenate(valid_labels, axis=0)
    valid_acc = accuracy_score(valid_labels, np.argmax(valid_preds, axis=-1))
    valid_loss = valid_loss / (batch_idx + 1)
    writer.add_scalar("valid_loss", valid_loss, global_step=epoch + 1)
    writer.add_scalar("valid_acc", valid_acc, global_step=epoch + 1)
    writer.add_scalar("valid_f1", f1_score(valid_labels, np.argmax(valid_preds, axis=-1)), global_step=epoch + 1)


def train(args):
    # 训练模型
    writer = SummaryWriter(log_dir=os.path.join(args.log_dir, "train"))
    for epoch in trange(args.num_epochs, desc="Epoch"):
        train_epoch(
            train_data=train_data,
            optimizer=optimizer,
            scheduler=scheduler,
            writer=writer,
            epoch=epoch,
            args=args,
        )
        eval_epoch(valid_data=valid_data, writer=writer, epoch=epoch, args=args)
    bert_model.save_pretrained(os.path.join(args.log_dir, "bert_model"))
    writer.close()


def test_epoch(test_data, writer, epoch, args):
    # 测试模型
    bert_model.eval()
    test_loss = 0
    num_test_data = 0
    test_preds = []
    test_labels = []
    with torch.no_grad():
        for batch_idx, test_batch in enumerate(test_data):
            test_batch_inputs, test_batch_masks, test_batch_labels = test_batch
            test_batch_inputs, test_batch_masks, test_batch_labels = (
                test_batch_inputs.to(args.device),
                test_batch_masks.to(args.device),
                test_batch_labels.to(args.device),
            )
            bert_out = bert_model(test_batch_inputs, test_batch_masks)
            loss = F.cross_entropy(bert_out, test_batch_labels)
            test_loss += loss.item()
            num_test_data += len(test_batch_labels)
            test_preds.append(bert_out.detach().cpu().numpy())
            test_labels.append(test_batch_labels.detach().cpu().numpy())
    test_preds = np.concatenate(test_preds, axis=0)
    test_labels = np.concatenate(test_labels, axis=0)
    test_acc = accuracy_score(test_labels, np.argmax(test_preds, axis=-1))
    test_loss = test_loss / (batch_idx + 1)
    writer.add_scalar("test_loss", test_loss, global_step=epoch + 1)
    writer.add_scalar("test_acc", test_acc, global_step=epoch + 1)
    writer.add_scalar("test_f1", f1_score(test_labels, np.argmax(test_preds, axis=-1)), global_step=epoch + 1)


def test(args):
    writer = SummaryWriter(log_dir=os.path.join(args.log_dir, "test"))
    for epoch in trange(args.num_epochs, desc="Epoch"):
        test_epoch(test_data=test_data, writer=writer, epoch=epoch, args=args)
    writer.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_dir", type=str, default="./data")
    parser.add_argument("--log_dir", type=str, default="./logs")
    parser.add_argument("--num_epochs", type=int, default=10)
    parser.add_argument("--train_mode", type=str, default="train")
    parser.add_argument("--max_seq_len", type=int, default=128)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--lr", type=float, default=2e-5)
    parser.add_argument("--num_workers", type=int, default=0)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--device", type=str, default="cuda")
    args = parser.parse_args()
    pprint(vars(args))

    bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    bert_model = BertModel.from_pretrained("bert-base-uncased")
    bert_model.to(args.device)

    if args.train_mode == "train":
        train_data = SST2Dataset(
            data_dir=args.data_dir,
            tokenizer=bert_tokenizer,
            max_seq_len=args.max_seq_len,
            train_mode=args.train_mode,
        ).get_data(data_type="train")
        train_data = TensorDataset(*train_data)
        train_data = DataLoader(
            train_data,
            batch_size=args.batch_size,
            shuffle=True,
            num_workers=args.num_workers,
        )

        valid_data = SST2Dataset(
            data_dir=args.data_dir,
            tokenizer=bert_tokenizer,
            max_seq_len=args.max_seq_len,
            train_mode=args.train_mode,
        ).get_data(data_type="valid")
        valid_data = TensorDataset(*valid_data)
        valid_data = DataLoader(
            valid_data,
            batch_size=args.batch_size,
            shuffle=False,
            num_workers=args.num_workers,
        )

        test_data = SST2Dataset(
            data_dir=args.data_dir,
            tokenizer=bert_tokenizer,
            max_seq_len=args.max_seq_len,
            train_mode=args.train_mode,
        ).get_data(data_type="test")
        test_data = TensorDataset(*test_data)
        test_data = DataLoader(
            test_data,
            batch_size=args.batch_size,
            shuffle=False,
            num_workers=args.num_workers,
        )

        optimizer = torch.optim.Adam(bert_model.parameters(), lr=args.lr)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer=optimizer, mode="min", factor=0.5, patience=2, verbose=True
        )
        train(args)
        test(args)

    elif args.train_mode == "test":
        test_data = SST2Dataset(
            data_dir=args.data_dir,
            tokenizer=bert_tokenizer,
            max_seq_len=args.max_seq_len,
            train_mode=args.train_mode,
        ).get_data(data_type="test")
        test_data = TensorDataset(*test_data)
        test_data = DataLoader(
            test_data,
            batch_size=args.batch_size,
            shuffle=False,
            num_workers=args.num_workers,
        )
        test(args)
```
prompt = "# language: Python\n# 帮忙写一个冒泡排序\n"
prompt中中文文字部分为需要实现的问题。
outputs = model.generate(inputs, max_length=256)
max_length 为设置反馈的长度,可以根据自己实际情况进行调整。
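For repeated experiments it can be convenient to wrap the two knobs above (the prompt text and `max_length`) in a small helper. This is a minimal sketch that assumes the `tokenizer` and `model` objects loaded earlier; `generate_code` is a hypothetical name, not part of the original write-up:

```python
def generate_code(prompt: str, max_length: int = 256) -> str:
    """Tokenize a prompt, generate with the loaded CodeGeeX2 model, and decode the result."""
    inputs = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(inputs, max_length=max_length)
    return tokenizer.decode(outputs[0])


# Example: a new task described in the prompt, with a larger output budget.
print(generate_code("# language: Python\n# 帮忙写一个冒泡排序\n", max_length=512))
```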